diff --git a/dynamodb/transact.odin b/dynamodb/transact.odin index 5a592ee..8ea5d02 100644 --- a/dynamodb/transact.odin +++ b/dynamodb/transact.odin @@ -124,13 +124,11 @@ transact_write_items :: proc( table_set[action.table_name] = true } - // Acquire exclusive locks on all tables in deterministic order - // to prevent deadlocks table_names := make([dynamic]string, allocator = context.temp_allocator) for name in table_set { append(&table_names, name) } - // Simple sort for deterministic lock ordering + // Sort for deterministic lock ordering for i := 0; i < len(table_names); i += 1 { for j := i + 1; j < len(table_names); j += 1 { if table_names[j] < table_names[i] { @@ -146,17 +144,15 @@ transact_write_items :: proc( append(&locks, lock) } defer { - // Release all locks in reverse order for i := len(locks) - 1; i >= 0; i -= 1 { sync.rw_mutex_unlock(locks[i]) } } - // ---- Step 2: Pre-flight — fetch metadata and existing items, evaluate conditions ---- + // ---- Step 2: Fetch metadata and evaluate conditions ---- reasons := make([]Cancellation_Reason, len(actions)) any_failed := false - // Cache table metadata to avoid redundant lookups metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator) defer { for _, meta in metadata_cache { @@ -166,7 +162,6 @@ transact_write_items :: proc( } for action, idx in actions { - // Get table metadata (cached) metadata: ^Table_Metadata if cached, found := &metadata_cache[action.table_name]; found { metadata = cached @@ -184,12 +179,11 @@ transact_write_items :: proc( metadata = &metadata_cache[action.table_name] } - // Determine the key item for this action key_item: Item switch action.type { case .Put: if item, has := action.item.?; has { - key_item = item // For Put, key is extracted from the item + key_item = item } else { reasons[idx] = Cancellation_Reason{ code = "ValidationError", @@ -211,9 +205,8 @@ transact_write_items :: proc( } } - // Evaluate ConditionExpression if present + // Evaluate ConditionExpression if cond_str, has_cond := action.condition_expr.?; has_cond { - // Fetch existing item existing_item, get_err := get_item_internal(engine, action.table_name, key_item, metadata) if get_err != .None && get_err != .Item_Not_Found { reasons[idx] = Cancellation_Reason{ @@ -230,7 +223,6 @@ transact_write_items :: proc( } } - // Parse and evaluate condition filter_node, parse_ok := parse_filter_expression( cond_str, action.expr_attr_names, action.expr_attr_values, ) @@ -263,13 +255,12 @@ transact_write_items :: proc( } } - // ConditionCheck actions only validate — they don't mutate if action.type == .Condition_Check { reasons[idx] = Cancellation_Reason{code = "None"} continue } - // Validate key/item against schema + // Validate key/item switch action.type { case .Put: if item, has := action.item.?; has { @@ -286,58 +277,112 @@ transact_write_items :: proc( } } case .Delete, .Update: - // Key validation happens during execution + // Key validation happens during batch building case .Condition_Check: - // Already handled above + // Already handled } reasons[idx] = Cancellation_Reason{code = "None"} } - // ---- Step 3: If any condition failed, return cancellation ---- if any_failed { result.cancellation_reasons = reasons return result, .Cancelled } - // ---- Step 4: Apply all mutations ---- - for &action, idx in actions { + // ---- Step 3: Build atomic WriteBatch with all operations ---- + batch, batch_err := rocksdb.batch_create() + if batch_err != .None { + result.cancellation_reasons = reasons + return result, .Internal_Error + } + defer rocksdb.batch_destroy(&batch) + + // Read old items for GSI cleanup (must happen before batch write) + old_items := make([]Maybe(Item), len(actions), allocator = context.temp_allocator) + defer { + for old_item in old_items { + if old, has := old_item.?; has { + old_copy := old + item_destroy(&old_copy) + } + } + } + + for action, idx in actions { + if action.type == .Condition_Check { + continue + } + metadata := &metadata_cache[action.table_name] - apply_err := transact_apply_action(engine, &action, metadata) + // Read old item if needed for GSI cleanup + key_item: Item + #partial switch action.type { + case .Put: + if item, has := action.item.?; has { + key_item = item + } + case .Delete, .Update: + if key, has := action.key.?; has { + key_item = key + } + } + + existing, _ := get_item_internal(engine, action.table_name, key_item, metadata) + old_items[idx] = existing + } + + // Add all operations to batch + for &action, idx in actions { + if action.type == .Condition_Check { + continue + } + + metadata := &metadata_cache[action.table_name] + old_item := old_items[idx] + + apply_err := transact_apply_action_batch(&batch, engine, &action, metadata, old_item) if apply_err != .None { - // This shouldn't happen after pre-validation, but handle gracefully reasons[idx] = Cancellation_Reason{ code = "InternalError", - message = "Failed to apply mutation", + message = "Failed to build mutation", } - // In a real impl we'd need to rollback. For now, report the failure. result.cancellation_reasons = reasons return result, .Internal_Error } } + // ---- Step 4: Write batch atomically (ALL or NOTHING) ---- + write_err := rocksdb.batch_write(&engine.db, &batch) + if write_err != .None { + result.cancellation_reasons = reasons + return result, .Internal_Error + } + delete(reasons) return result, .None } // Apply a single transact write action (called after all conditions have passed) @(private = "file") -transact_apply_action :: proc( +transact_apply_action_batch :: proc( + batch: ^rocksdb.WriteBatch, engine: ^Storage_Engine, action: ^Transact_Write_Action, metadata: ^Table_Metadata, + old_item: Maybe(Item), ) -> Storage_Error { switch action.type { case .Put: if item, has := action.item.?; has { - return put_item_internal(engine, action.table_name, item, metadata) + return put_item_batch(batch, engine, action.table_name, item, metadata, old_item) } return .Invalid_Key case .Delete: if key, has := action.key.?; has { - return delete_item_internal(engine, action.table_name, key, metadata) + return delete_item_batch(batch, engine, action.table_name, key, metadata, old_item) } return .Invalid_Key @@ -345,19 +390,177 @@ transact_apply_action :: proc( if key, has := action.key.?; has { if plan, has_plan := action.update_plan.?; has_plan { plan_copy := plan - _, _, err := update_item_internal(engine, action.table_name, key, &plan_copy, metadata) - return err + return update_item_batch(batch, engine, action.table_name, key, &plan_copy, metadata, old_item) } return .Invalid_Key } return .Invalid_Key case .Condition_Check: - return .None // No mutation + return .None } return .None } +@(private = "file") +put_item_batch :: proc( + batch: ^rocksdb.WriteBatch, + engine: ^Storage_Engine, + table_name: string, + item: Item, + metadata: ^Table_Metadata, + old_item: Maybe(Item), +) -> Storage_Error { + key_struct, key_ok := key_from_item(item, metadata.key_schema) + if !key_ok { + return .Missing_Key_Attribute + } + defer key_destroy(&key_struct) + + key_values, kv_ok := key_get_values(&key_struct) + if !kv_ok { + return .Invalid_Key + } + + storage_key := build_data_key(table_name, key_values.pk, key_values.sk) + defer delete(storage_key) + + encoded_item, encode_ok := encode(item) + if !encode_ok { + return .Serialization_Error + } + defer delete(encoded_item) + + // Add base item to batch + rocksdb.batch_put(batch, storage_key, encoded_item) + + // Add old GSI deletions to batch + if old, has_old := old_item.?; has_old { + gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata) + if gsi_del_err != .None { + return gsi_del_err + } + } + + // Add new GSI writes to batch + gsi_write_err := gsi_batch_write_entries(batch, table_name, item, metadata) + if gsi_write_err != .None { + return gsi_write_err + } + + return .None +} + +// Add delete operation to batch (with GSI cleanup) +@(private = "file") +delete_item_batch :: proc( + batch: ^rocksdb.WriteBatch, + engine: ^Storage_Engine, + table_name: string, + key: Item, + metadata: ^Table_Metadata, + old_item: Maybe(Item), +) -> Storage_Error { + key_struct, key_ok := key_from_item(key, metadata.key_schema) + if !key_ok { + return .Missing_Key_Attribute + } + defer key_destroy(&key_struct) + + key_values, kv_ok := key_get_values(&key_struct) + if !kv_ok { + return .Invalid_Key + } + + storage_key := build_data_key(table_name, key_values.pk, key_values.sk) + defer delete(storage_key) + + // Add base item delete to batch + rocksdb.batch_delete(batch, storage_key) + + // Add GSI deletions to batch + if old, has_old := old_item.?; has_old { + gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata) + if gsi_del_err != .None { + return gsi_del_err + } + } + + return .None +} + +// Add update operation to batch (with GSI maintenance) +@(private = "file") +update_item_batch :: proc( + batch: ^rocksdb.WriteBatch, + engine: ^Storage_Engine, + table_name: string, + key_item: Item, + plan: ^Update_Plan, + metadata: ^Table_Metadata, + old_item_pre: Maybe(Item), +) -> Storage_Error { + key_struct, key_ok := key_from_item(key_item, metadata.key_schema) + if !key_ok { + return .Missing_Key_Attribute + } + defer key_destroy(&key_struct) + + key_values, kv_ok := key_get_values(&key_struct) + if !kv_ok { + return .Invalid_Key + } + + storage_key := build_data_key(table_name, key_values.pk, key_values.sk) + defer delete(storage_key) + + // Start with existing item or create new + existing_item: Item + if old, has_old := old_item_pre.?; has_old { + existing_item = item_deep_copy(old) + } else { + existing_item = make(Item) + for ks in metadata.key_schema { + if val, found := key_item[ks.attribute_name]; found { + existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val) + } + } + } + defer item_destroy(&existing_item) + + // Apply update plan + if !execute_update_plan(&existing_item, plan) { + return .Invalid_Key + } + + // Encode updated item + encoded_item, encode_ok := encode(existing_item) + if !encode_ok { + return .Serialization_Error + } + defer delete(encoded_item) + + // Add base item to batch + rocksdb.batch_put(batch, storage_key, encoded_item) + + // Add old GSI deletions to batch + if old, has_old := old_item_pre.?; has_old { + gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata) + if gsi_del_err != .None { + return gsi_del_err + } + } + + // Add new GSI writes to batch + gsi_write_err := gsi_batch_write_entries(batch, table_name, existing_item, metadata) + if gsi_write_err != .None { + return gsi_write_err + } + + return .None +} + + // ============================================================================ // Internal storage operations that skip lock acquisition // (Used by transact_write_items which manages its own locking) @@ -400,146 +603,6 @@ get_item_internal :: proc( return item, .None } -put_item_internal :: proc( - engine: ^Storage_Engine, - table_name: string, - item: Item, - metadata: ^Table_Metadata, -) -> Storage_Error { - key_struct, key_ok := key_from_item(item, metadata.key_schema) - if !key_ok { - return .Missing_Key_Attribute - } - defer key_destroy(&key_struct) - - key_values, kv_ok := key_get_values(&key_struct) - if !kv_ok { - return .Invalid_Key - } - - storage_key := build_data_key(table_name, key_values.pk, key_values.sk) - defer delete(storage_key) - - encoded_item, encode_ok := encode(item) - if !encode_ok { - return .Serialization_Error - } - defer delete(encoded_item) - - put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item) - if put_err != .None { - return .RocksDB_Error - } - - return .None -} - -delete_item_internal :: proc( - engine: ^Storage_Engine, - table_name: string, - key: Item, - metadata: ^Table_Metadata, -) -> Storage_Error { - key_struct, key_ok := key_from_item(key, metadata.key_schema) - if !key_ok { - return .Missing_Key_Attribute - } - defer key_destroy(&key_struct) - - key_values, kv_ok := key_get_values(&key_struct) - if !kv_ok { - return .Invalid_Key - } - - storage_key := build_data_key(table_name, key_values.pk, key_values.sk) - defer delete(storage_key) - - del_err := rocksdb.db_delete(&engine.db, storage_key) - if del_err != .None { - return .RocksDB_Error - } - - return .None -} - -update_item_internal :: proc( - engine: ^Storage_Engine, - table_name: string, - key_item: Item, - plan: ^Update_Plan, - metadata: ^Table_Metadata, -) -> (old_item: Maybe(Item), new_item: Maybe(Item), err: Storage_Error) { - key_struct, key_ok := key_from_item(key_item, metadata.key_schema) - if !key_ok { - return nil, nil, .Missing_Key_Attribute - } - defer key_destroy(&key_struct) - - key_values, kv_ok := key_get_values(&key_struct) - if !kv_ok { - return nil, nil, .Invalid_Key - } - - storage_key := build_data_key(table_name, key_values.pk, key_values.sk) - defer delete(storage_key) - - // Fetch existing item - existing_encoded, get_err := rocksdb.db_get(&engine.db, storage_key) - existing_item: Item - - if get_err == .None && existing_encoded != nil { - defer delete(existing_encoded) - decoded, decode_ok := decode(existing_encoded) - if !decode_ok { - return nil, nil, .Serialization_Error - } - existing_item = decoded - old_item = item_deep_copy(existing_item) - } else if get_err == .NotFound || existing_encoded == nil { - existing_item = make(Item) - for ks in metadata.key_schema { - if val, found := key_item[ks.attribute_name]; found { - existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val) - } - } - } else { - return nil, nil, .RocksDB_Error - } - - if !execute_update_plan(&existing_item, plan) { - item_destroy(&existing_item) - if old, has := old_item.?; has { - old_copy := old - item_destroy(&old_copy) - } - return nil, nil, .Invalid_Key - } - - encoded_item, encode_ok := encode(existing_item) - if !encode_ok { - item_destroy(&existing_item) - if old, has := old_item.?; has { - old_copy := old - item_destroy(&old_copy) - } - return nil, nil, .Serialization_Error - } - defer delete(encoded_item) - - put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item) - if put_err != .None { - item_destroy(&existing_item) - if old, has := old_item.?; has { - old_copy := old - item_destroy(&old_copy) - } - return nil, nil, .RocksDB_Error - } - - new_item = existing_item - return old_item, new_item, .None -} - // ============================================================================ // TransactGetItems Types // ============================================================================