fix pagniation
This commit is contained in:
@@ -4,6 +4,7 @@ const http = @import("../http.zig");
|
|||||||
const storage = @import("storage.zig");
|
const storage = @import("storage.zig");
|
||||||
const types = @import("types.zig");
|
const types = @import("types.zig");
|
||||||
const json = @import("json.zig");
|
const json = @import("json.zig");
|
||||||
|
const key_codec = @import("../key_codec.zig");
|
||||||
|
|
||||||
pub const ApiHandler = struct {
|
pub const ApiHandler = struct {
|
||||||
engine: *storage.StorageEngine,
|
engine: *storage.StorageEngine,
|
||||||
@@ -341,11 +342,59 @@ pub const ApiHandler = struct {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Get table metadata to access key schema (for pagination)
|
||||||
|
const metadata = self.engine.describeTable(table_name) catch |err| {
|
||||||
|
switch (err) {
|
||||||
|
storage.StorageError.TableNotFound => {
|
||||||
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
||||||
|
},
|
||||||
|
else => {
|
||||||
|
_ = self.errorResponse(response, .InternalServerError, "Failed to access table");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Parse limit
|
||||||
|
const limit = json.parseLimit(self.allocator, request.body) catch null;
|
||||||
|
|
||||||
|
// Parse ExclusiveStartKey
|
||||||
|
var start_key_opt = json.parseExclusiveStartKey(self.allocator, request.body, metadata.key_schema) catch |err| {
|
||||||
|
const msg = switch (err) {
|
||||||
|
error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes",
|
||||||
|
error.InvalidKeyType => "ExclusiveStartKey has invalid key type",
|
||||||
|
else => "Invalid ExclusiveStartKey format",
|
||||||
|
};
|
||||||
|
_ = self.errorResponse(response, .ValidationException, msg);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
defer if (start_key_opt) |*key| key.deinit(self.allocator);
|
||||||
|
|
||||||
|
// Convert Key to binary storage key if present
|
||||||
|
var start_key_binary: ?[]u8 = null;
|
||||||
|
defer if (start_key_binary) |k| self.allocator.free(k);
|
||||||
|
|
||||||
|
if (start_key_opt) |start_key| {
|
||||||
|
const key_values = start_key.getValues() catch {
|
||||||
|
_ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
start_key_binary = key_codec.buildDataKey(
|
||||||
|
self.allocator,
|
||||||
|
table_name,
|
||||||
|
key_values.pk,
|
||||||
|
key_values.sk,
|
||||||
|
) catch {
|
||||||
|
_ = self.errorResponse(response, .InternalServerError, "Failed to encode start key");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// Simplified: extract partition key value from ExpressionAttributeValues
|
// Simplified: extract partition key value from ExpressionAttributeValues
|
||||||
// PHASE 6 TODO: Implement proper expression parsing
|
// PHASE 6 TODO: Implement proper expression parsing
|
||||||
const pk_value = extractSimpleValue(request.body, ":pk") orelse "default";
|
const pk_value = extractSimpleValue(request.body, ":pk") orelse "default";
|
||||||
|
|
||||||
const items = self.engine.query(table_name, pk_value, null) catch |err| {
|
var result = self.engine.query(table_name, pk_value, limit, start_key_binary) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
||||||
@@ -356,12 +405,9 @@ pub const ApiHandler = struct {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer {
|
defer result.deinit(self.allocator);
|
||||||
for (items) |item| json.deinitItem(&item, self.allocator);
|
|
||||||
self.allocator.free(items);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.writeItemsResponse(response, items);
|
self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void {
|
fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void {
|
||||||
@@ -371,7 +417,55 @@ pub const ApiHandler = struct {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
const items = self.engine.scan(table_name, null) catch |err| {
|
// Get table metadata to access key schema (for pagination)
|
||||||
|
const metadata = self.engine.describeTable(table_name) catch |err| {
|
||||||
|
switch (err) {
|
||||||
|
storage.StorageError.TableNotFound => {
|
||||||
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
||||||
|
},
|
||||||
|
else => {
|
||||||
|
_ = self.errorResponse(response, .InternalServerError, "Failed to access table");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Parse limit
|
||||||
|
const limit = json.parseLimit(self.allocator, request.body) catch null;
|
||||||
|
|
||||||
|
// Parse ExclusiveStartKey
|
||||||
|
var start_key_opt = json.parseExclusiveStartKey(self.allocator, request.body, metadata.key_schema) catch |err| {
|
||||||
|
const msg = switch (err) {
|
||||||
|
error.MissingKeyAttribute => "ExclusiveStartKey missing required attributes",
|
||||||
|
error.InvalidKeyType => "ExclusiveStartKey has invalid key type",
|
||||||
|
else => "Invalid ExclusiveStartKey format",
|
||||||
|
};
|
||||||
|
_ = self.errorResponse(response, .ValidationException, msg);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
defer if (start_key_opt) |*key| key.deinit(self.allocator);
|
||||||
|
|
||||||
|
// Convert Key to binary storage key if present
|
||||||
|
var start_key_binary: ?[]u8 = null;
|
||||||
|
defer if (start_key_binary) |k| self.allocator.free(k);
|
||||||
|
|
||||||
|
if (start_key_opt) |start_key| {
|
||||||
|
const key_values = start_key.getValues() catch {
|
||||||
|
_ = self.errorResponse(response, .ValidationException, "Invalid ExclusiveStartKey");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
start_key_binary = key_codec.buildDataKey(
|
||||||
|
self.allocator,
|
||||||
|
table_name,
|
||||||
|
key_values.pk,
|
||||||
|
key_values.sk,
|
||||||
|
) catch {
|
||||||
|
_ = self.errorResponse(response, .InternalServerError, "Failed to encode start key");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
var result = self.engine.scan(table_name, limit, start_key_binary) catch |err| {
|
||||||
switch (err) {
|
switch (err) {
|
||||||
storage.StorageError.TableNotFound => {
|
storage.StorageError.TableNotFound => {
|
||||||
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
|
||||||
@@ -382,15 +476,18 @@ pub const ApiHandler = struct {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer {
|
defer result.deinit(self.allocator);
|
||||||
for (items) |item| json.deinitItem(&item, self.allocator);
|
|
||||||
self.allocator.free(items);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.writeItemsResponse(response, items);
|
self.writeItemsResponseWithPagination(response, result.items, result.last_evaluated_key, metadata.key_schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn writeItemsResponse(self: *Self, response: *http.Response, items: []const types.Item) void {
|
fn writeItemsResponseWithPagination(
|
||||||
|
self: *Self,
|
||||||
|
response: *http.Response,
|
||||||
|
items: []const types.Item,
|
||||||
|
last_evaluated_key_binary: ?[]const u8,
|
||||||
|
key_schema: []const types.KeySchemaElement,
|
||||||
|
) void {
|
||||||
var buf = std.ArrayList(u8).init(self.allocator);
|
var buf = std.ArrayList(u8).init(self.allocator);
|
||||||
defer buf.deinit();
|
defer buf.deinit();
|
||||||
const writer = buf.writer();
|
const writer = buf.writer();
|
||||||
@@ -398,14 +495,93 @@ pub const ApiHandler = struct {
|
|||||||
writer.writeAll("{\"Items\":[") catch return;
|
writer.writeAll("{\"Items\":[") catch return;
|
||||||
for (items, 0..) |item, i| {
|
for (items, 0..) |item, i| {
|
||||||
if (i > 0) writer.writeByte(',') catch return;
|
if (i > 0) writer.writeByte(',') catch return;
|
||||||
// Serialize each item to canonical JSON
|
|
||||||
json.serializeItemToWriter(writer, item) catch return;
|
json.serializeItemToWriter(writer, item) catch return;
|
||||||
}
|
}
|
||||||
writer.print("],\"Count\":{d},\"ScannedCount\":{d}}}", .{ items.len, items.len }) catch return;
|
writer.print("],\"Count\":{d},\"ScannedCount\":{d}", .{ items.len, items.len }) catch return;
|
||||||
|
|
||||||
|
// Add LastEvaluatedKey if pagination is needed
|
||||||
|
if (last_evaluated_key_binary) |binary_key| {
|
||||||
|
// Decode binary storage key back to Key struct
|
||||||
|
var decoder = key_codec.KeyDecoder.init(binary_key);
|
||||||
|
|
||||||
|
// Skip entity type
|
||||||
|
_ = decoder.readEntityType() catch {
|
||||||
|
writer.writeAll("}") catch {};
|
||||||
|
response.setBody(buf.items) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Skip table name segment
|
||||||
|
_ = decoder.readSegmentBorrowed() catch {
|
||||||
|
writer.writeAll("}") catch {};
|
||||||
|
response.setBody(buf.items) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read partition key
|
||||||
|
const pk_bytes = decoder.readSegmentBorrowed() catch {
|
||||||
|
writer.writeAll("}") catch {};
|
||||||
|
response.setBody(buf.items) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read sort key if present
|
||||||
|
var sk_bytes: ?[]const u8 = null;
|
||||||
|
if (decoder.hasMore()) {
|
||||||
|
sk_bytes = decoder.readSegmentBorrowed() catch null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build Key struct from raw bytes
|
||||||
|
// We need to determine the type (S/N/B) from key schema
|
||||||
|
var key = self.buildKeyFromBytes(pk_bytes, sk_bytes, key_schema) catch {
|
||||||
|
writer.writeAll("}") catch {};
|
||||||
|
response.setBody(buf.items) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
defer key.deinit(self.allocator);
|
||||||
|
|
||||||
|
// Serialize Key as DynamoDB JSON
|
||||||
|
const lek_json = json.serializeLastEvaluatedKey(self.allocator, key, key_schema) catch {
|
||||||
|
writer.writeAll("}") catch {};
|
||||||
|
response.setBody(buf.items) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
defer self.allocator.free(lek_json);
|
||||||
|
|
||||||
|
writer.print(",\"LastEvaluatedKey\":{s}", .{lek_json}) catch {};
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.writeAll("}") catch return;
|
||||||
response.setBody(buf.items) catch {};
|
response.setBody(buf.items) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Helper to build a Key struct from raw bytes and key schema
|
||||||
|
/// This reconstructs the AttributeValue with the correct type (S/N/B)
|
||||||
|
/// TODO Phase 3: Need attribute_definitions to properly determine types
|
||||||
|
/// For now, assumes all keys are strings (S) which covers 95% of use cases
|
||||||
|
fn buildKeyFromBytes(
|
||||||
|
self: *Self,
|
||||||
|
pk_bytes: []const u8,
|
||||||
|
sk_bytes: ?[]const u8,
|
||||||
|
_: []const types.KeySchemaElement, // key_schema - TODO: use in Phase 3 with attribute_definitions
|
||||||
|
) !types.Key {
|
||||||
|
// TODO Phase 3: Use key_schema + attribute_definitions to determine correct type
|
||||||
|
// For now, assume all keys are strings (most common case)
|
||||||
|
|
||||||
|
const pk_attr = types.AttributeValue{ .S = try self.allocator.dupe(u8, pk_bytes) };
|
||||||
|
errdefer self.allocator.free(pk_attr.S);
|
||||||
|
|
||||||
|
var sk_attr: ?types.AttributeValue = null;
|
||||||
|
if (sk_bytes) |sk| {
|
||||||
|
sk_attr = types.AttributeValue{ .S = try self.allocator.dupe(u8, sk) };
|
||||||
|
}
|
||||||
|
|
||||||
|
return types.Key{
|
||||||
|
.pk = pk_attr,
|
||||||
|
.sk = sk_attr,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8) http.Response {
|
fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8) http.Response {
|
||||||
response.setStatus(switch (err_type) {
|
response.setStatus(switch (err_type) {
|
||||||
.ResourceNotFoundException => .not_found,
|
.ResourceNotFoundException => .not_found,
|
||||||
@@ -448,5 +624,5 @@ pub fn httpHandler(request: *const http.Request, allocator: std.mem.Allocator) h
|
|||||||
var response = http.Response.init(allocator);
|
var response = http.Response.init(allocator);
|
||||||
response.setStatus(.internal_server_error);
|
response.setStatus(.internal_server_error);
|
||||||
response.setBody("{\"error\":\"Handler not initialized\"}") catch {};
|
response.setBody("{\"error\":\"Handler not initialized\"}") catch {};
|
||||||
return response;
|
return response.*;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -397,6 +397,70 @@ pub fn parseKeyFromRequest(allocator: std.mem.Allocator, request_body: []const u
|
|||||||
return try parseItemFromValue(allocator, key_val);
|
return try parseItemFromValue(allocator, key_val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Pagination Helpers (Phase 2.5)
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// Parse ExclusiveStartKey from request body
|
||||||
|
/// Returns null if not present, owned Key if present
|
||||||
|
/// Caller must call key.deinit() when done
|
||||||
|
pub fn parseExclusiveStartKey(
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
request_body: []const u8,
|
||||||
|
key_schema: []const types.KeySchemaElement,
|
||||||
|
) !?types.Key {
|
||||||
|
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, request_body, .{});
|
||||||
|
defer parsed.deinit();
|
||||||
|
|
||||||
|
const root = switch (parsed.value) {
|
||||||
|
.object => |o| o,
|
||||||
|
else => return error.InvalidRequest,
|
||||||
|
};
|
||||||
|
|
||||||
|
const key_val = root.get("ExclusiveStartKey") orelse return null;
|
||||||
|
|
||||||
|
// Parse as Item first, then convert to Key
|
||||||
|
var key_item = try parseItemFromValue(allocator, key_val);
|
||||||
|
defer deinitItem(&key_item, allocator);
|
||||||
|
|
||||||
|
// Validate and extract key using Key.fromItem
|
||||||
|
return try types.Key.fromItem(allocator, key_item, key_schema);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse Limit from request body
|
||||||
|
/// Returns null if not present
|
||||||
|
pub fn parseLimit(allocator: std.mem.Allocator, request_body: []const u8) !?usize {
|
||||||
|
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, request_body, .{});
|
||||||
|
defer parsed.deinit();
|
||||||
|
|
||||||
|
const root = switch (parsed.value) {
|
||||||
|
.object => |o| o,
|
||||||
|
else => return error.InvalidRequest,
|
||||||
|
};
|
||||||
|
|
||||||
|
const limit_val = root.get("Limit") orelse return null;
|
||||||
|
const limit_int = switch (limit_val) {
|
||||||
|
.integer => |i| i,
|
||||||
|
else => return error.InvalidLimit,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (limit_int < 0) return error.InvalidLimit;
|
||||||
|
return @intCast(limit_int);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serialize a Key as LastEvaluatedKey in DynamoDB JSON format
|
||||||
|
/// Caller owns returned slice and must free it
|
||||||
|
pub fn serializeLastEvaluatedKey(
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
key: types.Key,
|
||||||
|
key_schema: []const types.KeySchemaElement,
|
||||||
|
) ![]u8 {
|
||||||
|
var key_item = try key.toItem(allocator, key_schema);
|
||||||
|
defer deinitItem(&key_item, allocator);
|
||||||
|
|
||||||
|
return try serializeItem(allocator, key_item);
|
||||||
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Storage Helpers
|
// Storage Helpers
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
@@ -404,6 +468,7 @@ pub fn parseKeyFromRequest(allocator: std.mem.Allocator, request_body: []const u
|
|||||||
/// Extract just the key attributes from an item based on key schema
|
/// Extract just the key attributes from an item based on key schema
|
||||||
/// Returns a new Item containing only the key attributes (deep copied)
|
/// Returns a new Item containing only the key attributes (deep copied)
|
||||||
/// Caller owns returned Item and must call deinitItem() when done
|
/// Caller owns returned Item and must call deinitItem() when done
|
||||||
|
/// DEPRECATED: Use types.Key.fromItem() instead
|
||||||
pub fn extractKeyAttributes(
|
pub fn extractKeyAttributes(
|
||||||
allocator: std.mem.Allocator,
|
allocator: std.mem.Allocator,
|
||||||
item: types.Item,
|
item: types.Item,
|
||||||
@@ -430,7 +495,8 @@ pub fn extractKeyAttributes(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Deep copy an AttributeValue
|
/// Deep copy an AttributeValue
|
||||||
fn deepCopyAttributeValue(allocator: std.mem.Allocator, attr: types.AttributeValue) !types.AttributeValue {
|
/// Made public for use by the Key struct and other modules
|
||||||
|
pub fn deepCopyAttributeValue(allocator: std.mem.Allocator, attr: types.AttributeValue) !types.AttributeValue {
|
||||||
return switch (attr) {
|
return switch (attr) {
|
||||||
.S => |s| types.AttributeValue{ .S = try allocator.dupe(u8, s) },
|
.S => |s| types.AttributeValue{ .S = try allocator.dupe(u8, s) },
|
||||||
.N => |n| types.AttributeValue{ .N = try allocator.dupe(u8, n) },
|
.N => |n| types.AttributeValue{ .N = try allocator.dupe(u8, n) },
|
||||||
|
|||||||
@@ -18,6 +18,30 @@ pub const StorageError = error{
|
|||||||
OutOfMemory,
|
OutOfMemory,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Result type for Scan operations with pagination
|
||||||
|
pub const ScanResult = struct {
|
||||||
|
items: []types.Item,
|
||||||
|
last_evaluated_key: ?[]u8, // Binary-encoded storage key
|
||||||
|
|
||||||
|
pub fn deinit(self: *ScanResult, allocator: std.mem.Allocator) void {
|
||||||
|
for (self.items) |*item| json.deinitItem(item, allocator);
|
||||||
|
allocator.free(self.items);
|
||||||
|
if (self.last_evaluated_key) |key| allocator.free(key);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Result type for Query operations with pagination
|
||||||
|
pub const QueryResult = struct {
|
||||||
|
items: []types.Item,
|
||||||
|
last_evaluated_key: ?[]u8, // Binary-encoded storage key
|
||||||
|
|
||||||
|
pub fn deinit(self: *QueryResult, allocator: std.mem.Allocator) void {
|
||||||
|
for (self.items) |*item| json.deinitItem(item, allocator);
|
||||||
|
allocator.free(self.items);
|
||||||
|
if (self.last_evaluated_key) |key| allocator.free(key);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/// In-memory representation of table metadata with versioning
|
/// In-memory representation of table metadata with versioning
|
||||||
/// Schema version allows for future migrations
|
/// Schema version allows for future migrations
|
||||||
const TableMetadata = struct {
|
const TableMetadata = struct {
|
||||||
@@ -247,12 +271,18 @@ pub const StorageEngine = struct {
|
|||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
|
|
||||||
// Extract key values from item
|
// Extract key using the new Key struct
|
||||||
const key_values = try self.extractKeyValues(item, metadata.key_schema);
|
var key = types.Key.fromItem(self.allocator, item, metadata.key_schema) catch |err| {
|
||||||
defer {
|
return switch (err) {
|
||||||
self.allocator.free(key_values.pk);
|
error.MissingKeyAttribute => StorageError.MissingKeyAttribute,
|
||||||
if (key_values.sk) |sk| self.allocator.free(sk);
|
error.InvalidKeyType => StorageError.InvalidKey,
|
||||||
}
|
else => StorageError.RocksDBError,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
defer key.deinit(self.allocator);
|
||||||
|
|
||||||
|
// Get raw key values for building storage key
|
||||||
|
const key_values = try key.getValues();
|
||||||
|
|
||||||
// Build storage key using binary encoding
|
// Build storage key using binary encoding
|
||||||
const storage_key = try key_codec.buildDataKey(
|
const storage_key = try key_codec.buildDataKey(
|
||||||
@@ -299,12 +329,18 @@ pub const StorageEngine = struct {
|
|||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
|
|
||||||
// Extract key values
|
// Extract key using the new Key struct
|
||||||
const key_values = try self.extractKeyValues(key, metadata.key_schema);
|
var key_struct = types.Key.fromItem(self.allocator, key, metadata.key_schema) catch |err| {
|
||||||
defer {
|
return switch (err) {
|
||||||
self.allocator.free(key_values.pk);
|
error.MissingKeyAttribute => StorageError.MissingKeyAttribute,
|
||||||
if (key_values.sk) |sk| self.allocator.free(sk);
|
error.InvalidKeyType => StorageError.InvalidKey,
|
||||||
}
|
else => StorageError.RocksDBError,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
defer key_struct.deinit(self.allocator);
|
||||||
|
|
||||||
|
// Get raw key values for building storage key
|
||||||
|
const key_values = try key_struct.getValues();
|
||||||
|
|
||||||
// Build storage key
|
// Build storage key
|
||||||
const storage_key = try key_codec.buildDataKey(
|
const storage_key = try key_codec.buildDataKey(
|
||||||
@@ -328,12 +364,18 @@ pub const StorageEngine = struct {
|
|||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
|
|
||||||
// Extract key values
|
// Extract key using the new Key struct
|
||||||
const key_values = try self.extractKeyValues(key, metadata.key_schema);
|
var key_struct = types.Key.fromItem(self.allocator, key, metadata.key_schema) catch |err| {
|
||||||
defer {
|
return switch (err) {
|
||||||
self.allocator.free(key_values.pk);
|
error.MissingKeyAttribute => StorageError.MissingKeyAttribute,
|
||||||
if (key_values.sk) |sk| self.allocator.free(sk);
|
error.InvalidKeyType => StorageError.InvalidKey,
|
||||||
}
|
else => StorageError.RocksDBError,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
defer key_struct.deinit(self.allocator);
|
||||||
|
|
||||||
|
// Get raw key values for building storage key
|
||||||
|
const key_values = try key_struct.getValues();
|
||||||
|
|
||||||
// Build storage key
|
// Build storage key
|
||||||
const storage_key = try key_codec.buildDataKey(
|
const storage_key = try key_codec.buildDataKey(
|
||||||
@@ -347,8 +389,14 @@ pub const StorageEngine = struct {
|
|||||||
self.db.delete(storage_key) catch return StorageError.RocksDBError;
|
self.db.delete(storage_key) catch return StorageError.RocksDBError;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Scan a table and return parsed Items (decoded from binary)
|
/// Scan a table and return parsed Items (decoded from binary) with pagination
|
||||||
pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![]types.Item {
|
/// Phase 2.5: Added pagination support with ExclusiveStartKey and LastEvaluatedKey
|
||||||
|
pub fn scan(
|
||||||
|
self: *Self,
|
||||||
|
table_name: []const u8,
|
||||||
|
limit: ?usize,
|
||||||
|
exclusive_start_key: ?[]const u8, // Binary storage key
|
||||||
|
) StorageError!ScanResult {
|
||||||
// Verify table exists
|
// Verify table exists
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -368,7 +416,21 @@ pub const StorageEngine = struct {
|
|||||||
var count: usize = 0;
|
var count: usize = 0;
|
||||||
const max_items = limit orelse std.math.maxInt(usize);
|
const max_items = limit orelse std.math.maxInt(usize);
|
||||||
|
|
||||||
iter.seek(data_prefix);
|
// Position iterator based on exclusive_start_key
|
||||||
|
if (exclusive_start_key) |start_key| {
|
||||||
|
// Seek to the start key and move past it
|
||||||
|
iter.seek(start_key);
|
||||||
|
if (iter.valid()) {
|
||||||
|
iter.next(); // Skip the exact match
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Start from beginning of table
|
||||||
|
iter.seek(data_prefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
var last_key: ?[]u8 = null;
|
||||||
|
errdefer if (last_key) |k| self.allocator.free(k);
|
||||||
|
|
||||||
while (iter.valid() and count < max_items) {
|
while (iter.valid() and count < max_items) {
|
||||||
const key = iter.key() orelse break;
|
const key = iter.key() orelse break;
|
||||||
if (!std.mem.startsWith(u8, key, data_prefix)) break;
|
if (!std.mem.startsWith(u8, key, data_prefix)) break;
|
||||||
@@ -383,14 +445,30 @@ pub const StorageEngine = struct {
|
|||||||
items.append(item) catch return StorageError.OutOfMemory;
|
items.append(item) catch return StorageError.OutOfMemory;
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
||||||
|
// If we're at the limit, capture the current key as last_evaluated_key
|
||||||
|
if (count >= max_items) {
|
||||||
|
last_key = try self.allocator.dupe(u8, key);
|
||||||
|
}
|
||||||
|
|
||||||
iter.next();
|
iter.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
return items.toOwnedSlice() catch return StorageError.OutOfMemory;
|
return ScanResult{
|
||||||
|
.items = items.toOwnedSlice() catch return StorageError.OutOfMemory,
|
||||||
|
.last_evaluated_key = last_key,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Query items by partition key and return parsed Items
|
/// Query items by partition key with pagination support
|
||||||
pub fn query(self: *Self, table_name: []const u8, partition_key_value: []const u8, limit: ?usize) StorageError![]types.Item {
|
/// Phase 2.5: Added pagination support
|
||||||
|
pub fn query(
|
||||||
|
self: *Self,
|
||||||
|
table_name: []const u8,
|
||||||
|
partition_key_value: []const u8,
|
||||||
|
limit: ?usize,
|
||||||
|
exclusive_start_key: ?[]const u8, // Binary storage key
|
||||||
|
) StorageError!QueryResult {
|
||||||
// Verify table exists
|
// Verify table exists
|
||||||
var metadata = try self.getTableMetadata(table_name);
|
var metadata = try self.getTableMetadata(table_name);
|
||||||
defer metadata.deinit(self.allocator);
|
defer metadata.deinit(self.allocator);
|
||||||
@@ -411,7 +489,26 @@ pub const StorageEngine = struct {
|
|||||||
var count: usize = 0;
|
var count: usize = 0;
|
||||||
const max_items = limit orelse std.math.maxInt(usize);
|
const max_items = limit orelse std.math.maxInt(usize);
|
||||||
|
|
||||||
iter.seek(prefix);
|
// Position iterator based on exclusive_start_key
|
||||||
|
if (exclusive_start_key) |start_key| {
|
||||||
|
// Verify the start key is within the partition we're querying
|
||||||
|
if (std.mem.startsWith(u8, start_key, prefix)) {
|
||||||
|
iter.seek(start_key);
|
||||||
|
if (iter.valid()) {
|
||||||
|
iter.next(); // Skip the exact match
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Start key is not in this partition, start from beginning
|
||||||
|
iter.seek(prefix);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Start from beginning of partition
|
||||||
|
iter.seek(prefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
var last_key: ?[]u8 = null;
|
||||||
|
errdefer if (last_key) |k| self.allocator.free(k);
|
||||||
|
|
||||||
while (iter.valid() and count < max_items) {
|
while (iter.valid() and count < max_items) {
|
||||||
const key = iter.key() orelse break;
|
const key = iter.key() orelse break;
|
||||||
if (!std.mem.startsWith(u8, key, prefix)) break;
|
if (!std.mem.startsWith(u8, key, prefix)) break;
|
||||||
@@ -426,10 +523,19 @@ pub const StorageEngine = struct {
|
|||||||
items.append(item) catch return StorageError.OutOfMemory;
|
items.append(item) catch return StorageError.OutOfMemory;
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
||||||
|
// If we're at the limit, capture the current key as last_evaluated_key
|
||||||
|
if (count >= max_items) {
|
||||||
|
last_key = try self.allocator.dupe(u8, key);
|
||||||
|
}
|
||||||
|
|
||||||
iter.next();
|
iter.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
return items.toOwnedSlice() catch return StorageError.OutOfMemory;
|
return QueryResult{
|
||||||
|
.items = items.toOwnedSlice() catch return StorageError.OutOfMemory,
|
||||||
|
.last_evaluated_key = last_key,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// === Internal Helpers ===
|
// === Internal Helpers ===
|
||||||
@@ -445,44 +551,6 @@ pub const StorageEngine = struct {
|
|||||||
return self.deserializeTableMetadata(meta_value.?);
|
return self.deserializeTableMetadata(meta_value.?);
|
||||||
}
|
}
|
||||||
|
|
||||||
const KeyValues = struct {
|
|
||||||
pk: []u8,
|
|
||||||
sk: ?[]u8,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Extract partition key and sort key values from an item
|
|
||||||
/// Caller must free both pk and sk (if present)
|
|
||||||
fn extractKeyValues(self: *Self, item: types.Item, key_schema: []const types.KeySchemaElement) StorageError!KeyValues {
|
|
||||||
var pk: ?[]u8 = null;
|
|
||||||
var sk: ?[]u8 = null;
|
|
||||||
|
|
||||||
for (key_schema) |schema_element| {
|
|
||||||
const attr = item.get(schema_element.attribute_name) orelse
|
|
||||||
return StorageError.MissingKeyAttribute;
|
|
||||||
|
|
||||||
// Extract string value from attribute
|
|
||||||
// DynamoDB keys must be S (string), N (number), or B (binary)
|
|
||||||
const value = switch (attr) {
|
|
||||||
.S => |s| s,
|
|
||||||
.N => |n| n,
|
|
||||||
.B => |b| b,
|
|
||||||
else => return StorageError.InvalidKey,
|
|
||||||
};
|
|
||||||
|
|
||||||
const owned_value = try self.allocator.dupe(u8, value);
|
|
||||||
|
|
||||||
switch (schema_element.key_type) {
|
|
||||||
.HASH => pk = owned_value,
|
|
||||||
.RANGE => sk = owned_value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return KeyValues{
|
|
||||||
.pk = pk orelse return StorageError.MissingKeyAttribute,
|
|
||||||
.sk = sk,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Maintain a GSI entry for an item
|
/// Maintain a GSI entry for an item
|
||||||
fn maintainGSIEntry(
|
fn maintainGSIEntry(
|
||||||
self: *Self,
|
self: *Self,
|
||||||
@@ -493,25 +561,24 @@ pub const StorageEngine = struct {
|
|||||||
primary_pk: []const u8,
|
primary_pk: []const u8,
|
||||||
primary_sk: ?[]const u8,
|
primary_sk: ?[]const u8,
|
||||||
) StorageError!void {
|
) StorageError!void {
|
||||||
// Extract GSI key values from item
|
// Extract GSI key using Key.fromItem
|
||||||
const gsi_key_values = self.extractKeyValues(item, gsi.key_schema) catch {
|
var gsi_key = types.Key.fromItem(self.allocator, item, gsi.key_schema) catch {
|
||||||
// Item doesn't have GSI key attributes - skip this index entry
|
// Item doesn't have GSI key attributes - skip this index entry
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
defer {
|
defer gsi_key.deinit(self.allocator);
|
||||||
self.allocator.free(gsi_key_values.pk);
|
|
||||||
if (gsi_key_values.sk) |sk| self.allocator.free(sk);
|
const gsi_key_values = try gsi_key.getValues();
|
||||||
}
|
|
||||||
|
|
||||||
// Build GSI storage key
|
// Build GSI storage key
|
||||||
const gsi_key = try key_codec.buildGSIKey(
|
const gsi_storage_key = try key_codec.buildGSIKey(
|
||||||
self.allocator,
|
self.allocator,
|
||||||
table_name,
|
table_name,
|
||||||
gsi.index_name,
|
gsi.index_name,
|
||||||
gsi_key_values.pk,
|
gsi_key_values.pk,
|
||||||
gsi_key_values.sk,
|
gsi_key_values.sk,
|
||||||
);
|
);
|
||||||
defer self.allocator.free(gsi_key);
|
defer self.allocator.free(gsi_storage_key);
|
||||||
|
|
||||||
// Encode primary key reference as the value
|
// Encode primary key reference as the value
|
||||||
const primary_key_ref = try index_codec.encodePrimaryKeyRef(
|
const primary_key_ref = try index_codec.encodePrimaryKeyRef(
|
||||||
@@ -521,7 +588,7 @@ pub const StorageEngine = struct {
|
|||||||
);
|
);
|
||||||
defer self.allocator.free(primary_key_ref);
|
defer self.allocator.free(primary_key_ref);
|
||||||
|
|
||||||
batch.put(gsi_key, primary_key_ref);
|
batch.put(gsi_storage_key, primary_key_ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Maintain an LSI entry for an item
|
/// Maintain an LSI entry for an item
|
||||||
@@ -556,14 +623,14 @@ pub const StorageEngine = struct {
|
|||||||
const lsi_sk = lsi_sk_value orelse return; // No sort key found
|
const lsi_sk = lsi_sk_value orelse return; // No sort key found
|
||||||
|
|
||||||
// Build LSI storage key
|
// Build LSI storage key
|
||||||
const lsi_key = try key_codec.buildLSIKey(
|
const lsi_storage_key = try key_codec.buildLSIKey(
|
||||||
self.allocator,
|
self.allocator,
|
||||||
table_name,
|
table_name,
|
||||||
lsi.index_name,
|
lsi.index_name,
|
||||||
primary_pk,
|
primary_pk,
|
||||||
lsi_sk,
|
lsi_sk,
|
||||||
);
|
);
|
||||||
defer self.allocator.free(lsi_key);
|
defer self.allocator.free(lsi_storage_key);
|
||||||
|
|
||||||
// Encode primary key reference as the value
|
// Encode primary key reference as the value
|
||||||
const primary_key_ref = try index_codec.encodePrimaryKeyRef(
|
const primary_key_ref = try index_codec.encodePrimaryKeyRef(
|
||||||
@@ -573,7 +640,7 @@ pub const StorageEngine = struct {
|
|||||||
);
|
);
|
||||||
defer self.allocator.free(primary_key_ref);
|
defer self.allocator.free(primary_key_ref);
|
||||||
|
|
||||||
batch.put(lsi_key, primary_key_ref);
|
batch.put(lsi_storage_key, primary_key_ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
// === Serialization ===
|
// === Serialization ===
|
||||||
|
|||||||
@@ -17,6 +17,110 @@ pub const AttributeValue = union(enum) {
|
|||||||
|
|
||||||
pub const Item = std.StringHashMap(AttributeValue);
|
pub const Item = std.StringHashMap(AttributeValue);
|
||||||
|
|
||||||
|
/// Represents a DynamoDB key (partition key + optional sort key)
|
||||||
|
/// Owns its memory and must be deinitialized
|
||||||
|
pub const Key = struct {
|
||||||
|
pk: AttributeValue,
|
||||||
|
sk: ?AttributeValue,
|
||||||
|
|
||||||
|
/// Free all memory associated with this key
|
||||||
|
pub fn deinit(self: *Key, allocator: std.mem.Allocator) void {
|
||||||
|
const json_module = @import("dynamodb/json.zig");
|
||||||
|
json_module.deinitAttributeValue(&self.pk, allocator);
|
||||||
|
if (self.sk) |*sk| {
|
||||||
|
json_module.deinitAttributeValue(sk, allocator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract key from an item based on key schema
|
||||||
|
/// Returns owned Key that caller must deinit()
|
||||||
|
pub fn fromItem(
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
item: Item,
|
||||||
|
key_schema: []const KeySchemaElement,
|
||||||
|
) !Key {
|
||||||
|
const json_module = @import("dynamodb/json.zig");
|
||||||
|
|
||||||
|
var pk_value: ?AttributeValue = null;
|
||||||
|
var sk_value: ?AttributeValue = null;
|
||||||
|
|
||||||
|
for (key_schema) |schema_element| {
|
||||||
|
const attr = item.get(schema_element.attribute_name) orelse
|
||||||
|
return error.MissingKeyAttribute;
|
||||||
|
|
||||||
|
// Validate that key is a scalar type (S, N, or B)
|
||||||
|
const is_valid = switch (attr) {
|
||||||
|
.S, .N, .B => true,
|
||||||
|
else => false,
|
||||||
|
};
|
||||||
|
if (!is_valid) return error.InvalidKeyType;
|
||||||
|
|
||||||
|
// Deep copy the attribute value
|
||||||
|
const copied = try json_module.deepCopyAttributeValue(allocator, attr);
|
||||||
|
errdefer json_module.deinitAttributeValue(&copied, allocator);
|
||||||
|
|
||||||
|
switch (schema_element.key_type) {
|
||||||
|
.HASH => pk_value = copied,
|
||||||
|
.RANGE => sk_value = copied,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Key{
|
||||||
|
.pk = pk_value orelse return error.MissingKeyAttribute,
|
||||||
|
.sk = sk_value,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert key to an Item (for API responses)
|
||||||
|
/// Returns owned Item that caller must deinit
|
||||||
|
pub fn toItem(self: Key, allocator: std.mem.Allocator, key_schema: []const KeySchemaElement) !Item {
|
||||||
|
const json_module = @import("dynamodb/json.zig");
|
||||||
|
|
||||||
|
var item = Item.init(allocator);
|
||||||
|
errdefer json_module.deinitItem(&item, allocator);
|
||||||
|
|
||||||
|
for (key_schema) |schema_element| {
|
||||||
|
const attr_value = switch (schema_element.key_type) {
|
||||||
|
.HASH => self.pk,
|
||||||
|
.RANGE => self.sk orelse continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
const attr_name = try allocator.dupe(u8, schema_element.attribute_name);
|
||||||
|
errdefer allocator.free(attr_name);
|
||||||
|
|
||||||
|
const copied_value = try json_module.deepCopyAttributeValue(allocator, attr_value);
|
||||||
|
errdefer json_module.deinitAttributeValue(&copied_value, allocator);
|
||||||
|
|
||||||
|
try item.put(attr_name, copied_value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract raw byte values from key (for building storage keys)
|
||||||
|
/// Returns borrowed slices - caller must NOT free
|
||||||
|
pub fn getValues(self: *const Key) !struct { pk: []const u8, sk: ?[]const u8 } {
|
||||||
|
const pk_bytes = switch (self.pk) {
|
||||||
|
.S => |s| s,
|
||||||
|
.N => |n| n,
|
||||||
|
.B => |b| b,
|
||||||
|
else => return error.InvalidKeyType,
|
||||||
|
};
|
||||||
|
|
||||||
|
var sk_bytes: ?[]const u8 = null;
|
||||||
|
if (self.sk) |sk| {
|
||||||
|
sk_bytes = switch (sk) {
|
||||||
|
.S => |s| s,
|
||||||
|
.N => |n| n,
|
||||||
|
.B => |b| b,
|
||||||
|
else => return error.InvalidKeyType,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return .{ .pk = pk_bytes, .sk = sk_bytes };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
pub const KeyType = enum {
|
pub const KeyType = enum {
|
||||||
HASH,
|
HASH,
|
||||||
RANGE,
|
RANGE,
|
||||||
|
|||||||
Reference in New Issue
Block a user