From 5ee3df86f100217fa7937ef0836861194e11982d Mon Sep 17 00:00:00 2001 From: biondizzle Date: Sat, 21 Feb 2026 20:50:14 -0500 Subject: [PATCH] more house keeping --- dynamodb/batch.odin | 11 ++++++-- dynamodb/json.odin | 45 +++++++++++++++--------------- dynamodb/storage.odin | 31 ++++++++++++++++----- dynamodb/transact.odin | 26 ++++++++++++++++- main.odin | 63 ++++++++++++++++++++++++++++++++---------- 5 files changed, 129 insertions(+), 47 deletions(-) diff --git a/dynamodb/batch.odin b/dynamodb/batch.odin index b7072f0..71dd602 100644 --- a/dynamodb/batch.odin +++ b/dynamodb/batch.odin @@ -98,8 +98,15 @@ batch_write_item :: proc( delete(failed_requests) return result, var_err - case .RocksDB_Error, .Item_Not_Found, .Table_Not_Found: - // Transient/throttling errors — add to unprocessed + case .Table_Not_Found: + // Non-existent table is a hard request failure, not a retryable condition. + // DynamoDB returns ResourceNotFoundException for the whole request. + batch_write_result_destroy(&result) + delete(failed_requests) + return result, .Table_Not_Found + + case .RocksDB_Error, .Item_Not_Found: + // Genuinely transient/infrastructure errors — add to UnprocessedItems. failed_item := item_deep_copy(req.item) append(&failed_requests, Write_Request{ type = req.type, diff --git a/dynamodb/json.odin b/dynamodb/json.odin index a9e2ff2..2f056fe 100644 --- a/dynamodb/json.odin +++ b/dynamodb/json.odin @@ -529,45 +529,49 @@ parse_limit :: proc(request_body: []byte) -> int { // Returns nil (not an error) when the field is absent. // ============================================================================ +// Returns (key, ok, body_parse_err). +// ok=true, body_parse_err=false → key present and valid, or key absent (no pagination) +// ok=false, body_parse_err=true → request body is not valid JSON or not an object +// ok=false, body_parse_err=false → ExclusiveStartKey present but malformed/invalid parse_exclusive_start_key :: proc( request_body: []byte, table_name: string, key_schema: []Key_Schema_Element, -) -> (result: Maybe([]byte), ok: bool) { +) -> (result: Maybe([]byte), ok: bool, body_err: bool) { data, parse_err := json.parse(request_body, allocator = context.temp_allocator) if parse_err != nil { - return nil, true // no ESK is fine + return nil, false, true // body is not valid JSON — real error } defer json.destroy_value(data) root, root_ok := data.(json.Object) if !root_ok { - return nil, true + return nil, false, true // root must be an object — real error } esk_val, found := root["ExclusiveStartKey"] if !found { - return nil, true // absent → no pagination, that's ok + return nil, true, false // absent → no pagination, that's ok } // Parse ExclusiveStartKey as a DynamoDB Item key_item, item_ok := parse_item_from_value(esk_val) if !item_ok { - return nil, false // present but malformed → real error + return nil, false, false // present but malformed → validation error } defer item_destroy(&key_item) // Validate and extract key struct using schema key_struct, key_ok := key_from_item(key_item, key_schema) if !key_ok { - return nil, false // missing required key attributes + return nil, false, false // missing required key attributes } defer key_destroy(&key_struct) // Get raw byte values key_values, kv_ok := key_get_values(&key_struct) if !kv_ok { - return nil, false + return nil, false, false } // Build binary storage key @@ -578,44 +582,41 @@ parse_exclusive_start_key :: proc( } // parse_exclusive_start_key_gsi ... Just a helper for GSI keys +// Returns (key, ok, body_parse_err) — same contract as parse_exclusive_start_key. parse_exclusive_start_key_gsi :: proc( request_body: []byte, table_name: string, metadata: ^Table_Metadata, gsi: ^Global_Secondary_Index, -) -> (Maybe([]byte), bool) { - root, ok := json.parse(request_body) - if ok != nil do return nil, false +) -> (Maybe([]byte), bool, bool) { + root, parse_err := json.parse(request_body) + if parse_err != nil do return nil, false, true // body not valid JSON defer json.destroy_value(root) - // Assert the root Value as an Object before indexing obj, obj_ok := root.(json.Object) - if !obj_ok do return nil, false + if !obj_ok do return nil, false, true // root must be an object esk_val, has := obj["ExclusiveStartKey"] - if !has do return nil, true + if !has do return nil, true, false // absent → no pagination key_item, key_ok := parse_item_from_value(esk_val) - if !key_ok do return nil, false + if !key_ok do return nil, false, false defer item_destroy(&key_item) - // index key idx_key, idx_ok := key_from_item(key_item, gsi.key_schema) - if !idx_ok do return nil, false + if !idx_ok do return nil, false, false defer key_destroy(&idx_key) idx_vals, idx_vals_ok := key_get_values(&idx_key) - if !idx_vals_ok do return nil, false + if !idx_vals_ok do return nil, false, false - // base key base_key, base_ok := key_from_item(key_item, metadata.key_schema) - if !base_ok do return nil, false + if !base_ok do return nil, false, false defer key_destroy(&base_key) base_vals, base_vals_ok := key_get_values(&base_key) - if !base_vals_ok do return nil, false + if !base_vals_ok do return nil, false, false - // build the actual RocksDB GSI key k := build_gsi_key( table_name, gsi.index_name, @@ -624,7 +625,7 @@ parse_exclusive_start_key_gsi :: proc( base_vals.pk, base_vals.sk, ) - return k, true + return k, true, false } // ============================================================================ diff --git a/dynamodb/storage.odin b/dynamodb/storage.odin index 63a768d..a3c8797 100644 --- a/dynamodb/storage.odin +++ b/dynamodb/storage.odin @@ -793,12 +793,20 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto // --- Check if item already exists (need old item for GSI cleanup) --- old_item: Maybe(Item) = nil existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key) - if existing_err == .None && existing_value != nil { + if existing_err == .NotFound { + // Item does not exist — nothing to clean up, proceed normally. + } else if existing_err != .None { + // Unexpected RocksDB I/O error — fail closed to avoid orphaned GSI entries. + return .RocksDB_Error + } else if existing_value != nil { defer delete(existing_value) decoded_old, decode_ok := decode(existing_value) - if decode_ok { - old_item = decoded_old + if !decode_ok { + // Value exists but is unreadable — fail closed rather than leaving + // stale GSI entries behind after the overwrite. + return .Serialization_Error } + old_item = decoded_old } // Cleanup old_item at the end defer { @@ -945,12 +953,21 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S // --- Read existing item to know which GSI entries to remove --- old_item: Maybe(Item) = nil existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key) - if existing_err == .None && existing_value != nil { + if existing_err == .NotFound { + // Item does not exist — nothing to delete (DynamoDB idempotent delete). + return .None + } else if existing_err != .None { + // Unexpected RocksDB I/O error — fail closed. + return .RocksDB_Error + } else if existing_value != nil { defer delete(existing_value) decoded_old, decode_ok := decode(existing_value) - if decode_ok { - old_item = decoded_old + if !decode_ok { + // Value exists but is corrupt — fail closed rather than deleting the + // base item while leaving its GSI entries dangling. + return .Serialization_Error } + old_item = decoded_old } // Cleanup old_item at the end defer { @@ -960,7 +977,7 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S } } - // If item doesn't exist, nothing to delete (not an error in DynamoDB) + // If item doesn't exist (existing_value was nil with no error), nothing to delete. if _, has_old := old_item.?; !has_old { return .None } diff --git a/dynamodb/transact.odin b/dynamodb/transact.odin index 8a907d9..89055e1 100644 --- a/dynamodb/transact.odin +++ b/dynamodb/transact.odin @@ -333,7 +333,31 @@ transact_write_items :: proc( } } - existing, _ := get_item_internal(engine, action.table_name, key_item, metadata) + existing, read_err := get_item_internal(engine, action.table_name, key_item, metadata) + #partial switch read_err { + case .None: + // Item found or not found — both fine. + case .RocksDB_Error, .Serialization_Error, .Internal_Error: + // Cannot safely determine old index keys — cancel the entire transaction. + reasons[idx] = Cancellation_Reason{ + code = "InternalError", + message = "Failed to read existing item for index maintenance", + } + result.cancellation_reasons = reasons + return result, .Internal_Error + case .Missing_Key_Attribute, .Invalid_Key: + // The key we built from the action's own item/key should always be valid + // by this point (validated earlier), but treat defensively. + reasons[idx] = Cancellation_Reason{ + code = "ValidationError", + message = "Invalid key when reading existing item", + } + result.cancellation_reasons = reasons + return result, .Internal_Error + case .Table_Not_Found, .Item_Not_Found, .Validation_Error: + // These should not be returned by get_item_internal, but handle + // defensively — treat as "item does not exist" and continue. + } old_items[idx] = existing } diff --git a/main.odin b/main.odin index e11272c..3d4fbe5 100644 --- a/main.odin +++ b/main.odin @@ -421,12 +421,21 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request // If no explicit Key field, extract key from Item // (PutItem doesn't have a Key field — the key is in the Item itself) existing_maybe, get_err := dynamodb.get_item(engine, table_name, item) - if get_err != .None && get_err != .Table_Not_Found { - // Table not found is handled by put_item below - if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { + #partial switch get_err { + case .None: + // Item found or not found — both are fine, condition evaluates against + // whatever was returned (nil item = item doesn't exist). + case .Table_Not_Found: + // Table will be caught and reported properly by put_item below. + case .Missing_Key_Attribute, .Invalid_Key: handle_storage_error(response, get_err) return - } + case .RocksDB_Error, .Serialization_Error, .Internal_Error: + make_error_response(response, .InternalServerError, "Failed to fetch existing item") + return + case .Validation_Error, .Item_Not_Found: + // Item_Not_Found shouldn't reach here (get_item returns nil, .None), + // but treat defensively. } existing_item = existing_maybe } else { @@ -566,11 +575,19 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ // Fetch existing item existing_item, get_err := dynamodb.get_item(engine, table_name, key) - if get_err != .None && get_err != .Table_Not_Found { - if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { + #partial switch get_err { + case .None: + // Item found or not found — condition evaluates against whatever was returned. + case .Table_Not_Found: + // Table will be caught and reported properly by delete_item below. + case .Missing_Key_Attribute, .Invalid_Key: handle_storage_error(response, get_err) return - } + case .RocksDB_Error, .Serialization_Error, .Internal_Error: + make_error_response(response, .InternalServerError, "Failed to fetch existing item") + return + case .Validation_Error, .Item_Not_Found: + // Defensive — shouldn't reach here normally. } defer { if ex, has_ex := existing_item.?; has_ex { @@ -1202,11 +1219,15 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r } // Parse ExclusiveStartKey - exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key( + exclusive_start_key, esk_ok, esk_body_err := dynamodb.parse_exclusive_start_key( request.body, table_name, metadata.key_schema, ) if !esk_ok { - make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + if esk_body_err { + make_error_response(response, .SerializationException, "Request body is not valid JSON") + } else { + make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + } return } defer { @@ -1257,11 +1278,15 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r return } - esk_gsi, esk_gsi_ok := dynamodb.parse_exclusive_start_key_gsi( + esk_gsi, esk_gsi_ok, esk_gsi_body_err := dynamodb.parse_exclusive_start_key_gsi( request.body, table_name, &metadata, gsi, ) if !esk_gsi_ok { - make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + if esk_gsi_body_err { + make_error_response(response, .SerializationException, "Request body is not valid JSON") + } else { + make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + } return } defer { @@ -1402,11 +1427,15 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re limit = 100 } - exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key( + exclusive_start_key, esk_ok, esk_body_err := dynamodb.parse_exclusive_start_key( request.body, table_name, metadata.key_schema, ) if !esk_ok { - make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + if esk_body_err { + make_error_response(response, .SerializationException, "Request body is not valid JSON") + } else { + make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + } return } defer { @@ -1451,11 +1480,15 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re return } - esk_gsi, esk_gsi_ok := dynamodb.parse_exclusive_start_key_gsi( + esk_gsi, esk_gsi_ok, esk_gsi_body_err := dynamodb.parse_exclusive_start_key_gsi( request.body, table_name, &metadata, gsi, ) if !esk_gsi_ok { - make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + if esk_gsi_body_err { + make_error_response(response, .SerializationException, "Request body is not valid JSON") + } else { + make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") + } return } defer {