//! Storage engine mapping DynamoDB operations to RocksDB.
//!
//! Table metadata and item data share one RocksDB keyspace, distinguished by
//! textual key prefixes (see `KeyPrefix`). Items are stored as canonical JSON.

const std = @import("std");
const rocksdb = @import("../rocksdb.zig");
const types = @import("types.zig");
const json = @import("json.zig");

/// Errors surfaced by the storage engine. Lower-level RocksDB and JSON
/// failures are collapsed into `RocksDBError` / `SerializationError`.
pub const StorageError = error{
    TableNotFound,
    TableAlreadyExists,
    ItemNotFound,
    InvalidKey,
    MissingKeyAttribute,
    KeyValueContainsSeparator,
    SerializationError,
    RocksDBError,
    OutOfMemory,
};

/// Key prefixes for different data types in RocksDB
/// PHASE 2 TODO: Replace textual prefixes with binary encoding using length-prefixed segments
const KeyPrefix = struct {
    /// Table metadata: _meta:{table_name}
    const meta = "_meta:";
    /// Item data: _data:{table_name}:{partition_key}[:{sort_key}]
    const data = "_data:";
    /// Global secondary index: _gsi:{table_name}:{index_name}:{pk}:{sk}
    const gsi = "_gsi:";
    /// Local secondary index: _lsi:{table_name}:{index_name}:{pk}:{sk}
    const lsi = "_lsi:";
};

/// In-memory representation of table metadata.
/// When produced by `deserializeTableMetadata` all slices are owned by this
/// struct; release them with `deinit`. When built inline (see `createTable`)
/// the slices are borrowed from the caller and `deinit` must NOT be called.
const TableMetadata = struct {
    table_name: []const u8,
    // Const element slices: metadata is read-only after construction, and this
    // lets `createTable` store its `[]const` parameters without a const-cast
    // (assigning `[]const T` to `[]T` is a compile error in Zig).
    key_schema: []const types.KeySchemaElement,
    attribute_definitions: []const types.AttributeDefinition,
    table_status: types.TableStatus,
    creation_date_time: i64,

    /// Frees every owned slice. Only valid for owned metadata
    /// (i.e. values returned by `deserializeTableMetadata`).
    pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
        allocator.free(self.table_name);
        for (self.key_schema) |ks| {
            allocator.free(ks.attribute_name);
        }
        allocator.free(self.key_schema);
        for (self.attribute_definitions) |ad| {
            allocator.free(ad.attribute_name);
        }
        allocator.free(self.attribute_definitions);
    }
};

pub const StorageEngine = struct {
    db: rocksdb.DB,
    allocator: std.mem.Allocator,

    const Self = @This();

    /// Opens (creating if missing) the RocksDB database at `data_dir`.
    pub fn init(allocator: std.mem.Allocator, data_dir: [*:0]const u8) !Self {
        const db = rocksdb.DB.open(data_dir, true) catch return StorageError.RocksDBError;
        return Self{
            .db = db,
            .allocator = allocator,
        };
    }

    pub fn deinit(self: *Self) void {
        self.db.close();
    }

    // === Table Operations ===

    /// Creates a new table and persists its metadata.
    /// The returned `TableDescription` borrows the caller's `table_name`,
    /// `key_schema` and `attribute_definitions` slices (no copies are made),
    /// so it is valid only as long as those slices are.
    pub fn createTable(
        self: *Self,
        table_name: []const u8,
        key_schema: []const types.KeySchemaElement,
        attribute_definitions: []const types.AttributeDefinition,
    ) StorageError!types.TableDescription {
        // Check if table already exists
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);

        const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (existing) |e| {
            self.allocator.free(e);
            return StorageError.TableAlreadyExists;
        }

        // Create table metadata (borrowing the caller's slices; not deinit'ed).
        const now = std.time.timestamp();
        const metadata = TableMetadata{
            .table_name = table_name,
            .key_schema = key_schema,
            .attribute_definitions = attribute_definitions,
            .table_status = .ACTIVE,
            .creation_date_time = now,
        };

        // Serialize and store with canonical format
        const meta_value = try self.serializeTableMetadata(metadata);
        defer self.allocator.free(meta_value);

        self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError;

        return types.TableDescription{
            .table_name = table_name,
            .key_schema = key_schema,
            .attribute_definitions = attribute_definitions,
            .table_status = .ACTIVE,
            .creation_date_time = now,
            .item_count = 0,
            .table_size_bytes = 0,
        };
    }

    /// Deletes a table: all of its items plus its metadata record, in a
    /// single atomic write batch.
    pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);

        // Verify table exists
        const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (existing == null) return StorageError.TableNotFound;
        self.allocator.free(existing.?);

        // Delete all items with this table's prefix
        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);

        var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError;
        defer batch.deinit();

        // Scan and delete all matching keys.
        // NOTE(review): `iter.key()` is only valid until `iter.next()`;
        // assumes `batch.delete` copies the key (RocksDB's write batch does) —
        // confirm the wrapper in ../rocksdb.zig preserves that.
        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        iter.seek(data_prefix);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, data_prefix)) break;
            batch.delete(key);
            iter.next();
        }

        // Delete metadata
        batch.delete(meta_key);

        batch.write(&self.db) catch return StorageError.RocksDBError;
    }

    /// Describes a table, counting items and bytes by scanning the table's
    /// data prefix (expensive, but matches DynamoDB behavior).
    ///
    /// FIX: the previous version freed the deserialized metadata with `defer`
    /// and then returned slices into it (use-after-free for every caller).
    /// The returned description now OWNS `table_name`, `key_schema` and
    /// `attribute_definitions`; the caller must free them (each
    /// `attribute_name`, then the slices, then `table_name`) with the same
    /// allocator that was passed to `init`.
    pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
        var metadata = try self.getTableMetadata(table_name);
        // Free the metadata only on error paths; on success ownership
        // transfers to the returned description.
        errdefer metadata.deinit(self.allocator);

        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);

        var item_count: u64 = 0;
        var total_size: u64 = 0;

        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        iter.seek(data_prefix);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, data_prefix)) break;
            const value = iter.value() orelse break;
            item_count += 1;
            total_size += value.len;
            iter.next();
        }

        return types.TableDescription{
            .table_name = metadata.table_name,
            .key_schema = metadata.key_schema,
            .attribute_definitions = metadata.attribute_definitions,
            .table_status = metadata.table_status,
            .creation_date_time = metadata.creation_date_time,
            .item_count = item_count,
            .table_size_bytes = total_size,
        };
    }

    /// Lists all table names by scanning the `_meta:` prefix.
    /// Caller owns the returned slice and every name in it.
    pub fn listTables(self: *Self) StorageError![][]const u8 {
        var tables = std.ArrayList([]const u8).init(self.allocator);
        errdefer {
            for (tables.items) |t| self.allocator.free(t);
            tables.deinit();
        }

        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        iter.seek(KeyPrefix.meta);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, KeyPrefix.meta)) break;

            const table_name = key[KeyPrefix.meta.len..];
            const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory;
            tables.append(owned_name) catch {
                // `owned_name` is not yet tracked by `tables`; free it so the
                // errdefer above doesn't miss it.
                self.allocator.free(owned_name);
                return StorageError.OutOfMemory;
            };
            iter.next();
        }

        return tables.toOwnedSlice() catch return StorageError.OutOfMemory;
    }

    // === Item Operations ===

    /// Store an item in the database.
    /// Item is serialized to canonical JSON before storage.
    pub fn putItem(self: *Self, table_name: []const u8, item: types.Item) StorageError!void {
        // Get table metadata to retrieve key schema
        var metadata = try self.getTableMetadata(table_name);
        defer metadata.deinit(self.allocator);

        // Validate that item contains all required key attributes
        for (metadata.key_schema) |key_elem| {
            if (!item.contains(key_elem.attribute_name)) {
                return StorageError.MissingKeyAttribute;
            }
        }

        // Build storage key using the item and key schema
        const storage_key = json.buildRocksDBKey(
            self.allocator,
            table_name,
            metadata.key_schema,
            item,
        ) catch |err| {
            return switch (err) {
                error.KeyValueContainsSeparator => StorageError.KeyValueContainsSeparator,
                else => StorageError.InvalidKey,
            };
        };
        defer self.allocator.free(storage_key);

        // Serialize item to canonical JSON for storage
        const item_json = json.serializeItem(self.allocator, item) catch return StorageError.SerializationError;
        defer self.allocator.free(item_json);

        // Store the canonical JSON
        self.db.put(storage_key, item_json) catch return StorageError.RocksDBError;
    }

    /// Retrieve an item from the database.
    /// Returns a parsed Item (not a JSON string); caller owns it and should
    /// release it with `json.deinitItem`. Returns null when the key is absent.
    pub fn getItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!?types.Item {
        // Get table metadata
        var metadata = try self.getTableMetadata(table_name);
        defer metadata.deinit(self.allocator);

        // Validate key has all required attributes
        for (metadata.key_schema) |key_elem| {
            if (!key.contains(key_elem.attribute_name)) {
                return StorageError.MissingKeyAttribute;
            }
        }

        // Build storage key
        const storage_key = json.buildRocksDBKey(
            self.allocator,
            table_name,
            metadata.key_schema,
            key,
        ) catch |err| {
            return switch (err) {
                error.KeyValueContainsSeparator => StorageError.KeyValueContainsSeparator,
                else => StorageError.InvalidKey,
            };
        };
        defer self.allocator.free(storage_key);

        const item_json = self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError;
        if (item_json == null) return null;
        defer self.allocator.free(item_json.?);

        // Parse the stored JSON back into an Item (parseItem copies what it
        // needs, so freeing item_json afterwards is safe).
        return json.parseItem(self.allocator, item_json.?) catch return StorageError.SerializationError;
    }

    /// Deletes a single item by key. Succeeds even if the item is absent
    /// (RocksDB delete semantics).
    pub fn deleteItem(self: *Self, table_name: []const u8, key: types.Item) StorageError!void {
        // Get table metadata
        var metadata = try self.getTableMetadata(table_name);
        defer metadata.deinit(self.allocator);

        // Validate key
        for (metadata.key_schema) |key_elem| {
            if (!key.contains(key_elem.attribute_name)) {
                return StorageError.MissingKeyAttribute;
            }
        }

        // Build storage key
        const storage_key = json.buildRocksDBKey(
            self.allocator,
            table_name,
            metadata.key_schema,
            key,
        ) catch |err| {
            return switch (err) {
                error.KeyValueContainsSeparator => StorageError.KeyValueContainsSeparator,
                else => StorageError.InvalidKey,
            };
        };
        defer self.allocator.free(storage_key);

        self.db.delete(storage_key) catch return StorageError.RocksDBError;
    }

    /// Scan a table and return parsed Items (not JSON strings).
    /// Caller owns the returned slice and each Item in it.
    pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![]types.Item {
        // Verify table exists
        var metadata = try self.getTableMetadata(table_name);
        defer metadata.deinit(self.allocator);

        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);

        var items = std.ArrayList(types.Item).init(self.allocator);
        errdefer {
            for (items.items) |*item| json.deinitItem(item, self.allocator);
            items.deinit();
        }

        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        var count: usize = 0;
        const max_items = limit orelse std.math.maxInt(usize);

        iter.seek(data_prefix);
        while (iter.valid() and count < max_items) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, data_prefix)) break;
            const value = iter.value() orelse break;

            // Skip undecodable entries rather than failing the whole scan.
            const item = json.parseItem(self.allocator, value) catch {
                iter.next();
                continue;
            };
            items.append(item) catch {
                // `item` is not yet tracked by `items`; free it so the
                // errdefer above doesn't miss it.
                var orphan = item;
                json.deinitItem(&orphan, self.allocator);
                return StorageError.OutOfMemory;
            };
            count += 1;
            iter.next();
        }

        return items.toOwnedSlice() catch return StorageError.OutOfMemory;
    }

    /// Query items by partition key and return parsed Items.
    /// Caller owns the returned slice and each Item in it.
    pub fn query(self: *Self, table_name: []const u8, partition_key_value: []const u8, limit: ?usize) StorageError![]types.Item {
        // Verify table exists
        var metadata = try self.getTableMetadata(table_name);
        defer metadata.deinit(self.allocator);

        // Build prefix for this partition
        const prefix = try self.buildPartitionPrefix(table_name, partition_key_value);
        defer self.allocator.free(prefix);

        var items = std.ArrayList(types.Item).init(self.allocator);
        errdefer {
            for (items.items) |*item| json.deinitItem(item, self.allocator);
            items.deinit();
        }

        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        var count: usize = 0;
        const max_items = limit orelse std.math.maxInt(usize);

        iter.seek(prefix);
        while (iter.valid() and count < max_items) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, prefix)) break;

            // FIX: prefix-boundary check so partition key "user1" does not
            // also match "user12". Accept only an exact hash-only key or a
            // ':' immediately after the prefix (start of the sort-key
            // segment). Non-matching keys can interleave with matching ones
            // ("...user12" sorts before "...user1:"), so skip, don't break.
            if (key.len > prefix.len and key[prefix.len] != ':') {
                iter.next();
                continue;
            }

            const value = iter.value() orelse break;

            // Skip undecodable entries rather than failing the whole query.
            const item = json.parseItem(self.allocator, value) catch {
                iter.next();
                continue;
            };
            items.append(item) catch {
                // `item` is not yet tracked by `items`; free it first.
                var orphan = item;
                json.deinitItem(&orphan, self.allocator);
                return StorageError.OutOfMemory;
            };
            count += 1;
            iter.next();
        }

        return items.toOwnedSlice() catch return StorageError.OutOfMemory;
    }

    // === Internal Helpers ===

    /// Loads and deserializes a table's metadata.
    /// Caller owns the result; release with `TableMetadata.deinit`.
    fn getTableMetadata(self: *Self, table_name: []const u8) StorageError!TableMetadata {
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);

        const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (meta_value == null) return StorageError.TableNotFound;
        defer self.allocator.free(meta_value.?);

        return self.deserializeTableMetadata(meta_value.?);
    }

    /// Builds "_meta:{table_name}". Caller frees.
    fn buildMetaKey(self: *Self, table_name: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ KeyPrefix.meta, table_name }) catch return StorageError.OutOfMemory;
    }

    /// Builds "_data:{table_name}:" — the trailing ':' keeps table "Users"
    /// from matching table "Users2". Caller frees.
    fn buildDataPrefix(self: *Self, table_name: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}:", .{ KeyPrefix.data, table_name }) catch return StorageError.OutOfMemory;
    }

    /// Builds "_data:{table_name}:{partition_key}". No trailing separator is
    /// appended because hash-only tables store the bare key; `query` performs
    /// the boundary check instead. Caller frees.
    fn buildPartitionPrefix(self: *Self, table_name: []const u8, partition_key: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, partition_key }) catch return StorageError.OutOfMemory;
    }

    // === Serialization ===

    /// Serializes table metadata to a JSON object. Caller frees the result.
    /// NOTE(review): names are written without JSON escaping; a table or
    /// attribute name containing '"' or '\' would corrupt the stored record —
    /// confirm upstream validation rejects such names.
    fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 {
        var buf = std.ArrayList(u8).init(self.allocator);
        errdefer buf.deinit();
        const writer = buf.writer();

        writer.writeAll("{\"TableName\":\"") catch return StorageError.SerializationError;
        writer.writeAll(metadata.table_name) catch return StorageError.SerializationError;
        writer.writeAll("\",\"TableStatus\":\"") catch return StorageError.SerializationError;
        writer.writeAll(metadata.table_status.toString()) catch return StorageError.SerializationError;
        writer.print("\",\"CreationDateTime\":{d},\"KeySchema\":[", .{metadata.creation_date_time}) catch return StorageError.SerializationError;

        for (metadata.key_schema, 0..) |ks, i| {
            if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
            writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ks.attribute_name) catch return StorageError.SerializationError;
            writer.writeAll("\",\"KeyType\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ks.key_type.toString()) catch return StorageError.SerializationError;
            writer.writeAll("\"}") catch return StorageError.SerializationError;
        }

        writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError;

        for (metadata.attribute_definitions, 0..) |ad, i| {
            if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
            writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ad.attribute_name) catch return StorageError.SerializationError;
            writer.writeAll("\",\"AttributeType\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ad.attribute_type.toString()) catch return StorageError.SerializationError;
            writer.writeAll("\"}") catch return StorageError.SerializationError;
        }

        writer.writeAll("]}") catch return StorageError.SerializationError;

        return buf.toOwnedSlice() catch return StorageError.OutOfMemory;
    }

    /// Parses the JSON produced by `serializeTableMetadata`.
    /// Caller owns the result; release with `TableMetadata.deinit`.
    fn deserializeTableMetadata(self: *Self, data: []const u8) StorageError!TableMetadata {
        const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, data, .{}) catch return StorageError.SerializationError;
        defer parsed.deinit();

        const root = switch (parsed.value) {
            .object => |o| o,
            else => return StorageError.SerializationError,
        };

        // Extract table name
        const table_name_val = root.get("TableName") orelse return StorageError.SerializationError;
        const table_name_str = switch (table_name_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const table_name = self.allocator.dupe(u8, table_name_str) catch return StorageError.OutOfMemory;
        errdefer self.allocator.free(table_name);

        // Extract table status (unknown strings fall back to ACTIVE)
        const status_val = root.get("TableStatus") orelse return StorageError.SerializationError;
        const status_str = switch (status_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const table_status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE"))
            .ACTIVE
        else if (std.mem.eql(u8, status_str, "CREATING"))
            .CREATING
        else if (std.mem.eql(u8, status_str, "DELETING"))
            .DELETING
        else
            .ACTIVE;

        // Extract creation time
        const creation_val = root.get("CreationDateTime") orelse return StorageError.SerializationError;
        const creation_date_time = switch (creation_val) {
            .integer => |i| i,
            else => return StorageError.SerializationError,
        };

        // Extract key schema
        const key_schema_val = root.get("KeySchema") orelse return StorageError.SerializationError;
        const key_schema_array = switch (key_schema_val) {
            .array => |a| a,
            else => return StorageError.SerializationError,
        };

        var key_schema = std.ArrayList(types.KeySchemaElement).init(self.allocator);
        errdefer {
            for (key_schema.items) |ks| self.allocator.free(ks.attribute_name);
            key_schema.deinit();
        }

        for (key_schema_array.items) |item| {
            const obj = switch (item) {
                .object => |o| o,
                else => return StorageError.SerializationError,
            };

            const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError;
            const attr_name_str = switch (attr_name_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory;
            errdefer self.allocator.free(attr_name);

            const key_type_val = obj.get("KeyType") orelse return StorageError.SerializationError;
            const key_type_str = switch (key_type_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const key_type = types.KeyType.fromString(key_type_str) orelse return StorageError.SerializationError;

            key_schema.append(.{
                .attribute_name = attr_name,
                .key_type = key_type,
            }) catch return StorageError.OutOfMemory;
        }

        // Extract attribute definitions
        const attr_defs_val = root.get("AttributeDefinitions") orelse return StorageError.SerializationError;
        const attr_defs_array = switch (attr_defs_val) {
            .array => |a| a,
            else => return StorageError.SerializationError,
        };

        var attr_defs = std.ArrayList(types.AttributeDefinition).init(self.allocator);
        errdefer {
            for (attr_defs.items) |ad| self.allocator.free(ad.attribute_name);
            attr_defs.deinit();
        }

        for (attr_defs_array.items) |item| {
            const obj = switch (item) {
                .object => |o| o,
                else => return StorageError.SerializationError,
            };

            const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError;
            const attr_name_str = switch (attr_name_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory;
            errdefer self.allocator.free(attr_name);

            const attr_type_val = obj.get("AttributeType") orelse return StorageError.SerializationError;
            const attr_type_str = switch (attr_type_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const attr_type = types.ScalarAttributeType.fromString(attr_type_str) orelse return StorageError.SerializationError;

            attr_defs.append(.{
                .attribute_name = attr_name,
                .attribute_type = attr_type,
            }) catch return StorageError.OutOfMemory;
        }

        // FIX: transfer ownership out of the ArrayLists one at a time so the
        // key-schema slice is still freed if the second conversion fails (the
        // list errdefers above guard only the lists, which are empty by then).
        const key_schema_owned = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory;
        errdefer {
            for (key_schema_owned) |ks| self.allocator.free(ks.attribute_name);
            self.allocator.free(key_schema_owned);
        }
        const attr_defs_owned = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory;

        return TableMetadata{
            .table_name = table_name,
            .key_schema = key_schema_owned,
            .attribute_definitions = attr_defs_owned,
            .table_status = table_status,
            .creation_date_time = creation_date_time,
        };
    }
};

test "storage basic operations" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage";
    defer std.fs.deleteTreeAbsolute(path) catch {};

    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();

    // Create table
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try engine.createTable("TestTable", &key_schema, &attr_defs);

    // List tables
    const tables = try engine.listTables();
    defer {
        for (tables) |t| allocator.free(t);
        allocator.free(tables);
    }
    try std.testing.expectEqual(@as(usize, 1), tables.len);
    try std.testing.expectEqualStrings("TestTable", tables[0]);

    // Delete table
    try engine.deleteTable("TestTable");

    // Verify deleted
    const tables2 = try engine.listTables();
    defer allocator.free(tables2);
    try std.testing.expectEqual(@as(usize, 0), tables2.len);
}

test "putItem and getItem with typed Items" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage_typed";
    defer std.fs.deleteTreeAbsolute(path) catch {};

    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();

    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try engine.createTable("Users", &key_schema, &attr_defs);

    // Create and put item
    const item_json = "{\"pk\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"}}";
    var item = try json.parseItem(allocator, item_json);
    defer json.deinitItem(&item, allocator);

    try engine.putItem("Users", item);

    // Get item back
    const key_json = "{\"pk\":{\"S\":\"user123\"}}";
    var key = try json.parseItem(allocator, key_json);
    defer json.deinitItem(&key, allocator);

    // FIX: must be `var` — capturing `|*r|` from a const optional yields a
    // *const pointer, which cannot be handed to deinitItem.
    var retrieved = try engine.getItem("Users", key);
    try std.testing.expect(retrieved != null);
    defer if (retrieved) |*r| json.deinitItem(r, allocator);

    const pk = retrieved.?.get("pk").?;
    try std.testing.expectEqualStrings("user123", pk.S);
}

test "putItem validates key presence" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage_validate";
    defer std.fs.deleteTreeAbsolute(path) catch {};

    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();

    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "userId", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "userId", .attribute_type = .S },
    };
    _ = try engine.createTable("Users", &key_schema, &attr_defs);

    // This should fail - missing userId
    const bad_item_json = "{\"name\":{\"S\":\"Alice\"}}";
    var bad_item = try json.parseItem(allocator, bad_item_json);
    defer json.deinitItem(&bad_item, allocator);

    const result = engine.putItem("Users", bad_item);
    try std.testing.expectError(StorageError.MissingKeyAttribute, result);

    // This should succeed
    const good_item_json = "{\"userId\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"}}";
    var good_item = try json.parseItem(allocator, good_item_json);
    defer json.deinitItem(&good_item, allocator);

    try engine.putItem("Users", good_item);
}

test "reject key with separator" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage_separator";
    defer std.fs.deleteTreeAbsolute(path) catch {};

    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();

    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try engine.createTable("Users", &key_schema, &attr_defs);

    // This should fail - pk contains ':'
    const bad_item_json = "{\"pk\":{\"S\":\"user:123\"},\"data\":{\"S\":\"test\"}}";
    var bad_item = try json.parseItem(allocator, bad_item_json);
    defer json.deinitItem(&bad_item, allocator);

    const result = engine.putItem("Users", bad_item);
    try std.testing.expectError(StorageError.KeyValueContainsSeparator, result);
}