binary item encoding

This commit is contained in:
2026-01-20 12:36:34 -05:00
parent 7132fc4017
commit 2cdd53ac8b
2 changed files with 489 additions and 46 deletions

View File

@@ -4,6 +4,7 @@ const rocksdb = @import("../rocksdb.zig");
const types = @import("types.zig"); const types = @import("types.zig");
const json = @import("json.zig"); const json = @import("json.zig");
const key_codec = @import("../key_codec.zig"); const key_codec = @import("../key_codec.zig");
const item_codec = @import("../item_codec.zig");
pub const StorageError = error{ pub const StorageError = error{
TableNotFound, TableNotFound,
@@ -16,14 +17,25 @@ pub const StorageError = error{
OutOfMemory, OutOfMemory,
}; };
/// In-memory representation of table metadata /// In-memory representation of table metadata with versioning
/// Schema version allows for future migrations
const TableMetadata = struct { const TableMetadata = struct {
/// Schema version for metadata format evolution
schema_version: u32 = 1,
table_name: []const u8, table_name: []const u8,
key_schema: []types.KeySchemaElement, key_schema: []types.KeySchemaElement,
attribute_definitions: []types.AttributeDefinition, attribute_definitions: []types.AttributeDefinition,
table_status: types.TableStatus, table_status: types.TableStatus,
creation_date_time: i64, creation_date_time: i64,
/// Future fields for Phase 3+:
/// - provisioned_throughput: ?ProvisionedThroughput
/// - global_secondary_indexes: ?[]GlobalSecondaryIndex
/// - local_secondary_indexes: ?[]LocalSecondaryIndex
/// - stream_specification: ?StreamSpecification
/// - sse_description: ?SSEDescription
/// - billing_mode: BillingMode
pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void { pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
allocator.free(self.table_name); allocator.free(self.table_name);
for (self.key_schema) |ks| { for (self.key_schema) |ks| {
@@ -209,7 +221,7 @@ pub const StorageEngine = struct {
// === Item Operations === // === Item Operations ===
/// Store an item in the database /// Store an item in the database
/// Item is serialized to canonical JSON before storage /// Item is serialized to binary TLV format for efficient storage
pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void { pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void {
// Get table metadata to retrieve key schema // Get table metadata to retrieve key schema
var metadata = try self.getTableMetadata(table_name); var metadata = try self.getTableMetadata(table_name);
@@ -231,16 +243,16 @@ pub const StorageEngine = struct {
); );
defer self.allocator.free(storage_key); defer self.allocator.free(storage_key);
// Serialize item to canonical JSON for storage // Serialize item to binary TLV format
const item_json = json.serializeItem(self.allocator, item) catch return StorageError.SerializationError; const item_binary = item_codec.encode(self.allocator, item) catch return StorageError.SerializationError;
defer self.allocator.free(item_json); defer self.allocator.free(item_binary);
// Store the canonical JSON // Store the binary data
self.db.put(storage_key, item_json) catch return StorageError.RocksDBError; self.db.put(storage_key, item_binary) catch return StorageError.RocksDBError;
} }
/// Retrieve an item from the database /// Retrieve an item from the database
/// Returns a parsed Item (not JSON string) /// Returns a parsed Item (decoded from binary TLV format)
pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item { pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item {
// Get table metadata // Get table metadata
var metadata = try self.getTableMetadata(table_name); var metadata = try self.getTableMetadata(table_name);
@@ -262,12 +274,12 @@ pub const StorageEngine = struct {
); );
defer self.allocator.free(storage_key); defer self.allocator.free(storage_key);
const item_json = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError; const item_binary = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError;
if (item_json == null) return null; if (item_binary == null) return null;
defer self.allocator.free(item_json.?); defer self.allocator.free(item_binary.?);
// Parse the stored JSON back into an Item // Decode the binary data back into an Item
return json.parseItem(self.allocator, item_json.?) catch return StorageError.SerializationError; return item_codec.decode(self.allocator, item_binary.?) catch return StorageError.SerializationError;
} }
pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void { pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void {
@@ -294,7 +306,7 @@ pub const StorageEngine = struct {
self.db.delete(storage_key) catch return StorageError.RocksDBError; self.db.delete(storage_key) catch return StorageError.RocksDBError;
} }
/// Scan a table and return parsed Items (not JSON strings) /// Scan a table and return parsed Items (decoded from binary)
pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![]types.Item { pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![]types.Item {
// Verify table exists // Verify table exists
var metadata = try self.getTableMetadata(table_name); var metadata = try self.getTableMetadata(table_name);
@@ -322,8 +334,8 @@ pub const StorageEngine = struct {
const value = iter.value() orelse break; const value = iter.value() orelse break;
// Parse the stored JSON into an Item // Decode the binary data into an Item
const item = json.parseItem(self.allocator, value) catch { const item = item_codec.decode(self.allocator, value) catch {
iter.next(); iter.next();
continue; continue;
}; };
@@ -365,8 +377,8 @@ pub const StorageEngine = struct {
const value = iter.value() orelse break; const value = iter.value() orelse break;
// Parse the stored JSON into an Item // Decode the binary data into an Item
const item = json.parseItem(self.allocator, value) catch { const item = item_codec.decode(self.allocator, value) catch {
iter.next(); iter.next();
continue; continue;
}; };
@@ -432,38 +444,64 @@ pub const StorageEngine = struct {
// === Serialization === // === Serialization ===
/// Serializable representation of KeySchemaElement for JSON
const KeySchemaElementJson = struct {
AttributeName: []const u8,
KeyType: []const u8,
};
/// Serializable representation of AttributeDefinition for JSON
const AttributeDefinitionJson = struct {
AttributeName: []const u8,
AttributeType: []const u8,
};
/// Serializable representation of TableMetadata for JSON
const TableMetadataJson = struct {
schema_version: u32,
TableName: []const u8,
TableStatus: []const u8,
CreationDateTime: i64,
KeySchema: []const KeySchemaElementJson,
AttributeDefinitions: []const AttributeDefinitionJson,
};
fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 { fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 {
// Build JSON-friendly structs
var key_schema_json = std.ArrayList(KeySchemaElementJson).init(self.allocator);
defer key_schema_json.deinit();
for (metadata.key_schema) |ks| {
try key_schema_json.append(.{
.AttributeName = ks.attribute_name,
.KeyType = ks.key_type.toString(),
});
}
var attr_defs_json = std.ArrayList(AttributeDefinitionJson).init(self.allocator);
defer attr_defs_json.deinit();
for (metadata.attribute_definitions) |ad| {
try attr_defs_json.append(.{
.AttributeName = ad.attribute_name,
.AttributeType = ad.attribute_type.toString(),
});
}
const metadata_json = TableMetadataJson{
.schema_version = metadata.schema_version,
.TableName = metadata.table_name,
.TableStatus = metadata.table_status.toString(),
.CreationDateTime = metadata.creation_date_time,
.KeySchema = key_schema_json.items,
.AttributeDefinitions = attr_defs_json.items,
};
// Use std.json.stringify for clean, reliable serialization
var buf = std.ArrayList(u8).init(self.allocator); var buf = std.ArrayList(u8).init(self.allocator);
errdefer buf.deinit(); errdefer buf.deinit();
const writer = buf.writer();
writer.writeAll("{\"TableName\":\"") catch return StorageError.SerializationError; try std.json.stringify(metadata_json, .{}, buf.writer());
writer.writeAll(metadata.table_name) catch return StorageError.SerializationError;
writer.writeAll("\",\"TableStatus\":\"") catch return StorageError.SerializationError;
writer.writeAll(metadata.table_status.toString()) catch return StorageError.SerializationError;
writer.print("\",\"CreationDateTime\":{d},\"KeySchema\":[", .{metadata.creation_date_time}) catch return StorageError.SerializationError;
for (metadata.key_schema, 0..) |ks, i| {
if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
writer.writeAll(ks.attribute_name) catch return StorageError.SerializationError;
writer.writeAll("\",\"KeyType\":\"") catch return StorageError.SerializationError;
writer.writeAll(ks.key_type.toString()) catch return StorageError.SerializationError;
writer.writeAll("\"}") catch return StorageError.SerializationError;
}
writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError;
for (metadata.attribute_definitions, 0..) |ad, i| {
if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
writer.writeAll(ad.attribute_name) catch return StorageError.SerializationError;
writer.writeAll("\",\"AttributeType\":\"") catch return StorageError.SerializationError;
writer.writeAll(ad.attribute_type.toString()) catch return StorageError.SerializationError;
writer.writeAll("\"}") catch return StorageError.SerializationError;
}
writer.writeAll("]}") catch return StorageError.SerializationError;
return buf.toOwnedSlice() catch return StorageError.OutOfMemory; return buf.toOwnedSlice() catch return StorageError.OutOfMemory;
} }
@@ -477,6 +515,16 @@ pub const StorageEngine = struct {
else => return StorageError.SerializationError, else => return StorageError.SerializationError,
}; };
// Check schema version for future migrations
const schema_version_val = root.get("schema_version");
const schema_version: u32 = if (schema_version_val) |v| switch (v) {
.integer => |i| @intCast(i),
else => 1, // Default to v1 if not present
} else 1; // Legacy metadata without version field
// Future: Handle migrations based on schema_version
// if (schema_version == 1) { ... migrate to v2 ... }
// Extract table name // Extract table name
const table_name_val = root.get("TableName") orelse return StorageError.SerializationError; const table_name_val = root.get("TableName") orelse return StorageError.SerializationError;
const table_name_str = switch (table_name_val) { const table_name_str = switch (table_name_val) {
@@ -589,6 +637,7 @@ pub const StorageEngine = struct {
} }
return TableMetadata{ return TableMetadata{
.schema_version = schema_version,
.table_name = table_name, .table_name = table_name,
.key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory, .key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory,
.attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory, .attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory,

394
src/item_codec.zig Normal file
View File

@@ -0,0 +1,394 @@
/// Binary TLV (Type-Length-Value) encoding for DynamoDB items
/// Replaces JSON storage with efficient binary format
/// Format: [attribute_count][name_len][name][type_tag][value_len][value]...
const std = @import("std");
const types = @import("dynamodb/types.zig");
/// Type tags for binary encoding (1 byte each).
/// These values are persisted on disk — never renumber existing tags;
/// only append new ones.
pub const TypeTag = enum(u8) {
    // Scalar types
    string = 0x01, // S
    number = 0x02, // N (stored as string)
    binary = 0x03, // B (base64 string)
    boolean = 0x04, // BOOL
    // `null` is a Zig keyword, so the field name must be quoted.
    @"null" = 0x05, // NULL
    // Set types
    string_set = 0x10, // SS
    number_set = 0x11, // NS
    binary_set = 0x12, // BS
    // Complex types
    list = 0x20, // L
    map = 0x21, // M

    /// Byte value written to the wire for this tag.
    pub fn toByte(self: TypeTag) u8 {
        return @intFromEnum(self);
    }

    /// Parse a wire byte back into a tag; returns error.InvalidTypeTag
    /// for any byte that is not a known tag value.
    pub fn fromByte(byte: u8) !TypeTag {
        return std.meta.intToEnum(TypeTag, byte) catch error.InvalidTypeTag;
    }
};
/// Encode an Item to binary TLV format.
/// Format: [attribute_count:varint][attributes...]
/// Each attribute: [name_len:varint][name:bytes][type_tag:u8][value_encoded:bytes]
/// Attribute names are emitted in ascending byte order so that equal items
/// always produce identical bytes (deterministic encoding).
/// Caller owns the returned slice and must free it.
pub fn encode(allocator: std.mem.Allocator, item: types.Item) ![]u8 {
    var out = std.ArrayList(u8).init(allocator);
    errdefer out.deinit();
    const out_writer = out.writer();

    // Attribute count comes first.
    try encodeVarint(out_writer, item.count());

    // Collect the attribute names, then sort for deterministic output.
    var names = std.ArrayList([]const u8).init(allocator);
    defer names.deinit();
    var it = item.iterator();
    while (it.next()) |entry| {
        try names.append(entry.key_ptr.*);
    }
    const ByteOrder = struct {
        fn lessThan(_: void, lhs: []const u8, rhs: []const u8) bool {
            return std.mem.lessThan(u8, lhs, rhs);
        }
    };
    std.mem.sort([]const u8, names.items, {}, ByteOrder.lessThan);

    // Emit [name_len][name][encoded value] per attribute.
    for (names.items) |name| {
        try encodeVarint(out_writer, name.len);
        try out_writer.writeAll(name);
        try encodeAttributeValue(out_writer, item.get(name).?);
    }

    return out.toOwnedSlice();
}
/// Decode binary TLV format back into an Item.
/// Caller owns returned Item and must call json.deinitItem().
/// On any decode error, everything allocated so far is released.
pub fn decode(allocator: std.mem.Allocator, data: []const u8) !types.Item {
    var decoder = BinaryDecoder.init(data);
    const attr_count = try decoder.readVarint();

    var item = types.Item.init(allocator);
    errdefer {
        var iter = item.iterator();
        while (iter.next()) |entry| {
            allocator.free(entry.key_ptr.*);
            deinitAttributeValue(entry.value_ptr, allocator);
        }
        item.deinit();
    }

    var i: usize = 0;
    while (i < attr_count) : (i += 1) {
        // Read attribute name.
        const name_len = try decoder.readVarint();
        const name = try decoder.readBytes(name_len);
        const owned_name = try allocator.dupe(u8, name);
        errdefer allocator.free(owned_name);

        // Read attribute value.
        var value = try decodeAttributeValue(&decoder, allocator);
        errdefer deinitAttributeValue(&value, allocator);

        // Malformed input may repeat a name; `put` would silently overwrite
        // and leak the previous key/value, so evict it explicitly first.
        if (item.fetchRemove(owned_name)) |old| {
            allocator.free(old.key);
            var old_value = old.value;
            deinitAttributeValue(&old_value, allocator);
        }
        try item.put(owned_name, value);
    }

    return item;
}
/// Encode an AttributeValue to binary format.
/// Layout: [type_tag:u8][payload]. Strings/numbers/binary carry
/// [len:varint][bytes]; sets carry [count:varint] then length-prefixed
/// elements; lists and maps recurse.
/// Map entries are emitted in ascending key order so equal items always
/// encode to identical bytes. The order is produced by repeated linear
/// scans (O(n^2) in entry count) instead of collecting keys into a
/// `std.heap.page_allocator` list as before — library code must not pick
/// a hidden allocator, and this function has no allocator parameter.
fn encodeAttributeValue(writer: anytype, attr: types.AttributeValue) !void {
    switch (attr) {
        .S => |s| {
            try writer.writeByte(TypeTag.string.toByte());
            try encodeVarint(writer, s.len);
            try writer.writeAll(s);
        },
        .N => |n| {
            try writer.writeByte(TypeTag.number.toByte());
            try encodeVarint(writer, n.len);
            try writer.writeAll(n);
        },
        .B => |b| {
            try writer.writeByte(TypeTag.binary.toByte());
            try encodeVarint(writer, b.len);
            try writer.writeAll(b);
        },
        .BOOL => |b| {
            try writer.writeByte(TypeTag.boolean.toByte());
            try writer.writeByte(if (b) 1 else 0);
        },
        .NULL => {
            // NULL is tag-only; no payload bytes.
            try writer.writeByte(TypeTag.@"null".toByte());
        },
        .SS => |ss| {
            try writer.writeByte(TypeTag.string_set.toByte());
            try encodeVarint(writer, ss.len);
            for (ss) |s| {
                try encodeVarint(writer, s.len);
                try writer.writeAll(s);
            }
        },
        .NS => |ns| {
            try writer.writeByte(TypeTag.number_set.toByte());
            try encodeVarint(writer, ns.len);
            for (ns) |n| {
                try encodeVarint(writer, n.len);
                try writer.writeAll(n);
            }
        },
        .BS => |bs| {
            try writer.writeByte(TypeTag.binary_set.toByte());
            try encodeVarint(writer, bs.len);
            for (bs) |b| {
                try encodeVarint(writer, b.len);
                try writer.writeAll(b);
            }
        },
        .L => |list| {
            try writer.writeByte(TypeTag.list.toByte());
            try encodeVarint(writer, list.len);
            for (list) |element| {
                try encodeAttributeValue(writer, element);
            }
        },
        .M => |map| {
            try writer.writeByte(TypeTag.map.toByte());
            try encodeVarint(writer, map.count());
            // Emit entries in ascending key order without allocating:
            // repeatedly scan for the smallest key strictly greater than
            // the last key emitted. Hash-map keys are unique, so this
            // visits each entry exactly once.
            var prev_key: ?[]const u8 = null;
            var emitted: usize = 0;
            const total = map.count();
            while (emitted < total) : (emitted += 1) {
                var next_key: ?[]const u8 = null;
                var iter = map.iterator();
                while (iter.next()) |entry| {
                    const k = entry.key_ptr.*;
                    if (prev_key) |p| {
                        if (!std.mem.lessThan(u8, p, k)) continue;
                    }
                    if (next_key == null or std.mem.lessThan(u8, k, next_key.?)) {
                        next_key = k;
                    }
                }
                const key = next_key.?; // count() guarantees one remains
                try encodeVarint(writer, key.len);
                try writer.writeAll(key);
                try encodeAttributeValue(writer, map.get(key).?);
                prev_key = key;
            }
        },
    }
}
/// Decode an AttributeValue from binary format.
/// All returned payloads are freshly allocated copies owned by the caller
/// (release via deinitAttributeValue / json.deinitItem). On error, any
/// partially decoded payload is freed — including elements already
/// decoded inside sets, lists, and maps, which the previous version
/// leaked (it freed only the outer array).
fn decodeAttributeValue(decoder: *BinaryDecoder, allocator: std.mem.Allocator) !types.AttributeValue {
    const type_tag = try TypeTag.fromByte(try decoder.readByte());
    return switch (type_tag) {
        .string => blk: {
            const len = try decoder.readVarint();
            const data = try decoder.readBytes(len);
            break :blk types.AttributeValue{ .S = try allocator.dupe(u8, data) };
        },
        .number => blk: {
            const len = try decoder.readVarint();
            const data = try decoder.readBytes(len);
            break :blk types.AttributeValue{ .N = try allocator.dupe(u8, data) };
        },
        .binary => blk: {
            const len = try decoder.readVarint();
            const data = try decoder.readBytes(len);
            break :blk types.AttributeValue{ .B = try allocator.dupe(u8, data) };
        },
        .boolean => blk: {
            const byte = try decoder.readByte();
            break :blk types.AttributeValue{ .BOOL = byte != 0 };
        },
        .@"null" => types.AttributeValue{ .NULL = true },
        // SS/NS/BS share an identical wire layout; decode via one helper.
        .string_set => types.AttributeValue{ .SS = try decodeStringSlices(decoder, allocator) },
        .number_set => types.AttributeValue{ .NS = try decodeStringSlices(decoder, allocator) },
        .binary_set => types.AttributeValue{ .BS = try decodeStringSlices(decoder, allocator) },
        .list => blk: {
            const count = try decoder.readVarint();
            // Every element needs at least one byte (its tag); a count larger
            // than the remaining input is malformed — reject before allocating.
            if (count > decoder.data.len - decoder.pos) return error.UnexpectedEndOfData;
            const elements = try allocator.alloc(types.AttributeValue, count);
            var filled: usize = 0;
            errdefer {
                for (elements[0..filled]) |*e| deinitAttributeValue(e, allocator);
                allocator.free(elements);
            }
            for (elements) |*slot| {
                slot.* = try decodeAttributeValue(decoder, allocator);
                filled += 1;
            }
            break :blk types.AttributeValue{ .L = elements };
        },
        .map => blk: {
            const count = try decoder.readVarint();
            var map = types.Item.init(allocator);
            errdefer {
                var iter = map.iterator();
                while (iter.next()) |entry| {
                    allocator.free(entry.key_ptr.*);
                    deinitAttributeValue(entry.value_ptr, allocator);
                }
                map.deinit();
            }
            var i: usize = 0;
            while (i < count) : (i += 1) {
                const key_len = try decoder.readVarint();
                const key = try decoder.readBytes(key_len);
                const owned_key = try allocator.dupe(u8, key);
                errdefer allocator.free(owned_key);
                var value = try decodeAttributeValue(decoder, allocator);
                errdefer deinitAttributeValue(&value, allocator);
                // Evict a duplicate key instead of leaking it on overwrite.
                if (map.fetchRemove(owned_key)) |old| {
                    allocator.free(old.key);
                    var old_value = old.value;
                    deinitAttributeValue(&old_value, allocator);
                }
                try map.put(owned_key, value);
            }
            break :blk types.AttributeValue{ .M = map };
        },
    };
}

/// Shared decoder for SS/NS/BS payloads: [count:varint]([len:varint][bytes])*.
/// On error, frees every element duplicated so far plus the outer slice.
fn decodeStringSlices(decoder: *BinaryDecoder, allocator: std.mem.Allocator) ![][]const u8 {
    const count = try decoder.readVarint();
    // Each element needs at least one byte (its length varint); reject
    // impossible counts before allocating.
    if (count > decoder.data.len - decoder.pos) return error.UnexpectedEndOfData;
    const slices = try allocator.alloc([]const u8, count);
    var filled: usize = 0;
    errdefer {
        for (slices[0..filled]) |s| allocator.free(s);
        allocator.free(slices);
    }
    for (slices) |*slot| {
        const len = try decoder.readVarint();
        const data = try decoder.readBytes(len);
        slot.* = try allocator.dupe(u8, data);
        filled += 1;
    }
    return slices;
}
/// Convert a binary-encoded item to DynamoDB JSON for API responses.
/// Convenience wrapper: decode the bytes, serialize to JSON, free the
/// intermediate Item. Caller owns the returned JSON slice.
pub fn toDynamoJson(allocator: std.mem.Allocator, binary_data: []const u8) ![]u8 {
    const json = @import("dynamodb/json.zig");
    var decoded = try decode(allocator, binary_data);
    defer json.deinitItem(&decoded, allocator);
    return json.serializeItem(allocator, decoded);
}
/// Convert DynamoDB JSON to binary encoding.
/// Convenience wrapper: parse the JSON into an Item, encode it to TLV
/// bytes, free the intermediate Item. Caller owns the returned slice.
pub fn fromDynamoJson(allocator: std.mem.Allocator, json_data: []const u8) ![]u8 {
    const json = @import("dynamodb/json.zig");
    var parsed = try json.parseItem(allocator, json_data);
    defer json.deinitItem(&parsed, allocator);
    return encode(allocator, parsed);
}
// ============================================================================
// Binary Decoder Helper
// ============================================================================

/// Streaming reader over an encoded byte slice.
/// Tracks a cursor; every read bounds-checks against the end of the data.
const BinaryDecoder = struct {
    data: []const u8,
    pos: usize,

    pub fn init(data: []const u8) BinaryDecoder {
        return .{ .data = data, .pos = 0 };
    }

    /// Read a single byte, advancing the cursor.
    pub fn readByte(self: *BinaryDecoder) !u8 {
        if (self.pos >= self.data.len) return error.UnexpectedEndOfData;
        const byte = self.data[self.pos];
        self.pos += 1;
        return byte;
    }

    /// Borrow the next `len` bytes (no copy), advancing the cursor.
    /// The returned slice is only valid while `data` is alive.
    pub fn readBytes(self: *BinaryDecoder, len: usize) ![]const u8 {
        // Compare against the remaining length rather than computing
        // `pos + len`, which could overflow for a hostile `len`.
        if (len > self.data.len - self.pos) return error.UnexpectedEndOfData;
        const bytes = self.data[self.pos .. self.pos + len];
        self.pos += len;
        return bytes;
    }

    /// Decode an LEB128-style varint: 7 data bits per byte, MSB = continue.
    pub fn readVarint(self: *BinaryDecoder) !usize {
        var result: usize = 0;
        // u7 (not u6): with u6 the old code's `shift += 7` overflowed when
        // shift reached 63 (a 10-byte varint), panicking before its
        // unreachable `>= 64` guard could fire.
        var shift: u7 = 0;
        while (self.pos < self.data.len) {
            const byte = self.data[self.pos];
            self.pos += 1;
            if (shift >= @bitSizeOf(usize)) return error.VarintOverflow;
            result |= @as(usize, byte & 0x7F) << @intCast(shift);
            if ((byte & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        return error.UnexpectedEndOfData;
    }
};
// ============================================================================
// Varint encoding (same as key_codec.zig for consistency)
// ============================================================================

/// Write `value` as an LEB128-style varint: 7 data bits per byte,
/// continuation bit (0x80) set on every byte except the last.
fn encodeVarint(writer: anytype, value: usize) !void {
    var remaining = value;
    while (remaining >= 0x80) {
        try writer.writeByte(@as(u8, @truncate(remaining)) | 0x80);
        remaining >>= 7;
    }
    try writer.writeByte(@intCast(remaining));
}
// ============================================================================
// Memory Management Helper
// ============================================================================

/// Release all heap memory owned by an AttributeValue.
/// Delegates to the JSON module, which already knows how to walk every
/// variant (sets, lists, nested maps).
fn deinitAttributeValue(attr: *types.AttributeValue, allocator: std.mem.Allocator) void {
    @import("dynamodb/json.zig").deinitAttributeValue(attr, allocator);
}