// Storage engine mapping DynamoDB operations to RocksDB
package dynamodb
import "core:c"
import "core:encoding/json"
import "core:fmt"
import "core:mem"
import "core:slice"
import "core:strconv"
import "core:strings"
import "core:sync"
import "core:time"
import "../rocksdb"
Storage_Error :: enum {
None,
Table_Not_Found,
Table_Already_Exists,
Item_Not_Found,
Invalid_Key,
Missing_Key_Attribute,
Serialization_Error,
RocksDB_Error,
Out_Of_Memory,
}
// Result type for Scan operations with pagination
Scan_Result :: struct {
items: []Item,
last_evaluated_key: Maybe([]byte),
}
scan_result_destroy :: proc(result: ^Scan_Result) {
for item in result.items {
item_copy := item
item_destroy(&item_copy)
}
delete(result.items)
if last_key, ok := result.last_evaluated_key.?; ok {
delete(last_key)
}
}
// Result type for Query operations with pagination
Query_Result :: struct {
items: []Item,
last_evaluated_key: Maybe([]byte),
}
query_result_destroy :: proc(result: ^Query_Result) {
for item in result.items {
item_copy := item
item_destroy(&item_copy)
}
delete(result.items)
if last_key, ok := result.last_evaluated_key.?; ok {
delete(last_key)
}
}
// In-memory representation of table metadata
Table_Metadata :: struct {
table_name: string,
key_schema: []Key_Schema_Element,
attribute_definitions: []Attribute_Definition,
table_status: Table_Status,
creation_date_time: i64,
global_secondary_indexes: Maybe([]Global_Secondary_Index),
local_secondary_indexes: Maybe([]Local_Secondary_Index),
}
table_metadata_destroy :: proc(metadata: ^Table_Metadata, allocator: mem.Allocator) {
delete(metadata.table_name, allocator)
for ks in metadata.key_schema {
delete(ks.attribute_name, allocator)
}
delete(metadata.key_schema, allocator)
for ad in metadata.attribute_definitions {
delete(ad.attribute_name, allocator)
}
delete(metadata.attribute_definitions, allocator)
// Free GSI definitions
if gsis, has_gsis := metadata.global_secondary_indexes.?; has_gsis {
for gsi in gsis {
delete(gsi.index_name, allocator)
for ks in gsi.key_schema {
delete(ks.attribute_name, allocator)
}
delete(gsi.key_schema, allocator)
if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka {
for attr in nka {
delete(attr, allocator)
}
delete(nka, allocator)
}
}
delete(gsis, allocator)
}
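	// NOTE: local_secondary_indexes is not freed here; if LSI definitions are
	// ever populated, they will need the same cleanup as the GSIs above.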
}
// Get the partition key attribute name
table_metadata_get_partition_key_name :: proc(metadata: ^Table_Metadata) -> Maybe(string) {
for ks in metadata.key_schema {
if ks.key_type == .HASH {
return ks.attribute_name
}
}
return nil
}
// Get the attribute type for a given attribute name
table_metadata_get_attribute_type :: proc(metadata: ^Table_Metadata, attr_name: string) -> Maybe(Scalar_Attribute_Type) {
for ad in metadata.attribute_definitions {
if ad.attribute_name == attr_name {
return ad.attribute_type
}
}
return nil
}
// Get the sort key attribute name (if any)
table_metadata_get_sort_key_name :: proc(metadata: ^Table_Metadata) -> Maybe(string) {
for ks in metadata.key_schema {
if ks.key_type == .RANGE {
return ks.attribute_name
}
}
return nil
}
// Storage engine
Storage_Engine :: struct {
db: rocksdb.DB,
allocator: mem.Allocator,
table_locks: map[string]^sync.RW_Mutex,
table_locks_mutex: sync.Mutex,
}
storage_engine_init :: proc(allocator: mem.Allocator, data_dir: string) -> (^Storage_Engine, Storage_Error) {
db, db_err := rocksdb.db_open(data_dir, true)
if db_err != .None {
return nil, .RocksDB_Error
}
engine := new(Storage_Engine, allocator)
engine.db = db
engine.allocator = allocator
engine.table_locks = make(map[string]^sync.RW_Mutex, allocator = allocator)
return engine, .None
}
storage_engine_destroy :: proc(engine: ^Storage_Engine) {
// Free all table locks
for key, lock in engine.table_locks {
delete(key, engine.allocator)
free(lock, engine.allocator)
}
delete(engine.table_locks)
// Close database
rocksdb.db_close(&engine.db)
// Free engine
free(engine, engine.allocator)
}
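// Example (sketch) — typical engine lifecycle; the data directory and table
// names in examples like this one are illustrative, not part of the API:
//
//	engine, err := storage_engine_init(context.allocator, "./example-data")
//	if err != .None { /* handle */ }
//	defer storage_engine_destroy(engine)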
// Get or create a table lock
get_or_create_table_lock :: proc(engine: ^Storage_Engine, table_name: string) -> ^sync.RW_Mutex {
sync.mutex_lock(&engine.table_locks_mutex)
defer sync.mutex_unlock(&engine.table_locks_mutex)
if lock, found := engine.table_locks[table_name]; found {
return lock
}
lock := new(sync.RW_Mutex, engine.allocator)
owned_name := strings.clone(table_name, engine.allocator)
engine.table_locks[owned_name] = lock
return lock
}
// Remove a table lock
remove_table_lock :: proc(engine: ^Storage_Engine, table_name: string) {
sync.mutex_lock(&engine.table_locks_mutex)
defer sync.mutex_unlock(&engine.table_locks_mutex)
	if _, found := engine.table_locks[table_name]; found {
		// delete_key hands back the stored entry, so we free the cloned key
		// the map owns rather than the caller's table_name string, and only
		// after the lookup is done with it.
		owned_key, lock := delete_key(&engine.table_locks, table_name)
		delete(owned_key, engine.allocator)
		free(lock, engine.allocator)
	}
}
// ============================================================================
// Table Metadata Operations
// ============================================================================
// Serialize table metadata to binary format
serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) {
meta_item := make(Item, context.temp_allocator)
defer delete(meta_item)
// Encode key schema as JSON string
ks_builder := strings.builder_make(context.temp_allocator)
defer strings.builder_destroy(&ks_builder)
strings.write_string(&ks_builder, "[")
for ks, i in metadata.key_schema {
if i > 0 {
strings.write_string(&ks_builder, ",")
}
fmt.sbprintf(&ks_builder, `{{"AttributeName":"%s","KeyType":"%s"}}`,
ks.attribute_name, key_type_to_string(ks.key_type))
}
strings.write_string(&ks_builder, "]")
meta_item["KeySchema"] = String(strings.clone(strings.to_string(ks_builder)))
// Encode attribute definitions as JSON string
ad_builder := strings.builder_make(context.temp_allocator)
defer strings.builder_destroy(&ad_builder)
strings.write_string(&ad_builder, "[")
for ad, i in metadata.attribute_definitions {
if i > 0 {
strings.write_string(&ad_builder, ",")
}
fmt.sbprintf(&ad_builder, `{{"AttributeName":"%s","AttributeType":"%s"}}`,
ad.attribute_name, scalar_type_to_string(ad.attribute_type))
}
strings.write_string(&ad_builder, "]")
meta_item["AttributeDefinitions"] = String(strings.clone(strings.to_string(ad_builder)))
	// Add other metadata (temp-allocated, like the fields above)
	meta_item["TableStatus"] = String(strings.clone(table_status_to_string(metadata.table_status), context.temp_allocator))
	meta_item["CreationDateTime"] = Number(fmt.tprint(metadata.creation_date_time))
// Encode GSI definitions as JSON string
if gsis, has_gsis := metadata.global_secondary_indexes.?; has_gsis && len(gsis) > 0 {
gsi_builder := strings.builder_make(context.temp_allocator)
defer strings.builder_destroy(&gsi_builder)
strings.write_string(&gsi_builder, "[")
for gsi, i in gsis {
if i > 0 {
strings.write_string(&gsi_builder, ",")
}
fmt.sbprintf(&gsi_builder, `{{"IndexName":"%s","KeySchema":[`, gsi.index_name)
for ks, j in gsi.key_schema {
if j > 0 {
strings.write_string(&gsi_builder, ",")
}
fmt.sbprintf(&gsi_builder, `{{"AttributeName":"%s","KeyType":"%s"}}`,
ks.attribute_name, key_type_to_string(ks.key_type))
}
strings.write_string(&gsi_builder, `],"Projection":{{"ProjectionType":"`)
switch gsi.projection.projection_type {
case .ALL: strings.write_string(&gsi_builder, "ALL")
case .KEYS_ONLY: strings.write_string(&gsi_builder, "KEYS_ONLY")
case .INCLUDE: strings.write_string(&gsi_builder, "INCLUDE")
}
strings.write_string(&gsi_builder, `"`)
if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka && len(nka) > 0 {
strings.write_string(&gsi_builder, `,"NonKeyAttributes":[`)
for attr, k in nka {
if k > 0 {
strings.write_string(&gsi_builder, ",")
}
fmt.sbprintf(&gsi_builder, `"%s"`, attr)
}
strings.write_string(&gsi_builder, "]")
}
strings.write_string(&gsi_builder, "}}")
}
strings.write_string(&gsi_builder, "]")
meta_item["GlobalSecondaryIndexes"] = String(strings.clone(strings.to_string(gsi_builder)))
}
// Encode to binary
return encode(meta_item)
}
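// For a table with a single HASH key "id" of type S, the meta item built
// above looks roughly like this (values are embedded JSON strings;
// CreationDateTime is unix seconds, shown with an illustrative value):
//
//	KeySchema            = "[{\"AttributeName\":\"id\",\"KeyType\":\"HASH\"}]"
//	AttributeDefinitions = "[{\"AttributeName\":\"id\",\"AttributeType\":\"S\"}]"
//	TableStatus          = "ACTIVE"
//	CreationDateTime     = "1700000000"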
// Deserialize table metadata from binary format
deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (Table_Metadata, bool) {
meta_item, ok := decode(data)
if !ok {
return {}, false
}
defer item_destroy(&meta_item)
metadata: Table_Metadata
// Parse table status
if status_val, found := meta_item["TableStatus"]; found {
#partial switch v in status_val {
case String:
metadata.table_status = table_status_from_string(string(v))
}
} else {
metadata.table_status = .ACTIVE
}
// Parse creation date time
if time_val, found := meta_item["CreationDateTime"]; found {
#partial switch v in time_val {
case Number:
val, parse_ok := strconv.parse_i64(string(v))
metadata.creation_date_time = val if parse_ok else 0
}
}
// Parse KeySchema from embedded JSON string
if ks_val, found := meta_item["KeySchema"]; found {
#partial switch v in ks_val {
case String:
ks, ks_ok := parse_key_schema_json(string(v), allocator)
if ks_ok {
metadata.key_schema = ks
}
}
}
// Parse AttributeDefinitions from embedded JSON string
if ad_val, found := meta_item["AttributeDefinitions"]; found {
#partial switch v in ad_val {
case String:
ad, ad_ok := parse_attr_defs_json(string(v), allocator)
if ad_ok {
metadata.attribute_definitions = ad
}
}
}
// Parse GlobalSecondaryIndexes from embedded JSON string
if gsi_val, gsi_found := meta_item["GlobalSecondaryIndexes"]; gsi_found {
#partial switch v in gsi_val {
case String:
gsis, gsi_ok := parse_gsis_json(string(v), allocator)
if gsi_ok && len(gsis) > 0 {
metadata.global_secondary_indexes = gsis
}
}
}
return metadata, true
}
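// Note that table_name is not part of the encoded item — it travels in the
// meta key itself — so a freshly deserialized Table_Metadata carries an empty
// name unless the caller fills it in. A round-trip sketch:
//
//	data, ok := serialize_table_metadata(&metadata)
//	// ... store/load via RocksDB ...
//	loaded, ok2 := deserialize_table_metadata(data, allocator)
//	// loaded.table_status and loaded.key_schema mirror the input;
//	// loaded.table_name is "".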
// Parse key schema from JSON string like [{"AttributeName":"id","KeyType":"HASH"}]
parse_key_schema_json :: proc(json_str: string, allocator: mem.Allocator) -> ([]Key_Schema_Element, bool) {
data, parse_err := json.parse(transmute([]byte)json_str, allocator = context.temp_allocator)
if parse_err != nil {
return nil, false
}
	// The parse tree lives in the temp allocator and is reclaimed wholesale
	// when the arena is reset, so no per-node destroy is needed here.
arr, ok := data.(json.Array)
if !ok {
return nil, false
}
result := make([]Key_Schema_Element, len(arr), allocator)
for elem, i in arr {
obj, obj_ok := elem.(json.Object)
if !obj_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name_val, name_found := obj["AttributeName"]
if !name_found {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name, name_ok := attr_name_val.(json.String)
if !name_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
key_type_val, type_found := obj["KeyType"]
if !type_found {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
key_type_str, type_ok := key_type_val.(json.String)
if !type_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
kt, kt_ok := key_type_from_string(string(key_type_str))
if !kt_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
result[i] = Key_Schema_Element{
attribute_name = strings.clone(string(attr_name), allocator),
key_type = kt,
}
}
return result, true
}
cleanup_key_schema :: proc(elems: []Key_Schema_Element, allocator: mem.Allocator) {
for ks in elems {
delete(ks.attribute_name, allocator)
}
}
// Parse attribute definitions from JSON string
parse_attr_defs_json :: proc(json_str: string, allocator: mem.Allocator) -> ([]Attribute_Definition, bool) {
data, parse_err := json.parse(transmute([]byte)json_str, allocator = context.temp_allocator)
if parse_err != nil {
return nil, false
}
	// Parsed into the temp allocator; reclaimed with the arena, no destroy needed.
arr, ok := data.(json.Array)
if !ok {
return nil, false
}
result := make([]Attribute_Definition, len(arr), allocator)
for elem, i in arr {
obj, obj_ok := elem.(json.Object)
if !obj_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name_val, name_found := obj["AttributeName"]
if !name_found {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name, name_ok := attr_name_val.(json.String)
if !name_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_type_val, type_found := obj["AttributeType"]
if !type_found {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_type_str, type_ok := attr_type_val.(json.String)
if !type_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
at, at_ok := scalar_type_from_string(string(attr_type_str))
if !at_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
result[i] = Attribute_Definition{
attribute_name = strings.clone(string(attr_name), allocator),
attribute_type = at,
}
}
return result, true
}
cleanup_attr_defs :: proc(elems: []Attribute_Definition, allocator: mem.Allocator) {
for ad in elems {
delete(ad.attribute_name, allocator)
}
}
// Get table metadata
get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Table_Metadata, Storage_Error) {
meta_key := build_meta_key(table_name)
defer delete(meta_key)
value, get_err := rocksdb.db_get(&engine.db, meta_key)
if get_err != .None {
if get_err == .NotFound {
return {}, .Table_Not_Found
}
return {}, .RocksDB_Error
}
defer delete(value)
metadata, ok := deserialize_table_metadata(value, engine.allocator)
if !ok {
return {}, .Serialization_Error
}
return metadata, .None
}
// ============================================================================
// Table Operations
// ============================================================================
// Create table
create_table :: proc(
engine: ^Storage_Engine,
table_name: string,
key_schema: []Key_Schema_Element,
attribute_definitions: []Attribute_Definition,
gsis: Maybe([]Global_Secondary_Index) = nil,
) -> (Table_Description, Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_lock(table_lock)
defer sync.rw_mutex_unlock(table_lock)
// Check if table already exists
meta_key := build_meta_key(table_name)
defer delete(meta_key)
existing, get_err := rocksdb.db_get(&engine.db, meta_key)
if get_err == .None && len(existing) > 0 {
delete(existing)
return {}, .Table_Already_Exists
}
if get_err == .None {
delete(existing)
}
// Create metadata
	// Creation time in unix seconds
	now := time.time_to_unix(time.now())
metadata := Table_Metadata{
table_name = strings.clone(table_name, engine.allocator),
key_schema = slice.clone(key_schema, engine.allocator),
attribute_definitions = slice.clone(attribute_definitions, engine.allocator),
table_status = .ACTIVE,
creation_date_time = now,
}
// Deep copy key schema and attr defs
for &ks in metadata.key_schema {
ks.attribute_name = strings.clone(ks.attribute_name, engine.allocator)
}
for &ad in metadata.attribute_definitions {
ad.attribute_name = strings.clone(ad.attribute_name, engine.allocator)
}
// Deep copy GSI definitions into engine allocator
if gsi_defs, has_gsis := gsis.?; has_gsis && len(gsi_defs) > 0 {
owned_gsis := make([]Global_Secondary_Index, len(gsi_defs), engine.allocator)
for gsi_def, i in gsi_defs {
owned_gsis[i] = Global_Secondary_Index{
index_name = strings.clone(gsi_def.index_name, engine.allocator),
key_schema = make([]Key_Schema_Element, len(gsi_def.key_schema), engine.allocator),
projection = Projection{
projection_type = gsi_def.projection.projection_type,
},
}
for ks, j in gsi_def.key_schema {
owned_gsis[i].key_schema[j] = Key_Schema_Element{
attribute_name = strings.clone(ks.attribute_name, engine.allocator),
key_type = ks.key_type,
}
}
if nka, has_nka := gsi_def.projection.non_key_attributes.?; has_nka {
owned_nka := make([]string, len(nka), engine.allocator)
for attr, k in nka {
owned_nka[k] = strings.clone(attr, engine.allocator)
}
owned_gsis[i].projection.non_key_attributes = owned_nka
}
}
metadata.global_secondary_indexes = owned_gsis
}
	// Serialize and store. The deep-copied metadata is only needed until it
	// has been persisted, so free it on every exit path from here on.
	defer table_metadata_destroy(&metadata, engine.allocator)
	meta_value, serialize_ok := serialize_table_metadata(&metadata)
	if !serialize_ok {
		return {}, .Serialization_Error
	}
defer delete(meta_value)
put_err := rocksdb.db_put(&engine.db, meta_key, meta_value)
if put_err != .None {
return {}, .RocksDB_Error
}
// Return description
desc := Table_Description{
table_name = table_name,
key_schema = key_schema,
attribute_definitions = attribute_definitions,
table_status = .ACTIVE,
creation_date_time = now,
item_count = 0,
table_size_bytes = 0,
global_secondary_indexes = gsis,
}
return desc, .None
}
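// Example (sketch) — creating a table with a composite primary key; the
// attribute and table names are illustrative:
//
//	key_schema := []Key_Schema_Element{
//		{attribute_name = "pk", key_type = .HASH},
//		{attribute_name = "sk", key_type = .RANGE},
//	}
//	attr_defs := []Attribute_Definition{
//		{attribute_name = "pk", attribute_type = .S},
//		{attribute_name = "sk", attribute_type = .S},
//	}
//	desc, err := create_table(engine, "orders", key_schema, attr_defs)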
// Delete table — removes metadata AND all items with the table's data prefix
delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Error {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_lock(table_lock)
defer sync.rw_mutex_unlock(table_lock)
// Check table exists
meta_key := build_meta_key(table_name)
defer delete(meta_key)
existing, get_err := rocksdb.db_get(&engine.db, meta_key)
if get_err == .NotFound {
return .Table_Not_Found
}
if get_err != .None {
return .RocksDB_Error
}
delete(existing)
// Delete all data items using a prefix scan
table_prefix := build_table_prefix(table_name)
defer delete(table_prefix)
iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
if iter != nil {
defer rocksdb.rocksdb_iter_destroy(iter)
rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))
for rocksdb.rocksdb_iter_valid(iter) != 0 {
key_len: c.size_t
key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
key_bytes := key_ptr[:key_len]
if !has_prefix(key_bytes, table_prefix) {
break
}
err: cstring
rocksdb.rocksdb_delete(
engine.db.handle,
engine.db.write_options,
raw_data(key_bytes),
c.size_t(len(key_bytes)),
&err,
)
if err != nil {
rocksdb.rocksdb_free(rawptr(err))
}
rocksdb.rocksdb_iter_next(iter)
}
}
// Delete all GSI entries for this table
gsi_table_prefix := build_gsi_table_prefix(table_name)
defer delete(gsi_table_prefix)
gsi_iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
if gsi_iter != nil {
defer rocksdb.rocksdb_iter_destroy(gsi_iter)
rocksdb.rocksdb_iter_seek(gsi_iter, raw_data(gsi_table_prefix), c.size_t(len(gsi_table_prefix)))
for rocksdb.rocksdb_iter_valid(gsi_iter) != 0 {
key_len: c.size_t
key_ptr := rocksdb.rocksdb_iter_key(gsi_iter, &key_len)
key_bytes := key_ptr[:key_len]
if !has_prefix(key_bytes, gsi_table_prefix) {
break
}
err: cstring
rocksdb.rocksdb_delete(
engine.db.handle,
engine.db.write_options,
raw_data(key_bytes),
c.size_t(len(key_bytes)),
&err,
)
if err != nil {
rocksdb.rocksdb_free(rawptr(err))
}
rocksdb.rocksdb_iter_next(gsi_iter)
}
}
// Delete metadata
del_err := rocksdb.db_delete(&engine.db, meta_key)
if del_err != .None {
return .RocksDB_Error
}
	// NOTE: the table's lock entry is deliberately left in table_locks — we
	// still hold table_lock here, so freeing it via remove_table_lock would
	// make the deferred unlock touch freed memory. Stale locks are reclaimed
	// wholesale in storage_engine_destroy.
	return .None
}
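// The two prefix-delete loops in delete_table are identical apart from the
// prefix; a shared helper along these lines (a sketch, not wired in) would
// remove the duplication:
delete_all_with_prefix :: proc(engine: ^Storage_Engine, prefix: []byte) {
	iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
	if iter == nil {
		return
	}
	defer rocksdb.rocksdb_iter_destroy(iter)
	rocksdb.rocksdb_iter_seek(iter, raw_data(prefix), c.size_t(len(prefix)))
	for rocksdb.rocksdb_iter_valid(iter) != 0 {
		key_len: c.size_t
		key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
		key_bytes := key_ptr[:key_len]
		// Stop at the first key outside the prefix range
		if !has_prefix(key_bytes, prefix) {
			break
		}
		err: cstring
		rocksdb.rocksdb_delete(
			engine.db.handle,
			engine.db.write_options,
			raw_data(key_bytes),
			c.size_t(len(key_bytes)),
			&err,
		)
		if err != nil {
			rocksdb.rocksdb_free(rawptr(err))
		}
		rocksdb.rocksdb_iter_next(iter)
	}
}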
// ============================================================================
// Item Operations
// ============================================================================
// Put item — uses EXCLUSIVE lock (write operation)
put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_lock(table_lock)
defer sync.rw_mutex_unlock(table_lock)
// Get table metadata
metadata, meta_err := get_table_metadata(engine, table_name)
if meta_err != .None {
return meta_err
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Validate key attribute types match schema
validation_err := validate_item_key_types(item, metadata.key_schema, metadata.attribute_definitions)
if validation_err != .None {
return validation_err
}
// Extract key from item
key, key_ok := key_from_item(item, metadata.key_schema)
if !key_ok {
return .Missing_Key_Attribute
}
defer {
pk := key.pk
attr_value_destroy(&pk)
if sk, ok := key.sk.?; ok {
sk_copy := sk
attr_value_destroy(&sk_copy)
}
}
// Get key values
key_values, kv_ok := key_get_values(&key)
if !kv_ok {
return .Invalid_Key
}
// Build storage key
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
defer delete(storage_key)
// --- GSI cleanup: delete OLD GSI entries if item already exists ---
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
if existing_err == .None && existing_value != nil {
defer delete(existing_value)
old_item, decode_ok := decode(existing_value)
if decode_ok {
defer item_destroy(&old_item)
gsi_delete_entries(engine, table_name, old_item, &metadata)
}
}
// Encode item
encoded_item, encode_ok := encode(item)
if !encode_ok {
return .Serialization_Error
}
defer delete(encoded_item)
// Store in RocksDB
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
if put_err != .None {
return .RocksDB_Error
}
// --- GSI maintenance: write NEW GSI entries ---
gsi_err := gsi_write_entries(engine, table_name, item, &metadata)
if gsi_err != .None {
return gsi_err
}
return .None
}
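// Example (sketch) — writing an item to a table whose partition key is "id"
// of type S; the attribute values are illustrative:
//
//	item := make(Item, context.temp_allocator)
//	item["id"] = String("user#42")
//	item["name"] = String("Ada")
//	err := put_item(engine, "users", item)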
// Get item — uses SHARED lock (read operation)
get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (Maybe(Item), Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
defer sync.rw_mutex_shared_unlock(table_lock)
// Get table metadata
metadata, meta_err := get_table_metadata(engine, table_name)
if meta_err != .None {
return nil, meta_err
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Extract key
key_struct, key_ok := key_from_item(key, metadata.key_schema)
if !key_ok {
return nil, .Missing_Key_Attribute
}
defer {
pk := key_struct.pk
attr_value_destroy(&pk)
if sk, ok := key_struct.sk.?; ok {
sk_copy := sk
attr_value_destroy(&sk_copy)
}
}
// Get key values
key_values, kv_ok := key_get_values(&key_struct)
if !kv_ok {
return nil, .Invalid_Key
}
// Build storage key
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
defer delete(storage_key)
// Get from RocksDB
value, get_err := rocksdb.db_get(&engine.db, storage_key)
if get_err == .NotFound {
return nil, .None
}
if get_err != .None {
return nil, .RocksDB_Error
}
defer delete(value)
// Decode item
item, decode_ok := decode(value)
if !decode_ok {
return nil, .Serialization_Error
}
return item, .None
}
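// Example (sketch) — the returned item is owned by the caller and must be
// destroyed once consumed; a nil Maybe means the key was not found:
//
//	maybe_item, err := get_item(engine, "users", key)
//	if item, found := maybe_item.?; found {
//		item_copy := item
//		defer item_destroy(&item_copy)
//		// ... read attributes ...
//	}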
// Delete item — uses EXCLUSIVE lock (write operation)
delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_lock(table_lock)
defer sync.rw_mutex_unlock(table_lock)
// Get table metadata
metadata, meta_err := get_table_metadata(engine, table_name)
if meta_err != .None {
return meta_err
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Extract key
key_struct, key_ok := key_from_item(key, metadata.key_schema)
if !key_ok {
return .Missing_Key_Attribute
}
defer {
pk := key_struct.pk
attr_value_destroy(&pk)
if sk, ok := key_struct.sk.?; ok {
sk_copy := sk
attr_value_destroy(&sk_copy)
}
}
// Get key values
key_values, kv_ok := key_get_values(&key_struct)
if !kv_ok {
return .Invalid_Key
}
// Build storage key
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
defer delete(storage_key)
// --- GSI cleanup: read existing item to know which GSI entries to remove ---
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
if existing_err == .None && existing_value != nil {
defer delete(existing_value)
old_item, decode_ok := decode(existing_value)
if decode_ok {
defer item_destroy(&old_item)
gsi_delete_entries(engine, table_name, old_item, &metadata)
}
}
// Delete from RocksDB
del_err := rocksdb.db_delete(&engine.db, storage_key)
if del_err != .None {
return .RocksDB_Error
}
return .None
}
// ============================================================================
// Scan — with FIXED pagination
//
// FIX: LastEvaluatedKey must be the key of the LAST RETURNED item, not the
// next unread item. DynamoDB semantics: ExclusiveStartKey resumes
// *after* the given key, so we save the last key we actually returned.
// ============================================================================
scan :: proc(
engine: ^Storage_Engine,
table_name: string,
exclusive_start_key: Maybe([]byte),
limit: int,
) -> (Scan_Result, Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
defer sync.rw_mutex_shared_unlock(table_lock)
// Verify table exists
metadata, meta_err := get_table_metadata(engine, table_name)
if meta_err != .None {
return {}, meta_err
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Create iterator
iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
if iter == nil {
return {}, .RocksDB_Error
}
defer rocksdb.rocksdb_iter_destroy(iter)
// Build table prefix to scan
table_prefix := build_table_prefix(table_name)
defer delete(table_prefix)
// Seek to start position
	if start_key, has_start := exclusive_start_key.?; has_start {
		// Resume from the pagination token. Seek lands on the first key >=
		// start_key; skip it only when it is exactly the token key (exclusive
		// resume), otherwise we would drop whichever item now sorts first.
		rocksdb.rocksdb_iter_seek(iter, raw_data(start_key), c.size_t(len(start_key)))
		if rocksdb.rocksdb_iter_valid(iter) != 0 {
			key_len: c.size_t
			key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
			if slice.equal(key_ptr[:key_len], start_key) {
				rocksdb.rocksdb_iter_next(iter)
			}
		}
} else {
// Start from beginning of table
rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))
}
max_items := limit if limit > 0 else 1_000_000
// Collect items
items := make([dynamic]Item, context.temp_allocator)
count := 0
last_key: Maybe([]byte) = nil
has_more := false
for rocksdb.rocksdb_iter_valid(iter) != 0 {
// Get current key
key_len: c.size_t
key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
key_bytes := key_ptr[:key_len]
// Check if key still has table prefix
if !has_prefix(key_bytes, table_prefix) {
break
}
// Check limit — if we already have enough items, note there's more and stop
if count >= max_items {
has_more = true
break
}
// Get value
value_len: c.size_t
value_ptr := rocksdb.rocksdb_iter_value(iter, &value_len)
value_bytes := value_ptr[:value_len]
// Decode item
item, decode_ok := decode(value_bytes)
if !decode_ok {
// Skip corrupted items
rocksdb.rocksdb_iter_next(iter)
continue
}
append(&items, item)
count += 1
// Track the key of the last successfully returned item
if prev_key, had_prev := last_key.?; had_prev {
delete(prev_key)
}
last_key = slice.clone(key_bytes)
// Move to next
rocksdb.rocksdb_iter_next(iter)
}
// Only emit LastEvaluatedKey if there are more items beyond what we returned
if !has_more {
if lk, had_lk := last_key.?; had_lk {
delete(lk)
}
last_key = nil
}
// Convert to slice
result_items := make([]Item, len(items))
copy(result_items, items[:])
return Scan_Result{
items = result_items,
last_evaluated_key = last_key,
}, .None
}
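// Example (sketch) — draining a table page by page, feeding each
// LastEvaluatedKey back in as the next ExclusiveStartKey. The token must be
// detached from the result before destroying the page, and freed once spent:
//
//	start: Maybe([]byte)
//	for {
//		page, err := scan(engine, "users", start, 100)
//		if sk, ok := start.?; ok { delete(sk) } // previous token is spent
//		if err != .None { break }
//		// ... consume page.items ...
//		start = page.last_evaluated_key
//		page.last_evaluated_key = nil // detach so destroy keeps the token
//		scan_result_destroy(&page)
//		if _, more := start.?; !more { break }
//	}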
// ============================================================================
// Query — with sort key condition filtering and FIXED pagination
// ============================================================================
query :: proc(
engine: ^Storage_Engine,
table_name: string,
partition_key_value: []byte,
exclusive_start_key: Maybe([]byte),
limit: int,
sk_condition: Maybe(Sort_Key_Condition) = nil,
) -> (Query_Result, Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
defer sync.rw_mutex_shared_unlock(table_lock)
// Verify table exists
metadata, meta_err := get_table_metadata(engine, table_name)
if meta_err != .None {
return {}, meta_err
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Build partition prefix
prefix := build_partition_prefix(table_name, partition_key_value)
defer delete(prefix)
iter, iter_err := rocksdb.iter_create(&engine.db)
if iter_err != .None {
return {}, .RocksDB_Error
}
defer rocksdb.iter_destroy(&iter)
max_items := limit if limit > 0 else 1_000_000
// Seek to start position
	if start_key, has_start := exclusive_start_key.?; has_start {
		if has_prefix(start_key, prefix) {
			// Exclusive resume: skip the token key only when the seek landed
			// exactly on it, so an item inserted where a token key was
			// deleted is not silently dropped.
			rocksdb.iter_seek(&iter, start_key)
			if rocksdb.iter_valid(&iter) {
				if k := rocksdb.iter_key(&iter); k != nil && slice.equal(k, start_key) {
					rocksdb.iter_next(&iter)
				}
			}
		} else {
			// Token does not belong to this partition — start from the top
			rocksdb.iter_seek(&iter, prefix)
		}
} else {
rocksdb.iter_seek(&iter, prefix)
}
	// Collect into temp storage — the backing array is not returned
	items := make([dynamic]Item, context.temp_allocator)
count := 0
last_key: Maybe([]byte) = nil
has_more := false
for rocksdb.iter_valid(&iter) {
key := rocksdb.iter_key(&iter)
if key == nil || !has_prefix(key, prefix) {
break
}
// Hit limit — note there's more and stop
if count >= max_items {
has_more = true
break
}
value := rocksdb.iter_value(&iter)
if value == nil {
rocksdb.iter_next(&iter)
continue
}
item, decode_ok := decode(value)
if !decode_ok {
rocksdb.iter_next(&iter)
continue
}
// ---- Sort key condition filtering ----
if skc, has_skc := sk_condition.?; has_skc {
if !evaluate_sort_key_condition(item, &skc) {
// Item doesn't match SK condition — skip it
item_copy := item
item_destroy(&item_copy)
rocksdb.iter_next(&iter)
continue
}
}
append(&items, item)
count += 1
// Track key of last returned item
if prev_key, had_prev := last_key.?; had_prev {
delete(prev_key)
}
last_key = slice.clone(key)
rocksdb.iter_next(&iter)
}
// Only emit LastEvaluatedKey if there are more items
if !has_more {
if lk, had_lk := last_key.?; had_lk {
delete(lk)
}
last_key = nil
}
result_items := make([]Item, len(items))
copy(result_items, items[:])
return Query_Result{
items = result_items,
last_evaluated_key = last_key,
}, .None
}
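// Example (sketch) — querying one partition with a BEGINS_WITH sort-key
// condition. pk_bytes stands in for an already-encoded partition key value,
// and the field names follow Sort_Key_Condition as read below:
//
//	skc := Sort_Key_Condition{
//		sk_name  = "sk",
//		operator = .BEGINS_WITH,
//		value    = String("order#"),
//	}
//	res, err := query(engine, "orders", pk_bytes, nil, 50, skc)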
// ============================================================================
// Sort Key Condition Evaluation
//
// Extracts the sort key attribute from a decoded item and compares it against
// the parsed Sort_Key_Condition. Byte-wise string comparison matches
// DynamoDB's ordering for S and B types; N values, however, are compared
// lexicographically here rather than numerically (e.g. "10" sorts before
// "9"), which diverges from DynamoDB for numbers of differing magnitude.
// ============================================================================
evaluate_sort_key_condition :: proc(item: Item, skc: ^Sort_Key_Condition) -> bool {
attr, found := item[skc.sk_name]
if !found {
return false
}
item_sk_str, ok1 := attr_value_to_string_for_compare(attr)
if !ok1 {
return false
}
cond_val_str, ok2 := attr_value_to_string_for_compare(skc.value)
if !ok2 {
return false
}
cmp := strings.compare(item_sk_str, cond_val_str)
switch skc.operator {
case .EQ:
return cmp == 0
case .LT:
return cmp < 0
case .LE:
return cmp <= 0
case .GT:
return cmp > 0
case .GE:
return cmp >= 0
case .BETWEEN:
if v2, has_v2 := skc.value2.?; has_v2 {
upper_str, ok3 := attr_value_to_string_for_compare(v2)
if !ok3 {
return false
}
cmp2 := strings.compare(item_sk_str, upper_str)
return cmp >= 0 && cmp2 <= 0
}
return false
case .BEGINS_WITH:
return strings.has_prefix(item_sk_str, cond_val_str)
}
return false
}
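// For N-typed sort keys, a numeric comparison would avoid the lexicographic
// ordering caveat noted above. A sketch of such a helper (not wired in;
// unparseable values fall back to byte comparison):
compare_number_attrs :: proc(a, b: Number) -> int {
	av, a_ok := strconv.parse_f64(string(a))
	bv, b_ok := strconv.parse_f64(string(b))
	if a_ok && b_ok {
		switch {
		case av < bv:
			return -1
		case av > bv:
			return +1
		}
		return 0
	}
	// Fallback: byte-wise comparison, same as the evaluator above
	return strings.compare(string(a), string(b))
}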
// Extract a comparable string from a scalar AttributeValue
// Package-visible: used by filter.odin for comparisons
attr_value_to_string_for_compare :: proc(attr: Attribute_Value) -> (string, bool) {
#partial switch v in attr {
case String:
return string(v), true
case Number:
return string(v), true
case Binary:
return string(v), true
}
return "", false
}
// ============================================================================
// Item Key Validation
//
// Validates that an item's key attributes match the types declared in
// AttributeDefinitions. E.g., if PK is declared as "S", the item must
// have a String value for that attribute.
// ============================================================================
validate_item_key_types :: proc(
item: Item,
key_schema: []Key_Schema_Element,
attr_defs: []Attribute_Definition,
) -> Storage_Error {
for ks in key_schema {
attr, found := item[ks.attribute_name]
if !found {
return .Missing_Key_Attribute
}
// Find the expected type from attribute definitions
expected_type: Maybe(Scalar_Attribute_Type) = nil
for ad in attr_defs {
if ad.attribute_name == ks.attribute_name {
expected_type = ad.attribute_type
break
}
}
et, has_et := expected_type.?
if !has_et {
continue // No definition found — skip validation (shouldn't happen)
}
// Check actual type matches expected
match := false
#partial switch _ in attr {
case String:
match = (et == .S)
case Number:
match = (et == .N)
case Binary:
match = (et == .B)
}
if !match {
return .Invalid_Key
}
}
return .None
}
// Helper to check if a byte slice has a prefix
has_prefix :: proc(data: []byte, prefix: []byte) -> bool {
if len(data) < len(prefix) {
return false
}
for i in 0..<len(prefix) {
if data[i] != prefix[i] {
return false
}
}
return true
}
// List tables by iterating over meta keys in RocksDB
list_tables :: proc(engine: ^Storage_Engine) -> ([]string, Storage_Error) {
iter, iter_err := rocksdb.iter_create(&engine.db)
if iter_err != .None {
return nil, .RocksDB_Error
}
defer rocksdb.iter_destroy(&iter)
meta_prefix := []byte{u8(Entity_Type.Meta)}
rocksdb.iter_seek(&iter, meta_prefix)
tables := make([dynamic]string)
for rocksdb.iter_valid(&iter) {
key := rocksdb.iter_key(&iter)
if key == nil || len(key) == 0 || key[0] != u8(Entity_Type.Meta) {
break
}
decoder := Key_Decoder{data = key, pos = 0}
_, et_ok := decoder_read_entity_type(&decoder)
if !et_ok {
break
}
tbl_name_bytes, seg_ok := decoder_read_segment_borrowed(&decoder)
if !seg_ok {
break
}
append(&tables, strings.clone(string(tbl_name_bytes)))
rocksdb.iter_next(&iter)
}
return tables[:], .None
}