From 29fe8a60c3e0ea15eab4ce1dc618d4c05d3bd437 Mon Sep 17 00:00:00 2001
From: biondizzle
Date: Mon, 16 Feb 2026 09:13:33 -0500
Subject: [PATCH] make GSI less shit

---
 dynamodb/gsi.odin         |  96 ++++++++++++++++++++++++++++++---
 dynamodb/storage.odin     | 108 ++++++++++++++++++++++++++++++--------
 dynamodb/update_item.odin |  52 ++++++++++++++----
 3 files changed, 218 insertions(+), 38 deletions(-)

diff --git a/dynamodb/gsi.odin b/dynamodb/gsi.odin
index d33a382..e6fc0ad 100644
--- a/dynamodb/gsi.odin
+++ b/dynamodb/gsi.odin
@@ -20,6 +20,9 @@
 // delete → for each GSI, extract GSI key attrs from the OLD item, delete GSI entry
 // update → delete OLD GSI entries, write NEW GSI entries
 //
+// ATOMICITY: All GSI operations use WriteBatch to ensure that GSI entries are
+// maintained atomically with the base item write/delete.
+//
 package dynamodb
 
 import "core:slice"
@@ -156,14 +159,95 @@ gsi_project_item :: proc(
 }
 
 // ============================================================================
-// GSI Write Maintenance
+// GSI Write Maintenance - ATOMIC via WriteBatch
 //
-// Called after a successful data write to maintain GSI entries.
-// Uses WriteBatch for atomicity (all GSI entries for one item in one batch).
+// These procedures add GSI operations to a WriteBatch instead of performing
+// direct database writes. This ensures atomicity with the base item operation.
 // ============================================================================
 
+// Add GSI write operations to a WriteBatch for an item across all GSIs.
+// Called during put_item or update_item to maintain NEW GSI entries.
+gsi_batch_write_entries :: proc(
+	batch: ^rocksdb.WriteBatch,
+	table_name: string,
+	item: Item,
+	metadata: ^Table_Metadata,
+) -> Storage_Error {
+	gsis, has_gsis := metadata.global_secondary_indexes.?
+	if !has_gsis || len(gsis) == 0 {
+		return .None
+	}
+
+	for &gsi in gsis {
+		// Extract GSI key from item
+		gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema)
+		if !kv_ok {
+			continue // Sparse: item doesn't have GSI PK, skip
+		}
+
+		// Build GSI storage key
+		gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
+		defer delete(gsi_storage_key)
+
+		// Build projected item
+		projected := gsi_project_item(item, &gsi, metadata.key_schema)
+		defer item_destroy(&projected)
+
+		// Encode projected item
+		encoded, encode_ok := encode(projected)
+		if !encode_ok {
+			return .Serialization_Error
+		}
+		defer delete(encoded)
+
+		// Add to batch (not written yet)
+		rocksdb.batch_put(batch, gsi_storage_key, encoded)
+	}
+
+	return .None
+}
+
+// Add GSI delete operations to a WriteBatch for an item across all GSIs.
+// Called during delete_item or update_item to remove OLD GSI entries.
+// Needs the OLD item to know which GSI keys to remove.
+gsi_batch_delete_entries :: proc(
+	batch: ^rocksdb.WriteBatch,
+	table_name: string,
+	old_item: Item,
+	metadata: ^Table_Metadata,
+) -> Storage_Error {
+	gsis, has_gsis := metadata.global_secondary_indexes.?
+	if !has_gsis || len(gsis) == 0 {
+		return .None
+	}
+
+	for &gsi in gsis {
+		gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
+		if !kv_ok {
+			continue // Item didn't have a GSI entry
+		}
+
+		gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
+		defer delete(gsi_storage_key)
+
+		// Add to batch (not written yet)
+		rocksdb.batch_delete(batch, gsi_storage_key)
+	}
+
+	return .None
+}
+
+// ============================================================================
+// DEPRECATED - Non-atomic GSI maintenance
+//
+// These procedures are kept for backwards compatibility but should NOT be used.
+// They perform individual database writes which is NOT atomic.
+// Use gsi_batch_write_entries and gsi_batch_delete_entries instead.
+// ============================================================================
+
+// DEPRECATED: Use gsi_batch_write_entries instead for atomic operations.
 // Write GSI entries for an item across all GSIs defined on the table.
-// Should be called AFTER the main data key is written.
+// WARNING: This performs individual writes which is NOT atomic!
 gsi_write_entries :: proc(
 	engine: ^Storage_Engine,
 	table_name: string,
@@ -207,9 +291,9 @@ gsi_write_entries :: proc(
 	return .None
 }
 
+// DEPRECATED: Use gsi_batch_delete_entries instead for atomic operations.
 // Delete GSI entries for an item across all GSIs.
-// Should be called BEFORE or AFTER the main data key is deleted.
-// Needs the OLD item to know which GSI keys to remove.
+// WARNING: This performs individual writes which is NOT atomic!
 gsi_delete_entries :: proc(
 	engine: ^Storage_Engine,
 	table_name: string,
diff --git a/dynamodb/storage.odin b/dynamodb/storage.odin
index b507ec9..42cfbee 100644
--- a/dynamodb/storage.odin
+++ b/dynamodb/storage.odin
@@ -729,6 +729,7 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err
 // ============================================================================
 
 // Put item — uses EXCLUSIVE lock (write operation)
+// ATOMICITY: Uses WriteBatch to ensure base item + all GSI updates are atomic
 put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
 	table_lock := get_or_create_table_lock(engine, table_name)
 	sync.rw_mutex_lock(table_lock)
@@ -771,34 +772,59 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
 	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
 	defer delete(storage_key)
 
-	// --- GSI cleanup: delete OLD GSI entries if item already exists ---
+	// --- Check if item already exists (need old item for GSI cleanup) ---
+	old_item: Maybe(Item) = nil
 	existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
 	if existing_err == .None && existing_value != nil {
 		defer delete(existing_value)
-		old_item, decode_ok := decode(existing_value)
+		decoded_old, decode_ok := decode(existing_value)
 		if decode_ok {
-			defer item_destroy(&old_item)
-			gsi_delete_entries(engine, table_name, old_item, &metadata)
+			old_item = decoded_old
+		}
+	}
+	// Cleanup old_item at the end
+	defer {
+		if old, has_old := old_item.?; has_old {
+			old_copy := old
+			item_destroy(&old_copy)
 		}
 	}
 
-	// Encode item
+	// Encode new item
 	encoded_item, encode_ok := encode(item)
 	if !encode_ok {
 		return .Serialization_Error
 	}
 	defer delete(encoded_item)
 
-	// Store in RocksDB
-	put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
-	if put_err != .None {
+	// --- ATOMIC WRITE BATCH: base item + all GSI updates ---
+	batch, batch_err := rocksdb.batch_create()
+	if batch_err != .None {
 		return .RocksDB_Error
 	}
+	defer rocksdb.batch_destroy(&batch)
 
-	// --- GSI maintenance: write NEW GSI entries ---
-	gsi_err := gsi_write_entries(engine, table_name, item, &metadata)
-	if gsi_err != .None {
-		return gsi_err
+	// Add base item write to batch
+	rocksdb.batch_put(&batch, storage_key, encoded_item)
+
+	// Add old GSI entry deletions to batch (if item existed)
+	if old, has_old := old_item.?; has_old {
+		gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old, &metadata)
+		if gsi_del_err != .None {
+			return gsi_del_err
+		}
+	}
+
+	// Add new GSI entry writes to batch
+	gsi_write_err := gsi_batch_write_entries(&batch, table_name, item, &metadata)
+	if gsi_write_err != .None {
+		return gsi_write_err
+	}
+
+	// Write batch atomically - ALL or NOTHING
+	write_err := rocksdb.batch_write(&engine.db, &batch)
+	if write_err != .None {
+		return .RocksDB_Error
 	}
 
 	return .None
@@ -861,6 +887,7 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May
 }
 
 // Delete item — uses EXCLUSIVE lock (write operation)
+// ATOMICITY: Uses WriteBatch to ensure base item + all GSI deletions are atomic
 delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
 	table_lock := get_or_create_table_lock(engine, table_name)
 	sync.rw_mutex_lock(table_lock)
@@ -897,20 +924,57 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
 	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
 	defer delete(storage_key)
 
-	// --- GSI cleanup: read existing item to know which GSI entries to remove ---
+	// --- Read existing item to know which GSI entries to remove ---
 	existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
-	if existing_err == .None && existing_value != nil {
-		defer delete(existing_value)
-		old_item, decode_ok := decode(existing_value)
-		if decode_ok {
-			defer item_destroy(&old_item)
-			gsi_delete_entries(engine, table_name, old_item, &metadata)
+
+	// If item doesn't exist, nothing to delete (not an error in DynamoDB)
+	if existing_err == .NotFound || (existing_err == .None && existing_value == nil) {
+		return .None
+	}
+	// A failed read is not the same as a missing item: report it rather than
+	// claiming the delete succeeded while the item is still stored.
+	if existing_err != .None {
+		return .RocksDB_Error
+	}
+	defer delete(existing_value)
+
+	// Decode the old item so its GSI entries can be removed. If the stored bytes
+	// cannot be decoded the GSI keys are unknown, but the base item is still
+	// deleted below; otherwise an undecodable item could never be removed.
+	old_item: Maybe(Item) = nil
+	decoded_old, decode_ok := decode(existing_value)
+	if decode_ok {
+		old_item = decoded_old
+	}
+	// Cleanup old_item at the end
+	defer {
+		if old, has_old := old_item.?; has_old {
+			old_copy := old
+			item_destroy(&old_copy)
 		}
 	}
 
-	// Delete from RocksDB
-	del_err := rocksdb.db_delete(&engine.db, storage_key)
-	if del_err != .None {
+	// --- ATOMIC WRITE BATCH: base item deletion + all GSI deletions ---
+	batch, batch_err := rocksdb.batch_create()
+	if batch_err != .None {
+		return .RocksDB_Error
+	}
+	defer rocksdb.batch_destroy(&batch)
+
+	// Add base item delete to batch
+	rocksdb.batch_delete(&batch, storage_key)
+
+	// Add GSI entry deletions to batch (skipped when the old item was undecodable)
+	if old, has_old := old_item.?; has_old {
+		gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old, &metadata)
+		if gsi_del_err != .None {
+			return gsi_del_err
+		}
+	}
+
+	// Write batch atomically - ALL or NOTHING
+	write_err := rocksdb.batch_write(&engine.db, &batch)
+	if write_err != .None {
 		return .RocksDB_Error
 	}
 
diff --git a/dynamodb/update_item.odin b/dynamodb/update_item.odin
index 81fc503..ea02227 100644
--- a/dynamodb/update_item.odin
+++ b/dynamodb/update_item.odin
@@ -8,6 +8,7 @@ import "../rocksdb"
 
 // UpdateItem — fetch existing item, apply update plan, write back
 // Uses EXCLUSIVE lock (write operation)
+// ATOMICITY: Uses WriteBatch to ensure base item + all GSI updates are atomic
 //
 // Returns:
 // - old_item: the item BEFORE mutations (if it existed), for ReturnValues
@@ -59,7 +60,7 @@ update_item :: proc(
 			return nil, nil, .Serialization_Error
 		}
 		existing_item = decoded
-		// Save old item for ReturnValues
+		// Save old item for ReturnValues (and for GSI cleanup)
 		old_item = item_deep_copy(existing_item)
 	} else if get_err == .NotFound || existing_encoded == nil {
 		// Item doesn't exist yet — start with just the key attributes
@@ -109,9 +110,46 @@ update_item :: proc(
 	}
 	defer delete(encoded_item)
 
-	// Write back to RocksDB
-	put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
-	if put_err != .None {
+	// --- ATOMIC WRITE BATCH: base item + all GSI updates ---
+	batch, batch_err := rocksdb.batch_create()
+	if batch_err != .None {
+		item_destroy(&existing_item)
+		if old, has := old_item.?; has {
+			old_copy := old
+			item_destroy(&old_copy)
+		}
+		return nil, nil, .RocksDB_Error
+	}
+	defer rocksdb.batch_destroy(&batch)
+
+	// Add base item write to batch
+	rocksdb.batch_put(&batch, storage_key, encoded_item)
+
+	// Add old GSI entry deletions to batch (if item existed before)
+	if old, has := old_item.?; has {
+		gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old, &metadata)
+		if gsi_del_err != .None {
+			item_destroy(&existing_item)
+			old_copy := old
+			item_destroy(&old_copy)
+			return nil, nil, gsi_del_err
+		}
+	}
+
+	// Add new GSI entry writes to batch
+	gsi_write_err := gsi_batch_write_entries(&batch, table_name, existing_item, &metadata)
+	if gsi_write_err != .None {
+		item_destroy(&existing_item)
+		if old, has := old_item.?; has {
+			old_copy := old
+			item_destroy(&old_copy)
+		}
+		return nil, nil, gsi_write_err
+	}
+
+	// Write batch atomically - ALL or NOTHING
+	write_err := rocksdb.batch_write(&engine.db, &batch)
+	if write_err != .None {
 		item_destroy(&existing_item)
 		if old, has := old_item.?; has {
 			old_copy := old
@@ -120,12 +158,6 @@ update_item :: proc(
 		return nil, nil, .RocksDB_Error
 	}
 
-	// --- GSI maintenance: delete old entries, write new entries ---
-	if old, has := old_item.?; has {
-		gsi_delete_entries(engine, table_name, old, &metadata)
-	}
-	gsi_write_entries(engine, table_name, existing_item, &metadata)
-
 	new_item = existing_item
 	return old_item, new_item, .None
 }