From ffd3eda63cd7bd3887be534e3d9f16d0af8fa5aa Mon Sep 17 00:00:00 2001 From: biondizzle Date: Mon, 16 Feb 2026 00:18:20 -0500 Subject: [PATCH] make batch operations work --- ARCHITECTURE.md | 18 -- README.md | 29 +-- TODO.md | 54 +--- dynamodb/batch.odin | 199 +++++++++++++++ dynamodb/condition.odin | 118 +++++++++ main.odin | 551 +++++++++++++++++++++++++++++++++++++++- 6 files changed, 877 insertions(+), 92 deletions(-) create mode 100644 dynamodb/batch.odin create mode 100644 dynamodb/condition.odin diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index f775b33..8352654 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -170,24 +170,6 @@ Encoded as: 30 // value bytes (stored as string) ``` -## Module Structure - -``` -jormundb/ -├── main.odin # Entry point, HTTP server -├── rocksdb/ # RocksDB C FFI bindings -│ └── rocksdb.odin # db_open, db_put, db_get, etc. -├── dynamodb/ # DynamoDB protocol implementation -│ ├── types.odin # Core types (Attribute_Value, Item, Key, etc.) -│ ├── json.odin # DynamoDB JSON parsing/serialization -│ ├── storage.odin # Storage engine (CRUD, scan, query) -│ └── handler.odin # HTTP request handlers -├── key_codec/ # Binary key encoding -│ └── key_codec.odin # build_data_key, decode_data_key, etc. -└── item_codec/ # Binary TLV item encoding - └── item_codec.odin # encode, decode -``` - ## Request Flow ``` diff --git a/README.md b/README.md index 575f4ce..8c3ea62 100644 --- a/README.md +++ b/README.md @@ -119,23 +119,6 @@ Binary encoding → Disk JSON response → Client ``` -### Module Structure - -``` -jormundb/ -├── rocksdb/ - C FFI bindings to librocksdb -├── rocksdb_shim/ - C++ FFI bindings to librocksdb (so we can use the WAL helper functions) -├── dynamodb/ - Core types and operations -│ ├── types.odin - AttributeValue, Item, Key, etc. -│ ├── json.odin - DynamoDB JSON serialization -│ ├── storage.odin - Storage engine with RocksDB -│ ├── key_codec.odin - Binary key encoding -│ ├── item_codec.odin - Binary TLV item encoding -│ └── handler.odin - HTTP request handlers -├── http.odin - HTTP Server -└── main.odin - HTTP Router and handler (entry point) -``` - ### Storage Format **Keys** (varint-length-prefixed segments): @@ -230,17 +213,17 @@ Scan (full table) | 5000 ops | 234.56 ms | 21320 ops/sec - ✅ DeleteItem - ✅ Query (with KeyConditionExpression) - ✅ Scan (with pagination) +- ✅ ConditionExpression +- ✅ FilterExpression +- ✅ ProjectionExpression +- ✅ BatchWriteItem +- ✅ BatchGetItem ### Coming Soon -- ⏳ UpdateItem (with UpdateExpression) -- ⏳ BatchWriteItem -- ⏳ BatchGetItem +- ⏳ UpdateItem (works but needs UPDATED_NEW/UPDATED_OLD response filtering to work for full Dynamo Parity) - ⏳ Global Secondary Indexes - ⏳ Local Secondary Indexes -- ⏳ ConditionExpression -- ⏳ FilterExpression -- ⏳ ProjectionExpression ## Configuration diff --git a/TODO.md b/TODO.md index 57b2650..e90d728 100644 --- a/TODO.md +++ b/TODO.md @@ -1,22 +1,7 @@ # JormunDB (Odin rewrite) — TODO -This tracks the rewrite from Zig (ZynamoDB) → Odin (JormunDB), and what's left to stabilize + extend. +This tracks what's left to stabilize + extend the project -## Status Snapshot - -### ✅ Ported / Working (core) -- [x] Project layout + Makefile targets (build/run/test/fmt) -- [x] RocksDB bindings / integration -- [x] Core DynamoDB types (AttributeValue / Item / Key / TableDescription, etc.) -- [x] Binary key codec (varint length-prefixed segments) -- [x] Binary item codec (TLV encoding / decoding) -- [x] Storage engine: tables + CRUD + scan/query plumbing -- [x] Table-level RW locks (read ops shared / write ops exclusive) -- [x] HTTP server + request routing via `X-Amz-Target` -- [x] DynamoDB JSON (parse + serialize) -- [x] Expression parsing for Query key conditions (basic support) - ---- ## Now (MVP correctness + polish) Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteItem/Scan/Query" with correct DynamoDB-ish responses. @@ -68,7 +53,8 @@ Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteI ### 5) UpdateItem / conditional logic groundwork - [x] `UpdateItem` handler registered in router (currently returns clear "not yet supported" error) - [x] Implement `UpdateItem` (initially minimal: SET for scalar attrs) -- [ ] Add `ConditionExpression` support for Put/Delete/Update (start with simple comparisons) + - [ ] `UpdateItem` needs UPDATED_NEW/UPDATED_OLD response filtering for perfect parity with Dynamo +- [x] Add `ConditionExpression` support for Put/Delete/Update (start with simple comparisons) - [x] Define internal "update plan" representation (parsed ops → applied mutations) ### 6) Response completeness / options @@ -86,36 +72,6 @@ Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteI --- -## Bug Fixes Applied This Session - -### Pagination (scan + query) -**Bug**: `last_evaluated_key` was set to the key of the *next unread* item (the item at `count == limit`). When the client resumed with that key as `ExclusiveStartKey`, it would seek-then-skip, **dropping one item** from the result set. - -**Fix**: Now tracks the key of the *last successfully returned* item. Only emits `LastEvaluatedKey` when we confirm there are more items beyond the returned set (via `has_more` flag). - -### Sort key condition filtering -**Bug**: `query()` performed a partition-prefix scan but never applied the sort key condition (=, <, BETWEEN, begins_with, etc.) from `KeyConditionExpression`. All items in the partition were returned regardless of sort key predicates. - -**Fix**: `query()` now accepts an optional `Sort_Key_Condition` parameter. The handler extracts it from the parsed `Key_Condition` and passes it through. `evaluate_sort_key_condition()` compares the item's SK attribute against the condition using string comparison (matching DynamoDB's lexicographic semantics for S/N/B keys). - -### Write locking -**Bug**: `put_item` and `delete_item` acquired *shared* (read) locks. Multiple concurrent writes to the same table could interleave without mutual exclusion. - -**Fix**: Both now acquire *exclusive* (write) locks via `sync.rw_mutex_lock`. Read operations (`get_item`, `scan`, `query`) continue to use shared locks. - -### delete_table item cleanup -**Bug**: `delete_table` only deleted the metadata key, leaving all data items orphaned in RocksDB. - -**Fix**: Before deleting metadata, `delete_table` now iterates over all keys with the table's data prefix and deletes them individually. - -### Item key type validation -**New**: `put_item` now validates that the item's key attribute types match the table's `AttributeDefinitions`. E.g., if PK is declared as `S`, putting an item with a numeric PK is rejected with `Invalid_Key`. - -### Error response standardization -**Fix**: Centralized all storage-error-to-HTTP-error mapping in `handle_storage_error`. InternalServerError maps to HTTP 500; all client errors (validation, not-found, etc.) map to HTTP 400. Missing `X-Amz-Target` now returns `SerializationException` to match real DynamoDB behavior. - ---- - ## Later (big features) These align with the "Future Enhancements" list in ARCHITECTURE.md. @@ -125,8 +81,8 @@ These align with the "Future Enhancements" list in ARCHITECTURE.md. - [ ] Index backfill + write-path maintenance ### 9) Batch + transactions -- [ ] BatchWriteItem -- [ ] BatchGetItem +- [x] BatchWriteItem +- [x] BatchGetItem - [ ] Transactions (TransactWriteItems / TransactGetItems) ### 10) Performance / ops diff --git a/dynamodb/batch.odin b/dynamodb/batch.odin new file mode 100644 index 0000000..6fb7eee --- /dev/null +++ b/dynamodb/batch.odin @@ -0,0 +1,199 @@ +// BatchWriteItem and BatchGetItem storage operations +// +// BatchWriteItem: Puts or deletes multiple items across one or more tables. +// - Up to 25 items per batch (DynamoDB limit) +// - Each item is an independent PutRequest or DeleteRequest +// - Partial failures are reported via UnprocessedItems +// +// BatchGetItem: Retrieves multiple items from one or more tables. +// - Up to 100 items per batch (DynamoDB limit) +// - Each table request contains a list of Keys +// - Partial failures reported via UnprocessedKeys +package dynamodb + +// ============================================================================ +// BatchWriteItem Types +// ============================================================================ + +Write_Request_Type :: enum { + Put, + Delete, +} + +Write_Request :: struct { + type: Write_Request_Type, + item: Item, // For Put: the full item. For Delete: the key item. +} + +Batch_Write_Table_Request :: struct { + table_name: string, + requests: []Write_Request, +} + +Batch_Write_Result :: struct { + // UnprocessedItems — requests that failed and should be retried. + // For now we process everything or return an error, so this is + // typically empty. Populated only on partial failures. + unprocessed: [dynamic]Batch_Write_Table_Request, +} + +batch_write_result_destroy :: proc(result: ^Batch_Write_Result) { + for &table_req in result.unprocessed { + for &req in table_req.requests { + item_destroy(&req.item) + } + delete(table_req.requests) + } + delete(result.unprocessed) +} + +// ============================================================================ +// BatchWriteItem — Execute a batch of put/delete operations +// +// DynamoDB semantics: +// - Operations within a batch are NOT atomic (some may succeed, some fail) +// - Each operation is validated independently +// - Failed operations go into UnprocessedItems +// - Limit: 25 operations total across all tables +// ============================================================================ + +batch_write_item :: proc( + engine: ^Storage_Engine, + table_requests: []Batch_Write_Table_Request, +) -> (Batch_Write_Result, Storage_Error) { + result := Batch_Write_Result{ + unprocessed = make([dynamic]Batch_Write_Table_Request), + } + + for table_req in table_requests { + failed_requests := make([dynamic]Write_Request) + + for req in table_req.requests { + var_err: Storage_Error + + switch req.type { + case .Put: + var_err = put_item(engine, table_req.table_name, req.item) + case .Delete: + var_err = delete_item(engine, table_req.table_name, req.item) + } + + if var_err != .None { + // Deep copy the failed request for UnprocessedItems + failed_item := item_deep_copy(req.item) + append(&failed_requests, Write_Request{ + type = req.type, + item = failed_item, + }) + } + } + + if len(failed_requests) > 0 { + append(&result.unprocessed, Batch_Write_Table_Request{ + table_name = table_req.table_name, + requests = failed_requests[:], + }) + } else { + delete(failed_requests) + } + } + + return result, .None +} + +// ============================================================================ +// BatchGetItem Types +// ============================================================================ + +Batch_Get_Table_Request :: struct { + table_name: string, + keys: []Item, +} + +Batch_Get_Table_Result :: struct { + table_name: string, + items: []Item, +} + +Batch_Get_Result :: struct { + responses: [dynamic]Batch_Get_Table_Result, + unprocessed_keys: [dynamic]Batch_Get_Table_Request, +} + +batch_get_result_destroy :: proc(result: ^Batch_Get_Result) { + for &table_result in result.responses { + for &item in table_result.items { + item_destroy(&item) + } + delete(table_result.items) + } + delete(result.responses) + + for &table_req in result.unprocessed_keys { + for &key in table_req.keys { + item_destroy(&key) + } + delete(table_req.keys) + } + delete(result.unprocessed_keys) +} + +// ============================================================================ +// BatchGetItem — Retrieve multiple items from one or more tables +// +// DynamoDB semantics: +// - Each key is fetched independently +// - Missing items are silently omitted (no error) +// - Failed lookups go into UnprocessedKeys +// - Limit: 100 keys total across all tables +// ============================================================================ + +batch_get_item :: proc( + engine: ^Storage_Engine, + table_requests: []Batch_Get_Table_Request, +) -> (Batch_Get_Result, Storage_Error) { + result := Batch_Get_Result{ + responses = make([dynamic]Batch_Get_Table_Result), + unprocessed_keys = make([dynamic]Batch_Get_Table_Request), + } + + for table_req in table_requests { + found_items := make([dynamic]Item) + failed_keys := make([dynamic]Item) + + for key in table_req.keys { + item_result, get_err := get_item(engine, table_req.table_name, key) + + if get_err != .None && get_err != .Item_Not_Found { + // Storage error — add to unprocessed + append(&failed_keys, item_deep_copy(key)) + continue + } + + if item_val, has_item := item_result.?; has_item { + append(&found_items, item_val) + } + // If item not found, silently omit (DynamoDB behavior) + } + + if len(found_items) > 0 { + append(&result.responses, Batch_Get_Table_Result{ + table_name = table_req.table_name, + items = found_items[:], + }) + } else { + delete(found_items) + } + + if len(failed_keys) > 0 { + append(&result.unprocessed_keys, Batch_Get_Table_Request{ + table_name = table_req.table_name, + keys = failed_keys[:], + }) + } else { + delete(failed_keys) + } + } + + return result, .None +} diff --git a/dynamodb/condition.odin b/dynamodb/condition.odin new file mode 100644 index 0000000..82bafeb --- /dev/null +++ b/dynamodb/condition.odin @@ -0,0 +1,118 @@ +// ConditionExpression support for PutItem, DeleteItem, and UpdateItem +// +// ConditionExpression uses the same grammar as FilterExpression but is evaluated +// against the *existing* item (before the mutation). If the condition evaluates +// to false, the operation is rejected with ConditionalCheckFailedException. +// +// When there is no existing item: +// - attribute_not_exists(path) → true (attribute doesn't exist on a non-existent item) +// - attribute_exists(path) → false +// - All comparisons → false (no attribute to compare) +// +// This file provides: +// 1. parse_condition_expression_string — extract ConditionExpression from JSON body +// 2. evaluate_condition — evaluate parsed condition against an item +// 3. Condition_Result — result enum for condition evaluation +package dynamodb + +import "core:encoding/json" + +// ============================================================================ +// Condition Evaluation Result +// ============================================================================ + +Condition_Result :: enum { + Passed, // Condition met (or no condition specified) + Failed, // Condition not met → ConditionalCheckFailedException + Parse_Error, // Malformed ConditionExpression → ValidationException +} + +// ============================================================================ +// Request Parsing +// ============================================================================ + +// Extract the raw ConditionExpression string from the request body. +parse_condition_expression_string :: proc(request_body: []byte) -> (expr: string, ok: bool) { + data, parse_err := json.parse(request_body, allocator = context.temp_allocator) + if parse_err != nil { + return + } + defer json.destroy_value(data) + + root, root_ok := data.(json.Object) + if !root_ok { + return + } + + ce_val, found := root["ConditionExpression"] + if !found { + return + } + + ce_str, str_ok := ce_val.(json.String) + if !str_ok { + return + } + + expr = string(ce_str) + ok = true + return +} + +// ============================================================================ +// Full Condition Evaluation Pipeline +// +// Parses ConditionExpression + ExpressionAttributeNames/Values from the +// request body, then evaluates against the existing item. +// +// Parameters: +// request_body — full JSON request body +// existing_item — the item currently in the database (nil if no item exists) +// attr_names — pre-parsed ExpressionAttributeNames (caller may already have these) +// attr_values — pre-parsed ExpressionAttributeValues +// +// Returns Condition_Result: +// .Passed — no ConditionExpression, or condition evaluated to true +// .Failed — condition evaluated to false +// .Parse_Error — ConditionExpression is malformed +// ============================================================================ + +evaluate_condition_expression :: proc( + request_body: []byte, + existing_item: Maybe(Item), + attr_names: Maybe(map[string]string), + attr_values: map[string]Attribute_Value, +) -> Condition_Result { + // Extract ConditionExpression string + condition_str, has_condition := parse_condition_expression_string(request_body) + if !has_condition { + return .Passed // No condition → always pass + } + + // Parse the condition into a filter tree (same grammar as FilterExpression) + filter_node, parse_ok := parse_filter_expression(condition_str, attr_names, attr_values) + if !parse_ok || filter_node == nil { + return .Parse_Error + } + defer { + filter_node_destroy(filter_node) + free(filter_node) + } + + // If there is no existing item, build an empty item for evaluation. + // This means attribute_not_exists → true, attribute_exists → false, + // all comparisons → false (attribute not found). + eval_item: Item + if item, has_item := existing_item.?; has_item { + eval_item = item + } else { + // Empty item — no attributes exist + eval_item = Item{} + } + + if evaluate_filter(eval_item, filter_node) { + return .Passed + } + + return .Failed +} diff --git a/main.odin b/main.odin index f059fa1..767b87a 100644 --- a/main.odin +++ b/main.odin @@ -102,6 +102,10 @@ handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_all handle_query(engine, request, &response) case .Scan: handle_scan(engine, request, &response) + case .BatchWriteItem: + handle_batch_write_item(engine, request, &response) + case .BatchGetItem: + handle_batch_get_item(engine, request, &response) case .Unknown: return make_error_response(&response, .ValidationException, "Unknown operation") case: @@ -300,6 +304,95 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request } defer dynamodb.item_destroy(&item) + // ---- ConditionExpression evaluation ---- + _, has_condition := dynamodb.parse_condition_expression_string(request.body) + if has_condition { + // Parse shared expression attributes + attr_names := dynamodb.parse_expression_attribute_names(request.body) + defer { + if names, has_names := attr_names.?; has_names { + for k, v in names { + delete(k) + delete(v) + } + names_copy := names + delete(names_copy) + } + } + + attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body) + if !vals_ok { + make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues") + return + } + defer { + for k, v in attr_values { + delete(k) + v_copy := v + dynamodb.attr_value_destroy(&v_copy) + } + delete(attr_values) + } + + // Fetch existing item to evaluate condition against + key_item, key_ok := dynamodb.parse_key_from_request(request.body) + existing_item: Maybe(dynamodb.Item) + + if !key_ok { + // If no explicit Key field, extract key from Item + // (PutItem doesn't have a Key field — the key is in the Item itself) + existing_maybe, get_err := dynamodb.get_item(engine, table_name, item) + if get_err != .None && get_err != .Table_Not_Found { + // Table not found is handled by put_item below + if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { + handle_storage_error(response, get_err) + return + } + } + existing_item = existing_maybe + } else { + defer dynamodb.item_destroy(&key_item) + existing_maybe, get_err := dynamodb.get_item(engine, table_name, key_item) + if get_err != .None && get_err != .Table_Not_Found { + if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { + handle_storage_error(response, get_err) + return + } + } + existing_item = existing_maybe + } + + defer { + if ex, has_ex := existing_item.?; has_ex { + ex_copy := ex + dynamodb.item_destroy(&ex_copy) + } + } + + // Evaluate condition + cond_result := dynamodb.evaluate_condition_expression( + request.body, existing_item, attr_names, attr_values, + ) + + switch cond_result { + case .Failed: + make_error_response( + response, .ConditionalCheckFailedException, + "The conditional request failed", + ) + return + case .Parse_Error: + make_error_response( + response, .ValidationException, + "Invalid ConditionExpression", + ) + return + case .Passed: + // Continue with put + } + } + + // ---- Execute PutItem ---- err := dynamodb.put_item(engine, table_name, item) if err != .None { handle_storage_error(response, err) @@ -353,6 +446,70 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ } defer dynamodb.item_destroy(&key) + // ---- ConditionExpression evaluation ---- + _, has_condition := dynamodb.parse_condition_expression_string(request.body) + if has_condition { + attr_names := dynamodb.parse_expression_attribute_names(request.body) + defer { + if names, has_names := attr_names.?; has_names { + for k, v in names { + delete(k) + delete(v) + } + names_copy := names + delete(names_copy) + } + } + + attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body) + if !vals_ok { + make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues") + return + } + defer { + for k, v in attr_values { + delete(k) + v_copy := v + dynamodb.attr_value_destroy(&v_copy) + } + delete(attr_values) + } + + // Fetch existing item + existing_item, get_err := dynamodb.get_item(engine, table_name, key) + if get_err != .None && get_err != .Table_Not_Found { + if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { + handle_storage_error(response, get_err) + return + } + } + defer { + if ex, has_ex := existing_item.?; has_ex { + ex_copy := ex + dynamodb.item_destroy(&ex_copy) + } + } + + cond_result := dynamodb.evaluate_condition_expression( + request.body, existing_item, attr_names, attr_values, + ) + + switch cond_result { + case .Failed: + make_error_response( + response, .ConditionalCheckFailedException, + "The conditional request failed", + ) + return + case .Parse_Error: + make_error_response(response, .ValidationException, "Invalid ConditionExpression") + return + case .Passed: + // Continue with delete + } + } + + // ---- Execute DeleteItem ---- err := dynamodb.delete_item(engine, table_name, key) if err != .None { handle_storage_error(response, err) @@ -413,6 +570,43 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ delete(attr_values) } + // ---- ConditionExpression evaluation ---- + _, has_condition := dynamodb.parse_condition_expression_string(request.body) + if has_condition { + // Fetch existing item to evaluate condition against + existing_item, get_err := dynamodb.get_item(engine, table_name, key_item) + if get_err != .None && get_err != .Table_Not_Found { + if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { + handle_storage_error(response, get_err) + return + } + } + defer { + if ex, has_ex := existing_item.?; has_ex { + ex_copy := ex + dynamodb.item_destroy(&ex_copy) + } + } + + cond_result := dynamodb.evaluate_condition_expression( + request.body, existing_item, attr_names, attr_values, + ) + + switch cond_result { + case .Failed: + make_error_response( + response, .ConditionalCheckFailedException, + "The conditional request failed", + ) + return + case .Parse_Error: + make_error_response(response, .ValidationException, "Invalid ConditionExpression") + return + case .Passed: + // Continue with update + } + } + // Parse update plan plan, plan_ok := dynamodb.parse_update_expression(update_expr, attr_names, attr_values) if !plan_ok { @@ -462,8 +656,6 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ } case "UPDATED_NEW": - // Return only the attributes that were updated (in the new item) - // For simplicity, return the full new item (DynamoDB returns affected attributes) if new_val, has := new_item.?; has { item_json := dynamodb.serialize_item(new_val) resp := fmt.aprintf(`{"Attributes":%s}`, item_json) @@ -487,6 +679,361 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ } } +handle_batch_write_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + data, parse_err := json.parse(request.body, allocator = context.allocator) + if parse_err != nil { + make_error_response(response, .SerializationException, "Invalid JSON") + return + } + defer json.destroy_value(data) + + root, root_ok := data.(json.Object) + if !root_ok { + make_error_response(response, .SerializationException, "Request must be an object") + return + } + + request_items_val, found := root["RequestItems"] + if !found { + make_error_response(response, .ValidationException, "Missing RequestItems") + return + } + + request_items, ri_ok := request_items_val.(json.Object) + if !ri_ok { + make_error_response(response, .ValidationException, "RequestItems must be an object") + return + } + + // Count total operations for limit enforcement + total_ops := 0 + table_requests := make([dynamic]dynamodb.Batch_Write_Table_Request) + defer { + for &tr in table_requests { + for &req in tr.requests { + dynamodb.item_destroy(&req.item) + } + delete(tr.requests) + } + delete(table_requests) + } + + for table_name, table_val in request_items { + table_array, arr_ok := table_val.(json.Array) + if !arr_ok { + make_error_response(response, .ValidationException, + fmt.tprintf("RequestItems for table '%s' must be an array", table_name)) + return + } + + requests := make([dynamic]dynamodb.Write_Request) + + for elem in table_array { + elem_obj, elem_ok := elem.(json.Object) + if !elem_ok { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "Each write request must be an object") + return + } + + // Check for PutRequest + if put_val, has_put := elem_obj["PutRequest"]; has_put { + put_obj, put_ok := put_val.(json.Object) + if !put_ok { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "PutRequest must be an object") + return + } + + item_val, item_found := put_obj["Item"] + if !item_found { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "PutRequest missing Item") + return + } + + item, item_ok := dynamodb.parse_item_from_value(item_val) + if !item_ok { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "Invalid Item in PutRequest") + return + } + + append(&requests, dynamodb.Write_Request{type = .Put, item = item}) + total_ops += 1 + continue + } + + // Check for DeleteRequest + if del_val, has_del := elem_obj["DeleteRequest"]; has_del { + del_obj, del_ok := del_val.(json.Object) + if !del_ok { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "DeleteRequest must be an object") + return + } + + key_val, key_found := del_obj["Key"] + if !key_found { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "DeleteRequest missing Key") + return + } + + key_item, key_ok := dynamodb.parse_item_from_value(key_val) + if !key_ok { + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, "Invalid Key in DeleteRequest") + return + } + + append(&requests, dynamodb.Write_Request{type = .Delete, item = key_item}) + total_ops += 1 + continue + } + + // Neither PutRequest nor DeleteRequest + for &r in requests { + dynamodb.item_destroy(&r.item) + } + delete(requests) + make_error_response(response, .ValidationException, + "Each write request must contain PutRequest or DeleteRequest") + return + } + + append(&table_requests, dynamodb.Batch_Write_Table_Request{ + table_name = string(table_name), + requests = requests[:], + }) + } + + // Enforce 25-operation limit + if total_ops > 25 { + make_error_response(response, .ValidationException, + "Too many items requested for the BatchWriteItem call (max 25)") + return + } + + if total_ops == 0 { + make_error_response(response, .ValidationException, + "RequestItems must contain at least one table with at least one request") + return + } + + // Execute batch + result, err := dynamodb.batch_write_item(engine, table_requests[:]) + if err != .None { + handle_storage_error(response, err) + return + } + defer dynamodb.batch_write_result_destroy(&result) + + // Build response + builder := strings.builder_make() + strings.write_string(&builder, `{"UnprocessedItems":{`) + + unprocessed_count := 0 + for table_req, ti in result.unprocessed { + if ti > 0 { + strings.write_string(&builder, ",") + } + fmt.sbprintf(&builder, `"%s":[`, table_req.table_name) + + for req, ri in table_req.requests { + if ri > 0 { + strings.write_string(&builder, ",") + } + + item_json := dynamodb.serialize_item(req.item) + switch req.type { + case .Put: + fmt.sbprintf(&builder, `{"PutRequest":{"Item":%s}}`, item_json) + case .Delete: + fmt.sbprintf(&builder, `{"DeleteRequest":{"Key":%s}}`, item_json) + } + } + + strings.write_string(&builder, "]") + unprocessed_count += len(table_req.requests) + } + + strings.write_string(&builder, "}}") + + resp_body := strings.to_string(builder) + response_set_body(response, transmute([]byte)resp_body) +} + +handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + data, parse_err := json.parse(request.body, allocator = context.allocator) + if parse_err != nil { + make_error_response(response, .SerializationException, "Invalid JSON") + return + } + defer json.destroy_value(data) + + root, root_ok := data.(json.Object) + if !root_ok { + make_error_response(response, .SerializationException, "Request must be an object") + return + } + + request_items_val, found := root["RequestItems"] + if !found { + make_error_response(response, .ValidationException, "Missing RequestItems") + return + } + + request_items, ri_ok := request_items_val.(json.Object) + if !ri_ok { + make_error_response(response, .ValidationException, "RequestItems must be an object") + return + } + + total_keys := 0 + table_requests := make([dynamic]dynamodb.Batch_Get_Table_Request) + defer { + for &tr in table_requests { + for &key in tr.keys { + dynamodb.item_destroy(&key) + } + delete(tr.keys) + } + delete(table_requests) + } + + for table_name, table_val in request_items { + table_obj, obj_ok := table_val.(json.Object) + if !obj_ok { + make_error_response(response, .ValidationException, + fmt.tprintf("RequestItems for table '%s' must be an object", table_name)) + return + } + + keys_val, keys_found := table_obj["Keys"] + if !keys_found { + make_error_response(response, .ValidationException, + fmt.tprintf("Missing Keys for table '%s'", table_name)) + return + } + + keys_array, keys_ok := keys_val.(json.Array) + if !keys_ok { + make_error_response(response, .ValidationException, + fmt.tprintf("Keys for table '%s' must be an array", table_name)) + return + } + + keys := make([dynamic]dynamodb.Item) + + for key_val in keys_array { + key_item, key_ok := dynamodb.parse_item_from_value(key_val) + if !key_ok { + for &k in keys { + dynamodb.item_destroy(&k) + } + delete(keys) + make_error_response(response, .ValidationException, "Invalid key in BatchGetItem") + return + } + append(&keys, key_item) + total_keys += 1 + } + + append(&table_requests, dynamodb.Batch_Get_Table_Request{ + table_name = string(table_name), + keys = keys[:], + }) + } + + // Enforce 100-key limit + if total_keys > 100 { + make_error_response(response, .ValidationException, + "Too many items requested for the BatchGetItem call (max 100)") + return + } + + if total_keys == 0 { + make_error_response(response, .ValidationException, + "RequestItems must contain at least one table with at least one key") + return + } + + // Execute batch get + result, err := dynamodb.batch_get_item(engine, table_requests[:]) + if err != .None { + handle_storage_error(response, err) + return + } + defer dynamodb.batch_get_result_destroy(&result) + + // Build response + builder := strings.builder_make() + strings.write_string(&builder, `{"Responses":{`) + + for table_result, ti in result.responses { + if ti > 0 { + strings.write_string(&builder, ",") + } + fmt.sbprintf(&builder, `"%s":[`, table_result.table_name) + + for item, ii in table_result.items { + if ii > 0 { + strings.write_string(&builder, ",") + } + item_json := dynamodb.serialize_item(item) + strings.write_string(&builder, item_json) + } + + strings.write_string(&builder, "]") + } + + strings.write_string(&builder, `},"UnprocessedKeys":{`) + + for table_req, ti in result.unprocessed_keys { + if ti > 0 { + strings.write_string(&builder, ",") + } + fmt.sbprintf(&builder, `"%s":{"Keys":[`, table_req.table_name) + + for key, ki in table_req.keys { + if ki > 0 { + strings.write_string(&builder, ",") + } + key_json := dynamodb.serialize_item(key) + strings.write_string(&builder, key_json) + } + + strings.write_string(&builder, "]}") + } + + strings.write_string(&builder, "}}") + + resp_body := strings.to_string(builder) + response_set_body(response, transmute([]byte)resp_body) +} + // ============================================================================ // Query and Scan Operations // ============================================================================