Compare commits

...

13 Commits

Author SHA1 Message Date
5ee3df86f1 more house keeping 2026-02-21 20:50:14 -05:00
47eefd0fe5 house keeping 2026-02-21 19:17:36 -05:00
443562dfb6 better readme 2026-02-21 19:14:00 -05:00
9cf54e1b9f make this configurable 2026-02-17 15:02:57 -05:00
a7f2a5ab59 enforce body size 2026-02-17 14:54:29 -05:00
6bc1a03347 spawn new thread per connection, add yaml for docs and also add GSI to readme 2026-02-17 14:50:50 -05:00
178b38fe18 fix all the gsi stuff 2026-02-17 12:56:09 -05:00
b92dc61b08 more GSI fixes 2026-02-17 12:36:38 -05:00
64da021148 fix GSI output 2026-02-17 10:44:56 -05:00
12ba2e57d7 try to get the storage json correct 2026-02-17 10:07:50 -05:00
a5a5d41e50 shes working 2026-02-17 09:57:35 -05:00
d8a80bd728 fix expression aritmetic 2026-02-17 08:49:30 -05:00
225a1533cc fix cloned string cleanup 2026-02-17 02:03:40 -05:00
23 changed files with 1713 additions and 1437 deletions

View File

@@ -1,64 +0,0 @@
# Multi-stage build for Odin + Python test environment with RocksDB
FROM debian:bookworm-slim AS odin-builder
# Install dependencies for building Odin
RUN apt-get update && apt-get install -y \
git \
curl \
build-essential \
clang \
llvm \
&& rm -rf /var/lib/apt/lists/*
# Install Odin compiler
WORKDIR /opt
RUN git clone https://github.com/odin-lang/Odin.git odin \
&& cd odin \
&& ./build_odin.sh release
# Final stage with both Odin and Python
FROM python:3.12-slim
# Install runtime and build dependencies including RocksDB
RUN apt-get update && apt-get install -y \
clang \
llvm \
make \
git \
build-essential \
cmake \
pkg-config \
# RocksDB and compression libraries
librocksdb-dev \
librocksdb8.7 \
# Compression libraries that RocksDB depends on
libsnappy-dev \
libgflags-dev \
libz-dev \
libbz2-dev \
liblz4-dev \
libzstd-dev \
# Additional common dependencies
libssl-dev \
libcurl4-openssl-dev \
&& rm -rf /var/lib/apt/lists/*
# Copy Odin compiler from builder stage
COPY --from=odin-builder /opt/odin /opt/odin
# Add Odin to PATH
ENV PATH="/opt/odin:${PATH}"
ENV ODIN_ROOT="/opt/odin"
# Set up library paths for RocksDB
ENV LD_LIBRARY_PATH="/usr/lib/x86_64-linux-gnu:${LD_LIBRARY_PATH}"
ENV PKG_CONFIG_PATH="/usr/lib/x86_64-linux-gnu/pkgconfig:${PKG_CONFIG_PATH}"
# Install Python dependencies
RUN pip install --no-cache-dir boto3 pytest requests
# Set working directory
WORKDIR /workspace
# Default command
CMD ["/bin/bash"]

View File

@@ -6,66 +6,6 @@ ODIN := odin
BUILD_DIR := build BUILD_DIR := build
SRC_DIR := . SRC_DIR := .
# Docker configuration for test SDK
TEST_SDK_IMAGE := your-dockerhub-username/odin-python-test-sdk
TEST_SDK_TAG := latest
JORMUN_PORT ?= 8002
# Build the test SDK Docker image
.PHONY: build-test-sdk
build-test-sdk:
@echo "Building test SDK Docker image..."
docker build -f Dockerfile_test_sdk -t $(TEST_SDK_IMAGE):$(TEST_SDK_TAG) .
@echo "Test SDK image built successfully"
# Push the test SDK image to registry
.PHONY: push-test-sdk
push-test-sdk: build-test-sdk
@echo "Pushing test SDK image to registry..."
docker push $(TEST_SDK_IMAGE):$(TEST_SDK_TAG)
@echo "Test SDK image pushed successfully"
# Pull the test SDK image from registry
.PHONY: pull-test-sdk
pull-test-sdk:
@echo "Pulling test SDK image from registry..."
docker pull $(TEST_SDK_IMAGE):$(TEST_SDK_TAG)
# Run SDK tests in the consolidated container
.PHONY: test-sdk
test-sdk:
@echo "Running SDK tests..."
docker run --rm \
--network host \
-v $(PWD):/workspace \
-w /workspace \
-e JORMUN_ENDPOINT=http://localhost:$(JORMUN_PORT) \
-e AWS_ACCESS_KEY_ID=local \
-e AWS_SECRET_ACCESS_KEY=local \
-e AWS_DEFAULT_REGION=us-east-1 \
$(TEST_SDK_IMAGE):$(TEST_SDK_TAG) \
sh -c "make build && python tests/sdk/test_sdk.py"
# Run SDK tests with live rebuild (for development)
.PHONY: test-sdk-dev
test-sdk-dev:
@echo "Running SDK tests with live rebuild..."
docker run --rm -it \
--network host \
-v $(PWD):/workspace \
-w /workspace \
-e JORMUN_ENDPOINT=http://localhost:$(JORMUN_PORT) \
-e AWS_ACCESS_KEY_ID=local \
-e AWS_SECRET_ACCESS_KEY=local \
-e AWS_DEFAULT_REGION=us-east-1 \
$(TEST_SDK_IMAGE):$(TEST_SDK_TAG) \
/bin/bash
# One-time setup: build and push test SDK image
.PHONY: setup-test-sdk
setup-test-sdk: build-test-sdk push-test-sdk
@echo "Test SDK setup complete"
# C++ shim (WAL replication helpers via RocksDB C++ API) # C++ shim (WAL replication helpers via RocksDB C++ API)
SHIM_DIR := rocksdb_shim SHIM_DIR := rocksdb_shim
SHIM_LIB := $(BUILD_DIR)/libjormun_rocksdb_shim.a SHIM_LIB := $(BUILD_DIR)/libjormun_rocksdb_shim.a
@@ -207,36 +147,6 @@ check-deps:
@pkg-config --exists rocksdb || (echo "$(RED)✗ RocksDB not found$(NC)" && exit 1) @pkg-config --exists rocksdb || (echo "$(RED)✗ RocksDB not found$(NC)" && exit 1)
@echo "$(GREEN)✓ All dependencies found$(NC)" @echo "$(GREEN)✓ All dependencies found$(NC)"
# AWS CLI test commands
aws-test: run &
@sleep 2
@echo "$(BLUE)Testing with AWS CLI...$(NC)"
@echo "\n$(YELLOW)Creating table...$(NC)"
@aws dynamodb create-table \
--endpoint-url http://localhost:$(PORT) \
--table-name TestTable \
--key-schema AttributeName=pk,KeyType=HASH \
--attribute-definitions AttributeName=pk,AttributeType=S \
--billing-mode PAY_PER_REQUEST || true
@echo "\n$(YELLOW)Listing tables...$(NC)"
@aws dynamodb list-tables --endpoint-url http://localhost:$(PORT)
@echo "\n$(YELLOW)Putting item...$(NC)"
@aws dynamodb put-item \
--endpoint-url http://localhost:$(PORT) \
--table-name TestTable \
--item '{"pk":{"S":"test1"},"data":{"S":"hello world"}}'
@echo "\n$(YELLOW)Getting item...$(NC)"
@aws dynamodb get-item \
--endpoint-url http://localhost:$(PORT) \
--table-name TestTable \
--key '{"pk":{"S":"test1"}}'
@echo "\n$(YELLOW)Scanning table...$(NC)"
@aws dynamodb scan \
--endpoint-url http://localhost:$(PORT) \
--table-name TestTable
@echo "\n$(GREEN)✓ AWS CLI test complete$(NC)"
# Development workflow # Development workflow
dev: clean build run dev: clean build run
@@ -261,7 +171,6 @@ help:
@echo "" @echo ""
@echo "$(GREEN)Test Commands:$(NC)" @echo "$(GREEN)Test Commands:$(NC)"
@echo " make test - Run unit tests" @echo " make test - Run unit tests"
@echo " make aws-test - Test with AWS CLI commands"
@echo "" @echo ""
@echo "$(GREEN)Utility Commands:$(NC)" @echo "$(GREEN)Utility Commands:$(NC)"
@echo " make fmt - Format source code" @echo " make fmt - Format source code"

View File

@@ -404,7 +404,6 @@ brew upgrade odin # macOS
## Next Steps ## Next Steps
- Read [ARCHITECTURE.md](ARCHITECTURE.md) for internals
- Check [TODO.md](TODO.md) for implementation status - Check [TODO.md](TODO.md) for implementation status
- Browse source code in `dynamodb/`, `rocksdb/`, etc. - Browse source code in `dynamodb/`, `rocksdb/`, etc.

View File

@@ -189,16 +189,38 @@ make run PORT=9000 DATA_DIR=/tmp/db VERBOSE=1
## Performance ## Performance
From benchmarks on the original Zig version (Odin expected to be similar or better): Benchmarked on single node localhost, 1000 iterations per test.
``` ### Basic Operations
Sequential Writes | 10000 ops | 245.32 ms | 40765 ops/sec
Random Reads | 10000 ops | 312.45 ms | 32006 ops/sec | Operation | Throughput | Avg Latency | P95 Latency | P99 Latency |
Batch Writes | 10000 ops | 89.23 ms | 112071 ops/sec |-----------|------------|-------------|-------------|-------------|
PutItem | 5000 ops | 892.34 ms | 5604 ops/sec | **PutItem** | 1,021 ops/sec | 0.98ms | 1.02ms | 1.64ms |
GetItem | 5000 ops | 678.91 ms | 7365 ops/sec | **GetItem** | 1,207 ops/sec | 0.83ms | 0.90ms | 1.14ms |
Scan (full table) | 5000 ops | 234.56 ms | 21320 ops/sec | **Query** | 1,002 ops/sec | 1.00ms | 1.11ms | 1.85ms |
``` | **Scan** (100 items) | 18,804 ops/sec | 0.05ms | - | - |
| **DeleteItem** | 1,254 ops/sec | 0.80ms | - | - |
### Batch Operations
| Operation | Throughput | Batch Size |
|-----------|------------|------------|
| **BatchWriteItem** | 9,297 ops/sec | 25 items |
| **BatchGetItem** | 9,113 ops/sec | 25 items |
### Concurrent Operations
| Workers | Throughput | Avg Latency | P95 Latency | P99 Latency |
|---------|------------|-------------|-------------|-------------|
| **10 concurrent** | 1,286 ops/sec | 7.70ms | 15.16ms | 19.72ms |
### Large Payloads
| Payload Size | Throughput | Avg Latency |
|--------------|------------|-------------|
| **10KB** | 522 ops/sec | 1.91ms |
| **50KB** | 166 ops/sec | 6.01ms |
| **100KB** | 96 ops/sec | 10.33ms |
## API Compatibility ## API Compatibility
@@ -218,11 +240,11 @@ Scan (full table) | 5000 ops | 234.56 ms | 21320 ops/sec
- ✅ ProjectionExpression - ✅ ProjectionExpression
- ✅ BatchWriteItem - ✅ BatchWriteItem
- ✅ BatchGetItem - ✅ BatchGetItem
- ✅ Global Secondary Indexes
### Coming Soon ### Coming Soon
- ⏳ UpdateItem (works but needs UPDATED_NEW/UPDATED_OLD response filtering to work for full Dynamo Parity) - ⏳ UpdateItem (works but needs UPDATED_NEW/UPDATED_OLD response filtering to work for full Dynamo Parity)
- ⏳ Global Secondary Indexes
- ⏳ Local Secondary Indexes - ⏳ Local Secondary Indexes
## Configuration ## Configuration

View File

@@ -8,7 +8,7 @@ Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteI
### 1) HTTP + routing hardening ### 1) HTTP + routing hardening
- [ ] Audit request parsing boundaries: - [ ] Audit request parsing boundaries:
- Max body size enforcement (config exists, need to verify enforcement path) - Max body size enforcement **DONE**
- Missing/invalid headers → correct DynamoDB error types - Missing/invalid headers → correct DynamoDB error types
- Content-Type handling (be permissive but consistent) - Content-Type handling (be permissive but consistent)
- [x] Ensure **all request-scoped allocations** come from the request arena (no accidental long-lived allocs) - [x] Ensure **all request-scoped allocations** come from the request arena (no accidental long-lived allocs)

View File

@@ -1,14 +0,0 @@
services:
sdk-test:
image: python:3.12-slim
network_mode: host
working_dir: /tests
volumes:
- ./tests/sdk:/tests
environment:
- JORMUN_ENDPOINT=http://localhost:${JORMUN_PORT:-8002}
- AWS_ACCESS_KEY_ID=local
- AWS_SECRET_ACCESS_KEY=local
- AWS_DEFAULT_REGION=us-east-1
command: >
sh -c "pip install --quiet boto3 && python test_sdk.py"

View File

@@ -98,8 +98,15 @@ batch_write_item :: proc(
delete(failed_requests) delete(failed_requests)
return result, var_err return result, var_err
case .RocksDB_Error, .Item_Not_Found, .Table_Not_Found: case .Table_Not_Found:
// Transient/throttling errors — add to unprocessed // Non-existent table is a hard request failure, not a retryable condition.
// DynamoDB returns ResourceNotFoundException for the whole request.
batch_write_result_destroy(&result)
delete(failed_requests)
return result, .Table_Not_Found
case .RocksDB_Error, .Item_Not_Found:
// Genuinely transient/infrastructure errors — add to UnprocessedItems.
failed_item := item_deep_copy(req.item) failed_item := item_deep_copy(req.item)
append(&failed_requests, Write_Request{ append(&failed_requests, Write_Request{
type = req.type, type = req.type,

View File

@@ -16,6 +16,7 @@
package dynamodb package dynamodb
import "core:encoding/json" import "core:encoding/json"
import "core:strings"
// ============================================================================ // ============================================================================
// Condition Evaluation Result // Condition Evaluation Result
@@ -54,7 +55,7 @@ parse_condition_expression_string :: proc(request_body: []byte) -> (expr: string
return return
} }
expr = string(ce_str) expr = strings.clone(string(ce_str))
ok = true ok = true
return return
} }
@@ -88,6 +89,7 @@ evaluate_condition_expression :: proc(
if !has_condition { if !has_condition {
return .Passed // No condition → always pass return .Passed // No condition → always pass
} }
defer delete(condition_str)
// Parse the condition into a filter tree (same grammar as FilterExpression) // Parse the condition into a filter tree (same grammar as FilterExpression)
filter_node, parse_ok := parse_filter_expression(condition_str, attr_names, attr_values) filter_node, parse_ok := parse_filter_expression(condition_str, attr_names, attr_values)

View File

@@ -111,7 +111,7 @@ tokenizer_next :: proc(t: ^Tokenizer) -> Maybe(string) {
} }
// Single-character operators // Single-character operators
if c == '=' || c == '<' || c == '>' { if c == '=' || c == '<' || c == '>' || c == '+' || c == '-' {
t.pos += 1 t.pos += 1
return t.input[start:t.pos] return t.input[start:t.pos]
} }
@@ -140,7 +140,7 @@ is_ident_char :: proc(c: byte) -> bool {
return (c >= 'a' && c <= 'z') || return (c >= 'a' && c <= 'z') ||
(c >= 'A' && c <= 'Z') || (c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') || (c >= '0' && c <= '9') ||
c == '_' || c == ':' || c == '#' || c == '-' || c == '.' c == '_' || c == ':' || c == '#' || c == '.'
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -480,7 +480,7 @@ parse_key_condition_expression_string :: proc(request_body: []byte) -> (expr: st
return return
} }
expr = string(kce_str) expr = strings.clone(string(kce_str))
ok = true ok = true
return return
} }
@@ -488,6 +488,7 @@ parse_key_condition_expression_string :: proc(request_body: []byte) -> (expr: st
// Convenience: parse a complete Query key condition from request body // Convenience: parse a complete Query key condition from request body
parse_query_key_condition :: proc(request_body: []byte) -> (kc: Key_Condition, ok: bool) { parse_query_key_condition :: proc(request_body: []byte) -> (kc: Key_Condition, ok: bool) {
expression := parse_key_condition_expression_string(request_body) or_return expression := parse_key_condition_expression_string(request_body) or_return
defer delete(expression)
attr_names := parse_expression_attribute_names(request_body) attr_names := parse_expression_attribute_names(request_body)
defer { defer {

View File

@@ -817,7 +817,7 @@ parse_filter_expression_string :: proc(request_body: []byte) -> (expr: string, o
return return
} }
expr = string(fe_str) expr = strings.clone(string(fe_str))
ok = true ok = true
return return
} }

View File

@@ -175,15 +175,31 @@ gsi_batch_write_entries :: proc(
return .None return .None
} }
base_key, base_ok := key_from_item(item, metadata.key_schema)
if !base_ok {
return .Missing_Key_Attribute
}
defer key_destroy(&base_key)
base_vals, base_vals_ok := key_get_values(&base_key)
if !base_vals_ok {
return .Invalid_Key
}
for &gsi in gsis { for &gsi in gsis {
// Extract GSI key from item // Extract GSI key from item
gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema) gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema)
if !kv_ok { if !kv_ok do continue // item doesn't have GSI PK, skip
continue // Sparse: item doesn't have GSI PK, skip
}
// Build GSI storage key // Build GSI storage key
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) gsi_storage_key := build_gsi_key(
table_name,
gsi.index_name,
gsi_kv.pk,
gsi_kv.sk,
base_vals.pk,
base_vals.sk,
)
defer delete(gsi_storage_key) defer delete(gsi_storage_key)
// Build projected item // Build projected item
@@ -218,103 +234,35 @@ gsi_batch_delete_entries :: proc(
return .None return .None
} }
for &gsi in gsis { base_key, base_ok := key_from_item(old_item, metadata.key_schema)
gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema) if !base_ok {
if !kv_ok { return .Missing_Key_Attribute
continue // Item didn't have a GSI entry
} }
defer key_destroy(&base_key)
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) base_vals, base_vals_ok := key_get_values(&base_key)
defer delete(gsi_storage_key) if !base_vals_ok {
return .Invalid_Key
// Add to batch (not written yet)
rocksdb.batch_delete(batch, gsi_storage_key)
}
return .None
}
// ============================================================================
// DEPRECATED - Non-atomic GSI maintenance
//
// These procedures are kept for backwards compatibility but should NOT be used.
// They perform individual database writes which is NOT atomic.
// Use gsi_batch_write_entries and gsi_batch_delete_entries instead.
// ============================================================================
// DEPRECATED: Use gsi_batch_write_entries instead for atomic operations.
// Write GSI entries for an item across all GSIs defined on the table.
// WARNING: This performs individual writes which is NOT atomic!
gsi_write_entries :: proc(
engine: ^Storage_Engine,
table_name: string,
item: Item,
metadata: ^Table_Metadata,
) -> Storage_Error {
gsis, has_gsis := metadata.global_secondary_indexes.?
if !has_gsis || len(gsis) == 0 {
return .None
} }
for &gsi in gsis { for &gsi in gsis {
// Extract GSI key from item // Extract GSI key from item
gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema) gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
if !kv_ok { if !kv_ok do continue // old item doesn't have GSI PK, skip
continue // Sparse: item doesn't have GSI PK, skip
}
// Build GSI storage key // Build GSI storage key
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) gsi_storage_key := build_gsi_key(
table_name,
gsi.index_name,
gsi_kv.pk,
gsi_kv.sk,
base_vals.pk,
base_vals.sk,
)
defer delete(gsi_storage_key) defer delete(gsi_storage_key)
// Build projected item // Add to batch (not written yet)
projected := gsi_project_item(item, &gsi, metadata.key_schema) rocksdb.batch_delete(batch, gsi_storage_key)
defer item_destroy(&projected)
// Encode projected item
encoded, encode_ok := encode(projected)
if !encode_ok {
return .Serialization_Error
}
defer delete(encoded)
// Write to RocksDB
put_err := rocksdb.db_put(&engine.db, gsi_storage_key, encoded)
if put_err != .None {
return .RocksDB_Error
}
}
return .None
}
// DEPRECATED: Use gsi_batch_delete_entries instead for atomic operations.
// Delete GSI entries for an item across all GSIs.
// WARNING: This performs individual writes which is NOT atomic!
gsi_delete_entries :: proc(
engine: ^Storage_Engine,
table_name: string,
old_item: Item,
metadata: ^Table_Metadata,
) -> Storage_Error {
gsis, has_gsis := metadata.global_secondary_indexes.?
if !has_gsis || len(gsis) == 0 {
return .None
}
for &gsi in gsis {
gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
if !kv_ok {
continue // Item didn't have a GSI entry
}
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
defer delete(gsi_storage_key)
del_err := rocksdb.db_delete(&engine.db, gsi_storage_key)
if del_err != .None {
return .RocksDB_Error
}
} }
return .None return .None

View File

@@ -322,17 +322,25 @@ serialize_item_to_builder :: proc(b: ^strings.Builder, item: Item) {
serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
switch v in attr { switch v in attr {
case String: case String:
fmt.sbprintf(b, `{"S":"%s"}`, string(v)) strings.write_string(b, `{"S":"`)
strings.write_string(b, string(v))
strings.write_string(b, `"}`)
case DDB_Number: case DDB_Number:
num_str := format_ddb_number(v) num_str := format_ddb_number(v)
fmt.sbprintf(b, `{"N":"%s"}`, num_str) strings.write_string(b, `{"N":"`)
strings.write_string(b, num_str)
strings.write_string(b, `"}`)
case Binary: case Binary:
fmt.sbprintf(b, `{"B":"%s"}`, string(v)) strings.write_string(b, `{"B":"`)
strings.write_string(b, string(v))
strings.write_string(b, `"}`)
case Bool: case Bool:
fmt.sbprintf(b, `{"BOOL":%v}`, bool(v)) strings.write_string(b, `{"BOOL":`)
if bool(v) { strings.write_string(b, "true") } else { strings.write_string(b, "false") }
strings.write_string(b, "}")
case Null: case Null:
strings.write_string(b, `{"NULL":true}`) strings.write_string(b, `{"NULL":true}`)
@@ -431,7 +439,7 @@ parse_table_name :: proc(request_body: []byte) -> (string, bool) {
return "", false return "", false
} }
return string(table_name_str), true return strings.clone(string(table_name_str)), true
} }
// Parse Item field from request body // Parse Item field from request body
@@ -521,45 +529,49 @@ parse_limit :: proc(request_body: []byte) -> int {
// Returns nil (not an error) when the field is absent. // Returns nil (not an error) when the field is absent.
// ============================================================================ // ============================================================================
// Returns (key, ok, body_parse_err).
// ok=true, body_parse_err=false → key present and valid, or key absent (no pagination)
// ok=false, body_parse_err=true → request body is not valid JSON or not an object
// ok=false, body_parse_err=false → ExclusiveStartKey present but malformed/invalid
parse_exclusive_start_key :: proc( parse_exclusive_start_key :: proc(
request_body: []byte, request_body: []byte,
table_name: string, table_name: string,
key_schema: []Key_Schema_Element, key_schema: []Key_Schema_Element,
) -> (result: Maybe([]byte), ok: bool) { ) -> (result: Maybe([]byte), ok: bool, body_err: bool) {
data, parse_err := json.parse(request_body, allocator = context.temp_allocator) data, parse_err := json.parse(request_body, allocator = context.temp_allocator)
if parse_err != nil { if parse_err != nil {
return nil, true // no ESK is fine return nil, false, true // body is not valid JSON — real error
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, root_ok := data.(json.Object) root, root_ok := data.(json.Object)
if !root_ok { if !root_ok {
return nil, true return nil, false, true // root must be an object — real error
} }
esk_val, found := root["ExclusiveStartKey"] esk_val, found := root["ExclusiveStartKey"]
if !found { if !found {
return nil, true // absent → no pagination, that's ok return nil, true, false // absent → no pagination, that's ok
} }
// Parse ExclusiveStartKey as a DynamoDB Item // Parse ExclusiveStartKey as a DynamoDB Item
key_item, item_ok := parse_item_from_value(esk_val) key_item, item_ok := parse_item_from_value(esk_val)
if !item_ok { if !item_ok {
return nil, false // present but malformed → real error return nil, false, false // present but malformed → validation error
} }
defer item_destroy(&key_item) defer item_destroy(&key_item)
// Validate and extract key struct using schema // Validate and extract key struct using schema
key_struct, key_ok := key_from_item(key_item, key_schema) key_struct, key_ok := key_from_item(key_item, key_schema)
if !key_ok { if !key_ok {
return nil, false // missing required key attributes return nil, false, false // missing required key attributes
} }
defer key_destroy(&key_struct) defer key_destroy(&key_struct)
// Get raw byte values // Get raw byte values
key_values, kv_ok := key_get_values(&key_struct) key_values, kv_ok := key_get_values(&key_struct)
if !kv_ok { if !kv_ok {
return nil, false return nil, false, false
} }
// Build binary storage key // Build binary storage key
@@ -569,6 +581,53 @@ parse_exclusive_start_key :: proc(
return return
} }
// parse_exclusive_start_key_gsi ... Just a helper for GSI keys
// Returns (key, ok, body_parse_err) — same contract as parse_exclusive_start_key.
parse_exclusive_start_key_gsi :: proc(
request_body: []byte,
table_name: string,
metadata: ^Table_Metadata,
gsi: ^Global_Secondary_Index,
) -> (Maybe([]byte), bool, bool) {
root, parse_err := json.parse(request_body)
if parse_err != nil do return nil, false, true // body not valid JSON
defer json.destroy_value(root)
obj, obj_ok := root.(json.Object)
if !obj_ok do return nil, false, true // root must be an object
esk_val, has := obj["ExclusiveStartKey"]
if !has do return nil, true, false // absent → no pagination
key_item, key_ok := parse_item_from_value(esk_val)
if !key_ok do return nil, false, false
defer item_destroy(&key_item)
idx_key, idx_ok := key_from_item(key_item, gsi.key_schema)
if !idx_ok do return nil, false, false
defer key_destroy(&idx_key)
idx_vals, idx_vals_ok := key_get_values(&idx_key)
if !idx_vals_ok do return nil, false, false
base_key, base_ok := key_from_item(key_item, metadata.key_schema)
if !base_ok do return nil, false, false
defer key_destroy(&base_key)
base_vals, base_vals_ok := key_get_values(&base_key)
if !base_vals_ok do return nil, false, false
k := build_gsi_key(
table_name,
gsi.index_name,
idx_vals.pk,
idx_vals.sk,
base_vals.pk,
base_vals.sk,
)
return k, true, false
}
// ============================================================================ // ============================================================================
// LastEvaluatedKey Generation (Pagination Output) // LastEvaluatedKey Generation (Pagination Output)
// //
@@ -640,3 +699,122 @@ serialize_last_evaluated_key :: proc(
return serialize_item(item), true return serialize_item(item), true
} }
Decoded_GSI_Key_Full :: struct {
gsi_pk: []byte,
gsi_sk: Maybe([]byte),
base_pk: []byte,
base_sk: Maybe([]byte),
}
// Decode binary GSI key:
//
// [gsi][table_name][index_name][gsi_pk][gsi_sk?][base_pk][base_sk?]
//
// Presence of gsi_sk/base_sk depends on whether the index/table has a RANGE key.
decode_gsi_key_full_borrowed :: proc(
binary_key: []byte,
gsi_has_sort_key: bool,
table_has_sort_key: bool,
) -> (result: Decoded_GSI_Key_Full, ok: bool) {
decoder := Key_Decoder{data = binary_key, pos = 0}
et := decoder_read_entity_type(&decoder) or_return
if et != .GSI {
return {}, false
}
// Skip table name + index name
_ = decoder_read_segment_borrowed(&decoder) or_return
_ = decoder_read_segment_borrowed(&decoder) or_return
// Read GSI PK
result.gsi_pk = decoder_read_segment_borrowed(&decoder) or_return
// Read GSI SK if index has one
if gsi_has_sort_key {
sk := decoder_read_segment_borrowed(&decoder) or_return
result.gsi_sk = sk
}
// Read base PK
result.base_pk = decoder_read_segment_borrowed(&decoder) or_return
// Read base SK if table has one
if table_has_sort_key {
sk := decoder_read_segment_borrowed(&decoder) or_return
result.base_sk = sk
}
return result, true
}
// Serialize a binary *GSI* key into a DynamoDB LastEvaluatedKey JSON object.
// The output must include the *index* key attrs + the *base table* primary key attrs,
// so boto can round-trip ExclusiveStartKey correctly.
serialize_last_evaluated_key_gsi :: proc(
binary_key: []byte,
metadata: ^Table_Metadata,
gsi: ^Global_Secondary_Index,
) -> (result: string, ok: bool) {
// Determine whether index/table have range keys
_, gsi_has_sk := gsi_get_sort_key_name(gsi).?
_, tbl_has_sk := table_metadata_get_sort_key_name(metadata).?
decoded, dec_ok := decode_gsi_key_full_borrowed(binary_key, gsi_has_sk, tbl_has_sk)
if !dec_ok {
return "", false
}
// Resolve key attribute names + types
idx_pk_name := gsi_get_partition_key_name(gsi).? or_return
idx_pk_type := table_metadata_get_attribute_type(metadata, idx_pk_name).? or_return
idx_sk_name: Maybe(string) = gsi_get_sort_key_name(gsi)
idx_sk_type: Maybe(Scalar_Attribute_Type) = nil
if n, has := idx_sk_name.?; has {
idx_sk_type = table_metadata_get_attribute_type(metadata, n)
}
base_pk_name := table_metadata_get_partition_key_name(metadata).? or_return
base_pk_type := table_metadata_get_attribute_type(metadata, base_pk_name).? or_return
base_sk_name: Maybe(string) = table_metadata_get_sort_key_name(metadata)
base_sk_type: Maybe(Scalar_Attribute_Type) = nil
if n, has := base_sk_name.?; has {
base_sk_type = table_metadata_get_attribute_type(metadata, n)
}
// Build LEK item
lek := make(Item)
defer item_destroy(&lek)
add_attr_once :: proc(item: ^Item, name: string, raw: []byte, t: Scalar_Attribute_Type) {
if _, exists := item^[name]; exists {
return
}
item^[strings.clone(name)] = build_attribute_value_with_type(raw, t)
}
// Index keys
add_attr_once(&lek, idx_pk_name, decoded.gsi_pk, idx_pk_type)
if sk_raw, has := decoded.gsi_sk.?; has {
skn := idx_sk_name.? or_return
skt := idx_sk_type.? or_return
add_attr_once(&lek, skn, sk_raw, skt)
}
// Base table keys
add_attr_once(&lek, base_pk_name, decoded.base_pk, base_pk_type)
if sk_raw, has := decoded.base_sk.?; has {
skn := base_sk_name.? or_return
skt := base_sk_type.? or_return
add_attr_once(&lek, skn, sk_raw, skt)
}
return serialize_item(lek), true
}

View File

@@ -130,32 +130,43 @@ build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte {
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
// Build GSI key: [gsi][table_name][index_name][gsi_pk][gsi_sk?] // Build GSI key: [gsi][table_name][index_name][gsi_pk][gsi_sk?][base_pk][base_sk?]
build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gsi_sk: Maybe([]byte)) -> []byte { build_gsi_key :: proc(
table_name: string,
index_name: string,
gsi_pk: []byte,
gsi_sk: Maybe([]byte),
base_pk: []byte,
base_sk: Maybe([]byte),
) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) bytes.buffer_init_allocator(&buf, 0, 512, context.allocator)
// Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI))
// Write table name
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
// Write index name
encode_varint(&buf, len(index_name)) encode_varint(&buf, len(index_name))
bytes.buffer_write_string(&buf, index_name) bytes.buffer_write_string(&buf, index_name)
// Write GSI partition key
encode_varint(&buf, len(gsi_pk)) encode_varint(&buf, len(gsi_pk))
bytes.buffer_write(&buf, gsi_pk) bytes.buffer_write(&buf, gsi_pk)
// Write GSI sort key if present
if sk, ok := gsi_sk.?; ok { if sk, ok := gsi_sk.?; ok {
encode_varint(&buf, len(sk)) encode_varint(&buf, len(sk))
bytes.buffer_write(&buf, sk) bytes.buffer_write(&buf, sk)
} }
// tie-breaker: base table primary key
encode_varint(&buf, len(base_pk))
bytes.buffer_write(&buf, base_pk)
if sk, ok := base_sk.?; ok {
encode_varint(&buf, len(sk))
bytes.buffer_write(&buf, sk)
}
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }

View File

@@ -273,7 +273,7 @@ serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) {
fmt.sbprintf(&gsi_builder, `{{"AttributeName":"%s","KeyType":"%s"}}`, fmt.sbprintf(&gsi_builder, `{{"AttributeName":"%s","KeyType":"%s"}}`,
ks.attribute_name, key_type_to_string(ks.key_type)) ks.attribute_name, key_type_to_string(ks.key_type))
} }
strings.write_string(&gsi_builder, `],"Projection":{{"ProjectionType":"`) strings.write_string(&gsi_builder, `],"Projection":{"ProjectionType":"`)
switch gsi.projection.projection_type { switch gsi.projection.projection_type {
case .ALL: strings.write_string(&gsi_builder, "ALL") case .ALL: strings.write_string(&gsi_builder, "ALL")
case .KEYS_ONLY: strings.write_string(&gsi_builder, "KEYS_ONLY") case .KEYS_ONLY: strings.write_string(&gsi_builder, "KEYS_ONLY")
@@ -532,6 +532,10 @@ get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Tabl
return {}, .Serialization_Error return {}, .Serialization_Error
} }
// table_name is not stored in the serialized blob (it IS the RocksDB key),
// so we populate it here from the argument we already have.
metadata.table_name = strings.clone(table_name, engine.allocator)
return metadata, .None return metadata, .None
} }
@@ -789,12 +793,20 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
// --- Check if item already exists (need old item for GSI cleanup) --- // --- Check if item already exists (need old item for GSI cleanup) ---
old_item: Maybe(Item) = nil old_item: Maybe(Item) = nil
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key) existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
if existing_err == .None && existing_value != nil { if existing_err == .NotFound {
// Item does not exist — nothing to clean up, proceed normally.
} else if existing_err != .None {
// Unexpected RocksDB I/O error — fail closed to avoid orphaned GSI entries.
return .RocksDB_Error
} else if existing_value != nil {
defer delete(existing_value) defer delete(existing_value)
decoded_old, decode_ok := decode(existing_value) decoded_old, decode_ok := decode(existing_value)
if decode_ok { if !decode_ok {
old_item = decoded_old // Value exists but is unreadable — fail closed rather than leaving
// stale GSI entries behind after the overwrite.
return .Serialization_Error
} }
old_item = decoded_old
} }
// Cleanup old_item at the end // Cleanup old_item at the end
defer { defer {
@@ -941,12 +953,21 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
// --- Read existing item to know which GSI entries to remove --- // --- Read existing item to know which GSI entries to remove ---
old_item: Maybe(Item) = nil old_item: Maybe(Item) = nil
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key) existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
if existing_err == .None && existing_value != nil { if existing_err == .NotFound {
// Item does not exist — nothing to delete (DynamoDB idempotent delete).
return .None
} else if existing_err != .None {
// Unexpected RocksDB I/O error — fail closed.
return .RocksDB_Error
} else if existing_value != nil {
defer delete(existing_value) defer delete(existing_value)
decoded_old, decode_ok := decode(existing_value) decoded_old, decode_ok := decode(existing_value)
if decode_ok { if !decode_ok {
old_item = decoded_old // Value exists but is corrupt — fail closed rather than deleting the
// base item while leaving its GSI entries dangling.
return .Serialization_Error
} }
old_item = decoded_old
} }
// Cleanup old_item at the end // Cleanup old_item at the end
defer { defer {
@@ -956,7 +977,7 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
} }
} }
// If item doesn't exist, nothing to delete (not an error in DynamoDB) // If item doesn't exist (existing_value was nil with no error), nothing to delete.
if _, has_old := old_item.?; !has_old { if _, has_old := old_item.?; !has_old {
return .None return .None
} }

View File

@@ -54,6 +54,10 @@ Cancellation_Reason :: struct {
} }
transact_write_action_destroy :: proc(action: ^Transact_Write_Action) { transact_write_action_destroy :: proc(action: ^Transact_Write_Action) {
delete(action.table_name)
if ce, has := action.condition_expr.?; has {
delete(ce)
}
if item, has := action.item.?; has { if item, has := action.item.?; has {
item_copy := item item_copy := item
item_destroy(&item_copy) item_destroy(&item_copy)
@@ -329,7 +333,31 @@ transact_write_items :: proc(
} }
} }
existing, _ := get_item_internal(engine, action.table_name, key_item, metadata) existing, read_err := get_item_internal(engine, action.table_name, key_item, metadata)
#partial switch read_err {
case .None:
// Item found or not found — both fine.
case .RocksDB_Error, .Serialization_Error, .Internal_Error:
// Cannot safely determine old index keys — cancel the entire transaction.
reasons[idx] = Cancellation_Reason{
code = "InternalError",
message = "Failed to read existing item for index maintenance",
}
result.cancellation_reasons = reasons
return result, .Internal_Error
case .Missing_Key_Attribute, .Invalid_Key:
// The key we built from the action's own item/key should always be valid
// by this point (validated earlier), but treat defensively.
reasons[idx] = Cancellation_Reason{
code = "ValidationError",
message = "Invalid key when reading existing item",
}
result.cancellation_reasons = reasons
return result, .Internal_Error
case .Table_Not_Found, .Item_Not_Found, .Validation_Error:
// These should not be returned by get_item_internal, but handle
// defensively — treat as "item does not exist" and continue.
}
old_items[idx] = existing old_items[idx] = existing
} }
@@ -528,9 +556,9 @@ update_item_batch :: proc(
} }
defer item_destroy(&existing_item) defer item_destroy(&existing_item)
// Apply update plan // Apply update plan.
if !execute_update_plan(&existing_item, plan) { if exec_err := execute_update_plan(&existing_item, plan); exec_err != .None {
return .Invalid_Key return .Validation_Error
} }
// Encode updated item // Encode updated item
@@ -618,8 +646,12 @@ Transact_Get_Result :: struct {
} }
transact_get_action_destroy :: proc(action: ^Transact_Get_Action) { transact_get_action_destroy :: proc(action: ^Transact_Get_Action) {
delete(action.table_name)
item_destroy(&action.key) item_destroy(&action.key)
if proj, has := action.projection.?; has { if proj, has := action.projection.?; has {
for path in proj {
delete(path)
}
delete(proj) delete(proj)
} }
} }

View File

@@ -594,7 +594,23 @@ is_clause_keyword :: proc(tok: string) -> bool {
// Execute Update Plan — apply mutations to an Item (in-place) // Execute Update Plan — apply mutations to an Item (in-place)
// ============================================================================ // ============================================================================
execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool { // Reasons an update plan can fail at execution time.
// All of these map to ValidationException at the HTTP layer.
Update_Exec_Error :: enum {
None,
// SET x = source +/- val: source attribute does not exist in the item
Operand_Not_Found,
// SET x = source +/- val: source or value attribute is not a Number
Operand_Not_Number,
// SET x = list_append(source, val): source attribute is not a List
Operand_Not_List,
// ADD path val: existing attribute is not a Number, String_Set, or Number_Set
Add_Type_Mismatch,
// ADD path val: value type does not match the existing set type
Add_Value_Type_Mismatch,
}
execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> Update_Exec_Error {
// Execute SET actions // Execute SET actions
for &action in plan.sets { for &action in plan.sets {
switch action.value_kind { switch action.value_kind {
@@ -613,11 +629,11 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
if src, found := item[action.source]; found { if src, found := item[action.source]; found {
existing = src existing = src
} else { } else {
return false // source attribute not found return .Operand_Not_Found
} }
result, add_ok := numeric_add(existing, action.value) result, add_ok := numeric_add(existing, action.value)
if !add_ok { if !add_ok {
return false return .Operand_Not_Number
} }
if old, found := item[action.path]; found { if old, found := item[action.path]; found {
old_copy := old old_copy := old
@@ -632,11 +648,11 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
if src, found := item[action.source]; found { if src, found := item[action.source]; found {
existing = src existing = src
} else { } else {
return false return .Operand_Not_Found
} }
result, sub_ok := numeric_subtract(existing, action.value) result, sub_ok := numeric_subtract(existing, action.value)
if !sub_ok { if !sub_ok {
return false return .Operand_Not_Number
} }
if old, found := item[action.path]; found { if old, found := item[action.path]; found {
old_copy := old old_copy := old
@@ -664,7 +680,7 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
if l, is_list := src.(List); is_list { if l, is_list := src.(List); is_list {
existing_list = ([]Attribute_Value)(l) existing_list = ([]Attribute_Value)(l)
} else { } else {
return false return .Operand_Not_List
} }
} else { } else {
existing_list = {} existing_list = {}
@@ -674,7 +690,7 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
if l, is_list := action.value.(List); is_list { if l, is_list := action.value.(List); is_list {
append_list = ([]Attribute_Value)(l) append_list = ([]Attribute_Value)(l)
} else { } else {
return false return .Operand_Not_List
} }
new_list := make([]Attribute_Value, len(existing_list) + len(append_list)) new_list := make([]Attribute_Value, len(existing_list) + len(append_list))
@@ -711,7 +727,7 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
case DDB_Number: case DDB_Number:
result, add_ok := numeric_add(existing, action.value) result, add_ok := numeric_add(existing, action.value)
if !add_ok { if !add_ok {
return false return .Operand_Not_Number
} }
old_copy := existing old_copy := existing
attr_value_destroy(&old_copy) attr_value_destroy(&old_copy)
@@ -727,7 +743,7 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
delete_key(item, action.path) delete_key(item, action.path)
item[strings.clone(action.path)] = String_Set(merged) item[strings.clone(action.path)] = String_Set(merged)
} else { } else {
return false return .Add_Value_Type_Mismatch
} }
case DDB_Number_Set: case DDB_Number_Set:
@@ -738,11 +754,11 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
delete_key(item, action.path) delete_key(item, action.path)
item[strings.clone(action.path)] = DDB_Number_Set(merged) item[strings.clone(action.path)] = DDB_Number_Set(merged)
} else { } else {
return false return .Add_Value_Type_Mismatch
} }
case: case:
return false return .Add_Type_Mismatch
} }
} else { } else {
// Attribute doesn't exist — create it // Attribute doesn't exist — create it
@@ -786,7 +802,7 @@ execute_update_plan :: proc(item: ^Item, plan: ^Update_Plan) -> bool {
} }
} }
return true return .None
} }
// ============================================================================ // ============================================================================
@@ -930,7 +946,7 @@ parse_update_expression_string :: proc(request_body: []byte) -> (expr: string, o
return return
} }
expr = string(ue_str) expr = strings.clone(string(ue_str))
ok = true ok = true
return return
} }
@@ -939,24 +955,24 @@ parse_update_expression_string :: proc(request_body: []byte) -> (expr: string, o
parse_return_values :: proc(request_body: []byte) -> string { parse_return_values :: proc(request_body: []byte) -> string {
data, parse_err := json.parse(request_body, allocator = context.temp_allocator) data, parse_err := json.parse(request_body, allocator = context.temp_allocator)
if parse_err != nil { if parse_err != nil {
return "NONE" return strings.clone("NONE")
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, root_ok := data.(json.Object) root, root_ok := data.(json.Object)
if !root_ok { if !root_ok {
return "NONE" return strings.clone("NONE")
} }
rv_val, found := root["ReturnValues"] rv_val, found := root["ReturnValues"]
if !found { if !found {
return "NONE" return strings.clone("NONE")
} }
rv_str, str_ok := rv_val.(json.String) rv_str, str_ok := rv_val.(json.String)
if !str_ok { if !str_ok {
return "NONE" return strings.clone("NONE")
} }
return string(rv_str) return strings.clone(string(rv_str))
} }

View File

@@ -73,14 +73,14 @@ update_item :: proc(
return nil, nil, .RocksDB_Error return nil, nil, .RocksDB_Error
} }
// Apply update plan // Apply update plan.
if !execute_update_plan(&existing_item, plan) { if exec_err := execute_update_plan(&existing_item, plan); exec_err != .None {
item_destroy(&existing_item) item_destroy(&existing_item)
if old, has := old_item.?; has { if old, has := old_item.?; has {
old_copy := old old_copy := old
item_destroy(&old_copy) item_destroy(&old_copy)
} }
return nil, nil, .Invalid_Key return nil, nil, .Validation_Error
} }
// Validate key attributes are still present and correct type // Validate key attributes are still present and correct type

View File

@@ -259,7 +259,7 @@ parse_index_name :: proc(request_body: []byte) -> Maybe(string) {
return nil return nil
} }
return string(idx_str) return strings.clone(string(idx_str))
} }
// ============================================================================ // ============================================================================

110
http.odin
View File

@@ -6,6 +6,7 @@ import vmem "core:mem/virtual"
import "core:net" import "core:net"
import "core:strings" import "core:strings"
import "core:strconv" import "core:strconv"
import "core:thread"
// HTTP Method enumeration // HTTP Method enumeration
HTTP_Method :: enum { HTTP_Method :: enum {
@@ -100,9 +101,16 @@ response_set_body :: proc(resp: ^HTTP_Response, data: []byte) {
} }
// Request handler function type // Request handler function type
// Takes context pointer, request, and request-scoped allocator
Request_Handler :: #type proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response Request_Handler :: #type proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response
// Parse error enum
Parse_Error :: enum {
None,
Connection_Closed,
Invalid_Request,
Body_Too_Large,
}
// Server configuration // Server configuration
Server_Config :: struct { Server_Config :: struct {
max_body_size: int, // default 100MB max_body_size: int, // default 100MB
@@ -122,6 +130,13 @@ default_server_config :: proc() -> Server_Config {
} }
} }
// Connection task data - passed to worker threads
Connection_Task_Data :: struct {
server: ^Server,
conn: net.TCP_Socket,
source: net.Endpoint,
}
// Server // Server
Server :: struct { Server :: struct {
allocator: mem.Allocator, allocator: mem.Allocator,
@@ -168,9 +183,12 @@ server_start :: proc(server: ^Server) -> bool {
server.socket = socket server.socket = socket
server.running = true server.running = true
fmt.printfln("HTTP server listening on %v", server.endpoint) fmt.printfln("HTTP server listening on %v (thread-per-connection)", server.endpoint)
fmt.printfln(" Max body size: %d MB", server.config.max_body_size / (1024 * 1024))
fmt.printfln(" Max headers: %d", server.config.max_headers)
fmt.printfln(" Keep-alive: %v", server.config.enable_keep_alive)
// Accept loop // Accept loop - spawn a thread for each connection
for server.running { for server.running {
conn, source, accept_err := net.accept_tcp(socket) conn, source, accept_err := net.accept_tcp(socket)
if accept_err != nil { if accept_err != nil {
@@ -180,9 +198,23 @@ server_start :: proc(server: ^Server) -> bool {
continue continue
} }
// Handle connection in separate goroutine would go here // Allocate connection data
// For now, handle synchronously (should spawn thread) conn_data := new(Connection_Task_Data, server.allocator)
handle_connection(server, conn, source) conn_data.server = server
conn_data.conn = conn
conn_data.source = source
// Spawn a new thread for this connection
t := thread.create(connection_worker_thread)
if t != nil {
t.init_context = context
t.data = conn_data
thread.start(t)
} else {
// Failed to create thread, close connection
net.close(conn)
free(conn_data, server.allocator)
}
} }
return true return true
@@ -190,12 +222,33 @@ server_start :: proc(server: ^Server) -> bool {
server_stop :: proc(server: ^Server) { server_stop :: proc(server: ^Server) {
server.running = false server.running = false
// Close listening socket
if sock, ok := server.socket.?; ok { if sock, ok := server.socket.?; ok {
net.close(sock) net.close(sock)
server.socket = nil server.socket = nil
} }
} }
// Worker thread procedure
connection_worker_thread :: proc(t: ^thread.Thread) {
defer thread.destroy(t)
conn_data := cast(^Connection_Task_Data)t.data
defer free(conn_data, conn_data.server.allocator)
handle_connection(conn_data.server, conn_data.conn, conn_data.source)
}
// Create error response
make_error_response_simple :: proc(allocator: mem.Allocator, status: HTTP_Status, message: string) -> HTTP_Response {
response := response_init(allocator)
response_set_status(&response, status)
response_add_header(&response, "Content-Type", "text/plain")
response_set_body(&response, transmute([]byte)message)
return response
}
// Handle a single connection // Handle a single connection
handle_connection :: proc(server: ^Server, conn: net.TCP_Socket, source: net.Endpoint) { handle_connection :: proc(server: ^Server, conn: net.TCP_Socket, source: net.Endpoint) {
defer net.close(conn) defer net.close(conn)
@@ -214,13 +267,26 @@ handle_connection :: proc(server: ^Server, conn: net.TCP_Socket, source: net.End
request_alloc := vmem.arena_allocator(&arena) request_alloc := vmem.arena_allocator(&arena)
// TODO: Double check if we want *all* downstream allocations to use the request arena? // Set request arena as context allocator for downstream allocations
old := context.allocator old := context.allocator
context.allocator = request_alloc context.allocator = request_alloc
defer context.allocator = old defer context.allocator = old
request, parse_ok := parse_request(conn, request_alloc, server.config) request, parse_err := parse_request(conn, request_alloc, server.config)
if !parse_ok {
// Handle parse errors
if parse_err != .None {
#partial switch parse_err {
case .Body_Too_Large:
// Send 413 Payload Too Large
response := make_error_response_simple(request_alloc, .Payload_Too_Large,
fmt.tprintf("Request body exceeds maximum size of %d bytes", server.config.max_body_size))
send_response(conn, &response, request_alloc)
case .Invalid_Request:
// Send 400 Bad Request
response := make_error_response_simple(request_alloc, .Bad_Request, "Invalid HTTP request")
send_response(conn, &response, request_alloc)
}
break break
} }
@@ -250,13 +316,13 @@ parse_request :: proc(
conn: net.TCP_Socket, conn: net.TCP_Socket,
allocator: mem.Allocator, allocator: mem.Allocator,
config: Server_Config, config: Server_Config,
) -> (HTTP_Request, bool) { ) -> (HTTP_Request, Parse_Error) {
// Read request line and headers // Read request line and headers
buffer := make([]byte, config.read_buffer_size, allocator) buffer := make([]byte, config.read_buffer_size, allocator)
bytes_read, read_err := net.recv_tcp(conn, buffer) bytes_read, read_err := net.recv_tcp(conn, buffer)
if read_err != nil || bytes_read == 0 { if read_err != nil || bytes_read == 0 {
return {}, false return {}, .Connection_Closed
} }
request_data := buffer[:bytes_read] request_data := buffer[:bytes_read]
@@ -264,7 +330,7 @@ parse_request :: proc(
// Find end of headers (\r\n\r\n) // Find end of headers (\r\n\r\n)
header_end_idx := strings.index(string(request_data), "\r\n\r\n") header_end_idx := strings.index(string(request_data), "\r\n\r\n")
if header_end_idx < 0 { if header_end_idx < 0 {
return {}, false return {}, .Invalid_Request
} }
header_section := string(request_data[:header_end_idx]) header_section := string(request_data[:header_end_idx])
@@ -273,13 +339,13 @@ parse_request :: proc(
// Parse request line // Parse request line
lines := strings.split_lines(header_section, allocator) lines := strings.split_lines(header_section, allocator)
if len(lines) == 0 { if len(lines) == 0 {
return {}, false return {}, .Invalid_Request
} }
request_line := lines[0] request_line := lines[0]
parts := strings.split(request_line, " ", allocator) parts := strings.split(request_line, " ", allocator)
if len(parts) < 3 { if len(parts) < 3 {
return {}, false return {}, .Invalid_Request
} }
method := method_from_string(parts[0]) method := method_from_string(parts[0])
@@ -305,6 +371,11 @@ parse_request :: proc(
name = strings.clone(name, allocator), name = strings.clone(name, allocator),
value = strings.clone(value, allocator), value = strings.clone(value, allocator),
}) })
// Check max headers limit
if len(headers) > config.max_headers {
return {}, .Invalid_Request
}
} }
// Read body if Content-Length present // Read body if Content-Length present
@@ -314,7 +385,12 @@ parse_request :: proc(
if cl, ok := content_length_header.?; ok { if cl, ok := content_length_header.?; ok {
content_length := strconv.parse_int(cl) or_else 0 content_length := strconv.parse_int(cl) or_else 0
if content_length > 0 && content_length <= config.max_body_size { // Check if body size exceeds limit
if content_length > config.max_body_size {
return {}, .Body_Too_Large
}
if content_length > 0 {
// Check if we already have the body in buffer // Check if we already have the body in buffer
existing_body := request_data[body_start:] existing_body := request_data[body_start:]
@@ -336,7 +412,7 @@ parse_request :: proc(
n, err := net.recv_tcp(conn, chunk) n, err := net.recv_tcp(conn, chunk)
if err != nil || n == 0 { if err != nil || n == 0 {
return {}, false return {}, .Connection_Closed
} }
copy(body[body_written:], chunk[:n]) copy(body[body_written:], chunk[:n])
@@ -352,7 +428,7 @@ parse_request :: proc(
path = path, path = path,
headers = headers[:], headers = headers[:],
body = body, body = body,
}, true }, .None
} }
// Helper to get header from slice // Helper to get header from slice

386
main.odin
View File

@@ -13,6 +13,13 @@ Config :: struct {
port: int, port: int,
data_dir: string, data_dir: string,
verbose: bool, verbose: bool,
// HTTP server config
max_body_size: int,
max_headers: int,
read_buffer_size: int,
enable_keep_alive: bool,
max_requests_per_connection: int,
} }
main :: proc() { main :: proc() {
@@ -36,8 +43,14 @@ main :: proc() {
fmt.printfln("Storage engine initialized at %s", config.data_dir) fmt.printfln("Storage engine initialized at %s", config.data_dir)
fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port) fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port)
// Create HTTP server // Create HTTP server with config values
server_config := default_server_config() server_config := Server_Config{
max_body_size = config.max_body_size,
max_headers = config.max_headers,
read_buffer_size = config.read_buffer_size,
enable_keep_alive = config.enable_keep_alive,
max_requests_per_connection = config.max_requests_per_connection,
}
server, server_ok := server_init( server, server_ok := server_init(
context.allocator, context.allocator,
@@ -190,6 +203,15 @@ handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Req
} }
} }
if gsi_val, found2 := root["GlobalSecondaryIndexes"]; found2 {
if gsi_arr, ok2 := gsi_val.(json.Array); ok2 && len(gsi_arr) > 0 {
if _, has := gsis.?; !has {
make_error_response(response, .ValidationException, "Invalid GlobalSecondaryIndexes definition")
return
}
}
}
// Create the table // Create the table
desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs, gsis) desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs, gsis)
if create_err != .None { if create_err != .None {
@@ -219,6 +241,7 @@ handle_delete_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Req
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
err := dynamodb.delete_table(engine, table_name) err := dynamodb.delete_table(engine, table_name)
if err != .None { if err != .None {
@@ -241,6 +264,7 @@ handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
metadata, err := dynamodb.get_table_metadata(engine, table_name) metadata, err := dynamodb.get_table_metadata(engine, table_name)
if err != .None { if err != .None {
@@ -266,16 +290,22 @@ handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
for ks, i in metadata.key_schema { for ks, i in metadata.key_schema {
if i > 0 do strings.write_string(&builder, ",") if i > 0 do strings.write_string(&builder, ",")
fmt.sbprintf(&builder, `{"AttributeName":"%s","KeyType":"%s"}`, strings.write_string(&builder, `{"AttributeName":"`)
ks.attribute_name, dynamodb.key_type_to_string(ks.key_type)) strings.write_string(&builder, ks.attribute_name)
strings.write_string(&builder, `","KeyType":"`)
strings.write_string(&builder, dynamodb.key_type_to_string(ks.key_type))
strings.write_string(&builder, `"}`)
} }
strings.write_string(&builder, `],"AttributeDefinitions":[`) strings.write_string(&builder, `],"AttributeDefinitions":[`)
for ad, i in metadata.attribute_definitions { for ad, i in metadata.attribute_definitions {
if i > 0 do strings.write_string(&builder, ",") if i > 0 do strings.write_string(&builder, ",")
fmt.sbprintf(&builder, `{"AttributeName":"%s","AttributeType":"%s"}`, strings.write_string(&builder, `{"AttributeName":"`)
ad.attribute_name, dynamodb.scalar_type_to_string(ad.attribute_type)) strings.write_string(&builder, ad.attribute_name)
strings.write_string(&builder, `","AttributeType":"`)
strings.write_string(&builder, dynamodb.scalar_type_to_string(ad.attribute_type))
strings.write_string(&builder, `"}`)
} }
strings.write_string(&builder, `]`) strings.write_string(&builder, `]`)
@@ -290,8 +320,11 @@ handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
strings.write_string(&builder, `","KeySchema":[`) strings.write_string(&builder, `","KeySchema":[`)
for ks, ki in gsi.key_schema { for ks, ki in gsi.key_schema {
if ki > 0 do strings.write_string(&builder, ",") if ki > 0 do strings.write_string(&builder, ",")
fmt.sbprintf(&builder, `{"AttributeName":"%s","KeyType":"%s"}`, strings.write_string(&builder, `{"AttributeName":"`)
ks.attribute_name, dynamodb.key_type_to_string(ks.key_type)) strings.write_string(&builder, ks.attribute_name)
strings.write_string(&builder, `","KeyType":"`)
strings.write_string(&builder, dynamodb.key_type_to_string(ks.key_type))
strings.write_string(&builder, `"}`)
} }
strings.write_string(&builder, `],"Projection":{"ProjectionType":"`) strings.write_string(&builder, `],"Projection":{"ProjectionType":"`)
strings.write_string(&builder, projection_type_to_string(gsi.projection.projection_type)) strings.write_string(&builder, projection_type_to_string(gsi.projection.projection_type))
@@ -340,6 +373,7 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
item, item_ok := dynamodb.parse_item_from_request(request.body) item, item_ok := dynamodb.parse_item_from_request(request.body)
if !item_ok { if !item_ok {
@@ -349,8 +383,9 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request
defer dynamodb.item_destroy(&item) defer dynamodb.item_destroy(&item)
// ---- ConditionExpression evaluation ---- // ---- ConditionExpression evaluation ----
_, has_condition := dynamodb.parse_condition_expression_string(request.body) cond_str, has_condition := dynamodb.parse_condition_expression_string(request.body)
if has_condition { if has_condition {
defer delete(cond_str)
// Parse shared expression attributes // Parse shared expression attributes
attr_names := dynamodb.parse_expression_attribute_names(request.body) attr_names := dynamodb.parse_expression_attribute_names(request.body)
defer { defer {
@@ -386,12 +421,21 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request
// If no explicit Key field, extract key from Item // If no explicit Key field, extract key from Item
// (PutItem doesn't have a Key field — the key is in the Item itself) // (PutItem doesn't have a Key field — the key is in the Item itself)
existing_maybe, get_err := dynamodb.get_item(engine, table_name, item) existing_maybe, get_err := dynamodb.get_item(engine, table_name, item)
if get_err != .None && get_err != .Table_Not_Found { #partial switch get_err {
// Table not found is handled by put_item below case .None:
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { // Item found or not found — both are fine, condition evaluates against
// whatever was returned (nil item = item doesn't exist).
case .Table_Not_Found:
// Table will be caught and reported properly by put_item below.
case .Missing_Key_Attribute, .Invalid_Key:
handle_storage_error(response, get_err) handle_storage_error(response, get_err)
return return
} case .RocksDB_Error, .Serialization_Error, .Internal_Error:
make_error_response(response, .InternalServerError, "Failed to fetch existing item")
return
case .Validation_Error, .Item_Not_Found:
// Item_Not_Found shouldn't reach here (get_item returns nil, .None),
// but treat defensively.
} }
existing_item = existing_maybe existing_item = existing_maybe
} else { } else {
@@ -452,6 +496,7 @@ handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
key, key_ok := dynamodb.parse_key_from_request(request.body) key, key_ok := dynamodb.parse_key_from_request(request.body)
if !key_ok { if !key_ok {
@@ -468,9 +513,17 @@ handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request
if item_val, has_item := item.?; has_item { if item_val, has_item := item.?; has_item {
defer dynamodb.item_destroy(&item_val) defer dynamodb.item_destroy(&item_val)
item_json := dynamodb.serialize_item(item_val)
resp := fmt.aprintf(`{"Item":%s}`, item_json) // Build response directly to avoid intermediate string allocations
response_set_body(response, transmute([]byte)resp) builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
strings.write_string(&builder, `{"Item":`)
dynamodb.serialize_item_to_builder(&builder, item_val)
strings.write_string(&builder, `}`)
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body)
} else { } else {
response_set_body(response, transmute([]byte)string("{}")) response_set_body(response, transmute([]byte)string("{}"))
} }
@@ -482,6 +535,7 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
key, key_ok := dynamodb.parse_key_from_request(request.body) key, key_ok := dynamodb.parse_key_from_request(request.body)
if !key_ok { if !key_ok {
@@ -521,11 +575,19 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
// Fetch existing item // Fetch existing item
existing_item, get_err := dynamodb.get_item(engine, table_name, key) existing_item, get_err := dynamodb.get_item(engine, table_name, key)
if get_err != .None && get_err != .Table_Not_Found { #partial switch get_err {
if get_err == .Missing_Key_Attribute || get_err == .Invalid_Key { case .None:
// Item found or not found — condition evaluates against whatever was returned.
case .Table_Not_Found:
// Table will be caught and reported properly by delete_item below.
case .Missing_Key_Attribute, .Invalid_Key:
handle_storage_error(response, get_err) handle_storage_error(response, get_err)
return return
} case .RocksDB_Error, .Serialization_Error, .Internal_Error:
make_error_response(response, .InternalServerError, "Failed to fetch existing item")
return
case .Validation_Error, .Item_Not_Found:
// Defensive — shouldn't reach here normally.
} }
defer { defer {
if ex, has_ex := existing_item.?; has_ex { if ex, has_ex := existing_item.?; has_ex {
@@ -571,6 +633,7 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
// Parse Key // Parse Key
key_item, key_ok := dynamodb.parse_key_from_request(request.body) key_item, key_ok := dynamodb.parse_key_from_request(request.body)
@@ -586,6 +649,7 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
make_error_response(response, .ValidationException, "Missing or invalid UpdateExpression") make_error_response(response, .ValidationException, "Missing or invalid UpdateExpression")
return return
} }
defer delete(update_expr)
// Parse ExpressionAttributeNames and ExpressionAttributeValues // Parse ExpressionAttributeNames and ExpressionAttributeValues
attr_names := dynamodb.parse_expression_attribute_names(request.body) attr_names := dynamodb.parse_expression_attribute_names(request.body)
@@ -661,6 +725,7 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
// Parse ReturnValues // Parse ReturnValues
return_values := dynamodb.parse_return_values(request.body) return_values := dynamodb.parse_return_values(request.body)
defer delete(return_values)
// Execute update // Execute update
old_item, new_item, err := dynamodb.update_item(engine, table_name, key_item, &plan) old_item, new_item, err := dynamodb.update_item(engine, table_name, key_item, &plan)
@@ -680,51 +745,59 @@ handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
} }
// Build response based on ReturnValues // Build response based on ReturnValues
builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
switch return_values { switch return_values {
case "ALL_NEW": case "ALL_NEW":
if new_val, has := new_item.?; has { if new_val, has := new_item.?; has {
item_json := dynamodb.serialize_item(new_val) strings.write_string(&builder, `{"Attributes":`)
resp := fmt.aprintf(`{"Attributes":%s}`, item_json) dynamodb.serialize_item_to_builder(&builder, new_val)
response_set_body(response, transmute([]byte)resp) strings.write_string(&builder, `}`)
} else { } else {
response_set_body(response, transmute([]byte)string("{}")) strings.write_string(&builder, `{}`)
} }
case "ALL_OLD": case "ALL_OLD":
if old, has := old_item.?; has { if old, has := old_item.?; has {
item_json := dynamodb.serialize_item(old) strings.write_string(&builder, `{"Attributes":`)
resp := fmt.aprintf(`{"Attributes":%s}`, item_json) dynamodb.serialize_item_to_builder(&builder, old)
response_set_body(response, transmute([]byte)resp) strings.write_string(&builder, `}`)
} else { } else {
response_set_body(response, transmute([]byte)string("{}")) strings.write_string(&builder, `{}`)
} }
case "UPDATED_NEW": case "UPDATED_NEW":
if new_val, has := new_item.?; has { if new_val, has := new_item.?; has {
filtered := filter_updated_attributes(new_val, &plan) filtered := filter_updated_attributes(new_val, &plan)
defer dynamodb.item_destroy(&filtered) defer dynamodb.item_destroy(&filtered)
item_json := dynamodb.serialize_item(filtered)
resp := fmt.aprintf(`{"Attributes":%s}`, item_json) strings.write_string(&builder, `{"Attributes":`)
response_set_body(response, transmute([]byte)resp) dynamodb.serialize_item_to_builder(&builder, filtered)
strings.write_string(&builder, `}`)
} else { } else {
response_set_body(response, transmute([]byte)string("{}")) strings.write_string(&builder, `{}`)
} }
case "UPDATED_OLD": case "UPDATED_OLD":
if old, has := old_item.?; has { if old, has := old_item.?; has {
filtered := filter_updated_attributes(old, &plan) filtered := filter_updated_attributes(old, &plan)
defer dynamodb.item_destroy(&filtered) defer dynamodb.item_destroy(&filtered)
item_json := dynamodb.serialize_item(filtered)
resp := fmt.aprintf(`{"Attributes":%s}`, item_json) strings.write_string(&builder, `{"Attributes":`)
response_set_body(response, transmute([]byte)resp) dynamodb.serialize_item_to_builder(&builder, filtered)
strings.write_string(&builder, `}`)
} else { } else {
response_set_body(response, transmute([]byte)string("{}")) strings.write_string(&builder, `{}`)
} }
case: case:
// "NONE" or default // "NONE" or default
response_set_body(response, transmute([]byte)string("{}")) strings.write_string(&builder, `{}`)
} }
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body)
} }
handle_batch_write_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { handle_batch_write_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
@@ -872,7 +945,7 @@ handle_batch_write_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP
} }
append(&table_requests, dynamodb.Batch_Write_Table_Request{ append(&table_requests, dynamodb.Batch_Write_Table_Request{
table_name = string(table_name), table_name = strings.clone(string(table_name)),
requests = requests[:], requests = requests[:],
}) })
} }
@@ -917,9 +990,13 @@ handle_batch_write_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP
item_json := dynamodb.serialize_item(req.item) item_json := dynamodb.serialize_item(req.item)
switch req.type { switch req.type {
case .Put: case .Put:
fmt.sbprintf(&builder, `{"PutRequest":{"Item":%s}}`, item_json) strings.write_string(&builder, `{"PutRequest":{"Item":`)
strings.write_string(&builder, item_json)
strings.write_string(&builder, "}}")
case .Delete: case .Delete:
fmt.sbprintf(&builder, `{"DeleteRequest":{"Key":%s}}`, item_json) strings.write_string(&builder, `{"DeleteRequest":{"Key":`)
strings.write_string(&builder, item_json)
strings.write_string(&builder, "}}")
} }
} }
@@ -1010,7 +1087,7 @@ handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
} }
append(&table_requests, dynamodb.Batch_Get_Table_Request{ append(&table_requests, dynamodb.Batch_Get_Table_Request{
table_name = string(table_name), table_name = strings.clone(string(table_name)),
keys = keys[:], keys = keys[:],
}) })
} }
@@ -1037,7 +1114,9 @@ handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
defer dynamodb.batch_get_result_destroy(&result) defer dynamodb.batch_get_result_destroy(&result)
// Build response // Build response
builder := strings.builder_make() builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
strings.write_string(&builder, `{"Responses":{`) strings.write_string(&builder, `{"Responses":{`)
for table_result, ti in result.responses { for table_result, ti in result.responses {
@@ -1050,8 +1129,7 @@ handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
if ii > 0 { if ii > 0 {
strings.write_string(&builder, ",") strings.write_string(&builder, ",")
} }
item_json := dynamodb.serialize_item(item) dynamodb.serialize_item_to_builder(&builder, item)
strings.write_string(&builder, item_json)
} }
strings.write_string(&builder, "]") strings.write_string(&builder, "]")
@@ -1063,14 +1141,15 @@ handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
if ti > 0 { if ti > 0 {
strings.write_string(&builder, ",") strings.write_string(&builder, ",")
} }
fmt.sbprintf(&builder, `"%s":{"Keys":[`, table_req.table_name) strings.write_string(&builder, `"`)
strings.write_string(&builder, table_req.table_name)
strings.write_string(&builder, `":{"Keys":["`)
for key, ki in table_req.keys { for key, ki in table_req.keys {
if ki > 0 { if ki > 0 {
strings.write_string(&builder, ",") strings.write_string(&builder, ",")
} }
key_json := dynamodb.serialize_item(key) dynamodb.serialize_item_to_builder(&builder, key)
strings.write_string(&builder, key_json)
} }
strings.write_string(&builder, "]}") strings.write_string(&builder, "]}")
@@ -1078,7 +1157,8 @@ handle_batch_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
strings.write_string(&builder, "}}") strings.write_string(&builder, "}}")
resp_body := strings.to_string(builder) // clone the god damn string
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body) response_set_body(response, transmute([]byte)resp_body)
} }
@@ -1093,9 +1173,15 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
// Grab index name from request body // Grab index name from request body
index_name := parse_index_name(request.body) index_name := parse_index_name(request.body)
defer {
if idx, has := index_name.?; has {
delete(idx)
}
}
// Fetch table metadata early for ExclusiveStartKey parsing // Fetch table metadata early for ExclusiveStartKey parsing
metadata, meta_err := dynamodb.get_table_metadata(engine, table_name) metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
@@ -1133,11 +1219,15 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
} }
// Parse ExclusiveStartKey // Parse ExclusiveStartKey
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key( exclusive_start_key, esk_ok, esk_body_err := dynamodb.parse_exclusive_start_key(
request.body, table_name, metadata.key_schema, request.body, table_name, metadata.key_schema,
) )
if !esk_ok { if !esk_ok {
if esk_body_err {
make_error_response(response, .SerializationException, "Request body is not valid JSON")
} else {
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
}
return return
} }
defer { defer {
@@ -1181,15 +1271,30 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
// ---- GSI query path ---- // ---- GSI query path ----
if idx_name, has_idx := index_name.?; has_idx { if idx_name, has_idx := index_name.?; has_idx {
_, gsi_found := dynamodb.find_gsi(&metadata, idx_name) gsi, gsi_found := dynamodb.find_gsi(&metadata, idx_name)
if !gsi_found { if !gsi_found {
make_error_response(response, .ValidationException, make_error_response(response, .ValidationException,
fmt.tprintf("The table does not have the specified index: %s", idx_name)) fmt.tprintf("The table does not have the specified index: %s", idx_name))
return return
} }
esk_gsi, esk_gsi_ok, esk_gsi_body_err := dynamodb.parse_exclusive_start_key_gsi(
request.body, table_name, &metadata, gsi,
)
if !esk_gsi_ok {
if esk_gsi_body_err {
make_error_response(response, .SerializationException, "Request body is not valid JSON")
} else {
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
}
return
}
defer {
if k, ok_gsi := esk_gsi.?; ok_gsi { delete(k) }
}
result, err := dynamodb.gsi_query(engine, table_name, idx_name, result, err := dynamodb.gsi_query(engine, table_name, idx_name,
pk_owned, exclusive_start_key, limit, sk_condition) pk_owned, esk_gsi, limit, sk_condition)
if err != .None { if err != .None {
handle_storage_error(response, err) handle_storage_error(response, err)
return return
@@ -1228,7 +1333,7 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
} }
write_items_response_with_pagination_ex( write_items_response_with_pagination_ex(
response, final_items, result.last_evaluated_key, &metadata, scanned_count, response, final_items, result.last_evaluated_key, &metadata, scanned_count, gsi,
) )
if has_proj && len(projection) > 0 { if has_proj && len(projection) > 0 {
@@ -1300,9 +1405,15 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
make_error_response(response, .ValidationException, "Invalid request or missing TableName") make_error_response(response, .ValidationException, "Invalid request or missing TableName")
return return
} }
defer delete(table_name)
// Grab index name from request body // Grab index name from request body
index_name := parse_index_name(request.body) index_name := parse_index_name(request.body)
defer {
if idx, has := index_name.?; has {
delete(idx)
}
}
metadata, meta_err := dynamodb.get_table_metadata(engine, table_name) metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
if meta_err != .None { if meta_err != .None {
@@ -1316,11 +1427,15 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
limit = 100 limit = 100
} }
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key( exclusive_start_key, esk_ok, esk_body_err := dynamodb.parse_exclusive_start_key(
request.body, table_name, metadata.key_schema, request.body, table_name, metadata.key_schema,
) )
if !esk_ok { if !esk_ok {
if esk_body_err {
make_error_response(response, .SerializationException, "Request body is not valid JSON")
} else {
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey") make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
}
return return
} }
defer { defer {
@@ -1358,14 +1473,29 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
// ---- GSI scan path ---- // ---- GSI scan path ----
if idx_name, has_idx := index_name.?; has_idx { if idx_name, has_idx := index_name.?; has_idx {
_, gsi_found := dynamodb.find_gsi(&metadata, idx_name) gsi, gsi_found := dynamodb.find_gsi(&metadata, idx_name)
if !gsi_found { if !gsi_found {
make_error_response(response, .ValidationException, make_error_response(response, .ValidationException,
fmt.tprintf("The table does not have the specified index: %s", idx_name)) fmt.tprintf("The table does not have the specified index: %s", idx_name))
return return
} }
result, err := dynamodb.gsi_scan(engine, table_name, idx_name, exclusive_start_key, limit) esk_gsi, esk_gsi_ok, esk_gsi_body_err := dynamodb.parse_exclusive_start_key_gsi(
request.body, table_name, &metadata, gsi,
)
if !esk_gsi_ok {
if esk_gsi_body_err {
make_error_response(response, .SerializationException, "Request body is not valid JSON")
} else {
make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
}
return
}
defer {
if k, ok_gsi := esk_gsi.?; ok_gsi { delete(k) }
}
result, err := dynamodb.gsi_scan(engine, table_name, idx_name, esk_gsi, limit)
if err != .None { if err != .None {
handle_storage_error(response, err) handle_storage_error(response, err)
return return
@@ -1404,7 +1534,7 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
} }
write_items_response_with_pagination_ex( write_items_response_with_pagination_ex(
response, final_items, result.last_evaluated_key, &metadata, scanned_count, response, final_items, result.last_evaluated_key, &metadata, scanned_count, gsi,
) )
if has_proj && len(projection) > 0 { if has_proj && len(projection) > 0 {
@@ -1482,6 +1612,7 @@ apply_filter_to_items :: proc(
if !has_filter { if !has_filter {
return items, true return items, true
} }
defer delete(filter_expr)
filter_node, filter_ok := dynamodb.parse_filter_expression(filter_expr, attr_names, attr_values) filter_node, filter_ok := dynamodb.parse_filter_expression(filter_expr, attr_names, attr_values)
if !filter_ok || filter_node == nil { if !filter_ok || filter_node == nil {
@@ -1516,14 +1647,16 @@ write_items_response_with_pagination_ex :: proc(
last_evaluated_key_binary: Maybe([]byte), last_evaluated_key_binary: Maybe([]byte),
metadata: ^dynamodb.Table_Metadata, metadata: ^dynamodb.Table_Metadata,
scanned_count: int, scanned_count: int,
gsi: ^dynamodb.Global_Secondary_Index = nil, // ← NEW parameter
) { ) {
builder := strings.builder_make() builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
strings.write_string(&builder, `{"Items":[`) strings.write_string(&builder, `{"Items":[`)
for item, i in items { for item, i in items {
if i > 0 do strings.write_string(&builder, ",") if i > 0 do strings.write_string(&builder, ",")
item_json := dynamodb.serialize_item(item) dynamodb.serialize_item_to_builder(&builder, item)
strings.write_string(&builder, item_json)
} }
strings.write_string(&builder, `],"Count":`) strings.write_string(&builder, `],"Count":`)
@@ -1532,7 +1665,16 @@ write_items_response_with_pagination_ex :: proc(
fmt.sbprintf(&builder, "%d", scanned_count) fmt.sbprintf(&builder, "%d", scanned_count)
if binary_key, has_last := last_evaluated_key_binary.?; has_last { if binary_key, has_last := last_evaluated_key_binary.?; has_last {
lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata) lek_json: string
lek_ok: bool
// Use GSI serializer if we have a GSI, otherwise use base table serializer
if gsi != nil {
lek_json, lek_ok = dynamodb.serialize_last_evaluated_key_gsi(binary_key, metadata, gsi)
} else {
lek_json, lek_ok = dynamodb.serialize_last_evaluated_key(binary_key, metadata)
}
if lek_ok { if lek_ok {
strings.write_string(&builder, `,"LastEvaluatedKey":`) strings.write_string(&builder, `,"LastEvaluatedKey":`)
strings.write_string(&builder, lek_json) strings.write_string(&builder, lek_json)
@@ -1541,7 +1683,7 @@ write_items_response_with_pagination_ex :: proc(
strings.write_string(&builder, "}") strings.write_string(&builder, "}")
resp_body := strings.to_string(builder) resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body) response_set_body(response, transmute([]byte)resp_body)
} }
@@ -1561,13 +1703,15 @@ write_items_response_with_pagination :: proc(
last_evaluated_key_binary: Maybe([]byte), last_evaluated_key_binary: Maybe([]byte),
metadata: ^dynamodb.Table_Metadata, metadata: ^dynamodb.Table_Metadata,
) { ) {
builder := strings.builder_make() builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
strings.write_string(&builder, `{"Items":[`) strings.write_string(&builder, `{"Items":[`)
// Use serialize_item_to_builder directly so we always get the correct response payload
for item, i in items { for item, i in items {
if i > 0 do strings.write_string(&builder, ",") if i > 0 do strings.write_string(&builder, ",")
item_json := dynamodb.serialize_item(item) dynamodb.serialize_item_to_builder(&builder, item)
strings.write_string(&builder, item_json)
} }
strings.write_string(&builder, `],"Count":`) strings.write_string(&builder, `],"Count":`)
@@ -1575,20 +1719,18 @@ write_items_response_with_pagination :: proc(
strings.write_string(&builder, `,"ScannedCount":`) strings.write_string(&builder, `,"ScannedCount":`)
fmt.sbprintf(&builder, "%d", len(items)) fmt.sbprintf(&builder, "%d", len(items))
// Emit LastEvaluatedKey if the storage layer produced one
if binary_key, has_last := last_evaluated_key_binary.?; has_last { if binary_key, has_last := last_evaluated_key_binary.?; has_last {
lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata) lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata)
if lek_ok { if lek_ok {
strings.write_string(&builder, `,"LastEvaluatedKey":`) strings.write_string(&builder, `,"LastEvaluatedKey":`)
strings.write_string(&builder, lek_json) strings.write_string(&builder, lek_json)
} }
// If decoding fails we still return the items — just without a pagination token.
// The client will assume the scan/query is complete.
} }
strings.write_string(&builder, "}") strings.write_string(&builder, "}")
resp_body := strings.to_string(builder) // We have to Clone the string before passing to response_set_body
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body) response_set_body(response, transmute([]byte)resp_body)
} }
@@ -1611,6 +1753,8 @@ handle_storage_error :: proc(response: ^HTTP_Response, err: dynamodb.Storage_Err
make_error_response(response, .ValidationException, "One or more required key attributes are missing") make_error_response(response, .ValidationException, "One or more required key attributes are missing")
case .Invalid_Key: case .Invalid_Key:
make_error_response(response, .ValidationException, "Invalid key: type mismatch or malformed key value") make_error_response(response, .ValidationException, "Invalid key: type mismatch or malformed key value")
case .Validation_Error:
make_error_response(response, .ValidationException, "Invalid request: type mismatch or incompatible operand")
case .Serialization_Error: case .Serialization_Error:
make_error_response(response, .InternalServerError, "Internal serialization error") make_error_response(response, .InternalServerError, "Internal serialization error")
case .RocksDB_Error: case .RocksDB_Error:
@@ -1917,14 +2061,20 @@ make_error_response :: proc(response: ^HTTP_Response, err_type: dynamodb.DynamoD
parse_config :: proc() -> Config { parse_config :: proc() -> Config {
config := Config{ config := Config{
// Defaults
host = "0.0.0.0", host = "0.0.0.0",
port = 8002, port = 8002,
data_dir = "./data", data_dir = "./data",
verbose = false, verbose = false,
max_body_size = 100 * 1024 * 1024, // 100 MB
max_headers = 100,
read_buffer_size = 8 * 1024, // 8 KB
enable_keep_alive = true,
max_requests_per_connection = 1000,
} }
// Environment variables // Environment variables (lower priority)
if port_str, env_ok := os.lookup_env("JORMUN_PORT"); env_ok { if port_str, ok := os.lookup_env("JORMUN_PORT"); ok {
if port, parse_ok := strconv.parse_int(port_str); parse_ok { if port, parse_ok := strconv.parse_int(port_str); parse_ok {
config.port = port config.port = port
} }
@@ -1942,11 +2092,107 @@ parse_config :: proc() -> Config {
config.verbose = verbose == "1" config.verbose = verbose == "1"
} }
// TODO: Parse command line arguments if max_body_str, ok := os.lookup_env("JORMUN_MAX_BODY_SIZE"); ok {
if max_body, parse_ok := strconv.parse_int(max_body_str); parse_ok {
config.max_body_size = max_body
}
}
// Command line arguments (highest priority)
args := os.args[1:] // Skip program name
for i := 0; i < len(args); i += 1 {
arg := args[i]
// Helper to get next arg value
get_value :: proc(args: []string, i: ^int) -> (string, bool) {
if i^ + 1 < len(args) {
i^ += 1
return args[i^], true
}
return "", false
}
switch arg {
case "--host", "-h":
if value, ok := get_value(args, &i); ok {
config.host = value
}
case "--port", "-p":
if value, ok := get_value(args, &i); ok {
if port, parse_ok := strconv.parse_int(value); parse_ok {
config.port = port
}
}
case "--data-dir", "-d":
if value, ok := get_value(args, &i); ok {
config.data_dir = value
}
case "--verbose", "-v":
config.verbose = true
case "--max-body-size":
if value, ok := get_value(args, &i); ok {
if size, parse_ok := strconv.parse_int(value); parse_ok {
config.max_body_size = size
}
}
case "--max-headers":
if value, ok := get_value(args, &i); ok {
if count, parse_ok := strconv.parse_int(value); parse_ok {
config.max_headers = count
}
}
case "--no-keep-alive":
config.enable_keep_alive = false
case "--help":
print_help()
os.exit(0)
}
}
return config return config
} }
print_help :: proc() {
help_text := `
JormunDB - DynamoDB-Compatible Database Server
USAGE:
jormundb [OPTIONS]
OPTIONS:
--host, -h <HOST> Server bind address (default: 0.0.0.0)
--port, -p <PORT> Server port (default: 8002)
--data-dir, -d <DIR> Data directory path (default: ./data)
--verbose, -v Enable verbose logging
--max-body-size <BYTES> Maximum request body size in bytes (default: 104857600 = 100MB)
--max-headers <COUNT> Maximum number of headers per request (default: 100)
--no-keep-alive Disable HTTP keep-alive connections
--help Show this help message
ENVIRONMENT VARIABLES:
JORMUN_HOST Same as --host
JORMUN_PORT Same as --port
JORMUN_DATA_DIR Same as --data-dir
JORMUN_VERBOSE Set to "1" to enable verbose mode
JORMUN_MAX_BODY_SIZE Same as --max-body-size
EXAMPLES:
# Start with default settings
jormundb
# Custom port and data directory
jormundb --port 9000 --data-dir /var/lib/jormundb
# Limit body size to 10MB
jormundb --max-body-size 10485760
# Use environment variables
JORMUN_PORT=9000 JORMUN_HOST=127.0.0.1 jormundb
`
fmt.println(help_text)
}
print_banner :: proc(config: Config) { print_banner :: proc(config: Config) {
banner := ` banner := `
╔═══════════════════════════════════════════════╗ ╔═══════════════════════════════════════════════╗

884
open_api_doc.yaml Normal file
View File

@@ -0,0 +1,884 @@
openapi: 3.0.3
info:
title: JormunDB DynamoDB Wire API
version: 0.1.0
description: |
DynamoDB-compatible JSON-over-HTTP API implemented by JormunDB.
Requests are POSTed to a single endpoint (/) and routed by the required `X-Amz-Target` header.
servers:
- url: http://localhost:8002
paths:
/:
post:
summary: DynamoDB JSON API endpoint
description: |
Send DynamoDB JSON protocol requests to this endpoint and set `X-Amz-Target` to the operation name,
e.g. `DynamoDB_20120810.GetItem`. The request and response media type is typically
`application/x-amz-json-1.0`.
parameters:
- $ref: '#/components/parameters/XAmzTarget'
- $ref: '#/components/parameters/XAmzDate'
- $ref: '#/components/parameters/Authorization'
- $ref: '#/components/parameters/XAmzSecurityToken'
- $ref: '#/components/parameters/XAmzContentSha256'
requestBody:
required: true
content:
application/x-amz-json-1.0:
schema:
oneOf:
- $ref: '#/components/schemas/CreateTableRequest'
- $ref: '#/components/schemas/DeleteTableRequest'
- $ref: '#/components/schemas/DescribeTableRequest'
- $ref: '#/components/schemas/ListTablesRequest'
- $ref: '#/components/schemas/PutItemRequest'
- $ref: '#/components/schemas/GetItemRequest'
- $ref: '#/components/schemas/DeleteItemRequest'
- $ref: '#/components/schemas/UpdateItemRequest'
- $ref: '#/components/schemas/QueryRequest'
- $ref: '#/components/schemas/ScanRequest'
- $ref: '#/components/schemas/BatchWriteItemRequest'
- $ref: '#/components/schemas/BatchGetItemRequest'
- $ref: '#/components/schemas/TransactWriteItemsRequest'
- $ref: '#/components/schemas/TransactGetItemsRequest'
examples:
CreateTable:
summary: Create a table with a HASH key
value:
TableName: ExampleTable
KeySchema:
- AttributeName: pk
KeyType: HASH
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
responses:
'200':
description: Successful operation response
content:
application/x-amz-json-1.0:
schema:
oneOf:
- $ref: '#/components/schemas/CreateTableResponse'
- $ref: '#/components/schemas/DeleteTableResponse'
- $ref: '#/components/schemas/DescribeTableResponse'
- $ref: '#/components/schemas/ListTablesResponse'
- $ref: '#/components/schemas/PutItemResponse'
- $ref: '#/components/schemas/GetItemResponseUnion'
- $ref: '#/components/schemas/DeleteItemResponse'
- $ref: '#/components/schemas/UpdateItemResponseUnion'
- $ref: '#/components/schemas/QueryResponse'
- $ref: '#/components/schemas/ScanResponse'
- $ref: '#/components/schemas/BatchWriteItemResponse'
- $ref: '#/components/schemas/BatchGetItemResponse'
- $ref: '#/components/schemas/TransactWriteItemsResponse'
- $ref: '#/components/schemas/TransactGetItemsResponse'
'400':
description: Client error (ValidationException, SerializationException, etc.)
content:
application/x-amz-json-1.0:
schema:
oneOf:
- $ref: '#/components/schemas/DynamoDbError'
- $ref: '#/components/schemas/TransactionCanceledException'
'500':
description: Server error
content:
application/x-amz-json-1.0:
schema:
$ref: '#/components/schemas/DynamoDbError'
components:
parameters:
XAmzTarget:
name: X-Amz-Target
in: header
required: true
description: |
DynamoDB JSON protocol operation selector.
JormunDB recognizes targets with the `DynamoDB_20120810.` prefix.
Note: `UpdateTable` may be recognized but not implemented.
schema:
type: string
enum:
- DynamoDB_20120810.CreateTable
- DynamoDB_20120810.DeleteTable
- DynamoDB_20120810.DescribeTable
- DynamoDB_20120810.ListTables
- DynamoDB_20120810.UpdateTable
- DynamoDB_20120810.PutItem
- DynamoDB_20120810.GetItem
- DynamoDB_20120810.DeleteItem
- DynamoDB_20120810.UpdateItem
- DynamoDB_20120810.Query
- DynamoDB_20120810.Scan
- DynamoDB_20120810.BatchGetItem
- DynamoDB_20120810.BatchWriteItem
- DynamoDB_20120810.TransactGetItems
- DynamoDB_20120810.TransactWriteItems
example: DynamoDB_20120810.GetItem
XAmzDate:
name: X-Amz-Date
in: header
required: false
schema:
type: string
description: Optional SigV4 timestamp header (kept for SDK compatibility).
Authorization:
name: Authorization
in: header
required: false
schema:
type: string
description: Optional SigV4 Authorization header (kept for SDK compatibility).
XAmzSecurityToken:
name: X-Amz-Security-Token
in: header
required: false
schema:
type: string
description: Optional SigV4 session token header (kept for SDK compatibility).
XAmzContentSha256:
name: X-Amz-Content-Sha256
in: header
required: false
schema:
type: string
description: Optional SigV4 payload hash header (kept for SDK compatibility).
schemas:
EmptyObject:
type: object
description: Empty JSON object.
additionalProperties: false
# -------------------------
# AttributeValue & helpers
# -------------------------
AttributeValue:
description: DynamoDB AttributeValue (JSON wire format).
type: object
minProperties: 1
maxProperties: 1
oneOf:
- $ref: '#/components/schemas/AttrS'
- $ref: '#/components/schemas/AttrN'
- $ref: '#/components/schemas/AttrB'
- $ref: '#/components/schemas/AttrBOOL'
- $ref: '#/components/schemas/AttrNULL'
- $ref: '#/components/schemas/AttrSS'
- $ref: '#/components/schemas/AttrNS'
- $ref: '#/components/schemas/AttrBS'
- $ref: '#/components/schemas/AttrL'
- $ref: '#/components/schemas/AttrM'
AttrS:
type: object
additionalProperties: false
required: [S]
properties:
S:
type: string
example: hello
AttrN:
type: object
additionalProperties: false
required: [N]
properties:
N:
type: string
description: Numeric values are encoded as strings in DynamoDB's JSON protocol.
example: "42"
AttrB:
type: object
additionalProperties: false
required: [B]
properties:
B:
type: string
description: Base64-encoded binary value.
example: AAECAwQ=
AttrBOOL:
type: object
additionalProperties: false
required: [BOOL]
properties:
BOOL:
type: boolean
example: true
AttrNULL:
type: object
additionalProperties: false
required: [NULL]
properties:
NULL:
type: boolean
enum: [true]
example: true
AttrSS:
type: object
additionalProperties: false
required: [SS]
properties:
SS:
type: array
items: { type: string }
example: [a, b]
AttrNS:
type: object
additionalProperties: false
required: [NS]
properties:
NS:
type: array
description: Numeric set values are encoded as strings.
items: { type: string }
example: ["1", "2"]
AttrBS:
type: object
additionalProperties: false
required: [BS]
properties:
BS:
type: array
description: Base64-encoded binary set values.
items: { type: string }
example: [AAE=, AgM=]
AttrL:
type: object
additionalProperties: false
required: [L]
properties:
L:
type: array
items:
$ref: '#/components/schemas/AttributeValue'
AttrM:
type: object
additionalProperties: false
required: [M]
properties:
M:
$ref: '#/components/schemas/AttributeMap'
AttributeMap:
type: object
additionalProperties:
$ref: '#/components/schemas/AttributeValue'
example:
pk: { S: "user#1" }
sk: { S: "meta" }
age: { N: "30" }
ExpressionAttributeNames:
type: object
additionalProperties: { type: string }
example:
"#pk": "pk"
ExpressionAttributeValues:
type: object
additionalProperties:
$ref: '#/components/schemas/AttributeValue'
example:
":v": { S: "user#1" }
Key:
allOf:
- $ref: '#/components/schemas/AttributeMap'
description: Primary key map (HASH, optionally RANGE) encoded as an AttributeMap.
ReturnValues:
type: string
description: ReturnValues selector used by UpdateItem.
enum: [NONE, ALL_OLD, UPDATED_OLD, ALL_NEW, UPDATED_NEW]
example: ALL_NEW
# -------------------------
# Table shapes
# -------------------------
ScalarAttributeType:
type: string
enum: [S, N, B]
example: S
AttributeDefinition:
type: object
additionalProperties: false
required: [AttributeName, AttributeType]
properties:
AttributeName: { type: string }
AttributeType: { $ref: '#/components/schemas/ScalarAttributeType' }
KeyType:
type: string
enum: [HASH, RANGE]
example: HASH
KeySchemaElement:
type: object
additionalProperties: false
required: [AttributeName, KeyType]
properties:
AttributeName: { type: string }
KeyType: { $ref: '#/components/schemas/KeyType' }
ProjectionType:
type: string
enum: [KEYS_ONLY, INCLUDE, ALL]
example: ALL
Projection:
type: object
additionalProperties: false
required: [ProjectionType]
properties:
ProjectionType: { $ref: '#/components/schemas/ProjectionType' }
NonKeyAttributes:
type: array
items: { type: string }
GlobalSecondaryIndex:
type: object
additionalProperties: false
required: [IndexName, KeySchema, Projection]
properties:
IndexName: { type: string }
KeySchema:
type: array
items: { $ref: '#/components/schemas/KeySchemaElement' }
minItems: 1
Projection: { $ref: '#/components/schemas/Projection' }
TableStatus:
type: string
enum: [CREATING, UPDATING, DELETING, ACTIVE, ARCHIVING, ARCHIVED]
example: ACTIVE
TableDescription:
type: object
additionalProperties: false
required: [TableName, TableStatus]
properties:
TableName: { type: string }
TableStatus: { $ref: '#/components/schemas/TableStatus' }
CreationDateTime:
type: integer
format: int64
description: Unix epoch seconds.
KeySchema:
type: array
items: { $ref: '#/components/schemas/KeySchemaElement' }
AttributeDefinitions:
type: array
items: { $ref: '#/components/schemas/AttributeDefinition' }
GlobalSecondaryIndexes:
type: array
items:
allOf:
- $ref: '#/components/schemas/GlobalSecondaryIndex'
- type: object
properties:
IndexStatus:
type: string
enum: [ACTIVE]
# -------------------------
# Error shapes
# -------------------------
DynamoDbError:
type: object
additionalProperties: false
required: [__type, message]
properties:
__type:
type: string
description: DynamoDB error type identifier.
example: com.amazonaws.dynamodb.v20120810#ValidationException
message:
type: string
example: Invalid request
TransactionCanceledException:
type: object
additionalProperties: false
required: [__type, message, CancellationReasons]
properties:
__type:
type: string
enum: [com.amazonaws.dynamodb.v20120810#TransactionCanceledException]
message:
type: string
CancellationReasons:
type: array
items:
type: object
additionalProperties: false
required: [Code, Message]
properties:
Code: { type: string, example: ConditionalCheckFailed }
Message: { type: string, example: The conditional request failed }
# -------------------------
# API: CreateTable
# -------------------------
CreateTableRequest:
type: object
additionalProperties: true
required: [TableName, KeySchema, AttributeDefinitions]
properties:
TableName: { type: string }
KeySchema:
type: array
items: { $ref: '#/components/schemas/KeySchemaElement' }
minItems: 1
AttributeDefinitions:
type: array
items: { $ref: '#/components/schemas/AttributeDefinition' }
minItems: 1
GlobalSecondaryIndexes:
type: array
items: { $ref: '#/components/schemas/GlobalSecondaryIndex' }
description: |
CreateTable request. JormunDB focuses on TableName, KeySchema, AttributeDefinitions, and optional GSI definitions.
CreateTableResponse:
type: object
additionalProperties: false
required: [TableDescription]
properties:
TableDescription:
type: object
additionalProperties: false
required: [TableName, TableStatus, CreationDateTime]
properties:
TableName: { type: string }
TableStatus: { $ref: '#/components/schemas/TableStatus' }
CreationDateTime: { type: integer, format: int64 }
# -------------------------
# API: DeleteTable / DescribeTable / ListTables
# -------------------------
DeleteTableRequest:
type: object
additionalProperties: true
required: [TableName]
properties:
TableName: { type: string }
DeleteTableResponse:
type: object
additionalProperties: false
required: [TableDescription]
properties:
TableDescription:
type: object
additionalProperties: false
required: [TableName, TableStatus]
properties:
TableName: { type: string }
TableStatus:
type: string
enum: [DELETING]
DescribeTableRequest:
type: object
additionalProperties: true
required: [TableName]
properties:
TableName: { type: string }
DescribeTableResponse:
type: object
additionalProperties: false
required: [Table]
properties:
Table: { $ref: '#/components/schemas/TableDescription' }
ListTablesRequest:
type: object
additionalProperties: true
description: ListTables request. JormunDB ignores request fields for this operation.
ListTablesResponse:
type: object
additionalProperties: false
required: [TableNames]
properties:
TableNames:
type: array
items: { type: string }
# -------------------------
# API: PutItem / GetItem / DeleteItem
# -------------------------
PutItemRequest:
type: object
additionalProperties: true
required: [TableName, Item]
properties:
TableName: { type: string }
Item: { $ref: '#/components/schemas/AttributeMap' }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
PutItemResponse:
$ref: '#/components/schemas/EmptyObject'
GetItemRequest:
type: object
additionalProperties: true
required: [TableName, Key]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
ProjectionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
GetItemResponse:
type: object
additionalProperties: false
required: [Item]
properties:
Item: { $ref: '#/components/schemas/AttributeMap' }
GetItemResponseUnion:
oneOf:
- $ref: '#/components/schemas/EmptyObject'
- $ref: '#/components/schemas/GetItemResponse'
DeleteItemRequest:
type: object
additionalProperties: true
required: [TableName, Key]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
DeleteItemResponse:
$ref: '#/components/schemas/EmptyObject'
# -------------------------
# API: UpdateItem
# -------------------------
UpdateItemRequest:
type: object
additionalProperties: true
required: [TableName, Key, UpdateExpression]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
UpdateExpression: { type: string }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
ReturnValues: { $ref: '#/components/schemas/ReturnValues' }
UpdateItemResponse:
type: object
additionalProperties: false
required: [Attributes]
properties:
Attributes: { $ref: '#/components/schemas/AttributeMap' }
UpdateItemResponseUnion:
oneOf:
- $ref: '#/components/schemas/EmptyObject'
- $ref: '#/components/schemas/UpdateItemResponse'
# -------------------------
# API: Query / Scan
# -------------------------
QueryRequest:
type: object
additionalProperties: true
required: [TableName, KeyConditionExpression]
properties:
TableName: { type: string }
IndexName: { type: string }
KeyConditionExpression: { type: string }
FilterExpression: { type: string }
ProjectionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
Limit:
type: integer
format: int32
minimum: 1
description: Maximum items to return (default 100 if omitted/0 in JormunDB).
ExclusiveStartKey: { $ref: '#/components/schemas/Key' }
        ScanIndexForward:
          type: boolean
          description: Sort order for RANGE key queries (if applicable); true returns items in ascending key order, false in descending order, following DynamoDB's ScanIndexForward semantics.
ScanRequest:
type: object
additionalProperties: true
required: [TableName]
properties:
TableName: { type: string }
IndexName: { type: string }
FilterExpression: { type: string }
ProjectionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
Limit:
type: integer
format: int32
minimum: 1
description: Maximum items to return (default 100 if omitted/0 in JormunDB).
ExclusiveStartKey: { $ref: '#/components/schemas/Key' }
ItemsPage:
type: object
additionalProperties: false
required: [Items, Count, ScannedCount]
properties:
Items:
type: array
items: { $ref: '#/components/schemas/AttributeMap' }
Count:
type: integer
format: int32
ScannedCount:
type: integer
format: int32
LastEvaluatedKey:
$ref: '#/components/schemas/Key'
QueryResponse:
allOf:
- $ref: '#/components/schemas/ItemsPage'
ScanResponse:
allOf:
- $ref: '#/components/schemas/ItemsPage'
# -------------------------
# API: BatchWriteItem
# -------------------------
WriteRequest:
type: object
additionalProperties: false
properties:
PutRequest:
type: object
additionalProperties: false
required: [Item]
properties:
Item: { $ref: '#/components/schemas/AttributeMap' }
DeleteRequest:
type: object
additionalProperties: false
required: [Key]
properties:
Key: { $ref: '#/components/schemas/Key' }
oneOf:
- required: [PutRequest]
- required: [DeleteRequest]
BatchWriteItemRequest:
type: object
additionalProperties: true
required: [RequestItems]
properties:
RequestItems:
type: object
description: Map of table name to write requests.
additionalProperties:
type: array
items: { $ref: '#/components/schemas/WriteRequest' }
BatchWriteItemResponse:
type: object
additionalProperties: false
required: [UnprocessedItems]
properties:
UnprocessedItems:
type: object
additionalProperties:
type: array
items: { $ref: '#/components/schemas/WriteRequest' }
# -------------------------
# API: BatchGetItem
# -------------------------
KeysAndAttributes:
type: object
additionalProperties: true
required: [Keys]
properties:
Keys:
type: array
items: { $ref: '#/components/schemas/Key' }
ProjectionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
BatchGetItemRequest:
type: object
additionalProperties: true
required: [RequestItems]
properties:
RequestItems:
type: object
additionalProperties:
$ref: '#/components/schemas/KeysAndAttributes'
BatchGetItemResponse:
type: object
additionalProperties: false
required: [Responses, UnprocessedKeys]
properties:
Responses:
type: object
additionalProperties:
type: array
items: { $ref: '#/components/schemas/AttributeMap' }
UnprocessedKeys:
type: object
additionalProperties:
$ref: '#/components/schemas/KeysAndAttributes'
# -------------------------
# API: TransactWriteItems / TransactGetItems
# -------------------------
TransactWriteItemsRequest:
type: object
additionalProperties: true
required: [TransactItems]
properties:
TransactItems:
type: array
minItems: 1
maxItems: 100
items:
$ref: '#/components/schemas/TransactWriteItem'
TransactWriteItem:
type: object
additionalProperties: false
oneOf:
- required: [Put]
- required: [Delete]
- required: [Update]
- required: [ConditionCheck]
properties:
Put:
$ref: '#/components/schemas/TransactPut'
Delete:
$ref: '#/components/schemas/TransactDelete'
Update:
$ref: '#/components/schemas/TransactUpdate'
ConditionCheck:
$ref: '#/components/schemas/TransactConditionCheck'
TransactPut:
type: object
additionalProperties: true
required: [TableName, Item]
properties:
TableName: { type: string }
Item: { $ref: '#/components/schemas/AttributeMap' }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
TransactDelete:
type: object
additionalProperties: true
required: [TableName, Key]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
TransactUpdate:
type: object
additionalProperties: true
required: [TableName, Key, UpdateExpression]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
UpdateExpression: { type: string }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
TransactConditionCheck:
type: object
additionalProperties: true
required: [TableName, Key, ConditionExpression]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
ConditionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
ExpressionAttributeValues: { $ref: '#/components/schemas/ExpressionAttributeValues' }
TransactWriteItemsResponse:
$ref: '#/components/schemas/EmptyObject'
TransactGetItemsRequest:
type: object
additionalProperties: true
required: [TransactItems]
properties:
TransactItems:
type: array
minItems: 1
maxItems: 100
items:
type: object
additionalProperties: false
required: [Get]
properties:
Get:
$ref: '#/components/schemas/TransactGet'
TransactGet:
type: object
additionalProperties: true
required: [TableName, Key]
properties:
TableName: { type: string }
Key: { $ref: '#/components/schemas/Key' }
ProjectionExpression: { type: string }
ExpressionAttributeNames: { $ref: '#/components/schemas/ExpressionAttributeNames' }
TransactGetItemResult:
oneOf:
- $ref: '#/components/schemas/EmptyObject'
- type: object
additionalProperties: false
required: [Item]
properties:
Item: { $ref: '#/components/schemas/AttributeMap' }
TransactGetItemsResponse:
type: object
additionalProperties: false
required: [Responses]
properties:
Responses:
type: array
items: { $ref: '#/components/schemas/TransactGetItemResult' }

File diff suppressed because it is too large Load Diff

View File

@@ -267,7 +267,7 @@ parse_transact_put_action :: proc(
if !tn_ok { if !tn_ok {
return {}, false return {}, false
} }
action.table_name = string(tn_str) action.table_name = strings.clone(string(tn_str))
// Item // Item
item_val, item_found := obj["Item"] item_val, item_found := obj["Item"]
@@ -301,7 +301,7 @@ parse_transact_key_action :: proc(
if !tn_ok { if !tn_ok {
return {}, false return {}, false
} }
action.table_name = string(tn_str) action.table_name = strings.clone(string(tn_str))
// Key // Key
key_val, key_found := obj["Key"] key_val, key_found := obj["Key"]
@@ -335,7 +335,7 @@ parse_transact_update_action :: proc(
if !tn_ok { if !tn_ok {
return {}, false return {}, false
} }
action.table_name = string(tn_str) action.table_name = strings.clone(string(tn_str))
// Key // Key
key_val, key_found := obj["Key"] key_val, key_found := obj["Key"]
@@ -483,7 +483,9 @@ handle_transact_get_items :: proc(
} }
// Build response // Build response
builder := strings.builder_make() builder := strings.builder_make(context.allocator)
defer strings.builder_destroy(&builder)
strings.write_string(&builder, `{"Responses":[`) strings.write_string(&builder, `{"Responses":[`)
for maybe_item, i in result.items { for maybe_item, i in result.items {
@@ -492,8 +494,9 @@ handle_transact_get_items :: proc(
} }
if item, has_item := maybe_item.?; has_item { if item, has_item := maybe_item.?; has_item {
item_json := dynamodb.serialize_item(item) strings.write_string(&builder, `{"Item":`)
fmt.sbprintf(&builder, `{{"Item":%s}}`, item_json) dynamodb.serialize_item_to_builder(&builder, item)
strings.write_string(&builder, `}`)
} else { } else {
strings.write_string(&builder, "{}") strings.write_string(&builder, "{}")
} }
@@ -501,7 +504,8 @@ handle_transact_get_items :: proc(
strings.write_string(&builder, "]}") strings.write_string(&builder, "]}")
resp_body := strings.to_string(builder) // Clone the string or we gonna have issues again
resp_body := strings.clone(strings.to_string(builder))
response_set_body(response, transmute([]byte)resp_body) response_set_body(response, transmute([]byte)resp_body)
} }
@@ -519,7 +523,7 @@ parse_transact_get_action :: proc(obj: json.Object) -> (dynamodb.Transact_Get_Ac
if !tn_ok { if !tn_ok {
return {}, false return {}, false
} }
action.table_name = string(tn_str) action.table_name = strings.clone(string(tn_str))
// Key // Key
key_val, key_found := obj["Key"] key_val, key_found := obj["Key"]