Files
zyna-db/src/dynamodb/storage.zig

388 lines
15 KiB
Zig
Raw Normal View History

2026-01-20 09:34:33 -05:00
/// Storage engine mapping DynamoDB operations to RocksDB
const std = @import("std");
const rocksdb = @import("../rocksdb.zig");
const types = @import("types.zig");
/// Errors produced by the storage engine. Underlying RocksDB and allocation
/// failures are collapsed into `RocksDBError` / `OutOfMemory` respectively.
pub const StorageError = error{
    /// No `_meta:` record exists for the requested table.
    TableNotFound,
    /// A `_meta:` record already exists for the table being created.
    TableAlreadyExists,
    /// Reserved for item lookups; NOTE(review): not returned by any
    /// operation visible in this file (getItem returns an optional instead).
    ItemNotFound,
    /// A partition key ("pk"/"PK") could not be extracted from the key JSON.
    InvalidKey,
    /// Table metadata could not be serialized.
    SerializationError,
    /// The underlying RocksDB call failed.
    RocksDBError,
    /// An allocation failed.
    OutOfMemory,
};
/// Key prefixes for different data types in RocksDB. All engine state lives
/// in one flat keyspace partitioned by these prefixes; the prefix scans in
/// this file rely on RocksDB's lexicographic key ordering.
const KeyPrefix = struct {
    /// Table metadata: _meta:{table_name}
    const meta = "_meta:";
    /// Item data: _data:{table_name}:{partition_key}[:{sort_key}]
    const data = "_data:";
    /// Global secondary index: _gsi:{table_name}:{index_name}:{pk}:{sk}
    /// NOTE(review): declared but no code in this file writes GSI entries yet.
    const gsi = "_gsi:";
    /// Local secondary index: _lsi:{table_name}:{index_name}:{pk}:{sk}
    /// NOTE(review): declared but no code in this file writes LSI entries yet.
    const lsi = "_lsi:";
};
/// Storage engine mapping DynamoDB-style table/item operations onto a single
/// RocksDB keyspace. Keys are namespaced via `KeyPrefix`; values are raw JSON.
pub const StorageEngine = struct {
    db: rocksdb.DB,
    allocator: std.mem.Allocator,

    const Self = @This();

    /// Opens (creating if missing) the RocksDB database at `data_dir`.
    /// Call `deinit` to release the handle.
    pub fn init(allocator: std.mem.Allocator, data_dir: [*:0]const u8) !Self {
        const db = rocksdb.DB.open(data_dir, true) catch return StorageError.RocksDBError;
        return Self{
            .db = db,
            .allocator = allocator,
        };
    }

    /// Closes the underlying RocksDB handle.
    pub fn deinit(self: *Self) void {
        self.db.close();
    }

    // === Table Operations ===

    /// Creates a table by persisting a metadata record under `_meta:{name}`.
    /// The returned description borrows `table_name`, `key_schema`, and
    /// `attribute_definitions` from the caller's arguments; nothing is copied.
    /// Errors: `TableAlreadyExists` if a metadata record already exists.
    pub fn createTable(self: *Self, table_name: []const u8, key_schema: []const types.KeySchemaElement, attribute_definitions: []const types.AttributeDefinition) StorageError!types.TableDescription {
        // Check if table already exists.
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);
        const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (existing) |e| {
            self.allocator.free(e);
            return StorageError.TableAlreadyExists;
        }
        // Tables are immediately ACTIVE: there is no asynchronous
        // provisioning step in this local engine.
        const now = std.time.timestamp();
        const desc = types.TableDescription{
            .table_name = table_name,
            .key_schema = key_schema,
            .attribute_definitions = attribute_definitions,
            .table_status = .ACTIVE,
            .creation_date_time = now,
            .item_count = 0,
            .table_size_bytes = 0,
        };
        // Serialize and persist the metadata record.
        const meta_value = try self.serializeTableMetadata(desc);
        defer self.allocator.free(meta_value);
        self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError;
        return desc;
    }

    /// Deletes a table's metadata and every item under its data prefix in a
    /// single atomic write batch.
    /// Errors: `TableNotFound` if no metadata record exists.
    pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);
        // Verify table exists before doing any scanning work.
        const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (existing == null) return StorageError.TableNotFound;
        self.allocator.free(existing.?);
        // Collect every key under `_data:{table_name}:` into one batch so the
        // items and the metadata disappear atomically.
        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);
        var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError;
        defer batch.deinit();
        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();
        iter.seek(data_prefix);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            // Keys are ordered, so the first key outside the prefix ends the scan.
            if (!std.mem.startsWith(u8, key, data_prefix)) break;
            batch.delete(key);
            iter.next();
        }
        // Delete metadata in the same batch.
        batch.delete(meta_key);
        batch.write(&self.db) catch return StorageError.RocksDBError;
    }

    /// Looks up and deserializes a table's metadata record.
    /// Ownership: the returned description's `table_name` is heap-allocated
    /// on `self.allocator`; the caller must free it. Schema fields are not
    /// recovered (see `deserializeTableMetadata`).
    pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);
        const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (meta_value == null) return StorageError.TableNotFound;
        defer self.allocator.free(meta_value.?);
        return self.deserializeTableMetadata(meta_value.?);
    }

    /// Lists all table names by scanning the `_meta:` key range.
    /// Ownership: caller owns the returned slice and every name in it.
    pub fn listTables(self: *Self) StorageError![][]const u8 {
        var tables = std.ArrayList([]const u8){};
        errdefer {
            for (tables.items) |t| self.allocator.free(t);
            tables.deinit(self.allocator);
        }
        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();
        iter.seek(KeyPrefix.meta);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, KeyPrefix.meta)) break;
            // Copy the name out: the iterator's key buffer is only valid
            // until the next call to next().
            const table_name = key[KeyPrefix.meta.len..];
            const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory;
            // If append fails, the fresh copy is not yet tracked by `tables`,
            // so free it explicitly on the error path.
            errdefer self.allocator.free(owned_name);
            tables.append(self.allocator, owned_name) catch return StorageError.OutOfMemory;
            iter.next();
        }
        return tables.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory;
    }

    // === Item Operations ===

    /// Stores an item (raw JSON) under its derived data key, overwriting any
    /// existing item with the same key.
    /// NOTE: table metadata counters (ItemCount/TableSizeBytes) are not updated.
    /// Errors: `TableNotFound`; `InvalidKey` if no partition key is extractable.
    pub fn putItem(self: *Self, table_name: []const u8, item_json: []const u8) StorageError!void {
        // Verify table exists.
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);
        const meta = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (meta == null) return StorageError.TableNotFound;
        defer self.allocator.free(meta.?);
        // Extract key from item (simplified - assumes key is extractable from JSON).
        const item_key = try self.extractKeyFromItem(table_name, item_json);
        defer self.allocator.free(item_key);
        self.db.put(item_key, item_json) catch return StorageError.RocksDBError;
    }

    /// Fetches an item by its key JSON; returns null when absent.
    /// Ownership: caller owns the returned bytes.
    /// NOTE: does not verify the table exists (a missing table reads as null).
    pub fn getItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!?[]u8 {
        const item_key = try self.buildItemKey(table_name, key_json);
        defer self.allocator.free(item_key);
        return self.db.get(self.allocator, item_key) catch return StorageError.RocksDBError;
    }

    /// Deletes an item by its key JSON. Deleting an absent item is a no-op,
    /// matching DynamoDB semantics.
    pub fn deleteItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!void {
        const item_key = try self.buildItemKey(table_name, key_json);
        defer self.allocator.free(item_key);
        self.db.delete(item_key) catch return StorageError.RocksDBError;
    }

    /// Returns up to `limit` (all, if null) item values from the table's data
    /// range, in key order.
    /// Ownership: caller owns the returned slice and every value in it.
    pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![][]const u8 {
        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);
        var items = std.ArrayList([]const u8){};
        errdefer {
            for (items.items) |item| self.allocator.free(item);
            items.deinit(self.allocator);
        }
        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();
        var count: usize = 0;
        const max_items = limit orelse std.math.maxInt(usize);
        iter.seek(data_prefix);
        while (iter.valid() and count < max_items) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, data_prefix)) break;
            const value = iter.value() orelse break;
            const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory;
            // Free the untracked copy if append fails.
            errdefer self.allocator.free(owned_value);
            items.append(self.allocator, owned_value) catch return StorageError.OutOfMemory;
            count += 1;
            iter.next();
        }
        return items.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory;
    }

    /// Returns up to `limit` (all, if null) item values sharing `partition_key`,
    /// in sort-key order.
    /// Ownership: caller owns the returned slice and every value in it.
    pub fn query(self: *Self, table_name: []const u8, partition_key: []const u8, limit: ?usize) StorageError![][]const u8 {
        // Build prefix for this partition: `_data:{table}:{pk}` (no trailing
        // separator, so an item without a sort key matches exactly).
        const prefix = try self.buildPartitionPrefix(table_name, partition_key);
        defer self.allocator.free(prefix);
        var items = std.ArrayList([]const u8){};
        errdefer {
            for (items.items) |item| self.allocator.free(item);
            items.deinit(self.allocator);
        }
        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();
        var count: usize = 0;
        const max_items = limit orelse std.math.maxInt(usize);
        iter.seek(prefix);
        while (iter.valid() and count < max_items) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, prefix)) break;
            // The prefix ends at the partition key, so querying "user" would
            // otherwise also match partition "user2". Accept only an exact
            // key (no sort key) or a ':'-delimited sort-key continuation;
            // skip (don't break) because other partitions sharing the prefix
            // can sort both before and after this partition's keys.
            if (key.len > prefix.len and key[prefix.len] != ':') {
                iter.next();
                continue;
            }
            const value = iter.value() orelse break;
            const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory;
            // Free the untracked copy if append fails.
            errdefer self.allocator.free(owned_value);
            items.append(self.allocator, owned_value) catch return StorageError.OutOfMemory;
            count += 1;
            iter.next();
        }
        return items.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory;
    }

    // === Key Building Helpers ===
    // All builders return slices allocated on self.allocator; caller frees.

    /// Builds `_meta:{table_name}`.
    fn buildMetaKey(self: *Self, table_name: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ KeyPrefix.meta, table_name }) catch return StorageError.OutOfMemory;
    }

    /// Builds `_data:{table_name}:` — the scan prefix covering every item.
    fn buildDataPrefix(self: *Self, table_name: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}:", .{ KeyPrefix.data, table_name }) catch return StorageError.OutOfMemory;
    }

    /// Builds `_data:{table_name}:{partition_key}` — see `query` for the
    /// boundary handling this prefix requires.
    fn buildPartitionPrefix(self: *Self, table_name: []const u8, partition_key: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, partition_key }) catch return StorageError.OutOfMemory;
    }

    /// Derives an item's storage key from its key JSON. Looks for "pk"/"PK"
    /// (required) and "sk"/"SK" (optional) string attributes — simplified;
    /// the table's actual key schema is not consulted.
    /// Errors: `InvalidKey` when no partition key is present.
    fn buildItemKey(self: *Self, table_name: []const u8, key_json: []const u8) StorageError![]u8 {
        const pk = extractStringValue(key_json, "pk") orelse extractStringValue(key_json, "PK") orelse return StorageError.InvalidKey;
        const sk = extractStringValue(key_json, "sk") orelse extractStringValue(key_json, "SK");
        if (sk) |sort_key| {
            return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}:{s}", .{ KeyPrefix.data, table_name, pk, sort_key }) catch return StorageError.OutOfMemory;
        } else {
            return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, pk }) catch return StorageError.OutOfMemory;
        }
    }

    /// Derives the storage key for a full item. Works because `buildItemKey`
    /// only reads the pk/sk attributes out of whatever JSON it is given.
    fn extractKeyFromItem(self: *Self, table_name: []const u8, item_json: []const u8) StorageError![]u8 {
        return self.buildItemKey(table_name, item_json);
    }

    // === Serialization Helpers ===

    /// Serializes a `TableDescription` to a DynamoDB-shaped JSON record.
    /// Ownership: caller frees the returned bytes.
    /// NOTE: field values are not JSON-escaped; names containing '"' or '\'
    /// would produce invalid JSON.
    fn serializeTableMetadata(self: *Self, desc: types.TableDescription) StorageError![]u8 {
        var buf = std.ArrayList(u8){};
        errdefer buf.deinit(self.allocator);
        const writer = buf.writer(self.allocator);
        writer.print("{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d},\"ItemCount\":{d},\"TableSizeBytes\":{d},\"KeySchema\":[", .{
            desc.table_name,
            desc.table_status.toString(),
            desc.creation_date_time,
            desc.item_count,
            desc.table_size_bytes,
        }) catch return StorageError.SerializationError;
        for (desc.key_schema, 0..) |ks, i| {
            if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
            writer.print("{{\"AttributeName\":\"{s}\",\"KeyType\":\"{s}\"}}", .{
                ks.attribute_name,
                ks.key_type.toString(),
            }) catch return StorageError.SerializationError;
        }
        writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError;
        for (desc.attribute_definitions, 0..) |ad, i| {
            if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
            writer.print("{{\"AttributeName\":\"{s}\",\"AttributeType\":\"{s}\"}}", .{
                ad.attribute_name,
                ad.attribute_type.toString(),
            }) catch return StorageError.SerializationError;
        }
        writer.writeAll("]}") catch return StorageError.SerializationError;
        return buf.toOwnedSlice(self.allocator) catch return StorageError.OutOfMemory;
    }

    /// Parses a stored metadata record back into a `TableDescription`.
    /// Simplified: only TableName and TableStatus are recovered; key schema
    /// and attribute definitions come back empty and numeric fields are 0.
    /// Ownership: the returned `table_name` is duplicated onto
    /// `self.allocator`; the caller must free it.
    fn deserializeTableMetadata(self: *Self, data: []const u8) StorageError!types.TableDescription {
        const raw_name = extractStringValue(data, "TableName") orelse return StorageError.SerializationError;
        // Copy the name out of `data`: returning a slice into it would
        // dangle, because describeTable frees the record it read from
        // RocksDB (via defer) before the caller can use the result.
        const table_name = self.allocator.dupe(u8, raw_name) catch return StorageError.OutOfMemory;
        const status_str = extractStringValue(data, "TableStatus") orelse "ACTIVE";
        // Unknown status strings deliberately default to ACTIVE.
        const status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE"))
            .ACTIVE
        else if (std.mem.eql(u8, status_str, "CREATING"))
            .CREATING
        else
            .ACTIVE;
        return types.TableDescription{
            .table_name = table_name,
            .key_schema = &[_]types.KeySchemaElement{},
            .attribute_definitions = &[_]types.AttributeDefinition{},
            .table_status = status,
            .creation_date_time = 0,
            .item_count = 0,
            .table_size_bytes = 0,
        };
    }
};
/// Simple JSON string value extraction (production code should use std.json).
/// Finds `"key":` (tolerating whitespace before the opening quote, so
/// pretty-printed JSON works) and returns the raw value slice into
/// `json_data`, with any escape sequences left verbatim. Escaped quotes
/// inside the value no longer terminate it early. Returns null when the key
/// is absent, the value is not a string, the string is unterminated, or the
/// key exceeds the internal pattern buffer.
fn extractStringValue(json_data: []const u8, key: []const u8) ?[]const u8 {
    var pattern_buf: [256]u8 = undefined;
    const pattern = std.fmt.bufPrint(&pattern_buf, "\"{s}\":", .{key}) catch return null;
    const match = std.mem.indexOf(u8, json_data, pattern) orelse return null;
    var i = match + pattern.len;
    // Tolerate pretty-printed JSON: skip whitespace between ':' and '"'.
    while (i < json_data.len and std.ascii.isWhitespace(json_data[i])) i += 1;
    if (i >= json_data.len or json_data[i] != '"') return null; // not a string value
    i += 1;
    const value_start = i;
    while (i < json_data.len) : (i += 1) {
        switch (json_data[i]) {
            // Skip the escaped character so \" inside the value is not
            // mistaken for the closing quote.
            '\\' => i += 1,
            '"' => return json_data[value_start..i],
            else => {},
        }
    }
    return null; // unterminated string
}
test "storage basic operations" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage";
    // Remove leftovers from a previous crashed run BEFORE opening the
    // database, so the table listing below starts from an empty store;
    // the defer cleans up again on exit (it runs after engine.deinit).
    std.fs.deleteTreeAbsolute(path) catch {};
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();
    // Create a table with a hash-only key schema.
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try engine.createTable("TestTable", &key_schema, &attr_defs);
    // The new table must be the only one listed.
    const tables = try engine.listTables();
    defer {
        for (tables) |t| allocator.free(t);
        allocator.free(tables);
    }
    try std.testing.expectEqual(@as(usize, 1), tables.len);
    try std.testing.expectEqualStrings("TestTable", tables[0]);
    // Delete table.
    try engine.deleteTable("TestTable");
    // Verify it is gone. Free any entries too, so a failing expectation
    // below does not additionally report the names as leaked.
    const tables2 = try engine.listTables();
    defer {
        for (tables2) |t| allocator.free(t);
        allocator.free(tables2);
    }
    try std.testing.expectEqual(@as(usize, 0), tables2.len);
}