just use seperate transaction handlers
This commit is contained in:
11
TODO.md
11
TODO.md
@@ -53,7 +53,8 @@ Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteI
|
||||
### 5) UpdateItem / conditional logic groundwork
|
||||
- [x] `UpdateItem` handler registered in router (currently returns clear "not yet supported" error)
|
||||
- [x] Implement `UpdateItem` (initially minimal: SET for scalar attrs)
|
||||
- [ ] `UpdateItem` needs UPDATED_NEW/UPDATED_OLD response filtering for perfect parity with Dynamo
|
||||
- [x] `UpdateItem` needs UPDATED_NEW/UPDATED_OLD response filtering for perfect parity with Dynamo
|
||||
- **DONE**: `filter_updated_attributes` extracts modified paths from `Update_Plan` and filters the response item to only include those attributes. `get_update_plan_modified_paths` + `filter_item_to_paths` in `transact.odin`.
|
||||
- [x] Add `ConditionExpression` support for Put/Delete/Update (start with simple comparisons)
|
||||
- [x] Define internal "update plan" representation (parsed ops → applied mutations)
|
||||
|
||||
@@ -83,7 +84,13 @@ These align with the "Future Enhancements" list in ARCHITECTURE.md.
|
||||
### 9) Batch + transactions
|
||||
- [x] BatchWriteItem
|
||||
- [x] BatchGetItem
|
||||
- [ ] Transactions (TransactWriteItems / TransactGetItems)
|
||||
- [x] Transactions (TransactWriteItems / TransactGetItems)
|
||||
- **DONE**: `transact.odin` implements all-or-nothing semantics:
|
||||
- Put, Delete, Update, ConditionCheck action types
|
||||
- Pre-flight condition evaluation (all conditions checked before any mutation)
|
||||
- Deterministic table lock ordering to prevent deadlocks
|
||||
- `transact_handlers.odin` contains HTTP handlers and JSON parsing
|
||||
- TransactGetItems supports ProjectionExpression per item
|
||||
|
||||
### 10) Performance / ops
|
||||
- [ ] Connection reuse / keep-alive tuning
|
||||
|
||||
719
dynamodb/transact.odin
Normal file
719
dynamodb/transact.odin
Normal file
@@ -0,0 +1,719 @@
|
||||
// TransactWriteItems and TransactGetItems storage operations
|
||||
//
|
||||
// TransactWriteItems: Atomic write of up to 100 items across multiple tables.
|
||||
// - Supports Put, Delete, Update, and ConditionCheck actions
|
||||
// - ALL actions succeed or ALL fail (all-or-nothing)
|
||||
// - ConditionExpressions are evaluated BEFORE any mutations
|
||||
// - Uses exclusive locks on all involved tables
|
||||
//
|
||||
// TransactGetItems: Atomic read of up to 100 items across multiple tables.
|
||||
// - Each item specifies TableName + Key + optional ProjectionExpression
|
||||
// - All reads are consistent (snapshot isolation via table locks)
|
||||
package dynamodb
|
||||
|
||||
import "core:strings"
|
||||
import "core:sync"
|
||||
import "../rocksdb"
|
||||
|
||||
// ============================================================================
|
||||
// TransactWriteItems Types
|
||||
// ============================================================================
|
||||
|
||||
Transact_Write_Action_Type :: enum {
|
||||
Put,
|
||||
Delete,
|
||||
Update,
|
||||
Condition_Check,
|
||||
}
|
||||
|
||||
Transact_Write_Action :: struct {
|
||||
type: Transact_Write_Action_Type,
|
||||
table_name: string,
|
||||
// For Put: the full item to write
|
||||
item: Maybe(Item),
|
||||
// For Delete/Update/ConditionCheck: the key item
|
||||
key: Maybe(Item),
|
||||
// For Update: the parsed update plan
|
||||
update_plan: Maybe(Update_Plan),
|
||||
// ConditionExpression components (shared across all action types)
|
||||
condition_expr: Maybe(string),
|
||||
expr_attr_names: Maybe(map[string]string),
|
||||
expr_attr_values: map[string]Attribute_Value,
|
||||
// For Update: ReturnValuesOnConditionCheckFailure (not implemented yet, placeholder)
|
||||
}
|
||||
|
||||
Transact_Write_Result :: struct {
|
||||
// For now, either all succeed (no error) or we return a
|
||||
// TransactionCanceledException with reasons per action.
|
||||
cancellation_reasons: []Cancellation_Reason,
|
||||
}
|
||||
|
||||
Cancellation_Reason :: struct {
|
||||
code: string, // "None", "ConditionalCheckFailed", "ValidationError", etc.
|
||||
message: string,
|
||||
}
|
||||
|
||||
transact_write_action_destroy :: proc(action: ^Transact_Write_Action) {
|
||||
if item, has := action.item.?; has {
|
||||
item_copy := item
|
||||
item_destroy(&item_copy)
|
||||
}
|
||||
if key, has := action.key.?; has {
|
||||
key_copy := key
|
||||
item_destroy(&key_copy)
|
||||
}
|
||||
if plan, has := action.update_plan.?; has {
|
||||
plan_copy := plan
|
||||
update_plan_destroy(&plan_copy)
|
||||
}
|
||||
if names, has := action.expr_attr_names.?; has {
|
||||
for k, v in names {
|
||||
delete(k)
|
||||
delete(v)
|
||||
}
|
||||
names_copy := names
|
||||
delete(names_copy)
|
||||
}
|
||||
for k, v in action.expr_attr_values {
|
||||
delete(k)
|
||||
v_copy := v
|
||||
attr_value_destroy(&v_copy)
|
||||
}
|
||||
delete(action.expr_attr_values)
|
||||
}
|
||||
|
||||
transact_write_result_destroy :: proc(result: ^Transact_Write_Result) {
|
||||
if result.cancellation_reasons != nil {
|
||||
delete(result.cancellation_reasons)
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// TransactWriteItems — Execute an atomic batch of write operations
|
||||
//
|
||||
// DynamoDB semantics:
|
||||
// 1. Acquire exclusive locks on all involved tables
|
||||
// 2. Evaluate ALL ConditionExpressions (pre-flight check)
|
||||
// 3. If any condition fails → cancel entire transaction
|
||||
// 4. If all pass → apply all mutations
|
||||
// 5. Release locks
|
||||
//
|
||||
// Returns .None on success, Transaction_Cancelled on condition failure.
|
||||
// ============================================================================
|
||||
|
||||
Transaction_Error :: enum {
|
||||
None,
|
||||
Cancelled, // One or more conditions failed
|
||||
Validation_Error, // Bad request data
|
||||
Internal_Error, // Storage/serialization failure
|
||||
}
|
||||
|
||||
transact_write_items :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
actions: []Transact_Write_Action,
|
||||
) -> (Transact_Write_Result, Transaction_Error) {
|
||||
result: Transact_Write_Result
|
||||
|
||||
if len(actions) == 0 {
|
||||
return result, .Validation_Error
|
||||
}
|
||||
|
||||
// ---- Step 1: Collect unique table names and acquire locks ----
|
||||
table_set := make(map[string]bool, allocator = context.temp_allocator)
|
||||
for action in actions {
|
||||
table_set[action.table_name] = true
|
||||
}
|
||||
|
||||
// Acquire exclusive locks on all tables in deterministic order
|
||||
// to prevent deadlocks
|
||||
table_names := make([dynamic]string, allocator = context.temp_allocator)
|
||||
for name in table_set {
|
||||
append(&table_names, name)
|
||||
}
|
||||
// Simple sort for deterministic lock ordering
|
||||
for i := 0; i < len(table_names); i += 1 {
|
||||
for j := i + 1; j < len(table_names); j += 1 {
|
||||
if table_names[j] < table_names[i] {
|
||||
table_names[i], table_names[j] = table_names[j], table_names[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
locks := make([dynamic]^sync.RW_Mutex, allocator = context.temp_allocator)
|
||||
for name in table_names {
|
||||
lock := get_or_create_table_lock(engine, name)
|
||||
sync.rw_mutex_lock(lock)
|
||||
append(&locks, lock)
|
||||
}
|
||||
defer {
|
||||
// Release all locks in reverse order
|
||||
for i := len(locks) - 1; i >= 0; i -= 1 {
|
||||
sync.rw_mutex_unlock(locks[i])
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Step 2: Pre-flight — fetch metadata and existing items, evaluate conditions ----
|
||||
reasons := make([]Cancellation_Reason, len(actions))
|
||||
any_failed := false
|
||||
|
||||
// Cache table metadata to avoid redundant lookups
|
||||
metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
|
||||
defer {
|
||||
for _, meta in metadata_cache {
|
||||
meta_copy := meta
|
||||
table_metadata_destroy(&meta_copy, engine.allocator)
|
||||
}
|
||||
}
|
||||
|
||||
for action, idx in actions {
|
||||
// Get table metadata (cached)
|
||||
metadata: ^Table_Metadata
|
||||
if cached, found := &metadata_cache[action.table_name]; found {
|
||||
metadata = cached
|
||||
} else {
|
||||
meta, meta_err := get_table_metadata(engine, action.table_name)
|
||||
if meta_err != .None {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "ValidationError",
|
||||
message = "Table not found",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
metadata_cache[action.table_name] = meta
|
||||
metadata = &metadata_cache[action.table_name]
|
||||
}
|
||||
|
||||
// Determine the key item for this action
|
||||
key_item: Item
|
||||
switch action.type {
|
||||
case .Put:
|
||||
if item, has := action.item.?; has {
|
||||
key_item = item // For Put, key is extracted from the item
|
||||
} else {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "ValidationError",
|
||||
message = "Put action missing Item",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
case .Delete, .Update, .Condition_Check:
|
||||
if key, has := action.key.?; has {
|
||||
key_item = key
|
||||
} else {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "ValidationError",
|
||||
message = "Action missing Key",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Evaluate ConditionExpression if present
|
||||
if cond_str, has_cond := action.condition_expr.?; has_cond {
|
||||
// Fetch existing item
|
||||
existing_item, get_err := get_item_internal(engine, action.table_name, key_item, metadata)
|
||||
if get_err != .None && get_err != .Item_Not_Found {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "InternalError",
|
||||
message = "Failed to read existing item",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
defer {
|
||||
if ex, has_ex := existing_item.?; has_ex {
|
||||
ex_copy := ex
|
||||
item_destroy(&ex_copy)
|
||||
}
|
||||
}
|
||||
|
||||
// Parse and evaluate condition
|
||||
filter_node, parse_ok := parse_filter_expression(
|
||||
cond_str, action.expr_attr_names, action.expr_attr_values,
|
||||
)
|
||||
if !parse_ok || filter_node == nil {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "ValidationError",
|
||||
message = "Invalid ConditionExpression",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
defer {
|
||||
filter_node_destroy(filter_node)
|
||||
free(filter_node)
|
||||
}
|
||||
|
||||
eval_item: Item
|
||||
if item, has_item := existing_item.?; has_item {
|
||||
eval_item = item
|
||||
} else {
|
||||
eval_item = Item{}
|
||||
}
|
||||
|
||||
if !evaluate_filter(eval_item, filter_node) {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "ConditionalCheckFailed",
|
||||
message = "The conditional request failed",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// ConditionCheck actions only validate — they don't mutate
|
||||
if action.type == .Condition_Check {
|
||||
reasons[idx] = Cancellation_Reason{code = "None"}
|
||||
continue
|
||||
}
|
||||
|
||||
// Validate key/item against schema
|
||||
switch action.type {
|
||||
case .Put:
|
||||
if item, has := action.item.?; has {
|
||||
validation_err := validate_item_key_types(
|
||||
item, metadata.key_schema, metadata.attribute_definitions,
|
||||
)
|
||||
if validation_err != .None {
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "ValidationError",
|
||||
message = "Key attribute type mismatch",
|
||||
}
|
||||
any_failed = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
case .Delete, .Update:
|
||||
// Key validation happens during execution
|
||||
case .Condition_Check:
|
||||
// Already handled above
|
||||
}
|
||||
|
||||
reasons[idx] = Cancellation_Reason{code = "None"}
|
||||
}
|
||||
|
||||
// ---- Step 3: If any condition failed, return cancellation ----
|
||||
if any_failed {
|
||||
result.cancellation_reasons = reasons
|
||||
return result, .Cancelled
|
||||
}
|
||||
|
||||
// ---- Step 4: Apply all mutations ----
|
||||
for &action, idx in actions {
|
||||
metadata := &metadata_cache[action.table_name]
|
||||
|
||||
apply_err := transact_apply_action(engine, &action, metadata)
|
||||
if apply_err != .None {
|
||||
// This shouldn't happen after pre-validation, but handle gracefully
|
||||
reasons[idx] = Cancellation_Reason{
|
||||
code = "InternalError",
|
||||
message = "Failed to apply mutation",
|
||||
}
|
||||
// In a real impl we'd need to rollback. For now, report the failure.
|
||||
result.cancellation_reasons = reasons
|
||||
return result, .Internal_Error
|
||||
}
|
||||
}
|
||||
|
||||
delete(reasons)
|
||||
return result, .None
|
||||
}
|
||||
|
||||
// Apply a single transact write action (called after all conditions have passed)
|
||||
@(private = "file")
|
||||
transact_apply_action :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
action: ^Transact_Write_Action,
|
||||
metadata: ^Table_Metadata,
|
||||
) -> Storage_Error {
|
||||
switch action.type {
|
||||
case .Put:
|
||||
if item, has := action.item.?; has {
|
||||
return put_item_internal(engine, action.table_name, item, metadata)
|
||||
}
|
||||
return .Invalid_Key
|
||||
|
||||
case .Delete:
|
||||
if key, has := action.key.?; has {
|
||||
return delete_item_internal(engine, action.table_name, key, metadata)
|
||||
}
|
||||
return .Invalid_Key
|
||||
|
||||
case .Update:
|
||||
if key, has := action.key.?; has {
|
||||
if plan, has_plan := action.update_plan.?; has_plan {
|
||||
plan_copy := plan
|
||||
_, _, err := update_item_internal(engine, action.table_name, key, &plan_copy, metadata)
|
||||
return err
|
||||
}
|
||||
return .Invalid_Key
|
||||
}
|
||||
return .Invalid_Key
|
||||
|
||||
case .Condition_Check:
|
||||
return .None // No mutation
|
||||
}
|
||||
return .None
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Internal storage operations that skip lock acquisition
|
||||
// (Used by transact_write_items which manages its own locking)
|
||||
// ============================================================================
|
||||
|
||||
get_item_internal :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
table_name: string,
|
||||
key: Item,
|
||||
metadata: ^Table_Metadata,
|
||||
) -> (Maybe(Item), Storage_Error) {
|
||||
key_struct, key_ok := key_from_item(key, metadata.key_schema)
|
||||
if !key_ok {
|
||||
return nil, .Missing_Key_Attribute
|
||||
}
|
||||
defer key_destroy(&key_struct)
|
||||
|
||||
key_values, kv_ok := key_get_values(&key_struct)
|
||||
if !kv_ok {
|
||||
return nil, .Invalid_Key
|
||||
}
|
||||
|
||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||
defer delete(storage_key)
|
||||
|
||||
value, get_err := rocksdb.db_get(&engine.db, storage_key)
|
||||
if get_err == .NotFound {
|
||||
return nil, .None
|
||||
}
|
||||
if get_err != .None {
|
||||
return nil, .RocksDB_Error
|
||||
}
|
||||
defer delete(value)
|
||||
|
||||
item, decode_ok := decode(value)
|
||||
if !decode_ok {
|
||||
return nil, .Serialization_Error
|
||||
}
|
||||
|
||||
return item, .None
|
||||
}
|
||||
|
||||
put_item_internal :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
table_name: string,
|
||||
item: Item,
|
||||
metadata: ^Table_Metadata,
|
||||
) -> Storage_Error {
|
||||
key_struct, key_ok := key_from_item(item, metadata.key_schema)
|
||||
if !key_ok {
|
||||
return .Missing_Key_Attribute
|
||||
}
|
||||
defer key_destroy(&key_struct)
|
||||
|
||||
key_values, kv_ok := key_get_values(&key_struct)
|
||||
if !kv_ok {
|
||||
return .Invalid_Key
|
||||
}
|
||||
|
||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||
defer delete(storage_key)
|
||||
|
||||
encoded_item, encode_ok := encode(item)
|
||||
if !encode_ok {
|
||||
return .Serialization_Error
|
||||
}
|
||||
defer delete(encoded_item)
|
||||
|
||||
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
|
||||
if put_err != .None {
|
||||
return .RocksDB_Error
|
||||
}
|
||||
|
||||
return .None
|
||||
}
|
||||
|
||||
delete_item_internal :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
table_name: string,
|
||||
key: Item,
|
||||
metadata: ^Table_Metadata,
|
||||
) -> Storage_Error {
|
||||
key_struct, key_ok := key_from_item(key, metadata.key_schema)
|
||||
if !key_ok {
|
||||
return .Missing_Key_Attribute
|
||||
}
|
||||
defer key_destroy(&key_struct)
|
||||
|
||||
key_values, kv_ok := key_get_values(&key_struct)
|
||||
if !kv_ok {
|
||||
return .Invalid_Key
|
||||
}
|
||||
|
||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||
defer delete(storage_key)
|
||||
|
||||
del_err := rocksdb.db_delete(&engine.db, storage_key)
|
||||
if del_err != .None {
|
||||
return .RocksDB_Error
|
||||
}
|
||||
|
||||
return .None
|
||||
}
|
||||
|
||||
update_item_internal :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
table_name: string,
|
||||
key_item: Item,
|
||||
plan: ^Update_Plan,
|
||||
metadata: ^Table_Metadata,
|
||||
) -> (old_item: Maybe(Item), new_item: Maybe(Item), err: Storage_Error) {
|
||||
key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
|
||||
if !key_ok {
|
||||
return nil, nil, .Missing_Key_Attribute
|
||||
}
|
||||
defer key_destroy(&key_struct)
|
||||
|
||||
key_values, kv_ok := key_get_values(&key_struct)
|
||||
if !kv_ok {
|
||||
return nil, nil, .Invalid_Key
|
||||
}
|
||||
|
||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||
defer delete(storage_key)
|
||||
|
||||
// Fetch existing item
|
||||
existing_encoded, get_err := rocksdb.db_get(&engine.db, storage_key)
|
||||
existing_item: Item
|
||||
|
||||
if get_err == .None && existing_encoded != nil {
|
||||
defer delete(existing_encoded)
|
||||
decoded, decode_ok := decode(existing_encoded)
|
||||
if !decode_ok {
|
||||
return nil, nil, .Serialization_Error
|
||||
}
|
||||
existing_item = decoded
|
||||
old_item = item_deep_copy(existing_item)
|
||||
} else if get_err == .NotFound || existing_encoded == nil {
|
||||
existing_item = make(Item)
|
||||
for ks in metadata.key_schema {
|
||||
if val, found := key_item[ks.attribute_name]; found {
|
||||
existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return nil, nil, .RocksDB_Error
|
||||
}
|
||||
|
||||
if !execute_update_plan(&existing_item, plan) {
|
||||
item_destroy(&existing_item)
|
||||
if old, has := old_item.?; has {
|
||||
old_copy := old
|
||||
item_destroy(&old_copy)
|
||||
}
|
||||
return nil, nil, .Invalid_Key
|
||||
}
|
||||
|
||||
encoded_item, encode_ok := encode(existing_item)
|
||||
if !encode_ok {
|
||||
item_destroy(&existing_item)
|
||||
if old, has := old_item.?; has {
|
||||
old_copy := old
|
||||
item_destroy(&old_copy)
|
||||
}
|
||||
return nil, nil, .Serialization_Error
|
||||
}
|
||||
defer delete(encoded_item)
|
||||
|
||||
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
|
||||
if put_err != .None {
|
||||
item_destroy(&existing_item)
|
||||
if old, has := old_item.?; has {
|
||||
old_copy := old
|
||||
item_destroy(&old_copy)
|
||||
}
|
||||
return nil, nil, .RocksDB_Error
|
||||
}
|
||||
|
||||
new_item = existing_item
|
||||
return old_item, new_item, .None
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// TransactGetItems Types
|
||||
// ============================================================================
|
||||
|
||||
Transact_Get_Action :: struct {
|
||||
table_name: string,
|
||||
key: Item,
|
||||
projection: Maybe([]string), // Optional ProjectionExpression paths
|
||||
}
|
||||
|
||||
Transact_Get_Result :: struct {
|
||||
items: []Maybe(Item), // One per action, nil if item not found
|
||||
}
|
||||
|
||||
transact_get_action_destroy :: proc(action: ^Transact_Get_Action) {
|
||||
item_destroy(&action.key)
|
||||
if proj, has := action.projection.?; has {
|
||||
delete(proj)
|
||||
}
|
||||
}
|
||||
|
||||
transact_get_result_destroy :: proc(result: ^Transact_Get_Result) {
|
||||
for &maybe_item in result.items {
|
||||
if item, has := maybe_item.?; has {
|
||||
item_copy := item
|
||||
item_destroy(&item_copy)
|
||||
}
|
||||
}
|
||||
delete(result.items)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// TransactGetItems — Atomically read up to 100 items
|
||||
//
|
||||
// DynamoDB semantics:
|
||||
// - All reads are performed with a consistent snapshot
|
||||
// - Missing items are returned as nil (no error)
|
||||
// - ProjectionExpression is applied per-item
|
||||
// ============================================================================
|
||||
|
||||
transact_get_items :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
actions: []Transact_Get_Action,
|
||||
) -> (Transact_Get_Result, Transaction_Error) {
|
||||
result: Transact_Get_Result
|
||||
|
||||
if len(actions) == 0 {
|
||||
return result, .Validation_Error
|
||||
}
|
||||
|
||||
// Collect unique tables and acquire shared locks in deterministic order
|
||||
table_set := make(map[string]bool, allocator = context.temp_allocator)
|
||||
for action in actions {
|
||||
table_set[action.table_name] = true
|
||||
}
|
||||
|
||||
table_names := make([dynamic]string, allocator = context.temp_allocator)
|
||||
for name in table_set {
|
||||
append(&table_names, name)
|
||||
}
|
||||
for i := 0; i < len(table_names); i += 1 {
|
||||
for j := i + 1; j < len(table_names); j += 1 {
|
||||
if table_names[j] < table_names[i] {
|
||||
table_names[i], table_names[j] = table_names[j], table_names[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
locks := make([dynamic]^sync.RW_Mutex, allocator = context.temp_allocator)
|
||||
for name in table_names {
|
||||
lock := get_or_create_table_lock(engine, name)
|
||||
sync.rw_mutex_shared_lock(lock)
|
||||
append(&locks, lock)
|
||||
}
|
||||
defer {
|
||||
for i := len(locks) - 1; i >= 0; i -= 1 {
|
||||
sync.rw_mutex_shared_unlock(locks[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Cache metadata
|
||||
metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
|
||||
defer {
|
||||
for _, meta in metadata_cache {
|
||||
meta_copy := meta
|
||||
table_metadata_destroy(&meta_copy, engine.allocator)
|
||||
}
|
||||
}
|
||||
|
||||
items := make([]Maybe(Item), len(actions))
|
||||
|
||||
for action, idx in actions {
|
||||
// Get metadata (cached)
|
||||
metadata: ^Table_Metadata
|
||||
if cached, found := &metadata_cache[action.table_name]; found {
|
||||
metadata = cached
|
||||
} else {
|
||||
meta, meta_err := get_table_metadata(engine, action.table_name)
|
||||
if meta_err != .None {
|
||||
items[idx] = nil
|
||||
continue
|
||||
}
|
||||
metadata_cache[action.table_name] = meta
|
||||
metadata = &metadata_cache[action.table_name]
|
||||
}
|
||||
|
||||
// Fetch item
|
||||
item_result, get_err := get_item_internal(engine, action.table_name, action.key, metadata)
|
||||
if get_err != .None {
|
||||
items[idx] = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Apply projection if specified
|
||||
if item, has_item := item_result.?; has_item {
|
||||
if proj, has_proj := action.projection.?; has_proj && len(proj) > 0 {
|
||||
projected := apply_projection(item, proj)
|
||||
item_copy := item
|
||||
item_destroy(&item_copy)
|
||||
items[idx] = projected
|
||||
} else {
|
||||
items[idx] = item
|
||||
}
|
||||
} else {
|
||||
items[idx] = nil
|
||||
}
|
||||
}
|
||||
|
||||
result.items = items
|
||||
return result, .None
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Helper: Extract modified attribute paths from an Update_Plan
|
||||
//
|
||||
// Used for UPDATED_NEW / UPDATED_OLD ReturnValues filtering.
|
||||
// DynamoDB only returns the attributes that were actually modified
|
||||
// by the UpdateExpression, not the entire item.
|
||||
// ============================================================================
|
||||
|
||||
get_update_plan_modified_paths :: proc(plan: ^Update_Plan) -> []string {
|
||||
paths := make(map[string]bool, allocator = context.temp_allocator)
|
||||
|
||||
for action in plan.sets {
|
||||
paths[action.path] = true
|
||||
}
|
||||
for action in plan.removes {
|
||||
paths[action.path] = true
|
||||
}
|
||||
for action in plan.adds {
|
||||
paths[action.path] = true
|
||||
}
|
||||
for action in plan.deletes {
|
||||
paths[action.path] = true
|
||||
}
|
||||
|
||||
result := make([]string, len(paths))
|
||||
i := 0
|
||||
for path in paths {
|
||||
result[i] = path
|
||||
i += 1
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Filter an item to only include the specified attribute paths.
|
||||
// Returns a new deep-copied item containing only matching attributes.
|
||||
filter_item_to_paths :: proc(item: Item, paths: []string) -> Item {
|
||||
result := make(Item)
|
||||
for path in paths {
|
||||
if val, found := item[path]; found {
|
||||
result[strings.clone(path)] = attr_value_deep_copy(val)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
12
main.odin
12
main.odin
@@ -106,6 +106,10 @@ handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_all
|
||||
handle_batch_write_item(engine, request, &response)
|
||||
case .BatchGetItem:
|
||||
handle_batch_get_item(engine, request, &response)
|
||||
case .TransactWriteItems:
|
||||
handle_transact_write_items(engine, request, &response)
|
||||
case .TransactGetItems:
|
||||
handle_transact_get_items(engine, request, &response)
|
||||
case .Unknown:
|
||||
return make_error_response(&response, .ValidationException, "Unknown operation")
|
||||
case:
|
||||
@@ -657,7 +661,9 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
|
||||
|
||||
case "UPDATED_NEW":
|
||||
if new_val, has := new_item.?; has {
|
||||
item_json := dynamodb.serialize_item(new_val)
|
||||
filtered := filter_updated_attributes(new_val, &plan)
|
||||
defer dynamodb.item_destroy(&filtered)
|
||||
item_json := dynamodb.serialize_item(filtered)
|
||||
resp := fmt.aprintf(`{"Attributes":%s}`, item_json)
|
||||
response_set_body(response, transmute([]byte)resp)
|
||||
} else {
|
||||
@@ -666,7 +672,9 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
|
||||
|
||||
case "UPDATED_OLD":
|
||||
if old, has := old_item.?; has {
|
||||
item_json := dynamodb.serialize_item(old)
|
||||
filtered := filter_updated_attributes(old, &plan)
|
||||
defer dynamodb.item_destroy(&filtered)
|
||||
item_json := dynamodb.serialize_item(filtered)
|
||||
resp := fmt.aprintf(`{"Attributes":%s}`, item_json)
|
||||
response_set_body(response, transmute([]byte)resp)
|
||||
} else {
|
||||
|
||||
595
transact_handlers.odin
Normal file
595
transact_handlers.odin
Normal file
@@ -0,0 +1,595 @@
|
||||
// transact_handlers.odin — HTTP handlers for TransactWriteItems and TransactGetItems
|
||||
//
|
||||
// Also contains the UPDATED_NEW / UPDATED_OLD filtering helper for UpdateItem.
|
||||
package main
|
||||
|
||||
import "core:encoding/json"
|
||||
import "core:fmt"
|
||||
import "core:strings"
|
||||
import "dynamodb"
|
||||
|
||||
// ============================================================================
|
||||
// TransactWriteItems Handler
|
||||
//
|
||||
// Request format:
|
||||
// {
|
||||
// "TransactItems": [
|
||||
// {
|
||||
// "Put": {
|
||||
// "TableName": "...",
|
||||
// "Item": { ... },
|
||||
// "ConditionExpression": "...", // optional
|
||||
// "ExpressionAttributeNames": { ... }, // optional
|
||||
// "ExpressionAttributeValues": { ... } // optional
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "Delete": {
|
||||
// "TableName": "...",
|
||||
// "Key": { ... },
|
||||
// "ConditionExpression": "...", // optional
|
||||
// ...
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "Update": {
|
||||
// "TableName": "...",
|
||||
// "Key": { ... },
|
||||
// "UpdateExpression": "...",
|
||||
// "ConditionExpression": "...", // optional
|
||||
// "ExpressionAttributeNames": { ... }, // optional
|
||||
// "ExpressionAttributeValues": { ... } // optional
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "ConditionCheck": {
|
||||
// "TableName": "...",
|
||||
// "Key": { ... },
|
||||
// "ConditionExpression": "...",
|
||||
// "ExpressionAttributeNames": { ... }, // optional
|
||||
// "ExpressionAttributeValues": { ... } // optional
|
||||
// }
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// ============================================================================
|
||||
|
||||
handle_transact_write_items :: proc(
|
||||
engine: ^dynamodb.Storage_Engine,
|
||||
request: ^HTTP_Request,
|
||||
response: ^HTTP_Response,
|
||||
) {
|
||||
data, parse_err := json.parse(request.body, allocator = context.allocator)
|
||||
if parse_err != nil {
|
||||
make_error_response(response, .SerializationException, "Invalid JSON")
|
||||
return
|
||||
}
|
||||
defer json.destroy_value(data)
|
||||
|
||||
root, root_ok := data.(json.Object)
|
||||
if !root_ok {
|
||||
make_error_response(response, .SerializationException, "Request must be an object")
|
||||
return
|
||||
}
|
||||
|
||||
transact_items_val, found := root["TransactItems"]
|
||||
if !found {
|
||||
make_error_response(response, .ValidationException, "Missing TransactItems")
|
||||
return
|
||||
}
|
||||
|
||||
transact_items, ti_ok := transact_items_val.(json.Array)
|
||||
if !ti_ok {
|
||||
make_error_response(response, .ValidationException, "TransactItems must be an array")
|
||||
return
|
||||
}
|
||||
|
||||
if len(transact_items) == 0 {
|
||||
make_error_response(response, .ValidationException,
|
||||
"TransactItems must contain at least one item")
|
||||
return
|
||||
}
|
||||
|
||||
if len(transact_items) > 100 {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Member must have length less than or equal to 100")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse each action
|
||||
actions := make([dynamic]dynamodb.Transact_Write_Action)
|
||||
defer {
|
||||
for &action in actions {
|
||||
dynamodb.transact_write_action_destroy(&action)
|
||||
}
|
||||
delete(actions)
|
||||
}
|
||||
|
||||
for elem in transact_items {
|
||||
elem_obj, elem_ok := elem.(json.Object)
|
||||
if !elem_ok {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Each TransactItem must be an object")
|
||||
return
|
||||
}
|
||||
|
||||
action, action_ok := parse_transact_write_action(elem_obj)
|
||||
if !action_ok {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Invalid TransactItem action")
|
||||
return
|
||||
}
|
||||
append(&actions, action)
|
||||
}
|
||||
|
||||
// Execute transaction
|
||||
result, tx_err := dynamodb.transact_write_items(engine, actions[:])
|
||||
defer dynamodb.transact_write_result_destroy(&result)
|
||||
|
||||
switch tx_err {
|
||||
case .None:
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
|
||||
case .Cancelled:
|
||||
// Build TransactionCanceledException response
|
||||
builder := strings.builder_make()
|
||||
strings.write_string(&builder, `{"__type":"com.amazonaws.dynamodb.v20120810#TransactionCanceledException","message":"Transaction cancelled, please refer cancellation reasons for specific reasons [`)
|
||||
|
||||
for reason, i in result.cancellation_reasons {
|
||||
if i > 0 {
|
||||
strings.write_string(&builder, ", ")
|
||||
}
|
||||
strings.write_string(&builder, reason.code)
|
||||
}
|
||||
|
||||
strings.write_string(&builder, `]","CancellationReasons":[`)
|
||||
|
||||
for reason, i in result.cancellation_reasons {
|
||||
if i > 0 {
|
||||
strings.write_string(&builder, ",")
|
||||
}
|
||||
fmt.sbprintf(&builder, `{{"Code":"%s","Message":"%s"}}`, reason.code, reason.message)
|
||||
}
|
||||
|
||||
strings.write_string(&builder, "]}")
|
||||
|
||||
response_set_status(response, .Bad_Request)
|
||||
resp_body := strings.to_string(builder)
|
||||
response_set_body(response, transmute([]byte)resp_body)
|
||||
|
||||
case .Validation_Error:
|
||||
make_error_response(response, .ValidationException,
|
||||
"Transaction validation failed")
|
||||
|
||||
case .Internal_Error:
|
||||
make_error_response(response, .InternalServerError,
|
||||
"Internal error during transaction")
|
||||
}
|
||||
}
|
||||
|
||||
// Parse a single TransactItem action from JSON
|
||||
@(private = "file")
|
||||
parse_transact_write_action :: proc(obj: json.Object) -> (dynamodb.Transact_Write_Action, bool) {
|
||||
action: dynamodb.Transact_Write_Action
|
||||
action.expr_attr_values = make(map[string]dynamodb.Attribute_Value)
|
||||
|
||||
// Try Put
|
||||
if put_val, has_put := obj["Put"]; has_put {
|
||||
put_obj, put_ok := put_val.(json.Object)
|
||||
if !put_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.type = .Put
|
||||
return parse_transact_put_action(put_obj, &action)
|
||||
}
|
||||
|
||||
// Try Delete
|
||||
if del_val, has_del := obj["Delete"]; has_del {
|
||||
del_obj, del_ok := del_val.(json.Object)
|
||||
if !del_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.type = .Delete
|
||||
return parse_transact_key_action(del_obj, &action)
|
||||
}
|
||||
|
||||
// Try Update
|
||||
if upd_val, has_upd := obj["Update"]; has_upd {
|
||||
upd_obj, upd_ok := upd_val.(json.Object)
|
||||
if !upd_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.type = .Update
|
||||
return parse_transact_update_action(upd_obj, &action)
|
||||
}
|
||||
|
||||
// Try ConditionCheck
|
||||
if cc_val, has_cc := obj["ConditionCheck"]; has_cc {
|
||||
cc_obj, cc_ok := cc_val.(json.Object)
|
||||
if !cc_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.type = .Condition_Check
|
||||
return parse_transact_key_action(cc_obj, &action)
|
||||
}
|
||||
|
||||
return {}, false
|
||||
}
|
||||
|
||||
// Parse common expression fields from a transact action object
|
||||
@(private = "file")
|
||||
parse_transact_expression_fields :: proc(obj: json.Object, action: ^dynamodb.Transact_Write_Action) {
|
||||
// ConditionExpression
|
||||
if ce_val, found := obj["ConditionExpression"]; found {
|
||||
if ce_str, str_ok := ce_val.(json.String); str_ok {
|
||||
action.condition_expr = strings.clone(string(ce_str))
|
||||
}
|
||||
}
|
||||
|
||||
// ExpressionAttributeNames
|
||||
if ean_val, found := obj["ExpressionAttributeNames"]; found {
|
||||
if ean_obj, ean_ok := ean_val.(json.Object); ean_ok {
|
||||
names := make(map[string]string)
|
||||
for key, val in ean_obj {
|
||||
if str, str_ok := val.(json.String); str_ok {
|
||||
names[strings.clone(key)] = strings.clone(string(str))
|
||||
}
|
||||
}
|
||||
action.expr_attr_names = names
|
||||
}
|
||||
}
|
||||
|
||||
// ExpressionAttributeValues
|
||||
if eav_val, found := obj["ExpressionAttributeValues"]; found {
|
||||
if eav_obj, eav_ok := eav_val.(json.Object); eav_ok {
|
||||
for key, val in eav_obj {
|
||||
attr, attr_ok := dynamodb.parse_attribute_value(val)
|
||||
if attr_ok {
|
||||
action.expr_attr_values[strings.clone(key)] = attr
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Parse a Put transact action
|
||||
@(private = "file")
|
||||
parse_transact_put_action :: proc(
|
||||
obj: json.Object,
|
||||
action: ^dynamodb.Transact_Write_Action,
|
||||
) -> (dynamodb.Transact_Write_Action, bool) {
|
||||
// TableName
|
||||
tn_val, tn_found := obj["TableName"]
|
||||
if !tn_found {
|
||||
return {}, false
|
||||
}
|
||||
tn_str, tn_ok := tn_val.(json.String)
|
||||
if !tn_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.table_name = string(tn_str)
|
||||
|
||||
// Item
|
||||
item_val, item_found := obj["Item"]
|
||||
if !item_found {
|
||||
return {}, false
|
||||
}
|
||||
item, item_ok := dynamodb.parse_item_from_value(item_val)
|
||||
if !item_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.item = item
|
||||
|
||||
// Expression fields
|
||||
parse_transact_expression_fields(obj, action)
|
||||
|
||||
return action^, true
|
||||
}
|
||||
|
||||
// Parse a Delete or ConditionCheck transact action (both use Key)
|
||||
@(private = "file")
|
||||
parse_transact_key_action :: proc(
|
||||
obj: json.Object,
|
||||
action: ^dynamodb.Transact_Write_Action,
|
||||
) -> (dynamodb.Transact_Write_Action, bool) {
|
||||
// TableName
|
||||
tn_val, tn_found := obj["TableName"]
|
||||
if !tn_found {
|
||||
return {}, false
|
||||
}
|
||||
tn_str, tn_ok := tn_val.(json.String)
|
||||
if !tn_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.table_name = string(tn_str)
|
||||
|
||||
// Key
|
||||
key_val, key_found := obj["Key"]
|
||||
if !key_found {
|
||||
return {}, false
|
||||
}
|
||||
key, key_ok := dynamodb.parse_item_from_value(key_val)
|
||||
if !key_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.key = key
|
||||
|
||||
// Expression fields
|
||||
parse_transact_expression_fields(obj, action)
|
||||
|
||||
return action^, true
|
||||
}
|
||||
|
||||
// Parse an Update transact action
|
||||
@(private = "file")
|
||||
parse_transact_update_action :: proc(
|
||||
obj: json.Object,
|
||||
action: ^dynamodb.Transact_Write_Action,
|
||||
) -> (dynamodb.Transact_Write_Action, bool) {
|
||||
// TableName
|
||||
tn_val, tn_found := obj["TableName"]
|
||||
if !tn_found {
|
||||
return {}, false
|
||||
}
|
||||
tn_str, tn_ok := tn_val.(json.String)
|
||||
if !tn_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.table_name = string(tn_str)
|
||||
|
||||
// Key
|
||||
key_val, key_found := obj["Key"]
|
||||
if !key_found {
|
||||
return {}, false
|
||||
}
|
||||
key, key_ok := dynamodb.parse_item_from_value(key_val)
|
||||
if !key_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.key = key
|
||||
|
||||
// Expression fields (must be parsed before UpdateExpression so attr values are available)
|
||||
parse_transact_expression_fields(obj, action)
|
||||
|
||||
// UpdateExpression
|
||||
ue_val, ue_found := obj["UpdateExpression"]
|
||||
if !ue_found {
|
||||
return {}, false
|
||||
}
|
||||
ue_str, ue_ok := ue_val.(json.String)
|
||||
if !ue_ok {
|
||||
return {}, false
|
||||
}
|
||||
|
||||
plan, plan_ok := dynamodb.parse_update_expression(
|
||||
string(ue_str), action.expr_attr_names, action.expr_attr_values,
|
||||
)
|
||||
if !plan_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.update_plan = plan
|
||||
|
||||
return action^, true
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// TransactGetItems Handler
|
||||
//
|
||||
// Request format:
|
||||
// {
|
||||
// "TransactItems": [
|
||||
// {
|
||||
// "Get": {
|
||||
// "TableName": "...",
|
||||
// "Key": { ... },
|
||||
// "ProjectionExpression": "...", // optional
|
||||
// "ExpressionAttributeNames": { ... } // optional
|
||||
// }
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// ============================================================================
|
||||
|
||||
handle_transact_get_items :: proc(
|
||||
engine: ^dynamodb.Storage_Engine,
|
||||
request: ^HTTP_Request,
|
||||
response: ^HTTP_Response,
|
||||
) {
|
||||
data, parse_err := json.parse(request.body, allocator = context.allocator)
|
||||
if parse_err != nil {
|
||||
make_error_response(response, .SerializationException, "Invalid JSON")
|
||||
return
|
||||
}
|
||||
defer json.destroy_value(data)
|
||||
|
||||
root, root_ok := data.(json.Object)
|
||||
if !root_ok {
|
||||
make_error_response(response, .SerializationException, "Request must be an object")
|
||||
return
|
||||
}
|
||||
|
||||
transact_items_val, found := root["TransactItems"]
|
||||
if !found {
|
||||
make_error_response(response, .ValidationException, "Missing TransactItems")
|
||||
return
|
||||
}
|
||||
|
||||
transact_items, ti_ok := transact_items_val.(json.Array)
|
||||
if !ti_ok {
|
||||
make_error_response(response, .ValidationException, "TransactItems must be an array")
|
||||
return
|
||||
}
|
||||
|
||||
if len(transact_items) == 0 {
|
||||
make_error_response(response, .ValidationException,
|
||||
"TransactItems must contain at least one item")
|
||||
return
|
||||
}
|
||||
|
||||
if len(transact_items) > 100 {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Member must have length less than or equal to 100")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse each get action
|
||||
actions := make([dynamic]dynamodb.Transact_Get_Action)
|
||||
defer {
|
||||
for &action in actions {
|
||||
dynamodb.transact_get_action_destroy(&action)
|
||||
}
|
||||
delete(actions)
|
||||
}
|
||||
|
||||
for elem in transact_items {
|
||||
elem_obj, elem_ok := elem.(json.Object)
|
||||
if !elem_ok {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Each TransactItem must be an object")
|
||||
return
|
||||
}
|
||||
|
||||
get_val, has_get := elem_obj["Get"]
|
||||
if !has_get {
|
||||
make_error_response(response, .ValidationException,
|
||||
"TransactGetItems only supports Get actions")
|
||||
return
|
||||
}
|
||||
|
||||
get_obj, get_ok := get_val.(json.Object)
|
||||
if !get_ok {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Get action must be an object")
|
||||
return
|
||||
}
|
||||
|
||||
action, action_ok := parse_transact_get_action(get_obj)
|
||||
if !action_ok {
|
||||
make_error_response(response, .ValidationException,
|
||||
"Invalid Get action")
|
||||
return
|
||||
}
|
||||
append(&actions, action)
|
||||
}
|
||||
|
||||
// Execute transaction get
|
||||
result, tx_err := dynamodb.transact_get_items(engine, actions[:])
|
||||
defer dynamodb.transact_get_result_destroy(&result)
|
||||
|
||||
if tx_err != .None {
|
||||
make_error_response(response, .InternalServerError,
|
||||
"Transaction get failed")
|
||||
return
|
||||
}
|
||||
|
||||
// Build response
|
||||
builder := strings.builder_make()
|
||||
strings.write_string(&builder, `{"Responses":[`)
|
||||
|
||||
for maybe_item, i in result.items {
|
||||
if i > 0 {
|
||||
strings.write_string(&builder, ",")
|
||||
}
|
||||
|
||||
if item, has_item := maybe_item.?; has_item {
|
||||
item_json := dynamodb.serialize_item(item)
|
||||
fmt.sbprintf(&builder, `{{"Item":%s}}`, item_json)
|
||||
} else {
|
||||
strings.write_string(&builder, "{}")
|
||||
}
|
||||
}
|
||||
|
||||
strings.write_string(&builder, "]}")
|
||||
|
||||
resp_body := strings.to_string(builder)
|
||||
response_set_body(response, transmute([]byte)resp_body)
|
||||
}
|
||||
|
||||
// Parse a single TransactGetItems Get action
|
||||
@(private = "file")
|
||||
parse_transact_get_action :: proc(obj: json.Object) -> (dynamodb.Transact_Get_Action, bool) {
|
||||
action: dynamodb.Transact_Get_Action
|
||||
|
||||
// TableName
|
||||
tn_val, tn_found := obj["TableName"]
|
||||
if !tn_found {
|
||||
return {}, false
|
||||
}
|
||||
tn_str, tn_ok := tn_val.(json.String)
|
||||
if !tn_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.table_name = string(tn_str)
|
||||
|
||||
// Key
|
||||
key_val, key_found := obj["Key"]
|
||||
if !key_found {
|
||||
return {}, false
|
||||
}
|
||||
key, key_ok := dynamodb.parse_item_from_value(key_val)
|
||||
if !key_ok {
|
||||
return {}, false
|
||||
}
|
||||
action.key = key
|
||||
|
||||
// ProjectionExpression (optional)
|
||||
if pe_val, pe_found := obj["ProjectionExpression"]; pe_found {
|
||||
if pe_str, pe_ok := pe_val.(json.String); pe_ok {
|
||||
// Parse ExpressionAttributeNames for projection
|
||||
attr_names: Maybe(map[string]string) = nil
|
||||
if ean_val, ean_found := obj["ExpressionAttributeNames"]; ean_found {
|
||||
if ean_obj, ean_ok := ean_val.(json.Object); ean_ok {
|
||||
names := make(map[string]string, allocator = context.temp_allocator)
|
||||
for key_str, val in ean_obj {
|
||||
if str, str_ok := val.(json.String); str_ok {
|
||||
names[key_str] = string(str)
|
||||
}
|
||||
}
|
||||
attr_names = names
|
||||
}
|
||||
}
|
||||
|
||||
parts := strings.split(string(pe_str), ",")
|
||||
paths := make([dynamic]string)
|
||||
for part in parts {
|
||||
trimmed := strings.trim_space(part)
|
||||
if len(trimmed) == 0 {
|
||||
continue
|
||||
}
|
||||
resolved, res_ok := dynamodb.resolve_attribute_name(trimmed, attr_names)
|
||||
if !res_ok {
|
||||
delete(paths)
|
||||
dynamodb.item_destroy(&action.key)
|
||||
return {}, false
|
||||
}
|
||||
append(&paths, strings.clone(resolved))
|
||||
}
|
||||
action.projection = paths[:]
|
||||
}
|
||||
}
|
||||
|
||||
return action, true
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// UPDATED_NEW / UPDATED_OLD Filtering Helper
|
||||
//
|
||||
// DynamoDB ReturnValues semantics:
|
||||
// ALL_NEW → all attributes of the item after the update
|
||||
// ALL_OLD → all attributes of the item before the update
|
||||
// UPDATED_NEW → only the attributes that were modified, with new values
|
||||
// UPDATED_OLD → only the attributes that were modified, with old values
|
||||
//
|
||||
// This filters an item to only include the attributes touched by the
|
||||
// UpdateExpression (the "modified paths").
|
||||
// ============================================================================
|
||||
|
||||
filter_updated_attributes :: proc(
|
||||
item: dynamodb.Item,
|
||||
plan: ^dynamodb.Update_Plan,
|
||||
) -> dynamodb.Item {
|
||||
modified_paths := dynamodb.get_update_plan_modified_paths(plan)
|
||||
defer delete(modified_paths)
|
||||
|
||||
return dynamodb.filter_item_to_paths(item, modified_paths)
|
||||
}
|
||||
Reference in New Issue
Block a user