2026-02-16 01:04:52 -05:00
|
|
|
// TransactWriteItems and TransactGetItems storage operations
|
|
|
|
|
//
|
|
|
|
|
// TransactWriteItems: Atomic write of up to 100 items across multiple tables.
|
|
|
|
|
// - Supports Put, Delete, Update, and ConditionCheck actions
|
|
|
|
|
// - ALL actions succeed or ALL fail (all-or-nothing)
|
|
|
|
|
// - ConditionExpressions are evaluated BEFORE any mutations
|
|
|
|
|
// - Uses exclusive locks on all involved tables
|
|
|
|
|
//
|
|
|
|
|
// TransactGetItems: Atomic read of up to 100 items across multiple tables.
|
|
|
|
|
// - Each item specifies TableName + Key + optional ProjectionExpression
|
|
|
|
|
// - All reads are consistent (snapshot isolation via table locks)
|
|
|
|
|
package dynamodb
|
|
|
|
|
|
|
|
|
|
import "core:strings"
|
|
|
|
|
import "core:sync"
|
|
|
|
|
import "../rocksdb"
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// TransactWriteItems Types
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
Transact_Write_Action_Type :: enum {
|
|
|
|
|
Put,
|
|
|
|
|
Delete,
|
|
|
|
|
Update,
|
|
|
|
|
Condition_Check,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Transact_Write_Action :: struct {
|
|
|
|
|
type: Transact_Write_Action_Type,
|
|
|
|
|
table_name: string,
|
|
|
|
|
// For Put: the full item to write
|
|
|
|
|
item: Maybe(Item),
|
|
|
|
|
// For Delete/Update/ConditionCheck: the key item
|
|
|
|
|
key: Maybe(Item),
|
|
|
|
|
// For Update: the parsed update plan
|
|
|
|
|
update_plan: Maybe(Update_Plan),
|
|
|
|
|
// ConditionExpression components (shared across all action types)
|
|
|
|
|
condition_expr: Maybe(string),
|
|
|
|
|
expr_attr_names: Maybe(map[string]string),
|
|
|
|
|
expr_attr_values: map[string]Attribute_Value,
|
|
|
|
|
// For Update: ReturnValuesOnConditionCheckFailure (not implemented yet, placeholder)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Transact_Write_Result :: struct {
|
|
|
|
|
// For now, either all succeed (no error) or we return a
|
|
|
|
|
// TransactionCanceledException with reasons per action.
|
|
|
|
|
cancellation_reasons: []Cancellation_Reason,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Cancellation_Reason :: struct {
|
|
|
|
|
code: string, // "None", "ConditionalCheckFailed", "ValidationError", etc.
|
|
|
|
|
message: string,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
transact_write_action_destroy :: proc(action: ^Transact_Write_Action) {
|
|
|
|
|
if item, has := action.item.?; has {
|
|
|
|
|
item_copy := item
|
|
|
|
|
item_destroy(&item_copy)
|
|
|
|
|
}
|
|
|
|
|
if key, has := action.key.?; has {
|
|
|
|
|
key_copy := key
|
|
|
|
|
item_destroy(&key_copy)
|
|
|
|
|
}
|
|
|
|
|
if plan, has := action.update_plan.?; has {
|
|
|
|
|
plan_copy := plan
|
|
|
|
|
update_plan_destroy(&plan_copy)
|
|
|
|
|
}
|
|
|
|
|
if names, has := action.expr_attr_names.?; has {
|
|
|
|
|
for k, v in names {
|
|
|
|
|
delete(k)
|
|
|
|
|
delete(v)
|
|
|
|
|
}
|
|
|
|
|
names_copy := names
|
|
|
|
|
delete(names_copy)
|
|
|
|
|
}
|
|
|
|
|
for k, v in action.expr_attr_values {
|
|
|
|
|
delete(k)
|
|
|
|
|
v_copy := v
|
|
|
|
|
attr_value_destroy(&v_copy)
|
|
|
|
|
}
|
|
|
|
|
delete(action.expr_attr_values)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
transact_write_result_destroy :: proc(result: ^Transact_Write_Result) {
|
|
|
|
|
if result.cancellation_reasons != nil {
|
|
|
|
|
delete(result.cancellation_reasons)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// TransactWriteItems — Execute an atomic batch of write operations
|
|
|
|
|
//
|
|
|
|
|
// DynamoDB semantics:
|
|
|
|
|
// 1. Acquire exclusive locks on all involved tables
|
|
|
|
|
// 2. Evaluate ALL ConditionExpressions (pre-flight check)
|
|
|
|
|
// 3. If any condition fails → cancel entire transaction
|
|
|
|
|
// 4. If all pass → apply all mutations
|
|
|
|
|
// 5. Release locks
|
|
|
|
|
//
|
|
|
|
|
// Returns .None on success, Transaction_Cancelled on condition failure.
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
Transaction_Error :: enum {
|
|
|
|
|
None,
|
|
|
|
|
Cancelled, // One or more conditions failed
|
|
|
|
|
Validation_Error, // Bad request data
|
|
|
|
|
Internal_Error, // Storage/serialization failure
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
transact_write_items :: proc(
|
|
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
actions: []Transact_Write_Action,
|
|
|
|
|
) -> (Transact_Write_Result, Transaction_Error) {
|
|
|
|
|
result: Transact_Write_Result
|
|
|
|
|
|
|
|
|
|
if len(actions) == 0 {
|
|
|
|
|
return result, .Validation_Error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---- Step 1: Collect unique table names and acquire locks ----
|
|
|
|
|
table_set := make(map[string]bool, allocator = context.temp_allocator)
|
|
|
|
|
for action in actions {
|
|
|
|
|
table_set[action.table_name] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table_names := make([dynamic]string, allocator = context.temp_allocator)
|
|
|
|
|
for name in table_set {
|
|
|
|
|
append(&table_names, name)
|
|
|
|
|
}
|
2026-02-16 09:33:01 -05:00
|
|
|
// Sort for deterministic lock ordering
|
2026-02-16 01:04:52 -05:00
|
|
|
for i := 0; i < len(table_names); i += 1 {
|
|
|
|
|
for j := i + 1; j < len(table_names); j += 1 {
|
|
|
|
|
if table_names[j] < table_names[i] {
|
|
|
|
|
table_names[i], table_names[j] = table_names[j], table_names[i]
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
locks := make([dynamic]^sync.RW_Mutex, allocator = context.temp_allocator)
|
|
|
|
|
for name in table_names {
|
|
|
|
|
lock := get_or_create_table_lock(engine, name)
|
|
|
|
|
sync.rw_mutex_lock(lock)
|
|
|
|
|
append(&locks, lock)
|
|
|
|
|
}
|
|
|
|
|
defer {
|
|
|
|
|
for i := len(locks) - 1; i >= 0; i -= 1 {
|
|
|
|
|
sync.rw_mutex_unlock(locks[i])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// ---- Step 2: Fetch metadata and evaluate conditions ----
|
2026-02-16 01:04:52 -05:00
|
|
|
reasons := make([]Cancellation_Reason, len(actions))
|
|
|
|
|
any_failed := false
|
|
|
|
|
|
|
|
|
|
metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
|
|
|
|
|
defer {
|
|
|
|
|
for _, meta in metadata_cache {
|
|
|
|
|
meta_copy := meta
|
|
|
|
|
table_metadata_destroy(&meta_copy, engine.allocator)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for action, idx in actions {
|
|
|
|
|
metadata: ^Table_Metadata
|
|
|
|
|
if cached, found := &metadata_cache[action.table_name]; found {
|
|
|
|
|
metadata = cached
|
|
|
|
|
} else {
|
|
|
|
|
meta, meta_err := get_table_metadata(engine, action.table_name)
|
|
|
|
|
if meta_err != .None {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "ValidationError",
|
|
|
|
|
message = "Table not found",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
metadata_cache[action.table_name] = meta
|
|
|
|
|
metadata = &metadata_cache[action.table_name]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
key_item: Item
|
|
|
|
|
switch action.type {
|
|
|
|
|
case .Put:
|
|
|
|
|
if item, has := action.item.?; has {
|
2026-02-16 09:33:01 -05:00
|
|
|
key_item = item
|
2026-02-16 01:04:52 -05:00
|
|
|
} else {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "ValidationError",
|
|
|
|
|
message = "Put action missing Item",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
case .Delete, .Update, .Condition_Check:
|
|
|
|
|
if key, has := action.key.?; has {
|
|
|
|
|
key_item = key
|
|
|
|
|
} else {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "ValidationError",
|
|
|
|
|
message = "Action missing Key",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Evaluate ConditionExpression
|
2026-02-16 01:04:52 -05:00
|
|
|
if cond_str, has_cond := action.condition_expr.?; has_cond {
|
|
|
|
|
existing_item, get_err := get_item_internal(engine, action.table_name, key_item, metadata)
|
|
|
|
|
if get_err != .None && get_err != .Item_Not_Found {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "InternalError",
|
|
|
|
|
message = "Failed to read existing item",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
defer {
|
|
|
|
|
if ex, has_ex := existing_item.?; has_ex {
|
|
|
|
|
ex_copy := ex
|
|
|
|
|
item_destroy(&ex_copy)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
filter_node, parse_ok := parse_filter_expression(
|
|
|
|
|
cond_str, action.expr_attr_names, action.expr_attr_values,
|
|
|
|
|
)
|
|
|
|
|
if !parse_ok || filter_node == nil {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "ValidationError",
|
|
|
|
|
message = "Invalid ConditionExpression",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
defer {
|
|
|
|
|
filter_node_destroy(filter_node)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eval_item: Item
|
|
|
|
|
if item, has_item := existing_item.?; has_item {
|
|
|
|
|
eval_item = item
|
|
|
|
|
} else {
|
|
|
|
|
eval_item = Item{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !evaluate_filter(eval_item, filter_node) {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "ConditionalCheckFailed",
|
|
|
|
|
message = "The conditional request failed",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if action.type == .Condition_Check {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{code = "None"}
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Validate key/item
|
2026-02-16 01:04:52 -05:00
|
|
|
switch action.type {
|
|
|
|
|
case .Put:
|
|
|
|
|
if item, has := action.item.?; has {
|
|
|
|
|
validation_err := validate_item_key_types(
|
|
|
|
|
item, metadata.key_schema, metadata.attribute_definitions,
|
|
|
|
|
)
|
|
|
|
|
if validation_err != .None {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "ValidationError",
|
|
|
|
|
message = "Key attribute type mismatch",
|
|
|
|
|
}
|
|
|
|
|
any_failed = true
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case .Delete, .Update:
|
2026-02-16 09:33:01 -05:00
|
|
|
// Key validation happens during batch building
|
2026-02-16 01:04:52 -05:00
|
|
|
case .Condition_Check:
|
2026-02-16 09:33:01 -05:00
|
|
|
// Already handled
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
reasons[idx] = Cancellation_Reason{code = "None"}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if any_failed {
|
|
|
|
|
result.cancellation_reasons = reasons
|
|
|
|
|
return result, .Cancelled
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// ---- Step 3: Build atomic WriteBatch with all operations ----
|
|
|
|
|
batch, batch_err := rocksdb.batch_create()
|
|
|
|
|
if batch_err != .None {
|
|
|
|
|
result.cancellation_reasons = reasons
|
|
|
|
|
return result, .Internal_Error
|
|
|
|
|
}
|
|
|
|
|
defer rocksdb.batch_destroy(&batch)
|
|
|
|
|
|
|
|
|
|
// Read old items for GSI cleanup (must happen before batch write)
|
|
|
|
|
old_items := make([]Maybe(Item), len(actions), allocator = context.temp_allocator)
|
|
|
|
|
defer {
|
|
|
|
|
for old_item in old_items {
|
|
|
|
|
if old, has := old_item.?; has {
|
|
|
|
|
old_copy := old
|
|
|
|
|
item_destroy(&old_copy)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for action, idx in actions {
|
|
|
|
|
if action.type == .Condition_Check {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metadata := &metadata_cache[action.table_name]
|
|
|
|
|
|
|
|
|
|
// Read old item if needed for GSI cleanup
|
|
|
|
|
key_item: Item
|
|
|
|
|
#partial switch action.type {
|
|
|
|
|
case .Put:
|
|
|
|
|
if item, has := action.item.?; has {
|
|
|
|
|
key_item = item
|
|
|
|
|
}
|
|
|
|
|
case .Delete, .Update:
|
|
|
|
|
if key, has := action.key.?; has {
|
|
|
|
|
key_item = key
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
existing, _ := get_item_internal(engine, action.table_name, key_item, metadata)
|
|
|
|
|
old_items[idx] = existing
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add all operations to batch
|
2026-02-16 01:04:52 -05:00
|
|
|
for &action, idx in actions {
|
2026-02-16 09:33:01 -05:00
|
|
|
if action.type == .Condition_Check {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 01:04:52 -05:00
|
|
|
metadata := &metadata_cache[action.table_name]
|
2026-02-16 09:33:01 -05:00
|
|
|
old_item := old_items[idx]
|
2026-02-16 01:04:52 -05:00
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
apply_err := transact_apply_action_batch(&batch, engine, &action, metadata, old_item)
|
2026-02-16 01:04:52 -05:00
|
|
|
if apply_err != .None {
|
|
|
|
|
reasons[idx] = Cancellation_Reason{
|
|
|
|
|
code = "InternalError",
|
2026-02-16 09:33:01 -05:00
|
|
|
message = "Failed to build mutation",
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
result.cancellation_reasons = reasons
|
|
|
|
|
return result, .Internal_Error
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// ---- Step 4: Write batch atomically (ALL or NOTHING) ----
|
|
|
|
|
write_err := rocksdb.batch_write(&engine.db, &batch)
|
|
|
|
|
if write_err != .None {
|
|
|
|
|
result.cancellation_reasons = reasons
|
|
|
|
|
return result, .Internal_Error
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 01:04:52 -05:00
|
|
|
delete(reasons)
|
|
|
|
|
return result, .None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Apply a single transact write action (called after all conditions have passed)
|
|
|
|
|
@(private = "file")
|
2026-02-16 09:33:01 -05:00
|
|
|
transact_apply_action_batch :: proc(
|
|
|
|
|
batch: ^rocksdb.WriteBatch,
|
2026-02-16 01:04:52 -05:00
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
action: ^Transact_Write_Action,
|
|
|
|
|
metadata: ^Table_Metadata,
|
2026-02-16 09:33:01 -05:00
|
|
|
old_item: Maybe(Item),
|
2026-02-16 01:04:52 -05:00
|
|
|
) -> Storage_Error {
|
|
|
|
|
switch action.type {
|
|
|
|
|
case .Put:
|
|
|
|
|
if item, has := action.item.?; has {
|
2026-02-16 09:33:01 -05:00
|
|
|
return put_item_batch(batch, engine, action.table_name, item, metadata, old_item)
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
return .Invalid_Key
|
|
|
|
|
|
|
|
|
|
case .Delete:
|
|
|
|
|
if key, has := action.key.?; has {
|
2026-02-16 09:33:01 -05:00
|
|
|
return delete_item_batch(batch, engine, action.table_name, key, metadata, old_item)
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
return .Invalid_Key
|
|
|
|
|
|
|
|
|
|
case .Update:
|
|
|
|
|
if key, has := action.key.?; has {
|
|
|
|
|
if plan, has_plan := action.update_plan.?; has_plan {
|
|
|
|
|
plan_copy := plan
|
2026-02-16 09:33:01 -05:00
|
|
|
return update_item_batch(batch, engine, action.table_name, key, &plan_copy, metadata, old_item)
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
return .Invalid_Key
|
|
|
|
|
}
|
|
|
|
|
return .Invalid_Key
|
|
|
|
|
|
|
|
|
|
case .Condition_Check:
|
2026-02-16 09:33:01 -05:00
|
|
|
return .None
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
return .None
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
@(private = "file")
|
|
|
|
|
put_item_batch :: proc(
|
|
|
|
|
batch: ^rocksdb.WriteBatch,
|
2026-02-16 01:04:52 -05:00
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
table_name: string,
|
|
|
|
|
item: Item,
|
|
|
|
|
metadata: ^Table_Metadata,
|
2026-02-16 09:33:01 -05:00
|
|
|
old_item: Maybe(Item),
|
2026-02-16 01:04:52 -05:00
|
|
|
) -> Storage_Error {
|
|
|
|
|
key_struct, key_ok := key_from_item(item, metadata.key_schema)
|
|
|
|
|
if !key_ok {
|
|
|
|
|
return .Missing_Key_Attribute
|
|
|
|
|
}
|
|
|
|
|
defer key_destroy(&key_struct)
|
|
|
|
|
|
|
|
|
|
key_values, kv_ok := key_get_values(&key_struct)
|
|
|
|
|
if !kv_ok {
|
|
|
|
|
return .Invalid_Key
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
|
|
|
defer delete(storage_key)
|
|
|
|
|
|
|
|
|
|
encoded_item, encode_ok := encode(item)
|
|
|
|
|
if !encode_ok {
|
|
|
|
|
return .Serialization_Error
|
|
|
|
|
}
|
|
|
|
|
defer delete(encoded_item)
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Add base item to batch
|
|
|
|
|
rocksdb.batch_put(batch, storage_key, encoded_item)
|
|
|
|
|
|
|
|
|
|
// Add old GSI deletions to batch
|
|
|
|
|
if old, has_old := old_item.?; has_old {
|
|
|
|
|
gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
|
|
|
|
|
if gsi_del_err != .None {
|
|
|
|
|
return gsi_del_err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add new GSI writes to batch
|
|
|
|
|
gsi_write_err := gsi_batch_write_entries(batch, table_name, item, metadata)
|
|
|
|
|
if gsi_write_err != .None {
|
|
|
|
|
return gsi_write_err
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return .None
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Add delete operation to batch (with GSI cleanup)
|
|
|
|
|
@(private = "file")
|
|
|
|
|
delete_item_batch :: proc(
|
|
|
|
|
batch: ^rocksdb.WriteBatch,
|
2026-02-16 01:04:52 -05:00
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
table_name: string,
|
|
|
|
|
key: Item,
|
|
|
|
|
metadata: ^Table_Metadata,
|
2026-02-16 09:33:01 -05:00
|
|
|
old_item: Maybe(Item),
|
2026-02-16 01:04:52 -05:00
|
|
|
) -> Storage_Error {
|
|
|
|
|
key_struct, key_ok := key_from_item(key, metadata.key_schema)
|
|
|
|
|
if !key_ok {
|
|
|
|
|
return .Missing_Key_Attribute
|
|
|
|
|
}
|
|
|
|
|
defer key_destroy(&key_struct)
|
|
|
|
|
|
|
|
|
|
key_values, kv_ok := key_get_values(&key_struct)
|
|
|
|
|
if !kv_ok {
|
|
|
|
|
return .Invalid_Key
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
|
|
|
defer delete(storage_key)
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Add base item delete to batch
|
|
|
|
|
rocksdb.batch_delete(batch, storage_key)
|
|
|
|
|
|
|
|
|
|
// Add GSI deletions to batch
|
|
|
|
|
if old, has_old := old_item.?; has_old {
|
|
|
|
|
gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
|
|
|
|
|
if gsi_del_err != .None {
|
|
|
|
|
return gsi_del_err
|
|
|
|
|
}
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return .None
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Add update operation to batch (with GSI maintenance)
|
|
|
|
|
@(private = "file")
|
|
|
|
|
update_item_batch :: proc(
|
|
|
|
|
batch: ^rocksdb.WriteBatch,
|
2026-02-16 01:04:52 -05:00
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
table_name: string,
|
|
|
|
|
key_item: Item,
|
|
|
|
|
plan: ^Update_Plan,
|
|
|
|
|
metadata: ^Table_Metadata,
|
2026-02-16 09:33:01 -05:00
|
|
|
old_item_pre: Maybe(Item),
|
|
|
|
|
) -> Storage_Error {
|
2026-02-16 01:04:52 -05:00
|
|
|
key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
|
|
|
|
|
if !key_ok {
|
2026-02-16 09:33:01 -05:00
|
|
|
return .Missing_Key_Attribute
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
defer key_destroy(&key_struct)
|
|
|
|
|
|
|
|
|
|
key_values, kv_ok := key_get_values(&key_struct)
|
|
|
|
|
if !kv_ok {
|
2026-02-16 09:33:01 -05:00
|
|
|
return .Invalid_Key
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
|
|
|
defer delete(storage_key)
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Start with existing item or create new
|
2026-02-16 01:04:52 -05:00
|
|
|
existing_item: Item
|
2026-02-16 09:33:01 -05:00
|
|
|
if old, has_old := old_item_pre.?; has_old {
|
|
|
|
|
existing_item = item_deep_copy(old)
|
|
|
|
|
} else {
|
2026-02-16 01:04:52 -05:00
|
|
|
existing_item = make(Item)
|
|
|
|
|
for ks in metadata.key_schema {
|
|
|
|
|
if val, found := key_item[ks.attribute_name]; found {
|
|
|
|
|
existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-02-16 09:33:01 -05:00
|
|
|
defer item_destroy(&existing_item)
|
2026-02-16 01:04:52 -05:00
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Apply update plan
|
2026-02-16 01:04:52 -05:00
|
|
|
if !execute_update_plan(&existing_item, plan) {
|
2026-02-16 09:33:01 -05:00
|
|
|
return .Invalid_Key
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Encode updated item
|
2026-02-16 01:04:52 -05:00
|
|
|
encoded_item, encode_ok := encode(existing_item)
|
|
|
|
|
if !encode_ok {
|
2026-02-16 09:33:01 -05:00
|
|
|
return .Serialization_Error
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
defer delete(encoded_item)
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Add base item to batch
|
|
|
|
|
rocksdb.batch_put(batch, storage_key, encoded_item)
|
|
|
|
|
|
|
|
|
|
// Add old GSI deletions to batch
|
|
|
|
|
if old, has_old := old_item_pre.?; has_old {
|
|
|
|
|
gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
|
|
|
|
|
if gsi_del_err != .None {
|
|
|
|
|
return gsi_del_err
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-16 09:33:01 -05:00
|
|
|
// Add new GSI writes to batch
|
|
|
|
|
gsi_write_err := gsi_batch_write_entries(batch, table_name, existing_item, metadata)
|
|
|
|
|
if gsi_write_err != .None {
|
|
|
|
|
return gsi_write_err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return .None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// Internal storage operations that skip lock acquisition
|
|
|
|
|
// (Used by transact_write_items which manages its own locking)
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
get_item_internal :: proc(
|
|
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
table_name: string,
|
|
|
|
|
key: Item,
|
|
|
|
|
metadata: ^Table_Metadata,
|
|
|
|
|
) -> (Maybe(Item), Storage_Error) {
|
|
|
|
|
key_struct, key_ok := key_from_item(key, metadata.key_schema)
|
|
|
|
|
if !key_ok {
|
|
|
|
|
return nil, .Missing_Key_Attribute
|
|
|
|
|
}
|
|
|
|
|
defer key_destroy(&key_struct)
|
|
|
|
|
|
|
|
|
|
key_values, kv_ok := key_get_values(&key_struct)
|
|
|
|
|
if !kv_ok {
|
|
|
|
|
return nil, .Invalid_Key
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
|
|
|
defer delete(storage_key)
|
|
|
|
|
|
|
|
|
|
value, get_err := rocksdb.db_get(&engine.db, storage_key)
|
|
|
|
|
if get_err == .NotFound {
|
|
|
|
|
return nil, .None
|
|
|
|
|
}
|
|
|
|
|
if get_err != .None {
|
|
|
|
|
return nil, .RocksDB_Error
|
|
|
|
|
}
|
|
|
|
|
defer delete(value)
|
|
|
|
|
|
|
|
|
|
item, decode_ok := decode(value)
|
|
|
|
|
if !decode_ok {
|
|
|
|
|
return nil, .Serialization_Error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return item, .None
|
2026-02-16 01:04:52 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// TransactGetItems Types
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
Transact_Get_Action :: struct {
|
|
|
|
|
table_name: string,
|
|
|
|
|
key: Item,
|
|
|
|
|
projection: Maybe([]string), // Optional ProjectionExpression paths
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Transact_Get_Result :: struct {
|
|
|
|
|
items: []Maybe(Item), // One per action, nil if item not found
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
transact_get_action_destroy :: proc(action: ^Transact_Get_Action) {
|
|
|
|
|
item_destroy(&action.key)
|
|
|
|
|
if proj, has := action.projection.?; has {
|
|
|
|
|
delete(proj)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
transact_get_result_destroy :: proc(result: ^Transact_Get_Result) {
|
|
|
|
|
for &maybe_item in result.items {
|
|
|
|
|
if item, has := maybe_item.?; has {
|
|
|
|
|
item_copy := item
|
|
|
|
|
item_destroy(&item_copy)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
delete(result.items)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// TransactGetItems — Atomically read up to 100 items
|
|
|
|
|
//
|
|
|
|
|
// DynamoDB semantics:
|
|
|
|
|
// - All reads are performed with a consistent snapshot
|
|
|
|
|
// - Missing items are returned as nil (no error)
|
|
|
|
|
// - ProjectionExpression is applied per-item
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
transact_get_items :: proc(
|
|
|
|
|
engine: ^Storage_Engine,
|
|
|
|
|
actions: []Transact_Get_Action,
|
|
|
|
|
) -> (Transact_Get_Result, Transaction_Error) {
|
|
|
|
|
result: Transact_Get_Result
|
|
|
|
|
|
|
|
|
|
if len(actions) == 0 {
|
|
|
|
|
return result, .Validation_Error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Collect unique tables and acquire shared locks in deterministic order
|
|
|
|
|
table_set := make(map[string]bool, allocator = context.temp_allocator)
|
|
|
|
|
for action in actions {
|
|
|
|
|
table_set[action.table_name] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
table_names := make([dynamic]string, allocator = context.temp_allocator)
|
|
|
|
|
for name in table_set {
|
|
|
|
|
append(&table_names, name)
|
|
|
|
|
}
|
|
|
|
|
for i := 0; i < len(table_names); i += 1 {
|
|
|
|
|
for j := i + 1; j < len(table_names); j += 1 {
|
|
|
|
|
if table_names[j] < table_names[i] {
|
|
|
|
|
table_names[i], table_names[j] = table_names[j], table_names[i]
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
locks := make([dynamic]^sync.RW_Mutex, allocator = context.temp_allocator)
|
|
|
|
|
for name in table_names {
|
|
|
|
|
lock := get_or_create_table_lock(engine, name)
|
|
|
|
|
sync.rw_mutex_shared_lock(lock)
|
|
|
|
|
append(&locks, lock)
|
|
|
|
|
}
|
|
|
|
|
defer {
|
|
|
|
|
for i := len(locks) - 1; i >= 0; i -= 1 {
|
|
|
|
|
sync.rw_mutex_shared_unlock(locks[i])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cache metadata
|
|
|
|
|
metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
|
|
|
|
|
defer {
|
|
|
|
|
for _, meta in metadata_cache {
|
|
|
|
|
meta_copy := meta
|
|
|
|
|
table_metadata_destroy(&meta_copy, engine.allocator)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
items := make([]Maybe(Item), len(actions))
|
|
|
|
|
|
|
|
|
|
for action, idx in actions {
|
|
|
|
|
// Get metadata (cached)
|
|
|
|
|
metadata: ^Table_Metadata
|
|
|
|
|
if cached, found := &metadata_cache[action.table_name]; found {
|
|
|
|
|
metadata = cached
|
|
|
|
|
} else {
|
|
|
|
|
meta, meta_err := get_table_metadata(engine, action.table_name)
|
|
|
|
|
if meta_err != .None {
|
|
|
|
|
items[idx] = nil
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
metadata_cache[action.table_name] = meta
|
|
|
|
|
metadata = &metadata_cache[action.table_name]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Fetch item
|
|
|
|
|
item_result, get_err := get_item_internal(engine, action.table_name, action.key, metadata)
|
|
|
|
|
if get_err != .None {
|
|
|
|
|
items[idx] = nil
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Apply projection if specified
|
|
|
|
|
if item, has_item := item_result.?; has_item {
|
|
|
|
|
if proj, has_proj := action.projection.?; has_proj && len(proj) > 0 {
|
|
|
|
|
projected := apply_projection(item, proj)
|
|
|
|
|
item_copy := item
|
|
|
|
|
item_destroy(&item_copy)
|
|
|
|
|
items[idx] = projected
|
|
|
|
|
} else {
|
|
|
|
|
items[idx] = item
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
items[idx] = nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result.items = items
|
|
|
|
|
return result, .None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// Helper: Extract modified attribute paths from an Update_Plan
|
|
|
|
|
//
|
|
|
|
|
// Used for UPDATED_NEW / UPDATED_OLD ReturnValues filtering.
|
|
|
|
|
// DynamoDB only returns the attributes that were actually modified
|
|
|
|
|
// by the UpdateExpression, not the entire item.
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
get_update_plan_modified_paths :: proc(plan: ^Update_Plan) -> []string {
|
|
|
|
|
paths := make(map[string]bool, allocator = context.temp_allocator)
|
|
|
|
|
|
|
|
|
|
for action in plan.sets {
|
|
|
|
|
paths[action.path] = true
|
|
|
|
|
}
|
|
|
|
|
for action in plan.removes {
|
|
|
|
|
paths[action.path] = true
|
|
|
|
|
}
|
|
|
|
|
for action in plan.adds {
|
|
|
|
|
paths[action.path] = true
|
|
|
|
|
}
|
|
|
|
|
for action in plan.deletes {
|
|
|
|
|
paths[action.path] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result := make([]string, len(paths))
|
|
|
|
|
i := 0
|
|
|
|
|
for path in paths {
|
|
|
|
|
result[i] = path
|
|
|
|
|
i += 1
|
|
|
|
|
}
|
|
|
|
|
return result
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Filter an item to only include the specified attribute paths.
|
|
|
|
|
// Returns a new deep-copied item containing only matching attributes.
|
|
|
|
|
filter_item_to_paths :: proc(item: Item, paths: []string) -> Item {
|
|
|
|
|
result := make(Item)
|
|
|
|
|
for path in paths {
|
|
|
|
|
if val, found := item[path]; found {
|
|
|
|
|
result[strings.clone(path)] = attr_value_deep_copy(val)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return result
|
|
|
|
|
}
|