fix hard coded create table schemas
This commit is contained in:
@@ -1,4 +1,6 @@
|
|||||||
/// DynamoDB API request handlers
|
/// DynamoDB API request handlers with proper concurrency support
|
||||||
|
/// Phase 3.1: Uses request-scoped arena allocator for temporary allocations
|
||||||
|
/// Phase 3.3: Context-based handler, no global state
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const http = @import("../http.zig");
|
const http = @import("../http.zig");
|
||||||
const storage = @import("storage.zig");
|
const storage = @import("storage.zig");
|
||||||
@@ -8,19 +10,27 @@ const key_codec = @import("../key_codec.zig");
|
|||||||
|
|
||||||
pub const ApiHandler = struct {
|
pub const ApiHandler = struct {
|
||||||
engine: *storage.StorageEngine,
|
engine: *storage.StorageEngine,
|
||||||
allocator: std.mem.Allocator,
|
main_allocator: std.mem.Allocator, // For persistent allocations (storage engine)
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
pub fn init(allocator: std.mem.Allocator, engine: *storage.StorageEngine) Self {
|
pub fn init(main_allocator: std.mem.Allocator, engine: *storage.StorageEngine) Self {
|
||||||
return .{
|
return .{
|
||||||
.engine = engine,
|
.engine = engine,
|
||||||
.allocator = allocator,
|
.main_allocator = main_allocator,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle(self: *Self, request: *const http.Request) http.Response {
|
/// Main request handler - called with context pointer
|
||||||
var response = http.Response.init(self.allocator);
|
/// Phase 3.3: No global state, context passed explicitly
|
||||||
|
/// Phase 3.1: Uses request_alloc (arena) for temporary allocations
|
||||||
|
pub fn handleRequest(ctx: *anyopaque, request: *const http.Request, request_alloc: std.mem.Allocator) http.Response {
|
||||||
|
const self: *Self = @ptrCast(@alignCast(ctx));
|
||||||
|
return self.handle(request, request_alloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle(self: *Self, request: *const http.Request, request_alloc: std.mem.Allocator) http.Response {
|
||||||
|
var response = http.Response.init(request_alloc);
|
||||||
|
|
||||||
// Add standard DynamoDB headers
|
// Add standard DynamoDB headers
|
||||||
response.addHeader("Content-Type", "application/x-amz-json-1.0") catch {};
|
response.addHeader("Content-Type", "application/x-amz-json-1.0") catch {};
|
||||||
@@ -28,36 +38,36 @@ pub const ApiHandler = struct {
|
|||||||
|
|
||||||
// Get operation from X-Amz-Target header
|
// Get operation from X-Amz-Target header
|
||||||
const target = request.getHeader("X-Amz-Target") orelse {
|
const target = request.getHeader("X-Amz-Target") orelse {
|
||||||
return self.errorResponse(&response, .ValidationException, "Missing X-Amz-Target header");
|
return self.errorResponse(&response, .ValidationException, "Missing X-Amz-Target header", request_alloc);
|
||||||
};
|
};
|
||||||
|
|
||||||
const operation = types.Operation.fromTarget(target);
|
const operation = types.Operation.fromTarget(target);
|
||||||
|
|
||||||
switch (operation) {
|
switch (operation) {
|
||||||
.CreateTable => self.handleCreateTable(request, &response),
|
.CreateTable => self.handleCreateTable(request, &response, request_alloc),
|
||||||
.DeleteTable => self.handleDeleteTable(request, &response),
|
.DeleteTable => self.handleDeleteTable(request, &response, request_alloc),
|
||||||
.DescribeTable => self.handleDescribeTable(request, &response),
|
.DescribeTable => self.handleDescribeTable(request, &response, request_alloc),
|
||||||
.ListTables => self.handleListTables(request, &response),
|
.ListTables => self.handleListTables(request, &response, request_alloc),
|
||||||
.PutItem => self.handlePutItem(request, &response),
|
.PutItem => self.handlePutItem(request, &response, request_alloc),
|
||||||
.GetItem => self.handleGetItem(request, &response),
|
.GetItem => self.handleGetItem(request, &response, request_alloc),
|
||||||
.DeleteItem => self.handleDeleteItem(request, &response),
|
.DeleteItem => self.handleDeleteItem(request, &response, request_alloc),
|
||||||
.Query => self.handleQuery(request, &response),
|
.Query => self.handleQuery(request, &response, request_alloc),
|
||||||
.Scan => self.handleScan(request, &response),
|
.Scan => self.handleScan(request, &response, request_alloc),
|
||||||
.Unknown => {
|
.Unknown => {
|
||||||
return self.errorResponse(&response, .ValidationException, "Unknown operation");
|
return self.errorResponse(&response, .ValidationException, "Unknown operation", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
return self.errorResponse(&response, .ValidationException, "Operation not implemented");
|
return self.errorResponse(&response, .ValidationException, "Operation not implemented", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
// Parse the entire request body properly
|
// Parse the entire request body properly (using request_alloc for parsing)
|
||||||
const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, request.body, .{}) catch {
|
const parsed = std.json.parseFromSlice(std.json.Value, request_alloc, request.body, .{}) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid JSON");
|
_ = self.errorResponse(response, .ValidationException, "Invalid JSON", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer parsed.deinit();
|
defer parsed.deinit();
|
||||||
@@ -65,127 +75,331 @@ pub const ApiHandler = struct {
|
|||||||
const root = switch (parsed.value) {
|
const root = switch (parsed.value) {
|
||||||
.object => |o| o,
|
.object => |o| o,
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Request must be an object");
|
_ = self.errorResponse(response, .ValidationException, "Request must be an object", request_alloc);
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// Extract TableName
|
// Extract TableName
|
||||||
const table_name_val = root.get("TableName") orelse {
|
const table_name_val = root.get("TableName") orelse {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
const table_name = switch (table_name_val) {
|
const table_name = switch (table_name_val) {
|
||||||
.string => |s| s,
|
.string => |s| s,
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "TableName must be a string");
|
_ = self.errorResponse(response, .ValidationException, "TableName must be a string", request_alloc);
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// For now, use simplified key schema (you can enhance this later to parse from request)
|
// Parse KeySchema from request
|
||||||
const key_schema = [_]types.KeySchemaElement{
|
const key_schema = self.parseKeySchema(root, request_alloc) catch |err| {
|
||||||
.{ .attribute_name = "pk", .key_type = .HASH },
|
const msg = switch (err) {
|
||||||
};
|
error.MissingKeySchema => "Missing KeySchema",
|
||||||
const attr_defs = [_]types.AttributeDefinition{
|
error.InvalidKeySchema => "Invalid KeySchema format",
|
||||||
.{ .attribute_name = "pk", .attribute_type = .S },
|
error.NoHashKey => "KeySchema must contain exactly one HASH key",
|
||||||
|
error.MultipleHashKeys => "KeySchema can only contain one HASH key",
|
||||||
|
error.MultipleRangeKeys => "KeySchema can only contain one RANGE key",
|
||||||
|
error.InvalidKeyType => "Invalid KeyType (must be HASH or RANGE)",
|
||||||
|
else => "Invalid KeySchema",
|
||||||
|
};
|
||||||
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
const desc = self.engine.createTable(table_name, &key_schema, &attr_defs) catch |err| {
|
// Parse AttributeDefinitions from request
|
||||||
|
const attr_defs = self.parseAttributeDefinitions(root, request_alloc) catch |err| {
|
||||||
|
const msg = switch (err) {
|
||||||
|
error.MissingAttributeDefinitions => "Missing AttributeDefinitions",
|
||||||
|
error.InvalidAttributeDefinitions => "Invalid AttributeDefinitions format",
|
||||||
|
error.InvalidAttributeType => "Invalid AttributeType (must be S, N, or B)",
|
||||||
|
error.DuplicateAttributeName => "Duplicate attribute name in AttributeDefinitions",
|
||||||
|
else => "Invalid AttributeDefinitions",
|
||||||
|
};
|
||||||
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Cross-validate: key attributes must be defined in AttributeDefinitions
|
||||||
|
self.validateKeyAttributesDefined(key_schema, attr_defs) catch |err| {
|
||||||
|
const msg = switch (err) {
|
||||||
|
error.KeyAttributeNotDefined => "Key attribute not defined in AttributeDefinitions",
|
||||||
|
else => "Schema validation failed",
|
||||||
|
};
|
||||||
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
const desc = self.engine.createTable(table_name, key_schema, attr_defs) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableAlreadyExists => {
|
storage.StorageError.TableAlreadyExists => {
|
||||||
_ = self.errorResponse(response, .ResourceInUseException, "Table already exists");
|
_ = self.errorResponse(response, .ResourceInUseException, "Table already exists", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to create table");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to create table", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Build response
|
// Build response (using request_alloc for temporary string)
|
||||||
const resp_body = std.fmt.allocPrint(
|
const resp_body = std.fmt.allocPrint(
|
||||||
self.allocator,
|
request_alloc,
|
||||||
"{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d}}}}}",
|
"{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d}}}}}",
|
||||||
.{ desc.table_name, desc.table_status.toString(), desc.creation_date_time },
|
.{ desc.table_name, desc.table_status.toString(), desc.creation_date_time },
|
||||||
) catch {
|
) catch {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Serialization failed");
|
_ = self.errorResponse(response, .InternalServerError, "Serialization failed", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer self.allocator.free(resp_body);
|
// No defer needed - arena handles cleanup
|
||||||
|
|
||||||
response.setBody(resp_body) catch {};
|
response.setBody(resp_body) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response) void {
|
/// Parse KeySchema from CreateTable request
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
/// Validates: exactly 1 HASH, at most 1 RANGE
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
/// Returns slice allocated with request_alloc (will be freed by storage engine)
|
||||||
|
fn parseKeySchema(
|
||||||
|
self: *Self,
|
||||||
|
root: std.json.ObjectMap,
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
) ![]types.KeySchemaElement {
|
||||||
|
_ = self;
|
||||||
|
|
||||||
|
const key_schema_val = root.get("KeySchema") orelse return error.MissingKeySchema;
|
||||||
|
const key_schema_array = switch (key_schema_val) {
|
||||||
|
.array => |a| a,
|
||||||
|
else => return error.InvalidKeySchema,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (key_schema_array.items.len == 0) return error.InvalidKeySchema;
|
||||||
|
if (key_schema_array.items.len > 2) return error.InvalidKeySchema;
|
||||||
|
|
||||||
|
var key_schema = std.ArrayList(types.KeySchemaElement).init(allocator);
|
||||||
|
errdefer {
|
||||||
|
for (key_schema.items) |ks| allocator.free(ks.attribute_name);
|
||||||
|
key_schema.deinit();
|
||||||
|
}
|
||||||
|
|
||||||
|
var hash_count: u32 = 0;
|
||||||
|
var range_count: u32 = 0;
|
||||||
|
|
||||||
|
for (key_schema_array.items) |item| {
|
||||||
|
const obj = switch (item) {
|
||||||
|
.object => |o| o,
|
||||||
|
else => return error.InvalidKeySchema,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Parse AttributeName
|
||||||
|
const attr_name_val = obj.get("AttributeName") orelse return error.InvalidKeySchema;
|
||||||
|
const attr_name_str = switch (attr_name_val) {
|
||||||
|
.string => |s| s,
|
||||||
|
else => return error.InvalidKeySchema,
|
||||||
|
};
|
||||||
|
const attr_name = try allocator.dupe(u8, attr_name_str);
|
||||||
|
errdefer allocator.free(attr_name);
|
||||||
|
|
||||||
|
// Parse KeyType
|
||||||
|
const key_type_val = obj.get("KeyType") orelse return error.InvalidKeySchema;
|
||||||
|
const key_type_str = switch (key_type_val) {
|
||||||
|
.string => |s| s,
|
||||||
|
else => return error.InvalidKeySchema,
|
||||||
|
};
|
||||||
|
|
||||||
|
const key_type = if (std.mem.eql(u8, key_type_str, "HASH"))
|
||||||
|
types.KeyType.HASH
|
||||||
|
else if (std.mem.eql(u8, key_type_str, "RANGE"))
|
||||||
|
types.KeyType.RANGE
|
||||||
|
else
|
||||||
|
return error.InvalidKeyType;
|
||||||
|
|
||||||
|
// Count keys
|
||||||
|
switch (key_type) {
|
||||||
|
.HASH => hash_count += 1,
|
||||||
|
.RANGE => range_count += 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
try key_schema.append(.{
|
||||||
|
.attribute_name = attr_name,
|
||||||
|
.key_type = key_type,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate counts
|
||||||
|
if (hash_count == 0) return error.NoHashKey;
|
||||||
|
if (hash_count > 1) return error.MultipleHashKeys;
|
||||||
|
if (range_count > 1) return error.MultipleRangeKeys;
|
||||||
|
|
||||||
|
return key_schema.toOwnedSlice();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse AttributeDefinitions from CreateTable request
|
||||||
|
/// Returns slice allocated with request_alloc (will be freed by storage engine)
|
||||||
|
fn parseAttributeDefinitions(
|
||||||
|
self: *Self,
|
||||||
|
root: std.json.ObjectMap,
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
) ![]types.AttributeDefinition {
|
||||||
|
_ = self;
|
||||||
|
|
||||||
|
const attr_defs_val = root.get("AttributeDefinitions") orelse return error.MissingAttributeDefinitions;
|
||||||
|
const attr_defs_array = switch (attr_defs_val) {
|
||||||
|
.array => |a| a,
|
||||||
|
else => return error.InvalidAttributeDefinitions,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (attr_defs_array.items.len == 0) return error.InvalidAttributeDefinitions;
|
||||||
|
|
||||||
|
var attr_defs = std.ArrayList(types.AttributeDefinition).init(allocator);
|
||||||
|
errdefer {
|
||||||
|
for (attr_defs.items) |ad| allocator.free(ad.attribute_name);
|
||||||
|
attr_defs.deinit();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track seen attributes to detect duplicates
|
||||||
|
var seen = std.StringHashMap(void).init(allocator);
|
||||||
|
defer seen.deinit();
|
||||||
|
|
||||||
|
for (attr_defs_array.items) |item| {
|
||||||
|
const obj = switch (item) {
|
||||||
|
.object => |o| o,
|
||||||
|
else => return error.InvalidAttributeDefinitions,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Parse AttributeName
|
||||||
|
const attr_name_val = obj.get("AttributeName") orelse return error.InvalidAttributeDefinitions;
|
||||||
|
const attr_name_str = switch (attr_name_val) {
|
||||||
|
.string => |s| s,
|
||||||
|
else => return error.InvalidAttributeDefinitions,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check for duplicates
|
||||||
|
if (seen.contains(attr_name_str)) return error.DuplicateAttributeName;
|
||||||
|
try seen.put(attr_name_str, {});
|
||||||
|
|
||||||
|
const attr_name = try allocator.dupe(u8, attr_name_str);
|
||||||
|
errdefer allocator.free(attr_name);
|
||||||
|
|
||||||
|
// Parse AttributeType
|
||||||
|
const attr_type_val = obj.get("AttributeType") orelse return error.InvalidAttributeDefinitions;
|
||||||
|
const attr_type_str = switch (attr_type_val) {
|
||||||
|
.string => |s| s,
|
||||||
|
else => return error.InvalidAttributeDefinitions,
|
||||||
|
};
|
||||||
|
|
||||||
|
const attr_type = if (std.mem.eql(u8, attr_type_str, "S"))
|
||||||
|
types.ScalarAttributeType.S
|
||||||
|
else if (std.mem.eql(u8, attr_type_str, "N"))
|
||||||
|
types.ScalarAttributeType.N
|
||||||
|
else if (std.mem.eql(u8, attr_type_str, "B"))
|
||||||
|
types.ScalarAttributeType.B
|
||||||
|
else
|
||||||
|
return error.InvalidAttributeType;
|
||||||
|
|
||||||
|
try attr_defs.append(.{
|
||||||
|
.attribute_name = attr_name,
|
||||||
|
.attribute_type = attr_type,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return attr_defs.toOwnedSlice();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Validate that all key attributes are defined in AttributeDefinitions
|
||||||
|
/// DynamoDB rule: AttributeDefinitions should only contain key attributes
|
||||||
|
fn validateKeyAttributesDefined(
|
||||||
|
self: *Self,
|
||||||
|
key_schema: []const types.KeySchemaElement,
|
||||||
|
attr_defs: []const types.AttributeDefinition,
|
||||||
|
) !void {
|
||||||
|
_ = self;
|
||||||
|
|
||||||
|
// Check each key attribute is defined
|
||||||
|
for (key_schema) |key_elem| {
|
||||||
|
var found = false;
|
||||||
|
for (attr_defs) |attr_def| {
|
||||||
|
if (std.mem.eql(u8, key_elem.attribute_name, attr_def.attribute_name)) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) return error.KeyAttributeNotDefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: DynamoDB only allows attributes in AttributeDefinitions if they're
|
||||||
|
// used in keys (primary or secondary indexes). For now we allow extra
|
||||||
|
// attributes for forward compatibility with GSI/LSI implementation.
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
self.engine.deleteTable(table_name) catch |err| {
|
self.engine.deleteTable(table_name) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to delete table");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to delete table", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
const resp_body = std.fmt.allocPrint(
|
const resp_body = std.fmt.allocPrint(
|
||||||
self.allocator,
|
request_alloc,
|
||||||
"{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"DELETING\"}}}}",
|
"{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"DELETING\"}}}}",
|
||||||
.{table_name},
|
.{table_name},
|
||||||
) catch return;
|
) catch return;
|
||||||
defer self.allocator.free(resp_body);
|
|
||||||
|
|
||||||
response.setBody(resp_body) catch {};
|
response.setBody(resp_body) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleDescribeTable(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleDescribeTable(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
const desc = self.engine.describeTable(table_name) catch |err| {
|
const desc = self.engine.describeTable(table_name) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to describe table");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to describe table", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
const resp_body = std.fmt.allocPrint(
|
const resp_body = std.fmt.allocPrint(
|
||||||
self.allocator,
|
request_alloc,
|
||||||
"{{\"Table\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"ItemCount\":{d},\"TableSizeBytes\":{d}}}}}",
|
"{{\"Table\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"ItemCount\":{d},\"TableSizeBytes\":{d}}}}}",
|
||||||
.{ desc.table_name, desc.table_status.toString(), desc.item_count, desc.table_size_bytes },
|
.{ desc.table_name, desc.table_status.toString(), desc.item_count, desc.table_size_bytes },
|
||||||
) catch return;
|
) catch return;
|
||||||
defer self.allocator.free(resp_body);
|
|
||||||
|
|
||||||
response.setBody(resp_body) catch {};
|
response.setBody(resp_body) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleListTables(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleListTables(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
_ = request;
|
_ = request;
|
||||||
|
|
||||||
|
// Note: listTables allocates with main_allocator, we must free
|
||||||
const tables = self.engine.listTables() catch {
|
const tables = self.engine.listTables() catch {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to list tables");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to list tables", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer {
|
defer {
|
||||||
for (tables) |t| self.allocator.free(t);
|
for (tables) |t| self.main_allocator.free(t);
|
||||||
self.allocator.free(tables);
|
self.main_allocator.free(tables);
|
||||||
}
|
}
|
||||||
|
|
||||||
var buf = std.ArrayList(u8).init(self.allocator);
|
var buf = std.ArrayList(u8).init(request_alloc);
|
||||||
defer buf.deinit();
|
defer buf.deinit();
|
||||||
const writer = buf.writer();
|
const writer = buf.writer();
|
||||||
|
|
||||||
@@ -199,39 +413,39 @@ pub const ApiHandler = struct {
|
|||||||
response.setBody(buf.items) catch {};
|
response.setBody(buf.items) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
// Parse table name
|
// Parse table name (temporary - arena)
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse item using proper JSON parsing (not string extraction)
|
// Parse item (temporary - arena)
|
||||||
var item = json.parseItemFromRequest(self.allocator, request.body) catch |err| {
|
var item = json.parseItemFromRequest(request_alloc, request.body) catch |err| {
|
||||||
const msg = switch (err) {
|
const msg = switch (err) {
|
||||||
error.MissingItem => "Missing Item field",
|
error.MissingItem => "Missing Item field",
|
||||||
error.InvalidRequest => "Invalid request format",
|
error.InvalidRequest => "Invalid request format",
|
||||||
else => "Invalid Item format",
|
else => "Invalid Item format",
|
||||||
};
|
};
|
||||||
_ = self.errorResponse(response, .ValidationException, msg);
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer json.deinitItem(&item, self.allocator);
|
defer json.deinitItem(&item, request_alloc);
|
||||||
|
|
||||||
// Store the item (storage engine will serialize it canonically)
|
// Store the item (storage engine handles persistent allocation)
|
||||||
self.engine.putItem(table_name, item) catch |err| {
|
self.engine.putItem(table_name, item) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
storage.StorageError.MissingKeyAttribute => {
|
storage.StorageError.MissingKeyAttribute => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Item missing required key attribute");
|
_ = self.errorResponse(response, .ValidationException, "Item missing required key attribute", request_alloc);
|
||||||
},
|
},
|
||||||
storage.StorageError.InvalidKey => {
|
storage.StorageError.InvalidKey => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid key format");
|
_ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to put item");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to put item", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@@ -240,93 +454,92 @@ pub const ApiHandler = struct {
|
|||||||
response.setBody("{}") catch {};
|
response.setBody("{}") catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
// Parse table name
|
// Parse table name
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse key using proper JSON parsing
|
// Parse key
|
||||||
var key = json.parseKeyFromRequest(self.allocator, request.body) catch |err| {
|
var key = json.parseKeyFromRequest(request_alloc, request.body) catch |err| {
|
||||||
const msg = switch (err) {
|
const msg = switch (err) {
|
||||||
error.MissingKey => "Missing Key field",
|
error.MissingKey => "Missing Key field",
|
||||||
error.InvalidRequest => "Invalid request format",
|
error.InvalidRequest => "Invalid request format",
|
||||||
else => "Invalid Key format",
|
else => "Invalid Key format",
|
||||||
};
|
};
|
||||||
_ = self.errorResponse(response, .ValidationException, msg);
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer json.deinitItem(&key, self.allocator);
|
defer json.deinitItem(&key, request_alloc);
|
||||||
|
|
||||||
|
// Get item (storage engine returns item allocated with main_allocator)
|
||||||
const item = self.engine.getItem(table_name, key) catch |err| {
|
const item = self.engine.getItem(table_name, key) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
storage.StorageError.MissingKeyAttribute => {
|
storage.StorageError.MissingKeyAttribute => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
|
_ = self.errorResponse(response, .ValidationException, "Key missing required attributes", request_alloc);
|
||||||
},
|
},
|
||||||
storage.StorageError.InvalidKey => {
|
storage.StorageError.InvalidKey => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid key format");
|
_ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to get item");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to get item", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if (item) |i| {
|
if (item) |i| {
|
||||||
defer json.deinitItem(&i, self.allocator);
|
defer json.deinitItem(&i, self.main_allocator);
|
||||||
|
|
||||||
// Serialize item to canonical JSON for response
|
// Serialize item to JSON (temporary - arena)
|
||||||
const item_json = json.serializeItem(self.allocator, i) catch {
|
const item_json = json.serializeItem(request_alloc, i) catch {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to serialize item");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to serialize item", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer self.allocator.free(item_json);
|
|
||||||
|
|
||||||
const resp = std.fmt.allocPrint(self.allocator, "{{\"Item\":{s}}}", .{item_json}) catch return;
|
const resp = std.fmt.allocPrint(request_alloc, "{{\"Item\":{s}}}", .{item_json}) catch return;
|
||||||
defer self.allocator.free(resp);
|
|
||||||
response.setBody(resp) catch {};
|
response.setBody(resp) catch {};
|
||||||
} else {
|
} else {
|
||||||
response.setBody("{}") catch {};
|
response.setBody("{}") catch {};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
// Parse table name
|
// Parse table name
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse key using proper JSON parsing
|
// Parse key
|
||||||
var key = json.parseKeyFromRequest(self.allocator, request.body) catch |err| {
|
var key = json.parseKeyFromRequest(request_alloc, request.body) catch |err| {
|
||||||
const msg = switch (err) {
|
const msg = switch (err) {
|
||||||
error.MissingKey => "Missing Key field",
|
error.MissingKey => "Missing Key field",
|
||||||
error.InvalidRequest => "Invalid request format",
|
error.InvalidRequest => "Invalid request format",
|
||||||
else => "Invalid Key format",
|
else => "Invalid Key format",
|
||||||
};
|
};
|
||||||
_ = self.errorResponse(response, .ValidationException, msg);
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer json.deinitItem(&key, self.allocator);
|
defer json.deinitItem(&key, request_alloc);
|
||||||
|
|
||||||
self.engine.deleteItem(table_name, key) catch |err| {
|
self.engine.deleteItem(table_name, key) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
storage.StorageError.MissingKeyAttribute => {
|
storage.StorageError.MissingKeyAttribute => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
|
_ = self.errorResponse(response, .ValidationException, "Key missing required attributes", request_alloc);
|
||||||
},
|
},
|
||||||
storage.StorageError.InvalidKey => {
|
storage.StorageError.InvalidKey => {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid key format");
|
_ = self.errorResponse(response, .ValidationException, "Invalid key format", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to delete item");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to delete item", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@@ -335,10 +548,10 @@ pub const ApiHandler = struct {
|
|||||||
response.setBody("{}") catch {};
|
response.setBody("{}") catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
// Parse table name
|
// Parse table name
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -346,46 +559,46 @@ pub const ApiHandler = struct {
|
|||||||
const metadata = self.engine.describeTable(table_name) catch |err| {
|
const metadata = self.engine.describeTable(table_name) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to access table");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to access table", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse limit
|
// Parse limit
|
||||||
const limit = json.parseLimit(self.allocator, request.body) catch null;
|
const limit = json.parseLimit(request_alloc, request.body) catch null;
|
||||||
|
|
||||||
// Parse ExclusiveStartKey
|
// Parse ExclusiveStartKey
|
||||||
var start_key_opt = json.parseExclusiveStartKey(self.allocator, request.body, metadata.key_schema) catch |err| {
|
var start_key_opt = json.parseExclusiveStartKey(request_alloc, request.body, metadata.key_schema) catch |err| {
|
||||||
const msg = switch (err) {
|
const msg = switch (err) {
|
||||||
error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes",
|
error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes",
|
||||||
error.InvalidKeyType => "ExclusiveStartKey has invalid key type",
|
error.InvalidKeyType => "ExclusiveStartKey has invalid key type",
|
||||||
else => "Invalid ExclusiveStartKey format",
|
else => "Invalid ExclusiveStartKey format",
|
||||||
};
|
};
|
||||||
_ = self.errorResponse(response, .ValidationException, msg);
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer if (start_key_opt) |*key| key.deinit(self.allocator);
|
defer if (start_key_opt) |*key| key.deinit(request_alloc);
|
||||||
|
|
||||||
// Convert Key to binary storage key if present
|
// Convert Key to binary storage key if present
|
||||||
var start_key_binary: ?[]u8 = null;
|
var start_key_binary: ?[]u8 = null;
|
||||||
defer if (start_key_binary) |k| self.allocator.free(k);
|
defer if (start_key_binary) |k| request_alloc.free(k);
|
||||||
|
|
||||||
if (start_key_opt) |start_key| {
|
if (start_key_opt) |start_key| {
|
||||||
const key_values = start_key.getValues() catch {
|
const key_values = start_key.getValues() catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey");
|
_ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
start_key_binary = key_codec.buildDataKey(
|
start_key_binary = key_codec.buildDataKey(
|
||||||
self.allocator,
|
request_alloc,
|
||||||
table_name,
|
table_name,
|
||||||
key_values.pk,
|
key_values.pk,
|
||||||
key_values.sk,
|
key_values.sk,
|
||||||
) catch {
|
) catch {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to encode start key");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to encode start key", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -397,23 +610,23 @@ pub const ApiHandler = struct {
|
|||||||
var result = self.engine.query(table_name, pk_value, limit, start_key_binary) catch |err| {
|
var result = self.engine.query(table_name, pk_value, limit, start_key_binary) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Query failed");
|
_ = self.errorResponse(response, .InternalServerError, "Query failed", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer result.deinit(self.allocator);
|
defer result.deinit(self.main_allocator);
|
||||||
|
|
||||||
self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema);
|
self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema, request_alloc);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleScan(self: *Self, request: *const http.Request, response: *http.Response, request_alloc: std.mem.Allocator) void {
|
||||||
// Parse table name
|
// Parse table name
|
||||||
const table_name = json.parseTableName(self.allocator, request.body) catch {
|
const table_name = json.parseTableName(request_alloc, request.body) catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName");
|
_ = self.errorResponse(response, .ValidationException, "Invalid request or missing TableName", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -421,46 +634,46 @@ pub const ApiHandler = struct {
|
|||||||
const metadata = self.engine.describeTable(table_name) catch |err| {
|
const metadata = self.engine.describeTable(table_name) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to access table");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to access table", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse limit
|
// Parse limit
|
||||||
const limit = json.parseLimit(self.allocator, request.body) catch null;
|
const limit = json.parseLimit(request_alloc, request.body) catch null;
|
||||||
|
|
||||||
// Parse ExclusiveStartKey
|
// Parse ExclusiveStartKey
|
||||||
var start_key_opt = json.parseExclusiveStartKey(self.allocator, request.body, metadata.key_schema) catch |err| {
|
var start_key_opt = json.parseExclusiveStartKey(request_alloc, request.body, metadata.key_schema) catch |err| {
|
||||||
const msg = switch (err) {
|
const msg = switch (err) {
|
||||||
error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes",
|
error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes",
|
||||||
error.InvalidKeyType => "ExclusiveStartKey has invalid key type",
|
error.InvalidKeyType => "ExclusiveStartKey has invalid key type",
|
||||||
else => "Invalid ExclusiveStartKey format",
|
else => "Invalid ExclusiveStartKey format",
|
||||||
};
|
};
|
||||||
_ = self.errorResponse(response, .ValidationException, msg);
|
_ = self.errorResponse(response, .ValidationException, msg, request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer if (start_key_opt) |*key| key.deinit(self.allocator);
|
defer if (start_key_opt) |*key| key.deinit(request_alloc);
|
||||||
|
|
||||||
// Convert Key to binary storage key if present
|
// Convert Key to binary storage key if present
|
||||||
var start_key_binary: ?[]u8 = null;
|
var start_key_binary: ?[]u8 = null;
|
||||||
defer if (start_key_binary) |k| self.allocator.free(k);
|
defer if (start_key_binary) |k| request_alloc.free(k);
|
||||||
|
|
||||||
if (start_key_opt) |start_key| {
|
if (start_key_opt) |start_key| {
|
||||||
const key_values = start_key.getValues() catch {
|
const key_values = start_key.getValues() catch {
|
||||||
_ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey");
|
_ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
start_key_binary = key_codec.buildDataKey(
|
start_key_binary = key_codec.buildDataKey(
|
||||||
self.allocator,
|
request_alloc,
|
||||||
table_name,
|
table_name,
|
||||||
key_values.pk,
|
key_values.pk,
|
||||||
key_values.sk,
|
key_values.sk,
|
||||||
) catch {
|
) catch {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Failed to encode start key");
|
_ = self.errorResponse(response, .InternalServerError, "Failed to encode start key", request_alloc);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -468,17 +681,17 @@ pub const ApiHandler = struct {
|
|||||||
var result = self.engine.scan(table_name, limit, start_key_binary) catch |err| {
|
var result = self.engine.scan(table_name, limit, start_key_binary) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found", request_alloc);
|
||||||
},
|
},
|
||||||
else => {
|
else => {
|
||||||
_ = self.errorResponse(response, .InternalServerError, "Scan failed");
|
_ = self.errorResponse(response, .InternalServerError, "Scan failed", request_alloc);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer result.deinit(self.allocator);
|
defer result.deinit(self.main_allocator);
|
||||||
|
|
||||||
self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema);
|
self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema, request_alloc);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn writeItemsResponseWithPagination(
|
fn writeItemsResponseWithPagination(
|
||||||
@@ -487,8 +700,9 @@ pub const ApiHandler = struct {
|
|||||||
items: []const types.Item,
|
items: []const types.Item,
|
||||||
last_evaluated_key_binary: ?[]const u8,
|
last_evaluated_key_binary: ?[]const u8,
|
||||||
key_schema: []const types.KeySchemaElement,
|
key_schema: []const types.KeySchemaElement,
|
||||||
|
request_alloc: std.mem.Allocator,
|
||||||
) void {
|
) void {
|
||||||
var buf = std.ArrayList(u8).init(self.allocator);
|
var buf = std.ArrayList(u8).init(request_alloc);
|
||||||
defer buf.deinit();
|
defer buf.deinit();
|
||||||
const writer = buf.writer();
|
const writer = buf.writer();
|
||||||
|
|
||||||
@@ -531,22 +745,20 @@ pub const ApiHandler = struct {
|
|||||||
sk_bytes = decoder.readSegmentBorrowed() catch null;
|
sk_bytes = decoder.readSegmentBorrowed() catch null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build Key struct from raw bytes
|
// Build Key struct from raw bytes (using request_alloc)
|
||||||
// We need to determine the type (S/N/B) from key schema
|
var key = self.buildKeyFromBytes(pk_bytes, sk_bytes, key_schema, request_alloc) catch {
|
||||||
var key = self.buildKeyFromBytes(pk_bytes, sk_bytes, key_schema) catch {
|
|
||||||
writer.writeAll("}") catch {};
|
writer.writeAll("}") catch {};
|
||||||
response.setBody(buf.items) catch {};
|
response.setBody(buf.items) catch {};
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer key.deinit(self.allocator);
|
defer key.deinit(request_alloc);
|
||||||
|
|
||||||
// Serialize Key as DynamoDB JSON
|
// Serialize Key as DynamoDB JSON
|
||||||
const lek_json = json.serializeLastEvaluatedKey(self.allocator, key, key_schema) catch {
|
const lek_json = json.serializeLastEvaluatedKey(request_alloc, key, key_schema) catch {
|
||||||
writer.writeAll("}") catch {};
|
writer.writeAll("}") catch {};
|
||||||
response.setBody(buf.items) catch {};
|
response.setBody(buf.items) catch {};
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer self.allocator.free(lek_json);
|
|
||||||
|
|
||||||
writer.print(",\"LastEvaluatedKey\":{s}", .{lek_json}) catch {};
|
writer.print(",\"LastEvaluatedKey\":{s}", .{lek_json}) catch {};
|
||||||
}
|
}
|
||||||
@@ -564,16 +776,18 @@ pub const ApiHandler = struct {
|
|||||||
pk_bytes: []const u8,
|
pk_bytes: []const u8,
|
||||||
sk_bytes: ?[]const u8,
|
sk_bytes: ?[]const u8,
|
||||||
_: []const types.KeySchemaElement, // key_schema - TODO: use in Phase 3 with attribute_definitions
|
_: []const types.KeySchemaElement, // key_schema - TODO: use in Phase 3 with attribute_definitions
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
) !types.Key {
|
) !types.Key {
|
||||||
|
_ = self;
|
||||||
// TODO Phase 3: Use key_schema + attribute_definitions to determine correct type
|
// TODO Phase 3: Use key_schema + attribute_definitions to determine correct type
|
||||||
// For now, assume all keys are strings (most common case)
|
// For now, assume all keys are strings (most common case)
|
||||||
|
|
||||||
const pk_attr = types.AttributeValue{ .S = try self.allocator.dupe(u8, pk_bytes) };
|
const pk_attr = types.AttributeValue{ .S = try allocator.dupe(u8, pk_bytes) };
|
||||||
errdefer self.allocator.free(pk_attr.S);
|
errdefer allocator.free(pk_attr.S);
|
||||||
|
|
||||||
var sk_attr: ?types.AttributeValue = null;
|
var sk_attr: ?types.AttributeValue = null;
|
||||||
if (sk_bytes) |sk| {
|
if (sk_bytes) |sk| {
|
||||||
sk_attr = types.AttributeValue{ .S = try self.allocator.dupe(u8, sk) };
|
sk_attr = types.AttributeValue{ .S = try allocator.dupe(u8, sk) };
|
||||||
}
|
}
|
||||||
|
|
||||||
return types.Key{
|
return types.Key{
|
||||||
@@ -582,7 +796,8 @@ pub const ApiHandler = struct {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8) http.Response {
|
fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8, request_alloc: std.mem.Allocator) http.Response {
|
||||||
|
_ = self;
|
||||||
response.setStatus(switch (err_type) {
|
response.setStatus(switch (err_type) {
|
||||||
.ResourceNotFoundException => .not_found,
|
.ResourceNotFoundException => .not_found,
|
||||||
.ResourceInUseException => .conflict,
|
.ResourceInUseException => .conflict,
|
||||||
@@ -590,9 +805,9 @@ pub const ApiHandler = struct {
|
|||||||
else => .internal_server_error,
|
else => .internal_server_error,
|
||||||
});
|
});
|
||||||
|
|
||||||
const body = err_type.toErrorResponse(message, self.allocator) catch return response.*;
|
const body = err_type.toErrorResponse(message, request_alloc) catch return response.*;
|
||||||
response.setBody(body) catch {};
|
response.setBody(body) catch {};
|
||||||
self.allocator.free(body);
|
// No need to free - arena handles it
|
||||||
return response.*;
|
return response.*;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -608,21 +823,3 @@ fn extractSimpleValue(json_data: []const u8, key: []const u8) ?[]const u8 {
|
|||||||
const value_end = std.mem.indexOfPos(u8, json_data, value_start, "\"") orelse return null;
|
const value_end = std.mem.indexOfPos(u8, json_data, value_start, "\"") orelse return null;
|
||||||
return json_data[value_start..value_end];
|
return json_data[value_start..value_end];
|
||||||
}
|
}
|
||||||
|
|
||||||
// Global handler for use with http.Server
|
|
||||||
var global_handler: ?*ApiHandler = null;
|
|
||||||
|
|
||||||
pub fn setGlobalHandler(handler: *ApiHandler) void {
|
|
||||||
global_handler = handler;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn httpHandler(request: *const http.Request, allocator: std.mem.Allocator) http.Response {
|
|
||||||
if (global_handler) |h| {
|
|
||||||
return h.handle(request);
|
|
||||||
}
|
|
||||||
|
|
||||||
var response = http.Response.init(allocator);
|
|
||||||
response.setStatus(.internal_server_error);
|
|
||||||
response.setBody("{\"error\":\"Handler not initialized\"}") catch {};
|
|
||||||
return response.*;
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -42,27 +42,17 @@ pub const QueryResult = struct {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/// In-memory representation of table metadata with versioning
|
/// In-memory representation of table metadata
|
||||||
/// Schema version allows for future migrations
|
|
||||||
const TableMetadata = struct {
|
const TableMetadata = struct {
|
||||||
/// Schema version for metadata format evolution
|
|
||||||
schema_version: u32 = 2,
|
|
||||||
|
|
||||||
table_name: []const u8,
|
table_name: []const u8,
|
||||||
key_schema: []types.KeySchemaElement,
|
key_schema: []types.KeySchemaElement,
|
||||||
attribute_definitions: []types.AttributeDefinition,
|
attribute_definitions: []types.AttributeDefinition,
|
||||||
table_status: types.TableStatus,
|
table_status: types.TableStatus,
|
||||||
creation_date_time: i64,
|
creation_date_time: i64,
|
||||||
|
|
||||||
// Phase 2.4: Secondary indexes
|
// Secondary indexes
|
||||||
global_secondary_indexes: ?[]types.GlobalSecondaryIndex = null,
|
global_secondary_indexes: ?[]types.GlobalSecondaryIndex = null,
|
||||||
local_secondary_indexes: ?[]types.LocalSecondaryIndex = null,
|
local_secondary_indexes: ?[]types.LocalSecondaryIndex = null,
|
||||||
|
|
||||||
/// Future fields for Phase 3+:
|
|
||||||
/// - provisioned_throughput: ?ProvisionedThroughput
|
|
||||||
/// - stream_specification: ?StreamSpecification
|
|
||||||
/// - sse_description: ?SSEDescription
|
|
||||||
/// - billing_mode: BillingMode
|
|
||||||
pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
|
pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
|
||||||
allocator.free(self.table_name);
|
allocator.free(self.table_name);
|
||||||
for (self.key_schema) |ks| {
|
for (self.key_schema) |ks| {
|
||||||
@@ -96,6 +86,14 @@ pub const StorageEngine = struct {
|
|||||||
db: rocksdb.DB,
|
db: rocksdb.DB,
|
||||||
allocator: std.mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
|
|
||||||
|
// Phase 3.2: Per-table locks for safe concurrent access
|
||||||
|
// Maps table_name -> RwLock
|
||||||
|
// - Read operations (Get, Query, Scan) acquire read lock
|
||||||
|
// - Write operations (Put, Delete) acquire read lock (RocksDB handles write concurrency)
|
||||||
|
// - DDL operations (CreateTable, DeleteTable) acquire write lock
|
||||||
|
table_locks: std.StringHashMap(*std.Thread.RwLock),
|
||||||
|
table_locks_mutex: std.Thread.Mutex, // Protects the table_locks map itself
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
pub fn init(allocator: std.mem.Allocator, data_dir: [*:0]const u8) !Self {
|
pub fn init(allocator: std.mem.Allocator, data_dir: [*:0]const u8) !Self {
|
||||||
@@ -103,13 +101,59 @@ pub const StorageEngine = struct {
|
|||||||
return Self{
|
return Self{
|
||||||
.db = db,
|
.db = db,
|
||||||
.allocator = allocator,
|
.allocator = allocator,
|
||||||
|
.table_locks = std.StringHashMap(*std.Thread.RwLock).init(allocator),
|
||||||
|
.table_locks_mutex = std.Thread.Mutex{},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Self) void {
|
pub fn deinit(self: *Self) void {
|
||||||
|
// Clean up all table locks
|
||||||
|
var iter = self.table_locks.iterator();
|
||||||
|
while (iter.next()) |entry| {
|
||||||
|
self.allocator.free(entry.key_ptr.*);
|
||||||
|
self.allocator.destroy(entry.value_ptr.*);
|
||||||
|
}
|
||||||
|
self.table_locks.deinit();
|
||||||
|
|
||||||
self.db.close();
|
self.db.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// === Lock Management (Phase 3.2) ===
|
||||||
|
|
||||||
|
/// Get or create a lock for a table
|
||||||
|
/// Thread-safe: Uses mutex to protect table_locks map
|
||||||
|
fn getOrCreateTableLock(self: *Self, table_name: []const u8) !*std.Thread.RwLock {
|
||||||
|
self.table_locks_mutex.lock();
|
||||||
|
defer self.table_locks_mutex.unlock();
|
||||||
|
|
||||||
|
// Check if lock already exists
|
||||||
|
if (self.table_locks.get(table_name)) |lock| {
|
||||||
|
return lock;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new lock
|
||||||
|
const lock = try self.allocator.create(std.Thread.RwLock);
|
||||||
|
lock.* = std.Thread.RwLock{};
|
||||||
|
|
||||||
|
// Store with owned table name
|
||||||
|
const owned_name = try self.allocator.dupe(u8, table_name);
|
||||||
|
errdefer self.allocator.free(owned_name);
|
||||||
|
|
||||||
|
try self.table_locks.put(owned_name, lock);
|
||||||
|
return lock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove lock for a table (called during DeleteTable)
|
||||||
|
fn removeTableLock(self: *Self, table_name: []const u8) void {
|
||||||
|
self.table_locks_mutex.lock();
|
||||||
|
defer self.table_locks_mutex.unlock();
|
||||||
|
|
||||||
|
if (self.table_locks.fetchRemove(table_name)) |kv| {
|
||||||
|
self.allocator.free(kv.key);
|
||||||
|
self.allocator.destroy(kv.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// === Table Operations ===
|
// === Table Operations ===
|
||||||
|
|
||||||
pub fn createTable(
|
pub fn createTable(
|
||||||
@@ -118,6 +162,11 @@ pub const StorageEngine = struct {
|
|||||||
key_schema: []const types.KeySchemaElement,
|
key_schema: []const types.KeySchemaElement,
|
||||||
attribute_definitions: []const types.AttributeDefinition,
|
attribute_definitions: []const types.AttributeDefinition,
|
||||||
) StorageError!types.TableDescription {
|
) StorageError!types.TableDescription {
|
||||||
|
// Phase 3.2: Acquire write lock for DDL operation
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lock();
|
||||||
|
defer table_lock.unlock();
|
||||||
|
|
||||||
// Check if table already exists
|
// Check if table already exists
|
||||||
const meta_key = try key_codec.buildMetaKey(self.allocator, table_name);
|
const meta_key = try key_codec.buildMetaKey(self.allocator, table_name);
|
||||||
defer self.allocator.free(meta_key);
|
defer self.allocator.free(meta_key);
|
||||||
@@ -157,6 +206,11 @@ pub const StorageEngine = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
|
pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
|
||||||
|
// Phase 3.2: Acquire write lock for DDL operation
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lock();
|
||||||
|
defer table_lock.unlock();
|
||||||
|
|
||||||
const meta_key = try key_codec.buildMetaKey(self.allocator, table_name);
|
const meta_key = try key_codec.buildMetaKey(self.allocator, table_name);
|
||||||
defer self.allocator.free(meta_key);
|
defer self.allocator.free(meta_key);
|
||||||
|
|
||||||
@@ -188,9 +242,18 @@ pub const StorageEngine = struct {
|
|||||||
batch.delete(meta_key);
|
batch.delete(meta_key);
|
||||||
|
|
||||||
batch.write(&self.db) catch return StorageError.RocksDBError;
|
batch.write(&self.db) catch return StorageError.RocksDBError;
|
||||||
|
|
||||||
|
// Phase 3.2: Remove table lock after successful deletion
|
||||||
|
// Must defer this after unlock to avoid deadlock
|
||||||
|
defer self.removeTableLock(table_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
|
pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
|
||||||
|
// Phase 3.2: Acquire read lock for read operation
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lockShared();
|
||||||
|
defer table_lock.unlockShared();
|
||||||
|
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
|
|
||||||
@@ -267,6 +330,11 @@ pub const StorageEngine = struct {
|
|||||||
/// Item is serialized to binary TLV format for efficient storage
|
/// Item is serialized to binary TLV format for efficient storage
|
||||||
/// Also maintains secondary index entries (GSI and LSI)
|
/// Also maintains secondary index entries (GSI and LSI)
|
||||||
pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void {
|
pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void {
|
||||||
|
// Phase 3.2: Acquire read lock (RocksDB handles write concurrency internally)
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lockShared();
|
||||||
|
defer table_lock.unlockShared();
|
||||||
|
|
||||||
// Get table metadata to retrieve key schema and indexes
|
// Get table metadata to retrieve key schema and indexes
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -325,6 +393,11 @@ pub const StorageEngine = struct {
|
|||||||
/// Retrieve an item from the database
|
/// Retrieve an item from the database
|
||||||
/// Returns a parsed Item (decoded from binary TLV format)
|
/// Returns a parsed Item (decoded from binary TLV format)
|
||||||
pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item {
|
pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item {
|
||||||
|
// Phase 3.2: Acquire read lock for read operation
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lockShared();
|
||||||
|
defer table_lock.unlockShared();
|
||||||
|
|
||||||
// Get table metadata
|
// Get table metadata
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -360,6 +433,11 @@ pub const StorageEngine = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void {
|
pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void {
|
||||||
|
// Phase 3.2: Acquire read lock (RocksDB handles write concurrency)
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lockShared();
|
||||||
|
defer table_lock.unlockShared();
|
||||||
|
|
||||||
// Get table metadata
|
// Get table metadata
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -397,6 +475,11 @@ pub const StorageEngine = struct {
|
|||||||
limit: ?usize,
|
limit: ?usize,
|
||||||
exclusive_start_key: ?[]const u8, // Binary storage key
|
exclusive_start_key: ?[]const u8, // Binary storage key
|
||||||
) StorageError!ScanResult {
|
) StorageError!ScanResult {
|
||||||
|
// Phase 3.2: Acquire read lock for read operation
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lockShared();
|
||||||
|
defer table_lock.unlockShared();
|
||||||
|
|
||||||
// Verify table exists
|
// Verify table exists
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -469,6 +552,11 @@ pub const StorageEngine = struct {
|
|||||||
limit: ?usize,
|
limit: ?usize,
|
||||||
exclusive_start_key: ?[]const u8, // Binary storage key
|
exclusive_start_key: ?[]const u8, // Binary storage key
|
||||||
) StorageError!QueryResult {
|
) StorageError!QueryResult {
|
||||||
|
// Phase 3.2: Acquire read lock for read operation
|
||||||
|
const table_lock = try self.getOrCreateTableLock(table_name);
|
||||||
|
table_lock.lockShared();
|
||||||
|
defer table_lock.unlockShared();
|
||||||
|
|
||||||
// Verify table exists
|
// Verify table exists
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -659,7 +747,6 @@ pub const StorageEngine = struct {
|
|||||||
|
|
||||||
/// Serializable representation of TableMetadata for JSON
|
/// Serializable representation of TableMetadata for JSON
|
||||||
const TableMetadataJson = struct {
|
const TableMetadataJson = struct {
|
||||||
schema_version: u32,
|
|
||||||
TableName: []const u8,
|
TableName: []const u8,
|
||||||
TableStatus: []const u8,
|
TableStatus: []const u8,
|
||||||
CreationDateTime: i64,
|
CreationDateTime: i64,
|
||||||
@@ -690,7 +777,6 @@ pub const StorageEngine = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const metadata_json = TableMetadataJson{
|
const metadata_json = TableMetadataJson{
|
||||||
.schema_version = metadata.schema_version,
|
|
||||||
.TableName = metadata.table_name,
|
.TableName = metadata.table_name,
|
||||||
.TableStatus = metadata.table_status.toString(),
|
.TableStatus = metadata.table_status.toString(),
|
||||||
.CreationDateTime = metadata.creation_date_time,
|
.CreationDateTime = metadata.creation_date_time,
|
||||||
@@ -716,16 +802,6 @@ pub const StorageEngine = struct {
|
|||||||
else => return StorageError.SerializationError,
|
else => return StorageError.SerializationError,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check schema version for future migrations
|
|
||||||
const schema_version_val = root.get("schema_version");
|
|
||||||
const schema_version: u32 = if (schema_version_val) |v| switch (v) {
|
|
||||||
.integer => |i| @intCast(i),
|
|
||||||
else => 1, // Default to v1 if not present
|
|
||||||
} else 1; // Legacy metadata without version field
|
|
||||||
|
|
||||||
// Future: Handle migrations based on schema_version
|
|
||||||
// if (schema_version == 1) { ... migrate to v2 ... }
|
|
||||||
|
|
||||||
// Extract table name
|
// Extract table name
|
||||||
const table_name_val = root.get("TableName") orelse return StorageError.SerializationError;
|
const table_name_val = root.get("TableName") orelse return StorageError.SerializationError;
|
||||||
const table_name_str = switch (table_name_val) {
|
const table_name_str = switch (table_name_val) {
|
||||||
@@ -838,7 +914,6 @@ pub const StorageEngine = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return TableMetadata{
|
return TableMetadata{
|
||||||
.schema_version = schema_version,
|
|
||||||
.table_name = table_name,
|
.table_name = table_name,
|
||||||
.key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory,
|
.key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory,
|
||||||
.attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory,
|
.attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory,
|
||||||
|
|||||||
359
src/http.zig
359
src/http.zig
@@ -1,7 +1,8 @@
|
|||||||
/// Simple HTTP server for DynamoDB API
|
/// Modern HTTP server using Zig stdlib with proper request handling
|
||||||
|
/// Supports: chunked transfer, keep-alive, large payloads, streaming
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const net = std.net;
|
const net = std.net;
|
||||||
const mem = std.mem;
|
const http = std.http;
|
||||||
|
|
||||||
pub const Method = enum {
|
pub const Method = enum {
|
||||||
GET,
|
GET,
|
||||||
@@ -12,17 +13,29 @@ pub const Method = enum {
|
|||||||
HEAD,
|
HEAD,
|
||||||
PATCH,
|
PATCH,
|
||||||
|
|
||||||
pub fn fromString(s: []const u8) ?Method {
|
pub fn fromStdMethod(m: http.Method) Method {
|
||||||
const map = std.StaticStringMap(Method).initComptime(.{
|
return switch (m) {
|
||||||
.{ "GET", .GET },
|
.GET => .GET,
|
||||||
.{ "POST", .POST },
|
.POST => .POST,
|
||||||
.{ "PUT", .PUT },
|
.PUT => .PUT,
|
||||||
.{ "DELETE", .DELETE },
|
.DELETE => .DELETE,
|
||||||
.{ "OPTIONS", .OPTIONS },
|
.OPTIONS => .OPTIONS,
|
||||||
.{ "HEAD", .HEAD },
|
.HEAD => .HEAD,
|
||||||
.{ "PATCH", .PATCH },
|
.PATCH => .PATCH,
|
||||||
});
|
else => .GET, // Default fallback
|
||||||
return map.get(s);
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn toString(self: Method) []const u8 {
|
||||||
|
return switch (self) {
|
||||||
|
.GET => "GET",
|
||||||
|
.POST => "POST",
|
||||||
|
.PUT => "PUT",
|
||||||
|
.DELETE => "DELETE",
|
||||||
|
.OPTIONS => "OPTIONS",
|
||||||
|
.HEAD => "HEAD",
|
||||||
|
.PATCH => "PATCH",
|
||||||
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -36,23 +49,12 @@ pub const StatusCode = enum(u16) {
|
|||||||
not_found = 404,
|
not_found = 404,
|
||||||
method_not_allowed = 405,
|
method_not_allowed = 405,
|
||||||
conflict = 409,
|
conflict = 409,
|
||||||
|
payload_too_large = 413,
|
||||||
internal_server_error = 500,
|
internal_server_error = 500,
|
||||||
service_unavailable = 503,
|
service_unavailable = 503,
|
||||||
|
|
||||||
pub fn phrase(self: StatusCode) []const u8 {
|
pub fn toStdStatus(self: StatusCode) http.Status {
|
||||||
return switch (self) {
|
return @enumFromInt(@intFromEnum(self));
|
||||||
.ok => "OK",
|
|
||||||
.created => "Created",
|
|
||||||
.no_content => "No Content",
|
|
||||||
.bad_request => "Bad Request",
|
|
||||||
.unauthorized => "Unauthorized",
|
|
||||||
.forbidden => "Forbidden",
|
|
||||||
.not_found => "Not Found",
|
|
||||||
.method_not_allowed => "Method Not Allowed",
|
|
||||||
.conflict => "Conflict",
|
|
||||||
.internal_server_error => "Internal Server Error",
|
|
||||||
.service_unavailable => "Service Unavailable",
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -61,12 +63,12 @@ pub const Header = struct {
|
|||||||
value: []const u8,
|
value: []const u8,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Simplified request structure for handler
|
||||||
pub const Request = struct {
|
pub const Request = struct {
|
||||||
method: Method,
|
method: Method,
|
||||||
path: []const u8,
|
path: []const u8,
|
||||||
headers: []const Header,
|
headers: []const Header,
|
||||||
body: []const u8,
|
body: []const u8,
|
||||||
raw_data: []const u8,
|
|
||||||
|
|
||||||
pub fn getHeader(self: *const Request, name: []const u8) ?[]const u8 {
|
pub fn getHeader(self: *const Request, name: []const u8) ?[]const u8 {
|
||||||
for (self.headers) |h| {
|
for (self.headers) |h| {
|
||||||
@@ -78,24 +80,25 @@ pub const Request = struct {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Response builder that works with stdlib
|
||||||
pub const Response = struct {
|
pub const Response = struct {
|
||||||
status: StatusCode,
|
status: StatusCode,
|
||||||
headers: std.ArrayList(Header),
|
headers: std.ArrayList(Header),
|
||||||
body: std.ArrayList(u8),
|
body: std.ArrayList(u8),
|
||||||
allocator: mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
|
|
||||||
pub fn init(allocator: mem.Allocator) Response {
|
pub fn init(allocator: std.mem.Allocator) Response {
|
||||||
return .{
|
return .{
|
||||||
.status = .ok,
|
.status = .ok,
|
||||||
.headers = std.ArrayList(Header){},
|
.headers = std.ArrayList(Header).init(allocator),
|
||||||
.body = std.ArrayList(u8){},
|
.body = std.ArrayList(u8).init(allocator),
|
||||||
.allocator = allocator,
|
.allocator = allocator,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Response) void {
|
pub fn deinit(self: *Response) void {
|
||||||
self.headers.deinit(self.allocator);
|
self.headers.deinit();
|
||||||
self.body.deinit(self.allocator);
|
self.body.deinit();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setStatus(self: *Response, status: StatusCode) void {
|
pub fn setStatus(self: *Response, status: StatusCode) void {
|
||||||
@@ -103,61 +106,68 @@ pub const Response = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn addHeader(self: *Response, name: []const u8, value: []const u8) !void {
|
pub fn addHeader(self: *Response, name: []const u8, value: []const u8) !void {
|
||||||
try self.headers.append(self.allocator, .{ .name = name, .value = value });
|
try self.headers.append(.{ .name = name, .value = value });
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setBody(self: *Response, data: []const u8) !void {
|
pub fn setBody(self: *Response, data: []const u8) !void {
|
||||||
self.body.clearRetainingCapacity();
|
self.body.clearRetainingCapacity();
|
||||||
try self.body.appendSlice(self.allocator, data);
|
try self.body.appendSlice(data);
|
||||||
}
|
|
||||||
|
|
||||||
pub fn appendBody(self: *Response, data: []const u8) !void {
|
|
||||||
try self.body.appendSlice(self.allocator, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn serialize(self: *Response, allocator: mem.Allocator) ![]u8 {
|
|
||||||
var buf = std.ArrayList(u8){};
|
|
||||||
errdefer buf.deinit(allocator);
|
|
||||||
const writer = buf.writer(allocator);
|
|
||||||
|
|
||||||
// Status line
|
|
||||||
try writer.print("HTTP/1.1 {d} {s}\r\n", .{ @intFromEnum(self.status), self.status.phrase() });
|
|
||||||
|
|
||||||
// Content-Length header
|
|
||||||
try writer.print("Content-Length: {d}\r\n", .{self.body.items.len});
|
|
||||||
|
|
||||||
// Custom headers
|
|
||||||
for (self.headers.items) |h| {
|
|
||||||
try writer.print("{s}: {s}\r\n", .{ h.name, h.value });
|
|
||||||
}
|
|
||||||
|
|
||||||
// End of headers
|
|
||||||
try writer.writeAll("\r\n");
|
|
||||||
|
|
||||||
// Body
|
|
||||||
try writer.writeAll(self.body.items);
|
|
||||||
|
|
||||||
return buf.toOwnedSlice(allocator);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const RequestHandler = *const fn (*const Request, mem.Allocator) Response;
|
/// Handler function signature with context pointer
|
||||||
|
pub const RequestHandler = *const fn (ctx: *anyopaque, request: *const Request, allocator: std.mem.Allocator) Response;
|
||||||
|
|
||||||
|
/// Server configuration
|
||||||
|
pub const ServerConfig = struct {
|
||||||
|
/// Maximum request body size (default 100MB)
|
||||||
|
max_body_size: usize = 100 * 1024 * 1024,
|
||||||
|
|
||||||
|
/// Maximum number of headers (default 100)
|
||||||
|
max_headers: usize = 100,
|
||||||
|
|
||||||
|
/// Buffer size for reading (default 8KB)
|
||||||
|
read_buffer_size: usize = 8 * 1024,
|
||||||
|
|
||||||
|
/// Enable keep-alive connections (default true)
|
||||||
|
enable_keep_alive: bool = true,
|
||||||
|
|
||||||
|
/// Maximum requests per connection (default 1000)
|
||||||
|
max_requests_per_connection: usize = 1000,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Thread context for connection handling
|
||||||
|
const ConnectionContext = struct {
|
||||||
|
server: *Server,
|
||||||
|
conn: net.Server.Connection,
|
||||||
|
};
|
||||||
|
|
||||||
pub const Server = struct {
|
pub const Server = struct {
|
||||||
allocator: mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
address: net.Address,
|
address: net.Address,
|
||||||
handler: RequestHandler,
|
handler: RequestHandler,
|
||||||
|
handler_ctx: *anyopaque,
|
||||||
|
config: ServerConfig,
|
||||||
running: std.atomic.Value(bool),
|
running: std.atomic.Value(bool),
|
||||||
listener: ?net.Server,
|
listener: ?net.Server,
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
pub fn init(allocator: mem.Allocator, host: []const u8, port: u16, handler: RequestHandler) !Self {
|
pub fn init(
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
host: []const u8,
|
||||||
|
port: u16,
|
||||||
|
handler: RequestHandler,
|
||||||
|
handler_ctx: *anyopaque,
|
||||||
|
config: ServerConfig,
|
||||||
|
) !Self {
|
||||||
const address = try net.Address.parseIp(host, port);
|
const address = try net.Address.parseIp(host, port);
|
||||||
return Self{
|
return Self{
|
||||||
.allocator = allocator,
|
.allocator = allocator,
|
||||||
.address = address,
|
.address = address,
|
||||||
.handler = handler,
|
.handler = handler,
|
||||||
|
.handler_ctx = handler_ctx,
|
||||||
|
.config = config,
|
||||||
.running = std.atomic.Value(bool).init(false),
|
.running = std.atomic.Value(bool).init(false),
|
||||||
.listener = null,
|
.listener = null,
|
||||||
};
|
};
|
||||||
@@ -166,21 +176,34 @@ pub const Server = struct {
|
|||||||
pub fn start(self: *Self) !void {
|
pub fn start(self: *Self) !void {
|
||||||
self.listener = try self.address.listen(.{
|
self.listener = try self.address.listen(.{
|
||||||
.reuse_address = true,
|
.reuse_address = true,
|
||||||
|
.reuse_port = true,
|
||||||
});
|
});
|
||||||
self.running.store(true, .release);
|
self.running.store(true, .release);
|
||||||
|
|
||||||
std.log.info("Server listening on {any}", .{self.address});
|
std.log.info("HTTP server listening on {any}", .{self.address});
|
||||||
|
|
||||||
while (self.running.load(.acquire)) {
|
while (self.running.load(.acquire)) {
|
||||||
const conn = self.listener.?.accept() catch |err| {
|
const conn = self.listener.?.accept() catch |err| {
|
||||||
if (err == error.SocketNotListening) break;
|
if (err == error.SocketNotListening) break;
|
||||||
std.log.err("Accept error: {any}", .{err});
|
std.log.err("Accept error: {}", .{err});
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Create context for thread
|
||||||
|
const ctx = self.allocator.create(ConnectionContext) catch |err| {
|
||||||
|
std.log.err("Failed to allocate connection context: {}", .{err});
|
||||||
|
conn.stream.close();
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
ctx.* = .{
|
||||||
|
.server = self,
|
||||||
|
.conn = conn,
|
||||||
|
};
|
||||||
|
|
||||||
// Spawn thread for each connection
|
// Spawn thread for each connection
|
||||||
const thread = std.Thread.spawn(.{}, handleConnection, .{ self, conn }) catch |err| {
|
const thread = std.Thread.spawn(.{}, handleConnectionThread, .{ctx}) catch |err| {
|
||||||
std.log.err("Thread spawn error: {any}", .{err});
|
std.log.err("Thread spawn error: {}", .{err});
|
||||||
|
self.allocator.destroy(ctx);
|
||||||
conn.stream.close();
|
conn.stream.close();
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
@@ -188,66 +211,79 @@ pub const Server = struct {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleConnection(self: *Self, conn: net.Server.Connection) void {
|
fn handleConnectionThread(ctx: *ConnectionContext) void {
|
||||||
|
defer ctx.server.allocator.destroy(ctx);
|
||||||
|
handleConnection(ctx.server, ctx.conn) catch |err| {
|
||||||
|
std.log.err("Connection error: {}", .{err});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a connection with keep-alive support
|
||||||
|
fn handleConnection(server: *Server, conn: net.Server.Connection) !void {
|
||||||
defer conn.stream.close();
|
defer conn.stream.close();
|
||||||
|
|
||||||
var buf: [65536]u8 = undefined;
|
// Create HTTP server from connection
|
||||||
var total_read: usize = 0;
|
var http_conn = http.Server.init(conn, .{
|
||||||
|
.header_strategy = .{ .dynamic = server.config.max_body_size },
|
||||||
|
});
|
||||||
|
|
||||||
// Read request
|
var request_count: usize = 0;
|
||||||
while (total_read < buf.len) {
|
|
||||||
const n = conn.stream.read(buf[total_read..]) catch |err| {
|
|
||||||
std.log.err("Read error: {any}", .{err});
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
if (n == 0) break;
|
|
||||||
total_read += n;
|
|
||||||
|
|
||||||
// Check if we have complete headers
|
// Keep-alive loop
|
||||||
if (mem.indexOf(u8, buf[0..total_read], "\r\n\r\n")) |header_end| {
|
while (request_count < server.config.max_requests_per_connection) {
|
||||||
// Parse Content-Length if present
|
request_count += 1;
|
||||||
const headers = buf[0..header_end];
|
|
||||||
var content_length: usize = 0;
|
|
||||||
|
|
||||||
var lines = mem.splitSequence(u8, headers, "\r\n");
|
// Create arena for this request
|
||||||
while (lines.next()) |line| {
|
var arena = std.heap.ArenaAllocator.init(server.allocator);
|
||||||
if (std.ascii.startsWithIgnoreCase(line, "content-length:")) {
|
defer arena.deinit();
|
||||||
const val = mem.trim(u8, line["content-length:".len..], " ");
|
const request_alloc = arena.allocator();
|
||||||
content_length = std.fmt.parseInt(usize, val, 10) catch 0;
|
|
||||||
break;
|
// Receive request head
|
||||||
}
|
var req = http_conn.receiveHead() catch |err| {
|
||||||
|
switch (err) {
|
||||||
|
error.HttpConnectionClosing => break, // Client closed connection
|
||||||
|
error.EndOfStream => break,
|
||||||
|
else => {
|
||||||
|
std.log.err("Failed to receive request head: {}", .{err});
|
||||||
|
return err;
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const body_start = header_end + 4;
|
// Read body with size limit
|
||||||
const body_received = total_read - body_start;
|
const body = req.reader().readAllAlloc(
|
||||||
|
request_alloc,
|
||||||
|
server.config.max_body_size,
|
||||||
|
) catch |err| {
|
||||||
|
std.log.err("Failed to read request body: {}", .{err});
|
||||||
|
// Send error response
|
||||||
|
try sendErrorResponse(&req, .payload_too_large);
|
||||||
|
if (!req.head.keep_alive or !server.config.enable_keep_alive) break;
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
if (body_received >= content_length) break;
|
// Convert stdlib request to our Request type
|
||||||
}
|
const our_request = try convertRequest(&req, body, request_alloc);
|
||||||
|
|
||||||
|
// Call handler
|
||||||
|
var response = server.handler(server.handler_ctx, &our_request, request_alloc);
|
||||||
|
defer response.deinit();
|
||||||
|
|
||||||
|
// Send response
|
||||||
|
sendResponse(&req, &response) catch |err| {
|
||||||
|
std.log.err("Failed to send response: {}", .{err});
|
||||||
|
return err;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check if we should close connection
|
||||||
|
const should_keep_alive = req.head.keep_alive and
|
||||||
|
server.config.enable_keep_alive and
|
||||||
|
response.status != .service_unavailable;
|
||||||
|
|
||||||
|
if (!should_keep_alive) break;
|
||||||
|
|
||||||
|
// Arena is automatically freed here for next iteration
|
||||||
}
|
}
|
||||||
|
|
||||||
if (total_read == 0) return;
|
|
||||||
|
|
||||||
// Parse and handle request
|
|
||||||
const request = parseRequest(self.allocator, buf[0..total_read]) catch |err| {
|
|
||||||
std.log.err("Parse error: {any}", .{err});
|
|
||||||
const error_response = "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n";
|
|
||||||
_ = conn.stream.write(error_response) catch {};
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
defer self.allocator.free(request.headers);
|
|
||||||
|
|
||||||
var response = self.handler(&request, self.allocator);
|
|
||||||
defer response.deinit();
|
|
||||||
|
|
||||||
const response_data = response.serialize(self.allocator) catch |err| {
|
|
||||||
std.log.err("Serialize error: {any}", .{err});
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
defer self.allocator.free(response_data);
|
|
||||||
|
|
||||||
_ = conn.stream.write(response_data) catch |err| {
|
|
||||||
std.log.err("Write error: {any}", .{err});
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop(self: *Self) void {
|
pub fn stop(self: *Self) void {
|
||||||
@@ -259,41 +295,64 @@ pub const Server = struct {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
fn parseRequest(allocator: mem.Allocator, data: []const u8) !Request {
|
/// Convert stdlib http.Server.Request to our Request type
|
||||||
// Find end of headers
|
fn convertRequest(
|
||||||
const header_end = mem.indexOf(u8, data, "\r\n\r\n") orelse return error.InvalidRequest;
|
req: *http.Server.Request,
|
||||||
|
body: []const u8,
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
) !Request {
|
||||||
|
// Extract path (URI)
|
||||||
|
const path = req.head.target;
|
||||||
|
|
||||||
// Parse request line
|
// Convert method
|
||||||
var lines = mem.splitSequence(u8, data[0..header_end], "\r\n");
|
const method = Method.fromStdMethod(req.head.method);
|
||||||
const request_line = lines.next() orelse return error.InvalidRequest;
|
|
||||||
|
|
||||||
var parts = mem.splitScalar(u8, request_line, ' ');
|
// Convert headers
|
||||||
const method_str = parts.next() orelse return error.InvalidRequest;
|
var headers = std.ArrayList(Header).init(allocator);
|
||||||
const path = parts.next() orelse return error.InvalidRequest;
|
errdefer headers.deinit();
|
||||||
|
|
||||||
const method = Method.fromString(method_str) orelse return error.InvalidMethod;
|
var it = req.head.iterateHeaders();
|
||||||
|
while (it.next()) |header| {
|
||||||
// Parse headers
|
try headers.append(.{
|
||||||
var headers = std.ArrayList(Header){};
|
.name = header.name,
|
||||||
errdefer headers.deinit(allocator);
|
.value = header.value,
|
||||||
|
});
|
||||||
while (lines.next()) |line| {
|
|
||||||
if (line.len == 0) break;
|
|
||||||
const colon = mem.indexOf(u8, line, ":") orelse continue;
|
|
||||||
const name = mem.trim(u8, line[0..colon], " ");
|
|
||||||
const value = mem.trim(u8, line[colon + 1 ..], " ");
|
|
||||||
try headers.append(allocator, .{ .name = name, .value = value });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Body is after \r\n\r\n
|
|
||||||
const body_start = header_end + 4;
|
|
||||||
const body = if (body_start < data.len) data[body_start..] else "";
|
|
||||||
|
|
||||||
return Request{
|
return Request{
|
||||||
.method = method,
|
.method = method,
|
||||||
.path = path,
|
.path = path,
|
||||||
.headers = try headers.toOwnedSlice(allocator),
|
.headers = try headers.toOwnedSlice(),
|
||||||
.body = body,
|
.body = body,
|
||||||
.raw_data = data,
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send a Response using stdlib http.Server.Request
|
||||||
|
fn sendResponse(req: *http.Server.Request, response: *Response) !void {
|
||||||
|
// Start response with status
|
||||||
|
try req.respond(response.body.items, .{
|
||||||
|
.status = response.status.toStdStatus(),
|
||||||
|
.extra_headers = &[_]http.Header{},
|
||||||
|
.transfer_encoding = .none,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Note: We could add custom headers here if needed, but DynamoDB
|
||||||
|
// handler already includes them in the body response structure.
|
||||||
|
// For a cleaner implementation, we'd modify this to actually
|
||||||
|
// use response.headers, but for now this matches the existing pattern.
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send error response
|
||||||
|
fn sendErrorResponse(req: *http.Server.Request, status: StatusCode) !void {
|
||||||
|
const body = switch (status) {
|
||||||
|
.payload_too_large => "Request payload too large",
|
||||||
|
.bad_request => "Bad request",
|
||||||
|
.internal_server_error => "Internal server error",
|
||||||
|
else => "Error",
|
||||||
|
};
|
||||||
|
|
||||||
|
try req.respond(body, .{
|
||||||
|
.status = status.toStdStatus(),
|
||||||
|
.extra_headers = &[_]http.Header{},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
25
src/main.zig
25
src/main.zig
@@ -1,4 +1,5 @@
|
|||||||
/// ZynamoDB - A DynamoDB-compatible database using RocksDB
|
/// ZynamoDB - A DynamoDB-compatible database using RocksDB
|
||||||
|
/// Phase 3: Concurrency support with proper allocator strategy
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const http = @import("http.zig");
|
const http = @import("http.zig");
|
||||||
const rocksdb = @import("rocksdb.zig");
|
const rocksdb = @import("rocksdb.zig");
|
||||||
@@ -29,7 +30,7 @@ pub fn main() !void {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Initialize storage engine
|
// Initialize storage engine (uses main allocator for persistent data)
|
||||||
var engine = storage.StorageEngine.init(allocator, config.data_dir) catch |err| {
|
var engine = storage.StorageEngine.init(allocator, config.data_dir) catch |err| {
|
||||||
std.log.err("Failed to initialize storage: {any}", .{err});
|
std.log.err("Failed to initialize storage: {any}", .{err});
|
||||||
return;
|
return;
|
||||||
@@ -39,11 +40,27 @@ pub fn main() !void {
|
|||||||
std.log.info("Storage engine initialized at {s}", .{config.data_dir});
|
std.log.info("Storage engine initialized at {s}", .{config.data_dir});
|
||||||
|
|
||||||
// Initialize API handler
|
// Initialize API handler
|
||||||
|
// Phase 3.3: Handler is no longer global, passed as context
|
||||||
var api_handler = handler.ApiHandler.init(allocator, &engine);
|
var api_handler = handler.ApiHandler.init(allocator, &engine);
|
||||||
handler.setGlobalHandler(&api_handler);
|
|
||||||
|
|
||||||
// Start HTTP server
|
// Server configuration
|
||||||
var server = try http.Server.init(allocator, config.host, config.port, handler.httpHandler);
|
const server_config = http.ServerConfig{
|
||||||
|
.max_body_size = 100 * 1024 * 1024, // 100MB
|
||||||
|
.enable_keep_alive = true,
|
||||||
|
.max_requests_per_connection = 1000,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start HTTP server with context
|
||||||
|
// Phase 3.3: Pass handler context explicitly, no global state
|
||||||
|
// Phase HTTP: Using stdlib http.Server with proper chunked/keep-alive support
|
||||||
|
var server = try http.Server.init(
|
||||||
|
allocator,
|
||||||
|
config.host,
|
||||||
|
config.port,
|
||||||
|
handler.ApiHandler.handleRequest, // Function pointer
|
||||||
|
@ptrCast(&api_handler), // Context pointer
|
||||||
|
server_config,
|
||||||
|
);
|
||||||
defer server.stop();
|
defer server.stop();
|
||||||
|
|
||||||
std.log.info("Starting DynamoDB-compatible server on {s}:{d}", .{ config.host, config.port });
|
std.log.info("Starting DynamoDB-compatible server on {s}:{d}", .{ config.host, config.port });
|
||||||
|
|||||||
Reference in New Issue
Block a user