From 1b9bea7f53a00f6e96a205b728f16a79247df3fc Mon Sep 17 00:00:00 2001 From: biondizzle Date: Tue, 20 Jan 2026 12:44:41 -0500 Subject: [PATCH] try to get secondary index to work --- src/dynamodb/storage.zig | 146 +++++++++++++++++++++++++++++++++++++-- src/dynamodb/types.zig | 67 ++++++++++++++++++ src/index_codec.zig | 127 ++++++++++++++++++++++++++++++++++ 3 files changed, 334 insertions(+), 6 deletions(-) create mode 100644 src/index_codec.zig diff --git a/src/dynamodb/storage.zig b/src/dynamodb/storage.zig index ca4192c..22a5eec 100644 --- a/src/dynamodb/storage.zig +++ b/src/dynamodb/storage.zig @@ -5,6 +5,7 @@ const types = @import("types.zig"); const json = @import("json.zig"); const key_codec = @import("../key_codec.zig"); const item_codec = @import("../item_codec.zig"); +const index_codec = @import("../index_codec.zig"); pub const StorageError = error{ TableNotFound, @@ -21,7 +22,7 @@ pub const StorageError = error{ /// Schema version allows for future migrations const TableMetadata = struct { /// Schema version for metadata format evolution - schema_version: u32 = 1, + schema_version: u32 = 2, table_name: []const u8, key_schema: []types.KeySchemaElement, @@ -29,10 +30,12 @@ const TableMetadata = struct { table_status: types.TableStatus, creation_date_time: i64, + // Phase 2.4: Secondary indexes + global_secondary_indexes: ?[]types.GlobalSecondaryIndex = null, + local_secondary_indexes: ?[]types.LocalSecondaryIndex = null, + /// Future fields for Phase 3+: /// - provisioned_throughput: ?ProvisionedThroughput - /// - global_secondary_indexes: ?[]GlobalSecondaryIndex - /// - local_secondary_indexes: ?[]LocalSecondaryIndex /// - stream_specification: ?StreamSpecification /// - sse_description: ?SSEDescription /// - billing_mode: BillingMode @@ -46,6 +49,22 @@ const TableMetadata = struct { allocator.free(ad.attribute_name); } allocator.free(self.attribute_definitions); + + if (self.global_secondary_indexes) |gsis| { + for (gsis) |*gsi| { + var gsi_mut = gsi.*; + gsi_mut.deinit(allocator); + } + allocator.free(gsis); + } + + if (self.local_secondary_indexes) |lsis| { + for (lsis) |*lsi| { + var lsi_mut = lsi.*; + lsi_mut.deinit(allocator); + } + allocator.free(lsis); + } } }; @@ -222,8 +241,9 @@ pub const StorageEngine = struct { /// Store an item in the database /// Item is serialized to binary TLV format for efficient storage + /// Also maintains secondary index entries (GSI and LSI) pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void { - // Get table metadata to retrieve key schema + // Get table metadata to retrieve key schema and indexes var metadata = try self.getTableMetadata(table_name); defer metadata.deinit(self.allocator); @@ -247,8 +267,29 @@ pub const StorageEngine = struct { const item_binary = item_codec.encode(self.allocator, item) catch return StorageError.SerializationError; defer self.allocator.free(item_binary); - // Store the binary data - self.db.put(storage_key, item_binary) catch return StorageError.RocksDBError; + // Use a write batch to ensure atomicity + var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError; + defer batch.deinit(); + + // Write the main item + batch.put(storage_key, item_binary); + + // Maintain global secondary indexes + if (metadata.global_secondary_indexes) |gsis| { + for (gsis) |gsi| { + try self.maintainGSIEntry(&batch, table_name, &gsi, item, key_values.pk, key_values.sk); + } + } + + // Maintain local secondary indexes + if (metadata.local_secondary_indexes) |lsis| { + for (lsis) |lsi| { + try self.maintainLSIEntry(&batch, table_name, &lsi, item, key_values.pk, key_values.sk); + } + } + + // Commit the batch atomically + batch.write(&self.db) catch return StorageError.RocksDBError; } /// Retrieve an item from the database @@ -442,6 +483,99 @@ pub const StorageEngine = struct { }; } + /// Maintain a GSI entry for an item + fn maintainGSIEntry( + self: *Self, + batch: *rocksdb.WriteBatch, + table_name: []const u8, + gsi: *const types.GlobalSecondaryIndex, + item: types.Item, + primary_pk: []const u8, + primary_sk: ?[]const u8, + ) StorageError!void { + // Extract GSI key values from item + const gsi_key_values = self.extractKeyValues(item, gsi.key_schema) catch { + // Item doesn't have GSI key attributes - skip this index entry + return; + }; + defer { + self.allocator.free(gsi_key_values.pk); + if (gsi_key_values.sk) |sk| self.allocator.free(sk); + } + + // Build GSI storage key + const gsi_key = try key_codec.buildGSIKey( + self.allocator, + table_name, + gsi.index_name, + gsi_key_values.pk, + gsi_key_values.sk, + ); + defer self.allocator.free(gsi_key); + + // Encode primary key reference as the value + const primary_key_ref = try index_codec.encodePrimaryKeyRef( + self.allocator, + primary_pk, + primary_sk, + ); + defer self.allocator.free(primary_key_ref); + + batch.put(gsi_key, primary_key_ref); + } + + /// Maintain an LSI entry for an item + fn maintainLSIEntry( + self: *Self, + batch: *rocksdb.WriteBatch, + table_name: []const u8, + lsi: *const types.LocalSecondaryIndex, + item: types.Item, + primary_pk: []const u8, + primary_sk: ?[]const u8, + ) StorageError!void { + // LSI requires a sort key from the LSI key schema + // Find the sort key attribute (LSI shares partition key with table) + var lsi_sk_value: ?[]const u8 = null; + for (lsi.key_schema) |ks| { + if (ks.key_type == .RANGE) { + const attr = item.get(ks.attribute_name) orelse { + // Item doesn't have LSI sort key - skip this index entry + return; + }; + lsi_sk_value = switch (attr) { + .S => |s| s, + .N => |n| n, + .B => |b| b, + else => return StorageError.InvalidKey, + }; + break; + } + } + + const lsi_sk = lsi_sk_value orelse return; // No sort key found + + // Build LSI storage key + const lsi_key = try key_codec.buildLSIKey( + self.allocator, + table_name, + lsi.index_name, + primary_pk, + lsi_sk, + ); + defer self.allocator.free(lsi_key); + + // Encode primary key reference as the value + const primary_key_ref = try index_codec.encodePrimaryKeyRef( + self.allocator, + primary_pk, + primary_sk, + ); + defer self.allocator.free(primary_key_ref); + + batch.put(lsi_key, primary_key_ref); + } + // === Serialization === /// Serializable representation of KeySchemaElement for JSON diff --git a/src/dynamodb/types.zig b/src/dynamodb/types.zig index c96a80a..165cd50 100644 --- a/src/dynamodb/types.zig +++ b/src/dynamodb/types.zig @@ -66,6 +66,71 @@ pub const AttributeDefinition = struct { attribute_type: ScalarAttributeType, }; +pub const ProjectionType = enum { + ALL, + KEYS_ONLY, + INCLUDE, + + pub fn toString(self: ProjectionType) []const u8 { + return switch (self) { + .ALL => "ALL", + .KEYS_ONLY => "KEYS_ONLY", + .INCLUDE => "INCLUDE", + }; + } + + pub fn fromString(s: []const u8) ?ProjectionType { + if (std.mem.eql(u8, s, "ALL")) return .ALL; + if (std.mem.eql(u8, s, "KEYS_ONLY")) return .KEYS_ONLY; + if (std.mem.eql(u8, s, "INCLUDE")) return .INCLUDE; + return null; + } +}; + +pub const Projection = struct { + projection_type: ProjectionType, + non_key_attributes: ?[][]const u8, + + pub fn deinit(self: *Projection, allocator: std.mem.Allocator) void { + if (self.non_key_attributes) |attrs| { + for (attrs) |attr| { + allocator.free(attr); + } + allocator.free(attrs); + } + } +}; + +pub const GlobalSecondaryIndex = struct { + index_name: []const u8, + key_schema: []KeySchemaElement, + projection: Projection, + + pub fn deinit(self: *GlobalSecondaryIndex, allocator: std.mem.Allocator) void { + allocator.free(self.index_name); + for (self.key_schema) |ks| { + allocator.free(ks.attribute_name); + } + allocator.free(self.key_schema); + self.projection.deinit(allocator); + } +}; + +pub const LocalSecondaryIndex = struct { + index_name: []const u8, + key_schema: []KeySchemaElement, + projection: Projection, + + pub fn deinit(self: *LocalSecondaryIndex, allocator: std.mem.Allocator) void { + allocator.free(self.index_name); + for (self.key_schema) |ks| { + allocator.free(ks.attribute_name); + } + allocator.free(self.key_schema); + self.projection.deinit(allocator); + } +}; + pub const TableStatus = enum { CREATING, UPDATING, @@ -96,6 +161,8 @@ pub const TableDescription = struct { creation_date_time: i64, item_count: u64, table_size_bytes: u64, + global_secondary_indexes: ?[]const GlobalSecondaryIndex = null, + local_secondary_indexes: ?[]const LocalSecondaryIndex = null, }; /// DynamoDB operation types parsed from X-Amz-Target header diff --git a/src/index_codec.zig b/src/index_codec.zig new file mode 100644 index 0000000..0e31d7f --- /dev/null +++ b/src/index_codec.zig @@ -0,0 +1,127 @@ +/// Secondary index entry encoding +/// Index entries store pointers to primary keys, not full items +const std = @import("std"); + +/// Encode a primary key reference for storage in an index entry +/// Format: [pk_len:varint][pk:bytes][sk_len:varint][sk:bytes]? +/// Returns owned slice that caller must free +pub fn encodePrimaryKeyRef( + allocator: std.mem.Allocator, + pk_value: []const u8, + sk_value: ?[]const u8, +) ![]u8 { + var buf = std.ArrayList(u8).init(allocator); + errdefer buf.deinit(); + const writer = buf.writer(); + + // Encode partition key + try encodeVarint(writer, pk_value.len); + try writer.writeAll(pk_value); + + // Encode sort key if present + if (sk_value) |sk| { + try encodeVarint(writer, sk.len); + try writer.writeAll(sk); + } + + return buf.toOwnedSlice(); +} + +/// Decode a primary key reference from an index entry +/// Returns struct with owned slices that caller must free +pub fn decodePrimaryKeyRef(allocator: std.mem.Allocator, data: []const u8) !PrimaryKeyRef { + var decoder = BinaryDecoder.init(data); + + // Decode partition key + const pk_len = try decoder.readVarint(); + const pk = try decoder.readBytes(pk_len); + const owned_pk = try allocator.dupe(u8, pk); + errdefer allocator.free(owned_pk); + + // Decode sort key if present + var owned_sk: ?[]u8 = null; + if (decoder.hasMore()) { + const sk_len = try decoder.readVarint(); + const sk = try decoder.readBytes(sk_len); + owned_sk = try allocator.dupe(u8, sk); + } + + return PrimaryKeyRef{ + .pk = owned_pk, + .sk = owned_sk, + }; +} + +pub const PrimaryKeyRef = struct { + pk: []u8, + sk: ?[]u8, + + pub fn deinit(self: *PrimaryKeyRef, allocator: std.mem.Allocator) void { + allocator.free(self.pk); + if (self.sk) |sk| allocator.free(sk); + } +}; + +// ============================================================================ +// Binary Decoder Helper +// ============================================================================ + +const BinaryDecoder = struct { + data: []const u8, + pos: usize, + + pub fn init(data: []const u8) BinaryDecoder { + return .{ .data = data, .pos = 0 }; + } + + pub fn readBytes(self: *BinaryDecoder, len: usize) ![]const u8 { + if (self.pos + len > self.data.len) return error.UnexpectedEndOfData; + const bytes = self.data[self.pos .. self.pos + len]; + self.pos += len; + return bytes; + } + + pub fn readVarint(self: *BinaryDecoder) !usize { + var result: usize = 0; + var shift: u6 = 0; + + while (self.pos < self.data.len) { + const byte = self.data[self.pos]; + self.pos += 1; + + result |= @as(usize, byte & 0x7F) << shift; + + if ((byte & 0x80) == 0) { + return result; + } + + shift += 7; + if (shift >= 64) return error.VarintOverflow; + } + + return error.UnexpectedEndOfData; + } + + pub fn hasMore(self: *BinaryDecoder) bool { + return self.pos < self.data.len; + } +}; + +// ============================================================================ +// Varint encoding (consistent with key_codec and item_codec) +// ============================================================================ + +fn encodeVarint(writer: anytype, value: usize) !void { + var v = value; + while (true) { + const byte = @as(u8, @intCast(v & 0x7F)); + v >>= 7; + + if (v == 0) { + try writer.writeByte(byte); + return; + } else { + try writer.writeByte(byte | 0x80); + } + } +}