// TransactWriteItems and TransactGetItems storage operations
//
// TransactWriteItems: Atomic write of up to 100 items across multiple tables.
// - Supports Put, Delete, Update, and ConditionCheck actions
// - ALL actions succeed or ALL fail (all-or-nothing)
// - ConditionExpressions are evaluated BEFORE any mutations
// - Uses exclusive locks on all involved tables
//
// TransactGetItems: Atomic read of up to 100 items across multiple tables.
// - Each item specifies TableName + Key + optional ProjectionExpression
// - All reads are consistent (snapshot isolation via table locks)
package dynamodb

import "core:strings"
import "core:sync"
import "../rocksdb"

// ============================================================================
// TransactWriteItems Types
// ============================================================================

// The four DynamoDB transact-write action kinds. Condition_Check only
// evaluates a ConditionExpression and never mutates data.
Transact_Write_Action_Type :: enum {
	Put,
	Delete,
	Update,
	Condition_Check,
}

// One action inside a TransactWriteItems request. Which Maybe fields are
// populated depends on `type` (see the per-field comments below).
Transact_Write_Action :: struct {
	type:       Transact_Write_Action_Type,
	table_name: string,

	// For Put: the full item to write
	item: Maybe(Item),

	// For Delete/Update/ConditionCheck: the key item
	key: Maybe(Item),

	// For Update: the parsed update plan
	update_plan: Maybe(Update_Plan),

	// ConditionExpression components (shared across all action types)
	condition_expr:   Maybe(string),
	expr_attr_names:  Maybe(map[string]string),
	expr_attr_values: map[string]Attribute_Value,

	// For Update: ReturnValuesOnConditionCheckFailure (not implemented yet, placeholder)
}

Transact_Write_Result :: struct {
	// For now, either all succeed (no error) or we return a
	// TransactionCanceledException with reasons per action.
	cancellation_reasons: []Cancellation_Reason,
}

// Per-action outcome reported when a transaction is cancelled.
Cancellation_Reason :: struct {
	code:    string, // "None", "ConditionalCheckFailed", "ValidationError", etc.
	message: string,
}

// Free all heap data owned by a single write action: the Put item, the key
// item, the update plan, the expression-attribute name map (keys AND values
// are owned strings), and the expression-attribute value map.
// Maybe payloads are copied to a local before destruction because the
// destroy helpers take a pointer and the unwrapped value is not addressable.
transact_write_action_destroy :: proc(action: ^Transact_Write_Action) {
	if item, has := action.item.?; has {
		item_copy := item
		item_destroy(&item_copy)
	}
	if key, has := action.key.?; has {
		key_copy := key
		item_destroy(&key_copy)
	}
	if plan, has := action.update_plan.?; has {
		plan_copy := plan
		update_plan_destroy(&plan_copy)
	}
	if names, has := action.expr_attr_names.?; has {
		for k, v in names {
			delete(k)
			delete(v)
		}
		names_copy := names
		delete(names_copy)
	}
	for k, v in action.expr_attr_values {
		delete(k)
		v_copy := v
		attr_value_destroy(&v_copy)
	}
	delete(action.expr_attr_values)
}

// Free the cancellation-reason slice held by a Transact_Write_Result.
// No per-element cleanup: every reason produced in this file carries only
// string literals (code/message), not owned strings.
transact_write_result_destroy :: proc(result: ^Transact_Write_Result) {
	if result.cancellation_reasons != nil {
		delete(result.cancellation_reasons)
	}
}

// ============================================================================
// TransactWriteItems — Execute an atomic batch of write operations
//
// DynamoDB semantics:
//   1. Acquire exclusive locks on all involved tables
//   2. Evaluate ALL ConditionExpressions (pre-flight check)
//   3. If any condition fails → cancel entire transaction
//   4. If all pass → apply all mutations
//   5. Release locks
//
// Returns .None on success, .Cancelled on condition failure.
// ============================================================================

// Error domain for the transactional entry points in this file.
Transaction_Error :: enum {
	None,
	Cancelled,        // One or more conditions failed
	Validation_Error, // Bad request data
	Internal_Error,   // Storage/serialization failure
}

// Execute a TransactWriteItems request: every action commits or none does.
//
// Flow:
//   1. Exclusive-lock all involved tables in sorted order (deterministic
//      lock ordering prevents deadlock between concurrent transactions).
//   2. Evaluate every action's ConditionExpression against current data.
//   3. On any failure, return .Cancelled; ownership of the per-action
//      `reasons` slice transfers to the caller via result.cancellation_reasons.
//   4. Otherwise stage every mutation (plus GSI maintenance) into a single
//      RocksDB WriteBatch and commit it atomically.
//
// NOTE(review): the DynamoDB 100-action limit and the "no two actions on the
// same item" rule are not enforced here — presumably validated by the caller;
// confirm.
transact_write_items :: proc(
	engine: ^Storage_Engine,
	actions: []Transact_Write_Action,
) -> (Transact_Write_Result, Transaction_Error) {
	result: Transact_Write_Result
	if len(actions) == 0 {
		return result, .Validation_Error
	}

	// ---- Step 1: Collect unique table names and acquire locks ----
	table_set := make(map[string]bool, allocator = context.temp_allocator)
	for action in actions {
		table_set[action.table_name] = true
	}
	table_names := make([dynamic]string, allocator = context.temp_allocator)
	for name in table_set {
		append(&table_names, name)
	}
	// Sort for deterministic lock ordering
	// (simple exchange sort — the table count per transaction is small).
	for i := 0; i < len(table_names); i += 1 {
		for j := i + 1; j < len(table_names); j += 1 {
			if table_names[j] < table_names[i] {
				table_names[i], table_names[j] = table_names[j], table_names[i]
			}
		}
	}
	locks := make([dynamic]^sync.RW_Mutex, allocator = context.temp_allocator)
	for name in table_names {
		lock := get_or_create_table_lock(engine, name)
		sync.rw_mutex_lock(lock)
		append(&locks, lock)
	}
	defer {
		// Release in reverse acquisition order on every exit path.
		for i := len(locks) - 1; i >= 0; i -= 1 {
			sync.rw_mutex_unlock(locks[i])
		}
	}

	// ---- Step 2: Fetch metadata and evaluate conditions ----
	// `reasons` uses the default allocator: on failure it is handed to the
	// caller; on full success it is deleted at the bottom of this proc.
	reasons := make([]Cancellation_Reason, len(actions))
	any_failed := false
	metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
	defer {
		for _, meta in metadata_cache {
			meta_copy := meta
			table_metadata_destroy(&meta_copy, engine.allocator)
		}
	}
	for action, idx in actions {
		// Resolve table metadata once per table (cached for later steps).
		metadata: ^Table_Metadata
		if cached, found := &metadata_cache[action.table_name]; found {
			metadata = cached
		} else {
			meta, meta_err := get_table_metadata(engine, action.table_name)
			if meta_err != .None {
				reasons[idx] = Cancellation_Reason{
					code = "ValidationError",
					message = "Table not found",
				}
				any_failed = true
				continue
			}
			metadata_cache[action.table_name] = meta
			metadata = &metadata_cache[action.table_name]
		}
		// Pick the item carrying the key attributes: the full item for Put,
		// the explicit Key for Delete/Update/ConditionCheck.
		key_item: Item
		switch action.type {
		case .Put:
			if item, has := action.item.?; has {
				key_item = item
			} else {
				reasons[idx] = Cancellation_Reason{
					code = "ValidationError",
					message = "Put action missing Item",
				}
				any_failed = true
				continue
			}
		case .Delete, .Update, .Condition_Check:
			if key, has := action.key.?; has {
				key_item = key
			} else {
				reasons[idx] = Cancellation_Reason{
					code = "ValidationError",
					message = "Action missing Key",
				}
				any_failed = true
				continue
			}
		}
		// Evaluate ConditionExpression
		if cond_str, has_cond := action.condition_expr.?; has_cond {
			// A missing item is NOT an error here: conditions such as
			// attribute_not_exists() must evaluate against an empty item.
			existing_item, get_err := get_item_internal(engine, action.table_name, key_item, metadata)
			if get_err != .None && get_err != .Item_Not_Found {
				reasons[idx] = Cancellation_Reason{
					code = "InternalError",
					message = "Failed to read existing item",
				}
				any_failed = true
				continue
			}
			// Runs at the end of this if-block, i.e. once per action.
			defer {
				if ex, has_ex := existing_item.?; has_ex {
					ex_copy := ex
					item_destroy(&ex_copy)
				}
			}
			filter_node, parse_ok := parse_filter_expression(
				cond_str,
				action.expr_attr_names,
				action.expr_attr_values,
			)
			if !parse_ok || filter_node == nil {
				reasons[idx] = Cancellation_Reason{
					code = "ValidationError",
					message = "Invalid ConditionExpression",
				}
				any_failed = true
				continue
			}
			defer {
				filter_node_destroy(filter_node)
			}
			// Evaluate against the stored item, or an empty Item when absent.
			eval_item: Item
			if item, has_item := existing_item.?; has_item {
				eval_item = item
			} else {
				eval_item = Item{}
			}
			if !evaluate_filter(eval_item, filter_node) {
				reasons[idx] = Cancellation_Reason{
					code = "ConditionalCheckFailed",
					message = "The conditional request failed",
				}
				any_failed = true
				continue
			}
		}
		// ConditionCheck actions are done once their condition has passed.
		if action.type == .Condition_Check {
			reasons[idx] = Cancellation_Reason{code = "None"}
			continue
		}
		// Validate key/item
		switch action.type {
		case .Put:
			if item, has := action.item.?; has {
				validation_err := validate_item_key_types(
					item,
					metadata.key_schema,
					metadata.attribute_definitions,
				)
				if validation_err != .None {
					reasons[idx] =
						Cancellation_Reason{
						code = "ValidationError",
						message = "Key attribute type mismatch",
					}
					any_failed = true
					continue
				}
			}
		case .Delete, .Update:
			// Key validation happens during batch building
		case .Condition_Check:
			// Already handled
		}
		reasons[idx] = Cancellation_Reason{code = "None"}
	}

	if any_failed {
		// Ownership of `reasons` moves to the caller.
		result.cancellation_reasons = reasons
		return result, .Cancelled
	}

	// ---- Step 3: Build atomic WriteBatch with all operations ----
	batch, batch_err := rocksdb.batch_create()
	if batch_err != .None {
		result.cancellation_reasons = reasons
		return result, .Internal_Error
	}
	defer rocksdb.batch_destroy(&batch)

	// Read old items for GSI cleanup (must happen before batch write)
	old_items := make([]Maybe(Item), len(actions), allocator = context.temp_allocator)
	defer {
		for old_item in old_items {
			if old, has := old_item.?; has {
				old_copy := old
				item_destroy(&old_copy)
			}
		}
	}
	for action, idx in actions {
		if action.type == .Condition_Check {
			continue
		}
		// Metadata is guaranteed cached: any miss set any_failed above.
		metadata := &metadata_cache[action.table_name]
		// Read old item if needed for GSI cleanup
		key_item: Item
		#partial switch action.type {
		case .Put:
			if item, has := action.item.?; has {
				key_item = item
			}
		case .Delete, .Update:
			if key, has := action.key.?; has {
				key_item = key
			}
		}
		existing, _ := get_item_internal(engine, action.table_name, key_item, metadata)
		old_items[idx] = existing
	}
	// Add all operations to batch
	for &action, idx in actions {
		if action.type == .Condition_Check {
			continue
		}
		metadata := &metadata_cache[action.table_name]
		old_item := old_items[idx]
		apply_err := transact_apply_action_batch(&batch, engine, &action, metadata, old_item)
		if apply_err != .None {
			// Nothing was written yet — the batch is simply discarded.
			reasons[idx] = Cancellation_Reason{
				code = "InternalError",
				message = "Failed to build mutation",
			}
			result.cancellation_reasons = reasons
			return result, .Internal_Error
		}
	}

	// ---- Step 4: Write batch atomically (ALL or NOTHING) ----
	write_err := rocksdb.batch_write(&engine.db, &batch)
	if write_err != .None {
		result.cancellation_reasons = reasons
		return result, .Internal_Error
	}

	// Success path: the caller receives no reasons, so free them here.
	delete(reasons)
	return result, .None
}

// Apply a single transact write action (called after all conditions have passed)
// Dispatches to the per-type batch builder; ConditionCheck stages nothing.
@(private = "file")
transact_apply_action_batch :: proc(
	batch: ^rocksdb.WriteBatch,
	engine: ^Storage_Engine,
	action: ^Transact_Write_Action,
	metadata: ^Table_Metadata,
	old_item: Maybe(Item),
) -> Storage_Error {
	switch action.type {
	case .Put:
		if item, has := action.item.?; has {
			return put_item_batch(batch, engine, action.table_name, item, metadata, old_item)
		}
		return .Invalid_Key
	case .Delete:
		if key, has := action.key.?; has {
			return delete_item_batch(batch, engine, action.table_name, key, metadata, old_item)
		}
		return .Invalid_Key
	case .Update:
		if key, has := action.key.?; has {
			if plan, has_plan := action.update_plan.?; has_plan {
				// Local copy so we can take the address of the plan.
				plan_copy := plan
				return update_item_batch(batch, engine, action.table_name, key, &plan_copy, metadata, old_item)
			}
			return .Invalid_Key
		}
		return .Invalid_Key
	case .Condition_Check:
		return .None
	}
	return .None
}

// Add put operation to batch (with GSI maintenance): stage the encoded item,
// delete stale GSI entries derived from old_item, then stage new GSI entries.
@(private = "file")
put_item_batch :: proc(
	batch: ^rocksdb.WriteBatch,
	engine: ^Storage_Engine,
	table_name: string,
	item: Item,
	metadata: ^Table_Metadata,
	old_item: Maybe(Item),
) -> Storage_Error {
	key_struct, key_ok := key_from_item(item, metadata.key_schema)
	if !key_ok {
		return .Missing_Key_Attribute
	}
	defer key_destroy(&key_struct)
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return .Invalid_Key
	}
	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)
	encoded_item, encode_ok := encode(item)
	if !encode_ok {
		return .Serialization_Error
	}
	defer delete(encoded_item)
	// Add base item to batch
	rocksdb.batch_put(batch, storage_key, encoded_item)
	// Add old GSI deletions to batch
	if old, has_old := old_item.?; has_old {
		gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
		if gsi_del_err != .None {
			return gsi_del_err
		}
	}
	// Add new GSI writes to batch
	gsi_write_err := gsi_batch_write_entries(batch, table_name, item, metadata)
	if gsi_write_err != .None {
		return gsi_write_err
	}
	return .None
}

// Add delete operation to batch (with GSI cleanup)
@(private = "file")
delete_item_batch :: proc(
	batch: ^rocksdb.WriteBatch,
	engine: ^Storage_Engine,
	table_name: string,
	key: Item,
	metadata: ^Table_Metadata,
	old_item: Maybe(Item),
) -> Storage_Error {
	key_struct, key_ok := key_from_item(key, metadata.key_schema)
	if !key_ok {
		return .Missing_Key_Attribute
	}
	defer key_destroy(&key_struct)
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return .Invalid_Key
	}
	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)
	// Add base item delete to batch
	rocksdb.batch_delete(batch, storage_key)
	// Add GSI deletions to batch
	if old, has_old := old_item.?; has_old {
		gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
		if gsi_del_err != .None {
			return gsi_del_err
		}
	}
	return .None
}

// Add update operation to batch (with GSI maintenance)
// Builds the post-update item in memory (upsert semantics: starts from the
// stored item, or a fresh key-only item when none exists) and stages it.
@(private = "file")
update_item_batch :: proc(
	batch: ^rocksdb.WriteBatch,
	engine: ^Storage_Engine,
	table_name: string,
	key_item: Item,
	plan: ^Update_Plan,
	metadata: ^Table_Metadata,
	old_item_pre: Maybe(Item),
) -> Storage_Error {
	key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
	if !key_ok {
		return .Missing_Key_Attribute
	}
	defer key_destroy(&key_struct)
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return .Invalid_Key
	}
	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)
	// Start with existing item or create new
	existing_item: Item
	if old, has_old := old_item_pre.?; has_old {
		// Deep copy: old_item_pre is still owned (and freed) by the caller.
		existing_item = item_deep_copy(old)
	} else {
		// New item: seed it with the key attributes from the request.
		existing_item = make(Item)
		for ks in metadata.key_schema {
			if val, found := key_item[ks.attribute_name]; found {
				existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
			}
		}
	}
	defer item_destroy(&existing_item)
	// Apply update plan
	if !execute_update_plan(&existing_item, plan) {
		return .Invalid_Key
	}
	// Encode updated item
	encoded_item, encode_ok := encode(existing_item)
	if !encode_ok {
		return .Serialization_Error
	}
	defer delete(encoded_item)
	// Add base item to batch
	rocksdb.batch_put(batch, storage_key, encoded_item)
	// Add old GSI deletions to batch
	if old, has_old := old_item_pre.?; has_old {
		gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
		if gsi_del_err != .None {
			return gsi_del_err
		}
	}
	// Add new GSI writes to batch
	gsi_write_err := gsi_batch_write_entries(batch, table_name, existing_item, metadata)
	if gsi_write_err != .None {
		return gsi_write_err
	}
	return .None
}

// ============================================================================
// Internal storage operations that skip lock acquisition
// (Used by transact_write_items which manages its own locking)
// ============================================================================

// Read one item by key directly from RocksDB. A missing item is reported as
// (nil, .None) — absence is not an error for the transactional callers.
// The returned item (when present) is owned by the caller.
get_item_internal :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	key: Item,
	metadata: ^Table_Metadata,
) -> (Maybe(Item), Storage_Error) {
	key_struct, key_ok := key_from_item(key, metadata.key_schema)
	if !key_ok {
		return nil, .Missing_Key_Attribute
	}
	defer key_destroy(&key_struct)
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return nil, .Invalid_Key
	}
	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)
	value, get_err := rocksdb.db_get(&engine.db, storage_key)
	if get_err == .NotFound {
		return nil, .None
	}
	if get_err != .None {
		return nil, .RocksDB_Error
	}
	defer delete(value)
	item, decode_ok := decode(value)
	if !decode_ok {
		return nil, .Serialization_Error
	}
	return item, .None
}

// ============================================================================
// TransactGetItems Types
// ============================================================================

// One read request inside a TransactGetItems call.
Transact_Get_Action :: struct {
	table_name: string,
	key:        Item,
	projection: Maybe([]string), // Optional ProjectionExpression paths
}
Transact_Get_Result :: struct {
	items: []Maybe(Item), // One per action, nil if item not found
}

// Free the key item and the projection path slice of one get action.
// NOTE(review): the individual projection path strings are not deleted here —
// presumably they are slices/views into request memory owned elsewhere; confirm.
transact_get_action_destroy :: proc(action: ^Transact_Get_Action) {
	item_destroy(&action.key)
	if proj, has := action.projection.?; has {
		delete(proj)
	}
}

// Free every returned item and the result slice itself.
transact_get_result_destroy :: proc(result: ^Transact_Get_Result) {
	for &maybe_item in result.items {
		if item, has := maybe_item.?; has {
			item_copy := item
			item_destroy(&item_copy)
		}
	}
	delete(result.items)
}

// ============================================================================
// TransactGetItems — Atomically read up to 100 items
//
// DynamoDB semantics:
// - All reads are performed with a consistent snapshot
// - Missing items are returned as nil (no error)
// - ProjectionExpression is applied per-item
// ============================================================================

// Execute a TransactGetItems request under shared (read) table locks.
// Returns one Maybe(Item) per action; unknown tables and read failures yield
// nil entries rather than failing the whole request. The caller owns the
// result (free with transact_get_result_destroy).
transact_get_items :: proc(
	engine: ^Storage_Engine,
	actions: []Transact_Get_Action,
) -> (Transact_Get_Result, Transaction_Error) {
	result: Transact_Get_Result
	if len(actions) == 0 {
		return result, .Validation_Error
	}

	// Collect unique tables and acquire shared locks in deterministic order
	// (same sorted lock order as transact_write_items to avoid deadlock).
	table_set := make(map[string]bool, allocator = context.temp_allocator)
	for action in actions {
		table_set[action.table_name] = true
	}
	table_names := make([dynamic]string, allocator = context.temp_allocator)
	for name in table_set {
		append(&table_names, name)
	}
	for i := 0; i < len(table_names); i += 1 {
		for j := i + 1; j < len(table_names); j += 1 {
			if table_names[j] < table_names[i] {
				table_names[i], table_names[j] = table_names[j], table_names[i]
			}
		}
	}
	locks := make([dynamic]^sync.RW_Mutex, allocator = context.temp_allocator)
	for name in table_names {
		lock := get_or_create_table_lock(engine, name)
		sync.rw_mutex_shared_lock(lock)
		append(&locks, lock)
	}
	defer {
		// Release shared locks in reverse acquisition order.
		for i := len(locks) - 1; i >= 0; i -= 1 {
			sync.rw_mutex_shared_unlock(locks[i])
		}
	}

	// Cache metadata
	metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
	defer {
		for _, meta in metadata_cache {
			meta_copy := meta
			table_metadata_destroy(&meta_copy, engine.allocator)
		}
	}
	// `items` uses the default allocator — ownership passes to the caller.
	items := make([]Maybe(Item), len(actions))
	for action, idx in actions {
		// Get metadata (cached)
		metadata: ^Table_Metadata
		if cached, found := &metadata_cache[action.table_name]; found {
			metadata = cached
		} else {
			meta, meta_err := get_table_metadata(engine, action.table_name)
			if meta_err != .None {
				// Unknown table: report as "not found" rather than failing.
				items[idx] = nil
				continue
			}
			metadata_cache[action.table_name] = meta
			metadata = &metadata_cache[action.table_name]
		}
		// Fetch item
		item_result, get_err := get_item_internal(engine, action.table_name, action.key, metadata)
		if get_err != .None {
			items[idx] = nil
			continue
		}
		// Apply projection if specified
		if item, has_item := item_result.?; has_item {
			if proj, has_proj := action.projection.?; has_proj && len(proj) > 0 {
				// Keep only the projected copy; the full item is freed here.
				projected := apply_projection(item, proj)
				item_copy := item
				item_destroy(&item_copy)
				items[idx] = projected
			} else {
				items[idx] = item
			}
		} else {
			items[idx] = nil
		}
	}
	result.items = items
	return result, .None
}

// ============================================================================
// Helper: Extract modified attribute paths from an Update_Plan
//
// Used for UPDATED_NEW / UPDATED_OLD ReturnValues filtering.
// DynamoDB only returns the attributes that were actually modified
// by the UpdateExpression, not the entire item.
// ============================================================================

// Collect the distinct top-level paths touched by any SET/REMOVE/ADD/DELETE
// action in the plan. The path strings are views into the plan (not cloned);
// the returned slice itself is heap-allocated and owned by the caller.
get_update_plan_modified_paths :: proc(plan: ^Update_Plan) -> []string {
	// Temp-allocator map used purely as a set for de-duplication.
	paths := make(map[string]bool, allocator = context.temp_allocator)
	for action in plan.sets {
		paths[action.path] = true
	}
	for action in plan.removes {
		paths[action.path] = true
	}
	for action in plan.adds {
		paths[action.path] = true
	}
	for action in plan.deletes {
		paths[action.path] = true
	}
	result := make([]string, len(paths))
	i := 0
	for path in paths {
		result[i] = path
		i += 1
	}
	return result
}

// Filter an item to only include the specified attribute paths.
// Returns a new deep-copied item containing only the attributes whose names
// appear in `paths`. Paths absent from `item` are silently skipped.
// The caller owns the returned Item (keys are cloned, values deep-copied).
filter_item_to_paths :: proc(item: Item, paths: []string) -> Item {
	result := make(Item)
	for path in paths {
		// Skip duplicate path entries: re-assigning an existing map key would
		// leak the previously cloned key string and deep-copied value (the
		// overwritten value is never destroyed).
		if path in result {
			continue
		}
		if val, found := item[path]; found {
			result[strings.clone(path)] = attr_value_deep_copy(val)
		}
	}
	return result
}