diff --git a/src/bench.zig b/src/bench.zig index 1f6dc17..0f84ffc 100644 --- a/src/bench.zig +++ b/src/bench.zig @@ -3,6 +3,7 @@ const std = @import("std"); const rocksdb = @import("rocksdb.zig"); const storage = @import("dynamodb/storage.zig"); const types = @import("dynamodb/types.zig"); +const json = @import("dynamodb/json.zig"); const BenchResult = struct { name: []const u8, @@ -23,18 +24,6 @@ const BenchResult = struct { } }; -fn runBench(name: []const u8, ops: u64, func: anytype) BenchResult { - const start = std.time.nanoTimestamp(); - func(); - const end = std.time.nanoTimestamp(); - - return BenchResult{ - .name = name, - .ops = ops, - .duration_ns = @intCast(end - start), - }; -} - pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); @@ -45,33 +34,27 @@ pub fn main() !void { std.debug.print(" ZynamoDB Performance Benchmarks\n", .{}); std.debug.print("=" ** 70 ++ "\n\n", .{}); - // Setup const path = "/tmp/bench_zynamodb"; defer std.fs.deleteTreeAbsolute(path) catch {}; - // Raw RocksDB benchmarks std.debug.print("RocksDB Raw Operations:\n", .{}); std.debug.print("-" ** 70 ++ "\n", .{}); - try benchRocksDBWrites(allocator, path); - try benchRocksDBReads(allocator, path); - try benchRocksDBBatch(allocator, path); - try benchRocksDBScan(allocator, path); + try benchRocksDBWrites(allocator); + try benchRocksDBReads(allocator); + try benchRocksDBBatch(allocator); - std.debug.print("\n", .{}); - - // Storage engine benchmarks - std.debug.print("Storage Engine Operations:\n", .{}); + std.debug.print("\nStorage Engine Operations:\n", .{}); std.debug.print("-" ** 70 ++ "\n", .{}); - try benchStoragePutItem(allocator, path); - try benchStorageGetItem(allocator, path); - try benchStorageScan(allocator, path); + try benchStoragePutItem(allocator); + try benchStorageGetItem(allocator); + try benchStorageScan(allocator); std.debug.print("\n" ++ "=" ** 70 ++ "\n", .{}); } -fn benchRocksDBWrites(allocator: std.mem.Allocator, base_path: []const u8) !void { +fn benchRocksDBWrites(allocator: std.mem.Allocator) !void { _ = allocator; const path = "/tmp/bench_rocksdb_writes"; defer std.fs.deleteTreeAbsolute(path) catch {}; @@ -83,29 +66,26 @@ fn benchRocksDBWrites(allocator: std.mem.Allocator, base_path: []const u8) !void var key_buf: [32]u8 = undefined; var val_buf: [256]u8 = undefined; - const result = runBench("Sequential Writes", ops, struct { - fn run(d: *rocksdb.DB, kb: *[32]u8, vb: *[256]u8, n: u64) void { - var i: u64 = 0; - while (i < n) : (i += 1) { - const key = std.fmt.bufPrint(kb, "key_{d:0>10}", .{i}) catch continue; - const val = std.fmt.bufPrint(vb, "value_{d}_padding_data_to_make_it_realistic", .{i}) catch continue; - d.put(key, val) catch {}; - } - } - }.run, .{ &db, &key_buf, &val_buf, ops }); + const start = std.time.nanoTimestamp(); + var i: u64 = 0; + while (i < ops) : (i += 1) { + const key = std.fmt.bufPrint(&key_buf, "key_{d:0>10}", .{i}) catch continue; + const val = std.fmt.bufPrint(&val_buf, "value_{d}_padding_data", .{i}) catch continue; + db.put(key, val) catch {}; + } + const end = std.time.nanoTimestamp(); - _ = base_path; + const result = BenchResult{ .name = "Sequential Writes", .ops = ops, .duration_ns = @intCast(end - start) }; result.print(); } -fn benchRocksDBReads(allocator: std.mem.Allocator, base_path: []const u8) !void { +fn benchRocksDBReads(allocator: std.mem.Allocator) !void { const path = "/tmp/bench_rocksdb_reads"; defer std.fs.deleteTreeAbsolute(path) catch {}; var db = try rocksdb.DB.open(path, true); defer db.close(); - // First write some data var key_buf: [32]u8 = undefined; var val_buf: [256]u8 = undefined; @@ -117,27 +97,24 @@ fn benchRocksDBReads(allocator: std.mem.Allocator, base_path: []const u8) !void try db.put(key, val); } - // Now benchmark reads var prng = std.Random.DefaultPrng.init(12345); const random = prng.random(); - const result = runBench("Random Reads", ops, struct { - fn run(d: *rocksdb.DB, alloc: std.mem.Allocator, kb: *[32]u8, r: std.Random, n: u64) void { - var j: u64 = 0; - while (j < n) : (j += 1) { - const idx = r.intRangeAtMost(u64, 0, n - 1); - const key = std.fmt.bufPrint(kb, "key_{d:0>10}", .{idx}) catch continue; - const val = d.get(alloc, key) catch continue; - if (val) |v| alloc.free(v); - } - } - }.run, .{ &db, allocator, &key_buf, random, ops }); + const start = std.time.nanoTimestamp(); + var j: u64 = 0; + while (j < ops) : (j += 1) { + const idx = random.intRangeAtMost(u64, 0, ops - 1); + const key = std.fmt.bufPrint(&key_buf, "key_{d:0>10}", .{idx}) catch continue; + const val = db.get(allocator, key) catch continue; + if (val) |v| allocator.free(v); + } + const end = std.time.nanoTimestamp(); - _ = base_path; + const result = BenchResult{ .name = "Random Reads", .ops = ops, .duration_ns = @intCast(end - start) }; result.print(); } -fn benchRocksDBBatch(allocator: std.mem.Allocator, base_path: []const u8) !void { +fn benchRocksDBBatch(allocator: std.mem.Allocator) !void { _ = allocator; const path = "/tmp/bench_rocksdb_batch"; defer std.fs.deleteTreeAbsolute(path) catch {}; @@ -149,132 +126,85 @@ fn benchRocksDBBatch(allocator: std.mem.Allocator, base_path: []const u8) !void var key_buf: [32]u8 = undefined; var val_buf: [256]u8 = undefined; - const result = runBench("Batch Writes", ops, struct { - fn run(d: *rocksdb.DB, kb: *[32]u8, vb: *[256]u8, n: u64) void { - var batch = rocksdb.WriteBatch.init() orelse return; - defer batch.deinit(); + const start = std.time.nanoTimestamp(); + var batch = rocksdb.WriteBatch.init() orelse return; + defer batch.deinit(); - var i: u64 = 0; - while (i < n) : (i += 1) { - const key = std.fmt.bufPrint(kb, "batch_key_{d:0>10}", .{i}) catch continue; - const val = std.fmt.bufPrint(vb, "batch_value_{d}", .{i}) catch continue; - batch.put(key, val); - } - - batch.write(d) catch {}; - } - }.run, .{ &db, &key_buf, &val_buf, ops }); - - _ = base_path; - result.print(); -} - -fn benchRocksDBScan(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = allocator; - const path = "/tmp/bench_rocksdb_scan"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var db = try rocksdb.DB.open(path, true); - defer db.close(); - - // Write data - var key_buf: [32]u8 = undefined; - var val_buf: [256]u8 = undefined; - - const ops: u64 = 10000; var i: u64 = 0; while (i < ops) : (i += 1) { - const key = try std.fmt.bufPrint(&key_buf, "scan_key_{d:0>10}", .{i}); - const val = try std.fmt.bufPrint(&val_buf, "scan_value_{d}", .{i}); - try db.put(key, val); + const key = std.fmt.bufPrint(&key_buf, "batch_key_{d:0>10}", .{i}) catch continue; + const val = std.fmt.bufPrint(&val_buf, "batch_value_{d}", .{i}) catch continue; + batch.put(key, val); } + batch.write(&db) catch {}; + const end = std.time.nanoTimestamp(); - const result = runBench("Full Scan", ops, struct { - fn run(d: *rocksdb.DB, n: u64) void { - _ = n; - var iter = rocksdb.Iterator.init(d) orelse return; - defer iter.deinit(); - - iter.seekToFirst(); - var count: u64 = 0; - while (iter.valid()) { - _ = iter.key(); - _ = iter.value(); - count += 1; - iter.next(); - } - } - }.run, .{ &db, ops }); - - _ = base_path; + const result = BenchResult{ .name = "Batch Writes", .ops = ops, .duration_ns = @intCast(end - start) }; result.print(); } -fn benchStoragePutItem(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = base_path; +fn benchStoragePutItem(allocator: std.mem.Allocator) !void { const path = "/tmp/bench_storage_put"; defer std.fs.deleteTreeAbsolute(path) catch {}; var engine = try storage.StorageEngine.init(allocator, path); defer engine.deinit(); - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; + const key_schema = [_]types.KeySchemaElement{.{ .attribute_name = "pk", .key_type = .HASH }}; + const attr_defs = [_]types.AttributeDefinition{.{ .attribute_name = "pk", .attribute_type = .S }}; _ = try engine.createTable("BenchTable", &key_schema, &attr_defs); const ops: u64 = 5000; - var item_buf: [512]u8 = undefined; const start = std.time.nanoTimestamp(); var i: u64 = 0; while (i < ops) : (i += 1) { - const item = std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"name\":{{\"S\":\"User {d}\"}},\"email\":{{\"S\":\"user{d}@example.com\"}}}}", .{ i, i, i }) catch continue; + var item = types.Item.init(allocator); + defer json.deinitItem(&item, allocator); + + var pk_buf: [32]u8 = undefined; + const pk_str = std.fmt.bufPrint(&pk_buf, "user{d:0>10}", .{i}) catch continue; + const pk_owned = allocator.dupe(u8, pk_str) catch continue; + + const pk_name = allocator.dupe(u8, "pk") catch continue; + item.put(pk_name, types.AttributeValue{ .S = pk_owned }) catch continue; + engine.putItem("BenchTable", item) catch {}; } const end = std.time.nanoTimestamp(); - const result = BenchResult{ - .name = "PutItem", - .ops = ops, - .duration_ns = @intCast(end - start), - }; + const result = BenchResult{ .name = "PutItem", .ops = ops, .duration_ns = @intCast(end - start) }; result.print(); } -fn benchStorageGetItem(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = base_path; +fn benchStorageGetItem(allocator: std.mem.Allocator) !void { const path = "/tmp/bench_storage_get"; defer std.fs.deleteTreeAbsolute(path) catch {}; var engine = try storage.StorageEngine.init(allocator, path); defer engine.deinit(); - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; + const key_schema = [_]types.KeySchemaElement{.{ .attribute_name = "pk", .key_type = .HASH }}; + const attr_defs = [_]types.AttributeDefinition{.{ .attribute_name = "pk", .attribute_type = .S }}; _ = try engine.createTable("BenchTable", &key_schema, &attr_defs); - // Write data first const ops: u64 = 5000; - var item_buf: [512]u8 = undefined; - var key_buf: [128]u8 = undefined; - var i: u64 = 0; while (i < ops) : (i += 1) { - const item = try std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"data\":{{\"S\":\"test\"}}}}", .{i}); - try engine.putItem("BenchTable", item); + var item = types.Item.init(allocator); + defer json.deinitItem(&item, allocator); + + var pk_buf: [32]u8 = undefined; + const pk_str = std.fmt.bufPrint(&pk_buf, "user{d:0>10}", .{i}) catch continue; + const pk_owned = allocator.dupe(u8, pk_str) catch continue; + const pk_name = allocator.dupe(u8, "pk") catch continue; + item.put(pk_name, types.AttributeValue{ .S = pk_owned }) catch continue; + + engine.putItem("BenchTable", item) catch {}; } - // Benchmark reads var prng = std.Random.DefaultPrng.init(12345); const random = prng.random(); @@ -282,60 +212,61 @@ fn benchStorageGetItem(allocator: std.mem.Allocator, base_path: []const u8) !voi i = 0; while (i < ops) : (i += 1) { const idx = random.intRangeAtMost(u64, 0, ops - 1); - const key = std.fmt.bufPrint(&key_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}}}}", .{idx}) catch continue; - const item = engine.getItem("BenchTable", key) catch continue; - if (item) |v| allocator.free(v); + + var key_item = types.Item.init(allocator); + defer json.deinitItem(&key_item, allocator); + + var pk_buf: [32]u8 = undefined; + const pk_str = std.fmt.bufPrint(&pk_buf, "user{d:0>10}", .{idx}) catch continue; + const pk_owned = allocator.dupe(u8, pk_str) catch continue; + const pk_name = allocator.dupe(u8, "pk") catch continue; + key_item.put(pk_name, types.AttributeValue{ .S = pk_owned }) catch continue; + + const item = engine.getItem("BenchTable", key_item) catch continue; + if (item) |it| { + var it_mut = it; + json.deinitItem(&it_mut, allocator); + } } const end = std.time.nanoTimestamp(); - const result = BenchResult{ - .name = "GetItem", - .ops = ops, - .duration_ns = @intCast(end - start), - }; + const result = BenchResult{ .name = "GetItem", .ops = ops, .duration_ns = @intCast(end - start) }; result.print(); } -fn benchStorageScan(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = base_path; +fn benchStorageScan(allocator: std.mem.Allocator) !void { const path = "/tmp/bench_storage_scan"; defer std.fs.deleteTreeAbsolute(path) catch {}; var engine = try storage.StorageEngine.init(allocator, path); defer engine.deinit(); - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; + const key_schema = [_]types.KeySchemaElement{.{ .attribute_name = "pk", .key_type = .HASH }}; + const attr_defs = [_]types.AttributeDefinition{.{ .attribute_name = "pk", .attribute_type = .S }}; _ = try engine.createTable("BenchTable", &key_schema, &attr_defs); - // Write data first const ops: u64 = 5000; - var item_buf: [512]u8 = undefined; - var i: u64 = 0; while (i < ops) : (i += 1) { - const item = try std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"data\":{{\"S\":\"test\"}}}}", .{i}); - try engine.putItem("BenchTable", item); + var item = types.Item.init(allocator); + defer json.deinitItem(&item, allocator); + + var pk_buf: [32]u8 = undefined; + const pk_str = std.fmt.bufPrint(&pk_buf, "user{d:0>10}", .{i}) catch continue; + const pk_owned = allocator.dupe(u8, pk_str) catch continue; + const pk_name = allocator.dupe(u8, "pk") catch continue; + item.put(pk_name, types.AttributeValue{ .S = pk_owned }) catch continue; + + engine.putItem("BenchTable", item) catch {}; } - // Benchmark scan const start = std.time.nanoTimestamp(); - const items = try engine.scan("BenchTable", null); + var result_scan = try engine.scan("BenchTable", null, null); const end = std.time.nanoTimestamp(); - // Cleanup - for (items) |item| allocator.free(item); - allocator.free(items); + result_scan.deinit(allocator); - const result = BenchResult{ - .name = "Scan (full table)", - .ops = ops, - .duration_ns = @intCast(end - start), - }; + const result = BenchResult{ .name = "Scan (full table)", .ops = ops, .duration_ns = @intCast(end - start) }; result.print(); } diff --git a/src/dynamodb/expression.zig b/src/dynamodb/expression.zig new file mode 100644 index 0000000..d7a198f --- /dev/null +++ b/src/dynamodb/expression.zig @@ -0,0 +1,491 @@ +/// DynamoDB Expression Parser +/// Parses KeyConditionExpression, FilterExpression, ProjectionExpression, etc. +/// Replaces the temporary string-search hack with proper expression parsing. +const std = @import("std"); +const types = @import("types.zig"); +const json_module = @import("json.zig"); + +// ============================================================================ +// Key Condition Expression Parsing +// ============================================================================ + +/// Parsed key condition for Query operations +pub const KeyCondition = struct { + /// Partition key attribute name (from ExpressionAttributeNames or direct) + pk_name: []const u8, + /// Partition key value (owned) + pk_value: types.AttributeValue, + /// Sort key condition (optional) + sk_condition: ?SortKeyCondition, + + pub fn deinit(self: *KeyCondition, allocator: std.mem.Allocator) void { + json_module.deinitAttributeValue(&self.pk_value, allocator); + if (self.sk_condition) |*sk| { + sk.deinit(allocator); + } + } + + /// Get the raw partition key value bytes (for building storage keys) + pub fn getPkBytes(self: *const KeyCondition) ![]const u8 { + return switch (self.pk_value) { + .S => |s| s, + .N => |n| n, + .B => |b| b, + else => error.InvalidKeyType, + }; + } +}; + +/// Sort key condition operators +pub const SortKeyOperator = enum { + EQ, // = + LT, // < + LE, // <= + GT, // > + GE, // >= + BETWEEN, // BETWEEN x AND y + BEGINS_WITH, // begins_with(sk, prefix) +}; + +/// Parsed sort key condition +pub const SortKeyCondition = struct { + /// Sort key attribute name + sk_name: []const u8, + /// Comparison operator + operator: SortKeyOperator, + /// Primary value (or lower bound for BETWEEN) - owned + value: types.AttributeValue, + /// Upper bound for BETWEEN operator - owned + value2: ?types.AttributeValue, + + pub fn deinit(self: *SortKeyCondition, allocator: std.mem.Allocator) void { + json_module.deinitAttributeValue(&self.value, allocator); + if (self.value2) |*v2| { + json_module.deinitAttributeValue(v2, allocator); + } + } +}; + +/// Parse a KeyConditionExpression with ExpressionAttributeNames and ExpressionAttributeValues +/// Returns owned KeyCondition - caller must call deinit() +/// +/// Supported formats: +/// - "pk = :pk" +/// - "#pk = :pk" +/// - "pk = :pk AND sk = :sk" +/// - "pk = :pk AND sk > :sk" +/// - "pk = :pk AND sk BETWEEN :sk1 AND :sk2" +/// - "pk = :pk AND begins_with(sk, :prefix)" +pub fn parseKeyConditionExpression( + allocator: std.mem.Allocator, + expression: []const u8, + attribute_names: ?std.StringHashMap([]const u8), + attribute_values: std.StringHashMap(types.AttributeValue), +) !KeyCondition { + var tokenizer = Tokenizer.init(expression); + + // Parse partition key condition: pk_name = :pk_value + const pk_name_token = tokenizer.nextToken() orelse return error.InvalidExpression; + const pk_name = resolveAttributeName(pk_name_token, attribute_names) orelse return error.InvalidExpression; + + const eq_token = tokenizer.nextToken() orelse return error.InvalidExpression; + if (!std.mem.eql(u8, eq_token, "=")) return error.InvalidExpression; + + const pk_value_token = tokenizer.nextToken() orelse return error.InvalidExpression; + var pk_value = try resolveAttributeValue(allocator, pk_value_token, attribute_values); + errdefer json_module.deinitAttributeValue(&pk_value, allocator); + + // Check for AND (sort key condition) + var sk_condition: ?SortKeyCondition = null; + if (tokenizer.nextToken()) |and_token| { + if (!std.ascii.eqlIgnoreCase(and_token, "AND")) { + return error.InvalidExpression; + } + + sk_condition = try parseSortKeyCondition(allocator, &tokenizer, attribute_names, attribute_values); + } + + return KeyCondition{ + .pk_name = pk_name, + .pk_value = pk_value, + .sk_condition = sk_condition, + }; +} + +fn parseSortKeyCondition( + allocator: std.mem.Allocator, + tokenizer: *Tokenizer, + attribute_names: ?std.StringHashMap([]const u8), + attribute_values: std.StringHashMap(types.AttributeValue), +) !SortKeyCondition { + const first_token = tokenizer.nextToken() orelse return error.InvalidExpression; + + // Check for begins_with(sk, :value) + if (std.ascii.eqlIgnoreCase(first_token, "begins_with")) { + return try parseBeginsWith(allocator, tokenizer, attribute_names, attribute_values); + } + + // Otherwise it's: sk_name operator :value + const sk_name = resolveAttributeName(first_token, attribute_names) orelse return error.InvalidExpression; + + const op_token = tokenizer.nextToken() orelse return error.InvalidExpression; + const operator = parseOperator(op_token) orelse return error.InvalidExpression; + + const value_token = tokenizer.nextToken() orelse return error.InvalidExpression; + var value = try resolveAttributeValue(allocator, value_token, attribute_values); + errdefer json_module.deinitAttributeValue(&value, allocator); + + // Check for BETWEEN ... AND ... + var value2: ?types.AttributeValue = null; + if (operator == .BETWEEN) { + const and_token = tokenizer.nextToken() orelse return error.InvalidExpression; + if (!std.ascii.eqlIgnoreCase(and_token, "AND")) { + return error.InvalidExpression; + } + + const value2_token = tokenizer.nextToken() orelse return error.InvalidExpression; + value2 = try resolveAttributeValue(allocator, value2_token, attribute_values); + } + + return SortKeyCondition{ + .sk_name = sk_name, + .operator = operator, + .value = value, + .value2 = value2, + }; +} + +fn parseBeginsWith( + allocator: std.mem.Allocator, + tokenizer: *Tokenizer, + attribute_names: ?std.StringHashMap([]const u8), + attribute_values: std.StringHashMap(types.AttributeValue), +) !SortKeyCondition { + // Expect: ( sk_name , :value ) + const lparen = tokenizer.nextToken() orelse return error.InvalidExpression; + if (!std.mem.eql(u8, lparen, "(")) return error.InvalidExpression; + + const sk_name_token = tokenizer.nextToken() orelse return error.InvalidExpression; + const sk_name = resolveAttributeName(sk_name_token, attribute_names) orelse return error.InvalidExpression; + + const comma = tokenizer.nextToken() orelse return error.InvalidExpression; + if (!std.mem.eql(u8, comma, ",")) return error.InvalidExpression; + + const value_token = tokenizer.nextToken() orelse return error.InvalidExpression; + var value = try resolveAttributeValue(allocator, value_token, attribute_values); + errdefer json_module.deinitAttributeValue(&value, allocator); + + const rparen = tokenizer.nextToken() orelse return error.InvalidExpression; + if (!std.mem.eql(u8, rparen, ")")) return error.InvalidExpression; + + return SortKeyCondition{ + .sk_name = sk_name, + .operator = .BEGINS_WITH, + .value = value, + .value2 = null, + }; +} + +fn parseOperator(token: []const u8) ?SortKeyOperator { + if (std.mem.eql(u8, token, "=")) return .EQ; + if (std.mem.eql(u8, token, "<")) return .LT; + if (std.mem.eql(u8, token, "<=")) return .LE; + if (std.mem.eql(u8, token, ">")) return .GT; + if (std.mem.eql(u8, token, ">=")) return .GE; + if (std.ascii.eqlIgnoreCase(token, "BETWEEN")) return .BETWEEN; + return null; +} + +fn resolveAttributeName(token: []const u8, names: ?std.StringHashMap([]const u8)) ?[]const u8 { + if (token.len > 0 and token[0] == '#') { + // Expression attribute name placeholder + if (names) |n| { + return n.get(token); + } + return null; + } + // Direct attribute name + return token; +} + +fn resolveAttributeValue( + allocator: std.mem.Allocator, + token: []const u8, + values: std.StringHashMap(types.AttributeValue), +) !types.AttributeValue { + if (token.len > 0 and token[0] == ':') { + // Expression attribute value placeholder + const original = values.get(token) orelse return error.MissingAttributeValue; + return try json_module.deepCopyAttributeValue(allocator, original); + } + return error.InvalidExpression; +} + +// ============================================================================ +// Request Parsing Helpers +// ============================================================================ + +/// Parse ExpressionAttributeNames from request body +/// Returns null if not present +pub fn parseExpressionAttributeNames( + allocator: std.mem.Allocator, + request_body: []const u8, +) !?std.StringHashMap([]const u8) { + const parsed = std.json.parseFromSlice(std.json.Value, allocator, request_body, .{}) catch return null; + defer parsed.deinit(); + + const root = switch (parsed.value) { + .object => |o| o, + else => return null, + }; + + const names_val = root.get("ExpressionAttributeNames") orelse return null; + const names_obj = switch (names_val) { + .object => |o| o, + else => return null, + }; + + var result = std.StringHashMap([]const u8).init(allocator); + errdefer { + var iter = result.iterator(); + while (iter.next()) |entry| { + allocator.free(entry.key_ptr.*); + allocator.free(entry.value_ptr.*); + } + result.deinit(); + } + + var iter = names_obj.iterator(); + while (iter.next()) |entry| { + const key = try allocator.dupe(u8, entry.key_ptr.*); + errdefer allocator.free(key); + + const value = switch (entry.value_ptr.*) { + .string => |s| try allocator.dupe(u8, s), + else => { + allocator.free(key); + continue; + }, + }; + + try result.put(key, value); + } + + return result; +} + +/// Parse ExpressionAttributeValues from request body +/// Returns owned HashMap - caller must free +pub fn parseExpressionAttributeValues( + allocator: std.mem.Allocator, + request_body: []const u8, +) !std.StringHashMap(types.AttributeValue) { + const parsed = std.json.parseFromSlice(std.json.Value, allocator, request_body, .{}) catch + return std.StringHashMap(types.AttributeValue).init(allocator); + defer parsed.deinit(); + + const root = switch (parsed.value) { + .object => |o| o, + else => return std.StringHashMap(types.AttributeValue).init(allocator), + }; + + const values_val = root.get("ExpressionAttributeValues") orelse + return std.StringHashMap(types.AttributeValue).init(allocator); + const values_obj = switch (values_val) { + .object => |o| o, + else => return std.StringHashMap(types.AttributeValue).init(allocator), + }; + + var result = std.StringHashMap(types.AttributeValue).init(allocator); + errdefer { + var iter = result.iterator(); + while (iter.next()) |entry| { + allocator.free(entry.key_ptr.*); + json_module.deinitAttributeValue(entry.value_ptr, allocator); + } + result.deinit(); + } + + var iter = values_obj.iterator(); + while (iter.next()) |entry| { + const key = try allocator.dupe(u8, entry.key_ptr.*); + errdefer allocator.free(key); + + var value = json_module.parseAttributeValue(allocator, entry.value_ptr.*) catch continue; + errdefer json_module.deinitAttributeValue(&value, allocator); + + try result.put(key, value); + } + + return result; +} + +/// Parse KeyConditionExpression string from request body +pub fn parseKeyConditionExpressionString( + request_body: []const u8, +) ?[]const u8 { + // Use a simple search to avoid allocation for this common operation + const marker = "\"KeyConditionExpression\""; + const start_idx = std.mem.indexOf(u8, request_body, marker) orelse return null; + + // Find the colon after the key + const colon_idx = std.mem.indexOfPos(u8, request_body, start_idx + marker.len, ":") orelse return null; + + // Find the opening quote + var pos = colon_idx + 1; + while (pos < request_body.len and request_body[pos] != '"') : (pos += 1) {} + if (pos >= request_body.len) return null; + pos += 1; // Skip opening quote + + // Find the closing quote (handle escaped quotes) + const value_start = pos; + while (pos < request_body.len) { + if (request_body[pos] == '"' and (pos == 0 or request_body[pos - 1] != '\\')) { + return request_body[value_start..pos]; + } + pos += 1; + } + + return null; +} + +/// Convenience function to parse and evaluate a complete Query key condition +/// Returns owned KeyCondition - caller must call deinit() +pub fn parseQueryKeyCondition( + allocator: std.mem.Allocator, + request_body: []const u8, +) !?KeyCondition { + // Parse expression string + const expression = parseKeyConditionExpressionString(request_body) orelse return null; + + // Parse attribute names (optional) + var attr_names = try parseExpressionAttributeNames(allocator, request_body); + defer if (attr_names) |*names| { + deinitExpressionAttributeNames(names, allocator); + }; + + // Parse attribute values + var attr_values = try parseExpressionAttributeValues(allocator, request_body); + defer deinitExpressionAttributeValues(&attr_values, allocator); + + return try parseKeyConditionExpression(allocator, expression, attr_names, attr_values); +} + +// ============================================================================ +// Simple Tokenizer +// ============================================================================ + +const Tokenizer = struct { + input: []const u8, + pos: usize, + + pub fn init(input: []const u8) Tokenizer { + return .{ .input = input, .pos = 0 }; + } + + pub fn nextToken(self: *Tokenizer) ?[]const u8 { + // Skip whitespace + while (self.pos < self.input.len and std.ascii.isWhitespace(self.input[self.pos])) { + self.pos += 1; + } + + if (self.pos >= self.input.len) return null; + + const start = self.pos; + + // Single-character tokens + const c = self.input[self.pos]; + if (c == '(' or c == ')' or c == ',') { + self.pos += 1; + return self.input[start..self.pos]; + } + + // Two-character operators + if (self.pos + 1 < self.input.len) { + const two = self.input[self.pos .. self.pos + 2]; + if (std.mem.eql(u8, two, "<=") or std.mem.eql(u8, two, ">=") or std.mem.eql(u8, two, "<>")) { + self.pos += 2; + return two; + } + } + + // Single-character operators + if (c == '=' or c == '<' or c == '>') { + self.pos += 1; + return self.input[start..self.pos]; + } + + // Identifier or keyword (includes :placeholder and #name) + while (self.pos < self.input.len) { + const ch = self.input[self.pos]; + if (std.ascii.isAlphanumeric(ch) or ch == '_' or ch == ':' or ch == '#' or ch == '-') { + self.pos += 1; + } else { + break; + } + } + + if (self.pos > start) { + return self.input[start..self.pos]; + } + + // Unknown character, skip it + self.pos += 1; + return self.nextToken(); + } +}; + +// ============================================================================ +// Helpers for freeing parsed expression data +// ============================================================================ + +pub fn deinitExpressionAttributeNames(names: *std.StringHashMap([]const u8), allocator: std.mem.Allocator) void { + var iter = names.iterator(); + while (iter.next()) |entry| { + allocator.free(entry.key_ptr.*); + allocator.free(entry.value_ptr.*); + } + names.deinit(); +} + +pub fn deinitExpressionAttributeValues(values: *std.StringHashMap(types.AttributeValue), allocator: std.mem.Allocator) void { + var iter = values.iterator(); + while (iter.next()) |entry| { + allocator.free(entry.key_ptr.*); + json_module.deinitAttributeValue(entry.value_ptr, allocator); + } + values.deinit(); +} + +// ============================================================================ +// Tests +// ============================================================================ + +test "tokenizer basic" { + var t = Tokenizer.init("pk = :pk AND sk > :sk"); + + try std.testing.expectEqualStrings("pk", t.nextToken().?); + try std.testing.expectEqualStrings("=", t.nextToken().?); + try std.testing.expectEqualStrings(":pk", t.nextToken().?); + try std.testing.expectEqualStrings("AND", t.nextToken().?); + try std.testing.expectEqualStrings("sk", t.nextToken().?); + try std.testing.expectEqualStrings(">", t.nextToken().?); + try std.testing.expectEqualStrings(":sk", t.nextToken().?); + try std.testing.expect(t.nextToken() == null); +} + +test "tokenizer begins_with" { + var t = Tokenizer.init("pk = :pk AND begins_with(sk, :prefix)"); + + try std.testing.expectEqualStrings("pk", t.nextToken().?); + try std.testing.expectEqualStrings("=", t.nextToken().?); + try std.testing.expectEqualStrings(":pk", t.nextToken().?); + try std.testing.expectEqualStrings("AND", t.nextToken().?); + try std.testing.expectEqualStrings("begins_with", t.nextToken().?); + try std.testing.expectEqualStrings("(", t.nextToken().?); + try std.testing.expectEqualStrings("sk", t.nextToken().?); + try std.testing.expectEqualStrings(",", t.nextToken().?); + try std.testing.expectEqualStrings(":prefix", t.nextToken().?); + try std.testing.expectEqualStrings(")", t.nextToken().?); + try std.testing.expect(t.nextToken() == null); +} diff --git a/src/dynamodb/handler.zig b/src/dynamodb/handler.zig index 68f639a..301c2b5 100644 --- a/src/dynamodb/handler.zig +++ b/src/dynamodb/handler.zig @@ -1,29 +1,25 @@ /// DynamoDB API request handlers with proper concurrency support -/// Phase 3.1: Uses request-scoped arena allocator for temporary allocations -/// Phase 3.3: Context-based handler, no global state +/// - Uses request-scoped arena allocator for temporary allocations +/// - Proper expression parsing for Query operations +/// - Correct key type reconstruction for pagination const std = @import("std"); const http = @import("../http.zig"); const storage = @import("storage.zig"); const types = @import("types.zig"); const json = @import("json.zig"); +const expression = @import("expression.zig"); const key_codec = @import("../key_codec.zig"); pub const ApiHandler = struct { engine: *storage.StorageEngine, - main_allocator: std.mem.Allocator, // For persistent allocations (storage engine) + main_allocator: std.mem.Allocator, const Self = @This(); pub fn init(main_allocator: std.mem.Allocator, engine: *storage.StorageEngine) Self { - return .{ - .engine = engine, - .main_allocator = main_allocator, - }; + return .{ .engine = engine, .main_allocator = main_allocator }; } - /// Main request handler - called with context pointer - /// Phase 3.3: No global state, context passed explicitly - /// Phase 3.1: Uses request_alloc (arena) for temporary allocations pub fn handleRequest(ctx: *anyopaque, request: *const http.Request, request_alloc: std.mem.Allocator) http.Response { const self: *Self = @ptrCast(@alignCast(ctx)); return self.handle(request, request_alloc); @@ -31,12 +27,9 @@ pub const ApiHandler = struct { fn handle(self: *Self, request: *const http.Request, request_alloc: std.mem.Allocator) http.Response { var response = http.Response.init(request_alloc); - - // Add standard DynamoDB headers response.addHeader("Content-Type", "application/x-amz-json-1.0") catch {}; response.addHeader("x-amzn-RequestId", "local-request-id") catch {}; - // Get operation from X-Amz-Target header const target = request.getHeader("X-Amz-Target") orelse { return self.errorResponse(&response, .ValidationException, "Missing X-Amz-Target header", request_alloc); }; @@ -53,19 +46,14 @@ pub const ApiHandler = struct { .DeleteItem => self.handleDeleteItem(request, &response, request_alloc), .Query => self.handleQuery(request, &response, request_alloc), .Scan => self.handleScan(request, &response, request_alloc), - .Unknown => { - return self.errorResponse(&response, .ValidationException, "Unknown operation", request_alloc); - }, - else => { - return self.errorResponse(&response, .ValidationException, "Operation not implemented", request_alloc); - }, + .Unknown => return self.errorResponse(&response, .ValidationException, "Unknown operation", request_alloc), + else => return self.errorResponse(&response, .ValidationException, "Operation not implemented", request_alloc), } return response; } fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { - // Parse the entire request body properly (using request_alloc for parsing) const parsed = std.json.parseFromSlice(std.json.Value, request_alloc, request.body, .{}) catch { _ = self.errorResponse(response, .ValidationException, "Invalid JSON", request_alloc); return; @@ -80,7 +68,6 @@ pub const ApiHandler = struct { }, }; - // Extract TableName const table_name_val = root.get("TableName") orelse { _ = self.errorResponse(response, .ValidationException, "Missing TableName", request_alloc); return; @@ -93,7 +80,6 @@ pub const ApiHandler = struct { }, }; - // Parse KeySchema from request const key_schema = self.parseKeySchema(root, request_alloc) catch |err| { const msg = switch (err) { error.MissingKeySchema => "Missing KeySchema", @@ -108,7 +94,6 @@ pub const ApiHandler = struct { return; }; - // Parse AttributeDefinitions from request const attr_defs = self.parseAttributeDefinitions(root, request_alloc) catch |err| { const msg = switch (err) { error.MissingAttributeDefinitions => "Missing AttributeDefinitions", @@ -121,7 +106,6 @@ pub const ApiHandler = struct { return; }; - // Cross-validate: key attributes must be defined in AttributeDefinitions self.validateKeyAttributesDefined(key_schema, attr_defs) catch |err| { const msg = switch (err) { error.KeyAttributeNotDefined => "Key attribute not defined in AttributeDefinitions", @@ -143,7 +127,6 @@ pub const ApiHandler = struct { return; }; - // Build response (using request_alloc for temporary string) const resp_body = std.fmt.allocPrint( request_alloc, "{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d}}}}}", @@ -152,29 +135,19 @@ pub const ApiHandler = struct { _ = self.errorResponse(response, .InternalServerError, "Serialization failed", request_alloc); return; }; - // No defer needed - arena handles cleanup response.setBody(resp_body) catch {}; } - /// Parse KeySchema from CreateTable request - /// Validates: exactly 1 HASH, at most 1 RANGE - /// Returns slice allocated with request_alloc (will be freed by storage engine) - fn parseKeySchema( - self: *Self, - root: std.json.ObjectMap, - allocator: std.mem.Allocator, - ) ![]types.KeySchemaElement { + fn parseKeySchema(self: *Self, root: std.json.ObjectMap, allocator: std.mem.Allocator) ![]types.KeySchemaElement { _ = self; - const key_schema_val = root.get("KeySchema") orelse return error.MissingKeySchema; const key_schema_array = switch (key_schema_val) { .array => |a| a, else => return error.InvalidKeySchema, }; - if (key_schema_array.items.len == 0) return error.InvalidKeySchema; - if (key_schema_array.items.len > 2) return error.InvalidKeySchema; + if (key_schema_array.items.len == 0 or key_schema_array.items.len > 2) return error.InvalidKeySchema; var key_schema = std.ArrayList(types.KeySchemaElement).init(allocator); errdefer { @@ -191,7 +164,6 @@ pub const ApiHandler = struct { else => return error.InvalidKeySchema, }; - // Parse AttributeName const attr_name_val = obj.get("AttributeName") orelse return error.InvalidKeySchema; const attr_name_str = switch (attr_name_val) { .string => |s| s, @@ -200,33 +172,21 @@ pub const ApiHandler = struct { const attr_name = try allocator.dupe(u8, attr_name_str); errdefer allocator.free(attr_name); - // Parse KeyType const key_type_val = obj.get("KeyType") orelse return error.InvalidKeySchema; const key_type_str = switch (key_type_val) { .string => |s| s, else => return error.InvalidKeySchema, }; - const key_type = if (std.mem.eql(u8, key_type_str, "HASH")) - types.KeyType.HASH - else if (std.mem.eql(u8, key_type_str, "RANGE")) - types.KeyType.RANGE - else - return error.InvalidKeyType; + const key_type = if (std.mem.eql(u8, key_type_str, "HASH")) types.KeyType.HASH else if (std.mem.eql(u8, key_type_str, "RANGE")) types.KeyType.RANGE else return error.InvalidKeyType; - // Count keys switch (key_type) { .HASH => hash_count += 1, .RANGE => range_count += 1, } - - try key_schema.append(.{ - .attribute_name = attr_name, - .key_type = key_type, - }); + try key_schema.append(.{ .attribute_name = attr_name, .key_type = key_type }); } - // Validate counts if (hash_count == 0) return error.NoHashKey; if (hash_count > 1) return error.MultipleHashKeys; if (range_count > 1) return error.MultipleRangeKeys; @@ -234,15 +194,8 @@ pub const ApiHandler = struct { return key_schema.toOwnedSlice(); } - /// Parse AttributeDefinitions from CreateTable request - /// Returns slice allocated with request_alloc (will be freed by storage engine) - fn parseAttributeDefinitions( - self: *Self, - root: std.json.ObjectMap, - allocator: std.mem.Allocator, - ) ![]types.AttributeDefinition { + fn parseAttributeDefinitions(self: *Self, root: std.json.ObjectMap, allocator: std.mem.Allocator) ![]types.AttributeDefinition { _ = self; - const attr_defs_val = root.get("AttributeDefinitions") orelse return error.MissingAttributeDefinitions; const attr_defs_array = switch (attr_defs_val) { .array => |a| a, @@ -257,7 +210,6 @@ pub const ApiHandler = struct { attr_defs.deinit(); } - // Track seen attributes to detect duplicates var seen = std.StringHashMap(void).init(allocator); defer seen.deinit(); @@ -267,55 +219,34 @@ pub const ApiHandler = struct { else => return error.InvalidAttributeDefinitions, }; - // Parse AttributeName const attr_name_val = obj.get("AttributeName") orelse return error.InvalidAttributeDefinitions; const attr_name_str = switch (attr_name_val) { .string => |s| s, else => return error.InvalidAttributeDefinitions, }; - // Check for duplicates if (seen.contains(attr_name_str)) return error.DuplicateAttributeName; try seen.put(attr_name_str, {}); const attr_name = try allocator.dupe(u8, attr_name_str); errdefer allocator.free(attr_name); - // Parse AttributeType const attr_type_val = obj.get("AttributeType") orelse return error.InvalidAttributeDefinitions; const attr_type_str = switch (attr_type_val) { .string => |s| s, else => return error.InvalidAttributeDefinitions, }; - const attr_type = if (std.mem.eql(u8, attr_type_str, "S")) - types.ScalarAttributeType.S - else if (std.mem.eql(u8, attr_type_str, "N")) - types.ScalarAttributeType.N - else if (std.mem.eql(u8, attr_type_str, "B")) - types.ScalarAttributeType.B - else - return error.InvalidAttributeType; + const attr_type = if (std.mem.eql(u8, attr_type_str, "S")) types.ScalarAttributeType.S else if (std.mem.eql(u8, attr_type_str, "N")) types.ScalarAttributeType.N else if (std.mem.eql(u8, attr_type_str, "B")) types.ScalarAttributeType.B else return error.InvalidAttributeType; - try attr_defs.append(.{ - .attribute_name = attr_name, - .attribute_type = attr_type, - }); + try attr_defs.append(.{ .attribute_name = attr_name, .attribute_type = attr_type }); } return attr_defs.toOwnedSlice(); } - /// Validate that all key attributes are defined in AttributeDefinitions - /// DynamoDB rule: AttributeDefinitions should only contain key attributes - fn validateKeyAttributesDefined( - self: *Self, - key_schema: []const types.KeySchemaElement, - attr_defs: []const types.AttributeDefinition, - ) !void { + fn validateKeyAttributesDefined(self: *Self, key_schema: []const types.KeySchemaElement, attr_defs: []const types.AttributeDefinition) !void { _ = self; - - // Check each key attribute is defined for (key_schema) |key_elem| { var found = false; for (attr_defs) |attr_def| { @@ -326,10 +257,6 @@ pub const ApiHandler = struct { } if (!found) return error.KeyAttributeNotDefined; } - - // Note: DynamoDB only allows attributes in AttributeDefinitions if they're - // used in keys (primary or secondary indexes). For now we allow extra - // attributes for forward compatibility with GSI/LSI implementation. } fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { @@ -340,22 +267,13 @@ pub const ApiHandler = struct { self.engine.deleteTable(table_name) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to delete table", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to delete table", request_alloc), } return; }; - const resp_body = std.fmt.allocPrint( - request_alloc, - "{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"DELETING\"}}}}", - .{table_name}, - ) catch return; - + const resp_body = std.fmt.allocPrint(request_alloc, "{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"DELETING\"}}}}", .{table_name}) catch return; response.setBody(resp_body) catch {}; } @@ -367,29 +285,18 @@ pub const ApiHandler = struct { const desc = self.engine.describeTable(table_name) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to describe table", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to describe table", request_alloc), } return; }; - const resp_body = std.fmt.allocPrint( - request_alloc, - "{{\"Table\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"ItemCount\":{d},\"TableSizeBytes\":{d}}}}}", - .{ desc.table_name, desc.table_status.toString(), desc.item_count, desc.table_size_bytes }, - ) catch return; - + const resp_body = std.fmt.allocPrint(request_alloc, "{{\"Table\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"ItemCount\":{d},\"TableSizeBytes\":{d}}}}}", .{ desc.table_name, desc.table_status.toString(), desc.item_count, desc.table_size_bytes }) catch return; response.setBody(resp_body) catch {}; } fn handleListTables(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { _ = request; - - // Note: listTables allocates with main_allocator, we must free const tables = self.engine.listTables() catch { _ = self.errorResponse(response, .InternalServerError, "Failed to list tables", request_alloc); return; @@ -409,18 +316,15 @@ pub const ApiHandler = struct { writer.print("\"{s}\"", .{table}) catch return; } writer.writeAll("]}") catch return; - response.setBody(buf.items) catch {}; } fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { - // Parse table name (temporary - arena) const table_name = json.parseTableName(request_alloc, request.body) catch { _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc); return; }; - // Parse item (temporary - arena) var item = json.parseItemFromRequest(request_alloc, request.body) catch |err| { const msg = switch (err) { error.MissingItem => "Missing Item field", @@ -432,21 +336,12 @@ pub const ApiHandler = struct { }; defer json.deinitItem(&item, request_alloc); - // Store the item (storage engine handles persistent allocation) self.engine.putItem(table_name, item) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - storage.StorageError.MissingKeyAttribute => { - _ = self.errorResponse(response, .ValidationException, "Item missing required key attribute", request_alloc); - }, - storage.StorageError.InvalidKey => { - _ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to put item", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + storage.StorageError.MissingKeyAttribute => _ = self.errorResponse(response, .ValidationException, "Item missing required key attribute", request_alloc), + storage.StorageError.InvalidKey => _ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to put item", request_alloc), } return; }; @@ -455,13 +350,11 @@ pub const ApiHandler = struct { } fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { - // Parse table name const table_name = json.parseTableName(request_alloc, request.body) catch { _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc); return; }; - // Parse key var key = json.parseKeyFromRequest(request_alloc, request.body) catch |err| { const msg = switch (err) { error.MissingKey => "Missing Key field", @@ -473,34 +366,22 @@ pub const ApiHandler = struct { }; defer json.deinitItem(&key, request_alloc); - // Get item (storage engine returns item allocated with main_allocator) const item = self.engine.getItem(table_name, key) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - storage.StorageError.MissingKeyAttribute => { - _ = self.errorResponse(response, .ValidationException, "Key missing required attributes", request_alloc); - }, - storage.StorageError.InvalidKey => { - _ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to get item", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + storage.StorageError.MissingKeyAttribute => _ = self.errorResponse(response, .ValidationException, "Key missing required attributes", request_alloc), + storage.StorageError.InvalidKey => _ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to get item", request_alloc), } return; }; if (item) |i| { defer json.deinitItem(&i, self.main_allocator); - - // Serialize item to JSON (temporary - arena) const item_json = json.serializeItem(request_alloc, i) catch { _ = self.errorResponse(response, .InternalServerError, "Failed to serialize item", request_alloc); return; }; - const resp = std.fmt.allocPrint(request_alloc, "{{\"Item\":{s}}}", .{item_json}) catch return; response.setBody(resp) catch {}; } else { @@ -509,13 +390,11 @@ pub const ApiHandler = struct { } fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { - // Parse table name const table_name = json.parseTableName(request_alloc, request.body) catch { _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc); return; }; - // Parse key var key = json.parseKeyFromRequest(request_alloc, request.body) catch |err| { const msg = switch (err) { error.MissingKey => "Missing Key field", @@ -529,18 +408,10 @@ pub const ApiHandler = struct { self.engine.deleteItem(table_name, key) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - storage.StorageError.MissingKeyAttribute => { - _ = self.errorResponse(response, .ValidationException, "Key missing required attributes", request_alloc); - }, - storage.StorageError.InvalidKey => { - _ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to delete item", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + storage.StorageError.MissingKeyAttribute => _ = self.errorResponse(response, .ValidationException, "Key missing required attributes", request_alloc), + storage.StorageError.InvalidKey => _ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to delete item", request_alloc), } return; }; @@ -549,29 +420,47 @@ pub const ApiHandler = struct { } fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { - // Parse table name const table_name = json.parseTableName(request_alloc, request.body) catch { _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc); return; }; - // Get table metadata to access key schema (for pagination) - const metadata = self.engine.describeTable(table_name) catch |err| { + // Get table metadata for key schema and attribute definitions + var metadata = self.engine.getTableMetadata(table_name) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to access table", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to access table", request_alloc), } return; }; + defer metadata.deinit(self.main_allocator); + + // Parse KeyConditionExpression properly + var key_condition = expression.parseQueryKeyCondition(request_alloc, request.body) catch |err| { + const msg = switch (err) { + error.InvalidExpression => "Invalid KeyConditionExpression", + error.MissingAttributeValue => "Missing value in ExpressionAttributeValues", + else => "Failed to parse KeyConditionExpression", + }; + _ = self.errorResponse(response, .ValidationException, msg, request_alloc); + return; + }; + + // Get partition key value + const pk_value = if (key_condition) |*kc| blk: { + defer kc.deinit(request_alloc); + break :blk kc.getPkBytes() catch { + _ = self.errorResponse(response, .ValidationException, "Invalid partition key type", request_alloc); + return; + }; + } else { + _ = self.errorResponse(response, .ValidationException, "Missing KeyConditionExpression", request_alloc); + return; + }; - // Parse limit const limit = json.parseLimit(request_alloc, request.body) catch null; - // Parse ExclusiveStartKey + // Parse ExclusiveStartKey with proper type handling var start_key_opt = json.parseExclusiveStartKey(request_alloc, request.body, metadata.key_schema) catch |err| { const msg = switch (err) { error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes", @@ -583,7 +472,6 @@ pub const ApiHandler = struct { }; defer if (start_key_opt) |*key| key.deinit(request_alloc); - // Convert Key to binary storage key if present var start_key_binary: ?[]u8 = null; defer if (start_key_binary) |k| request_alloc.free(k); @@ -592,61 +480,41 @@ pub const ApiHandler = struct { _ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey", request_alloc); return; }; - start_key_binary = key_codec.buildDataKey( - request_alloc, - table_name, - key_values.pk, - key_values.sk, - ) catch { + start_key_binary = key_codec.buildDataKey(request_alloc, table_name, key_values.pk, key_values.sk) catch { _ = self.errorResponse(response, .InternalServerError, "Failed to encode start key", request_alloc); return; }; } - // Simplified: extract partition key value from ExpressionAttributeValues - // PHASE 6 TODO: Implement proper expression parsing - const pk_value = extractSimpleValue(request.body, ":pk") orelse "default"; - var result = self.engine.query(table_name, pk_value, limit, start_key_binary) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Query failed", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Query failed", request_alloc), } return; }; defer result.deinit(self.main_allocator); - self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema, request_alloc); + self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, &metadata, request_alloc); } fn handleScan(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void { - // Parse table name const table_name = json.parseTableName(request_alloc, request.body) catch { _ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc); return; }; - // Get table metadata to access key schema (for pagination) - const metadata = self.engine.describeTable(table_name) catch |err| { + var metadata = self.engine.getTableMetadata(table_name) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Failed to access table", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Failed to access table", request_alloc), } return; }; + defer metadata.deinit(self.main_allocator); - // Parse limit const limit = json.parseLimit(request_alloc, request.body) catch null; - // Parse ExclusiveStartKey var start_key_opt = json.parseExclusiveStartKey(request_alloc, request.body, metadata.key_schema) catch |err| { const msg = switch (err) { error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes", @@ -658,7 +526,6 @@ pub const ApiHandler = struct { }; defer if (start_key_opt) |*key| key.deinit(request_alloc); - // Convert Key to binary storage key if present var start_key_binary: ?[]u8 = null; defer if (start_key_binary) |k| request_alloc.free(k); @@ -667,12 +534,7 @@ pub const ApiHandler = struct { _ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey", request_alloc); return; }; - start_key_binary = key_codec.buildDataKey( - request_alloc, - table_name, - key_values.pk, - key_values.sk, - ) catch { + start_key_binary = key_codec.buildDataKey(request_alloc, table_name, key_values.pk, key_values.sk) catch { _ = self.errorResponse(response, .InternalServerError, "Failed to encode start key", request_alloc); return; }; @@ -680,18 +542,14 @@ pub const ApiHandler = struct { var result = self.engine.scan(table_name, limit, start_key_binary) catch |err| { switch (err) { - storage.StorageError.TableNotFound => { - _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc); - }, - else => { - _ = self.errorResponse(response, .InternalServerError, "Scan failed", request_alloc); - }, + storage.StorageError.TableNotFound => _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc), + else => _ = self.errorResponse(response, .InternalServerError, "Scan failed", request_alloc), } return; }; defer result.deinit(self.main_allocator); - self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema, request_alloc); + self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, &metadata, request_alloc); } fn writeItemsResponseWithPagination( @@ -699,7 +557,7 @@ pub const ApiHandler = struct { response: *http.Response, items: []const types.Item, last_evaluated_key_binary: ?[]const u8, - key_schema: []const types.KeySchemaElement, + metadata: *const storage.TableMetadata, request_alloc: std.mem.Allocator, ) void { var buf = std.ArrayList(u8).init(request_alloc); @@ -709,52 +567,19 @@ pub const ApiHandler = struct { writer.writeAll("{\"Items\":[") catch return; for (items, 0..) |item, i| { if (i > 0) writer.writeByte(',') catch return; - json.serializeItemToWriter(writer, item) catch return; + json.serializeItemToWriter(writer, item, request_alloc) catch return; } writer.print("],\"Count\":{d},\"ScannedCount\":{d}", .{ items.len, items.len }) catch return; - // Add LastEvaluatedKey if pagination is needed if (last_evaluated_key_binary) |binary_key| { - // Decode binary storage key back to Key struct - var decoder = key_codec.KeyDecoder.init(binary_key); - - // Skip entity type - _ = decoder.readEntityType() catch { - writer.writeAll("}") catch {}; - response.setBody(buf.items) catch {}; - return; - }; - - // Skip table name segment - _ = decoder.readSegmentBorrowed() catch { - writer.writeAll("}") catch {}; - response.setBody(buf.items) catch {}; - return; - }; - - // Read partition key - const pk_bytes = decoder.readSegmentBorrowed() catch { - writer.writeAll("}") catch {}; - response.setBody(buf.items) catch {}; - return; - }; - - // Read sort key if present - var sk_bytes: ?[]const u8 = null; - if (decoder.hasMore()) { - sk_bytes = decoder.readSegmentBorrowed() catch null; - } - - // Build Key struct from raw bytes (using request_alloc) - var key = self.buildKeyFromBytes(pk_bytes, sk_bytes, key_schema, request_alloc) catch { + var key = self.buildKeyFromBinaryWithTypes(binary_key, metadata, request_alloc) catch { writer.writeAll("}") catch {}; response.setBody(buf.items) catch {}; return; }; defer key.deinit(request_alloc); - // Serialize Key as DynamoDB JSON - const lek_json = json.serializeLastEvaluatedKey(request_alloc, key, key_schema) catch { + const lek_json = json.serializeLastEvaluatedKey(request_alloc, key, metadata.key_schema) catch { writer.writeAll("}") catch {}; response.setBody(buf.items) catch {}; return; @@ -767,33 +592,46 @@ pub const ApiHandler = struct { response.setBody(buf.items) catch {}; } - /// Helper to build a Key struct from raw bytes and key schema - /// This reconstructs the AttributeValue with the correct type (S/N/B) - /// TODO Phase 3: Need attribute_definitions to properly determine types - /// For now, assumes all keys are strings (S) which covers 95% of use cases - fn buildKeyFromBytes( + /// Build a Key struct from binary storage key with correct attribute types + /// Uses attribute_definitions from metadata to determine S/N/B type + fn buildKeyFromBinaryWithTypes( self: *Self, - pk_bytes: []const u8, - sk_bytes: ?[]const u8, - _: []const types.KeySchemaElement, // key_schema - TODO: use in Phase 3 with attribute_definitions + binary_key: []const u8, + metadata: *const storage.TableMetadata, allocator: std.mem.Allocator, ) !types.Key { _ = self; - // TODO Phase 3: Use key_schema + attribute_definitions to determine correct type - // For now, assume all keys are strings (most common case) + var decoder = key_codec.KeyDecoder.init(binary_key); - const pk_attr = types.AttributeValue{ .S = try allocator.dupe(u8, pk_bytes) }; - errdefer allocator.free(pk_attr.S); + // Skip entity type + _ = try decoder.readEntityType(); + // Skip table name + _ = try decoder.readSegmentBorrowed(); + + // Read partition key bytes + const pk_bytes = try decoder.readSegmentBorrowed(); + + // Read sort key bytes if present + var sk_bytes: ?[]const u8 = null; + if (decoder.hasMore()) { + sk_bytes = try decoder.readSegmentBorrowed(); + } + + // Get attribute types from metadata + const pk_name = metadata.getPartitionKeyName() orelse return error.InvalidKey; + const pk_type = metadata.getAttributeType(pk_name) orelse return error.InvalidKey; + + const pk_attr = try buildAttributeValueWithType(allocator, pk_bytes, pk_type); + errdefer json.deinitAttributeValue(&pk_attr, allocator); var sk_attr: ?types.AttributeValue = null; if (sk_bytes) |sk| { - sk_attr = types.AttributeValue{ .S = try allocator.dupe(u8, sk) }; + const sk_name = metadata.getSortKeyName() orelse return error.InvalidKey; + const sk_type = metadata.getAttributeType(sk_name) orelse return error.InvalidKey; + sk_attr = try buildAttributeValueWithType(allocator, sk, sk_type); } - return types.Key{ - .pk = pk_attr, - .sk = sk_attr, - }; + return types.Key{ .pk = pk_attr, .sk = sk_attr }; } fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8, request_alloc: std.mem.Allocator) http.Response { @@ -807,19 +645,20 @@ pub const ApiHandler = struct { const body = err_type.toErrorResponse(message, request_alloc) catch return response.*; response.setBody(body) catch {}; - // No need to free - arena handles it return response.*; } }; -/// Temporary helper for Query operation until we implement proper expression parsing in Phase 6 -/// PHASE 6 TODO: Replace with proper ExpressionAttributeValues parsing -fn extractSimpleValue(json_data: []const u8, key: []const u8) ?[]const u8 { - var search_buf: [256]u8 = undefined; - const search = std.fmt.bufPrint(&search_buf, "\"{s}\":\"", .{key}) catch return null; - - const start = std.mem.indexOf(u8, json_data, search) orelse return null; - const value_start = start + search.len; - const value_end = std.mem.indexOfPos(u8, json_data, value_start, "\"") orelse return null; - return json_data[value_start..value_end]; +/// Build an AttributeValue with the correct type (S, N, or B) from raw bytes +fn buildAttributeValueWithType( + allocator: std.mem.Allocator, + bytes: []const u8, + attr_type: types.ScalarAttributeType, +) !types.AttributeValue { + const owned = try allocator.dupe(u8, bytes); + return switch (attr_type) { + .S => types.AttributeValue{ .S = owned }, + .N => types.AttributeValue{ .N = owned }, + .B => types.AttributeValue{ .B = owned }, + }; } diff --git a/src/dynamodb/json.zig b/src/dynamodb/json.zig index c92f6b8..9ab20fa 100644 --- a/src/dynamodb/json.zig +++ b/src/dynamodb/json.zig @@ -241,15 +241,16 @@ pub fn serializeItem(allocator: std.mem.Allocator, item: types.Item) ![]u8 { errdefer buf.deinit(); const writer = buf.writer(); - try serializeItemToWriter(writer, item); + try serializeItemToWriter(writer, item, allocator); return buf.toOwnedSlice(); } /// Serialize an Item to a writer with deterministic ordering -pub fn serializeItemToWriter(writer: anytype, item: types.Item) !void { +/// Uses the provided allocator for temporary key sorting (not page_allocator) +pub fn serializeItemToWriter(writer: anytype, item: types.Item, allocator: std.mem.Allocator) !void { // Collect and sort keys for deterministic output - var keys = std.ArrayList([]const u8).init(std.heap.page_allocator); + var keys = std.ArrayList([]const u8).init(allocator); defer keys.deinit(); var iter = item.iterator(); @@ -269,14 +270,13 @@ pub fn serializeItemToWriter(writer: anytype, item: types.Item) !void { if (i > 0) try writer.writeByte(','); try writer.print("\"{s}\":", .{key}); const value = item.get(key).?; - try serializeAttributeValue(writer, value); + try serializeAttributeValue(writer, value, allocator); } try writer.writeByte('}'); } /// Serialize an AttributeValue to DynamoDB JSON format -/// Caller owns returned slice and must free it -pub fn serializeAttributeValue(writer: anytype, attr: types.AttributeValue) !void { +pub fn serializeAttributeValue(writer: anytype, attr: types.AttributeValue, allocator: std.mem.Allocator) !void { switch (attr) { .S => |s| try writer.print("{{\"S\":\"{s}\"}}", .{s}), .N => |n| try writer.print("{{\"N\":\"{s}\"}}", .{n}), @@ -311,15 +311,15 @@ pub fn serializeAttributeValue(writer: anytype, attr: types.AttributeValue) !voi try writer.writeAll("{\"L\":["); for (list, 0..) |item, i| { if (i > 0) try writer.writeByte(','); - try serializeAttributeValue(writer, item); + try serializeAttributeValue(writer, item, allocator); } try writer.writeAll("]}"); }, .M => |map| { try writer.writeAll("{\"M\":{"); - // Collect and sort keys for deterministic output - var keys = std.ArrayList([]const u8).init(std.heap.page_allocator); + // Collect and sort keys for deterministic output - use provided allocator + var keys = std.ArrayList([]const u8).init(allocator); defer keys.deinit(); var iter = map.iterator(); @@ -337,7 +337,7 @@ pub fn serializeAttributeValue(writer: anytype, attr: types.AttributeValue) !voi if (i > 0) try writer.writeByte(','); try writer.print("\"{s}\":", .{key}); const value = map.get(key).?; - try serializeAttributeValue(writer, value); + try serializeAttributeValue(writer, value, allocator); } try writer.writeAll("}}"); }, @@ -398,7 +398,7 @@ pub fn parseKeyFromRequest(allocator: std.mem.Allocator, request_body: []const u } // ============================================================================ -// Pagination Helpers (Phase 2.5) +// Pagination Helpers // ============================================================================ /// Parse ExclusiveStartKey from request body @@ -485,7 +485,7 @@ pub fn extractKeyAttributes( errdefer allocator.free(attr_name); // Deep copy the attribute value - const copied_value = try deepCopyAttributeValue(allocator, attr_value); + var copied_value = try deepCopyAttributeValue(allocator, attr_value); errdefer deinitAttributeValue(&copied_value, allocator); try key.put(attr_name, copied_value); @@ -579,7 +579,7 @@ pub fn deinitAttributeValue(attr: *types.AttributeValue, allocator: std.mem.Allo }, .L => |list| { for (list) |*item| { - deinitAttributeValue(item, allocator); + deinitAttributeValue(@constCast(item), allocator); } allocator.free(list); }, diff --git a/src/dynamodb/storage.zig b/src/dynamodb/storage.zig index 5bac54d..82a4d03 100644 --- a/src/dynamodb/storage.zig +++ b/src/dynamodb/storage.zig @@ -21,7 +21,7 @@ pub const StorageError = error{ /// Result type for Scan operations with pagination pub const ScanResult = struct { items: []types.Item, - last_evaluated_key: ?[]u8, // Binary-encoded storage key + last_evaluated_key: ?[]u8, pub fn deinit(self: *ScanResult, allocator: std.mem.Allocator) void { for (self.items) |*item| json.deinitItem(item, allocator); @@ -33,7 +33,7 @@ pub const ScanResult = struct { /// Result type for Query operations with pagination pub const QueryResult = struct { items: []types.Item, - last_evaluated_key: ?[]u8, // Binary-encoded storage key + last_evaluated_key: ?[]u8, pub fn deinit(self: *QueryResult, allocator: std.mem.Allocator) void { for (self.items) |*item| json.deinitItem(item, allocator); @@ -43,25 +43,46 @@ pub const QueryResult = struct { }; /// In-memory representation of table metadata -const TableMetadata = struct { +pub const TableMetadata = struct { table_name: []const u8, key_schema: []types.KeySchemaElement, attribute_definitions: []types.AttributeDefinition, table_status: types.TableStatus, creation_date_time: i64, - - // Secondary indexes global_secondary_indexes: ?[]types.GlobalSecondaryIndex = null, local_secondary_indexes: ?[]types.LocalSecondaryIndex = null, + + /// Get the attribute type for a given attribute name + pub fn getAttributeType(self: *const TableMetadata, attr_name: []const u8) ?types.ScalarAttributeType { + for (self.attribute_definitions) |def| { + if (std.mem.eql(u8, def.attribute_name, attr_name)) { + return def.attribute_type; + } + } + return null; + } + + /// Get the partition key attribute name + pub fn getPartitionKeyName(self: *const TableMetadata) ?[]const u8 { + for (self.key_schema) |ks| { + if (ks.key_type == .HASH) return ks.attribute_name; + } + return null; + } + + /// Get the sort key attribute name (if any) + pub fn getSortKeyName(self: *const TableMetadata) ?[]const u8 { + for (self.key_schema) |ks| { + if (ks.key_type == .RANGE) return ks.attribute_name; + } + return null; + } + pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void { allocator.free(self.table_name); - for (self.key_schema) |ks| { - allocator.free(ks.attribute_name); - } + for (self.key_schema) |ks| allocator.free(ks.attribute_name); allocator.free(self.key_schema); - for (self.attribute_definitions) |ad| { - allocator.free(ad.attribute_name); - } + for (self.attribute_definitions) |ad| allocator.free(ad.attribute_name); allocator.free(self.attribute_definitions); if (self.global_secondary_indexes) |gsis| { @@ -71,7 +92,6 @@ const TableMetadata = struct { } allocator.free(gsis); } - if (self.local_secondary_indexes) |lsis| { for (lsis) |*lsi| { var lsi_mut = lsi.*; @@ -85,14 +105,8 @@ const TableMetadata = struct { pub const StorageEngine = struct { db: rocksdb.DB, allocator: std.mem.Allocator, - - // Phase 3.2: Per-table locks for safe concurrent access - // Maps table_name -> RwLock - // - Read operations (Get, Query, Scan) acquire read lock - // - Write operations (Put, Delete) acquire read lock (RocksDB handles write concurrency) - // - DDL operations (CreateTable, DeleteTable) acquire write lock table_locks: std.StringHashMap(*std.Thread.RwLock), - table_locks_mutex: std.Thread.Mutex, // Protects the table_locks map itself + table_locks_mutex: std.Thread.Mutex, const Self = @This(); @@ -107,67 +121,48 @@ pub const StorageEngine = struct { } pub fn deinit(self: *Self) void { - // Clean up all table locks var iter = self.table_locks.iterator(); while (iter.next()) |entry| { self.allocator.free(entry.key_ptr.*); self.allocator.destroy(entry.value_ptr.*); } self.table_locks.deinit(); - self.db.close(); } - // === Lock Management (Phase 3.2) === - - /// Get or create a lock for a table - /// Thread-safe: Uses mutex to protect table_locks map fn getOrCreateTableLock(self: *Self, table_name: []const u8) !*std.Thread.RwLock { self.table_locks_mutex.lock(); defer self.table_locks_mutex.unlock(); - // Check if lock already exists - if (self.table_locks.get(table_name)) |lock| { - return lock; - } + if (self.table_locks.get(table_name)) |lock| return lock; - // Create new lock const lock = try self.allocator.create(std.Thread.RwLock); lock.* = std.Thread.RwLock{}; - - // Store with owned table name const owned_name = try self.allocator.dupe(u8, table_name); errdefer self.allocator.free(owned_name); - try self.table_locks.put(owned_name, lock); return lock; } - /// Remove lock for a table (called during DeleteTable) fn removeTableLock(self: *Self, table_name: []const u8) void { self.table_locks_mutex.lock(); defer self.table_locks_mutex.unlock(); - if (self.table_locks.fetchRemove(table_name)) |kv| { self.allocator.free(kv.key); self.allocator.destroy(kv.value); } } - // === Table Operations === - pub fn createTable( self: *Self, table_name: []const u8, key_schema: []const types.KeySchemaElement, attribute_definitions: []const types.AttributeDefinition, ) StorageError!types.TableDescription { - // Phase 3.2: Acquire write lock for DDL operation const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lock(); defer table_lock.unlock(); - // Check if table already exists const meta_key = try key_codec.buildMetaKey(self.allocator, table_name); defer self.allocator.free(meta_key); @@ -177,9 +172,7 @@ pub const StorageEngine = struct { return StorageError.TableAlreadyExists; } - // Create table metadata const now = std.time.timestamp(); - const metadata = TableMetadata{ .table_name = table_name, .key_schema = key_schema, @@ -188,10 +181,8 @@ pub const StorageEngine = struct { .creation_date_time = now, }; - // Serialize and store with canonical format const meta_value = try self.serializeTableMetadata(metadata); defer self.allocator.free(meta_value); - self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError; return types.TableDescription{ @@ -206,7 +197,6 @@ pub const StorageEngine = struct { } pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void { - // Phase 3.2: Acquire write lock for DDL operation const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lock(); defer table_lock.unlock(); @@ -214,19 +204,16 @@ pub const StorageEngine = struct { const meta_key = try key_codec.buildMetaKey(self.allocator, table_name); defer self.allocator.free(meta_key); - // Verify table exists const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError; if (existing == null) return StorageError.TableNotFound; self.allocator.free(existing.?); - // Delete all items with this table's prefix const data_prefix = try key_codec.buildTablePrefix(self.allocator, table_name); defer self.allocator.free(data_prefix); var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError; defer batch.deinit(); - // Scan and delete all matching keys var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; defer iter.deinit(); @@ -237,19 +224,12 @@ pub const StorageEngine = struct { batch.delete(key); iter.next(); } - - // Delete metadata batch.delete(meta_key); - batch.write(&self.db) catch return StorageError.RocksDBError; - - // Phase 3.2: Remove table lock after successful deletion - // Must defer this after unlock to avoid deadlock defer self.removeTableLock(table_name); } pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription { - // Phase 3.2: Acquire read lock for read operation const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lockShared(); defer table_lock.unlockShared(); @@ -257,7 +237,6 @@ pub const StorageEngine = struct { var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); - // Count items (expensive, but matches DynamoDB behavior) const data_prefix = try key_codec.buildTablePrefix(self.allocator, table_name); defer self.allocator.free(data_prefix); @@ -271,11 +250,9 @@ pub const StorageEngine = struct { while (iter.valid()) { const key = iter.key() orelse break; if (!std.mem.startsWith(u8, key, data_prefix)) break; - const value = iter.value() orelse break; item_count += 1; total_size += value.len; - iter.next(); } @@ -290,6 +267,17 @@ pub const StorageEngine = struct { }; } + pub fn getTableMetadata(self: *Self, table_name: []const u8) StorageError!TableMetadata { + const meta_key = try key_codec.buildMetaKey(self.allocator, table_name); + defer self.allocator.free(meta_key); + + const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError; + if (meta_value == null) return StorageError.TableNotFound; + defer self.allocator.free(meta_value.?); + + return self.deserializeTableMetadata(meta_value.?); + } + pub fn listTables(self: *Self) StorageError![][]const u8 { var tables = std.ArrayList([]const u8).init(self.allocator); errdefer { @@ -300,46 +288,33 @@ pub const StorageEngine = struct { var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; defer iter.deinit(); - // Seek to metadata entity type const meta_prefix = [_]u8{key_codec.EntityType.meta.toByte()}; iter.seek(&meta_prefix); while (iter.valid()) { const key = iter.key() orelse break; - - // Check if still in metadata namespace if (key.len == 0 or key[0] != key_codec.EntityType.meta.toByte()) break; - // Decode key to extract table name var decoder = key_codec.KeyDecoder.init(key); _ = decoder.readEntityType() catch break; - const table_name = decoder.readSegmentBorrowed() catch break; + const tbl_name = decoder.readSegmentBorrowed() catch break; - const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory; + const owned_name = self.allocator.dupe(u8, tbl_name) catch return StorageError.OutOfMemory; tables.append(owned_name) catch return StorageError.OutOfMemory; - iter.next(); } return tables.toOwnedSlice() catch return StorageError.OutOfMemory; } - // === Item Operations === - - /// Store an item in the database - /// Item is serialized to binary TLV format for efficient storage - /// Also maintains secondary index entries (GSI and LSI) pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void { - // Phase 3.2: Acquire read lock (RocksDB handles write concurrency internally) const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lockShared(); defer table_lock.unlockShared(); - // Get table metadata to retrieve key schema and indexes var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); - // Extract key using the new Key struct var key = types.Key.fromItem(self.allocator, item, metadata.key_schema) catch |err| { return switch (err) { error.MissingKeyAttribute => StorageError.MissingKeyAttribute, @@ -349,60 +324,39 @@ pub const StorageEngine = struct { }; defer key.deinit(self.allocator); - // Get raw key values for building storage key const key_values = try key.getValues(); - - // Build storage key using binary encoding - const storage_key = try key_codec.buildDataKey( - self.allocator, - table_name, - key_values.pk, - key_values.sk, - ); + const storage_key = try key_codec.buildDataKey(self.allocator, table_name, key_values.pk, key_values.sk); defer self.allocator.free(storage_key); - // Serialize item to binary TLV format const item_binary = item_codec.encode(self.allocator, item) catch return StorageError.SerializationError; defer self.allocator.free(item_binary); - // Use a write batch to ensure atomicity var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError; defer batch.deinit(); - - // Write the main item batch.put(storage_key, item_binary); - // Maintain global secondary indexes if (metadata.global_secondary_indexes) |gsis| { for (gsis) |gsi| { try self.maintainGSIEntry(&batch, table_name, &gsi, item, key_values.pk, key_values.sk); } } - - // Maintain local secondary indexes if (metadata.local_secondary_indexes) |lsis| { for (lsis) |lsi| { try self.maintainLSIEntry(&batch, table_name, &lsi, item, key_values.pk, key_values.sk); } } - // Commit the batch atomically batch.write(&self.db) catch return StorageError.RocksDBError; } - /// Retrieve an item from the database - /// Returns a parsed Item (decoded from binary TLV format) pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item { - // Phase 3.2: Acquire read lock for read operation const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lockShared(); defer table_lock.unlockShared(); - // Get table metadata var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); - // Extract key using the new Key struct var key_struct = types.Key.fromItem(self.allocator, key, metadata.key_schema) catch |err| { return switch (err) { error.MissingKeyAttribute => StorageError.MissingKeyAttribute, @@ -412,37 +366,25 @@ pub const StorageEngine = struct { }; defer key_struct.deinit(self.allocator); - // Get raw key values for building storage key const key_values = try key_struct.getValues(); - - // Build storage key - const storage_key = try key_codec.buildDataKey( - self.allocator, - table_name, - key_values.pk, - key_values.sk, - ); + const storage_key = try key_codec.buildDataKey(self.allocator, table_name, key_values.pk, key_values.sk); defer self.allocator.free(storage_key); const item_binary = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError; if (item_binary == null) return null; defer self.allocator.free(item_binary.?); - // Decode the binary data back into an Item return item_codec.decode(self.allocator, item_binary.?) catch return StorageError.SerializationError; } pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void { - // Phase 3.2: Acquire read lock (RocksDB handles write concurrency) const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lockShared(); defer table_lock.unlockShared(); - // Get table metadata var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); - // Extract key using the new Key struct var key_struct = types.Key.fromItem(self.allocator, key, metadata.key_schema) catch |err| { return switch (err) { error.MissingKeyAttribute => StorageError.MissingKeyAttribute, @@ -452,35 +394,23 @@ pub const StorageEngine = struct { }; defer key_struct.deinit(self.allocator); - // Get raw key values for building storage key const key_values = try key_struct.getValues(); - - // Build storage key - const storage_key = try key_codec.buildDataKey( - self.allocator, - table_name, - key_values.pk, - key_values.sk, - ); + const storage_key = try key_codec.buildDataKey(self.allocator, table_name, key_values.pk, key_values.sk); defer self.allocator.free(storage_key); self.db.delete(storage_key) catch return StorageError.RocksDBError; } - /// Scan a table and return parsed Items (decoded from binary) with pagination - /// Phase 2.5: Added pagination support with ExclusiveStartKey and LastEvaluatedKey pub fn scan( self: *Self, table_name: []const u8, limit: ?usize, - exclusive_start_key: ?[]const u8, // Binary storage key + exclusive_start_key: ?[]const u8, ) StorageError!ScanResult { - // Phase 3.2: Acquire read lock for read operation const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lockShared(); defer table_lock.unlockShared(); - // Verify table exists var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); @@ -499,15 +429,10 @@ pub const StorageEngine = struct { var count: usize = 0; const max_items = limit orelse std.math.maxInt(usize); - // Position iterator based on exclusive_start_key if (exclusive_start_key) |start_key| { - // Seek to the start key and move past it iter.seek(start_key); - if (iter.valid()) { - iter.next(); // Skip the exact match - } + if (iter.valid()) iter.next(); } else { - // Start from beginning of table iter.seek(data_prefix); } @@ -519,21 +444,16 @@ pub const StorageEngine = struct { if (!std.mem.startsWith(u8, key, data_prefix)) break; const value = iter.value() orelse break; - - // Decode the binary data into an Item const item = item_codec.decode(self.allocator, value) catch { iter.next(); continue; }; items.append(item) catch return StorageError.OutOfMemory; - count += 1; - // If we're at the limit, capture the current key as last_evaluated_key if (count >= max_items) { last_key = try self.allocator.dupe(u8, key); } - iter.next(); } @@ -543,25 +463,20 @@ pub const StorageEngine = struct { }; } - /// Query items by partition key with pagination support - /// Phase 2.5: Added pagination support pub fn query( self: *Self, table_name: []const u8, partition_key_value: []const u8, limit: ?usize, - exclusive_start_key: ?[]const u8, // Binary storage key + exclusive_start_key: ?[]const u8, ) StorageError!QueryResult { - // Phase 3.2: Acquire read lock for read operation const table_lock = try self.getOrCreateTableLock(table_name); table_lock.lockShared(); defer table_lock.unlockShared(); - // Verify table exists var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); - // Build prefix for this partition using binary encoding const prefix = try key_codec.buildPartitionPrefix(self.allocator, table_name, partition_key_value); defer self.allocator.free(prefix); @@ -577,20 +492,14 @@ pub const StorageEngine = struct { var count: usize = 0; const max_items = limit orelse std.math.maxInt(usize); - // Position iterator based on exclusive_start_key if (exclusive_start_key) |start_key| { - // Verify the start key is within the partition we're querying if (std.mem.startsWith(u8, start_key, prefix)) { iter.seek(start_key); - if (iter.valid()) { - iter.next(); // Skip the exact match - } + if (iter.valid()) iter.next(); } else { - // Start key is not in this partition, start from beginning iter.seek(prefix); } } else { - // Start from beginning of partition iter.seek(prefix); } @@ -602,21 +511,16 @@ pub const StorageEngine = struct { if (!std.mem.startsWith(u8, key, prefix)) break; const value = iter.value() orelse break; - - // Decode the binary data into an Item const item = item_codec.decode(self.allocator, value) catch { iter.next(); continue; }; items.append(item) catch return StorageError.OutOfMemory; - count += 1; - // If we're at the limit, capture the current key as last_evaluated_key if (count >= max_items) { last_key = try self.allocator.dupe(u8, key); } - iter.next(); } @@ -626,20 +530,6 @@ pub const StorageEngine = struct { }; } - // === Internal Helpers === - - fn getTableMetadata(self: *Self, table_name: []const u8) StorageError!TableMetadata { - const meta_key = try key_codec.buildMetaKey(self.allocator, table_name); - defer self.allocator.free(meta_key); - - const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError; - if (meta_value == null) return StorageError.TableNotFound; - defer self.allocator.free(meta_value.?); - - return self.deserializeTableMetadata(meta_value.?); - } - - /// Maintain a GSI entry for an item fn maintainGSIEntry( self: *Self, batch: *rocksdb.WriteBatch, @@ -649,16 +539,10 @@ pub const StorageEngine = struct { primary_pk: []const u8, primary_sk: ?[]const u8, ) StorageError!void { - // Extract GSI key using Key.fromItem - var gsi_key = types.Key.fromItem(self.allocator, item, gsi.key_schema) catch { - // Item doesn't have GSI key attributes - skip this index entry - return; - }; + var gsi_key = types.Key.fromItem(self.allocator, item, gsi.key_schema) catch return; defer gsi_key.deinit(self.allocator); const gsi_key_values = try gsi_key.getValues(); - - // Build GSI storage key const gsi_storage_key = try key_codec.buildGSIKey( self.allocator, table_name, @@ -668,18 +552,12 @@ pub const StorageEngine = struct { ); defer self.allocator.free(gsi_storage_key); - // Encode primary key reference as the value - const primary_key_ref = try index_codec.encodePrimaryKeyRef( - self.allocator, - primary_pk, - primary_sk, - ); + const primary_key_ref = try index_codec.encodePrimaryKeyRef(self.allocator, primary_pk, primary_sk); defer self.allocator.free(primary_key_ref); batch.put(gsi_storage_key, primary_key_ref); } - /// Maintain an LSI entry for an item fn maintainLSIEntry( self: *Self, batch: *rocksdb.WriteBatch, @@ -689,15 +567,10 @@ pub const StorageEngine = struct { primary_pk: []const u8, primary_sk: ?[]const u8, ) StorageError!void { - // LSI requires a sort key from the LSI key schema - // Find the sort key attribute (LSI shares partition key with table) var lsi_sk_value: ?[]const u8 = null; for (lsi.key_schema) |ks| { if (ks.key_type == .RANGE) { - const attr = item.get(ks.attribute_name) orelse { - // Item doesn't have LSI sort key - skip this index entry - return; - }; + const attr = item.get(ks.attribute_name) orelse return; lsi_sk_value = switch (attr) { .S => |s| s, .N => |n| n, @@ -708,44 +581,19 @@ pub const StorageEngine = struct { } } - const lsi_sk = lsi_sk_value orelse return; // No sort key found - - // Build LSI storage key - const lsi_storage_key = try key_codec.buildLSIKey( - self.allocator, - table_name, - lsi.index_name, - primary_pk, - lsi_sk, - ); + const lsi_sk = lsi_sk_value orelse return; + const lsi_storage_key = try key_codec.buildLSIKey(self.allocator, table_name, lsi.index_name, primary_pk, lsi_sk); defer self.allocator.free(lsi_storage_key); - // Encode primary key reference as the value - const primary_key_ref = try index_codec.encodePrimaryKeyRef( - self.allocator, - primary_pk, - primary_sk, - ); + const primary_key_ref = try index_codec.encodePrimaryKeyRef(self.allocator, primary_pk, primary_sk); defer self.allocator.free(primary_key_ref); batch.put(lsi_storage_key, primary_key_ref); } - // === Serialization === - - /// Serializable representation of KeySchemaElement for JSON - const KeySchemaElementJson = struct { - AttributeName: []const u8, - KeyType: []const u8, - }; - - /// Serializable representation of AttributeDefinition for JSON - const AttributeDefinitionJson = struct { - AttributeName: []const u8, - AttributeType: []const u8, - }; - - /// Serializable representation of TableMetadata for JSON + // Serialization helpers + const KeySchemaElementJson = struct { AttributeName: []const u8, KeyType: []const u8 }; + const AttributeDefinitionJson = struct { AttributeName: []const u8, AttributeType: []const u8 }; const TableMetadataJson = struct { TableName: []const u8, TableStatus: []const u8, @@ -755,25 +603,16 @@ pub const StorageEngine = struct { }; fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 { - // Build JSON-friendly structs var key_schema_json = std.ArrayList(KeySchemaElementJson).init(self.allocator); defer key_schema_json.deinit(); - for (metadata.key_schema) |ks| { - try key_schema_json.append(.{ - .AttributeName = ks.attribute_name, - .KeyType = ks.key_type.toString(), - }); + try key_schema_json.append(.{ .AttributeName = ks.attribute_name, .KeyType = ks.key_type.toString() }); } var attr_defs_json = std.ArrayList(AttributeDefinitionJson).init(self.allocator); defer attr_defs_json.deinit(); - for (metadata.attribute_definitions) |ad| { - try attr_defs_json.append(.{ - .AttributeName = ad.attribute_name, - .AttributeType = ad.attribute_type.toString(), - }); + try attr_defs_json.append(.{ .AttributeName = ad.attribute_name, .AttributeType = ad.attribute_type.toString() }); } const metadata_json = TableMetadataJson{ @@ -784,12 +623,9 @@ pub const StorageEngine = struct { .AttributeDefinitions = attr_defs_json.items, }; - // Use std.json.stringify for clean, reliable serialization var buf = std.ArrayList(u8).init(self.allocator); errdefer buf.deinit(); - try std.json.stringify(metadata_json, .{}, buf.writer()); - return buf.toOwnedSlice() catch return StorageError.OutOfMemory; } @@ -802,7 +638,6 @@ pub const StorageEngine = struct { else => return StorageError.SerializationError, }; - // Extract table name const table_name_val = root.get("TableName") orelse return StorageError.SerializationError; const table_name_str = switch (table_name_val) { .string => |s| s, @@ -811,29 +646,19 @@ pub const StorageEngine = struct { const table_name = self.allocator.dupe(u8, table_name_str) catch return StorageError.OutOfMemory; errdefer self.allocator.free(table_name); - // Extract table status const status_val = root.get("TableStatus") orelse return StorageError.SerializationError; const status_str = switch (status_val) { .string => |s| s, else => return StorageError.SerializationError, }; - const table_status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE")) - .ACTIVE - else if (std.mem.eql(u8, status_str, "CREATING")) - .CREATING - else if (std.mem.eql(u8, status_str, "DELETING")) - .DELETING - else - .ACTIVE; + const table_status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE")) .ACTIVE else if (std.mem.eql(u8, status_str, "CREATING")) .CREATING else if (std.mem.eql(u8, status_str, "DELETING")) .DELETING else .ACTIVE; - // Extract creation time const creation_val = root.get("CreationDateTime") orelse return StorageError.SerializationError; const creation_date_time = switch (creation_val) { .integer => |i| i, else => return StorageError.SerializationError, }; - // Extract key schema const key_schema_val = root.get("KeySchema") orelse return StorageError.SerializationError; const key_schema_array = switch (key_schema_val) { .array => |a| a, @@ -851,7 +676,6 @@ pub const StorageEngine = struct { .object => |o| o, else => return StorageError.SerializationError, }; - const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError; const attr_name_str = switch (attr_name_val) { .string => |s| s, @@ -867,13 +691,9 @@ pub const StorageEngine = struct { }; const key_type = types.KeyType.fromString(key_type_str) orelse return StorageError.SerializationError; - key_schema.append(.{ - .attribute_name = attr_name, - .key_type = key_type, - }) catch return StorageError.OutOfMemory; + key_schema.append(.{ .attribute_name = attr_name, .key_type = key_type }) catch return StorageError.OutOfMemory; } - // Extract attribute definitions const attr_defs_val = root.get("AttributeDefinitions") orelse return StorageError.SerializationError; const attr_defs_array = switch (attr_defs_val) { .array => |a| a, @@ -891,7 +711,6 @@ pub const StorageEngine = struct { .object => |o| o, else => return StorageError.SerializationError, }; - const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError; const attr_name_str = switch (attr_name_val) { .string => |s| s, @@ -907,10 +726,7 @@ pub const StorageEngine = struct { }; const attr_type = types.ScalarAttributeType.fromString(attr_type_str) orelse return StorageError.SerializationError; - attr_defs.append(.{ - .attribute_name = attr_name, - .attribute_type = attr_type, - }) catch return StorageError.OutOfMemory; + attr_defs.append(.{ .attribute_name = attr_name, .attribute_type = attr_type }) catch return StorageError.OutOfMemory; } return TableMetadata{ diff --git a/src/main.zig b/src/main.zig index d8587c8..129eb8f 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,5 +1,4 @@ /// ZynamoDB - A DynamoDB-compatible database using RocksDB -/// Phase 3: Concurrency support with proper allocator strategy const std = @import("std"); const http = @import("http.zig"); const rocksdb = @import("rocksdb.zig"); @@ -18,19 +17,14 @@ pub fn main() !void { defer _ = gpa.deinit(); const allocator = gpa.allocator(); - // Parse command line args const config = try parseArgs(allocator); - - // Print banner printBanner(config); - // Ensure data directory exists std.fs.cwd().makePath(config.data_dir) catch |err| { std.log.err("Failed to create data directory: {any}", .{err}); return; }; - // Initialize storage engine (uses main allocator for persistent data) var engine = storage.StorageEngine.init(allocator, config.data_dir) catch |err| { std.log.err("Failed to initialize storage: {any}", .{err}); return; @@ -39,26 +33,20 @@ pub fn main() !void { std.log.info("Storage engine initialized at {s}", .{config.data_dir}); - // Initialize API handler - // Phase 3.3: Handler is no longer global, passed as context var api_handler = handler.ApiHandler.init(allocator, &engine); - // Server configuration const server_config = http.ServerConfig{ - .max_body_size = 100 * 1024 * 1024, // 100MB + .max_body_size = 100 * 1024 * 1024, .enable_keep_alive = true, .max_requests_per_connection = 1000, }; - // Start HTTP server with context - // Phase 3.3: Pass handler context explicitly, no global state - // Phase HTTP: Using stdlib http.Server with proper chunked/keep-alive support var server = try http.Server.init( allocator, config.host, config.port, - handler.ApiHandler.handleRequest, // Function pointer - @ptrCast(&api_handler), // Context pointer + handler.ApiHandler.handleRequest, + @ptrCast(&api_handler), server_config, ); defer server.stop(); @@ -74,7 +62,6 @@ fn parseArgs(allocator: std.mem.Allocator) !Config { var args = try std.process.argsWithAllocator(allocator); defer args.deinit(); - // Skip program name _ = args.next(); while (args.next()) |arg| { @@ -88,7 +75,6 @@ fn parseArgs(allocator: std.mem.Allocator) !Config { } } else if (std.mem.eql(u8, arg, "--data-dir") or std.mem.eql(u8, arg, "-d")) { if (args.next()) |dir| { - // Need sentinel-terminated string for RocksDB const owned = try allocator.dupeZ(u8, dir); config.data_dir = owned; } @@ -100,7 +86,6 @@ fn parseArgs(allocator: std.mem.Allocator) !Config { } } - // Check environment variables if (std.posix.getenv("DYNAMODB_PORT")) |port_str| { config.port = std.fmt.parseInt(u16, port_str, 10) catch config.port; } @@ -124,14 +109,6 @@ fn printHelp() void { \\ -v, --verbose Enable verbose logging \\ --help Show this help message \\ - \\Environment Variables: - \\ DYNAMODB_PORT Override port - \\ ROCKSDB_DATA_DIR Override data directory - \\ - \\Examples: - \\ zynamodb # Start with defaults - \\ zynamodb -p 8080 -d /var/lib/db # Custom port and data dir - \\ ; std.debug.print("{s}", .{help}); } @@ -158,7 +135,6 @@ fn printBanner(config: Config) void { std.debug.print(" Port: {d} | Data Dir: {s}\n\n", .{ config.port, config.data_dir }); } -// Re-export modules for testing pub const _rocksdb = rocksdb; pub const _http = http; pub const _storage = storage;