Compare commits: 78b86d7534 ... b510c000ec (2 commits)

| SHA1 |
|---|
| b510c000ec |
| 280ce15b07 |
@@ -14,9 +14,9 @@

## What is JormunDB?

JormunDB is a self-hosted DynamoDB replacement that speaks the DynamoDB wire protocol. Point your AWS SDK or CLI at it and use it as a drop-in replacement.

**Why Odin?** The original Zig implementation suffered from explicit allocator threading: every function ended up needing an `allocator` parameter, and every allocation needed `errdefer` cleanup. Odin's implicit context allocator system eliminates this ceremony. One `context.allocator = arena_allocator` at the request handler entry, and everything downstream just works.

## Features
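The context-allocator pattern described above can be sketched in a few lines of Odin. This is a minimal, hypothetical handler (not the project's actual request path; all names here are illustrative):

```odin
package sketch

import "core:mem/virtual"

// Hypothetical request handler illustrating the implicit context allocator.
handle_request :: proc(body: []byte) {
	arena: virtual.Arena
	_ = virtual.arena_init_growing(&arena)
	defer virtual.arena_destroy(&arena)

	// One assignment at the entry point...
	context.allocator = virtual.arena_allocator(&arena)

	// ...and every allocation downstream (including in callees) comes from
	// the arena, freed in bulk at scope exit. No allocator parameters,
	// no per-allocation cleanup.
	parts := make([dynamic]string)
	append(&parts, "parsed")
}
```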
TODO.md
@@ -1,6 +1,6 @@

# JormunDB (Odin rewrite) — TODO

This tracks the rewrite from Zig (ZynamoDB) → Odin (JormunDB), and what's left to stabilize + extend.

## Status Snapshot
@@ -19,52 +19,57 @@ This tracks the rewrite from Zig (ZynamoDB) → Odin (JormunDB), and what's left to stabilize + extend.

---

## Now (MVP correctness + polish)

Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteItem/Scan/Query" with correct DynamoDB-ish responses.

### 1) HTTP + routing hardening

- [ ] Audit request parsing boundaries:
  - Max body size enforcement (config exists, need to verify enforcement path)
  - Missing/invalid headers → correct DynamoDB error types
  - Content-Type handling (be permissive but consistent)
- [x] Ensure **all request-scoped allocations** come from the request arena (no accidental long-lived allocs)
  - Verified: `handle_connection` in http.odin sets `context.allocator = request_alloc`
  - Long-lived data (table metadata, locks) explicitly uses `engine.allocator`
- [x] Standardize error responses:
  - `__type` formatting — done, uses `com.amazonaws.dynamodb.v20120810#ErrorType`
  - `message` field consistency — done
  - Status code mapping per error type — **DONE**: centralized `handle_storage_error` + `make_error_response` now maps InternalServerError→500, everything else→400
  - Missing X-Amz-Target now returns `SerializationException` (matches real DynamoDB)
### 2) Storage correctness edge cases

- [x] Table metadata durability + validation:
  - [x] Reject duplicate tables — done in `create_table` (checks existing meta key)
  - [x] Reject invalid key schema — done in `parse_key_schema` (no HASH, multiple HASH, etc.)
- [x] Item validation against key schema:
  - [x] Missing PK/SK errors — done in `key_from_item`
  - [x] Type mismatch errors (S/N/B) — **DONE**: new `validate_item_key_types` proc checks item key attr types against AttributeDefinitions
- [ ] Deterministic encoding tests:
  - [ ] Key codec round-trip
  - [ ] TLV item encode/decode round-trip (nested maps/lists/sets)
### 3) Query/Scan pagination parity

- [x] Make pagination behavior match AWS CLI expectations:
  - [x] `Limit` — done
  - [x] `ExclusiveStartKey` — done (parsed via JSON object lookup with key schema type reconstruction)
  - [x] `LastEvaluatedKey` generation — **FIXED**: now saves key of *last returned item* (not next unread item); only emits when more results exist
- [ ] Add "golden" pagination tests:
  - [ ] Query w/ sort key ranges
  - [ ] Scan limit + resume loop
### 4) Expression parsing reliability

- [x] Remove brittle string-scanning for `KeyConditionExpression` extraction:
  - **DONE**: `parse_key_condition_expression_string` uses JSON object lookup (handles whitespace/ordering safely)
- [ ] Add validation + better errors for malformed expressions
- [x] Expand operator coverage: BETWEEN and begins_with are implemented in the parser
- [x] **Sort key condition filtering in query** — **DONE**: `query()` now accepts an optional `Sort_Key_Condition` and applies it (=, <, <=, >, >=, BETWEEN, begins_with)
---

## Next (feature parity with Zig + API completeness)

### 5) UpdateItem / conditional logic groundwork

- [x] `UpdateItem` handler registered in router (currently returns a clear "not yet supported" error)
- [ ] Implement `UpdateItem` (initially minimal: SET for scalar attrs)
- [ ] Add `ConditionExpression` support for Put/Delete/Update (start with simple comparisons)
- [ ] Define internal "update plan" representation (parsed ops → applied mutations)
### 6) Response completeness / options

- [ ] `ReturnValues` handling where relevant (NONE/ALL_OLD/UPDATED_NEW etc. — even partial support is useful)
@@ -81,8 +86,38 @@ Goal: "aws cli works reliably for CreateTable/ListTables/PutItem/GetItem/DeleteItem/Scan/Query" with correct DynamoDB-ish responses.

---

## Bug Fixes Applied This Session

### Pagination (scan + query)

**Bug**: `last_evaluated_key` was set to the key of the *next unread* item (the item at `count == limit`). When the client resumed with that key as `ExclusiveStartKey`, it would seek-then-skip, **dropping one item** from the result set.

**Fix**: Now tracks the key of the *last successfully returned* item. Only emits `LastEvaluatedKey` when we confirm there are more items beyond the returned set (via `has_more` flag).
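The off-by-one and its fix can be condensed into a small Odin sketch. This is hypothetical code over a plain sorted key slice rather than the real RocksDB iterator:

```odin
package sketch

// Resume semantics: ExclusiveStartKey is exclusive, so a page must report
// the key of the LAST item it returned, and only when more items remain.
scan_page :: proc(keys: []string, exclusive_start_key: string, limit: int) -> (page: []string, last_evaluated_key: string) {
	i := 0
	if exclusive_start_key != "" {
		for i < len(keys) && keys[i] <= exclusive_start_key {
			i += 1 // seek, then skip the token itself
		}
	}
	end := min(i + limit, len(keys))
	page = keys[i:end]
	if end < len(keys) && end > i {
		// More items remain: the token is the last RETURNED key.
		// The old bug stored keys[end] (the next unread item), so the
		// resume's seek-then-skip silently dropped that item.
		last_evaluated_key = keys[end - 1]
	}
	return
}
```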
### Sort key condition filtering

**Bug**: `query()` performed a partition-prefix scan but never applied the sort key condition (=, <, BETWEEN, begins_with, etc.) from `KeyConditionExpression`. All items in the partition were returned regardless of sort key predicates.

**Fix**: `query()` now accepts an optional `Sort_Key_Condition` parameter. The handler extracts it from the parsed `Key_Condition` and passes it through. `evaluate_sort_key_condition()` compares the item's SK attribute against the condition using string comparison (matching DynamoDB's lexicographic semantics for S/N/B keys).
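Condensed, the comparison the fix describes looks roughly like the following. This is a hypothetical shape (the real `evaluate_sort_key_condition` operates on attribute values, not bare strings, and its operator enum names are assumptions):

```odin
package sketch

import "core:strings"

Sort_Key_Op :: enum { EQ, LT, LE, GT, GE, BETWEEN, BEGINS_WITH }

// Compare the raw string form of the sort key against the condition.
matches :: proc(sk: string, op: Sort_Key_Op, a: string, b: string = "") -> bool {
	switch op {
	case .EQ:          return sk == a
	case .LT:          return sk < a
	case .LE:          return sk <= a
	case .GT:          return sk > a
	case .GE:          return sk >= a
	case .BETWEEN:     return sk >= a && sk <= b // inclusive on both ends
	case .BEGINS_WITH: return strings.has_prefix(sk, a)
	}
	return false
}
```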
### Write locking

**Bug**: `put_item` and `delete_item` acquired *shared* (read) locks. Multiple concurrent writes to the same table could interleave without mutual exclusion.

**Fix**: Both now acquire *exclusive* (write) locks via `sync.rw_mutex_lock`. Read operations (`get_item`, `scan`, `query`) continue to use shared locks.
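The reader/writer split is the standard `core:sync` RW-mutex pattern; a minimal sketch (names illustrative, not the engine's actual procs):

```odin
package sketch

import "core:sync"

table_lock: sync.RW_Mutex

// Writers need the exclusive lock: two concurrent put_item calls must not
// interleave their read-modify-write steps.
write_op :: proc() {
	sync.rw_mutex_lock(&table_lock)
	defer sync.rw_mutex_unlock(&table_lock)
	// ... mutate table state ...
}

// Readers can share the lock: get_item/scan/query only observe state,
// so any number of them may proceed concurrently.
read_op :: proc() {
	sync.rw_mutex_shared_lock(&table_lock)
	defer sync.rw_mutex_shared_unlock(&table_lock)
	// ... read table state ...
}
```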
### delete_table item cleanup

**Bug**: `delete_table` only deleted the metadata key, leaving all data items orphaned in RocksDB.

**Fix**: Before deleting metadata, `delete_table` now iterates over all keys with the table's data prefix and deletes them individually.
### Item key type validation

**New**: `put_item` now validates that the item's key attribute types match the table's `AttributeDefinitions`. E.g., if the PK is declared as `S`, putting an item with a numeric PK is rejected with `Invalid_Key`.
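The body of `validate_item_key_types` is not shown in this diff; a hypothetical sketch of the check, reduced to type tags ("S"/"N"/"B") keyed by attribute name:

```odin
package sketch

// Hypothetical: each key attribute's type tag in the item must match the
// type declared for that attribute in AttributeDefinitions.
validate_key_types :: proc(item_types: map[string]string, declared: map[string]string, key_names: []string) -> bool {
	for name in key_names {
		declared_type, has_decl := declared[name]
		if !has_decl {
			return false // key attr not in AttributeDefinitions
		}
		actual, has_attr := item_types[name]
		if !has_attr || actual != declared_type {
			return false // e.g. PK declared "S" but item supplies "N"
		}
	}
	return true
}
```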
### Error response standardization

**Fix**: Centralized all storage-error-to-HTTP-error mapping in `handle_storage_error`. InternalServerError maps to HTTP 500; all client errors (validation, not-found, etc.) map to HTTP 400. Missing `X-Amz-Target` now returns `SerializationException` to match real DynamoDB behavior.

---
## Later (big features)

These align with the "Future Enhancements" list in ARCHITECTURE.md.

### 8) Secondary indexes

- [ ] Global Secondary Indexes (GSI)
@@ -111,6 +146,7 @@ These align with the "Future Enhancements" list in ARCHITECTURE.md.

---

## Housekeeping

- [x] Fix TODO hygiene: keep this file short and "actionable"
  - Added "Bug Fixes Applied" section documenting what changed and why
- [ ] Add a CONTRIBUTING quick checklist (allocator rules, formatting, tests)
- [ ] Add "known limitations" section in README (unsupported DynamoDB features)
@@ -420,41 +420,36 @@ parse_expression_attribute_values :: proc(request_body: []byte) -> (map[string]Attribute_Value, bool)

    return result, true
}

// ============================================================================
// FIX: Use JSON object lookup instead of fragile string scanning.
// This handles whitespace, field ordering, and escape sequences correctly.
// ============================================================================
parse_key_condition_expression_string :: proc(request_body: []byte) -> (expr: string, ok: bool) {
    data, parse_err := json.parse(request_body, allocator = context.temp_allocator)
    if parse_err != nil {
        return
    }
    defer json.destroy_value(data)

    root, root_ok := data.(json.Object)
    if !root_ok {
        return
    }

    kce_val, found := root["KeyConditionExpression"]
    if !found {
        return
    }

    kce_str, str_ok := kce_val.(json.String)
    if !str_ok {
        return
    }

    expr = string(kce_str)
    ok = true
    return
}

// Convenience: parse a complete Query key condition from request body
parse_query_key_condition :: proc(request_body: []byte) -> (kc: Key_Condition, ok: bool) {
@@ -485,41 +485,131 @@ parse_limit :: proc(request_body: []byte) -> int

    return 0
}

// ============================================================================
// ExclusiveStartKey Parsing (Pagination Input)
//
// Parse ExclusiveStartKey from request body. Requires key_schema so we can
// validate and extract the key, then convert it to a binary storage key.
// Returns the binary key bytes that can be passed straight to scan/query.
// Returns nil (not an error) when the field is absent.
// ============================================================================

parse_exclusive_start_key :: proc(
    request_body: []byte,
    table_name: string,
    key_schema: []Key_Schema_Element,
) -> (result: Maybe([]byte), ok: bool) {
    data, parse_err := json.parse(request_body, allocator = context.temp_allocator)
    if parse_err != nil {
        return nil, true // no ESK is fine
    }
    defer json.destroy_value(data)

    root, root_ok := data.(json.Object)
    if !root_ok {
        return nil, true
    }

    esk_val, found := root["ExclusiveStartKey"]
    if !found {
        return nil, true // absent → no pagination, that's ok
    }

    // Parse ExclusiveStartKey as a DynamoDB Item
    key_item, item_ok := parse_item_from_value(esk_val)
    if !item_ok {
        return nil, false // present but malformed → real error
    }
    defer item_destroy(&key_item)

    // Validate and extract key struct using schema
    key_struct, key_ok := key_from_item(key_item, key_schema)
    if !key_ok {
        return nil, false // missing required key attributes
    }
    defer key_destroy(&key_struct)

    // Get raw byte values
    key_values, kv_ok := key_get_values(&key_struct)
    if !kv_ok {
        return nil, false
    }

    // Build binary storage key
    binary_key := build_data_key(table_name, key_values.pk, key_values.sk)
    result = binary_key
    ok = true
    return
}

// ============================================================================
// LastEvaluatedKey Generation (Pagination Output)
//
// Decode a binary storage key back into a DynamoDB JSON fragment suitable
// for the "LastEvaluatedKey" field in scan/query responses.
//
// Steps:
//   1. Decode the binary key → table_name, pk_bytes, sk_bytes
//   2. Look up attribute types from metadata (S/N/B)
//   3. Build a Key struct with correctly-typed AttributeValues
//   4. Convert Key → Item → DynamoDB JSON string
// ============================================================================

// Build a Key struct from a binary storage key using metadata for type info.
// This mirrors the Zig buildKeyFromBinaryWithTypes helper.
build_key_from_binary_with_types :: proc(
    binary_key: []byte,
    metadata: ^Table_Metadata,
) -> (key: Key, ok: bool) {
    decoder := Key_Decoder{data = binary_key, pos = 0}

    // Skip entity type byte
    _ = decoder_read_entity_type(&decoder) or_return

    // Skip table name segment
    _ = decoder_read_segment_borrowed(&decoder) or_return

    // Read partition key bytes
    pk_bytes := decoder_read_segment_borrowed(&decoder) or_return

    // Read sort key bytes if present
    sk_bytes: Maybe([]byte) = nil
    if decoder_has_more(&decoder) {
        sk := decoder_read_segment_borrowed(&decoder) or_return
        sk_bytes = sk
    }

    // Get PK attribute type from metadata
    pk_name := table_metadata_get_partition_key_name(metadata).? or_return
    pk_type := table_metadata_get_attribute_type(metadata, pk_name).? or_return

    pk_attr := build_attribute_value_with_type(pk_bytes, pk_type)

    // Build SK attribute if present
    sk_attr: Maybe(Attribute_Value) = nil
    if sk, has_sk := sk_bytes.?; has_sk {
        sk_name := table_metadata_get_sort_key_name(metadata).? or_return
        sk_type := table_metadata_get_attribute_type(metadata, sk_name).? or_return
        sk_attr = build_attribute_value_with_type(sk, sk_type)
    }

    return Key{pk = pk_attr, sk = sk_attr}, true
}

// Serialize a binary storage key as a LastEvaluatedKey JSON fragment.
// Returns a string like: {"pk":{"S":"val"},"sk":{"N":"42"}}
serialize_last_evaluated_key :: proc(
    binary_key: []byte,
    metadata: ^Table_Metadata,
) -> (result: string, ok: bool) {
    key, key_ok := build_key_from_binary_with_types(binary_key, metadata)
    if !key_ok {
        return "", false
    }
    defer key_destroy(&key)

    item := key_to_item(key, metadata.key_schema)
    defer item_destroy(&item)

    return serialize_item(item), true
}
@@ -527,7 +527,7 @@ create_table :: proc(

    return desc, .None
}

// Delete table — removes metadata AND all items with the table's data prefix
delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Error {
    table_lock := get_or_create_table_lock(engine, table_name)
    sync.rw_mutex_lock(table_lock)
@@ -546,15 +546,48 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Error

    }
    delete(existing)

    // Delete all data items using a prefix scan
    table_prefix := build_table_prefix(table_name)
    defer delete(table_prefix)

    iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
    if iter != nil {
        defer rocksdb.rocksdb_iter_destroy(iter)

        rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))

        for rocksdb.rocksdb_iter_valid(iter) != 0 {
            key_len: c.size_t
            key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
            key_bytes := key_ptr[:key_len]

            if !has_prefix(key_bytes, table_prefix) {
                break
            }

            // Delete this item
            err: cstring
            rocksdb.rocksdb_delete(
                engine.db.handle,
                engine.db.write_options,
                raw_data(key_bytes),
                c.size_t(len(key_bytes)),
                &err,
            )
            if err != nil {
                rocksdb.rocksdb_free(rawptr(err))
            }

            rocksdb.rocksdb_iter_next(iter)
        }
    }

    // Delete metadata
    del_err := rocksdb.db_delete(&engine.db, meta_key)
    if del_err != .None {
        return .RocksDB_Error
    }

    remove_table_lock(engine, table_name)
    return .None
}
@@ -563,11 +596,11 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Error

// ============================================================================
// Item Operations
// ============================================================================

// Put item — uses EXCLUSIVE lock (write operation)
put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
    table_lock := get_or_create_table_lock(engine, table_name)
    sync.rw_mutex_lock(table_lock)
    defer sync.rw_mutex_unlock(table_lock)

    // Get table metadata
    metadata, meta_err := get_table_metadata(engine, table_name)
@@ -576,6 +609,12 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error

    }
    defer table_metadata_destroy(&metadata, engine.allocator)

    // Validate key attribute types match schema
    validation_err := validate_item_key_types(item, metadata.key_schema, metadata.attribute_definitions)
    if validation_err != .None {
        return validation_err
    }

    // Extract key from item
    key, key_ok := key_from_item(item, metadata.key_schema)
    if !key_ok {
@@ -616,7 +655,7 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error

    return .None
}

// Get item — uses SHARED lock (read operation)
get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (Maybe(Item), Storage_Error) {
    table_lock := get_or_create_table_lock(engine, table_name)
    sync.rw_mutex_shared_lock(table_lock)
@@ -672,11 +711,11 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (Maybe(Item), Storage_Error)

    return item, .None
}

// Delete item — uses EXCLUSIVE lock (write operation)
delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
    table_lock := get_or_create_table_lock(engine, table_name)
    sync.rw_mutex_lock(table_lock)
    defer sync.rw_mutex_unlock(table_lock)

    // Get table metadata
    metadata, meta_err := get_table_metadata(engine, table_name)
@@ -718,6 +757,14 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error

    return .None
}

// ============================================================================
// Scan — with FIXED pagination
//
// FIX: LastEvaluatedKey must be the key of the LAST RETURNED item, not the
//      next unread item. DynamoDB semantics: ExclusiveStartKey resumes
//      *after* the given key, so we save the last key we actually returned.
// ============================================================================

scan :: proc(
    engine: ^Storage_Engine,
    table_name: string,
@@ -748,9 +795,8 @@ scan :: proc(

    // Seek to start position
    if start_key, has_start := exclusive_start_key.?; has_start {
        // Resume from pagination token — seek to the key then skip it (exclusive)
        rocksdb.rocksdb_iter_seek(iter, raw_data(start_key), c.size_t(len(start_key)))
        if rocksdb.rocksdb_iter_valid(iter) != 0 {
            rocksdb.rocksdb_iter_next(iter)
        }
@@ -759,10 +805,13 @@

        rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))
    }

    max_items := limit if limit > 0 else 1_000_000

    // Collect items
    items := make([dynamic]Item, context.temp_allocator)
    count := 0
    last_key: Maybe([]byte) = nil
    has_more := false

    for rocksdb.rocksdb_iter_valid(iter) != 0 {
        // Get current key
@@ -775,10 +824,9 @@

            break
        }

        // Check limit — if we already have enough items, note there's more and stop
        if count >= max_items {
            has_more = true
            break
        }
@@ -798,12 +846,26 @@

        append(&items, item)
        count += 1

        // Track the key of the last successfully returned item
        if prev_key, had_prev := last_key.?; had_prev {
            delete(prev_key)
        }
        last_key = slice.clone(key_bytes)

        // Move to next
        rocksdb.rocksdb_iter_next(iter)
    }

    // Only emit LastEvaluatedKey if there are more items beyond what we returned
    if !has_more {
        if lk, had_lk := last_key.?; had_lk {
            delete(lk)
        }
        last_key = nil
    }

    // Convert to slice
    result_items := make([]Item, len(items))
    copy(result_items, items[:])

    return Scan_Result{
@@ -812,13 +874,17 @@

    }, .None
}

// ============================================================================
// Query — with sort key condition filtering and FIXED pagination
// ============================================================================

query :: proc(
    engine: ^Storage_Engine,
    table_name: string,
    partition_key_value: []byte,
    exclusive_start_key: Maybe([]byte),
    limit: int,
    sk_condition: Maybe(Sort_Key_Condition) = nil,
) -> (Query_Result, Storage_Error) {
    table_lock := get_or_create_table_lock(engine, table_name)
    sync.rw_mutex_shared_lock(table_lock)
@@ -860,6 +926,7 @@ query :: proc(
     items := make([dynamic]Item)
     count := 0
     last_key: Maybe([]byte) = nil
+    has_more := false

     for rocksdb.iter_valid(&iter) {
         key := rocksdb.iter_key(&iter)
@@ -867,9 +934,9 @@ query :: proc(
             break
         }

-        // Hit limit — save this key as pagination token and stop
+        // Hit limit — note there's more and stop
         if count >= max_items {
-            last_key = slice.clone(key)
+            has_more = true
             break
         }

@@ -885,11 +952,37 @@ query :: proc(
             continue
         }

+        // ---- Sort key condition filtering ----
+        if skc, has_skc := sk_condition.?; has_skc {
+            if !evaluate_sort_key_condition(item, &skc) {
+                // Item doesn't match SK condition — skip it
+                item_copy := item
+                item_destroy(&item_copy)
+                rocksdb.iter_next(&iter)
+                continue
+            }
+        }
+
         append(&items, item)
         count += 1

+        // Track key of last returned item
+        if prev_key, had_prev := last_key.?; had_prev {
+            delete(prev_key)
+        }
+        last_key = slice.clone(key)
+
         rocksdb.iter_next(&iter)
     }

+    // Only emit LastEvaluatedKey if there are more items
+    if !has_more {
+        if lk, had_lk := last_key.?; had_lk {
+            delete(lk)
+        }
+        last_key = nil
+    }
+
     result_items := make([]Item, len(items))
     copy(result_items, items[:])

@@ -899,6 +992,126 @@ query :: proc(
     }, .None
 }

+// ============================================================================
+// Sort Key Condition Evaluation
+//
+// Extracts the sort key attribute from a decoded item and compares it against
+// the parsed Sort_Key_Condition using string comparison (matching DynamoDB's
+// byte-level comparison semantics for S/N/B types).
+// ============================================================================
+
+evaluate_sort_key_condition :: proc(item: Item, skc: ^Sort_Key_Condition) -> bool {
+    attr, found := item[skc.sk_name]
+    if !found {
+        return false
+    }
+
+    item_sk_str, ok1 := attr_value_to_string_for_compare(attr)
+    if !ok1 {
+        return false
+    }
+
+    cond_val_str, ok2 := attr_value_to_string_for_compare(skc.value)
+    if !ok2 {
+        return false
+    }
+
+    cmp := strings.compare(item_sk_str, cond_val_str)
+
+    switch skc.operator {
+    case .EQ:
+        return cmp == 0
+    case .LT:
+        return cmp < 0
+    case .LE:
+        return cmp <= 0
+    case .GT:
+        return cmp > 0
+    case .GE:
+        return cmp >= 0
+    case .BETWEEN:
+        if v2, has_v2 := skc.value2.?; has_v2 {
+            upper_str, ok3 := attr_value_to_string_for_compare(v2)
+            if !ok3 {
+                return false
+            }
+            cmp2 := strings.compare(item_sk_str, upper_str)
+            return cmp >= 0 && cmp2 <= 0
+        }
+        return false
+    case .BEGINS_WITH:
+        return strings.has_prefix(item_sk_str, cond_val_str)
+    }
+
+    return false
+}
+
+// Extract a comparable string from a scalar AttributeValue
+@(private = "file")
+attr_value_to_string_for_compare :: proc(attr: Attribute_Value) -> (string, bool) {
+    #partial switch v in attr {
+    case String:
+        return string(v), true
+    case Number:
+        return string(v), true
+    case Binary:
+        return string(v), true
+    }
+    return "", false
+}
+
+// ============================================================================
+// Item Key Validation
+//
+// Validates that an item's key attributes match the types declared in
+// AttributeDefinitions. E.g., if PK is declared as "S", the item must
+// have a String value for that attribute.
+// ============================================================================
+
+validate_item_key_types :: proc(
+    item: Item,
+    key_schema: []Key_Schema_Element,
+    attr_defs: []Attribute_Definition,
+) -> Storage_Error {
+    for ks in key_schema {
+        attr, found := item[ks.attribute_name]
+        if !found {
+            return .Missing_Key_Attribute
+        }
+
+        // Find the expected type from attribute definitions
+        expected_type: Maybe(Scalar_Attribute_Type) = nil
+        for ad in attr_defs {
+            if ad.attribute_name == ks.attribute_name {
+                expected_type = ad.attribute_type
+                break
+            }
+        }
+
+        et, has_et := expected_type.?
+        if !has_et {
+            continue // No definition found — skip validation (shouldn't happen)
+        }
+
+        // Check actual type matches expected
+        match := false
+        #partial switch _ in attr {
+        case String:
+            match = (et == .S)
+        case Number:
+            match = (et == .N)
+        case Binary:
+            match = (et == .B)
+        }
+
+        if !match {
+            return .Invalid_Key
+        }
+    }
+
+    return .None
+}
+
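The operator table in `evaluate_sort_key_condition` maps directly onto plain string comparison. A compact Python restatement of the same semantics (a hypothetical helper, for illustration only) makes the inclusive `BETWEEN` bounds and the prefix-matching `begins_with` explicit:

```python
def eval_sort_key_condition(item_sk, op, value, value2=None):
    """Mirror of the operator switch: plain string comparison,
    matching byte-order semantics for S-typed sort keys."""
    if op == "EQ":
        return item_sk == value
    if op == "LT":
        return item_sk < value
    if op == "LE":
        return item_sk <= value
    if op == "GT":
        return item_sk > value
    if op == "GE":
        return item_sk >= value
    if op == "BETWEEN":
        # Inclusive on both ends, per DynamoDB's BETWEEN
        return value2 is not None and value <= item_sk <= value2
    if op == "BEGINS_WITH":
        return item_sk.startswith(value)
    return False
```

One caveat worth noting: lexicographic comparison is faithful to DynamoDB only for S and B sort keys. DynamoDB compares N keys numerically, so "10" must sort after "9", which a string compare gets wrong.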
 // Helper to check if a byte slice has a prefix
 has_prefix :: proc(data: []byte, prefix: []byte) -> bool {
     if len(data) < len(prefix) {

main.odin (243 changed lines)
@@ -75,7 +75,7 @@ handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_all
     // Get X-Amz-Target header to determine operation
     target := request_get_header(request, "X-Amz-Target")
     if target == nil {
-        return make_error_response(&response, .ValidationException, "Missing X-Amz-Target header")
+        return make_error_response(&response, .SerializationException, "Missing X-Amz-Target header")
     }

     operation := dynamodb.operation_from_target(target.?)
@@ -96,6 +96,8 @@ handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_all
         handle_get_item(engine, request, &response)
     case .DeleteItem:
         handle_delete_item(engine, request, &response)
+    case .UpdateItem:
+        handle_update_item(engine, request, &response)
     case .Query:
         handle_query(engine, request, &response)
     case .Scan:
@@ -117,14 +119,14 @@ handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Req
     // Parse JSON body
     data, parse_err := json.parse(request.body, allocator = context.allocator)
     if parse_err != nil {
-        make_error_response(response, .ValidationException, "Invalid JSON")
+        make_error_response(response, .SerializationException, "Invalid JSON")
         return
     }
     defer json.destroy_value(data)

     root, ok := data.(json.Object)
     if !ok {
-        make_error_response(response, .ValidationException, "Request must be an object")
+        make_error_response(response, .SerializationException, "Request must be an object")
         return
     }

@@ -225,7 +227,7 @@ handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R
         }
         return
     }
-    defer dynamodb.table_metadata_destroy(&metadata, context.allocator)
+    defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)

     // Build response with key schema
     builder := strings.builder_make()
@@ -265,7 +267,6 @@ handle_list_tables :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
         make_error_response(response, .InternalServerError, "Failed to list tables")
         return
     }
-    // tables are owned by engine allocator — just read them, don't free

     builder := strings.builder_make()
     strings.write_string(&builder, `{"TableNames":[`)
@@ -301,16 +302,7 @@ handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request

     err := dynamodb.put_item(engine, table_name, item)
     if err != .None {
-        #partial switch err {
-        case .Table_Not_Found:
-            make_error_response(response, .ResourceNotFoundException, "Table not found")
-        case .Missing_Key_Attribute:
-            make_error_response(response, .ValidationException, "Item missing required key attribute")
-        case .Invalid_Key:
-            make_error_response(response, .ValidationException, "Invalid key format")
-        case:
-            make_error_response(response, .InternalServerError, "Failed to put item")
-        }
+        handle_storage_error(response, err)
         return
     }

@@ -333,16 +325,7 @@ handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request

     item, err := dynamodb.get_item(engine, table_name, key)
     if err != .None {
-        #partial switch err {
-        case .Table_Not_Found:
-            make_error_response(response, .ResourceNotFoundException, "Table not found")
-        case .Missing_Key_Attribute:
-            make_error_response(response, .ValidationException, "Key missing required attributes")
-        case .Invalid_Key:
-            make_error_response(response, .ValidationException, "Invalid key format")
-        case:
-            make_error_response(response, .InternalServerError, "Failed to get item")
-        }
+        handle_storage_error(response, err)
         return
     }

@@ -372,22 +355,21 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ

     err := dynamodb.delete_item(engine, table_name, key)
     if err != .None {
-        #partial switch err {
-        case .Table_Not_Found:
-            make_error_response(response, .ResourceNotFoundException, "Table not found")
-        case .Missing_Key_Attribute:
-            make_error_response(response, .ValidationException, "Key missing required attributes")
-        case .Invalid_Key:
-            make_error_response(response, .ValidationException, "Invalid key format")
-        case:
-            make_error_response(response, .InternalServerError, "Failed to delete item")
-        }
+        handle_storage_error(response, err)
         return
     }

     response_set_body(response, transmute([]byte)string("{}"))
 }

+// UpdateItem — not yet implemented: rejects the request with a clear error
+handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
+    // TODO: Implement UpdateExpression parsing (SET x = :val, REMOVE y, etc.)
+    // For now, return a clear error so callers know it's not yet supported.
+    make_error_response(response, .ValidationException,
+        "UpdateItem is not yet supported. Use PutItem to replace the full item.")
+}
+
 // ============================================================================
 // Query and Scan Operations
 // ============================================================================
@@ -399,6 +381,14 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
         return
     }

+    // ---- Fetch table metadata early so we can parse ExclusiveStartKey ----
+    metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
+    if meta_err != .None {
+        handle_storage_error(response, meta_err)
+        return
+    }
+    defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)
+
     // Parse KeyConditionExpression
     kc, kc_ok := dynamodb.parse_query_key_condition(request.body)
     if !kc_ok {
@@ -425,45 +415,35 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
         limit = 100
     }

-    // TODO: Parse ExclusiveStartKey properly (requires metadata for type info)
-    exclusive_start_key: Maybe([]byte) = nil
-    result, err := dynamodb.query(engine, table_name, pk_owned, exclusive_start_key, limit)
-    if err != .None {
-        #partial switch err {
-        case .Table_Not_Found:
-            make_error_response(response, .ResourceNotFoundException, "Table not found")
-        case:
-            make_error_response(response, .InternalServerError, "Query failed")
-        }
+    // ---- Parse ExclusiveStartKey with proper type handling ----
+    exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
+        request.body, table_name, metadata.key_schema,
+    )
+    if !esk_ok {
+        make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
+        return
+    }
+    defer {
+        if esk, has_esk := exclusive_start_key.?; has_esk {
+            delete(esk)
+        }
+    }
+
+    // ---- Pass sort key condition through to storage layer ----
+    sk_condition: Maybe(dynamodb.Sort_Key_Condition) = nil
+    if skc, has_skc := kc.sk_condition.?; has_skc {
+        sk_condition = skc
+    }
+
+    result, err := dynamodb.query(engine, table_name, pk_owned, exclusive_start_key, limit, sk_condition)
+    if err != .None {
+        handle_storage_error(response, err)
         return
     }
     defer dynamodb.query_result_destroy(&result)

-    // Build response
-    builder := strings.builder_make()
-    strings.write_string(&builder, `{"Items":[`)
-
-    for item, i in result.items {
-        if i > 0 do strings.write_string(&builder, ",")
-        item_json := dynamodb.serialize_item(item)
-        strings.write_string(&builder, item_json)
-    }
-
-    strings.write_string(&builder, `],"Count":`)
-    fmt.sbprintf(&builder, "%d", len(result.items))
-    strings.write_string(&builder, `,"ScannedCount":`)
-    fmt.sbprintf(&builder, "%d", len(result.items))
-
-    // TODO: Add LastEvaluatedKey when pagination is fully wired
-    if last_key, has_last := result.last_evaluated_key.?; has_last {
-        _ = last_key
-    }
-
-    strings.write_string(&builder, "}")
-
-    resp_body := strings.to_string(builder)
-    response_set_body(response, transmute([]byte)resp_body)
+    // Build response with proper pagination
+    write_items_response_with_pagination(response, result.items, result.last_evaluated_key, &metadata)
 }

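With LastEvaluatedKey now emitted by the handler, a client drives pagination the standard DynamoDB way: each response's `LastEvaluatedKey` is fed back as the next request's `ExclusiveStartKey` until the server omits it. A sketch of that loop (the `query_page` callable stands in for one HTTP Query round-trip; illustrative only):

```python
def query_all(query_page, **kwargs):
    """Drain a paginated Query: feed LastEvaluatedKey back as
    ExclusiveStartKey until the server stops returning one."""
    items, start_key = [], None
    while True:
        resp = query_page(ExclusiveStartKey=start_key, **kwargs)
        items.extend(resp["Items"])
        start_key = resp.get("LastEvaluatedKey")
        if start_key is None:
            return items
```

This loop terminates precisely because of the fix above: the final page omits the key instead of echoing its last item forever.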
 handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
@@ -473,52 +453,84 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
         return
     }

+    // ---- Fetch table metadata early so we can parse ExclusiveStartKey ----
+    metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
+    if meta_err != .None {
+        handle_storage_error(response, meta_err)
+        return
+    }
+    defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)
+
     // Parse Limit (default to 100 if not specified)
     limit := dynamodb.parse_limit(request.body)
     if limit == 0 {
         limit = 100
     }

-    // Parse ExclusiveStartKey if present
-    // For now, we'll implement basic scan without ExclusiveStartKey parsing
-    // TODO: Parse ExclusiveStartKey from request body and convert to binary key
-    exclusive_start_key: Maybe([]byte) = nil
+    // ---- Parse ExclusiveStartKey with proper type handling ----
+    exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
+        request.body, table_name, metadata.key_schema,
+    )
+    if !esk_ok {
+        make_error_response(response, .ValidationException, "Invalid ExclusiveStartKey")
+        return
+    }
+    defer {
+        if esk, has_esk := exclusive_start_key.?; has_esk {
+            delete(esk)
+        }
+    }

     // Perform scan
     result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit)
     if err != .None {
-        #partial switch err {
-        case .Table_Not_Found:
-            make_error_response(response, .ResourceNotFoundException, "Table not found")
-        case:
-            make_error_response(response, .InternalServerError, "Failed to scan table")
-        }
+        handle_storage_error(response, err)
         return
     }
     defer dynamodb.scan_result_destroy(&result)

-    // Build response
+    // Build response with proper pagination
+    write_items_response_with_pagination(response, result.items, result.last_evaluated_key, &metadata)
+}
+
+// ============================================================================
+// Shared Pagination Response Builder
+//
+// Mirrors the Zig writeItemsResponseWithPagination helper:
+// - Serializes Items array
+// - Emits Count / ScannedCount
+// - Decodes binary last_evaluated_key → DynamoDB JSON LastEvaluatedKey
+// ============================================================================
+
+write_items_response_with_pagination :: proc(
+    response: ^HTTP_Response,
+    items: []dynamodb.Item,
+    last_evaluated_key_binary: Maybe([]byte),
+    metadata: ^dynamodb.Table_Metadata,
+) {
     builder := strings.builder_make()
     strings.write_string(&builder, `{"Items":[`)

-    for item, i in result.items {
+    for item, i in items {
         if i > 0 do strings.write_string(&builder, ",")
         item_json := dynamodb.serialize_item(item)
         strings.write_string(&builder, item_json)
     }

     strings.write_string(&builder, `],"Count":`)
-    fmt.sbprintf(&builder, "%d", len(result.items))
+    fmt.sbprintf(&builder, "%d", len(items))
     strings.write_string(&builder, `,"ScannedCount":`)
-    fmt.sbprintf(&builder, "%d", len(result.items))
+    fmt.sbprintf(&builder, "%d", len(items))

-    // Add LastEvaluatedKey if present (pagination)
-    if last_key, has_last := result.last_evaluated_key.?; has_last {
-        // TODO: Convert binary key back to DynamoDB JSON format
-        // For now, we'll just include it as base64 (not DynamoDB-compatible yet)
-        _ = last_key
-        // When fully implemented, this should decode the key and serialize as:
-        // ,"LastEvaluatedKey":{"pk":{"S":"value"},"sk":{"N":"123"}}
+    // Emit LastEvaluatedKey if the storage layer produced one
+    if binary_key, has_last := last_evaluated_key_binary.?; has_last {
+        lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata)
+        if lek_ok {
+            strings.write_string(&builder, `,"LastEvaluatedKey":`)
+            strings.write_string(&builder, lek_json)
+        }
+        // If decoding fails we still return the items — just without a pagination token.
+        // The client will assume the scan/query is complete.
     }

     strings.write_string(&builder, "}")
@@ -527,6 +539,36 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
     response_set_body(response, transmute([]byte)resp_body)
 }

+// ============================================================================
+// Centralized Storage Error → DynamoDB Error mapping
+//
+// Maps storage errors to the correct DynamoDB error type AND HTTP status code.
+// DynamoDB uses:
+//   400 — ValidationException, ResourceNotFoundException, ResourceInUseException, etc.
+//   500 — InternalServerError
+// ============================================================================
+
+handle_storage_error :: proc(response: ^HTTP_Response, err: dynamodb.Storage_Error) {
+    #partial switch err {
+    case .Table_Not_Found:
+        make_error_response(response, .ResourceNotFoundException, "Requested resource not found")
+    case .Table_Already_Exists:
+        make_error_response(response, .ResourceInUseException, "Table already exists")
+    case .Missing_Key_Attribute:
+        make_error_response(response, .ValidationException, "One or more required key attributes are missing")
+    case .Invalid_Key:
+        make_error_response(response, .ValidationException, "Invalid key: type mismatch or malformed key value")
+    case .Serialization_Error:
+        make_error_response(response, .InternalServerError, "Internal serialization error")
+    case .RocksDB_Error:
+        make_error_response(response, .InternalServerError, "Internal storage error")
+    case .Out_Of_Memory:
+        make_error_response(response, .InternalServerError, "Internal memory error")
+    case:
+        make_error_response(response, .InternalServerError, "Unexpected error")
+    }
+}

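The centralized handler added here replaces several copies of the same `#partial switch`. The mapping it encodes, client faults to 400 and server faults to 500, can be summarized as a lookup table (the enum names mirror the Odin code; the Python form is purely illustrative):

```python
STORAGE_ERROR_MAP = {
    "Table_Not_Found":       (400, "ResourceNotFoundException"),
    "Table_Already_Exists":  (400, "ResourceInUseException"),
    "Missing_Key_Attribute": (400, "ValidationException"),
    "Invalid_Key":           (400, "ValidationException"),
    "Serialization_Error":   (500, "InternalServerError"),
    "RocksDB_Error":         (500, "InternalServerError"),
    "Out_Of_Memory":         (500, "InternalServerError"),
}

def map_storage_error(err):
    # Unknown errors fall through to a generic 500, like the `case:` arm
    return STORAGE_ERROR_MAP.get(err, (500, "InternalServerError"))
```

Keeping this mapping in one place is what lets the individual handlers shrink to a single `handle_storage_error` call.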
 // ============================================================================
 // Schema Parsing Helpers
 // ============================================================================
@@ -560,7 +602,6 @@ parse_key_schema :: proc(root: json.Object) -> ([]dynamodb.Key_Schema_Element, K
     for elem, i in key_schema_array {
         elem_obj, elem_ok := elem.(json.Object)
         if !elem_ok {
-            // Cleanup
             for j in 0..<i {
                 delete(key_schema[j].attribute_name)
             }
@@ -702,8 +743,8 @@ parse_attribute_definitions :: proc(root: json.Object) -> ([]dynamodb.Attribute_
         }

         // Get AttributeName
-        attr_name_val, name_found := elem_obj["AttributeName"]
-        if !name_found {
+        attr_name_val, attr_found := elem_obj["AttributeName"]
+        if !attr_found {
             for j in 0..<i {
                 delete(attr_defs[j].attribute_name)
             }
@@ -794,10 +835,24 @@ validate_key_attributes_defined :: proc(key_schema: []dynamodb.Key_Schema_Elemen

 // ============================================================================
 // Error Response Helper
+//
+// Maps DynamoDB error types to correct HTTP status codes:
+//   400 — ValidationException, ResourceNotFoundException, ResourceInUseException,
+//         ConditionalCheckFailedException, SerializationException
+//   500 — InternalServerError
 // ============================================================================

 make_error_response :: proc(response: ^HTTP_Response, err_type: dynamodb.DynamoDB_Error_Type, message: string) -> HTTP_Response {
-    response_set_status(response, .Bad_Request)
+    status: HTTP_Status
+
+    #partial switch err_type {
+    case .InternalServerError:
+        status = .Internal_Server_Error
+    case:
+        status = .Bad_Request
+    }
+
+    response_set_status(response, status)
     error_body := dynamodb.error_to_response(err_type, message)
     response_set_body(response, transmute([]byte)error_body)
     return response^