package dynamodb

import "core:strings"
import "core:sync"
import "../rocksdb"

// update_item fetches the stored item for a key (if any), applies the update
// plan to it, and writes the result back.
//
// Locking: holds the table's lock via sync.rw_mutex_lock (the exclusive/write
// side) for the whole call.
//
// Atomicity: the base item write, the removal of the old item's GSI entries and
// the insertion of the new item's GSI entries are all queued on one RocksDB
// write batch and committed with a single batch_write.
//
// Parameters:
//   - engine:     storage engine owning the database handle and table locks
//   - table_name: table the item belongs to
//   - key_item:   item carrying at least the key attributes of the target
//   - plan:       update plan executed against the (possibly new) item
//
// Returns:
//   - old_item: deep copy of the item BEFORE mutation, nil if it did not exist
//   - new_item: the item AFTER mutation
//   - err:      .None on success. On any failure both items are nil and every
//               item allocated by this call has been destroyed.
//
// On success the caller receives both returned items and is presumably
// responsible for destroying them (NOTE(review): confirm against callers).
update_item :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	key_item: Item,
	plan: ^Update_Plan,
) -> (old_item: Maybe(Item), new_item: Maybe(Item), err: Storage_Error) {
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_lock(table_lock)
	defer sync.rw_mutex_unlock(table_lock)

	// Get table metadata
	metadata, meta_err := get_table_metadata(engine, table_name)
	if meta_err != .None {
		return nil, nil, meta_err
	}
	defer table_metadata_destroy(&metadata, engine.allocator)

	// Extract key from the provided key item
	key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
	if !key_ok {
		return nil, nil, .Missing_Key_Attribute
	}
	defer key_destroy(&key_struct)

	// Get key values
	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return nil, nil, .Invalid_Key
	}

	// Build storage key
	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)

	// Fetch existing item (if any)
	existing_encoded, get_err := rocksdb.db_get(&engine.db, storage_key)

	existing_item: Item // working copy that the plan mutates
	prior_item: Item    // snapshot taken before mutation (ReturnValues + GSI cleanup)
	had_prior := false

	if get_err == .None && existing_encoded != nil {
		defer delete(existing_encoded)

		decoded, decode_ok := decode(existing_encoded)
		if !decode_ok {
			return nil, nil, .Serialization_Error
		}
		existing_item = decoded
		prior_item = item_deep_copy(existing_item)
		had_prior = true
	} else if get_err == .NotFound || get_err == .None {
		// Item doesn't exist yet (explicit NotFound, or a successful read that
		// yielded no data) — start with just the key attributes.
		//
		// A real read failure must NOT land here: treating it as "absent" would
		// replace the stored item with a key-only item plus the update.
		existing_item = make(Item)
		for ks in metadata.key_schema {
			if val, found := key_item[ks.attribute_name]; found {
				existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
			}
		}
	} else {
		return nil, nil, .RocksDB_Error
	}

	// From here on both items are initialized. Unless the batch commits, free
	// them on the way out. Locals are used (not the named results) so the
	// cleanup does not depend on what a failing `return` stores in old_item.
	committed := false
	defer {
		if !committed {
			item_destroy(&existing_item)
			if had_prior {
				item_destroy(&prior_item)
			}
		}
	}

	// Apply update plan. Any execution failure is reported as a validation error.
	if exec_err := execute_update_plan(&existing_item, plan); exec_err != .None {
		return nil, nil, .Validation_Error
	}

	// Validate key attributes are still present and correct type
	validation_err := validate_item_key_types(
		existing_item,
		metadata.key_schema,
		metadata.attribute_definitions,
	)
	if validation_err != .None {
		return nil, nil, validation_err
	}

	// Encode updated item
	encoded_item, encode_ok := encode(existing_item)
	if !encode_ok {
		return nil, nil, .Serialization_Error
	}
	defer delete(encoded_item)

	// --- ATOMIC WRITE BATCH: base item + all GSI updates ---
	batch, batch_err := rocksdb.batch_create()
	if batch_err != .None {
		return nil, nil, .RocksDB_Error
	}
	defer rocksdb.batch_destroy(&batch)

	// Add base item write to batch
	rocksdb.batch_put(&batch, storage_key, encoded_item)

	// Add old GSI entry deletions to batch (if item existed before)
	if had_prior {
		gsi_del_err := gsi_batch_delete_entries(&batch, table_name, prior_item, &metadata)
		if gsi_del_err != .None {
			return nil, nil, gsi_del_err
		}
	}

	// Add new GSI entry writes to batch
	gsi_write_err := gsi_batch_write_entries(&batch, table_name, existing_item, &metadata)
	if gsi_write_err != .None {
		return nil, nil, gsi_write_err
	}

	// Write batch atomically - ALL or NOTHING
	write_err := rocksdb.batch_write(&engine.db, &batch)
	if write_err != .None {
		return nil, nil, .RocksDB_Error
	}

	// Success: the items are handed to the caller, so disarm the cleanup.
	committed = true
	if had_prior {
		old_item = prior_item
	}
	new_item = existing_item
	return old_item, new_item, .None
}