Files
jormun-db/dynamodb/update_item.odin
2026-02-16 09:26:21 -05:00

161 lines
4.3 KiB
Odin

package dynamodb
import "core:strings"
import "core:sync"
import "../rocksdb"
// UpdateItem — fetch existing item, apply update plan, write back
// Uses EXCLUSIVE lock (write operation)
// ATOMICITY: Uses WriteBatch to ensure base item + all GSI updates are atomic
//
// Returns:
// - old_item: the item BEFORE mutations (if it existed), for ReturnValues
// - new_item: the item AFTER mutations
// - error
// update_item fetches the existing item for the given key, applies the
// update plan, and writes the result back.
//
// Uses the table's EXCLUSIVE lock (write operation).
// ATOMICITY: the base-item write and all GSI index mutations go into a
// single RocksDB WriteBatch, so they commit all-or-nothing.
//
// Returns:
//   - old_item: deep copy of the item BEFORE mutation (nil if it did not
//     exist), for ReturnValues support. Caller owns it on success.
//   - new_item: the item AFTER mutation. Caller owns it on success.
//   - err:      .None on success.
update_item :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	key_item: Item,
	plan: ^Update_Plan,
) -> (old_item: Maybe(Item), new_item: Maybe(Item), err: Storage_Error) {
	// Write operation: take the exclusive lock for this table.
	table_lock := get_or_create_table_lock(engine, table_name)
	sync.rw_mutex_lock(table_lock)
	defer sync.rw_mutex_unlock(table_lock)

	metadata, meta_err := get_table_metadata(engine, table_name)
	if meta_err != .None {
		return nil, nil, meta_err
	}
	defer table_metadata_destroy(&metadata, engine.allocator)

	// Extract and validate the primary key from the caller-supplied item.
	key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
	if !key_ok {
		return nil, nil, .Missing_Key_Attribute
	}
	defer key_destroy(&key_struct)

	key_values, kv_ok := key_get_values(&key_struct)
	if !kv_ok {
		return nil, nil, .Invalid_Key
	}

	storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
	defer delete(storage_key)

	// Fetch the existing item, if any.
	//
	// old_snapshot/has_old track the pre-image copy through locals (not the
	// named result old_item) so the failure-cleanup defer below still sees
	// it after `return nil, nil, err` has overwritten the named results.
	existing_encoded, get_err := rocksdb.db_get(&engine.db, storage_key)
	existing_item: Item
	old_snapshot: Item
	has_old := false
	if get_err == .None && existing_encoded != nil {
		defer delete(existing_encoded)
		decoded, decode_ok := decode(existing_encoded)
		if !decode_ok {
			return nil, nil, .Serialization_Error
		}
		existing_item = decoded
		// Keep a deep copy of the pre-image for ReturnValues and so the
		// old GSI entries can be located and deleted below.
		old_snapshot = item_deep_copy(existing_item)
		has_old = true
		old_item = old_snapshot
	} else if get_err == .NotFound || (get_err == .None && existing_encoded == nil) {
		// Item doesn't exist yet — start from just the key attributes.
		// BUGFIX: previously a bare `existing_encoded == nil` also captured
		// real db_get failures here, silently treating storage errors as
		// "item not found"; those now fall through to .RocksDB_Error.
		existing_item = make(Item)
		for ks in metadata.key_schema {
			if val, found := key_item[ks.attribute_name]; found {
				existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
			}
		}
	} else {
		return nil, nil, .RocksDB_Error
	}

	// From here on, existing_item (and possibly old_snapshot) own heap
	// memory. One shared cleanup replaces the per-error-path copies: on any
	// failure return below, destroy both; on success they are handed to the
	// caller via the return values.
	committed := false
	defer if !committed {
		item_destroy(&existing_item)
		if has_old {
			item_destroy(&old_snapshot)
		}
	}

	// Apply the update plan to the (possibly fresh) item.
	if !execute_update_plan(&existing_item, plan) {
		return nil, nil, .Invalid_Key
	}

	// The plan must not have removed or retyped any key attribute.
	validation_err := validate_item_key_types(
		existing_item, metadata.key_schema, metadata.attribute_definitions,
	)
	if validation_err != .None {
		return nil, nil, validation_err
	}

	encoded_item, encode_ok := encode(existing_item)
	if !encode_ok {
		return nil, nil, .Serialization_Error
	}
	defer delete(encoded_item)

	// --- ATOMIC WRITE BATCH: base item + all GSI updates ---
	batch, batch_err := rocksdb.batch_create()
	if batch_err != .None {
		return nil, nil, .RocksDB_Error
	}
	defer rocksdb.batch_destroy(&batch)

	// Base item write.
	rocksdb.batch_put(&batch, storage_key, encoded_item)

	// Delete the item's previous GSI entries (only if it existed before).
	if has_old {
		gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old_snapshot, &metadata)
		if gsi_del_err != .None {
			return nil, nil, gsi_del_err
		}
	}

	// Write the item's new GSI entries.
	gsi_write_err := gsi_batch_write_entries(&batch, table_name, existing_item, &metadata)
	if gsi_write_err != .None {
		return nil, nil, gsi_write_err
	}

	// Commit the batch atomically — ALL or NOTHING.
	write_err := rocksdb.batch_write(&engine.db, &batch)
	if write_err != .None {
		return nil, nil, .RocksDB_Error
	}

	// Success: disarm the cleanup defer and hand ownership to the caller.
	committed = true
	new_item = existing_item
	return old_item, new_item, .None
}