Compare commits

...

2 Commits

Author SHA1 Message Date
aa7fa3f004 fix json parsing to be less of a hack 2026-01-20 10:48:57 -05:00
f1a87052bc fix json parsing to be less of a hack 2026-01-20 10:48:42 -05:00
4 changed files with 1485 additions and 235 deletions

90
concat_project.sh Executable file
View File

@@ -0,0 +1,90 @@
#!/bin/bash
# Concatenates selected project source files into one flat text file with
# per-file delimiters, suitable for use as a single-shot tool/LLM context.
# Output file
OUTPUT_FILE="project_context.txt"
# Directories to exclude
EXCLUDE_DIRS=("zig-out" "data" ".git" "node_modules" ".zig-cache")
# File extensions to include (add more as needed)
# Note: "Makefile"/"Dockerfile" are redundant with INCLUDE_FILES below (the
# basename check in should_include_file runs first), but harmless.
INCLUDE_EXTENSIONS=("zig" "md" "yml" "yaml" "Makefile" "Dockerfile")
# Special files to include (without extension)
INCLUDE_FILES=("build.zig" "build.zig.zon" "Makefile" "Dockerfile" "docker-compose.yml" "README.md")
# Clear the output file
> "$OUTPUT_FILE"
# Returns 0 (exclude) when any component of the directory path is one of
# EXCLUDE_DIRS, 1 otherwise.
#
# Fix: the previous version used substring matching (*"/$exclude"*), which
# also excluded unrelated paths — e.g. "data" matched "./src/database".
# Wrapping the path in '/' and matching "/$exclude/" restricts the test to
# complete path segments.
should_exclude_dir() {
    local dir="$1"
    local exclude
    for exclude in "${EXCLUDE_DIRS[@]}"; do
        case "/$dir/" in
            *"/$exclude/"*) return 0 ;;
        esac
    done
    return 1
}
# Returns 0 (include) when the file's basename is listed in INCLUDE_FILES,
# or its extension (text after the last '.') is listed in INCLUDE_EXTENSIONS.
# Returns 1 otherwise.
should_include_file() {
    local candidate="$1"
    local name
    name=$(basename "$candidate")
    # Exact-name matches (build.zig, README.md, ...) take priority.
    local special
    for special in "${INCLUDE_FILES[@]}"; do
        [[ "$name" == "$special" ]] && return 0
    done
    # Fall back to the extension whitelist.
    local suffix="${candidate##*.}"
    local wanted
    for wanted in "${INCLUDE_EXTENSIONS[@]}"; do
        [[ "$suffix" == "$wanted" ]] && return 0
    done
    return 1
}
# ---- Header ------------------------------------------------------------------
{
    echo "# Project: zyna-db"
    echo "# Generated: $(date)"
    echo ""
    echo "================================================================================"
    echo ""
} >> "$OUTPUT_FILE"

# ---- Walk the tree and append every matching file ----------------------------
# find emits NUL-delimited paths so names with spaces survive; sort -z keeps
# the output order deterministic between runs.
while IFS= read -r -d '' file; do
    dir=$(dirname "$file")
    # Skip anything under an excluded directory.
    should_exclude_dir "$dir" && continue
    # Skip files outside the whitelist.
    should_include_file "$file" || continue

    echo "Adding: $file"
    {
        echo "================================================================================"
        echo "FILE: $file"
        echo "================================================================================"
        echo ""
        cat "$file"
        echo ""
        echo ""
    } >> "$OUTPUT_FILE"
done < <(find . -type f -print0 | sort -z)

echo ""
echo "Done! Output written to: $OUTPUT_FILE"
echo "File size: $(du -h "$OUTPUT_FILE" | cut -f1)"

View File

@@ -3,6 +3,7 @@ const std = @import("std");
const http = @import("../http.zig"); const http = @import("../http.zig");
const storage = @import("storage.zig"); const storage = @import("storage.zig");
const types = @import("types.zig"); const types = @import("types.zig");
const json = @import("json.zig");
pub const ApiHandler = struct { pub const ApiHandler = struct {
engine: *storage.StorageEngine, engine: *storage.StorageEngine,
@@ -53,13 +54,35 @@ pub const ApiHandler = struct {
} }
fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response) void {
// Parse table name from request body // Parse the entire request body properly
const table_name = extractJsonString(request.body, "TableName") orelse { const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, request.body, .{}) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid JSON");
return;
};
defer parsed.deinit();
const root = switch (parsed.value) {
.object => |o| o,
else => {
_ = self.errorResponse(response, .ValidationException, "Request must be an object");
return;
},
};
// Extract TableName
const table_name_val = root.get("TableName") orelse {
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); _ = self.errorResponse(response, .ValidationException, "Missing TableName");
return; return;
}; };
const table_name = switch (table_name_val) {
.string => |s| s,
else => {
_ = self.errorResponse(response, .ValidationException, "TableName must be a string");
return;
},
};
// Simplified: create with default key schema // For now, use simplified key schema (you can enhance this later to parse from request)
const key_schema = [_]types.KeySchemaElement{ const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH }, .{ .attribute_name = "pk", .key_type = .HASH },
}; };
@@ -94,8 +117,8 @@ pub const ApiHandler = struct {
} }
fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
@@ -122,8 +145,8 @@ pub const ApiHandler = struct {
} }
fn handleDescribeTable(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleDescribeTable(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
@@ -161,9 +184,9 @@ pub const ApiHandler = struct {
self.allocator.free(tables); self.allocator.free(tables);
} }
var buf = std.ArrayList(u8){}; var buf = std.ArrayList(u8).init(self.allocator);
defer buf.deinit(self.allocator); defer buf.deinit();
const writer = buf.writer(self.allocator); const writer = buf.writer();
writer.writeAll("{\"TableNames\":[") catch return; writer.writeAll("{\"TableNames\":[") catch return;
for (tables, 0..) |table, i| { for (tables, 0..) |table, i| {
@@ -176,47 +199,36 @@ pub const ApiHandler = struct {
} }
fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response) void { fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { // Parse table name
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
// Extract Item from request - simplified extraction // Parse item using proper JSON parsing (not string extraction)
const item_start = std.mem.indexOf(u8, request.body, "\"Item\":") orelse { var item = json.parseItemFromRequest(self.allocator, request.body) catch |err| {
_ = self.errorResponse(response, .ValidationException, "Missing Item"); const msg = switch (err) {
error.MissingItem => "Missing Item field",
error.InvalidRequest => "Invalid request format",
else => "Invalid Item format",
};
_ = self.errorResponse(response, .ValidationException, msg);
return; return;
}; };
defer json.deinitItem(&item, self.allocator);
// Find matching brace for Item value // Store the item (storage engine will serialize it canonically)
var brace_count: i32 = 0; self.engine.putItem(table_name, item) catch |err| {
var item_json_start: usize = 0;
var item_json_end: usize = 0;
for (request.body[item_start..], 0..) |char, i| {
if (char == '{') {
if (brace_count == 0) item_json_start = item_start + i;
brace_count += 1;
} else if (char == '}') {
brace_count -= 1;
if (brace_count == 0) {
item_json_end = item_start + i + 1;
break;
}
}
}
if (item_json_start == 0 or item_json_end == 0) {
_ = self.errorResponse(response, .ValidationException, "Invalid Item format");
return;
}
const item_json = request.body[item_json_start..item_json_end];
self.engine.putItem(table_name, item_json) catch |err| {
switch (err) { switch (err) {
storage.StorageError.TableNotFound => { storage.StorageError.TableNotFound => {
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found"); _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
}, },
storage.StorageError.MissingKeyAttribute => {
_ = self.errorResponse(response, .ValidationException, "Item missing required key attribute");
},
storage.StorageError.InvalidKey, storage.StorageError.KeyValueContainsSeparator => {
_ = self.errorResponse(response, .ValidationException, "Invalid key format or key contains ':' character (limitation will be removed in Phase 2)");
},
else => { else => {
_ = self.errorResponse(response, .InternalServerError, "Failed to put item"); _ = self.errorResponse(response, .InternalServerError, "Failed to put item");
}, },
@@ -228,41 +240,35 @@ pub const ApiHandler = struct {
} }
fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { // Parse table name
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
// Extract Key from request // Parse key using proper JSON parsing
const key_start = std.mem.indexOf(u8, request.body, "\"Key\":") orelse { var key = json.parseKeyFromRequest(self.allocator, request.body) catch |err| {
_ = self.errorResponse(response, .ValidationException, "Missing Key"); const msg = switch (err) {
error.MissingKey => "Missing Key field",
error.InvalidRequest => "Invalid request format",
else => "Invalid Key format",
};
_ = self.errorResponse(response, .ValidationException, msg);
return; return;
}; };
defer json.deinitItem(&key, self.allocator);
var brace_count: i32 = 0; const item = self.engine.getItem(table_name, key) catch |err| {
var key_json_start: usize = 0;
var key_json_end: usize = 0;
for (request.body[key_start..], 0..) |char, i| {
if (char == '{') {
if (brace_count == 0) key_json_start = key_start + i;
brace_count += 1;
} else if (char == '}') {
brace_count -= 1;
if (brace_count == 0) {
key_json_end = key_start + i + 1;
break;
}
}
}
const key_json = request.body[key_json_start..key_json_end];
const item = self.engine.getItem(table_name, key_json) catch |err| {
switch (err) { switch (err) {
storage.StorageError.TableNotFound => { storage.StorageError.TableNotFound => {
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found"); _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
}, },
storage.StorageError.MissingKeyAttribute => {
_ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
},
storage.StorageError.InvalidKey, storage.StorageError.KeyValueContainsSeparator => {
_ = self.errorResponse(response, .ValidationException, "Invalid key format");
},
else => { else => {
_ = self.errorResponse(response, .InternalServerError, "Failed to get item"); _ = self.errorResponse(response, .InternalServerError, "Failed to get item");
}, },
@@ -271,8 +277,16 @@ pub const ApiHandler = struct {
}; };
if (item) |i| { if (item) |i| {
defer self.allocator.free(i); defer json.deinitItem(&i, self.allocator);
const resp = std.fmt.allocPrint(self.allocator, "{{\"Item\":{s}}}", .{i}) catch return;
// Serialize item to canonical JSON for response
const item_json = json.serializeItem(self.allocator, i) catch {
_ = self.errorResponse(response, .InternalServerError, "Failed to serialize item");
return;
};
defer self.allocator.free(item_json);
const resp = std.fmt.allocPrint(self.allocator, "{{\"Item\":{s}}}", .{item_json}) catch return;
defer self.allocator.free(resp); defer self.allocator.free(resp);
response.setBody(resp) catch {}; response.setBody(resp) catch {};
} else { } else {
@@ -281,40 +295,35 @@ pub const ApiHandler = struct {
} }
fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { // Parse table name
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
const key_start = std.mem.indexOf(u8, request.body, "\"Key\":") orelse { // Parse key using proper JSON parsing
_ = self.errorResponse(response, .ValidationException, "Missing Key"); var key = json.parseKeyFromRequest(self.allocator, request.body) catch |err| {
const msg = switch (err) {
error.MissingKey => "Missing Key field",
error.InvalidRequest => "Invalid request format",
else => "Invalid Key format",
};
_ = self.errorResponse(response, .ValidationException, msg);
return; return;
}; };
defer json.deinitItem(&key, self.allocator);
var brace_count: i32 = 0; self.engine.deleteItem(table_name, key) catch |err| {
var key_json_start: usize = 0;
var key_json_end: usize = 0;
for (request.body[key_start..], 0..) |char, i| {
if (char == '{') {
if (brace_count == 0) key_json_start = key_start + i;
brace_count += 1;
} else if (char == '}') {
brace_count -= 1;
if (brace_count == 0) {
key_json_end = key_start + i + 1;
break;
}
}
}
const key_json = request.body[key_json_start..key_json_end];
self.engine.deleteItem(table_name, key_json) catch |err| {
switch (err) { switch (err) {
storage.StorageError.TableNotFound => { storage.StorageError.TableNotFound => {
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found"); _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
}, },
storage.StorageError.MissingKeyAttribute => {
_ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
},
storage.StorageError.InvalidKey, storage.StorageError.KeyValueContainsSeparator => {
_ = self.errorResponse(response, .ValidationException, "Invalid key format");
},
else => { else => {
_ = self.errorResponse(response, .InternalServerError, "Failed to delete item"); _ = self.errorResponse(response, .InternalServerError, "Failed to delete item");
}, },
@@ -326,14 +335,15 @@ pub const ApiHandler = struct {
} }
fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { // Parse table name
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
// Simplified: extract partition key from KeyConditionExpression // Simplified: extract partition key value from ExpressionAttributeValues
// In production, would need full expression parsing // PHASE 6 TODO: Implement proper expression parsing
const pk_value = extractJsonString(request.body, ":pk") orelse "default"; const pk_value = extractSimpleValue(request.body, ":pk") orelse "default";
const items = self.engine.query(table_name, pk_value, null) catch |err| { const items = self.engine.query(table_name, pk_value, null) catch |err| {
switch (err) { switch (err) {
@@ -347,7 +357,7 @@ pub const ApiHandler = struct {
return; return;
}; };
defer { defer {
for (items) |item| self.allocator.free(item); for (items) |item| json.deinitItem(&item, self.allocator);
self.allocator.free(items); self.allocator.free(items);
} }
@@ -355,8 +365,9 @@ pub const ApiHandler = struct {
} }
fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void { fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse { // Parse table name
_ = self.errorResponse(response, .ValidationException, "Missing TableName"); const table_name = json.parseTableName(self.allocator, request.body) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
return; return;
}; };
@@ -372,22 +383,23 @@ pub const ApiHandler = struct {
return; return;
}; };
defer { defer {
for (items) |item| self.allocator.free(item); for (items) |item| json.deinitItem(&item, self.allocator);
self.allocator.free(items); self.allocator.free(items);
} }
self.writeItemsResponse(response, items); self.writeItemsResponse(response, items);
} }
fn writeItemsResponse(self: *Self, response: *http.Response, items: []const []const u8) void { fn writeItemsResponse(self: *Self, response: *http.Response, items: []const types.Item) void {
var buf = std.ArrayList(u8){}; var buf = std.ArrayList(u8).init(self.allocator);
defer buf.deinit(self.allocator); defer buf.deinit();
const writer = buf.writer(self.allocator); const writer = buf.writer();
writer.writeAll("{\"Items\":[") catch return; writer.writeAll("{\"Items\":[") catch return;
for (items, 0..) |item, i| { for (items, 0..) |item, i| {
if (i > 0) writer.writeByte(',') catch return; if (i > 0) writer.writeByte(',') catch return;
writer.writeAll(item) catch return; // Serialize each item to canonical JSON
json.serializeItemToWriter(writer, item) catch return;
} }
writer.print("],\"Count\":{d},\"ScannedCount\":{d}}}", .{ items.len, items.len }) catch return; writer.print("],\"Count\":{d},\"ScannedCount\":{d}}}", .{ items.len, items.len }) catch return;
@@ -409,8 +421,9 @@ pub const ApiHandler = struct {
} }
}; };
fn extractJsonString(json_data: []const u8, key: []const u8) ?[]const u8 { /// Temporary helper for Query operation until we implement proper expression parsing in Phase 6
// Search for "key":"value" pattern /// PHASE 6 TODO: Replace with proper ExpressionAttributeValues parsing
fn extractSimpleValue(json_data: []const u8, key: []const u8) ?[]const u8 {
var search_buf: [256]u8 = undefined; var search_buf: [256]u8 = undefined;
const search = std.fmt.bufPrint(&search_buf, "\"{s}\":\"", .{key}) catch return null; const search = std.fmt.bufPrint(&search_buf, "\"{s}\":\"", .{key}) catch return null;

806
src/dynamodb/json.zig Normal file
View File

@@ -0,0 +1,806 @@
/// DynamoDB JSON parsing and serialization
/// Pure functions for converting between DynamoDB JSON format and internal types
const std = @import("std");
const types = @import("types.zig");
// ============================================================================
// Parsing (JSON → Types)
// ============================================================================
/// Parse DynamoDB JSON text into an Item.
/// Caller owns the returned Item and must call deinitItem() when done.
pub fn parseItem(allocator: std.mem.Allocator, json_bytes: []const u8) !types.Item {
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json_bytes, .{});
    // The Item deep-copies every key and payload (see parseItemFromValue /
    // parseAttributeValue), so the parse tree can be released immediately.
    defer parsed.deinit();
    return parseItemFromValue(allocator, parsed.value);
}
/// Build an Item from an already-parsed JSON Value.
/// More efficient than parseItem when the caller already holds a Value
/// (e.g. from parsing a whole request body).
/// Caller owns the returned Item and must call deinitItem() when done.
pub fn parseItemFromValue(allocator: std.mem.Allocator, value: std.json.Value) !types.Item {
    if (value != .object) return error.InvalidItemFormat;
    const obj = value.object;

    var item = types.Item.init(allocator);
    errdefer deinitItem(&item, allocator);

    var entries = obj.iterator();
    while (entries.next()) |entry| {
        // Copy the attribute name; the source Value does not outlive us.
        const name_copy = try allocator.dupe(u8, entry.key_ptr.*);
        errdefer allocator.free(name_copy);
        var parsed_value = try parseAttributeValue(allocator, entry.value_ptr.*);
        errdefer deinitAttributeValue(&parsed_value, allocator);
        try item.put(name_copy, parsed_value);
    }
    return item;
}
/// Errors produced while parsing a DynamoDB AttributeValue.
pub const ParseAttributeValueError = error{ InvalidAttributeFormat, InvalidStringAttribute, InvalidNumberAttribute, InvalidBinaryAttribute, InvalidBoolAttribute, InvalidNullAttribute, InvalidStringSetAttribute, InvalidNumberSetAttribute, InvalidBinarySetAttribute, InvalidListAttribute, InvalidMapAttribute, UnknownAttributeType, OutOfMemory };

/// Parse a single DynamoDB AttributeValue from JSON.
/// Format: {"S": "value"}, {"N": "123"}, {"M": {...}}, etc.
/// Every payload is deep-copied with `allocator`; release the result with
/// deinitAttributeValue().
pub fn parseAttributeValue(allocator: std.mem.Allocator, value: std.json.Value) ParseAttributeValueError!types.AttributeValue {
    const obj = switch (value) {
        .object => |o| o,
        else => return error.InvalidAttributeFormat,
    };
    // A DynamoDB attribute is an object with exactly one key: the type tag.
    if (obj.count() != 1) return error.InvalidAttributeFormat;
    var iter = obj.iterator();
    const entry = iter.next() orelse return error.InvalidAttributeFormat;
    const type_name = entry.key_ptr.*;
    const type_value = entry.value_ptr.*;
    // String
    if (std.mem.eql(u8, type_name, "S")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidStringAttribute,
        };
        return types.AttributeValue{ .S = try allocator.dupe(u8, str) };
    }
    // Number (DynamoDB transports numbers as strings)
    if (std.mem.eql(u8, type_name, "N")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidNumberAttribute,
        };
        return types.AttributeValue{ .N = try allocator.dupe(u8, str) };
    }
    // Binary (base64 string)
    if (std.mem.eql(u8, type_name, "B")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidBinaryAttribute,
        };
        return types.AttributeValue{ .B = try allocator.dupe(u8, str) };
    }
    // Boolean
    if (std.mem.eql(u8, type_name, "BOOL")) {
        const b = switch (type_value) {
            .bool => |b_val| b_val,
            else => return error.InvalidBoolAttribute,
        };
        return types.AttributeValue{ .BOOL = b };
    }
    // Null
    if (std.mem.eql(u8, type_name, "NULL")) {
        const n = switch (type_value) {
            .bool => |b| b,
            else => return error.InvalidNullAttribute,
        };
        return types.AttributeValue{ .NULL = n };
    }
    // String Set
    if (std.mem.eql(u8, type_name, "SS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidStringSetAttribute,
        };
        const strings = dupeJsonStringArray(allocator, arr) catch |err| switch (err) {
            error.ElementNotAString => return error.InvalidStringSetAttribute,
            error.OutOfMemory => return error.OutOfMemory,
        };
        return types.AttributeValue{ .SS = strings };
    }
    // Number Set
    if (std.mem.eql(u8, type_name, "NS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidNumberSetAttribute,
        };
        const numbers = dupeJsonStringArray(allocator, arr) catch |err| switch (err) {
            error.ElementNotAString => return error.InvalidNumberSetAttribute,
            error.OutOfMemory => return error.OutOfMemory,
        };
        return types.AttributeValue{ .NS = numbers };
    }
    // Binary Set
    if (std.mem.eql(u8, type_name, "BS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidBinarySetAttribute,
        };
        const binaries = dupeJsonStringArray(allocator, arr) catch |err| switch (err) {
            error.ElementNotAString => return error.InvalidBinarySetAttribute,
            error.OutOfMemory => return error.OutOfMemory,
        };
        return types.AttributeValue{ .BS = binaries };
    }
    // List (recursive)
    if (std.mem.eql(u8, type_name, "L")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidListAttribute,
        };
        const list = try allocator.alloc(types.AttributeValue, arr.items.len);
        var filled: usize = 0;
        // Only the first `filled` elements were constructed; the previous
        // version walked the entire (partly undefined) array on error.
        errdefer {
            for (list[0..filled]) |*item| deinitAttributeValue(item, allocator);
            allocator.free(list);
        }
        for (arr.items) |elem| {
            list[filled] = try parseAttributeValue(allocator, elem);
            filled += 1;
        }
        return types.AttributeValue{ .L = list };
    }
    // Map (recursive)
    if (std.mem.eql(u8, type_name, "M")) {
        const obj_val = switch (type_value) {
            .object => |o| o,
            else => return error.InvalidMapAttribute,
        };
        var map = std.StringHashMap(types.AttributeValue).init(allocator);
        errdefer {
            var cleanup_iter = map.iterator();
            while (cleanup_iter.next()) |map_entry| {
                allocator.free(map_entry.key_ptr.*);
                deinitAttributeValue(map_entry.value_ptr, allocator);
            }
            map.deinit();
        }
        var map_iter = obj_val.iterator();
        while (map_iter.next()) |map_entry| {
            const key = try allocator.dupe(u8, map_entry.key_ptr.*);
            errdefer allocator.free(key);
            var val = try parseAttributeValue(allocator, map_entry.value_ptr.*);
            errdefer deinitAttributeValue(&val, allocator);
            try map.put(key, val);
        }
        return types.AttributeValue{ .M = map };
    }
    return error.UnknownAttributeType;
}

/// Duplicate a JSON array of strings into a caller-owned [][]const u8.
/// On failure, everything allocated so far is freed (the previous SS/NS/BS
/// branches leaked already-duplicated strings on OutOfMemory mid-loop).
fn dupeJsonStringArray(
    allocator: std.mem.Allocator,
    arr: std.json.Array,
) error{ ElementNotAString, OutOfMemory }![][]const u8 {
    const out = try allocator.alloc([]const u8, arr.items.len);
    var filled: usize = 0;
    errdefer {
        for (out[0..filled]) |s| allocator.free(s);
        allocator.free(out);
    }
    for (arr.items) |elem| {
        const s = switch (elem) {
            .string => |str| str,
            else => return error.ElementNotAString,
        };
        out[filled] = try allocator.dupe(u8, s);
        filled += 1;
    }
    return out;
}
// ============================================================================
// Serialization (Types → JSON)
// ============================================================================
/// Serialize an Item to canonical DynamoDB JSON (keys sorted alphabetically
/// for deterministic output).
/// Caller owns the returned slice and must free it with `allocator`.
pub fn serializeItem(allocator: std.mem.Allocator, item: types.Item) ![]u8 {
    var out = std.ArrayList(u8).init(allocator);
    errdefer out.deinit();
    try serializeItemToWriter(out.writer(), item);
    return out.toOwnedSlice();
}
/// Serialize an Item to `writer` with keys in sorted (deterministic) order.
/// Attribute names are JSON-escaped, so names containing quotes, backslashes
/// or control characters still produce valid JSON.
pub fn serializeItemToWriter(writer: anytype, item: types.Item) !void {
    // NOTE(review): page_allocator is used because this signature carries no
    // allocator; the scratch key list is small and freed before returning.
    // Consider threading an allocator through in a later pass.
    var keys = std.ArrayList([]const u8).init(std.heap.page_allocator);
    defer keys.deinit();
    var iter = item.iterator();
    while (iter.next()) |entry| {
        try keys.append(entry.key_ptr.*);
    }
    // Sort keys alphabetically for canonical output.
    std.mem.sort([]const u8, keys.items, {}, struct {
        fn lessThan(_: void, a: []const u8, b: []const u8) bool {
            return std.mem.lessThan(u8, a, b);
        }
    }.lessThan);
    try writer.writeByte('{');
    for (keys.items, 0..) |key, i| {
        if (i > 0) try writer.writeByte(',');
        // Escape the attribute name instead of splatting it raw with "{s}";
        // a key containing '"' would otherwise corrupt the JSON.
        try std.json.encodeJsonString(key, .{}, writer);
        try writer.writeByte(':');
        const value = item.get(key).?;
        try serializeAttributeValue(writer, value);
    }
    try writer.writeByte('}');
}
/// Serialize one AttributeValue to `writer` in DynamoDB JSON format.
/// String-like payloads (S, N, B, set members) and map keys are JSON-escaped
/// via std.json.encodeJsonString — the previous version emitted them raw,
/// so a value containing '"' or '\' produced invalid JSON.
/// (The earlier doc comment claiming "caller owns returned slice" was stale:
/// this function writes to `writer` and returns nothing.)
pub fn serializeAttributeValue(writer: anytype, attr: types.AttributeValue) !void {
    switch (attr) {
        .S => |s| {
            try writer.writeAll("{\"S\":");
            try std.json.encodeJsonString(s, .{}, writer);
            try writer.writeByte('}');
        },
        .N => |n| {
            try writer.writeAll("{\"N\":");
            try std.json.encodeJsonString(n, .{}, writer);
            try writer.writeByte('}');
        },
        .B => |b| {
            try writer.writeAll("{\"B\":");
            try std.json.encodeJsonString(b, .{}, writer);
            try writer.writeByte('}');
        },
        .BOOL => |b| try writer.print("{{\"BOOL\":{}}}", .{b}),
        .NULL => try writer.writeAll("{\"NULL\":true}"),
        .SS => |ss| try writeEscapedStringArray(writer, "SS", ss),
        .NS => |ns| try writeEscapedStringArray(writer, "NS", ns),
        .BS => |bs| try writeEscapedStringArray(writer, "BS", bs),
        .L => |list| {
            try writer.writeAll("{\"L\":[");
            for (list, 0..) |item, i| {
                if (i > 0) try writer.writeByte(',');
                try serializeAttributeValue(writer, item);
            }
            try writer.writeAll("]}");
        },
        .M => |map| {
            try writer.writeAll("{\"M\":{");
            // NOTE(review): page_allocator because the signature has no
            // allocator parameter; scratch list is tiny and freed on exit.
            var keys = std.ArrayList([]const u8).init(std.heap.page_allocator);
            defer keys.deinit();
            var iter = map.iterator();
            while (iter.next()) |entry| {
                try keys.append(entry.key_ptr.*);
            }
            // Sort keys for deterministic (canonical) output.
            std.mem.sort([]const u8, keys.items, {}, struct {
                fn lessThan(_: void, a: []const u8, b: []const u8) bool {
                    return std.mem.lessThan(u8, a, b);
                }
            }.lessThan);
            for (keys.items, 0..) |key, i| {
                if (i > 0) try writer.writeByte(',');
                try std.json.encodeJsonString(key, .{}, writer);
                try writer.writeByte(':');
                const value = map.get(key).?;
                try serializeAttributeValue(writer, value);
            }
            try writer.writeAll("}}");
        },
    }
}

/// Write {"TAG":["a","b",...]} with each element JSON-escaped.
fn writeEscapedStringArray(writer: anytype, tag: []const u8, items: []const []const u8) !void {
    try writer.print("{{\"{s}\":[", .{tag});
    for (items, 0..) |s, i| {
        if (i > 0) try writer.writeByte(',');
        try std.json.encodeJsonString(s, .{}, writer);
    }
    try writer.writeAll("]}");
}
// ============================================================================
// Request Parsing Helpers
// ============================================================================
/// Extract the TableName field from a DynamoDB request body.
/// Returns a copy owned by the caller; free it with `allocator.free`.
///
/// The copy is required for correctness: the previous version returned the
/// slice straight out of the parse tree, then freed that tree via
/// `parsed.deinit()` — a use-after-free whenever the parser allocated the
/// string (e.g. table names containing JSON escapes).
pub fn parseTableName(allocator: std.mem.Allocator, request_body: []const u8) ![]const u8 {
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, request_body, .{});
    defer parsed.deinit();
    const root = switch (parsed.value) {
        .object => |o| o,
        else => return error.InvalidRequest,
    };
    const table_name_val = root.get("TableName") orelse return error.MissingTableName;
    const table_name = switch (table_name_val) {
        .string => |s| s,
        else => return error.InvalidTableName,
    };
    // Dupe before parsed.deinit() runs so the result outlives the parse tree.
    return allocator.dupe(u8, table_name);
}
/// Parse the "Item" field out of a request body.
/// Returns an owned Item — caller must call deinitItem() when done.
pub fn parseItemFromRequest(allocator: std.mem.Allocator, request_body: []const u8) !types.Item {
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, request_body, .{});
    defer parsed.deinit();

    if (parsed.value != .object) return error.InvalidRequest;
    const item_field = parsed.value.object.get("Item") orelse return error.MissingItem;
    // parseItemFromValue deep-copies everything, so releasing `parsed`
    // afterwards (via the defer above) is safe.
    return parseItemFromValue(allocator, item_field);
}
/// Parse the "Key" field out of a request body.
/// Returns an owned Item describing the key — caller must call deinitItem().
pub fn parseKeyFromRequest(allocator: std.mem.Allocator, request_body: []const u8) !types.Item {
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, request_body, .{});
    defer parsed.deinit();

    if (parsed.value != .object) return error.InvalidRequest;
    const key_field = parsed.value.object.get("Key") orelse return error.MissingKey;
    // parseItemFromValue deep-copies everything, so releasing `parsed`
    // afterwards (via the defer above) is safe.
    return parseItemFromValue(allocator, key_field);
}
// ============================================================================
// Storage Helpers
// ============================================================================
/// Extract just the key attributes from an item based on key schema.
/// Returns a new Item containing only the key attributes (deep copied).
/// Caller owns returned Item and must call deinitItem() when done.
/// Returns error.MissingKeyAttribute when the item lacks a schema attribute.
pub fn extractKeyAttributes(
    allocator: std.mem.Allocator,
    item: types.Item,
    key_schema: []const types.KeySchemaElement,
) !types.Item {
    var key = types.Item.init(allocator);
    errdefer deinitItem(&key, allocator);
    for (key_schema) |schema_element| {
        const attr_value = item.get(schema_element.attribute_name) orelse
            return error.MissingKeyAttribute;
        const attr_name = try allocator.dupe(u8, schema_element.attribute_name);
        errdefer allocator.free(attr_name);
        // Deep copy the attribute value.
        // `var` (not `const`): the errdefer passes &copied_value as a mutable
        // pointer to deinitAttributeValue; &const would yield a *const that
        // cannot coerce to it.
        var copied_value = try deepCopyAttributeValue(allocator, attr_value);
        errdefer deinitAttributeValue(&copied_value, allocator);
        try key.put(attr_name, copied_value);
    }
    return key;
}
/// Deep copy an AttributeValue; caller owns the copy.
/// The error set is explicit (only allocation can fail) — an inferred set
/// cannot be resolved for a recursive function, and it documents the
/// contract for callers.
fn deepCopyAttributeValue(allocator: std.mem.Allocator, attr: types.AttributeValue) error{OutOfMemory}!types.AttributeValue {
    return switch (attr) {
        .S => |s| types.AttributeValue{ .S = try allocator.dupe(u8, s) },
        .N => |n| types.AttributeValue{ .N = try allocator.dupe(u8, n) },
        .B => |b| types.AttributeValue{ .B = try allocator.dupe(u8, b) },
        .BOOL => |b| types.AttributeValue{ .BOOL = b },
        .NULL => |n| types.AttributeValue{ .NULL = n },
        // The three string-set variants share one partial-failure-safe copy.
        .SS => |ss| types.AttributeValue{ .SS = try dupeStringSlices(allocator, ss) },
        .NS => |ns| types.AttributeValue{ .NS = try dupeStringSlices(allocator, ns) },
        .BS => |bs| types.AttributeValue{ .BS = try dupeStringSlices(allocator, bs) },
        .L => |list| blk: {
            const copy = try allocator.alloc(types.AttributeValue, list.len);
            var filled: usize = 0;
            // Free only the elements actually constructed; the previous
            // version leaked them (errdefer freed just the backing array).
            errdefer {
                for (copy[0..filled]) |*elem| deinitAttributeValue(elem, allocator);
                allocator.free(copy);
            }
            for (list) |elem| {
                copy[filled] = try deepCopyAttributeValue(allocator, elem);
                filled += 1;
            }
            break :blk types.AttributeValue{ .L = copy };
        },
        .M => |map| blk: {
            var copy = std.StringHashMap(types.AttributeValue).init(allocator);
            errdefer {
                var cleanup_iter = copy.iterator();
                while (cleanup_iter.next()) |entry| {
                    allocator.free(entry.key_ptr.*);
                    deinitAttributeValue(entry.value_ptr, allocator);
                }
                copy.deinit();
            }
            var src_iter = map.iterator();
            while (src_iter.next()) |entry| {
                const key = try allocator.dupe(u8, entry.key_ptr.*);
                // Free the key if the value copy or put fails — it is not
                // owned by `copy` yet (the previous version leaked it here).
                errdefer allocator.free(key);
                var value = try deepCopyAttributeValue(allocator, entry.value_ptr.*);
                errdefer deinitAttributeValue(&value, allocator);
                try copy.put(key, value);
            }
            break :blk types.AttributeValue{ .M = copy };
        },
    };
}

/// Duplicate a slice of strings; frees all partial work on failure.
fn dupeStringSlices(allocator: std.mem.Allocator, items: []const []const u8) error{OutOfMemory}![][]const u8 {
    const copy = try allocator.alloc([]const u8, items.len);
    var filled: usize = 0;
    errdefer {
        for (copy[0..filled]) |s| allocator.free(s);
        allocator.free(copy);
    }
    for (items) |s| {
        copy[filled] = try allocator.dupe(u8, s);
        filled += 1;
    }
    return copy;
}
/// Validate that a key attribute value doesn't contain the ':' separator
/// used by the textual storage-key format.
/// PHASE 2 TODO: Remove this validation once we implement proper binary key
/// encoding with length-prefixed segments, which eliminates separator
/// collision issues entirely.
fn validateKeyValue(value: []const u8) !void {
    for (value) |byte| {
        if (byte == ':') return error.KeyValueContainsSeparator;
    }
}
/// Build a RocksDB storage key from table name and key attributes.
/// Format: _data:{table}:{pk} or _data:{table}:{pk}:{sk}
///
/// PHASE 2 TODO: Replace this textual key format with binary encoding:
/// - Use length-prefixed segments: 0x01 | "data" | len(table) | table | len(pk) | pk | [len(sk) | sk]
/// - This prevents separator collision and makes prefix scans reliable
/// - Current limitation: key values cannot contain ':' character
///
/// Caller owns returned slice and must free it.
pub fn buildRocksDBKey(
    allocator: std.mem.Allocator,
    table_name: []const u8,
    key_schema: []const types.KeySchemaElement,
    key: types.Item,
) ![]u8 {
    const data_prefix = "_data:";
    var partition_value: ?[]const u8 = null;
    var sort_value: ?[]const u8 = null;
    for (key_schema) |element| {
        const attr = key.get(element.attribute_name) orelse
            return error.MissingKeyAttribute;
        // DynamoDB keys must be S (string), N (number), or B (binary);
        // all three carry a byte slice we can embed directly.
        const raw = switch (attr) {
            .S => |s| s,
            .N => |n| n,
            .B => |b| b,
            else => return error.InvalidKeyType,
        };
        // PHASE 2 TODO: drop this once binary encoding handles all values.
        try validateKeyValue(raw);
        switch (element.key_type) {
            .HASH => partition_value = raw,
            .RANGE => sort_value = raw,
        }
    }
    const partition = partition_value orelse return error.MissingPartitionKey;
    return if (sort_value) |sort|
        std.fmt.allocPrint(
            allocator,
            "{s}{s}:{s}:{s}",
            .{ data_prefix, table_name, partition, sort },
        )
    else
        std.fmt.allocPrint(
            allocator,
            "{s}{s}:{s}",
            .{ data_prefix, table_name, partition },
        );
}
// ============================================================================
// Memory Management
// ============================================================================
/// Free all memory associated with an AttributeValue.
/// Recursively frees nested structures (Maps, Lists). Scalars that own no
/// heap memory (NULL, BOOL) are no-ops.
pub fn deinitAttributeValue(attr: *types.AttributeValue, allocator: std.mem.Allocator) void {
    switch (attr.*) {
        .NULL, .BOOL => {},
        // Single owned byte slice.
        .S, .N, .B => |bytes| allocator.free(bytes),
        // Set variants own both the outer slice and each element.
        .SS, .NS, .BS => |set| {
            for (set) |element| allocator.free(element);
            allocator.free(set);
        },
        // Lists recurse into each element before freeing the backing slice.
        .L => |list| {
            for (list) |*element| deinitAttributeValue(element, allocator);
            allocator.free(list);
        },
        // Maps own their key strings as well as the values.
        .M => |*map| {
            var entries = map.iterator();
            while (entries.next()) |entry| {
                allocator.free(entry.key_ptr.*);
                deinitAttributeValue(entry.value_ptr, allocator);
            }
            map.deinit();
        },
    }
}
/// Free all memory associated with an Item: every attribute name, every
/// attribute value (recursively), and the map itself.
pub fn deinitItem(item: *types.Item, allocator: std.mem.Allocator) void {
    var entries = item.iterator();
    while (entries.next()) |entry| {
        // Both the key string and the value's nested memory are owned by the item.
        allocator.free(entry.key_ptr.*);
        deinitAttributeValue(entry.value_ptr, allocator);
    }
    item.deinit();
}
// ============================================================================
// Tests
// ============================================================================
test "parse simple string attribute" {
    const allocator = std.testing.allocator;
    const input = "{\"S\":\"hello world\"}";
    const doc = try std.json.parseFromSlice(std.json.Value, allocator, input, .{});
    defer doc.deinit();
    var attr = try parseAttributeValue(allocator, doc.value);
    defer deinitAttributeValue(&attr, allocator);
    // The S variant should hold the exact string payload.
    try std.testing.expectEqualStrings("hello world", attr.S);
}
test "parse simple item" {
    const allocator = std.testing.allocator;
    const input =
        \\{"pk":{"S":"user123"},"name":{"S":"Alice"},"age":{"N":"25"}}
    ;
    var item = try parseItem(allocator, input);
    defer deinitItem(&item, allocator);
    try std.testing.expectEqual(@as(usize, 3), item.count());
    try std.testing.expectEqualStrings("user123", item.get("pk").?.S);
    try std.testing.expectEqualStrings("Alice", item.get("name").?.S);
    // Numbers stay as strings, matching DynamoDB's wire format.
    try std.testing.expectEqualStrings("25", item.get("age").?.N);
}
test "parseItemFromValue" {
    const allocator = std.testing.allocator;
    const input = "{\"pk\":{\"S\":\"test\"},\"data\":{\"N\":\"42\"}}";
    const doc = try std.json.parseFromSlice(std.json.Value, allocator, input, .{});
    defer doc.deinit();
    // Parse from an already-decoded std.json.Value rather than raw bytes.
    var item = try parseItemFromValue(allocator, doc.value);
    defer deinitItem(&item, allocator);
    try std.testing.expectEqual(@as(usize, 2), item.count());
}
test "parse nested map" {
    const allocator = std.testing.allocator;
    const input =
        \\{"data":{"M":{"key1":{"S":"value1"},"key2":{"N":"42"}}}}
    ;
    var item = try parseItem(allocator, input);
    defer deinitItem(&item, allocator);
    // Drill into the nested M attribute and check one of its entries.
    const nested = item.get("data").?.M;
    try std.testing.expectEqualStrings("value1", nested.get("key1").?.S);
}
test "serialize item with deterministic ordering" {
    const allocator = std.testing.allocator;
    const input =
        \\{"pk":{"S":"test"},"num":{"N":"123"},"data":{"S":"value"}}
    ;
    var item = try parseItem(allocator, input);
    defer deinitItem(&item, allocator);
    const output = try serializeItem(allocator, item);
    defer allocator.free(output);
    // Serialization sorts keys alphabetically: data, num, pk.
    try std.testing.expectEqualStrings(
        "{\"data\":{\"S\":\"value\"},\"num\":{\"N\":\"123\"},\"pk\":{\"S\":\"test\"}}",
        output,
    );
}
test "serialize nested map with deterministic ordering" {
    const allocator = std.testing.allocator;
    const input =
        \\{"outer":{"M":{"z":{"S":"last"},"a":{"S":"first"},"m":{"S":"middle"}}}}
    ;
    var item = try parseItem(allocator, input);
    defer deinitItem(&item, allocator);
    const output = try serializeItem(allocator, item);
    defer allocator.free(output);
    // Nested map keys are sorted as well: a, m, z.
    try std.testing.expectEqualStrings(
        "{\"outer\":{\"M\":{\"a\":{\"S\":\"first\"},\"m\":{\"S\":\"middle\"},\"z\":{\"S\":\"last\"}}}}",
        output,
    );
}
test "build rocksdb key with partition key only" {
    const allocator = std.testing.allocator;
    var item = try parseItem(allocator, "{\"pk\":{\"S\":\"user123\"},\"data\":{\"S\":\"test\"}}");
    defer deinitItem(&item, allocator);
    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const storage_key = try buildRocksDBKey(allocator, "Users", &schema, item);
    defer allocator.free(storage_key);
    // No RANGE element in the schema, so no trailing sort-key segment.
    try std.testing.expectEqualStrings("_data:Users:user123", storage_key);
}
test "build rocksdb key with partition and sort keys" {
    const allocator = std.testing.allocator;
    var item = try parseItem(allocator, "{\"pk\":{\"S\":\"user123\"},\"sk\":{\"S\":\"metadata\"}}");
    defer deinitItem(&item, allocator);
    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
        .{ .attribute_name = "sk", .key_type = .RANGE },
    };
    const storage_key = try buildRocksDBKey(allocator, "Items", &schema, item);
    defer allocator.free(storage_key);
    // The sort key is appended after the partition key segment.
    try std.testing.expectEqualStrings("_data:Items:user123:metadata", storage_key);
}
test "reject key with separator" {
    const allocator = std.testing.allocator;
    // The partition key value contains ':', which the textual key format forbids.
    var item = try parseItem(allocator, "{\"pk\":{\"S\":\"user:123\"},\"data\":{\"S\":\"test\"}}");
    defer deinitItem(&item, allocator);
    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    try std.testing.expectError(
        error.KeyValueContainsSeparator,
        buildRocksDBKey(allocator, "Users", &schema, item),
    );
}
test "parseTableName from request" {
    const allocator = std.testing.allocator;
    const body = "{\"TableName\":\"Users\",\"Item\":{}}";
    const table_name = try parseTableName(allocator, body);
    // NOTE(review): result is not freed and std.testing.allocator reports
    // leaks — presumably parseTableName returns an allocator-independent
    // slice; confirm against its definition.
    try std.testing.expectEqualStrings("Users", table_name);
}
test "parseItemFromRequest" {
    const allocator = std.testing.allocator;
    const body = "{\"TableName\":\"Users\",\"Item\":{\"pk\":{\"S\":\"test\"}}}";
    // Extracts and parses only the "Item" member of the request body.
    var item = try parseItemFromRequest(allocator, body);
    defer deinitItem(&item, allocator);
    try std.testing.expectEqual(@as(usize, 1), item.count());
    try std.testing.expectEqualStrings("test", item.get("pk").?.S);
}
test "parseKeyFromRequest" {
    const allocator = std.testing.allocator;
    const body = "{\"TableName\":\"Users\",\"Key\":{\"pk\":{\"S\":\"user123\"}}}";
    // Extracts and parses only the "Key" member of the request body.
    var key = try parseKeyFromRequest(allocator, body);
    defer deinitItem(&key, allocator);
    try std.testing.expectEqual(@as(usize, 1), key.count());
}
test "extractKeyAttributes deep copies" {
    const allocator = std.testing.allocator;
    var item = try parseItem(allocator, "{\"pk\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"},\"age\":{\"N\":\"25\"}}");
    defer deinitItem(&item, allocator);
    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    var key_only = try extractKeyAttributes(allocator, item, &schema);
    // Freeing both items independently proves the extraction deep-copied
    // rather than aliasing the original item's memory.
    defer deinitItem(&key_only, allocator);
    try std.testing.expectEqual(@as(usize, 1), key_only.count());
    try std.testing.expectEqualStrings("user123", key_only.get("pk").?.S);
}

View File

@@ -2,18 +2,22 @@
const std = @import("std"); const std = @import("std");
const rocksdb = @import("../rocksdb.zig"); const rocksdb = @import("../rocksdb.zig");
const types = @import("types.zig"); const types = @import("types.zig");
const json = @import("json.zig");
pub const StorageError = error{ pub const StorageError = error{
TableNotFound, TableNotFound,
TableAlreadyExists, TableAlreadyExists,
ItemNotFound, ItemNotFound,
InvalidKey, InvalidKey,
MissingKeyAttribute,
KeyValueContainsSeparator,
SerializationError, SerializationError,
RocksDBError, RocksDBError,
OutOfMemory, OutOfMemory,
}; };
/// Key prefixes for different data types in RocksDB /// Key prefixes for different data types in RocksDB
/// PHASE 2 TODO: Replace textual prefixes with binary encoding using length-prefixed segments
const KeyPrefix = struct { const KeyPrefix = struct {
/// Table metadata: _meta:{table_name} /// Table metadata: _meta:{table_name}
const meta = "_meta:"; const meta = "_meta:";
@@ -25,6 +29,27 @@ const KeyPrefix = struct {
const lsi = "_lsi:"; const lsi = "_lsi:";
}; };
/// In-memory representation of table metadata
const TableMetadata = struct {
table_name: []const u8,
key_schema: []types.KeySchemaElement,
attribute_definitions: []types.AttributeDefinition,
table_status: types.TableStatus,
creation_date_time: i64,
pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
allocator.free(self.table_name);
for (self.key_schema) |ks| {
allocator.free(ks.attribute_name);
}
allocator.free(self.key_schema);
for (self.attribute_definitions) |ad| {
allocator.free(ad.attribute_name);
}
allocator.free(self.attribute_definitions);
}
};
pub const StorageEngine = struct { pub const StorageEngine = struct {
db: rocksdb.DB, db: rocksdb.DB,
allocator: std.mem.Allocator, allocator: std.mem.Allocator,
@@ -45,7 +70,12 @@ pub const StorageEngine = struct {
// === Table Operations === // === Table Operations ===
pub fn createTable(self: *Self, table_name: []const u8, key_schema: []const types.KeySchemaElement, attribute_definitions: []const types.AttributeDefinition) StorageError!types.TableDescription { pub fn createTable(
self: *Self,
table_name: []const u8,
key_schema: []const types.KeySchemaElement,
attribute_definitions: []const types.AttributeDefinition,
) StorageError!types.TableDescription {
// Check if table already exists // Check if table already exists
const meta_key = try self.buildMetaKey(table_name); const meta_key = try self.buildMetaKey(table_name);
defer self.allocator.free(meta_key); defer self.allocator.free(meta_key);
@@ -58,7 +88,22 @@ pub const StorageEngine = struct {
// Create table metadata // Create table metadata
const now = std.time.timestamp(); const now = std.time.timestamp();
const desc = types.TableDescription{
const metadata = TableMetadata{
.table_name = table_name,
.key_schema = key_schema,
.attribute_definitions = attribute_definitions,
.table_status = .ACTIVE,
.creation_date_time = now,
};
// Serialize and store with canonical format
const meta_value = try self.serializeTableMetadata(metadata);
defer self.allocator.free(meta_value);
self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError;
return types.TableDescription{
.table_name = table_name, .table_name = table_name,
.key_schema = key_schema, .key_schema = key_schema,
.attribute_definitions = attribute_definitions, .attribute_definitions = attribute_definitions,
@@ -67,13 +112,6 @@ pub const StorageEngine = struct {
.item_count = 0, .item_count = 0,
.table_size_bytes = 0, .table_size_bytes = 0,
}; };
// Serialize and store
const meta_value = try self.serializeTableMetadata(desc);
defer self.allocator.free(meta_value);
self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError;
return desc;
} }
pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void { pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
@@ -111,21 +149,47 @@ pub const StorageEngine = struct {
} }
pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription { pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
const meta_key = try self.buildMetaKey(table_name); var metadata = try self.getTableMetadata(table_name);
defer self.allocator.free(meta_key); defer metadata.deinit(self.allocator);
const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError; // Count items (expensive, but matches DynamoDB behavior)
if (meta_value == null) return StorageError.TableNotFound; const data_prefix = try self.buildDataPrefix(table_name);
defer self.allocator.free(meta_value.?); defer self.allocator.free(data_prefix);
return self.deserializeTableMetadata(meta_value.?); var item_count: u64 = 0;
var total_size: u64 = 0;
var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
defer iter.deinit();
iter.seek(data_prefix);
while (iter.valid()) {
const key = iter.key() orelse break;
if (!std.mem.startsWith(u8, key, data_prefix)) break;
const value = iter.value() orelse break;
item_count += 1;
total_size += value.len;
iter.next();
}
return types.TableDescription{
.table_name = metadata.table_name,
.key_schema = metadata.key_schema,
.attribute_definitions = metadata.attribute_definitions,
.table_status = metadata.table_status,
.creation_date_time = metadata.creation_date_time,
.item_count = item_count,
.table_size_bytes = total_size,
};
} }
pub fn listTables(self: *Self) StorageError![][]const u8 { pub fn listTables(self: *Self) StorageError![][]const u8 {
var tables = std.ArrayList([]const u8){}; var tables = std.ArrayList([]const u8).init(self.allocator);
errdefer { errdefer {
for (tables.items) |t| self.allocator.free(t); for (tables.items) |t| self.allocator.free(t);
tables.deinit(self.allocator); tables.deinit();
} }
var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
@@ -138,54 +202,130 @@ pub const StorageEngine = struct {
const table_name = key[KeyPrefix.meta.len..]; const table_name = key[KeyPrefix.meta.len..];
const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory; const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory;
tables.append(self.allocator, owned_name) catch return StorageError.OutOfMemory; tables.append(owned_name) catch return StorageError.OutOfMemory;
iter.next(); iter.next();
} }
return tables.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory; return tables.toOwnedSlice() catch return StorageError.OutOfMemory;
} }
// === Item Operations === // === Item Operations ===
pub fn putItem(self: *Self, table_name: []const u8, item_json: []const u8) StorageError!void { /// Store an item in the database
/// Item is serialized to canonical JSON before storage
pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void {
// Get table metadata to retrieve key schema
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Validate that item contains all required key attributes
for (metadata.key_schema) |key_elem| {
if (!item.contains(key_elem.attribute_name)) {
return StorageError.MissingKeyAttribute;
}
}
// Build storage key using the item and key schema
const storage_key = json.buildRocksDBKey(
self.allocator,
table_name,
metadata.key_schema,
item,
) catch |err| {
return switch (err) {
error.KeyValueContainsSeparator => StorageError.KeyValueContainsSeparator,
else => StorageError.InvalidKey,
};
};
defer self.allocator.free(storage_key);
// Serialize item to canonical JSON for storage
const item_json = json.serializeItem(self.allocator, item) catch return StorageError.SerializationError;
defer self.allocator.free(item_json);
// Store the canonical JSON
self.db.put(storage_key, item_json) catch return StorageError.RocksDBError;
}
/// Retrieve an item from the database
/// Returns a parsed Item (not JSON string)
pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item {
// Get table metadata
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Validate key has all required attributes
for (metadata.key_schema) |key_elem| {
if (!key.contains(key_elem.attribute_name)) {
return StorageError.MissingKeyAttribute;
}
}
// Build storage key
const storage_key = json.buildRocksDBKey(
self.allocator,
table_name,
metadata.key_schema,
key,
) catch |err| {
return switch (err) {
error.KeyValueContainsSeparator => StorageError.KeyValueContainsSeparator,
else => StorageError.InvalidKey,
};
};
defer self.allocator.free(storage_key);
const item_json = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError;
if (item_json == null) return null;
defer self.allocator.free(item_json.?);
// Parse the stored JSON back into an Item
return json.parseItem(self.allocator, item_json.?) catch return StorageError.SerializationError;
}
pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void {
// Get table metadata
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Validate key
for (metadata.key_schema) |key_elem| {
if (!key.contains(key_elem.attribute_name)) {
return StorageError.MissingKeyAttribute;
}
}
// Build storage key
const storage_key = json.buildRocksDBKey(
self.allocator,
table_name,
metadata.key_schema,
key,
) catch |err| {
return switch (err) {
error.KeyValueContainsSeparator => StorageError.KeyValueContainsSeparator,
else => StorageError.InvalidKey,
};
};
defer self.allocator.free(storage_key);
self.db.delete(storage_key) catch return StorageError.RocksDBError;
}
/// Scan a table and return parsed Items (not JSON strings)
pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![]types.Item {
// Verify table exists // Verify table exists
const meta_key = try self.buildMetaKey(table_name); var metadata = try self.getTableMetadata(table_name);
defer self.allocator.free(meta_key); defer metadata.deinit(self.allocator);
const meta = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
if (meta == null) return StorageError.TableNotFound;
defer self.allocator.free(meta.?);
// Extract key from item (simplified - assumes key is extractable from JSON)
const item_key = try self.extractKeyFromItem(table_name, item_json);
defer self.allocator.free(item_key);
self.db.put(item_key, item_json) catch return StorageError.RocksDBError;
}
pub fn getItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!?[]u8 {
const item_key = try self.buildItemKey(table_name, key_json);
defer self.allocator.free(item_key);
return self.db.get(self.allocator, item_key) catch return StorageError.RocksDBError;
}
pub fn deleteItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!void {
const item_key = try self.buildItemKey(table_name, key_json);
defer self.allocator.free(item_key);
self.db.delete(item_key) catch return StorageError.RocksDBError;
}
pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![][]const u8 {
const data_prefix = try self.buildDataPrefix(table_name); const data_prefix = try self.buildDataPrefix(table_name);
defer self.allocator.free(data_prefix); defer self.allocator.free(data_prefix);
var items = std.ArrayList([]const u8){}; var items = std.ArrayList(types.Item).init(self.allocator);
errdefer { errdefer {
for (items.items) |item| self.allocator.free(item); for (items.items) |*item| json.deinitItem(item, self.allocator);
items.deinit(self.allocator); items.deinit();
} }
var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
@@ -200,25 +340,35 @@ pub const StorageEngine = struct {
if (!std.mem.startsWith(u8, key, data_prefix)) break; if (!std.mem.startsWith(u8, key, data_prefix)) break;
const value = iter.value() orelse break; const value = iter.value() orelse break;
const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory;
items.append(self.allocator, owned_value) catch return StorageError.OutOfMemory; // Parse the stored JSON into an Item
const item = json.parseItem(self.allocator, value) catch {
iter.next();
continue;
};
items.append(item) catch return StorageError.OutOfMemory;
count += 1; count += 1;
iter.next(); iter.next();
} }
return items.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory; return items.toOwnedSlice() catch return StorageError.OutOfMemory;
} }
pub fn query(self: *Self, table_name: []const u8, partition_key: []const u8, limit: ?usize) StorageError![][]const u8 { /// Query items by partition key and return parsed Items
pub fn query(self: *Self, table_name: []const u8, partition_key_value: []const u8, limit: ?usize) StorageError![]types.Item {
// Verify table exists
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Build prefix for this partition // Build prefix for this partition
const prefix = try self.buildPartitionPrefix(table_name, partition_key); const prefix = try self.buildPartitionPrefix(table_name, partition_key_value);
defer self.allocator.free(prefix); defer self.allocator.free(prefix);
var items = std.ArrayList([]const u8){}; var items = std.ArrayList(types.Item).init(self.allocator);
errdefer { errdefer {
for (items.items) |item| self.allocator.free(item); for (items.items) |*item| json.deinitItem(item, self.allocator);
items.deinit(self.allocator); items.deinit();
} }
var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
@@ -233,17 +383,33 @@ pub const StorageEngine = struct {
if (!std.mem.startsWith(u8, key, prefix)) break; if (!std.mem.startsWith(u8, key, prefix)) break;
const value = iter.value() orelse break; const value = iter.value() orelse break;
const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory;
items.append(self.allocator, owned_value) catch return StorageError.OutOfMemory; // Parse the stored JSON into an Item
const item = json.parseItem(self.allocator, value) catch {
iter.next();
continue;
};
items.append(item) catch return StorageError.OutOfMemory;
count += 1; count += 1;
iter.next(); iter.next();
} }
return items.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory; return items.toOwnedSlice() catch return StorageError.OutOfMemory;
} }
// === Key Building Helpers === // === Internal Helpers ===
fn getTableMetadata(self: *Self, table_name: []const u8) StorageError!TableMetadata {
const meta_key = try self.buildMetaKey(table_name);
defer self.allocator.free(meta_key);
const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
if (meta_value == null) return StorageError.TableNotFound;
defer self.allocator.free(meta_value.?);
return self.deserializeTableMetadata(meta_value.?);
}
fn buildMetaKey(self: *Self, table_name: []const u8) StorageError![]u8 { fn buildMetaKey(self: *Self, table_name: []const u8) StorageError![]u8 {
return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ KeyPrefix.meta, table_name }) catch return StorageError.OutOfMemory; return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ KeyPrefix.meta, table_name }) catch return StorageError.OutOfMemory;
@@ -257,98 +423,174 @@ pub const StorageEngine = struct {
return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, partition_key }) catch return StorageError.OutOfMemory; return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, partition_key }) catch return StorageError.OutOfMemory;
} }
fn buildItemKey(self: *Self, table_name: []const u8, key_json: []const u8) StorageError![]u8 { // === Serialization ===
// Parse the key JSON to extract partition key (and sort key if present)
// For now, use simplified key extraction
const pk = extractStringValue(key_json, "pk") orelse extractStringValue(key_json, "PK") orelse return StorageError.InvalidKey;
const sk = extractStringValue(key_json, "sk") orelse extractStringValue(key_json, "SK");
if (sk) |sort_key| { fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 {
return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}:{s}", .{ KeyPrefix.data, table_name, pk, sort_key }) catch return StorageError.OutOfMemory; var buf = std.ArrayList(u8).init(self.allocator);
} else { errdefer buf.deinit();
return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, pk }) catch return StorageError.OutOfMemory; const writer = buf.writer();
}
}
fn extractKeyFromItem(self: *Self, table_name: []const u8, item_json: []const u8) StorageError![]u8 { writer.writeAll("{\"TableName\":\"") catch return StorageError.SerializationError;
return self.buildItemKey(table_name, item_json); writer.writeAll(metadata.table_name) catch return StorageError.SerializationError;
} writer.writeAll("\",\"TableStatus\":\"") catch return StorageError.SerializationError;
writer.writeAll(metadata.table_status.toString()) catch return StorageError.SerializationError;
writer.print("\",\"CreationDateTime\":{d},\"KeySchema\":[", .{metadata.creation_date_time}) catch return StorageError.SerializationError;
// === Serialization Helpers === for (metadata.key_schema, 0..) |ks, i| {
fn serializeTableMetadata(self: *Self, desc: types.TableDescription) StorageError![]u8 {
var buf = std.ArrayList(u8){};
errdefer buf.deinit(self.allocator);
const writer = buf.writer(self.allocator);
writer.print("{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d},\"ItemCount\":{d},\"TableSizeBytes\":{d},\"KeySchema\":[", .{
desc.table_name,
desc.table_status.toString(),
desc.creation_date_time,
desc.item_count,
desc.table_size_bytes,
}) catch return StorageError.SerializationError;
for (desc.key_schema, 0..) |ks, i| {
if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError; if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
writer.print("{{\"AttributeName\":\"{s}\",\"KeyType\":\"{s}\"}}", .{ writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
ks.attribute_name, writer.writeAll(ks.attribute_name) catch return StorageError.SerializationError;
ks.key_type.toString(), writer.writeAll("\",\"KeyType\":\"") catch return StorageError.SerializationError;
}) catch return StorageError.SerializationError; writer.writeAll(ks.key_type.toString()) catch return StorageError.SerializationError;
writer.writeAll("\"}") catch return StorageError.SerializationError;
} }
writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError; writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError;
for (desc.attribute_definitions, 0..) |ad, i| { for (metadata.attribute_definitions, 0..) |ad, i| {
if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError; if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
writer.print("{{\"AttributeName\":\"{s}\",\"AttributeType\":\"{s}\"}}", .{ writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
ad.attribute_name, writer.writeAll(ad.attribute_name) catch return StorageError.SerializationError;
ad.attribute_type.toString(), writer.writeAll("\",\"AttributeType\":\"") catch return StorageError.SerializationError;
}) catch return StorageError.SerializationError; writer.writeAll(ad.attribute_type.toString()) catch return StorageError.SerializationError;
writer.writeAll("\"}") catch return StorageError.SerializationError;
} }
writer.writeAll("]}") catch return StorageError.SerializationError; writer.writeAll("]}") catch return StorageError.SerializationError;
return buf.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory; return buf.toOwnedSlice() catch return StorageError.OutOfMemory;
} }
/// Deserialize JSON table metadata (the format written by serializeTableMetadata)
/// into a TableMetadata value using std.json.
/// Caller owns every returned allocation: `table_name`, each element's
/// `attribute_name`, and the `key_schema` / `attribute_definitions` slices.
/// Returns SerializationError on malformed or incomplete JSON and
/// OutOfMemory when allocation fails; on any error path all partial
/// allocations are released.
fn deserializeTableMetadata(self: *Self, data: []const u8) StorageError!TableMetadata {
    const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, data, .{}) catch return StorageError.SerializationError;
    defer parsed.deinit();

    const root = switch (parsed.value) {
        .object => |o| o,
        else => return StorageError.SerializationError,
    };

    // TableName — duplicated because the parsed JSON tree is freed on return.
    const table_name_val = root.get("TableName") orelse return StorageError.SerializationError;
    const table_name_str = switch (table_name_val) {
        .string => |s| s,
        else => return StorageError.SerializationError,
    };
    const table_name = self.allocator.dupe(u8, table_name_str) catch return StorageError.OutOfMemory;
    errdefer self.allocator.free(table_name);

    // TableStatus — unknown strings fall back to ACTIVE (lenient on purpose).
    const status_val = root.get("TableStatus") orelse return StorageError.SerializationError;
    const status_str = switch (status_val) {
        .string => |s| s,
        else => return StorageError.SerializationError,
    };
    const table_status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE"))
        .ACTIVE
    else if (std.mem.eql(u8, status_str, "CREATING"))
        .CREATING
    else if (std.mem.eql(u8, status_str, "DELETING"))
        .DELETING
    else
        .ACTIVE;

    // CreationDateTime — stored as a JSON integer timestamp.
    const creation_val = root.get("CreationDateTime") orelse return StorageError.SerializationError;
    const creation_date_time = switch (creation_val) {
        .integer => |i| i,
        else => return StorageError.SerializationError,
    };

    // KeySchema array -> owned []types.KeySchemaElement.
    const key_schema_val = root.get("KeySchema") orelse return StorageError.SerializationError;
    const key_schema_array = switch (key_schema_val) {
        .array => |a| a,
        else => return StorageError.SerializationError,
    };
    var key_schema = std.ArrayList(types.KeySchemaElement).init(self.allocator);
    errdefer {
        for (key_schema.items) |ks| self.allocator.free(ks.attribute_name);
        key_schema.deinit();
    }
    for (key_schema_array.items) |item| {
        const obj = switch (item) {
            .object => |o| o,
            else => return StorageError.SerializationError,
        };
        const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError;
        const attr_name_str = switch (attr_name_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory;
        // Runs only if this iteration errors before append succeeds;
        // appended names are covered by the list-level errdefer above.
        errdefer self.allocator.free(attr_name);
        const key_type_val = obj.get("KeyType") orelse return StorageError.SerializationError;
        const key_type_str = switch (key_type_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const key_type = types.KeyType.fromString(key_type_str) orelse return StorageError.SerializationError;
        key_schema.append(.{
            .attribute_name = attr_name,
            .key_type = key_type,
        }) catch return StorageError.OutOfMemory;
    }

    // AttributeDefinitions array -> owned []types.AttributeDefinition.
    const attr_defs_val = root.get("AttributeDefinitions") orelse return StorageError.SerializationError;
    const attr_defs_array = switch (attr_defs_val) {
        .array => |a| a,
        else => return StorageError.SerializationError,
    };
    var attr_defs = std.ArrayList(types.AttributeDefinition).init(self.allocator);
    errdefer {
        for (attr_defs.items) |ad| self.allocator.free(ad.attribute_name);
        attr_defs.deinit();
    }
    for (attr_defs_array.items) |item| {
        const obj = switch (item) {
            .object => |o| o,
            else => return StorageError.SerializationError,
        };
        const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError;
        const attr_name_str = switch (attr_name_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory;
        errdefer self.allocator.free(attr_name);
        const attr_type_val = obj.get("AttributeType") orelse return StorageError.SerializationError;
        const attr_type_str = switch (attr_type_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const attr_type = types.ScalarAttributeType.fromString(attr_type_str) orelse return StorageError.SerializationError;
        attr_defs.append(.{
            .attribute_name = attr_name,
            .attribute_type = attr_type,
        }) catch return StorageError.OutOfMemory;
    }

    // Convert the lists to owned slices one at a time. If the second
    // conversion fails, the first owned slice is released by its errdefer —
    // doing both inside the return literal would leak the first on OOM.
    const key_schema_slice = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory;
    errdefer {
        for (key_schema_slice) |ks| self.allocator.free(ks.attribute_name);
        self.allocator.free(key_schema_slice);
    }
    const attr_defs_slice = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory;

    return TableMetadata{
        .table_name = table_name,
        .key_schema = key_schema_slice,
        .attribute_definitions = attr_defs_slice,
        .table_status = table_status,
        .creation_date_time = creation_date_time,
    };
}
}; };
/// Naive extraction of a quoted string value for `key` from raw JSON text.
/// Looks for the literal pattern `"key":"value"` and returns a slice into
/// `json_data`, or null when the pattern is absent or the key is too long.
/// Does not understand escapes or nesting — use std.json for real parsing.
fn extractStringValue(json_data: []const u8, key: []const u8) ?[]const u8 {
    var pattern_buf: [256]u8 = undefined;
    const needle = std.fmt.bufPrint(&pattern_buf, "\"{s}\":\"", .{key}) catch return null;
    const match = std.mem.indexOf(u8, json_data, needle) orelse return null;
    const begin = match + needle.len;
    // Value ends at the next unescaped-quote position (escapes not handled).
    const end = std.mem.indexOfScalarPos(u8, json_data, begin, '"') orelse return null;
    return json_data[begin..end];
}
test "storage basic operations" { test "storage basic operations" {
const allocator = std.testing.allocator; const allocator = std.testing.allocator;
@@ -385,3 +627,102 @@ test "storage basic operations" {
defer allocator.free(tables2); defer allocator.free(tables2);
try std.testing.expectEqual(@as(usize, 0), tables2.len); try std.testing.expectEqual(@as(usize, 0), tables2.len);
} }
// Round-trip: a typed Item written with putItem comes back via getItem.
test "putItem and getItem with typed Items" {
    const alloc = std.testing.allocator;
    const dir = "/tmp/test_storage_typed";
    defer std.fs.deleteTreeAbsolute(dir) catch {};

    var storage = try StorageEngine.init(alloc, dir);
    defer storage.deinit();

    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try storage.createTable("Users", &schema, &defs);

    // Store an item.
    var stored = try json.parseItem(alloc, "{\"pk\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"}}");
    defer json.deinitItem(&stored, alloc);
    try storage.putItem("Users", stored);

    // Read it back by key and check the partition key survived.
    var lookup_key = try json.parseItem(alloc, "{\"pk\":{\"S\":\"user123\"}}");
    defer json.deinitItem(&lookup_key, alloc);
    const fetched = try storage.getItem("Users", lookup_key);
    try std.testing.expect(fetched != null);
    defer if (fetched) |*r| json.deinitItem(r, alloc);
    try std.testing.expectEqualStrings("user123", fetched.?.get("pk").?.S);
}
// putItem must reject an item that lacks the table's hash-key attribute
// and accept one that carries it.
test "putItem validates key presence" {
    const alloc = std.testing.allocator;
    const dir = "/tmp/test_storage_validate";
    defer std.fs.deleteTreeAbsolute(dir) catch {};

    var storage = try StorageEngine.init(alloc, dir);
    defer storage.deinit();

    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "userId", .key_type = .HASH },
    };
    const defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "userId", .attribute_type = .S },
    };
    _ = try storage.createTable("Users", &schema, &defs);

    // Missing userId -> rejected.
    var missing_key = try json.parseItem(alloc, "{\"name\":{\"S\":\"Alice\"}}");
    defer json.deinitItem(&missing_key, alloc);
    try std.testing.expectError(StorageError.MissingKeyAttribute, storage.putItem("Users", missing_key));

    // userId present -> accepted.
    var with_key = try json.parseItem(alloc, "{\"userId\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"}}");
    defer json.deinitItem(&with_key, alloc);
    try storage.putItem("Users", with_key);
}
// Key values containing the internal ':' separator must be rejected,
// since the storage layer uses it to compose record keys.
test "reject key with separator" {
    const alloc = std.testing.allocator;
    const dir = "/tmp/test_storage_separator";
    defer std.fs.deleteTreeAbsolute(dir) catch {};

    var storage = try StorageEngine.init(alloc, dir);
    defer storage.deinit();

    const schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try storage.createTable("Users", &schema, &defs);

    // pk value "user:123" contains ':' -> rejected.
    var offending = try json.parseItem(alloc, "{\"pk\":{\"S\":\"user:123\"},\"data\":{\"S\":\"test\"}}");
    defer json.deinitItem(&offending, alloc);
    try std.testing.expectError(StorageError.KeyValueContainsSeparator, storage.putItem("Users", offending));
}