try to get secondary index to work
This commit is contained in:
@@ -5,6 +5,7 @@ const types = @import("types.zig");
|
||||
const json = @import("json.zig");
|
||||
const key_codec = @import("../key_codec.zig");
|
||||
const item_codec = @import("../item_codec.zig");
|
||||
const index_codec = @import("../index_codec.zig");
|
||||
|
||||
pub const StorageError = error{
|
||||
TableNotFound,
|
||||
@@ -21,7 +22,7 @@ pub const StorageError = error{
|
||||
/// Schema version allows for future migrations
|
||||
const TableMetadata = struct {
|
||||
/// Schema version for metadata format evolution
|
||||
schema_version: u32 = 1,
|
||||
schema_version: u32 = 2,
|
||||
|
||||
table_name: []const u8,
|
||||
key_schema: []types.KeySchemaElement,
|
||||
@@ -29,10 +30,12 @@ const TableMetadata = struct {
|
||||
table_status: types.TableStatus,
|
||||
creation_date_time: i64,
|
||||
|
||||
// Phase 2.4: Secondary indexes
|
||||
global_secondary_indexes: ?[]types.GlobalSecondaryIndex = null,
|
||||
local_secondary_indexes: ?[]types.LocalSecondaryIndex = null,
|
||||
|
||||
/// Future fields for Phase 3+:
|
||||
/// - provisioned_throughput: ?ProvisionedThroughput
|
||||
/// - global_secondary_indexes: ?[]GlobalSecondaryIndex
|
||||
/// - local_secondary_indexes: ?[]LocalSecondaryIndex
|
||||
/// - stream_specification: ?StreamSpecification
|
||||
/// - sse_description: ?SSEDescription
|
||||
/// - billing_mode: BillingMode
|
||||
@@ -46,6 +49,22 @@ const TableMetadata = struct {
|
||||
allocator.free(ad.attribute_name);
|
||||
}
|
||||
allocator.free(self.attribute_definitions);
|
||||
|
||||
if (self.global_secondary_indexes) |gsis| {
|
||||
for (gsis) |*gsi| {
|
||||
var gsi_mut = gsi.*;
|
||||
gsi_mut.deinit(allocator);
|
||||
}
|
||||
allocator.free(gsis);
|
||||
}
|
||||
|
||||
if (self.local_secondary_indexes) |lsis| {
|
||||
for (lsis) |*lsi| {
|
||||
var lsi_mut = lsi.*;
|
||||
lsi_mut.deinit(allocator);
|
||||
}
|
||||
allocator.free(lsis);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -222,8 +241,9 @@ pub const StorageEngine = struct {
|
||||
|
||||
/// Store an item in the database
|
||||
/// Item is serialized to binary TLV format for efficient storage
|
||||
/// Also maintains secondary index entries (GSI and LSI)
|
||||
pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void {
|
||||
// Get table metadata to retrieve key schema
|
||||
// Get table metadata to retrieve key schema and indexes
|
||||
var metadata = try self.getTableMetadata(table_name);
|
||||
defer metadata.deinit(self.allocator);
|
||||
|
||||
@@ -247,8 +267,29 @@ pub const StorageEngine = struct {
|
||||
const item_binary = item_codec.encode(self.allocator, item) catch return StorageError.SerializationError;
|
||||
defer self.allocator.free(item_binary);
|
||||
|
||||
// Store the binary data
|
||||
self.db.put(storage_key, item_binary) catch return StorageError.RocksDBError;
|
||||
// Use a write batch to ensure atomicity
|
||||
var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError;
|
||||
defer batch.deinit();
|
||||
|
||||
// Write the main item
|
||||
batch.put(storage_key, item_binary);
|
||||
|
||||
// Maintain global secondary indexes
|
||||
if (metadata.global_secondary_indexes) |gsis| {
|
||||
for (gsis) |gsi| {
|
||||
try self.maintainGSIEntry(&batch, table_name, &gsi, item, key_values.pk, key_values.sk);
|
||||
}
|
||||
}
|
||||
|
||||
// Maintain local secondary indexes
|
||||
if (metadata.local_secondary_indexes) |lsis| {
|
||||
for (lsis) |lsi| {
|
||||
try self.maintainLSIEntry(&batch, table_name, &lsi, item, key_values.pk, key_values.sk);
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the batch atomically
|
||||
batch.write(&self.db) catch return StorageError.RocksDBError;
|
||||
}
|
||||
|
||||
/// Retrieve an item from the database
|
||||
@@ -442,6 +483,99 @@ pub const StorageEngine = struct {
|
||||
};
|
||||
}
|
||||
|
||||
/// Maintain a GSI entry for an item
|
||||
fn maintainGSIEntry(
|
||||
self: *Self,
|
||||
batch: *rocksdb.WriteBatch,
|
||||
table_name: []const u8,
|
||||
gsi: *const types.GlobalSecondaryIndex,
|
||||
item: types.Item,
|
||||
primary_pk: []const u8,
|
||||
primary_sk: ?[]const u8,
|
||||
) StorageError!void {
|
||||
// Extract GSI key values from item
|
||||
const gsi_key_values = self.extractKeyValues(item, gsi.key_schema) catch {
|
||||
// Item doesn't have GSI key attributes - skip this index entry
|
||||
return;
|
||||
};
|
||||
defer {
|
||||
self.allocator.free(gsi_key_values.pk);
|
||||
if (gsi_key_values.sk) |sk| self.allocator.free(sk);
|
||||
}
|
||||
|
||||
// Build GSI storage key
|
||||
const gsi_key = try key_codec.buildGSIKey(
|
||||
self.allocator,
|
||||
table_name,
|
||||
gsi.index_name,
|
||||
gsi_key_values.pk,
|
||||
gsi_key_values.sk,
|
||||
);
|
||||
defer self.allocator.free(gsi_key);
|
||||
|
||||
// Encode primary key reference as the value
|
||||
const primary_key_ref = try index_codec.encodePrimaryKeyRef(
|
||||
self.allocator,
|
||||
primary_pk,
|
||||
primary_sk,
|
||||
);
|
||||
defer self.allocator.free(primary_key_ref);
|
||||
|
||||
batch.put(gsi_key, primary_key_ref);
|
||||
}
|
||||
|
||||
/// Maintain an LSI entry for an item
|
||||
fn maintainLSIEntry(
|
||||
self: *Self,
|
||||
batch: *rocksdb.WriteBatch,
|
||||
table_name: []const u8,
|
||||
lsi: *const types.LocalSecondaryIndex,
|
||||
item: types.Item,
|
||||
primary_pk: []const u8,
|
||||
primary_sk: ?[]const u8,
|
||||
) StorageError!void {
|
||||
// LSI requires a sort key from the LSI key schema
|
||||
// Find the sort key attribute (LSI shares partition key with table)
|
||||
var lsi_sk_value: ?[]const u8 = null;
|
||||
for (lsi.key_schema) |ks| {
|
||||
if (ks.key_type == .RANGE) {
|
||||
const attr = item.get(ks.attribute_name) orelse {
|
||||
// Item doesn't have LSI sort key - skip this index entry
|
||||
return;
|
||||
};
|
||||
lsi_sk_value = switch (attr) {
|
||||
.S => |s| s,
|
||||
.N => |n| n,
|
||||
.B => |b| b,
|
||||
else => return StorageError.InvalidKey,
|
||||
};
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const lsi_sk = lsi_sk_value orelse return; // No sort key found
|
||||
|
||||
// Build LSI storage key
|
||||
const lsi_key = try key_codec.buildLSIKey(
|
||||
self.allocator,
|
||||
table_name,
|
||||
lsi.index_name,
|
||||
primary_pk,
|
||||
lsi_sk,
|
||||
);
|
||||
defer self.allocator.free(lsi_key);
|
||||
|
||||
// Encode primary key reference as the value
|
||||
const primary_key_ref = try index_codec.encodePrimaryKeyRef(
|
||||
self.allocator,
|
||||
primary_pk,
|
||||
primary_sk,
|
||||
);
|
||||
defer self.allocator.free(primary_key_ref);
|
||||
|
||||
batch.put(lsi_key, primary_key_ref);
|
||||
}
|
||||
|
||||
// === Serialization ===
|
||||
|
||||
/// Serializable representation of KeySchemaElement for JSON
|
||||
|
||||
@@ -66,6 +66,71 @@ pub const AttributeDefinition = struct {
|
||||
attribute_type: ScalarAttributeType,
|
||||
};
|
||||
|
||||
pub const ProjectionType = enum {
|
||||
ALL,
|
||||
KEYS_ONLY,
|
||||
INCLUDE,
|
||||
|
||||
pub fn toString(self: ProjectionType) []const u8 {
|
||||
return switch (self) {
|
||||
.ALL => "ALL",
|
||||
.KEYS_ONLY => "KEYS_ONLY",
|
||||
.INCLUDE => "INCLUDE",
|
||||
};
|
||||
}
|
||||
|
||||
pub fn fromString(s: []const u8) ?ProjectionType {
|
||||
if (std.mem.eql(u8, s, "ALL")) return .ALL;
|
||||
if (std.mem.eql(u8, s, "KEYS_ONLY")) return .KEYS_ONLY;
|
||||
if (std.mem.eql(u8, s, "INCLUDE")) return .INCLUDE;
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
pub const Projection = struct {
|
||||
projection_type: ProjectionType,
|
||||
non_key_attributes: ?[][]const u8,
|
||||
|
||||
pub fn deinit(self: *Projection, allocator: std.mem.Allocator) void {
|
||||
if (self.non_key_attributes) |attrs| {
|
||||
for (attrs) |attr| {
|
||||
allocator.free(attr);
|
||||
}
|
||||
allocator.free(attrs);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub const GlobalSecondaryIndex = struct {
|
||||
index_name: []const u8,
|
||||
key_schema: []KeySchemaElement,
|
||||
projection: Projection,
|
||||
|
||||
pub fn deinit(self: *GlobalSecondaryIndex, allocator: std.mem.Allocator) void {
|
||||
allocator.free(self.index_name);
|
||||
for (self.key_schema) |ks| {
|
||||
allocator.free(ks.attribute_name);
|
||||
}
|
||||
allocator.free(self.key_schema);
|
||||
self.projection.deinit(allocator);
|
||||
}
|
||||
};
|
||||
|
||||
pub const LocalSecondaryIndex = struct {
|
||||
index_name: []const u8,
|
||||
key_schema: []KeySchemaElement,
|
||||
projection: Projection,
|
||||
|
||||
pub fn deinit(self: *LocalSecondaryIndex, allocator: std.mem.Allocator) void {
|
||||
allocator.free(self.index_name);
|
||||
for (self.key_schema) |ks| {
|
||||
allocator.free(ks.attribute_name);
|
||||
}
|
||||
allocator.free(self.key_schema);
|
||||
self.projection.deinit(allocator);
|
||||
}
|
||||
};
|
||||
|
||||
pub const TableStatus = enum {
|
||||
CREATING,
|
||||
UPDATING,
|
||||
@@ -96,6 +161,8 @@ pub const TableDescription = struct {
|
||||
creation_date_time: i64,
|
||||
item_count: u64,
|
||||
table_size_bytes: u64,
|
||||
global_secondary_indexes: ?[]const GlobalSecondaryIndex = null,
|
||||
local_secondary_indexes: ?[]const LocalSecondaryIndex = null,
|
||||
};
|
||||
|
||||
/// DynamoDB operation types parsed from X-Amz-Target header
|
||||
|
||||
127
src/index_codec.zig
Normal file
127
src/index_codec.zig
Normal file
@@ -0,0 +1,127 @@
|
||||
/// Secondary index entry encoding
|
||||
/// Index entries store pointers to primary keys, not full items
|
||||
const std = @import("std");
|
||||
|
||||
/// Encode a primary key reference for storage in an index entry
|
||||
/// Format: [pk_len:varint][pk:bytes][sk_len:varint][sk:bytes]?
|
||||
/// Returns owned slice that caller must free
|
||||
pub fn encodePrimaryKeyRef(
|
||||
allocator: std.mem.Allocator,
|
||||
pk_value: []const u8,
|
||||
sk_value: ?[]const u8,
|
||||
) ![]u8 {
|
||||
var buf = std.ArrayList(u8).init(allocator);
|
||||
errdefer buf.deinit();
|
||||
const writer = buf.writer();
|
||||
|
||||
// Encode partition key
|
||||
try encodeVarint(writer, pk_value.len);
|
||||
try writer.writeAll(pk_value);
|
||||
|
||||
// Encode sort key if present
|
||||
if (sk_value) |sk| {
|
||||
try encodeVarint(writer, sk.len);
|
||||
try writer.writeAll(sk);
|
||||
}
|
||||
|
||||
return buf.toOwnedSlice();
|
||||
}
|
||||
|
||||
/// Decode a primary key reference from an index entry
|
||||
/// Returns struct with owned slices that caller must free
|
||||
pub fn decodePrimaryKeyRef(allocator: std.mem.Allocator, data: []const u8) !PrimaryKeyRef {
|
||||
var decoder = BinaryDecoder.init(data);
|
||||
|
||||
// Decode partition key
|
||||
const pk_len = try decoder.readVarint();
|
||||
const pk = try decoder.readBytes(pk_len);
|
||||
const owned_pk = try allocator.dupe(u8, pk);
|
||||
errdefer allocator.free(owned_pk);
|
||||
|
||||
// Decode sort key if present
|
||||
var owned_sk: ?[]u8 = null;
|
||||
if (decoder.hasMore()) {
|
||||
const sk_len = try decoder.readVarint();
|
||||
const sk = try decoder.readBytes(sk_len);
|
||||
owned_sk = try allocator.dupe(u8, sk);
|
||||
}
|
||||
|
||||
return PrimaryKeyRef{
|
||||
.pk = owned_pk,
|
||||
.sk = owned_sk,
|
||||
};
|
||||
}
|
||||
|
||||
pub const PrimaryKeyRef = struct {
|
||||
pk: []u8,
|
||||
sk: ?[]u8,
|
||||
|
||||
pub fn deinit(self: *PrimaryKeyRef, allocator: std.mem.Allocator) void {
|
||||
allocator.free(self.pk);
|
||||
if (self.sk) |sk| allocator.free(sk);
|
||||
}
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Binary Decoder Helper
|
||||
// ============================================================================
|
||||
|
||||
const BinaryDecoder = struct {
|
||||
data: []const u8,
|
||||
pos: usize,
|
||||
|
||||
pub fn init(data: []const u8) BinaryDecoder {
|
||||
return .{ .data = data, .pos = 0 };
|
||||
}
|
||||
|
||||
pub fn readBytes(self: *BinaryDecoder, len: usize) ![]const u8 {
|
||||
if (self.pos + len > self.data.len) return error.UnexpectedEndOfData;
|
||||
const bytes = self.data[self.pos .. self.pos + len];
|
||||
self.pos += len;
|
||||
return bytes;
|
||||
}
|
||||
|
||||
pub fn readVarint(self: *BinaryDecoder) !usize {
|
||||
var result: usize = 0;
|
||||
var shift: u6 = 0;
|
||||
|
||||
while (self.pos < self.data.len) {
|
||||
const byte = self.data[self.pos];
|
||||
self.pos += 1;
|
||||
|
||||
result |= @as(usize, byte & 0x7F) << shift;
|
||||
|
||||
if ((byte & 0x80) == 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
shift += 7;
|
||||
if (shift >= 64) return error.VarintOverflow;
|
||||
}
|
||||
|
||||
return error.UnexpectedEndOfData;
|
||||
}
|
||||
|
||||
pub fn hasMore(self: *BinaryDecoder) bool {
|
||||
return self.pos < self.data.len;
|
||||
}
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Varint encoding (consistent with key_codec and item_codec)
|
||||
// ============================================================================
|
||||
|
||||
fn encodeVarint(writer: anytype, value: usize) !void {
|
||||
var v = value;
|
||||
while (true) {
|
||||
const byte = @as(u8, @intCast(v & 0x7F));
|
||||
v >>= 7;
|
||||
|
||||
if (v == 0) {
|
||||
try writer.writeByte(byte);
|
||||
return;
|
||||
} else {
|
||||
try writer.writeByte(byte | 0x80);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user