more house keeping
This commit is contained in:
@@ -98,8 +98,15 @@ batch_write_item :: proc(
|
|||||||
delete(failed_requests)
|
delete(failed_requests)
|
||||||
return result, var_err
|
return result, var_err
|
||||||
|
|
||||||
case .RocksDB_Error, .Item_Not_Found, .Table_Not_Found:
|
case .Table_Not_Found:
|
||||||
// Transient/throttling errors — add to unprocessed
|
// Non-existent table is a hard request failure, not a retryable condition.
|
||||||
|
// DynamoDB returns ResourceNotFoundException for the whole request.
|
||||||
|
batch_write_result_destroy(&result)
|
||||||
|
delete(failed_requests)
|
||||||
|
return result, .Table_Not_Found
|
||||||
|
|
||||||
|
case .RocksDB_Error, .Item_Not_Found:
|
||||||
|
// Genuinely transient/infrastructure errors — add to UnprocessedItems.
|
||||||
failed_item := item_deep_copy(req.item)
|
failed_item := item_deep_copy(req.item)
|
||||||
append(&failed_requests, Write_Request{
|
append(&failed_requests, Write_Request{
|
||||||
type = req.type,
|
type = req.type,
|
||||||
|
|||||||
@@ -529,45 +529,49 @@ parse_limit :: proc(request_body: []byte) -> int {
|
|||||||
// Returns nil (not an error) when the field is absent.
|
// Returns nil (not an error) when the field is absent.
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
|
// Returns (key, ok, body_parse_err).
|
||||||
|
// ok=true, body_parse_err=false → key present and valid, or key absent (no pagination)
|
||||||
|
// ok=false, body_parse_err=true → request body is not valid JSON or not an object
|
||||||
|
// ok=false, body_parse_err=false → ExclusiveStartKey present but malformed/invalid
|
||||||
parse_exclusive_start_key :: proc(
|
parse_exclusive_start_key :: proc(
|
||||||
request_body: []byte,
|
request_body: []byte,
|
||||||
table_name: string,
|
table_name: string,
|
||||||
key_schema: []Key_Schema_Element,
|
key_schema: []Key_Schema_Element,
|
||||||
) -> (result: Maybe([]byte), ok: bool) {
|
) -> (result: Maybe([]byte), ok: bool, body_err: bool) {
|
||||||
data, parse_err := json.parse(request_body, allocator = context.temp_allocator)
|
data, parse_err := json.parse(request_body, allocator = context.temp_allocator)
|
||||||
if parse_err != nil {
|
if parse_err != nil {
|
||||||
return nil, true // no ESK is fine
|
return nil, false, true // body is not valid JSON — real error
|
||||||
}
|
}
|
||||||
defer json.destroy_value(data)
|
defer json.destroy_value(data)
|
||||||
|
|
||||||
root, root_ok := data.(json.Object)
|
root, root_ok := data.(json.Object)
|
||||||
if !root_ok {
|
if !root_ok {
|
||||||
return nil, true
|
return nil, false, true // root must be an object — real error
|
||||||
}
|
}
|
||||||
|
|
||||||
esk_val, found := root["ExclusiveStartKey"]
|
esk_val, found := root["ExclusiveStartKey"]
|
||||||
if !found {
|
if !found {
|
||||||
return nil, true // absent → no pagination, that's ok
|
return nil, true, false // absent → no pagination, that's ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse ExclusiveStartKey as a DynamoDB Item
|
// Parse ExclusiveStartKey as a DynamoDB Item
|
||||||
key_item, item_ok := parse_item_from_value(esk_val)
|
key_item, item_ok := parse_item_from_value(esk_val)
|
||||||
if !item_ok {
|
if !item_ok {
|
||||||
return nil, false // present but malformed → real error
|
return nil, false, false // present but malformed → validation error
|
||||||
}
|
}
|
||||||
defer item_destroy(&key_item)
|
defer item_destroy(&key_item)
|
||||||
|
|
||||||
// Validate and extract key struct using schema
|
// Validate and extract key struct using schema
|
||||||
key_struct, key_ok := key_from_item(key_item, key_schema)
|
key_struct, key_ok := key_from_item(key_item, key_schema)
|
||||||
if !key_ok {
|
if !key_ok {
|
||||||
return nil, false // missing required key attributes
|
return nil, false, false // missing required key attributes
|
||||||
}
|
}
|
||||||
defer key_destroy(&key_struct)
|
defer key_destroy(&key_struct)
|
||||||
|
|
||||||
// Get raw byte values
|
// Get raw byte values
|
||||||
key_values, kv_ok := key_get_values(&key_struct)
|
key_values, kv_ok := key_get_values(&key_struct)
|
||||||
if !kv_ok {
|
if !kv_ok {
|
||||||
return nil, false
|
return nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build binary storage key
|
// Build binary storage key
|
||||||
@@ -578,44 +582,41 @@ parse_exclusive_start_key :: proc(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// parse_exclusive_start_key_gsi ... Just a helper for GSI keys
|
// parse_exclusive_start_key_gsi ... Just a helper for GSI keys
|
||||||
|
// Returns (key, ok, body_parse_err) — same contract as parse_exclusive_start_key.
|
||||||
parse_exclusive_start_key_gsi :: proc(
|
parse_exclusive_start_key_gsi :: proc(
|
||||||
request_body: []byte,
|
request_body: []byte,
|
||||||
table_name: string,
|
table_name: string,
|
||||||
metadata: ^Table_Metadata,
|
metadata: ^Table_Metadata,
|
||||||
gsi: ^Global_Secondary_Index,
|
gsi: ^Global_Secondary_Index,
|
||||||
) -> (Maybe([]byte), bool) {
|
) -> (Maybe([]byte), bool, bool) {
|
||||||
root, ok := json.parse(request_body)
|
root, parse_err := json.parse(request_body)
|
||||||
if ok != nil do return nil, false
|
if parse_err != nil do return nil, false, true // body not valid JSON
|
||||||
defer json.destroy_value(root)
|
defer json.destroy_value(root)
|
||||||
|
|
||||||
// Assert the root Value as an Object before indexing
|
|
||||||
obj, obj_ok := root.(json.Object)
|
obj, obj_ok := root.(json.Object)
|
||||||
if !obj_ok do return nil, false
|
if !obj_ok do return nil, false, true // root must be an object
|
||||||
|
|
||||||
esk_val, has := obj["ExclusiveStartKey"]
|
esk_val, has := obj["ExclusiveStartKey"]
|
||||||
if !has do return nil, true
|
if !has do return nil, true, false // absent → no pagination
|
||||||
|
|
||||||
key_item, key_ok := parse_item_from_value(esk_val)
|
key_item, key_ok := parse_item_from_value(esk_val)
|
||||||
if !key_ok do return nil, false
|
if !key_ok do return nil, false, false
|
||||||
defer item_destroy(&key_item)
|
defer item_destroy(&key_item)
|
||||||
|
|
||||||
// index key
|
|
||||||
idx_key, idx_ok := key_from_item(key_item, gsi.key_schema)
|
idx_key, idx_ok := key_from_item(key_item, gsi.key_schema)
|
||||||
if !idx_ok do return nil, false
|
if !idx_ok do return nil, false, false
|
||||||
defer key_destroy(&idx_key)
|
defer key_destroy(&idx_key)
|
||||||
|
|
||||||
idx_vals, idx_vals_ok := key_get_values(&idx_key)
|
idx_vals, idx_vals_ok := key_get_values(&idx_key)
|
||||||
if !idx_vals_ok do return nil, false
|
if !idx_vals_ok do return nil, false, false
|
||||||
|
|
||||||
// base key
|
|
||||||
base_key, base_ok := key_from_item(key_item, metadata.key_schema)
|
base_key, base_ok := key_from_item(key_item, metadata.key_schema)
|
||||||
if !base_ok do return nil, false
|
if !base_ok do return nil, false, false
|
||||||
defer key_destroy(&base_key)
|
defer key_destroy(&base_key)
|
||||||
|
|
||||||
base_vals, base_vals_ok := key_get_values(&base_key)
|
base_vals, base_vals_ok := key_get_values(&base_key)
|
||||||
if !base_vals_ok do return nil, false
|
if !base_vals_ok do return nil, false, false
|
||||||
|
|
||||||
// build the actual RocksDB GSI key
|
|
||||||
k := build_gsi_key(
|
k := build_gsi_key(
|
||||||
table_name,
|
table_name,
|
||||||
gsi.index_name,
|
gsi.index_name,
|
||||||
@@ -624,7 +625,7 @@ parse_exclusive_start_key_gsi :: proc(
|
|||||||
base_vals.pk,
|
base_vals.pk,
|
||||||
base_vals.sk,
|
base_vals.sk,
|
||||||
)
|
)
|
||||||
return k, true
|
return k, true, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|||||||
@@ -793,12 +793,20 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
|
|||||||
// --- Check if item already exists (need old item for GSI cleanup) ---
|
// --- Check if item already exists (need old item for GSI cleanup) ---
|
||||||
old_item: Maybe(Item) = nil
|
old_item: Maybe(Item) = nil
|
||||||
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
||||||
if existing_err == .None && existing_value != nil {
|
if existing_err == .NotFound {
|
||||||
|
// Item does not exist — nothing to clean up, proceed normally.
|
||||||
|
} else if existing_err != .None {
|
||||||
|
// Unexpected RocksDB I/O error — fail closed to avoid orphaned GSI entries.
|
||||||
|
return .RocksDB_Error
|
||||||
|
} else if existing_value != nil {
|
||||||
defer delete(existing_value)
|
defer delete(existing_value)
|
||||||
decoded_old, decode_ok := decode(existing_value)
|
decoded_old, decode_ok := decode(existing_value)
|
||||||
if decode_ok {
|
if !decode_ok {
|
||||||
old_item = decoded_old
|
// Value exists but is unreadable — fail closed rather than leaving
|
||||||
|
// stale GSI entries behind after the overwrite.
|
||||||
|
return .Serialization_Error
|
||||||
}
|
}
|
||||||
|
old_item = decoded_old
|
||||||
}
|
}
|
||||||
// Cleanup old_item at the end
|
// Cleanup old_item at the end
|
||||||
defer {
|
defer {
|
||||||
@@ -945,12 +953,21 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
|
|||||||
// --- Read existing item to know which GSI entries to remove ---
|
// --- Read existing item to know which GSI entries to remove ---
|
||||||
old_item: Maybe(Item) = nil
|
old_item: Maybe(Item) = nil
|
||||||
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
||||||
if existing_err == .None && existing_value != nil {
|
if existing_err == .NotFound {
|
||||||
|
// Item does not exist — nothing to delete (DynamoDB idempotent delete).
|
||||||
|
return .None
|
||||||
|
} else if existing_err != .None {
|
||||||
|
// Unexpected RocksDB I/O error — fail closed.
|
||||||
|
return .RocksDB_Error
|
||||||
|
} else if existing_value != nil {
|
||||||
defer delete(existing_value)
|
defer delete(existing_value)
|
||||||
decoded_old, decode_ok := decode(existing_value)
|
decoded_old, decode_ok := decode(existing_value)
|
||||||
if decode_ok {
|
if !decode_ok {
|
||||||
old_item = decoded_old
|
// Value exists but is corrupt — fail closed rather than deleting the
|
||||||
|
// base item while leaving its GSI entries dangling.
|
||||||
|
return .Serialization_Error
|
||||||
}
|
}
|
||||||
|
old_item = decoded_old
|
||||||
}
|
}
|
||||||
// Cleanup old_item at the end
|
// Cleanup old_item at the end
|
||||||
defer {
|
defer {
|
||||||
@@ -960,7 +977,7 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If item doesn't exist, nothing to delete (not an error in DynamoDB)
|
// If item doesn't exist (existing_value was nil with no error), nothing to delete.
|
||||||
if _, has_old := old_item.?; !has_old {
|
if _, has_old := old_item.?; !has_old {
|
||||||
return .None
|
return .None
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -333,7 +333,31 @@ transact_write_items :: proc(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
existing, _ := get_item_internal(engine, action.table_name, key_item, metadata)
|
existing, read_err := get_item_internal(engine, action.table_name, key_item, metadata)
|
||||||
|
#partial switch read_err {
|
||||||
|
case .None:
|
||||||
|
// Item found or not found — both fine.
|
||||||
|
case .RocksDB_Error, .Serialization_Error, .Internal_Error:
|
||||||
|
// Cannot safely determine old index keys — cancel the entire transaction.
|
||||||
|
reasons[idx] = Cancellation_Reason{
|
||||||
|
code = "InternalError",
|
||||||
|
message = "Failed to read existing item for index maintenance",
|
||||||
|
}
|
||||||
|
result.cancellation_reasons = reasons
|
||||||
|
return result, .Internal_Error
|
||||||
|
case .Missing_Key_Attribute, .Invalid_Key:
|
||||||
|
// The key we built from the action's own item/key should always be valid
|
||||||
|
// by this point (validated earlier), but treat defensively.
|
||||||
|
reasons[idx] = Cancellation_Reason{
|
||||||
|
code = "ValidationError",
|
||||||
|
message = "Invalid key when reading existing item",
|
||||||
|
}
|
||||||
|
result.cancellation_reasons = reasons
|
||||||
|
return result, .Internal_Error
|
||||||
|
case .Table_Not_Found, .Item_Not_Found, .Validation_Error:
|
||||||
|
// These should not be returned by get_item_internal, but handle
|
||||||
|
// defensively — treat as "item does not exist" and continue.
|
||||||
|
}
|
||||||
old_items[idx] = existing
|
old_items[idx] = existing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
63
main.odin
63
main.odin
@@ -421,12 +421,21 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request
|
|||||||
// If no explicit Key field, extract key from Item
|
// If no explicit Key field, extract key from Item
|
||||||
// (PutItem doesn't have a Key field — the key is in the Item itself)
|
// (PutItem doesn't have a Key field — the key is in the Item itself)
|
||||||
existing_maybe, get_err := dynamodb.get_item(engine, table_name, item)
|
existing_maybe, get_err := dynamodb.get_item(engine, table_name, item)
|
||||||
if get_err != .None && get_err != .Table_Not_Found {
|
#partial switch get_err {
|
||||||
// Table not found is handled by put_item below
|
case .None:
|
||||||
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key {
|
// Item found or not found — both are fine, condition evaluates against
|
||||||
|
// whatever was returned (nil item = item doesn't exist).
|
||||||
|
case .Table_Not_Found:
|
||||||
|
// Table will be caught and reported properly by put_item below.
|
||||||
|
case .Missing_Key_Attribute, .Invalid_Key:
|
||||||
handle_storage_error(response, get_err)
|
handle_storage_error(response, get_err)
|
||||||
return
|
return
|
||||||
}
|
case .RocksDB_Error, .Serialization_Error, .Internal_Error:
|
||||||
|
make_error_response(response, .InternalServerError, "Failed to fetch existing item")
|
||||||
|
return
|
||||||
|
case .Validation_Error, .Item_Not_Found:
|
||||||
|
// Item_Not_Found shouldn't reach here (get_item returns nil, .None),
|
||||||
|
// but treat defensively.
|
||||||
}
|
}
|
||||||
existing_item = existing_maybe
|
existing_item = existing_maybe
|
||||||
} else {
|
} else {
|
||||||
@@ -566,11 +575,19 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
|
|||||||
|
|
||||||
// Fetch existing item
|
// Fetch existing item
|
||||||
existing_item, get_err := dynamodb.get_item(engine, table_name, key)
|
existing_item, get_err := dynamodb.get_item(engine, table_name, key)
|
||||||
if get_err != .None && get_err != .Table_Not_Found {
|
#partial switch get_err {
|
||||||
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key {
|
case .None:
|
||||||
|
// Item found or not found — condition evaluates against whatever was returned.
|
||||||
|
case .Table_Not_Found:
|
||||||
|
// Table will be caught and reported properly by delete_item below.
|
||||||
|
case .Missing_Key_Attribute, .Invalid_Key:
|
||||||
handle_storage_error(response, get_err)
|
handle_storage_error(response, get_err)
|
||||||
return
|
return
|
||||||
}
|
case .RocksDB_Error, .Serialization_Error, .Internal_Error:
|
||||||
|
make_error_response(response, .InternalServerError, "Failed to fetch existing item")
|
||||||
|
return
|
||||||
|
case .Validation_Error, .Item_Not_Found:
|
||||||
|
// Defensive — shouldn't reach here normally.
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
if ex, has_ex := existing_item.?; has_ex {
|
if ex, has_ex := existing_item.?; has_ex {
|
||||||
@@ -1202,11 +1219,15 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Parse ExclusiveStartKey
|
// Parse ExclusiveStartKey
|
||||||
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
|
exclusive_start_key, esk_ok, esk_body_err := dynamodb.parse_exclusive_start_key(
|
||||||
request.body, table_name, metadata.key_schema,
|
request.body, table_name, metadata.key_schema,
|
||||||
)
|
)
|
||||||
if !esk_ok {
|
if !esk_ok {
|
||||||
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
if esk_body_err {
|
||||||
|
make_error_response(response, .SerializationException, "Request body is not valid JSON")
|
||||||
|
} else {
|
||||||
|
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
@@ -1257,11 +1278,15 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
esk_gsi, esk_gsi_ok := dynamodb.parse_exclusive_start_key_gsi(
|
esk_gsi, esk_gsi_ok, esk_gsi_body_err := dynamodb.parse_exclusive_start_key_gsi(
|
||||||
request.body, table_name, &metadata, gsi,
|
request.body, table_name, &metadata, gsi,
|
||||||
)
|
)
|
||||||
if !esk_gsi_ok {
|
if !esk_gsi_ok {
|
||||||
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
if esk_gsi_body_err {
|
||||||
|
make_error_response(response, .SerializationException, "Request body is not valid JSON")
|
||||||
|
} else {
|
||||||
|
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
@@ -1402,11 +1427,15 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
|
|||||||
limit = 100
|
limit = 100
|
||||||
}
|
}
|
||||||
|
|
||||||
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
|
exclusive_start_key, esk_ok, esk_body_err := dynamodb.parse_exclusive_start_key(
|
||||||
request.body, table_name, metadata.key_schema,
|
request.body, table_name, metadata.key_schema,
|
||||||
)
|
)
|
||||||
if !esk_ok {
|
if !esk_ok {
|
||||||
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
if esk_body_err {
|
||||||
|
make_error_response(response, .SerializationException, "Request body is not valid JSON")
|
||||||
|
} else {
|
||||||
|
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
@@ -1451,11 +1480,15 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
esk_gsi, esk_gsi_ok := dynamodb.parse_exclusive_start_key_gsi(
|
esk_gsi, esk_gsi_ok, esk_gsi_body_err := dynamodb.parse_exclusive_start_key_gsi(
|
||||||
request.body, table_name, &metadata, gsi,
|
request.body, table_name, &metadata, gsi,
|
||||||
)
|
)
|
||||||
if !esk_gsi_ok {
|
if !esk_gsi_ok {
|
||||||
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
if esk_gsi_body_err {
|
||||||
|
make_error_response(response, .SerializationException, "Request body is not valid JSON")
|
||||||
|
} else {
|
||||||
|
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
|
|||||||
Reference in New Issue
Block a user