package main import "core:encoding/json" import "core:fmt" import "core:mem" import "core:os" import "core:strconv" import "core:strings" import "dynamodb" Config :: struct { host: string, port: int, data_dir: string, verbose: bool, } main :: proc() { // Parse configuration config := parse_config() // Print banner print_banner(config) // Create data directory os.make_directory(config.data_dir) // Initialize storage engine engine, err := dynamodb.storage_engine_init(context.allocator, config.data_dir) if err != .None { fmt.eprintln("Failed to initialize storage:", err) os.exit(1) } defer dynamodb.storage_engine_destroy(engine) fmt.printfln("Storage engine initialized at %s", config.data_dir) fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port) // Create HTTP server server_config := default_server_config() server, server_ok := server_init( context.allocator, config.host, config.port, handle_dynamodb_request, engine, server_config, ) if !server_ok { fmt.eprintln("Failed to initialize HTTP server") os.exit(1) } defer server_stop(&server) fmt.println("Ready to accept connections!") // Start server (blocks) if !server_start(&server) { fmt.eprintln("Server failed to start") os.exit(1) } } // DynamoDB request handler - called for each HTTP request with request-scoped arena allocator handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response { engine := cast(^dynamodb.Storage_Engine)ctx // All allocations in this function use the request arena automatically response := response_init(request_alloc) response_add_header(&response, "Content-Type", "application/x-amz-json-1.0") response_add_header(&response, "x-amzn-RequestId", "local-request-id") // Get X-Amz-Target header to determine operation target := request_get_header(request, "X-Amz-Target") if target == nil { return make_error_response(&response, .ValidationException, "Missing X-Amz-Target header") } operation := dynamodb.operation_from_target(target.?) // Route to appropriate handler #partial switch operation { case .CreateTable: handle_create_table(engine, request, &response) case .DeleteTable: handle_delete_table(engine, request, &response) case .DescribeTable: handle_describe_table(engine, request, &response) case .ListTables: handle_list_tables(engine, request, &response) case .PutItem: handle_put_item(engine, request, &response) case .GetItem: handle_get_item(engine, request, &response) case .DeleteItem: handle_delete_item(engine, request, &response) case .Query: handle_query(engine, request, &response) case .Scan: handle_scan(engine, request, &response) case .Unknown: return make_error_response(&response, .ValidationException, "Unknown operation") case: return make_error_response(&response, .ValidationException, "Operation not implemented") } return response } // ============================================================================ // Table Operations // ============================================================================ handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { // Parse JSON body data, parse_err := json.parse(request.body, allocator = context.allocator) if parse_err != nil { make_error_response(response, .ValidationException, "Invalid JSON") return } defer json.destroy_value(data) root, ok := data.(json.Object) if !ok { make_error_response(response, .ValidationException, "Request must be an object") return } // Extract TableName table_name_val, found := root["TableName"] if !found { make_error_response(response, .ValidationException, "Missing TableName") return } table_name, name_ok := table_name_val.(json.String) if !name_ok { make_error_response(response, .ValidationException, "TableName must be a string") return } // Parse KeySchema key_schema, ks_err := parse_key_schema(root) if ks_err != .None { msg := key_schema_error_message(ks_err) make_error_response(response, .ValidationException, msg) return } // Parse AttributeDefinitions attr_defs, ad_err := parse_attribute_definitions(root) if ad_err != .None { msg := attribute_definitions_error_message(ad_err) make_error_response(response, .ValidationException, msg) return } // Validate that key attributes are defined if !validate_key_attributes_defined(key_schema, attr_defs) { make_error_response(response, .ValidationException, "Key attribute not defined in AttributeDefinitions") return } // Create the table desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs) if create_err != .None { #partial switch create_err { case .Table_Already_Exists: make_error_response(response, .ResourceInUseException, "Table already exists") case: make_error_response(response, .InternalServerError, "Failed to create table") } return } // Build response resp_body := fmt.aprintf( `{{"TableDescription":{{"TableName":"%s","TableStatus":"%s","CreationDateTime":%d}}}}`, desc.table_name, dynamodb.table_status_to_string(desc.table_status), desc.creation_date_time, ) response_set_body(response, transmute([]byte)resp_body) } handle_delete_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } err := dynamodb.delete_table(engine, table_name) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case: make_error_response(response, .InternalServerError, "Failed to delete table") } return } resp_body := fmt.aprintf(`{{"TableDescription":{{"TableName":"%s","TableStatus":"DELETING"}}}}`, table_name) response_set_body(response, transmute([]byte)resp_body) } handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } metadata, err := dynamodb.get_table_metadata(engine, table_name) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case: make_error_response(response, .InternalServerError, "Failed to describe table") } return } defer dynamodb.table_metadata_destroy(&metadata, context.allocator) // Build response with key schema builder := strings.builder_make() strings.write_string(&builder, `{"Table":{"TableName":"`) strings.write_string(&builder, metadata.table_name) strings.write_string(&builder, `","TableStatus":"`) strings.write_string(&builder, dynamodb.table_status_to_string(metadata.table_status)) strings.write_string(&builder, `","CreationDateTime":`) fmt.sbprintf(&builder, "%d", metadata.creation_date_time) strings.write_string(&builder, `,"KeySchema":[`) for ks, i in metadata.key_schema { if i > 0 do strings.write_string(&builder, ",") fmt.sbprintf(&builder, `{"AttributeName":"%s","KeyType":"%s"}`, ks.attribute_name, dynamodb.key_type_to_string(ks.key_type)) } strings.write_string(&builder, `],"AttributeDefinitions":[`) for ad, i in metadata.attribute_definitions { if i > 0 do strings.write_string(&builder, ",") fmt.sbprintf(&builder, `{"AttributeName":"%s","AttributeType":"%s"}`, ad.attribute_name, dynamodb.scalar_type_to_string(ad.attribute_type)) } strings.write_string(&builder, `]}}`) resp_body := strings.to_string(builder) response_set_body(response, transmute([]byte)resp_body) } handle_list_tables :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { _ = request tables, err := dynamodb.list_tables(engine) if err != .None { make_error_response(response, .InternalServerError, "Failed to list tables") return } // tables are owned by engine allocator — just read them, don't free builder := strings.builder_make() strings.write_string(&builder, `{"TableNames":[`) for table, i in tables { if i > 0 do strings.write_string(&builder, ",") fmt.sbprintf(&builder, `"%s"`, table) } strings.write_string(&builder, `]}`) resp_body := strings.to_string(builder) response_set_body(response, transmute([]byte)resp_body) } // ============================================================================ // Item Operations // ============================================================================ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } item, item_ok := dynamodb.parse_item_from_request(request.body) if !item_ok { make_error_response(response, .ValidationException, "Invalid or missing Item") return } defer dynamodb.item_destroy(&item) err := dynamodb.put_item(engine, table_name, item) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case .Missing_Key_Attribute: make_error_response(response, .ValidationException, "Item missing required key attribute") case .Invalid_Key: make_error_response(response, .ValidationException, "Invalid key format") case: make_error_response(response, .InternalServerError, "Failed to put item") } return } response_set_body(response, transmute([]byte)string("{}")) } handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } key, key_ok := dynamodb.parse_key_from_request(request.body) if !key_ok { make_error_response(response, .ValidationException, "Invalid or missing Key") return } defer dynamodb.item_destroy(&key) item, err := dynamodb.get_item(engine, table_name, key) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case .Missing_Key_Attribute: make_error_response(response, .ValidationException, "Key missing required attributes") case .Invalid_Key: make_error_response(response, .ValidationException, "Invalid key format") case: make_error_response(response, .InternalServerError, "Failed to get item") } return } if item_val, has_item := item.?; has_item { defer dynamodb.item_destroy(&item_val) item_json := dynamodb.serialize_item(item_val) resp := fmt.aprintf(`{"Item":%s}`, item_json) response_set_body(response, transmute([]byte)resp) } else { response_set_body(response, transmute([]byte)string("{}")) } } handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } key, key_ok := dynamodb.parse_key_from_request(request.body) if !key_ok { make_error_response(response, .ValidationException, "Invalid or missing Key") return } defer dynamodb.item_destroy(&key) err := dynamodb.delete_item(engine, table_name, key) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case .Missing_Key_Attribute: make_error_response(response, .ValidationException, "Key missing required attributes") case .Invalid_Key: make_error_response(response, .ValidationException, "Invalid key format") case: make_error_response(response, .InternalServerError, "Failed to delete item") } return } response_set_body(response, transmute([]byte)string("{}")) } // ============================================================================ // Query and Scan Operations // ============================================================================ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } // Parse KeyConditionExpression kc, kc_ok := dynamodb.parse_query_key_condition(request.body) if !kc_ok { make_error_response(response, .ValidationException, "Missing or invalid KeyConditionExpression") return } defer dynamodb.key_condition_destroy(&kc) // Extract partition key bytes pk_bytes, pk_ok := dynamodb.key_condition_get_pk_bytes(&kc) if !pk_ok { make_error_response(response, .ValidationException, "Invalid partition key type") return } // Clone pk_bytes so it survives kc cleanup (kc borrows from the parsed value) pk_owned := make([]byte, len(pk_bytes)) copy(pk_owned, pk_bytes) defer delete(pk_owned) // Parse Limit limit := dynamodb.parse_limit(request.body) if limit == 0 { limit = 100 } // TODO: Parse ExclusiveStartKey properly (requires metadata for type info) exclusive_start_key: Maybe([]byte) = nil result, err := dynamodb.query(engine, table_name, pk_owned, exclusive_start_key, limit) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case: make_error_response(response, .InternalServerError, "Query failed") } return } defer dynamodb.query_result_destroy(&result) // Build response builder := strings.builder_make() strings.write_string(&builder, `{"Items":[`) for item, i in result.items { if i > 0 do strings.write_string(&builder, ",") item_json := dynamodb.serialize_item(item) strings.write_string(&builder, item_json) } strings.write_string(&builder, `],"Count":`) fmt.sbprintf(&builder, "%d", len(result.items)) strings.write_string(&builder, `,"ScannedCount":`) fmt.sbprintf(&builder, "%d", len(result.items)) // TODO: Add LastEvaluatedKey when pagination is fully wired if last_key, has_last := result.last_evaluated_key.?; has_last { _ = last_key } strings.write_string(&builder, "}") resp_body := strings.to_string(builder) response_set_body(response, transmute([]byte)resp_body) } handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { table_name, ok := dynamodb.parse_table_name(request.body) if !ok { make_error_response(response, .ValidationException, "Invalid request or missing TableName") return } // Parse Limit (default to 100 if not specified) limit := dynamodb.parse_limit(request.body) if limit == 0 { limit = 100 } // Parse ExclusiveStartKey if present // For now, we'll implement basic scan without ExclusiveStartKey parsing // TODO: Parse ExclusiveStartKey from request body and convert to binary key exclusive_start_key: Maybe([]byte) = nil // Perform scan result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit) if err != .None { #partial switch err { case .Table_Not_Found: make_error_response(response, .ResourceNotFoundException, "Table not found") case: make_error_response(response, .InternalServerError, "Failed to scan table") } return } defer dynamodb.scan_result_destroy(&result) // Build response builder := strings.builder_make() strings.write_string(&builder, `{"Items":[`) for item, i in result.items { if i > 0 do strings.write_string(&builder, ",") item_json := dynamodb.serialize_item(item) strings.write_string(&builder, item_json) } strings.write_string(&builder, `],"Count":`) fmt.sbprintf(&builder, "%d", len(result.items)) strings.write_string(&builder, `,"ScannedCount":`) fmt.sbprintf(&builder, "%d", len(result.items)) // Add LastEvaluatedKey if present (pagination) if last_key, has_last := result.last_evaluated_key.?; has_last { // TODO: Convert binary key back to DynamoDB JSON format // For now, we'll just include it as base64 (not DynamoDB-compatible yet) _ = last_key // When fully implemented, this should decode the key and serialize as: // ,"LastEvaluatedKey":{"pk":{"S":"value"},"sk":{"N":"123"}} } strings.write_string(&builder, "}") resp_body := strings.to_string(builder) response_set_body(response, transmute([]byte)resp_body) } // ============================================================================ // Schema Parsing Helpers // ============================================================================ Key_Schema_Error :: enum { None, Missing_Key_Schema, Invalid_Key_Schema, No_Hash_Key, Multiple_Hash_Keys, Multiple_Range_Keys, Invalid_Key_Type, } parse_key_schema :: proc(root: json.Object) -> ([]dynamodb.Key_Schema_Element, Key_Schema_Error) { key_schema_val, found := root["KeySchema"] if !found do return nil, .Missing_Key_Schema key_schema_array, ok := key_schema_val.(json.Array) if !ok do return nil, .Invalid_Key_Schema if len(key_schema_array) == 0 || len(key_schema_array) > 2 { return nil, .Invalid_Key_Schema } key_schema := make([]dynamodb.Key_Schema_Element, len(key_schema_array)) hash_count := 0 range_count := 0 for elem, i in key_schema_array { elem_obj, elem_ok := elem.(json.Object) if !elem_ok { // Cleanup for j in 0.. 1 { for ks in key_schema { delete(ks.attribute_name) } delete(key_schema) return nil, .Multiple_Hash_Keys } if range_count > 1 { for ks in key_schema { delete(ks.attribute_name) } delete(key_schema) return nil, .Multiple_Range_Keys } return key_schema, .None } key_schema_error_message :: proc(err: Key_Schema_Error) -> string { switch err { case .None: return "" case .Missing_Key_Schema: return "Missing KeySchema" case .Invalid_Key_Schema: return "Invalid KeySchema format" case .No_Hash_Key: return "KeySchema must contain exactly one HASH key" case .Multiple_Hash_Keys: return "KeySchema can only contain one HASH key" case .Multiple_Range_Keys: return "KeySchema can only contain one RANGE key" case .Invalid_Key_Type: return "Invalid KeyType (must be HASH or RANGE)" } return "Invalid KeySchema" } Attribute_Definitions_Error :: enum { None, Missing_Attribute_Definitions, Invalid_Attribute_Definitions, Invalid_Attribute_Type, Duplicate_Attribute_Name, } parse_attribute_definitions :: proc(root: json.Object) -> ([]dynamodb.Attribute_Definition, Attribute_Definitions_Error) { attr_defs_val, found := root["AttributeDefinitions"] if !found do return nil, .Missing_Attribute_Definitions attr_defs_array, ok := attr_defs_val.(json.Array) if !ok do return nil, .Invalid_Attribute_Definitions if len(attr_defs_array) == 0 { return nil, .Invalid_Attribute_Definitions } attr_defs := make([]dynamodb.Attribute_Definition, len(attr_defs_array)) seen_names := make(map[string]bool, allocator = context.temp_allocator) defer delete(seen_names) for elem, i in attr_defs_array { elem_obj, elem_ok := elem.(json.Object) if !elem_ok { for j in 0.. string { switch err { case .None: return "" case .Missing_Attribute_Definitions: return "Missing AttributeDefinitions" case .Invalid_Attribute_Definitions: return "Invalid AttributeDefinitions format" case .Invalid_Attribute_Type: return "Invalid AttributeType (must be S, N, or B)" case .Duplicate_Attribute_Name: return "Duplicate attribute name in AttributeDefinitions" } return "Invalid AttributeDefinitions" } validate_key_attributes_defined :: proc(key_schema: []dynamodb.Key_Schema_Element, attr_defs: []dynamodb.Attribute_Definition) -> bool { for ks in key_schema { found := false for ad in attr_defs { if ks.attribute_name == ad.attribute_name { found = true break } } if !found do return false } return true } // ============================================================================ // Error Response Helper // ============================================================================ make_error_response :: proc(response: ^HTTP_Response, err_type: dynamodb.DynamoDB_Error_Type, message: string) -> HTTP_Response { response_set_status(response, .Bad_Request) error_body := dynamodb.error_to_response(err_type, message) response_set_body(response, transmute([]byte)error_body) return response^ } // ============================================================================ // Configuration // ============================================================================ parse_config :: proc() -> Config { config := Config{ host = "0.0.0.0", port = 8002, data_dir = "./data", verbose = false, } // Environment variables if port_str, env_ok := os.lookup_env("JORMUN_PORT"); env_ok { if port, parse_ok := strconv.parse_int(port_str); parse_ok { config.port = port } } if host, ok := os.lookup_env("JORMUN_HOST"); ok { config.host = host } if data_dir, ok := os.lookup_env("JORMUN_DATA_DIR"); ok { config.data_dir = data_dir } if verbose, ok := os.lookup_env("JORMUN_VERBOSE"); ok { config.verbose = verbose == "1" } // TODO: Parse command line arguments return config } print_banner :: proc(config: Config) { banner := ` ╔═══════════════════════════════════════════════╗ ║ ║ ║ ╦╔═╗╦═╗╔╦╗╦ ╦╔╗╔╔╦╗╔╗ ║ ║ ║║ ║╠╦╝║║║║ ║║║║ ║║╠╩╗ ║ ║ ╚╝╚═╝╩╚═╩ ╩╚═╝╝╚╝═╩╝╚═╝ ║ ║ ║ ║ DynamoDB-Compatible Database ║ ║ Powered by RocksDB + Odin ║ ║ ║ ╚═══════════════════════════════════════════════╝ ` fmt.println(banner) fmt.printfln(" Port: %d | Data Dir: %s\n", config.port, config.data_dir) }