diff --git a/src/dynamodb/storage.zig b/src/dynamodb/storage.zig index 0a9767d..ca4192c 100644 --- a/src/dynamodb/storage.zig +++ b/src/dynamodb/storage.zig @@ -4,6 +4,7 @@ const rocksdb = @import("../rocksdb.zig"); const types = @import("types.zig"); const json = @import("json.zig"); const key_codec = @import("../key_codec.zig"); +const item_codec = @import("../item_codec.zig"); pub const StorageError = error{ TableNotFound, @@ -16,14 +17,25 @@ pub const StorageError = error{ OutOfMemory, }; -/// In-memory representation of table metadata +/// In-memory representation of table metadata with versioning +/// Schema version allows for future migrations const TableMetadata = struct { + /// Schema version for metadata format evolution + schema_version: u32 = 1, + table_name: []const u8, key_schema: []types.KeySchemaElement, attribute_definitions: []types.AttributeDefinition, table_status: types.TableStatus, creation_date_time: i64, + /// Future fields for Phase 3+: + /// - provisioned_throughput: ?ProvisionedThroughput + /// - global_secondary_indexes: ?[]GlobalSecondaryIndex + /// - local_secondary_indexes: ?[]LocalSecondaryIndex + /// - stream_specification: ?StreamSpecification + /// - sse_description: ?SSEDescription + /// - billing_mode: BillingMode pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void { allocator.free(self.table_name); for (self.key_schema) |ks| { @@ -209,7 +221,7 @@ pub const StorageEngine = struct { // === Item Operations === /// Store an item in the database - /// Item is serialized to canonical JSON before storage + /// Item is serialized to binary TLV format for efficient storage pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void { // Get table metadata to retrieve key schema var metadata = try self.getTableMetadata(table_name); @@ -231,16 +243,16 @@ pub const StorageEngine = struct { ); defer self.allocator.free(storage_key); - // Serialize item to canonical JSON for storage - const 
item_json = json.serializeItem(self.allocator, item) catch return StorageError.SerializationError; - defer self.allocator.free(item_json); + // Serialize item to binary TLV format + const item_binary = item_codec.encode(self.allocator, item) catch return StorageError.SerializationError; + defer self.allocator.free(item_binary); - // Store the canonical JSON - self.db.put(storage_key, item_json) catch return StorageError.RocksDBError; + // Store the binary data + self.db.put(storage_key, item_binary) catch return StorageError.RocksDBError; } /// Retrieve an item from the database - /// Returns a parsed Item (not JSON string) + /// Returns a parsed Item (decoded from binary TLV format) pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item { // Get table metadata var metadata = try self.getTableMetadata(table_name); @@ -262,12 +274,12 @@ pub const StorageEngine = struct { ); defer self.allocator.free(storage_key); - const item_json = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError; - if (item_json == null) return null; - defer self.allocator.free(item_json.?); + const item_binary = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError; + if (item_binary == null) return null; + defer self.allocator.free(item_binary.?); - // Parse the stored JSON back into an Item - return json.parseItem(self.allocator, item_json.?) catch return StorageError.SerializationError; + // Decode the binary data back into an Item + return item_codec.decode(self.allocator, item_binary.?) 
catch return StorageError.SerializationError; } pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void { @@ -294,7 +306,7 @@ pub const StorageEngine = struct { self.db.delete(storage_key) catch return StorageError.RocksDBError; } - /// Scan a table and return parsed Items (not JSON strings) + /// Scan a table and return parsed Items (decoded from binary) pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![]types.Item { // Verify table exists var metadata = try self.getTableMetadata(table_name); @@ -322,8 +334,8 @@ pub const StorageEngine = struct { const value = iter.value() orelse break; - // Parse the stored JSON into an Item - const item = json.parseItem(self.allocator, value) catch { + // Decode the binary data into an Item + const item = item_codec.decode(self.allocator, value) catch { iter.next(); continue; }; @@ -365,8 +377,8 @@ pub const StorageEngine = struct { const value = iter.value() orelse break; - // Parse the stored JSON into an Item - const item = json.parseItem(self.allocator, value) catch { + // Decode the binary data into an Item + const item = item_codec.decode(self.allocator, value) catch { iter.next(); continue; }; @@ -432,38 +444,64 @@ pub const StorageEngine = struct { // === Serialization === + /// Serializable representation of KeySchemaElement for JSON + const KeySchemaElementJson = struct { + AttributeName: []const u8, + KeyType: []const u8, + }; + + /// Serializable representation of AttributeDefinition for JSON + const AttributeDefinitionJson = struct { + AttributeName: []const u8, + AttributeType: []const u8, + }; + + /// Serializable representation of TableMetadata for JSON + const TableMetadataJson = struct { + schema_version: u32, + TableName: []const u8, + TableStatus: []const u8, + CreationDateTime: i64, + KeySchema: []const KeySchemaElementJson, + AttributeDefinitions: []const AttributeDefinitionJson, + }; + fn serializeTableMetadata(self: *Self, metadata: 
TableMetadata) StorageError![]u8 { + // Build JSON-friendly structs + var key_schema_json = std.ArrayList(KeySchemaElementJson).init(self.allocator); + defer key_schema_json.deinit(); + + for (metadata.key_schema) |ks| { + try key_schema_json.append(.{ + .AttributeName = ks.attribute_name, + .KeyType = ks.key_type.toString(), + }); + } + + var attr_defs_json = std.ArrayList(AttributeDefinitionJson).init(self.allocator); + defer attr_defs_json.deinit(); + + for (metadata.attribute_definitions) |ad| { + try attr_defs_json.append(.{ + .AttributeName = ad.attribute_name, + .AttributeType = ad.attribute_type.toString(), + }); + } + + const metadata_json = TableMetadataJson{ + .schema_version = metadata.schema_version, + .TableName = metadata.table_name, + .TableStatus = metadata.table_status.toString(), + .CreationDateTime = metadata.creation_date_time, + .KeySchema = key_schema_json.items, + .AttributeDefinitions = attr_defs_json.items, + }; + + // Use std.json.stringify for clean, reliable serialization var buf = std.ArrayList(u8).init(self.allocator); errdefer buf.deinit(); - const writer = buf.writer(); - writer.writeAll("{\"TableName\":\"") catch return StorageError.SerializationError; - writer.writeAll(metadata.table_name) catch return StorageError.SerializationError; - writer.writeAll("\",\"TableStatus\":\"") catch return StorageError.SerializationError; - writer.writeAll(metadata.table_status.toString()) catch return StorageError.SerializationError; - writer.print("\",\"CreationDateTime\":{d},\"KeySchema\":[", .{metadata.creation_date_time}) catch return StorageError.SerializationError; - - for (metadata.key_schema, 0..) 
|ks, i| { - if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError; - writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError; - writer.writeAll(ks.attribute_name) catch return StorageError.SerializationError; - writer.writeAll("\",\"KeyType\":\"") catch return StorageError.SerializationError; - writer.writeAll(ks.key_type.toString()) catch return StorageError.SerializationError; - writer.writeAll("\"}") catch return StorageError.SerializationError; - } - - writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError; - - for (metadata.attribute_definitions, 0..) |ad, i| { - if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError; - writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError; - writer.writeAll(ad.attribute_name) catch return StorageError.SerializationError; - writer.writeAll("\",\"AttributeType\":\"") catch return StorageError.SerializationError; - writer.writeAll(ad.attribute_type.toString()) catch return StorageError.SerializationError; - writer.writeAll("\"}") catch return StorageError.SerializationError; - } - - writer.writeAll("]}") catch return StorageError.SerializationError; + try std.json.stringify(metadata_json, .{}, buf.writer()); return buf.toOwnedSlice() catch return StorageError.OutOfMemory; } @@ -477,6 +515,16 @@ pub const StorageEngine = struct { else => return StorageError.SerializationError, }; + // Check schema version for future migrations + const schema_version_val = root.get("schema_version"); + const schema_version: u32 = if (schema_version_val) |v| switch (v) { + .integer => |i| @intCast(i), + else => 1, // Default to v1 if not present + } else 1; // Legacy metadata without version field + + // Future: Handle migrations based on schema_version + // if (schema_version == 1) { ... migrate to v2 ... 
} + // Extract table name const table_name_val = root.get("TableName") orelse return StorageError.SerializationError; const table_name_str = switch (table_name_val) { @@ -589,6 +637,7 @@ pub const StorageEngine = struct { } return TableMetadata{ + .schema_version = schema_version, .table_name = table_name, .key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory, .attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory, diff --git a/src/item_codec.zig b/src/item_codec.zig new file mode 100644 index 0000000..1f6e8fe --- /dev/null +++ b/src/item_codec.zig @@ -0,0 +1,394 @@ +/// Binary TLV (Type-Length-Value) encoding for DynamoDB items +/// Replaces JSON storage with efficient binary format +/// Format: [attribute_count][name_len][name][type_tag][value_len][value]... +const std = @import("std"); +const types = @import("dynamodb/types.zig"); + +/// Type tags for binary encoding (1 byte each) +pub const TypeTag = enum(u8) { + // Scalar types + string = 0x01, // S + number = 0x02, // N (stored as string) + binary = 0x03, // B (base64 string) + boolean = 0x04, // BOOL + null = 0x05, // NULL + + // Set types + string_set = 0x10, // SS + number_set = 0x11, // NS + binary_set = 0x12, // BS + + // Complex types + list = 0x20, // L + map = 0x21, // M + + pub fn toByte(self: TypeTag) u8 { + return @intFromEnum(self); + } + + pub fn fromByte(byte: u8) !TypeTag { + return std.meta.intToEnum(TypeTag, byte) catch error.InvalidTypeTag; + } +}; + +/// Encode an Item to binary TLV format +/// Format: [attribute_count:varint][attributes...] 
/// Each attribute: [name_len:varint][name:bytes][type_tag:u8][value_encoded:bytes]
/// Caller owns returned slice and must free it
pub fn encode(allocator: std.mem.Allocator, item: types.Item) ![]u8 {
    var buf = std.ArrayList(u8).init(allocator);
    errdefer buf.deinit();
    const writer = buf.writer();

    // Attribute count first, so the decoder knows how many entries follow.
    try encodeVarint(writer, item.count());

    // Sort attribute names so identical items always encode to identical bytes.
    var keys = std.ArrayList([]const u8).init(allocator);
    defer keys.deinit();

    var iter = item.iterator();
    while (iter.next()) |entry| {
        try keys.append(entry.key_ptr.*);
    }

    std.mem.sort([]const u8, keys.items, {}, struct {
        fn lessThan(_: void, a: []const u8, b: []const u8) bool {
            return std.mem.lessThan(u8, a, b);
        }
    }.lessThan);

    // Encode each attribute as [name_len][name][tagged value].
    for (keys.items) |key| {
        const value = item.get(key).?;

        try encodeVarint(writer, key.len);
        try writer.writeAll(key);
        try encodeAttributeValue(writer, allocator, value);
    }

    return buf.toOwnedSlice();
}

/// Decode binary TLV format back into an Item
/// Caller owns returned Item and must call json.deinitItem()
pub fn decode(allocator: std.mem.Allocator, data: []const u8) !types.Item {
    var decoder = BinaryDecoder.init(data);

    const attr_count = try decoder.readVarint();

    var item = types.Item.init(allocator);
    errdefer {
        var iter = item.iterator();
        while (iter.next()) |entry| {
            allocator.free(entry.key_ptr.*);
            deinitAttributeValue(entry.value_ptr, allocator);
        }
        item.deinit();
    }

    var i: usize = 0;
    while (i < attr_count) : (i += 1) {
        // Read attribute name.
        const name_len = try decoder.readVarint();
        const name = try decoder.readBytes(name_len);
        const owned_name = try allocator.dupe(u8, name);
        errdefer allocator.free(owned_name);

        // Read attribute value.
        var value = try decodeAttributeValue(&decoder, allocator);
        errdefer deinitAttributeValue(&value, allocator);

        // BUG FIX: a duplicate attribute name (only possible in corrupted or
        // hostile input — encode() writes each key once) previously leaked the
        // first key/value pair via put() overwrite. Reject it instead.
        if (item.get(owned_name) != null) return error.DuplicateAttributeName;
        try item.put(owned_name, value);
    }

    return item;
}

/// Encode a single AttributeValue as [type_tag:u8][payload:bytes].
/// `allocator` is only used as scratch space when sorting map (M) keys.
fn encodeAttributeValue(writer: anytype, allocator: std.mem.Allocator, attr: types.AttributeValue) !void {
    switch (attr) {
        .S => |s| {
            try writer.writeByte(TypeTag.string.toByte());
            try encodeVarint(writer, s.len);
            try writer.writeAll(s);
        },
        .N => |n| {
            try writer.writeByte(TypeTag.number.toByte());
            try encodeVarint(writer, n.len);
            try writer.writeAll(n);
        },
        .B => |b| {
            try writer.writeByte(TypeTag.binary.toByte());
            try encodeVarint(writer, b.len);
            try writer.writeAll(b);
        },
        .BOOL => |b| {
            try writer.writeByte(TypeTag.boolean.toByte());
            try writer.writeByte(if (b) 1 else 0);
        },
        .NULL => {
            // NULL is tag-only; no payload bytes.
            try writer.writeByte(TypeTag.null.toByte());
        },
        .SS => |ss| try encodeByteSliceSet(writer, TypeTag.string_set, ss),
        .NS => |ns| try encodeByteSliceSet(writer, TypeTag.number_set, ns),
        .BS => |bs| try encodeByteSliceSet(writer, TypeTag.binary_set, bs),
        .L => |list| {
            try writer.writeByte(TypeTag.list.toByte());
            try encodeVarint(writer, list.len);
            for (list) |element| {
                try encodeAttributeValue(writer, allocator, element);
            }
        },
        .M => |map| {
            try writer.writeByte(TypeTag.map.toByte());
            try encodeVarint(writer, map.count());

            // BUG FIX: this scratch list previously used the hidden global
            // std.heap.page_allocator (a full page per nested map, and a
            // surprise allocator in a library). Use the caller's allocator.
            var keys = std.ArrayList([]const u8).init(allocator);
            defer keys.deinit();

            var iter = map.iterator();
            while (iter.next()) |entry| {
                try keys.append(entry.key_ptr.*);
            }

            // Sort keys for deterministic encoding, same as encode().
            std.mem.sort([]const u8, keys.items, {}, struct {
                fn lessThan(_: void, a: []const u8, b: []const u8) bool {
                    return std.mem.lessThan(u8, a, b);
                }
            }.lessThan);

            for (keys.items) |key| {
                const value = map.get(key).?;
                try encodeVarint(writer, key.len);
                try writer.writeAll(key);
                try encodeAttributeValue(writer, allocator, value);
            }
        },
    }
}

/// Shared payload encoder for SS/NS/BS: [count:varint] then each element as
/// [len:varint][bytes].
fn encodeByteSliceSet(writer: anytype, tag: TypeTag, elements: []const []const u8) !void {
    try writer.writeByte(tag.toByte());
    try encodeVarint(writer, elements.len);
    for (elements) |element| {
        try encodeVarint(writer, element.len);
        try writer.writeAll(element);
    }
}

/// Decode a single AttributeValue from [type_tag:u8][payload:bytes].
/// All returned memory is owned by the caller (freed via deinitAttributeValue).
fn decodeAttributeValue(decoder: *BinaryDecoder, allocator: std.mem.Allocator) !types.AttributeValue {
    const type_tag = try TypeTag.fromByte(try decoder.readByte());

    return switch (type_tag) {
        .string => .{ .S = try dupePrefixedBytes(decoder, allocator) },
        .number => .{ .N = try dupePrefixedBytes(decoder, allocator) },
        .binary => .{ .B = try dupePrefixedBytes(decoder, allocator) },
        .boolean => .{ .BOOL = (try decoder.readByte()) != 0 },
        .null => .{ .NULL = true },
        .string_set => .{ .SS = try decodeByteSliceSet(decoder, allocator) },
        .number_set => .{ .NS = try decodeByteSliceSet(decoder, allocator) },
        .binary_set => .{ .BS = try decodeByteSliceSet(decoder, allocator) },
        .list => blk: {
            const count = try decoder.readVarint();
            const elements = try allocator.alloc(types.AttributeValue, count);
            errdefer allocator.free(elements);

            // BUG FIX: on a mid-list decode failure the original freed only
            // the outer array, leaking every element already decoded. Track
            // the initialized prefix and free it on the error path.
            var filled: usize = 0;
            errdefer {
                for (elements[0..filled]) |*element| deinitAttributeValue(element, allocator);
            }

            while (filled < count) : (filled += 1) {
                elements[filled] = try decodeAttributeValue(decoder, allocator);
            }
            break :blk .{ .L = elements };
        },
        .map => blk: {
            const count = try decoder.readVarint();
            var map = types.Item.init(allocator);
            errdefer {
                var it = map.iterator();
                while (it.next()) |entry| {
                    allocator.free(entry.key_ptr.*);
                    deinitAttributeValue(entry.value_ptr, allocator);
                }
                map.deinit();
            }

            var decoded: usize = 0;
            while (decoded < count) : (decoded += 1) {
                const key_len = try decoder.readVarint();
                const key = try decoder.readBytes(key_len);
                const owned_key = try allocator.dupe(u8, key);
                errdefer allocator.free(owned_key);

                var value = try decodeAttributeValue(decoder, allocator);
                errdefer deinitAttributeValue(&value, allocator);

                // Same duplicate-key guard as decode(): prevents a leak on
                // corrupted input.
                if (map.get(owned_key) != null) return error.DuplicateAttributeName;
                try map.put(owned_key, value);
            }
            break :blk .{ .M = map };
        },
    };
}

/// Read one [len:varint][bytes] record and return an owned copy.
fn dupePrefixedBytes(decoder: *BinaryDecoder, allocator: std.mem.Allocator) ![]const u8 {
    const len = try decoder.readVarint();
    const data = try decoder.readBytes(len);
    return allocator.dupe(u8, data);
}

/// Decode an SS/NS/BS payload: [count:varint] then length-prefixed elements.
/// BUG FIX: the original freed only the outer array when decoding failed
/// partway through a set, leaking every element already duplicated; the
/// decoded prefix is now freed on the error path.
fn decodeByteSliceSet(decoder: *BinaryDecoder, allocator: std.mem.Allocator) ![][]const u8 {
    const count = try decoder.readVarint();
    const elements = try allocator.alloc([]const u8, count);
    errdefer allocator.free(elements);

    var filled: usize = 0;
    errdefer {
        for (elements[0..filled]) |element| allocator.free(element);
    }

    while (filled < count) : (filled += 1) {
        elements[filled] = try dupePrefixedBytes(decoder, allocator);
    }
    return elements;
}

/// Convert binary-encoded item to DynamoDB JSON for API responses
/// This is a convenience wrapper around decode + json.serializeItem
pub fn toDynamoJson(allocator: std.mem.Allocator, binary_data: []const u8) ![]u8 {
    const json = @import("dynamodb/json.zig");

    var item = try decode(allocator, binary_data);
    defer json.deinitItem(&item, allocator);

    return json.serializeItem(allocator, item);
}

/// Convert DynamoDB JSON to binary encoding
/// This is a convenience wrapper around json.parseItem + encode
pub fn fromDynamoJson(allocator: std.mem.Allocator, json_data: []const u8) ![]u8 {
    const json = @import("dynamodb/json.zig");

    var item = try json.parseItem(allocator, json_data);
    defer json.deinitItem(&item, allocator);

    return encode(allocator, item);
}

// ============================================================================
// Binary Decoder Helper
// ============================================================================

/// Cursor over an immutable byte buffer. All reads are bounds-checked and
/// return error.UnexpectedEndOfData instead of reading past the end, so the
/// decoder is safe on truncated or hostile input.
const BinaryDecoder = struct {
    data: []const u8,
    pos: usize,

    pub fn init(data: []const u8) BinaryDecoder {
        return .{ .data = data, .pos = 0 };
    }

    pub fn readByte(self: *BinaryDecoder) !u8 {
        if (self.pos >= self.data.len) return error.UnexpectedEndOfData;
        const byte = self.data[self.pos];
        self.pos += 1;
        return byte;
    }

    pub fn readBytes(self: *BinaryDecoder, len: usize) ![]const u8 {
        // BUG FIX: the original checked `self.pos + len > self.data.len`;
        // that addition traps on overflow (safety panic) when a hostile
        // varint yields len near maxInt(usize). Compare by subtraction —
        // pos <= data.len is an invariant, so `data.len - pos` cannot wrap.
        if (len > self.data.len - self.pos) return error.UnexpectedEndOfData;
        const bytes = self.data[self.pos .. self.pos + len];
        self.pos += len;
        return bytes;
    }

    /// Read an LEB128-style varint (7 value bits per byte, high bit = more).
    pub fn readVarint(self: *BinaryDecoder) !usize {
        var result: usize = 0;
        var shift: u6 = 0;

        while (self.pos < self.data.len) {
            const byte = self.data[self.pos];
            self.pos += 1;

            result |= @as(usize, byte & 0x7F) << shift;

            if ((byte & 0x80) == 0) {
                return result;
            }

            // BUG FIX: the original did `shift += 7` first and then tested
            // `shift >= 64` — a condition a u6 (max 63) can never satisfy.
            // An 11th continuation byte therefore overflowed the u6 counter
            // (integer-overflow safety panic) instead of returning an error.
            // Test before incrementing: shift walks 0,7,...,56,63, so a
            // continuation byte seen at shift 63 is an overflowing varint.
            if (shift >= 63) return error.VarintOverflow;
            shift += 7;
        }

        return error.UnexpectedEndOfData;
    }
};

// ============================================================================
// Varint encoding (same as key_codec.zig for consistency)
// ============================================================================

/// Write `value` as an LEB128-style unsigned varint: low 7 bits per byte,
/// high bit set on every byte except the last.
fn encodeVarint(writer: anytype, value: usize) !void {
    var v = value;
    while (true) {
        const byte = @as(u8, @intCast(v & 0x7F));
        v >>= 7;

        if (v == 0) {
            try writer.writeByte(byte);
            return;
        } else {
            try writer.writeByte(byte | 0x80);
        }
    }
}

// ============================================================================
// Memory Management Helper
// ============================================================================

/// Free all heap memory owned by an AttributeValue. Delegates to json.zig so
/// there is exactly one ownership/teardown implementation for both codecs.
fn deinitAttributeValue(attr: *types.AttributeValue, allocator: std.mem.Allocator) void {
    const json = @import("dynamodb/json.zig");
    json.deinitAttributeValue(attr, allocator);
}