fix storage issues

This commit is contained in:
2026-02-15 20:57:16 -05:00
parent 280ce15b07
commit b510c000ec
5 changed files with 577 additions and 188 deletions

View File

@@ -527,7 +527,7 @@ create_table :: proc(
return desc, .None
}
// Delete table
// Delete table — removes metadata AND all items with the table's data prefix
delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Error {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_lock(table_lock)
@@ -546,15 +546,48 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err
}
delete(existing)
// Delete all data items using a prefix scan
table_prefix := build_table_prefix(table_name)
defer delete(table_prefix)
iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
if iter != nil {
defer rocksdb.rocksdb_iter_destroy(iter)
rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))
for rocksdb.rocksdb_iter_valid(iter) != 0 {
key_len: c.size_t
key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
key_bytes := key_ptr[:key_len]
if !has_prefix(key_bytes, table_prefix) {
break
}
// Delete this item
err: cstring
rocksdb.rocksdb_delete(
engine.db.handle,
engine.db.write_options,
raw_data(key_bytes),
c.size_t(len(key_bytes)),
&err,
)
if err != nil {
rocksdb.rocksdb_free(rawptr(err))
}
rocksdb.rocksdb_iter_next(iter)
}
}
// Delete metadata
del_err := rocksdb.db_delete(&engine.db, meta_key)
if del_err != .None {
return .RocksDB_Error
}
// TODO: Delete all items in table using iterator
// For now, just delete metadata
remove_table_lock(engine, table_name)
return .None
}
@@ -563,11 +596,11 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err
// Item Operations
// ============================================================================
// Put item
// Put item — uses EXCLUSIVE lock (write operation)
put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
defer sync.rw_mutex_shared_unlock(table_lock)
sync.rw_mutex_lock(table_lock)
defer sync.rw_mutex_unlock(table_lock)
// Get table metadata
metadata, meta_err := get_table_metadata(engine, table_name)
@@ -576,6 +609,12 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Validate key attribute types match schema
validation_err := validate_item_key_types(item, metadata.key_schema, metadata.attribute_definitions)
if validation_err != .None {
return validation_err
}
// Extract key from item
key, key_ok := key_from_item(item, metadata.key_schema)
if !key_ok {
@@ -616,7 +655,7 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
return .None
}
// Get item
// Get item — uses SHARED lock (read operation)
get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (Maybe(Item), Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
@@ -672,11 +711,11 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May
return item, .None
}
// Delete item
// Delete item — uses EXCLUSIVE lock (write operation)
delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
defer sync.rw_mutex_shared_unlock(table_lock)
sync.rw_mutex_lock(table_lock)
defer sync.rw_mutex_unlock(table_lock)
// Get table metadata
metadata, meta_err := get_table_metadata(engine, table_name)
@@ -718,6 +757,14 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
return .None
}
// ============================================================================
// Scan — with FIXED pagination
//
// FIX: LastEvaluatedKey must be the key of the LAST RETURNED item, not the
// next unread item. DynamoDB semantics: ExclusiveStartKey resumes
// *after* the given key, so we save the last key we actually returned.
// ============================================================================
scan :: proc(
engine: ^Storage_Engine,
table_name: string,
@@ -748,9 +795,8 @@ scan :: proc(
// Seek to start position
if start_key, has_start := exclusive_start_key.?; has_start {
// Resume from pagination token
// Resume from pagination token — seek to the key then skip it (exclusive)
rocksdb.rocksdb_iter_seek(iter, raw_data(start_key), c.size_t(len(start_key)))
// Skip the start key itself (it's exclusive)
if rocksdb.rocksdb_iter_valid(iter) != 0 {
rocksdb.rocksdb_iter_next(iter)
}
@@ -759,10 +805,13 @@ scan :: proc(
rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))
}
max_items := limit if limit > 0 else 1_000_000
// Collect items
items := make([dynamic]Item, context.temp_allocator)
count := 0
last_key: Maybe([]byte) = nil
has_more := false
for rocksdb.rocksdb_iter_valid(iter) != 0 {
// Get current key
@@ -775,10 +824,9 @@ scan :: proc(
break
}
// Check limit
if count >= limit {
// Save this key as pagination token
last_key = slice.clone(key_bytes, engine.allocator)
// Check limit — if we already have enough items, note there's more and stop
if count >= max_items {
has_more = true
break
}
@@ -798,12 +846,26 @@ scan :: proc(
append(&items, item)
count += 1
// Track the key of the last successfully returned item
if prev_key, had_prev := last_key.?; had_prev {
delete(prev_key)
}
last_key = slice.clone(key_bytes)
// Move to next
rocksdb.rocksdb_iter_next(iter)
}
// Convert to slice (owned by caller's allocator)
result_items := make([]Item, len(items), engine.allocator)
// Only emit LastEvaluatedKey if there are more items beyond what we returned
if !has_more {
if lk, had_lk := last_key.?; had_lk {
delete(lk)
}
last_key = nil
}
// Convert to slice
result_items := make([]Item, len(items))
copy(result_items, items[:])
return Scan_Result{
@@ -812,13 +874,17 @@ scan :: proc(
}, .None
}
// Query items by partition key with optional pagination
// ============================================================================
// Query — with sort key condition filtering and FIXED pagination
// ============================================================================
query :: proc(
engine: ^Storage_Engine,
table_name: string,
partition_key_value: []byte,
exclusive_start_key: Maybe([]byte),
limit: int,
sk_condition: Maybe(Sort_Key_Condition) = nil,
) -> (Query_Result, Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
@@ -860,6 +926,7 @@ query :: proc(
items := make([dynamic]Item)
count := 0
last_key: Maybe([]byte) = nil
has_more := false
for rocksdb.iter_valid(&iter) {
key := rocksdb.iter_key(&iter)
@@ -867,9 +934,9 @@ query :: proc(
break
}
// Hit limit — save this key as pagination token and stop
// Hit limit — note there's more and stop
if count >= max_items {
last_key = slice.clone(key)
has_more = true
break
}
@@ -885,11 +952,37 @@ query :: proc(
continue
}
// ---- Sort key condition filtering ----
if skc, has_skc := sk_condition.?; has_skc {
if !evaluate_sort_key_condition(item, &skc) {
// Item doesn't match SK condition — skip it
item_copy := item
item_destroy(&item_copy)
rocksdb.iter_next(&iter)
continue
}
}
append(&items, item)
count += 1
// Track key of last returned item
if prev_key, had_prev := last_key.?; had_prev {
delete(prev_key)
}
last_key = slice.clone(key)
rocksdb.iter_next(&iter)
}
// Only emit LastEvaluatedKey if there are more items
if !has_more {
if lk, had_lk := last_key.?; had_lk {
delete(lk)
}
last_key = nil
}
result_items := make([]Item, len(items))
copy(result_items, items[:])
@@ -899,6 +992,126 @@ query :: proc(
}, .None
}
// ============================================================================
// Sort Key Condition Evaluation
//
// Extracts the sort key attribute from a decoded item and compares it against
// the parsed Sort_Key_Condition using lexicographic string comparison. This
// matches DynamoDB's byte-level ordering for S and B types. N values are
// compared as digit strings here, which diverges from DynamoDB's numeric
// ordering (lexicographically "10" < "9") — a known limitation.
// ============================================================================
// Evaluate a parsed sort-key condition against a single decoded item.
//
// Both the item's sort-key attribute and the condition operand(s) are reduced
// to raw strings and compared lexicographically with strings.compare.
// Returns false when the attribute is absent from the item, when any operand
// is a non-scalar type, or when a BETWEEN condition is missing its upper
// bound (value2).
evaluate_sort_key_condition :: proc(item: Item, skc: ^Sort_Key_Condition) -> bool {
    sk_attr, has_attr := item[skc.sk_name]
    if !has_attr {
        return false
    }

    lhs, lhs_ok := attr_value_to_string_for_compare(sk_attr)
    rhs, rhs_ok := attr_value_to_string_for_compare(skc.value)
    if !lhs_ok || !rhs_ok {
        // Non-scalar operand on either side — condition cannot match.
        return false
    }

    order := strings.compare(lhs, rhs)
    switch skc.operator {
    case .EQ:          return order == 0
    case .LT:          return order < 0
    case .LE:          return order <= 0
    case .GT:          return order > 0
    case .GE:          return order >= 0
    case .BEGINS_WITH: return strings.has_prefix(lhs, rhs)
    case .BETWEEN:
        upper_val, has_upper := skc.value2.?
        if !has_upper {
            // Malformed BETWEEN: no upper bound supplied.
            return false
        }
        upper, upper_ok := attr_value_to_string_for_compare(upper_val)
        if !upper_ok {
            return false
        }
        // Inclusive on both ends: lower <= sk <= upper.
        return order >= 0 && strings.compare(lhs, upper) <= 0
    }
    return false
}
// Reduce a scalar Attribute_Value (String, Number, or Binary variant) to its
// raw string form for lexicographic comparison. Any other variant is not
// comparable and yields ("", false).
@(private = "file")
attr_value_to_string_for_compare :: proc(attr: Attribute_Value) -> (string, bool) {
    #partial switch v in attr {
    case String:
        return string(v), true
    case Number:
        // NOTE(review): Numbers compare as digit strings, not numerically.
        return string(v), true
    case Binary:
        return string(v), true
    case:
        return "", false
    }
}
// ============================================================================
// Item Key Validation
//
// Validates that an item's key attributes match the types declared in
// AttributeDefinitions. E.g., if PK is declared as "S", the item must
// have a String value for that attribute.
// ============================================================================
// Verify that every key-schema attribute present on the item carries the
// scalar type declared for it in AttributeDefinitions (S -> String,
// N -> Number, B -> Binary).
//
// Returns .Missing_Key_Attribute when a key-schema attribute is absent from
// the item, .Invalid_Key when its value's runtime type disagrees with the
// declared type, and .None otherwise. A key attribute with no matching
// definition is skipped rather than rejected.
validate_item_key_types :: proc(
    item: Item,
    key_schema: []Key_Schema_Element,
    attr_defs: []Attribute_Definition,
) -> Storage_Error {
    for ks in key_schema {
        value, present := item[ks.attribute_name]
        if !present {
            return .Missing_Key_Attribute
        }

        // Look up the declared scalar type for this key attribute.
        declared: Maybe(Scalar_Attribute_Type)
        for def in attr_defs {
            if def.attribute_name == ks.attribute_name {
                declared = def.attribute_type
                break
            }
        }
        expected, declared_ok := declared.?
        if !declared_ok {
            // No definition to check against — leave the attribute as-is.
            continue
        }

        // The value's runtime variant must agree with the declared type.
        type_ok := false
        #partial switch _ in value {
        case String: type_ok = expected == .S
        case Number: type_ok = expected == .N
        case Binary: type_ok = expected == .B
        }
        if !type_ok {
            return .Invalid_Key
        }
    }
    return .None
}
// Helper to check if a byte slice has a prefix
has_prefix :: proc(data: []byte, prefix: []byte) -> bool {
if len(data) < len(prefix) {
@@ -947,4 +1160,4 @@ list_tables :: proc(engine: ^Storage_Engine) -> ([]string, Storage_Error) {
}
return tables[:], .None
}
}