don't ever use cstring again
531 dynamodb/storage.odin Normal file
@@ -0,0 +1,531 @@
// Storage engine mapping DynamoDB operations to RocksDB
package dynamodb

import "core:fmt"
import "core:mem"
import "core:slice"
import "core:strings"
import "core:sync"
import "core:time"
import "../key_codec"
import "../item_codec"
import "../rocksdb"

Storage_Error :: enum {
	None,
	Table_Not_Found,
	Table_Already_Exists,
	Item_Not_Found,
	Invalid_Key,
	Missing_Key_Attribute,
	Serialization_Error,
	RocksDB_Error,
	Out_Of_Memory,
}

// Result type for Scan operations with pagination
Scan_Result :: struct {
	items: []Item,
	last_evaluated_key: Maybe([]byte),
}

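// last_evaluated_key mirrors DynamoDB's LastEvaluatedKey: when set, callers
// pass it back as the exclusive start key to fetch the next page.
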
scan_result_destroy :: proc(result: ^Scan_Result) {
	for &item in result.items {
		item_destroy(&item)
	}
	delete(result.items)

	if last_key, ok := result.last_evaluated_key.?; ok {
		delete(last_key)
	}
}

// Result type for Query operations with pagination
Query_Result :: struct {
	items: []Item,
	last_evaluated_key: Maybe([]byte),
}

query_result_destroy :: proc(result: ^Query_Result) {
	for &item in result.items {
		item_destroy(&item)
	}
	delete(result.items)

	if last_key, ok := result.last_evaluated_key.?; ok {
		delete(last_key)
	}
}

// In-memory representation of table metadata
Table_Metadata :: struct {
	table_name: string,
	key_schema: []Key_Schema_Element,
	attribute_definitions: []Attribute_Definition,
	table_status: Table_Status,
	creation_date_time: i64,
	global_secondary_indexes: Maybe([]Global_Secondary_Index),
	local_secondary_indexes: Maybe([]Local_Secondary_Index),
}

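// Every string and slice in Table_Metadata is owned (cloned with the
// allocator it was built with), so destroy must receive that same allocator.
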
table_metadata_destroy :: proc(metadata: ^Table_Metadata, allocator: mem.Allocator) {
	delete(metadata.table_name, allocator)

	for ks in metadata.key_schema {
		delete(ks.attribute_name, allocator)
	}
	delete(metadata.key_schema, allocator)

	for ad in metadata.attribute_definitions {
		delete(ad.attribute_name, allocator)
	}
	delete(metadata.attribute_definitions, allocator)

	// TODO: Free GSI/LSI if we implement them
}

// Get the partition key attribute name
table_metadata_get_partition_key_name :: proc(metadata: ^Table_Metadata) -> Maybe(string) {
	for ks in metadata.key_schema {
		if ks.key_type == .HASH {
			return ks.attribute_name
		}
	}
	return nil
}

// Get the sort key attribute name (if any)
table_metadata_get_sort_key_name :: proc(metadata: ^Table_Metadata) -> Maybe(string) {
	for ks in metadata.key_schema {
		if ks.key_type == .RANGE {
			return ks.attribute_name
		}
	}
	return nil
}

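// Locking model: each table has its own RW mutex. Table-level operations
// (create_table, delete_table) take it exclusively, while item operations
// take it shared, so reads and writes proceed concurrently but never
// during a table create/delete.
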
// Storage engine
Storage_Engine :: struct {
	db: rocksdb.DB,
	allocator: mem.Allocator,
	table_locks: map[string]^sync.RW_Mutex,
	table_locks_mutex: sync.Mutex,
}

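// Usage sketch (the data directory path is illustrative):
//
//	engine, err := storage_engine_init(context.allocator, "./data")
//	if err != .None { /* handle */ }
//	defer storage_engine_destroy(engine)
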
storage_engine_init :: proc(allocator: mem.Allocator, data_dir: string) -> (^Storage_Engine, Storage_Error) {
	db, db_err := rocksdb.db_open(data_dir, true)
	if db_err != .None {
		return nil, .RocksDB_Error
	}

	engine := new(Storage_Engine, allocator)
	engine.db = db
	engine.allocator = allocator
	engine.table_locks = make(map[string]^sync.RW_Mutex, allocator = allocator)

	return engine, .None
}

storage_engine_destroy :: proc(engine: ^Storage_Engine) {
	// Free all table locks
	for key, lock in engine.table_locks {
		delete(key, engine.allocator)
		free(lock, engine.allocator)
	}
	delete(engine.table_locks)

	// Close database
	rocksdb.db_close(&engine.db)

	// Free engine
	free(engine, engine.allocator)
}

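// table_locks_mutex guards only the lock map itself; the per-table RW
// mutexes it hands out are locked by callers after this returns.
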
// Get or create a table lock
get_or_create_table_lock :: proc(engine: ^Storage_Engine, table_name: string) -> ^sync.RW_Mutex {
	sync.mutex_lock(&engine.table_locks_mutex)
	defer sync.mutex_unlock(&engine.table_locks_mutex)

	if lock, found := engine.table_locks[table_name]; found {
		return lock
	}

	lock := new(sync.RW_Mutex, engine.allocator)
	owned_name := strings.clone(table_name, engine.allocator)
	engine.table_locks[owned_name] = lock
	return lock
}

// Remove a table lock
remove_table_lock :: proc(engine: ^Storage_Engine, table_name: string) {
	sync.mutex_lock(&engine.table_locks_mutex)
	defer sync.mutex_unlock(&engine.table_locks_mutex)

	if lock, found := engine.table_locks[table_name]; found {
		// delete_key returns the stored (owned) key clone; free that,
		// not the caller's table_name, and only after removal.
		owned_name, _ := delete_key(&engine.table_locks, table_name)
		delete(owned_name, engine.allocator)
		free(lock, engine.allocator)
	}
}

// ============================================================================
// Table Metadata Operations
// ============================================================================

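// Metadata is stored as an ordinary encoded Item under the table's meta
// key, with KeySchema and AttributeDefinitions embedded as JSON strings,
// so the same item_codec handles both data and metadata.
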
// Serialize table metadata to binary format
serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) {
	// Create a temporary item to hold metadata
	meta_item := make(Item, allocator = context.temp_allocator)
	defer delete(meta_item)

	// Encode key schema as JSON string
	ks_builder := strings.builder_make(context.temp_allocator)
	defer strings.builder_destroy(&ks_builder)

	strings.write_string(&ks_builder, "[")
	for ks, i in metadata.key_schema {
		if i > 0 {
			strings.write_string(&ks_builder, ",")
		}
		fmt.sbprintf(&ks_builder, `{"AttributeName":"%s","KeyType":"%s"}`,
			ks.attribute_name, key_type_to_string(ks.key_type))
	}
	strings.write_string(&ks_builder, "]")

	// Clone into the temp allocator so nothing here outlives this call
	meta_item["KeySchema"] = String(strings.clone(strings.to_string(ks_builder), context.temp_allocator))

	// Encode attribute definitions as JSON string
	ad_builder := strings.builder_make(context.temp_allocator)
	defer strings.builder_destroy(&ad_builder)

	strings.write_string(&ad_builder, "[")
	for ad, i in metadata.attribute_definitions {
		if i > 0 {
			strings.write_string(&ad_builder, ",")
		}
		fmt.sbprintf(&ad_builder, `{"AttributeName":"%s","AttributeType":"%s"}`,
			ad.attribute_name, scalar_type_to_string(ad.attribute_type))
	}
	strings.write_string(&ad_builder, "]")

	meta_item["AttributeDefinitions"] = String(strings.clone(strings.to_string(ad_builder), context.temp_allocator))

	// Add other metadata
	meta_item["TableStatus"] = String(strings.clone(table_status_to_string(metadata.table_status), context.temp_allocator))
	meta_item["CreationDateTime"] = Number(fmt.aprint(metadata.creation_date_time, allocator = context.temp_allocator))

	// Encode to binary
	return item_codec.encode(meta_item)
}

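// For a hypothetical table keyed on "pk" (HASH) and "sk" (RANGE), the
// KeySchema string built above comes out as:
//
//	[{"AttributeName":"pk","KeyType":"HASH"},{"AttributeName":"sk","KeyType":"RANGE"}]
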
// Deserialize table metadata from binary format
deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (Table_Metadata, bool) {
	meta_item, ok := item_codec.decode(data)
	if !ok {
		return {}, false
	}
	defer item_destroy(&meta_item)

	metadata: Table_Metadata

	// TODO: Parse KeySchema and AttributeDefinitions from JSON strings
	// For now, return empty - this will be implemented when needed

	return metadata, true
}

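// Note: this decodes metadata from RocksDB on every call; there is no
// in-memory metadata cache yet.
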
// Get table metadata
get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Table_Metadata, Storage_Error) {
	meta_key := key_codec.build_meta_key(table_name)
	defer delete(meta_key)

	value, get_err := rocksdb.db_get(&engine.db, meta_key)
	if get_err != .None {
		if get_err == .NotFound {
			return {}, .Table_Not_Found
		}
		return {}, .RocksDB_Error
	}
	defer delete(value)

	metadata, ok := deserialize_table_metadata(value, engine.allocator)
	if !ok {
		return {}, .Serialization_Error
	}

	return metadata, .None
}

// ============================================================================
// Table Operations
// ============================================================================

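// Usage sketch (the .S attribute type is illustrative; .HASH is the key
// type used throughout this package):
//
//	schema := []Key_Schema_Element{{attribute_name = "pk", key_type = .HASH}}
//	defs := []Attribute_Definition{{attribute_name = "pk", attribute_type = .S}}
//	desc, err := create_table(engine, "users", schema, defs)
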
// Create table
create_table :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	key_schema: []Key_Schema_Element,
	attribute_definitions: []Attribute_Definition,
) -> (Table_Description, Storage_Error) {
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_lock(table_lock)
	defer sync.rw_mutex_unlock(table_lock)

	// Check if table already exists
	meta_key := key_codec.build_meta_key(table_name)
	defer delete(meta_key)

	existing, get_err := rocksdb.db_get(&engine.db, meta_key)
	if get_err == .None {
		defer delete(existing)
		if len(existing) > 0 {
			return {}, .Table_Already_Exists
		}
	}

	// Create metadata (creation time as Unix seconds)
	now := time.time_to_unix(time.now())

	metadata := Table_Metadata{
		table_name = strings.clone(table_name, engine.allocator),
		key_schema = slice.clone(key_schema, engine.allocator),
		attribute_definitions = slice.clone(attribute_definitions, engine.allocator),
		table_status = .ACTIVE,
		creation_date_time = now,
	}

	// Deep copy key schema and attr defs
	for &ks in metadata.key_schema {
		ks.attribute_name = strings.clone(ks.attribute_name, engine.allocator)
	}
	for &ad in metadata.attribute_definitions {
		ad.attribute_name = strings.clone(ad.attribute_name, engine.allocator)
	}
	// Only the serialized copy is persisted; free the in-memory copy on
	// every exit path (previously it leaked on success).
	defer table_metadata_destroy(&metadata, engine.allocator)

	// Serialize and store
	meta_value, serialize_ok := serialize_table_metadata(&metadata)
	if !serialize_ok {
		return {}, .Serialization_Error
	}
	defer delete(meta_value)

	put_err := rocksdb.db_put(&engine.db, meta_key, meta_value)
	if put_err != .None {
		return {}, .RocksDB_Error
	}

	// Return description (borrows the caller's name and slices)
	desc := Table_Description{
		table_name = table_name,
		key_schema = key_schema,
		attribute_definitions = attribute_definitions,
		table_status = .ACTIVE,
		creation_date_time = now,
		item_count = 0,
		table_size_bytes = 0,
	}

	return desc, .None
}

// Delete table
delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Error {
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_lock(table_lock)
	// No deferred unlock here: remove_table_lock frees the lock below, so
	// we must unlock explicitly on every path before it can be freed.

	// Check table exists
	meta_key := key_codec.build_meta_key(table_name)
	defer delete(meta_key)

	existing, get_err := rocksdb.db_get(&engine.db, meta_key)
	if get_err == .NotFound {
		sync.rw_mutex_unlock(table_lock)
		return .Table_Not_Found
	}
	if get_err != .None {
		sync.rw_mutex_unlock(table_lock)
		return .RocksDB_Error
	}
	delete(existing)

	// Delete metadata
	del_err := rocksdb.db_delete(&engine.db, meta_key)
	if del_err != .None {
		sync.rw_mutex_unlock(table_lock)
		return .RocksDB_Error
	}

	// TODO: Delete all items in table using iterator
	// For now, just delete metadata

	sync.rw_mutex_unlock(table_lock)
	remove_table_lock(engine, table_name)
	return .None
}

// ============================================================================
// Item Operations
// ============================================================================

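// put_item flow: validate the key against the table's key schema, encode
// the full item with item_codec, and store it under a data key built from
// the table name and the key values.
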
// Put item
put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_shared_lock(table_lock)
	defer sync.rw_mutex_shared_unlock(table_lock)

	// Get table metadata
	metadata, meta_err := get_table_metadata(engine, table_name)
	if meta_err != .None {
		return meta_err
	}
	defer table_metadata_destroy(&metadata, engine.allocator)

	// Extract key from item
	key, key_ok := key_from_item(item, metadata.key_schema)
	if !key_ok {
		return .Missing_Key_Attribute
	}
	defer {
		pk := key.pk
		attr_value_destroy(&pk)
		if sk, ok := key.sk.?; ok {
			sk_copy := sk
			attr_value_destroy(&sk_copy)
		}
	}

	// Get key values
	key_values, kv_ok := key_get_values(&key)
	if !kv_ok {
		return .Invalid_Key
	}

	// Build storage key
	storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)

	// Encode item
	encoded_item, encode_ok := item_codec.encode(item)
	if !encode_ok {
		return .Serialization_Error
	}
	defer delete(encoded_item)

	// Store in RocksDB
	put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
	if put_err != .None {
		return .RocksDB_Error
	}

	return .None
}

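// Note: a missing item is not an error; get_item returns (nil, .None) so
// callers can distinguish "no item" from a storage failure.
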
// Get item
get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (Maybe(Item), Storage_Error) {
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_shared_lock(table_lock)
	defer sync.rw_mutex_shared_unlock(table_lock)

	// Get table metadata
	metadata, meta_err := get_table_metadata(engine, table_name)
	if meta_err != .None {
		return nil, meta_err
	}
	defer table_metadata_destroy(&metadata, engine.allocator)

	// Extract key
	key_struct, key_ok := key_from_item(key, metadata.key_schema)
	if !key_ok {
		return nil, .Missing_Key_Attribute
	}
	defer {
		pk := key_struct.pk
		attr_value_destroy(&pk)
		if sk, ok := key_struct.sk.?; ok {
			sk_copy := sk
			attr_value_destroy(&sk_copy)
		}
	}

	// Get key values
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return nil, .Invalid_Key
	}

	// Build storage key
	storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)

	// Get from RocksDB
	value, get_err := rocksdb.db_get(&engine.db, storage_key)
	if get_err == .NotFound {
		return nil, .None
	}
	if get_err != .None {
		return nil, .RocksDB_Error
	}
	defer delete(value)

	// Decode item
	item, decode_ok := item_codec.decode(value)
	if !decode_ok {
		return nil, .Serialization_Error
	}

	return item, .None
}

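// Deletes are idempotent: deleting a key that does not exist still
// returns .None, matching DynamoDB's DeleteItem behavior.
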
// Delete item
delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_shared_lock(table_lock)
	defer sync.rw_mutex_shared_unlock(table_lock)

	// Get table metadata
	metadata, meta_err := get_table_metadata(engine, table_name)
	if meta_err != .None {
		return meta_err
	}
	defer table_metadata_destroy(&metadata, engine.allocator)

	// Extract key
	key_struct, key_ok := key_from_item(key, metadata.key_schema)
	if !key_ok {
		return .Missing_Key_Attribute
	}
	defer {
		pk := key_struct.pk
		attr_value_destroy(&pk)
		if sk, ok := key_struct.sk.?; ok {
			sk_copy := sk
			attr_value_destroy(&sk_copy)
		}
	}

	// Get key values
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return .Invalid_Key
	}

	// Build storage key
	storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)

	// Delete from RocksDB
	del_err := rocksdb.db_delete(&engine.db, storage_key)
	if del_err != .None {
		return .RocksDB_Error
	}

	return .None
}

// List tables (simplified - returns empty list for now)
list_tables :: proc(engine: ^Storage_Engine) -> []string {
	// TODO: Implement by iterating over meta keys
	return {}
}