Files
jormun-db/main.odin

2025 lines
58 KiB
Odin
Raw Normal View History

2026-02-15 08:55:22 -05:00
package main
2026-02-15 13:56:08 -05:00
import "core:encoding/json"
2026-02-15 08:55:22 -05:00
import "core:fmt"
import "core:mem"
import "core:os"
import "core:strconv"
2026-02-15 13:56:08 -05:00
import "core:strings"
import "dynamodb"
2026-02-15 08:55:22 -05:00
Config :: struct {
host: string,
port: int,
data_dir: string,
verbose: bool,
}
main :: proc() {
// Parse configuration
config := parse_config()
// Print banner
print_banner(config)
// Create data directory
os.make_directory(config.data_dir)
// Initialize storage engine
2026-02-15 13:56:08 -05:00
engine, err := dynamodb.storage_engine_init(context.allocator, config.data_dir)
2026-02-15 08:55:22 -05:00
if err != .None {
fmt.eprintln("Failed to initialize storage:", err)
os.exit(1)
}
2026-02-15 13:56:08 -05:00
defer dynamodb.storage_engine_destroy(engine)
2026-02-15 08:55:22 -05:00
fmt.printfln("Storage engine initialized at %s", config.data_dir)
fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port)
// Create HTTP server
server_config := default_server_config()
server, server_ok := server_init(
context.allocator,
config.host,
config.port,
2026-02-15 13:56:08 -05:00
handle_dynamodb_request,
engine,
2026-02-15 08:55:22 -05:00
server_config,
)
if !server_ok {
fmt.eprintln("Failed to initialize HTTP server")
os.exit(1)
}
defer server_stop(&server)
fmt.println("Ready to accept connections!")
// Start server (blocks)
if !server_start(&server) {
fmt.eprintln("Server failed to start")
os.exit(1)
}
}
2026-02-15 13:56:08 -05:00
// DynamoDB request handler - called for each HTTP request with request-scoped arena allocator
handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response {
engine := cast(^dynamodb.Storage_Engine)ctx
2026-02-15 08:55:22 -05:00
2026-02-15 13:56:08 -05:00
// All allocations in this function use the request arena automatically
2026-02-15 08:55:22 -05:00
response := response_init(request_alloc)
response_add_header(&response, "Content-Type", "application/x-amz-json-1.0")
response_add_header(&response, "x-amzn-RequestId", "local-request-id")
2026-02-15 13:56:08 -05:00
// Get X-Amz-Target header to determine operation
2026-02-15 08:55:22 -05:00
target := request_get_header(request, "X-Amz-Target")
2026-02-15 13:56:08 -05:00
if target == nil {
2026-02-15 20:57:16 -05:00
return make_error_response(&response, .SerializationException, "Missing X-Amz-Target header")
2026-02-15 13:56:08 -05:00
}
2026-02-15 08:55:22 -05:00
2026-02-15 13:56:08 -05:00
operation := dynamodb.operation_from_target(target.?)
// Route to appropriate handler
#partial switch operation {
case .CreateTable:
handle_create_table(engine, request, &response)
case .DeleteTable:
handle_delete_table(engine, request, &response)
case .DescribeTable:
handle_describe_table(engine, request, &response)
case .ListTables:
handle_list_tables(engine, request, &response)
case .PutItem:
handle_put_item(engine, request, &response)
case .GetItem:
handle_get_item(engine, request, &response)
case .DeleteItem:
handle_delete_item(engine, request, &response)
2026-02-15 20:57:16 -05:00
case .UpdateItem:
handle_update_item(engine, request, &response)
2026-02-15 13:56:08 -05:00
case .Query:
handle_query(engine, request, &response)
case .Scan:
handle_scan(engine, request, &response)
2026-02-16 00:18:20 -05:00
case .BatchWriteItem:
handle_batch_write_item(engine, request, &response)
case .BatchGetItem:
handle_batch_get_item(engine, request, &response)
2026-02-16 01:04:52 -05:00
case .TransactWriteItems:
handle_transact_write_items(engine, request, &response)
case .TransactGetItems:
handle_transact_get_items(engine, request, &response)
2026-02-15 13:56:08 -05:00
case .Unknown:
return make_error_response(&response, .ValidationException, "Unknown operation")
case:
return make_error_response(&response, .ValidationException, "Operation not implemented")
2026-02-15 08:55:22 -05:00
}
return response
}
2026-02-15 13:56:08 -05:00
// ============================================================================
// Table Operations
// ============================================================================
handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
// Parse JSON body
data, parse_err := json.parse(request.body, allocator = context.allocator)
if parse_err != nil {
2026-02-15 20:57:16 -05:00
make_error_response(response, .SerializationException, "Invalid JSON")
2026-02-15 13:56:08 -05:00
return
}
defer json.destroy_value(data)
root, ok := data.(json.Object)
if !ok {
2026-02-15 20:57:16 -05:00
make_error_response(response, .SerializationException, "Request must be an object")
2026-02-15 13:56:08 -05:00
return
}
// Extract TableName
table_name_val, found := root["TableName"]
if !found {
make_error_response(response, .ValidationException, "Missing TableName")
return
}
table_name, name_ok := table_name_val.(json.String)
if !name_ok {
make_error_response(response, .ValidationException, "TableName must be a string")
return
}
// Parse KeySchema
key_schema, ks_err := parse_key_schema(root)
if ks_err != .None {
msg := key_schema_error_message(ks_err)
make_error_response(response, .ValidationException, msg)
return
}
// Parse AttributeDefinitions
attr_defs, ad_err := parse_attribute_definitions(root)
if ad_err != .None {
msg := attribute_definitions_error_message(ad_err)
make_error_response(response, .ValidationException, msg)
return
}
// Validate that key attributes are defined
if !validate_key_attributes_defined(key_schema, attr_defs) {
make_error_response(response, .ValidationException, "Key attribute not defined in AttributeDefinitions")
return
}
2026-02-16 02:15:15 -05:00
// Parse GlobalSecondaryIndexes (optional)
gsis := parse_global_secondary_indexes(root, attr_defs)
defer {
if gsi_list, has := gsis.?; has {
for &g in gsi_list {
delete(g.index_name)
for &ks in g.key_schema { delete(ks.attribute_name) }
delete(g.key_schema)
if nka, has_nka := g.projection.non_key_attributes.?; has_nka {
for a in nka { delete(a) }
delete(nka)
}
}
delete(gsi_list)
}
}
2026-02-15 13:56:08 -05:00
// Create the table
2026-02-16 02:15:15 -05:00
desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs, gsis)
2026-02-15 13:56:08 -05:00
if create_err != .None {
#partial switch create_err {
case .Table_Already_Exists:
make_error_response(response, .ResourceInUseException, "Table already exists")
case:
make_error_response(response, .InternalServerError, "Failed to create table")
}
return
}
// Build response
resp_body := fmt.aprintf(
`{{"TableDescription":{{"TableName":"%s","TableStatus":"%s","CreationDateTime":%d}}}}`,
desc.table_name,
dynamodb.table_status_to_string(desc.table_status),
desc.creation_date_time,
)
response_set_body(response, transmute([]byte)resp_body)
}
handle_delete_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 09:57:35 -05:00
defer delete(table_name)
2026-02-15 13:56:08 -05:00
err := dynamodb.delete_table(engine, table_name)
if err != .None {
#partial switch err {
case .Table_Not_Found:
make_error_response(response, .ResourceNotFoundException, "Table not found")
case:
make_error_response(response, .InternalServerError, "Failed to delete table")
}
return
}
resp_body := fmt.aprintf(`{{"TableDescription":{{"TableName":"%s","TableStatus":"DELETING"}}}}`, table_name)
response_set_body(response, transmute([]byte)resp_body)
}
handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 02:03:40 -05:00
defer delete(table_name)
2026-02-15 13:56:08 -05:00
metadata, err := dynamodb.get_table_metadata(engine, table_name)
if err != .None {
#partial switch err {
case .Table_Not_Found:
make_error_response(response, .ResourceNotFoundException, "Table not found")
case:
make_error_response(response, .InternalServerError, "Failed to describe table")
}
return
}
2026-02-15 20:57:16 -05:00
defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)
2026-02-15 13:56:08 -05:00
// Build response with key schema
builder := strings.builder_make()
strings.write_string(&builder, `{"Table":{"TableName":"`)
strings.write_string(&builder, metadata.table_name)
strings.write_string(&builder, `","TableStatus":"`)
strings.write_string(&builder, dynamodb.table_status_to_string(metadata.table_status))
strings.write_string(&builder, `","CreationDateTime":`)
fmt.sbprintf(&builder, "%d", metadata.creation_date_time)
strings.write_string(&builder, `,"KeySchema":[`)
for ks, i in metadata.key_schema {
if i > 0 do strings.write_string(&builder, ",")
2026-02-17 09:57:35 -05:00
strings.write_string(&builder, `{"AttributeName":"`)
strings.write_string(&builder, ks.attribute_name)
strings.write_string(&builder, `","KeyType":"`)
strings.write_string(&builder, dynamodb.key_type_to_string(ks.key_type))
strings.write_string(&builder, `"}`)
2026-02-15 13:56:08 -05:00
}
strings.write_string(&builder, `],"AttributeDefinitions":[`)
for ad, i in metadata.attribute_definitions {
if i > 0 do strings.write_string(&builder, ",")
2026-02-17 09:57:35 -05:00
strings.write_string(&builder, `{"AttributeName":"`)
strings.write_string(&builder, ad.attribute_name)
strings.write_string(&builder, `","AttributeType":"`)
strings.write_string(&builder, dynamodb.scalar_type_to_string(ad.attribute_type))
strings.write_string(&builder, `"}`)
2026-02-15 13:56:08 -05:00
}
2026-02-16 02:15:15 -05:00
strings.write_string(&builder, `]`)
// Include GSI Info — INSIDE the Table object, before the closing braces
if gsis, has_gsis := metadata.global_secondary_indexes.?; has_gsis && len(gsis) > 0 {
strings.write_string(&builder, `,"GlobalSecondaryIndexes":[`)
for gsi, gi in gsis {
if gi > 0 do strings.write_string(&builder, ",")
strings.write_string(&builder, `{"IndexName":"`)
strings.write_string(&builder, gsi.index_name)
strings.write_string(&builder, `","KeySchema":[`)
for ks, ki in gsi.key_schema {
if ki > 0 do strings.write_string(&builder, ",")
2026-02-17 09:57:35 -05:00
strings.write_string(&builder, `{"AttributeName":"`)
strings.write_string(&builder, ks.attribute_name)
strings.write_string(&builder, `","KeyType":"`)
strings.write_string(&builder, dynamodb.key_type_to_string(ks.key_type))
strings.write_string(&builder, `"}`)
2026-02-16 02:15:15 -05:00
}
strings.write_string(&builder, `],"Projection":{"ProjectionType":"`)
strings.write_string(&builder, projection_type_to_string(gsi.projection.projection_type))
strings.write_string(&builder, `"},"IndexStatus":"ACTIVE"}`)
}
strings.write_string(&builder, "]")
}
// Close Table object and root object
strings.write_string(&builder, `}}`)
2026-02-15 13:56:08 -05:00
resp_body := strings.to_string(builder)
response_set_body(response, transmute([]byte)resp_body)
}
handle_list_tables :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
2026-02-15 15:04:43 -05:00
_ = request
2026-02-15 13:56:08 -05:00
2026-02-15 15:04:43 -05:00
tables, err := dynamodb.list_tables(engine)
if err != .None {
make_error_response(response, .InternalServerError, "Failed to list tables")
return
}
2026-02-15 13:56:08 -05:00
builder := strings.builder_make()
strings.write_string(&builder, `{"TableNames":[`)
for table, i in tables {
if i > 0 do strings.write_string(&builder, ",")
fmt.sbprintf(&builder, `"%s"`, table)
}
strings.write_string(&builder, `]}`)
resp_body := strings.to_string(builder)
response_set_body(response, transmute([]byte)resp_body)
}
// ============================================================================
// Item Operations
// ============================================================================
handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 02:03:40 -05:00
defer delete(table_name)
2026-02-15 13:56:08 -05:00
item, item_ok := dynamodb.parse_item_from_request(request.body)
if !item_ok {
make_error_response(response, .ValidationException, "Invalid or missing Item")
return
}
defer dynamodb.item_destroy(&item)
2026-02-16 00:18:20 -05:00
// ---- ConditionExpression evaluation ----
2026-02-17 02:03:40 -05:00
cond_str, has_condition := dynamodb.parse_condition_expression_string(request.body)
2026-02-16 00:18:20 -05:00
if has_condition {
2026-02-17 02:03:40 -05:00
defer delete(cond_str)
2026-02-16 00:18:20 -05:00
// Parse shared expression attributes
attr_names := dynamodb.parse_expression_attribute_names(request.body)
defer {
if names, has_names := attr_names.?; has_names {
for k, v in names {
delete(k)
delete(v)
}
names_copy := names
delete(names_copy)
}
}
attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body)
if !vals_ok {
make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues")
return
}
defer {
for k, v in attr_values {
delete(k)
v_copy := v
dynamodb.attr_value_destroy(&v_copy)
}
delete(attr_values)
}
// Fetch existing item to evaluate condition against
key_item, key_ok := dynamodb.parse_key_from_request(request.body)
existing_item: Maybe(dynamodb.Item)
if !key_ok {
// If no explicit Key field, extract key from Item
// (PutItem doesn't have a Key field — the key is in the Item itself)
existing_maybe, get_err := dynamodb.get_item(engine, table_name, item)
if get_err != .None && get_err != .Table_Not_Found {
// Table not found is handled by put_item below
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key {
handle_storage_error(response, get_err)
return
}
}
existing_item = existing_maybe
} else {
defer dynamodb.item_destroy(&key_item)
existing_maybe, get_err := dynamodb.get_item(engine, table_name, key_item)
if get_err != .None && get_err != .Table_Not_Found {
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key {
handle_storage_error(response, get_err)
return
}
}
existing_item = existing_maybe
}
defer {
if ex, has_ex := existing_item.?; has_ex {
ex_copy := ex
dynamodb.item_destroy(&ex_copy)
}
}
// Evaluate condition
cond_result := dynamodb.evaluate_condition_expression(
request.body, existing_item, attr_names, attr_values,
)
switch cond_result {
case .Failed:
make_error_response(
response, .ConditionalCheckFailedException,
"The conditional request failed",
)
return
case .Parse_Error:
make_error_response(
response, .ValidationException,
"Invalid ConditionExpression",
)
return
case .Passed:
// Continue with put
}
}
// ---- Execute PutItem ----
2026-02-15 13:56:08 -05:00
err := dynamodb.put_item(engine, table_name, item)
if err != .None {
2026-02-15 20:57:16 -05:00
handle_storage_error(response, err)
2026-02-15 13:56:08 -05:00
return
}
response_set_body(response, transmute([]byte)string("{}"))
}
handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 09:57:35 -05:00
defer delete(table_name)
2026-02-15 13:56:08 -05:00
key, key_ok := dynamodb.parse_key_from_request(request.body)
if !key_ok {
make_error_response(response, .ValidationException, "Invalid or missing Key")
return
}
defer dynamodb.item_destroy(&key)
item, err := dynamodb.get_item(engine, table_name, key)
if err != .None {
2026-02-15 20:57:16 -05:00
handle_storage_error(response, err)
2026-02-15 13:56:08 -05:00
return
}
if item_val, has_item := item.?; has_item {
defer dynamodb.item_destroy(&item_val)
2026-02-17 02:03:40 -05:00
// Build response directly to avoid intermediate string allocations
builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
strings.write_string(&builder, `{"Item":`)
dynamodb.serialize_item_to_builder(&builder, item_val)
strings.write_string(&builder, `}`)
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body)
2026-02-15 13:56:08 -05:00
} else {
response_set_body(response, transmute([]byte)string("{}"))
}
}
handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 09:57:35 -05:00
defer delete(table_name)
2026-02-15 13:56:08 -05:00
key, key_ok := dynamodb.parse_key_from_request(request.body)
if !key_ok {
make_error_response(response, .ValidationException, "Invalid or missing Key")
return
}
defer dynamodb.item_destroy(&key)
2026-02-16 00:18:20 -05:00
// ---- ConditionExpression evaluation ----
_, has_condition := dynamodb.parse_condition_expression_string(request.body)
if has_condition {
attr_names := dynamodb.parse_expression_attribute_names(request.body)
defer {
if names, has_names := attr_names.?; has_names {
for k, v in names {
delete(k)
delete(v)
}
names_copy := names
delete(names_copy)
}
}
attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body)
if !vals_ok {
make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues")
return
}
defer {
for k, v in attr_values {
delete(k)
v_copy := v
dynamodb.attr_value_destroy(&v_copy)
}
delete(attr_values)
}
// Fetch existing item
existing_item, get_err := dynamodb.get_item(engine, table_name, key)
if get_err != .None && get_err != .Table_Not_Found {
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key {
handle_storage_error(response, get_err)
return
}
}
defer {
if ex, has_ex := existing_item.?; has_ex {
ex_copy := ex
dynamodb.item_destroy(&ex_copy)
}
}
cond_result := dynamodb.evaluate_condition_expression(
request.body, existing_item, attr_names, attr_values,
)
switch cond_result {
case .Failed:
make_error_response(
response, .ConditionalCheckFailedException,
"The conditional request failed",
)
return
case .Parse_Error:
make_error_response(response, .ValidationException, "Invalid ConditionExpression")
return
case .Passed:
// Continue with delete
}
}
// ---- Execute DeleteItem ----
2026-02-15 13:56:08 -05:00
err := dynamodb.delete_item(engine, table_name, key)
if err != .None {
2026-02-15 20:57:16 -05:00
handle_storage_error(response, err)
2026-02-15 13:56:08 -05:00
return
}
response_set_body(response, transmute([]byte)string("{}"))
}
2026-02-15 23:38:48 -05:00
// UpdateItem ...
2026-02-15 20:57:16 -05:00
handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
2026-02-15 23:38:48 -05:00
// Parse TableName
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 02:03:40 -05:00
defer delete(table_name)
2026-02-15 23:38:48 -05:00
// Parse Key
key_item, key_ok := dynamodb.parse_key_from_request(request.body)
if !key_ok {
make_error_response(response, .ValidationException, "Invalid or missing Key")
return
}
defer dynamodb.item_destroy(&key_item)
// Parse UpdateExpression
update_expr, ue_ok := dynamodb.parse_update_expression_string(request.body)
if !ue_ok {
make_error_response(response, .ValidationException, "Missing or invalid UpdateExpression")
return
}
2026-02-17 02:03:40 -05:00
defer delete(update_expr)
2026-02-15 23:38:48 -05:00
// Parse ExpressionAttributeNames and ExpressionAttributeValues
attr_names := dynamodb.parse_expression_attribute_names(request.body)
defer {
if names, has_names := attr_names.?; has_names {
for k, v in names {
delete(k)
delete(v)
}
names_copy := names
delete(names_copy)
}
}
attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body)
if !vals_ok {
make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues")
return
}
defer {
for k, v in attr_values {
delete(k)
v_copy := v
dynamodb.attr_value_destroy(&v_copy)
}
delete(attr_values)
}
2026-02-16 00:18:20 -05:00
// ---- ConditionExpression evaluation ----
_, has_condition := dynamodb.parse_condition_expression_string(request.body)
if has_condition {
// Fetch existing item to evaluate condition against
existing_item, get_err := dynamodb.get_item(engine, table_name, key_item)
if get_err != .None && get_err != .Table_Not_Found {
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key {
handle_storage_error(response, get_err)
return
}
}
defer {
if ex, has_ex := existing_item.?; has_ex {
ex_copy := ex
dynamodb.item_destroy(&ex_copy)
}
}
cond_result := dynamodb.evaluate_condition_expression(
request.body, existing_item, attr_names, attr_values,
)
switch cond_result {
case .Failed:
make_error_response(
response, .ConditionalCheckFailedException,
"The conditional request failed",
)
return
case .Parse_Error:
make_error_response(response, .ValidationException, "Invalid ConditionExpression")
return
case .Passed:
// Continue with update
}
}
2026-02-15 23:38:48 -05:00
// Parse update plan
plan, plan_ok := dynamodb.parse_update_expression(update_expr, attr_names, attr_values)
if !plan_ok {
make_error_response(response, .ValidationException, "Failed to parse UpdateExpression")
return
}
defer dynamodb.update_plan_destroy(&plan)
// Parse ReturnValues
return_values := dynamodb.parse_return_values(request.body)
2026-02-17 02:03:40 -05:00
defer delete(return_values)
2026-02-15 23:38:48 -05:00
// Execute update
old_item, new_item, err := dynamodb.update_item(engine, table_name, key_item, &plan)
if err != .None {
handle_storage_error(response, err)
return
}
defer {
if old, has := old_item.?; has {
old_copy := old
dynamodb.item_destroy(&old_copy)
}
if new_val, has := new_item.?; has {
new_copy := new_val
dynamodb.item_destroy(&new_copy)
}
}
// Build response based on ReturnValues
2026-02-17 02:03:40 -05:00
builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
2026-02-15 23:38:48 -05:00
switch return_values {
case "ALL_NEW":
if new_val, has := new_item.?; has {
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{"Attributes":`)
dynamodb.serialize_item_to_builder(&builder, new_val)
strings.write_string(&builder, `}`)
2026-02-15 23:38:48 -05:00
} else {
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{}`)
2026-02-15 23:38:48 -05:00
}
case "ALL_OLD":
if old, has := old_item.?; has {
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{"Attributes":`)
dynamodb.serialize_item_to_builder(&builder, old)
strings.write_string(&builder, `}`)
2026-02-15 23:38:48 -05:00
} else {
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{}`)
2026-02-15 23:38:48 -05:00
}
case "UPDATED_NEW":
if new_val, has := new_item.?; has {
2026-02-16 01:04:52 -05:00
filtered := filter_updated_attributes(new_val, &plan)
defer dynamodb.item_destroy(&filtered)
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{"Attributes":`)
dynamodb.serialize_item_to_builder(&builder, filtered)
strings.write_string(&builder, `}`)
2026-02-15 23:38:48 -05:00
} else {
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{}`)
2026-02-15 23:38:48 -05:00
}
case "UPDATED_OLD":
if old, has := old_item.?; has {
2026-02-16 01:04:52 -05:00
filtered := filter_updated_attributes(old, &plan)
defer dynamodb.item_destroy(&filtered)
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{"Attributes":`)
dynamodb.serialize_item_to_builder(&builder, filtered)
strings.write_string(&builder, `}`)
2026-02-15 23:38:48 -05:00
} else {
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{}`)
2026-02-15 23:38:48 -05:00
}
case:
// "NONE" or default
2026-02-17 02:03:40 -05:00
strings.write_string(&builder, `{}`)
2026-02-15 23:38:48 -05:00
}
2026-02-17 02:03:40 -05:00
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body)
2026-02-15 20:57:16 -05:00
}
2026-02-16 00:18:20 -05:00
handle_batch_write_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
data, parse_err := json.parse(request.body, allocator = context.allocator)
if parse_err != nil {
make_error_response(response, .SerializationException, "Invalid JSON")
return
}
defer json.destroy_value(data)
root, root_ok := data.(json.Object)
if !root_ok {
make_error_response(response, .SerializationException, "Request must be an object")
return
}
request_items_val, found := root["RequestItems"]
if !found {
make_error_response(response, .ValidationException, "Missing RequestItems")
return
}
request_items, ri_ok := request_items_val.(json.Object)
if !ri_ok {
make_error_response(response, .ValidationException, "RequestItems must be an object")
return
}
// Count total operations for limit enforcement
total_ops := 0
table_requests := make([dynamic]dynamodb.Batch_Write_Table_Request)
defer {
for &tr in table_requests {
for &req in tr.requests {
dynamodb.item_destroy(&req.item)
}
delete(tr.requests)
}
delete(table_requests)
}
for table_name, table_val in request_items {
table_array, arr_ok := table_val.(json.Array)
if !arr_ok {
make_error_response(response, .ValidationException,
fmt.tprintf("RequestItems for table '%s' must be an array", table_name))
return
}
requests := make([dynamic]dynamodb.Write_Request)
for elem in table_array {
elem_obj, elem_ok := elem.(json.Object)
if !elem_ok {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "Each write request must be an object")
return
}
// Check for PutRequest
if put_val, has_put := elem_obj["PutRequest"]; has_put {
put_obj, put_ok := put_val.(json.Object)
if !put_ok {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "PutRequest must be an object")
return
}
item_val, item_found := put_obj["Item"]
if !item_found {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "PutRequest missing Item")
return
}
item, item_ok := dynamodb.parse_item_from_value(item_val)
if !item_ok {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "Invalid Item in PutRequest")
return
}
append(&requests, dynamodb.Write_Request{type = .Put, item = item})
total_ops += 1
continue
}
// Check for DeleteRequest
if del_val, has_del := elem_obj["DeleteRequest"]; has_del {
del_obj, del_ok := del_val.(json.Object)
if !del_ok {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "DeleteRequest must be an object")
return
}
key_val, key_found := del_obj["Key"]
if !key_found {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "DeleteRequest missing Key")
return
}
key_item, key_ok := dynamodb.parse_item_from_value(key_val)
if !key_ok {
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException, "Invalid Key in DeleteRequest")
return
}
append(&requests, dynamodb.Write_Request{type = .Delete, item = key_item})
total_ops += 1
continue
}
// Neither PutRequest nor DeleteRequest
for &r in requests {
dynamodb.item_destroy(&r.item)
}
delete(requests)
make_error_response(response, .ValidationException,
"Each write request must contain PutRequest or DeleteRequest")
return
}
append(&table_requests, dynamodb.Batch_Write_Table_Request{
2026-02-17 02:03:40 -05:00
table_name = strings.clone(string(table_name)),
2026-02-16 00:18:20 -05:00
requests = requests[:],
})
}
// Enforce 25-operation limit
if total_ops > 25 {
make_error_response(response, .ValidationException,
"Too many items requested for the BatchWriteItem call (max 25)")
return
}
if total_ops == 0 {
make_error_response(response, .ValidationException,
"RequestItems must contain at least one table with at least one request")
return
}
// Execute batch
result, err := dynamodb.batch_write_item(engine, table_requests[:])
if err != .None {
handle_storage_error(response, err)
return
}
defer dynamodb.batch_write_result_destroy(&result)
// Build response
builder := strings.builder_make()
strings.write_string(&builder, `{"UnprocessedItems":{`)
unprocessed_count := 0
for table_req, ti in result.unprocessed {
if ti > 0 {
strings.write_string(&builder, ",")
}
fmt.sbprintf(&builder, `"%s":[`, table_req.table_name)
for req, ri in table_req.requests {
if ri > 0 {
strings.write_string(&builder, ",")
}
item_json := dynamodb.serialize_item(req.item)
switch req.type {
case .Put:
2026-02-17 09:57:35 -05:00
strings.write_string(&builder, `{"PutRequest":{"Item":`)
strings.write_string(&builder, item_json)
strings.write_string(&builder, "}}")
2026-02-16 00:18:20 -05:00
case .Delete:
2026-02-17 09:57:35 -05:00
strings.write_string(&builder, `{"DeleteRequest":{"Key":`)
strings.write_string(&builder, item_json)
strings.write_string(&builder, "}}")
2026-02-16 00:18:20 -05:00
}
}
strings.write_string(&builder, "]")
unprocessed_count += len(table_req.requests)
}
strings.write_string(&builder, "}}")
resp_body := strings.to_string(builder)
response_set_body(response, transmute([]byte)resp_body)
}
handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
data, parse_err := json.parse(request.body, allocator = context.allocator)
if parse_err != nil {
make_error_response(response, .SerializationException, "Invalid JSON")
return
}
defer json.destroy_value(data)
root, root_ok := data.(json.Object)
if !root_ok {
make_error_response(response, .SerializationException, "Request must be an object")
return
}
request_items_val, found := root["RequestItems"]
if !found {
make_error_response(response, .ValidationException, "Missing RequestItems")
return
}
request_items, ri_ok := request_items_val.(json.Object)
if !ri_ok {
make_error_response(response, .ValidationException, "RequestItems must be an object")
return
}
total_keys := 0
table_requests := make([dynamic]dynamodb.Batch_Get_Table_Request)
defer {
for &tr in table_requests {
for &key in tr.keys {
dynamodb.item_destroy(&key)
}
delete(tr.keys)
}
delete(table_requests)
}
for table_name, table_val in request_items {
table_obj, obj_ok := table_val.(json.Object)
if !obj_ok {
make_error_response(response, .ValidationException,
fmt.tprintf("RequestItems for table '%s' must be an object", table_name))
return
}
keys_val, keys_found := table_obj["Keys"]
if !keys_found {
make_error_response(response, .ValidationException,
fmt.tprintf("Missing Keys for table '%s'", table_name))
return
}
keys_array, keys_ok := keys_val.(json.Array)
if !keys_ok {
make_error_response(response, .ValidationException,
fmt.tprintf("Keys for table '%s' must be an array", table_name))
return
}
keys := make([dynamic]dynamodb.Item)
for key_val in keys_array {
key_item, key_ok := dynamodb.parse_item_from_value(key_val)
if !key_ok {
for &k in keys {
dynamodb.item_destroy(&k)
}
delete(keys)
make_error_response(response, .ValidationException, "Invalid key in BatchGetItem")
return
}
append(&keys, key_item)
total_keys += 1
}
append(&table_requests, dynamodb.Batch_Get_Table_Request{
2026-02-17 02:03:40 -05:00
table_name = strings.clone(string(table_name)),
2026-02-16 00:18:20 -05:00
keys = keys[:],
})
}
// Enforce 100-key limit
if total_keys > 100 {
make_error_response(response, .ValidationException,
"Too many items requested for the BatchGetItem call (max 100)")
return
}
if total_keys == 0 {
make_error_response(response, .ValidationException,
"RequestItems must contain at least one table with at least one key")
return
}
// Execute batch get
result, err := dynamodb.batch_get_item(engine, table_requests[:])
if err != .None {
handle_storage_error(response, err)
return
}
defer dynamodb.batch_get_result_destroy(&result)
// Build response
2026-02-17 02:03:40 -05:00
builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
2026-02-16 00:18:20 -05:00
strings.write_string(&builder, `{"Responses":{`)
for table_result, ti in result.responses {
if ti > 0 {
strings.write_string(&builder, ",")
}
fmt.sbprintf(&builder, `"%s":[`, table_result.table_name)
for item, ii in table_result.items {
if ii > 0 {
strings.write_string(&builder, ",")
}
2026-02-17 02:03:40 -05:00
dynamodb.serialize_item_to_builder(&builder, item)
2026-02-16 00:18:20 -05:00
}
strings.write_string(&builder, "]")
}
strings.write_string(&builder, `},"UnprocessedKeys":{`)
for table_req, ti in result.unprocessed_keys {
if ti > 0 {
strings.write_string(&builder, ",")
}
2026-02-17 09:57:35 -05:00
strings.write_string(&builder, `"`)
strings.write_string(&builder, table_req.table_name)
strings.write_string(&builder, `":{"Keys":["`)
2026-02-16 00:18:20 -05:00
for key, ki in table_req.keys {
if ki > 0 {
strings.write_string(&builder, ",")
}
2026-02-17 02:03:40 -05:00
dynamodb.serialize_item_to_builder(&builder, key)
2026-02-16 00:18:20 -05:00
}
strings.write_string(&builder, "]}")
}
strings.write_string(&builder, "}}")
2026-02-17 02:03:40 -05:00
// clone the god damn string
resp_body := strings.clone(strings.to_string(builder))
2026-02-16 00:18:20 -05:00
response_set_body(response, transmute([]byte)resp_body)
}
2026-02-15 13:56:08 -05:00
// ============================================================================
// Query and Scan Operations
// ============================================================================
2026-02-15 23:38:48 -05:00
// handle_query ...
2026-02-15 13:56:08 -05:00
handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
2026-02-15 15:04:43 -05:00
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 02:03:40 -05:00
defer delete(table_name)
2026-02-15 15:04:43 -05:00
2026-02-16 02:15:15 -05:00
// Grab index name from request body
index_name := parse_index_name(request.body)
2026-02-17 02:03:40 -05:00
defer {
if idx, has := index_name.?; has {
delete(idx)
}
}
2026-02-16 02:15:15 -05:00
2026-02-15 23:38:48 -05:00
// Fetch table metadata early for ExclusiveStartKey parsing
2026-02-15 20:57:16 -05:00
metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
if meta_err != .None {
handle_storage_error(response, meta_err)
return
}
defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)
2026-02-15 15:04:43 -05:00
// Parse KeyConditionExpression
kc, kc_ok := dynamodb.parse_query_key_condition(request.body)
if !kc_ok {
make_error_response(response, .ValidationException, "Missing or invalid KeyConditionExpression")
return
}
defer dynamodb.key_condition_destroy(&kc)
// Extract partition key bytes
pk_bytes, pk_ok := dynamodb.key_condition_get_pk_bytes(&kc)
if !pk_ok {
make_error_response(response, .ValidationException, "Invalid partition key type")
return
}
pk_owned := make([]byte, len(pk_bytes))
copy(pk_owned, pk_bytes)
defer delete(pk_owned)
2026-02-16 02:15:15 -05:00
// ---- Parse shared parameters BEFORE the GSI/table branch ----
2026-02-15 15:04:43 -05:00
// Parse Limit
limit := dynamodb.parse_limit(request.body)
if limit == 0 {
limit = 100
}
2026-02-15 23:38:48 -05:00
// Parse ExclusiveStartKey
2026-02-15 20:57:16 -05:00
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
request.body, table_name, metadata.key_schema,
)
if !esk_ok {
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
2026-02-15 15:04:43 -05:00
return
}
2026-02-15 20:57:16 -05:00
defer {
if esk, has_esk := exclusive_start_key.?; has_esk {
delete(esk)
}
2026-02-15 15:04:43 -05:00
}
2026-02-15 23:38:48 -05:00
// Pass sort key condition through
2026-02-15 20:57:16 -05:00
sk_condition: Maybe(dynamodb.Sort_Key_Condition) = nil
if skc, has_skc := kc.sk_condition.?; has_skc {
sk_condition = skc
2026-02-15 15:04:43 -05:00
}
2026-02-15 23:38:48 -05:00
// ---- Parse ExpressionAttributeNames/Values for filter/projection ----
attr_names := dynamodb.parse_expression_attribute_names(request.body)
defer {
if names, has_names := attr_names.?; has_names {
for k, v in names {
delete(k)
delete(v)
}
names_copy := names
delete(names_copy)
}
}
2026-02-16 04:49:10 -05:00
attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body)
if !vals_ok {
make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues")
return
}
2026-02-15 23:38:48 -05:00
defer {
for k, v in attr_values {
delete(k)
v_copy := v
dynamodb.attr_value_destroy(&v_copy)
}
delete(attr_values)
}
2026-02-16 02:15:15 -05:00
// ---- GSI query path ----
if idx_name, has_idx := index_name.?; has_idx {
_, gsi_found := dynamodb.find_gsi(&metadata, idx_name)
if !gsi_found {
make_error_response(response, .ValidationException,
fmt.tprintf("The table does not have the specified index: %s", idx_name))
return
}
result, err := dynamodb.gsi_query(engine, table_name, idx_name,
pk_owned, exclusive_start_key, limit, sk_condition)
if err != .None {
handle_storage_error(response, err)
return
}
defer dynamodb.query_result_destroy(&result)
// Apply FilterExpression
2026-02-16 06:45:55 -05:00
filtered_items, filter_ok := apply_filter_to_items(request.body, result.items, attr_names, attr_values)
if !filter_ok {
make_error_response(response, .ValidationException, "Invalid FilterExpression")
return
}
2026-02-16 02:15:15 -05:00
scanned_count := len(result.items)
// Apply ProjectionExpression
projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names)
2026-02-16 03:11:11 -05:00
defer { // This block just frees the cloned string and projection slice
if has_proj && len(projection) > 0 {
for path in projection {
delete(path) // Free each cloned string
}
delete(projection) // Free the slice
}
}
2026-02-16 02:15:15 -05:00
final_items: []dynamodb.Item
if has_proj && len(projection) > 0 {
projected := make([]dynamodb.Item, len(filtered_items))
for item, i in filtered_items {
projected[i] = dynamodb.apply_projection(item, projection)
}
final_items = projected
} else {
final_items = filtered_items
}
write_items_response_with_pagination_ex(
response, final_items, result.last_evaluated_key, &metadata, scanned_count,
)
if has_proj && len(projection) > 0 {
for &item in final_items {
dynamodb.item_destroy(&item)
}
delete(final_items)
}
return
}
// ---- Main table query path ----
result, err := dynamodb.query(engine, table_name, pk_owned, exclusive_start_key, limit, sk_condition)
if err != .None {
handle_storage_error(response, err)
return
}
defer dynamodb.query_result_destroy(&result)
2026-02-15 23:38:48 -05:00
// ---- Apply FilterExpression (post-query filter) ----
2026-02-16 06:45:55 -05:00
filtered_items, filter_ok := apply_filter_to_items(request.body, result.items, attr_names, attr_values)
if !filter_ok {
make_error_response(response, .ValidationException, "Invalid FilterExpression")
return
}
2026-02-15 23:38:48 -05:00
scanned_count := len(result.items)
// ---- Apply ProjectionExpression ----
projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names)
2026-02-16 03:11:11 -05:00
defer { // This block just frees the cloned string and projection slice
if has_proj && len(projection) > 0 {
for path in projection {
delete(path) // Free each cloned string
}
delete(projection) // Free the slice
}
}
2026-02-15 23:38:48 -05:00
final_items: []dynamodb.Item
if has_proj && len(projection) > 0 {
projected := make([]dynamodb.Item, len(filtered_items))
for item, i in filtered_items {
projected[i] = dynamodb.apply_projection(item, projection)
}
final_items = projected
} else {
final_items = filtered_items
}
// Build response
write_items_response_with_pagination_ex(
response, final_items, result.last_evaluated_key, &metadata, scanned_count,
)
// Cleanup projected items if we created them
if has_proj && len(projection) > 0 {
for &item in final_items {
dynamodb.item_destroy(&item)
}
delete(final_items)
}
2026-02-15 13:56:08 -05:00
}
2026-02-15 23:38:48 -05:00
// handle_scan ...
2026-02-15 13:56:08 -05:00
handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
2026-02-15 14:10:48 -05:00
table_name, ok := dynamodb.parse_table_name(request.body)
if !ok {
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return
}
2026-02-17 02:03:40 -05:00
defer delete(table_name)
2026-02-15 14:10:48 -05:00
2026-02-16 02:15:15 -05:00
// Grab index name from request body
index_name := parse_index_name(request.body)
2026-02-17 02:03:40 -05:00
defer {
if idx, has := index_name.?; has {
delete(idx)
}
}
2026-02-16 02:15:15 -05:00
2026-02-15 20:57:16 -05:00
metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
if meta_err != .None {
handle_storage_error(response, meta_err)
return
}
defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)
2026-02-15 14:10:48 -05:00
limit := dynamodb.parse_limit(request.body)
if limit == 0 {
limit = 100
}
2026-02-15 20:57:16 -05:00
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
request.body, table_name, metadata.key_schema,
)
if !esk_ok {
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
return
}
defer {
if esk, has_esk := exclusive_start_key.?; has_esk {
delete(esk)
}
}
2026-02-15 14:10:48 -05:00
2026-02-15 23:38:48 -05:00
// ---- Parse ExpressionAttributeNames/Values for filter/projection ----
attr_names := dynamodb.parse_expression_attribute_names(request.body)
defer {
if names, has_names := attr_names.?; has_names {
for k, v in names {
delete(k)
delete(v)
}
names_copy := names
delete(names_copy)
}
}
2026-02-16 04:49:10 -05:00
attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body)
if !vals_ok {
make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues")
return
}
2026-02-15 23:38:48 -05:00
defer {
for k, v in attr_values {
delete(k)
v_copy := v
dynamodb.attr_value_destroy(&v_copy)
}
delete(attr_values)
}
2026-02-16 02:15:15 -05:00
// ---- GSI scan path ----
if idx_name, has_idx := index_name.?; has_idx {
_, gsi_found := dynamodb.find_gsi(&metadata, idx_name)
if !gsi_found {
make_error_response(response, .ValidationException,
fmt.tprintf("The table does not have the specified index: %s", idx_name))
return
}
result, err := dynamodb.gsi_scan(engine, table_name, idx_name, exclusive_start_key, limit)
if err != .None {
handle_storage_error(response, err)
return
}
defer dynamodb.scan_result_destroy(&result)
2026-02-16 06:45:55 -05:00
// Apply FilterExpression
filtered_items, filter_ok := apply_filter_to_items(request.body, result.items, attr_names, attr_values)
if !filter_ok {
make_error_response(response, .ValidationException, "Invalid FilterExpression")
return
}
2026-02-16 02:15:15 -05:00
scanned_count := len(result.items)
2026-02-16 06:45:55 -05:00
// Apply ProjectionExpression
2026-02-16 02:15:15 -05:00
projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names)
2026-02-16 03:11:11 -05:00
defer { // This block just frees the cloned string and projection slice
if has_proj && len(projection) > 0 {
for path in projection {
delete(path) // Free each cloned string
}
delete(projection) // Free the slice
}
}
2026-02-16 02:15:15 -05:00
final_items: []dynamodb.Item
if has_proj && len(projection) > 0 {
projected := make([]dynamodb.Item, len(filtered_items))
for item, i in filtered_items {
projected[i] = dynamodb.apply_projection(item, projection)
}
final_items = projected
} else {
final_items = filtered_items
}
write_items_response_with_pagination_ex(
response, final_items, result.last_evaluated_key, &metadata, scanned_count,
)
if has_proj && len(projection) > 0 {
for &item in final_items {
dynamodb.item_destroy(&item)
}
delete(final_items)
}
return
}
// ---- Main table scan path ----
result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit)
if err != .None {
handle_storage_error(response, err)
return
}
defer dynamodb.scan_result_destroy(&result)
2026-02-16 06:45:55 -05:00
// ---- Apply FilterExpression (post-scan filter) ----
filtered_items, filter_ok := apply_filter_to_items(request.body, result.items, attr_names, attr_values)
if !filter_ok {
make_error_response(response, .ValidationException, "Invalid FilterExpression")
return
}
2026-02-15 23:38:48 -05:00
scanned_count := len(result.items)
// ---- Apply ProjectionExpression ----
projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names)
2026-02-16 03:11:11 -05:00
defer { // This block just frees the cloned string and projection slice
if has_proj && len(projection) > 0 {
for path in projection {
delete(path) // Free each cloned string
}
delete(projection) // Free the slice
}
}
2026-02-15 23:38:48 -05:00
final_items: []dynamodb.Item
if has_proj && len(projection) > 0 {
projected := make([]dynamodb.Item, len(filtered_items))
for item, i in filtered_items {
projected[i] = dynamodb.apply_projection(item, projection)
}
final_items = projected
} else {
final_items = filtered_items
}
// Build response
write_items_response_with_pagination_ex(
response, final_items, result.last_evaluated_key, &metadata, scanned_count,
)
if has_proj && len(projection) > 0 {
for &item in final_items {
dynamodb.item_destroy(&item)
}
delete(final_items)
}
2026-02-15 20:57:16 -05:00
}
2026-02-15 23:38:48 -05:00
// ============================================================================
// Shared helper: apply FilterExpression to a set of items
// ============================================================================
apply_filter_to_items :: proc(
request_body: []byte,
items: []dynamodb.Item,
attr_names: Maybe(map[string]string),
attr_values: map[string]dynamodb.Attribute_Value,
2026-02-16 06:45:55 -05:00
) -> (filtered_items: []dynamodb.Item, ok: bool) {
2026-02-15 23:38:48 -05:00
filter_expr, has_filter := dynamodb.parse_filter_expression_string(request_body)
if !has_filter {
2026-02-16 06:45:55 -05:00
return items, true
2026-02-15 23:38:48 -05:00
}
2026-02-17 02:03:40 -05:00
defer delete(filter_expr)
2026-02-15 23:38:48 -05:00
filter_node, filter_ok := dynamodb.parse_filter_expression(filter_expr, attr_names, attr_values)
if !filter_ok || filter_node == nil {
2026-02-16 06:45:55 -05:00
return nil, false
2026-02-15 23:38:48 -05:00
}
defer {
dynamodb.filter_node_destroy(filter_node)
}
// Filter items
filtered := make([dynamic]dynamodb.Item)
for item in items {
if dynamodb.evaluate_filter(item, filter_node) {
append(&filtered, item)
}
}
2026-02-16 06:45:55 -05:00
return filtered[:], true
2026-02-15 23:38:48 -05:00
}
// ============================================================================
// Extended pagination response builder (includes ScannedCount vs Count)
//
// DynamoDB distinguishes:
// Count = number of items AFTER FilterExpression
// ScannedCount = number of items BEFORE FilterExpression
// ============================================================================
write_items_response_with_pagination_ex :: proc(
response: ^HTTP_Response,
items: []dynamodb.Item,
last_evaluated_key_binary: Maybe([]byte),
metadata: ^dynamodb.Table_Metadata,
scanned_count: int,
) {
2026-02-17 02:03:40 -05:00
builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
2026-02-15 23:38:48 -05:00
strings.write_string(&builder, `{"Items":[`)
2026-02-17 02:03:40 -05:00
// Use serialize_item_to_builder directly an NOT serialize_item
2026-02-15 23:38:48 -05:00
for item, i in items {
if i > 0 do strings.write_string(&builder, ",")
2026-02-17 02:03:40 -05:00
dynamodb.serialize_item_to_builder(&builder, item)
2026-02-15 23:38:48 -05:00
}
strings.write_string(&builder, `],"Count":`)
fmt.sbprintf(&builder, "%d", len(items))
strings.write_string(&builder, `,"ScannedCount":`)
fmt.sbprintf(&builder, "%d", scanned_count)
if binary_key, has_last := last_evaluated_key_binary.?; has_last {
lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata)
if lek_ok {
strings.write_string(&builder, `,"LastEvaluatedKey":`)
strings.write_string(&builder, lek_json)
}
}
strings.write_string(&builder, "}")
2026-02-17 02:03:40 -05:00
// We have to Clone the string before passing to response_set_body
resp_body := strings.clone(strings.to_string(builder))
2026-02-15 23:38:48 -05:00
response_set_body(response, transmute([]byte)resp_body)
}
2026-02-15 20:57:16 -05:00
// ============================================================================
// Shared Pagination Response Builder
//
// Mirrors the Zig writeItemsResponseWithPagination helper:
// - Serializes Items array
// - Emits Count / ScannedCount
// - Decodes binary last_evaluated_key → DynamoDB JSON LastEvaluatedKey
// ============================================================================
write_items_response_with_pagination :: proc(
response: ^HTTP_Response,
items: []dynamodb.Item,
last_evaluated_key_binary: Maybe([]byte),
metadata: ^dynamodb.Table_Metadata,
) {
2026-02-17 02:03:40 -05:00
builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
2026-02-15 14:10:48 -05:00
strings.write_string(&builder, `{"Items":[`)
2026-02-17 02:03:40 -05:00
// Use serialize_item_to_builder directly so we always get the correct response payload
2026-02-15 20:57:16 -05:00
for item, i in items {
2026-02-15 14:10:48 -05:00
if i > 0 do strings.write_string(&builder, ",")
2026-02-17 02:03:40 -05:00
dynamodb.serialize_item_to_builder(&builder, item)
2026-02-15 14:10:48 -05:00
}
strings.write_string(&builder, `],"Count":`)
2026-02-15 20:57:16 -05:00
fmt.sbprintf(&builder, "%d", len(items))
2026-02-15 14:10:48 -05:00
strings.write_string(&builder, `,"ScannedCount":`)
2026-02-15 20:57:16 -05:00
fmt.sbprintf(&builder, "%d", len(items))
if binary_key, has_last := last_evaluated_key_binary.?; has_last {
lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata)
if lek_ok {
strings.write_string(&builder, `,"LastEvaluatedKey":`)
strings.write_string(&builder, lek_json)
}
2026-02-15 14:10:48 -05:00
}
strings.write_string(&builder, "}")
2026-02-17 02:03:40 -05:00
// We have to Clone the string before passing to response_set_body
resp_body := strings.clone(strings.to_string(builder))
2026-02-15 14:10:48 -05:00
response_set_body(response, transmute([]byte)resp_body)
2026-02-15 13:56:08 -05:00
}
2026-02-15 20:57:16 -05:00
// ============================================================================
// Centralized Storage Error → DynamoDB Error mapping
//
// Maps storage errors to the correct DynamoDB error type AND HTTP status code.
// DynamoDB uses:
// 400 — ValidationException, ResourceNotFoundException, ResourceInUseException, etc.
// 500 — InternalServerError
// ============================================================================
handle_storage_error :: proc(response: ^HTTP_Response, err: dynamodb.Storage_Error) {
#partial switch err {
case .Table_Not_Found:
make_error_response(response, .ResourceNotFoundException, "Requested resource not found")
case .Table_Already_Exists:
make_error_response(response, .ResourceInUseException, "Table already exists")
case .Missing_Key_Attribute:
make_error_response(response, .ValidationException, "One or more required key attributes are missing")
case .Invalid_Key:
make_error_response(response, .ValidationException, "Invalid key: type mismatch or malformed key value")
2026-02-17 09:57:35 -05:00
case .Validation_Error:
make_error_response(response, .ValidationException, "Invalid request: type mismatch or incompatible operand")
2026-02-15 20:57:16 -05:00
case .Serialization_Error:
make_error_response(response, .InternalServerError, "Internal serialization error")
case .RocksDB_Error:
make_error_response(response, .InternalServerError, "Internal storage error")
case .Out_Of_Memory:
make_error_response(response, .InternalServerError, "Internal memory error")
case:
make_error_response(response, .InternalServerError, "Unexpected error")
}
}
2026-02-15 13:56:08 -05:00
// ============================================================================
// Schema Parsing Helpers
// ============================================================================
Key_Schema_Error :: enum {
None,
Missing_Key_Schema,
Invalid_Key_Schema,
No_Hash_Key,
Multiple_Hash_Keys,
Multiple_Range_Keys,
Invalid_Key_Type,
}
parse_key_schema :: proc(root: json.Object) -> ([]dynamodb.Key_Schema_Element, Key_Schema_Error) {
key_schema_val, found := root["KeySchema"]
if !found do return nil, .Missing_Key_Schema
key_schema_array, ok := key_schema_val.(json.Array)
if !ok do return nil, .Invalid_Key_Schema
if len(key_schema_array) == 0 || len(key_schema_array) > 2 {
return nil, .Invalid_Key_Schema
}
key_schema := make([]dynamodb.Key_Schema_Element, len(key_schema_array))
hash_count := 0
range_count := 0
for elem, i in key_schema_array {
elem_obj, elem_ok := elem.(json.Object)
if !elem_ok {
for j in 0..<i {
delete(key_schema[j].attribute_name)
}
delete(key_schema)
return nil, .Invalid_Key_Schema
}
// Get AttributeName
attr_name_val, attr_found := elem_obj["AttributeName"]
if !attr_found {
for j in 0..<i {
delete(key_schema[j].attribute_name)
}
delete(key_schema)
return nil, .Invalid_Key_Schema
}
attr_name, name_ok := attr_name_val.(json.String)
if !name_ok {
for j in 0..<i {
delete(key_schema[j].attribute_name)
}
delete(key_schema)
return nil, .Invalid_Key_Schema
}
// Get KeyType
key_type_val, type_found := elem_obj["KeyType"]
if !type_found {
for j in 0..<i {
delete(key_schema[j].attribute_name)
}
delete(key_schema)
return nil, .Invalid_Key_Schema
}
key_type_str, type_ok := key_type_val.(json.String)
if !type_ok {
for j in 0..<i {
delete(key_schema[j].attribute_name)
}
delete(key_schema)
return nil, .Invalid_Key_Schema
}
key_type, kt_ok := dynamodb.key_type_from_string(string(key_type_str))
if !kt_ok {
for j in 0..<i {
delete(key_schema[j].attribute_name)
}
delete(key_schema)
return nil, .Invalid_Key_Type
}
// Count key types
switch key_type {
case .HASH: hash_count += 1
case .RANGE: range_count += 1
}
key_schema[i] = dynamodb.Key_Schema_Element{
attribute_name = strings.clone(string(attr_name)),
key_type = key_type,
}
}
// Validate key counts
if hash_count == 0 {
for ks in key_schema {
delete(ks.attribute_name)
}
delete(key_schema)
return nil, .No_Hash_Key
}
if hash_count > 1 {
for ks in key_schema {
delete(ks.attribute_name)
}
delete(key_schema)
return nil, .Multiple_Hash_Keys
}
if range_count > 1 {
for ks in key_schema {
delete(ks.attribute_name)
}
delete(key_schema)
return nil, .Multiple_Range_Keys
}
return key_schema, .None
}
key_schema_error_message :: proc(err: Key_Schema_Error) -> string {
switch err {
case .None: return ""
case .Missing_Key_Schema: return "Missing KeySchema"
case .Invalid_Key_Schema: return "Invalid KeySchema format"
case .No_Hash_Key: return "KeySchema must contain exactly one HASH key"
case .Multiple_Hash_Keys: return "KeySchema can only contain one HASH key"
case .Multiple_Range_Keys: return "KeySchema can only contain one RANGE key"
case .Invalid_Key_Type: return "Invalid KeyType (must be HASH or RANGE)"
}
return "Invalid KeySchema"
}
Attribute_Definitions_Error :: enum {
None,
Missing_Attribute_Definitions,
Invalid_Attribute_Definitions,
Invalid_Attribute_Type,
Duplicate_Attribute_Name,
}
parse_attribute_definitions :: proc(root: json.Object) -> ([]dynamodb.Attribute_Definition, Attribute_Definitions_Error) {
attr_defs_val, found := root["AttributeDefinitions"]
if !found do return nil, .Missing_Attribute_Definitions
attr_defs_array, ok := attr_defs_val.(json.Array)
if !ok do return nil, .Invalid_Attribute_Definitions
if len(attr_defs_array) == 0 {
return nil, .Invalid_Attribute_Definitions
}
attr_defs := make([]dynamodb.Attribute_Definition, len(attr_defs_array))
seen_names := make(map[string]bool, allocator = context.temp_allocator)
defer delete(seen_names)
for elem, i in attr_defs_array {
elem_obj, elem_ok := elem.(json.Object)
if !elem_ok {
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Invalid_Attribute_Definitions
}
// Get AttributeName
2026-02-15 20:57:16 -05:00
attr_name_val, attr_found := elem_obj["AttributeName"]
if !attr_found {
2026-02-15 13:56:08 -05:00
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Invalid_Attribute_Definitions
}
attr_name, name_ok := attr_name_val.(json.String)
if !name_ok {
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Invalid_Attribute_Definitions
}
// Check for duplicates
if string(attr_name) in seen_names {
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Duplicate_Attribute_Name
}
seen_names[string(attr_name)] = true
// Get AttributeType
attr_type_val, type_found := elem_obj["AttributeType"]
if !type_found {
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Invalid_Attribute_Definitions
}
attr_type_str, type_ok := attr_type_val.(json.String)
if !type_ok {
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Invalid_Attribute_Definitions
}
attr_type, at_ok := dynamodb.scalar_type_from_string(string(attr_type_str))
if !at_ok {
for j in 0..<i {
delete(attr_defs[j].attribute_name)
}
delete(attr_defs)
return nil, .Invalid_Attribute_Type
}
attr_defs[i] = dynamodb.Attribute_Definition{
attribute_name = strings.clone(string(attr_name)),
attribute_type = attr_type,
}
}
return attr_defs, .None
}
attribute_definitions_error_message :: proc(err: Attribute_Definitions_Error) -> string {
switch err {
case .None: return ""
case .Missing_Attribute_Definitions: return "Missing AttributeDefinitions"
case .Invalid_Attribute_Definitions: return "Invalid AttributeDefinitions format"
case .Invalid_Attribute_Type: return "Invalid AttributeType (must be S, N, or B)"
case .Duplicate_Attribute_Name: return "Duplicate attribute name in AttributeDefinitions"
}
return "Invalid AttributeDefinitions"
}
validate_key_attributes_defined :: proc(key_schema: []dynamodb.Key_Schema_Element, attr_defs: []dynamodb.Attribute_Definition) -> bool {
for ks in key_schema {
found := false
for ad in attr_defs {
if ks.attribute_name == ad.attribute_name {
found = true
break
}
}
if !found do return false
}
return true
}
// ============================================================================
// Error Response Helper
2026-02-15 20:57:16 -05:00
//
// Maps DynamoDB error types to correct HTTP status codes:
// 400 — ValidationException, ResourceNotFoundException, ResourceInUseException,
// ConditionalCheckFailedException, SerializationException
// 500 — InternalServerError
2026-02-15 13:56:08 -05:00
// ============================================================================
make_error_response :: proc(response: ^HTTP_Response, err_type: dynamodb.DynamoDB_Error_Type, message: string) -> HTTP_Response {
2026-02-15 20:57:16 -05:00
status: HTTP_Status
#partial switch err_type {
case .InternalServerError:
status = .Internal_Server_Error
case:
status = .Bad_Request
}
response_set_status(response, status)
2026-02-15 13:56:08 -05:00
error_body := dynamodb.error_to_response(err_type, message)
response_set_body(response, transmute([]byte)error_body)
return response^
}
// ============================================================================
// Configuration
// ============================================================================
2026-02-15 08:55:22 -05:00
parse_config :: proc() -> Config {
config := Config{
host = "0.0.0.0",
port = 8002,
data_dir = "./data",
verbose = false,
}
// Environment variables
if port_str, env_ok := os.lookup_env("JORMUN_PORT"); env_ok {
if port, parse_ok := strconv.parse_int(port_str); parse_ok {
config.port = port
}
}
if host, ok := os.lookup_env("JORMUN_HOST"); ok {
config.host = host
}
if data_dir, ok := os.lookup_env("JORMUN_DATA_DIR"); ok {
config.data_dir = data_dir
}
if verbose, ok := os.lookup_env("JORMUN_VERBOSE"); ok {
config.verbose = verbose == "1"
}
// TODO: Parse command line arguments
return config
}
print_banner :: proc(config: Config) {
banner := `
DynamoDB-Compatible Database
Powered by RocksDB + Odin
`
fmt.println(banner)
fmt.printfln(" Port: %d | Data Dir: %s\n", config.port, config.data_dir)
}