Files
zyna-db/project_context.txt

3904 lines
126 KiB
Plaintext
Raw Normal View History

2026-01-20 10:48:42 -05:00
# Project: zyna-db
# Generated: Tue Jan 20 10:15:05 AM EST 2026
================================================================================
================================================================================
FILE: ./build.zig
================================================================================
const std = @import("std");

/// Links libc, RocksDB, and RocksDB's compression dependencies into a
/// compile step, and registers the /usr/local search paths where the
/// Dockerfile installs RocksDB. Shared by the executable, both test
/// binaries, and the benchmark so the native-library list lives in one place.
fn linkNativeDeps(step: *std.Build.Step.Compile) void {
    step.linkLibC();
    step.linkSystemLibrary("rocksdb");
    // Compression libraries that RocksDB depends on.
    step.linkSystemLibrary("snappy");
    step.linkSystemLibrary("lz4");
    step.linkSystemLibrary("zstd");
    step.linkSystemLibrary("z");
    step.linkSystemLibrary("bz2");
    // RocksDB is a C++ library, so the C++ runtime is required.
    step.linkSystemLibrary("stdc++");
    step.addIncludePath(.{ .cwd_relative = "/usr/local/include" });
    step.addLibraryPath(.{ .cwd_relative = "/usr/local/lib" });
}

/// Build graph: `zynamodb` executable, unit tests, integration tests,
/// a ReleaseFast benchmark binary, and a `clean` step.
pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    // === Main Executable ===
    const exe_module = b.createModule(.{
        .root_source_file = b.path("src/main.zig"),
        .target = target,
        .optimize = optimize,
    });
    const exe = b.addExecutable(.{
        .name = "zynamodb",
        .root_module = exe_module,
    });
    linkNativeDeps(exe);
    b.installArtifact(exe);

    // === Run Command ===
    const run_cmd = b.addRunArtifact(exe);
    run_cmd.step.dependOn(b.getInstallStep());
    // Forward `zig build run -- <args>` to the server binary.
    if (b.args) |args| {
        run_cmd.addArgs(args);
    }
    const run_step = b.step("run", "Run the DynamoDB-compatible server");
    run_step.dependOn(&run_cmd.step);

    // === Unit Tests ===
    const unit_tests_module = b.createModule(.{
        .root_source_file = b.path("src/main.zig"),
        .target = target,
        .optimize = optimize,
    });
    const unit_tests = b.addTest(.{
        .root_module = unit_tests_module,
    });
    linkNativeDeps(unit_tests);
    const run_unit_tests = b.addRunArtifact(unit_tests);
    const test_step = b.step("test", "Run unit tests");
    test_step.dependOn(&run_unit_tests.step);

    // === Integration Tests ===
    const integration_tests_module = b.createModule(.{
        .root_source_file = b.path("tests/integration.zig"),
        .target = target,
        .optimize = optimize,
    });
    const integration_tests = b.addTest(.{
        .root_module = integration_tests_module,
    });
    linkNativeDeps(integration_tests);
    const run_integration_tests = b.addRunArtifact(integration_tests);
    const integration_step = b.step("test-integration", "Run integration tests");
    integration_step.dependOn(&run_integration_tests.step);

    // === Benchmarks ===
    // Benchmarks are always built ReleaseFast regardless of -Doptimize,
    // so reported numbers are not skewed by debug safety checks.
    const bench_module = b.createModule(.{
        .root_source_file = b.path("src/bench.zig"),
        .target = target,
        .optimize = .ReleaseFast,
    });
    const bench = b.addExecutable(.{
        .name = "bench",
        .root_module = bench_module,
    });
    linkNativeDeps(bench);
    const run_bench = b.addRunArtifact(bench);
    const bench_step = b.step("bench", "Run benchmarks");
    bench_step.dependOn(&run_bench.step);

    // === Clean ===
    const clean_step = b.step("clean", "Remove build artifacts");
    clean_step.dependOn(&b.addRemoveDirTree(b.path("zig-out")).step);
    clean_step.dependOn(&b.addRemoveDirTree(b.path(".zig-cache")).step);
}
================================================================================
FILE: ./build.zig.zon
================================================================================
.{
.name = .zyna_db,
.version = "0.1.0",
// Package fingerprint assigned by the Zig build system on first build;
// it must remain stable for the lifetime of this package.
.fingerprint = 0x990c9202941b334a,
// NOTE(review): the Dockerfile installs Zig 0.13.0, which is below this
// minimum — confirm which toolchain version is actually intended.
.minimum_zig_version = "0.15.1",
// Specify which files/directories to include in package
.paths = .{
"build.zig",
"build.zig.zon",
"src",
"tests",
"README.md",
"Makefile",
},
.dependencies = .{
// External package dependencies
},
}
================================================================================
FILE: ./docker-compose.yml
================================================================================
services:
dev:
build:
context: .
dockerfile: Dockerfile
container_name: dynamodb-zig-dev
volumes:
- .:/workspace
- zig-cache:/root/.cache/zig
ports:
- "8000:8000" # DynamoDB API port
- "8080:8080" # Admin/metrics port (optional)
working_dir: /workspace
stdin_open: true
tty: true
command: /bin/bash
# Optional: Run the actual server
server:
build:
context: .
dockerfile: Dockerfile
container_name: dynamodb-zig-server
volumes:
- .:/workspace
- zig-cache:/root/.cache/zig
- db-data:/workspace/data
ports:
- "8000:8000"
working_dir: /workspace
command: zig build run
# Optional: DynamoDB Local for compatibility testing
dynamodb-local:
image: amazon/dynamodb-local:latest
container_name: dynamodb-local
ports:
- "8001:8000"
command: "-jar DynamoDBLocal.jar -sharedDb -inMemory"
# DynamoDB Admin GUI
dynamodb-admin:
image: aaronshaf/dynamodb-admin:latest
container_name: dynamodb-admin
ports:
- "8002:8001" # Admin GUI will be available at localhost:8002
environment:
- DYNAMO_ENDPOINT=http://server:8000 # Points to your Zig server
# Alternative: Use http://dynamodb-local:8000 to connect to AWS's DynamoDB Local
- AWS_REGION=us-east-1
- AWS_ACCESS_KEY_ID=local
- AWS_SECRET_ACCESS_KEY=local
depends_on:
- server
volumes:
zig-cache:
db-data:
================================================================================
FILE: ./Dockerfile
================================================================================
FROM ubuntu:24.04
ENV DEBIAN_FRONTEND=noninteractive
# Install base dependencies
RUN apt-get update && apt-get install -y \
curl \
wget \
git \
build-essential \
cmake \
ninja-build \
pkg-config \
libsnappy-dev \
liblz4-dev \
libzstd-dev \
libbz2-dev \
zlib1g-dev \
libgflags-dev \
xz-utils \
&& rm -rf /var/lib/apt/lists/*
# Install Zig (pinned via ZIG_VERSION below). NOTE: build.zig.zon declares minimum_zig_version "0.15.1", but this installs 0.13.0 — keep the two in sync.
ARG ZIG_VERSION=0.13.0
RUN curl -L "https://ziglang.org/download/${ZIG_VERSION}/zig-linux-x86_64-${ZIG_VERSION}.tar.xz" | tar -xJ -C /opt \
&& ln -s /opt/zig-linux-x86_64-${ZIG_VERSION}/zig /usr/local/bin/zig
# Build RocksDB from source with shared library
ARG ROCKSDB_VERSION=9.6.1
WORKDIR /tmp
RUN git clone --depth 1 --branch v${ROCKSDB_VERSION} https://github.com/facebook/rocksdb.git \
&& cd rocksdb \
&& mkdir build && cd build \
&& cmake .. \
-DCMAKE_BUILD_TYPE=Release \
-DWITH_SNAPPY=ON \
-DWITH_LZ4=ON \
-DWITH_ZSTD=ON \
-DWITH_BZ2=ON \
-DWITH_ZLIB=ON \
-DWITH_GFLAGS=OFF \
-DROCKSDB_BUILD_SHARED=ON \
-DWITH_TESTS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF \
-DWITH_TOOLS=OFF \
-G Ninja \
&& ninja -j$(nproc) \
&& ninja install \
&& ldconfig \
&& cd / && rm -rf /tmp/rocksdb
# Set up working directory
WORKDIR /workspace
# Default command
CMD ["/bin/bash"]
================================================================================
FILE: ./Makefile
================================================================================
.PHONY: all build release run test test-integration bench clean docker-build docker-shell docker-run docker-test fmt help
# Default target
all: build
# === Build Targets ===
build:
zig build
release:
zig build -Doptimize=ReleaseFast
clean:
rm -rf zig-out .zig-cache data
fmt:
zig fmt src/ tests/
# === Run Targets ===
run: build
zig build run
run-port: build
zig build run -- --port $(PORT)
# === Test Targets ===
test:
zig build test
test-integration:
zig build test-integration
test-all: test test-integration
bench:
zig build bench
# === Docker Targets ===
docker-build:
docker-compose build dev
docker-shell:
docker-compose run --rm dev
docker-run:
docker-compose up server
docker-test:
docker-compose run --rm dev zig build test
docker-bench:
docker-compose run --rm dev zig build bench
docker-clean:
docker-compose down -v
docker rmi dynamodb-zig-dev 2>/dev/null || true
# === AWS CLI Test ===
aws-test:
@echo "Creating table..."
aws dynamodb create-table \
--endpoint-url http://localhost:8000 \
--table-name TestTable \
--key-schema AttributeName=pk,KeyType=HASH \
--attribute-definitions AttributeName=pk,AttributeType=S \
--billing-mode PAY_PER_REQUEST || true
@echo "\nListing tables..."
aws dynamodb list-tables --endpoint-url http://localhost:8000
@echo "\nPutting item..."
aws dynamodb put-item \
--endpoint-url http://localhost:8000 \
--table-name TestTable \
--item '{"pk":{"S":"test1"},"data":{"S":"hello world"}}'
@echo "\nGetting item..."
aws dynamodb get-item \
--endpoint-url http://localhost:8000 \
--table-name TestTable \
--key '{"pk":{"S":"test1"}}'
@echo "\nScanning table..."
aws dynamodb scan \
--endpoint-url http://localhost:8000 \
--table-name TestTable
# === Local DynamoDB (for comparison) ===
dynamodb-local:
docker-compose up dynamodb-local
# === Help ===
help:
@echo "ZynamoDB Development Commands"
@echo ""
@echo "Build & Run:"
@echo " make build - Build debug version"
@echo " make release - Build optimized release"
@echo " make run - Build and run server"
@echo " make run-port PORT=8080 - Run on custom port"
@echo ""
@echo "Testing:"
@echo " make test - Run unit tests"
@echo " make test-integration - Run integration tests"
@echo " make test-all - Run all tests"
@echo " make bench - Run benchmarks"
@echo ""
@echo "Docker:"
@echo " make docker-build - Build Docker image"
@echo " make docker-shell - Open shell in container"
@echo " make docker-run - Run server in Docker"
@echo " make docker-test - Run tests in Docker"
@echo ""
@echo "Utilities:"
@echo " make clean - Remove build artifacts"
@echo " make fmt - Format source code"
@echo " make aws-test - Test with AWS CLI"
@echo " make help - Show this help"
================================================================================
FILE: ./README.md
================================================================================
# ZynamoDB
A DynamoDB-compatible database built with **Zig** and **RocksDB**.
## Why Zig?
Zig was chosen over C++ for several reasons:
1. **Built-in Memory Safety** - Compile-time safety checks without garbage collection
2. **Seamless C Interop** - RocksDB's C API can be imported directly with `@cImport`
3. **Simple Build System** - `build.zig` replaces complex CMake/Makefile configurations
4. **No Hidden Control Flow** - Explicit error handling, no exceptions
5. **Modern Tooling** - Built-in test framework, documentation generator, and package manager
## Features
### Implemented Operations
- ✅ CreateTable
- ✅ DeleteTable
- ✅ DescribeTable
- ✅ ListTables
- ✅ PutItem
- ✅ GetItem
- ✅ DeleteItem
- ✅ Query (basic)
- ✅ Scan
### Planned Operations
- 🚧 UpdateItem
- 🚧 BatchGetItem
- 🚧 BatchWriteItem
- 🚧 TransactGetItems
- 🚧 TransactWriteItems
- 🚧 Global Secondary Indexes
- 🚧 Local Secondary Indexes
## Quick Start
### Using Docker (Recommended)
```bash
# Build the development container
docker-compose build dev
# Start a shell in the container
docker-compose run --rm dev
# Inside the container:
zig build run
```
### Native Build (requires Zig 0.15.1+ — see `minimum_zig_version` in build.zig.zon — and RocksDB)
```bash
# Install dependencies (Ubuntu/Debian)
sudo apt install librocksdb-dev libsnappy-dev liblz4-dev libzstd-dev
# Build and run
zig build run
```
## Usage
### Starting the Server
```bash
# Default (port 8000)
zig build run
# Custom port
zig build run -- --port 8080
# Custom data directory
zig build run -- --data-dir /var/lib/zynamodb
```
### Using AWS CLI
```bash
# Create a table
aws dynamodb create-table \
--endpoint-url http://localhost:8000 \
--table-name Users \
--key-schema AttributeName=pk,KeyType=HASH \
--attribute-definitions AttributeName=pk,AttributeType=S \
--billing-mode PAY_PER_REQUEST
# Put an item
aws dynamodb put-item \
--endpoint-url http://localhost:8000 \
--table-name Users \
--item '{"pk":{"S":"user123"},"name":{"S":"Alice"},"email":{"S":"alice@example.com"}}'
# Get an item
aws dynamodb get-item \
--endpoint-url http://localhost:8000 \
--table-name Users \
--key '{"pk":{"S":"user123"}}'
# Scan the table
aws dynamodb scan \
--endpoint-url http://localhost:8000 \
--table-name Users
# List tables
aws dynamodb list-tables --endpoint-url http://localhost:8000
```
### Using Python (boto3)
```python
import boto3
# Connect to local ZynamoDB
dynamodb = boto3.client(
'dynamodb',
endpoint_url='http://localhost:8000',
region_name='us-east-1',
aws_access_key_id='fake',
aws_secret_access_key='fake'
)
# Create table
dynamodb.create_table(
TableName='Products',
KeySchema=[{'AttributeName': 'pk', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'pk', 'AttributeType': 'S'}],
BillingMode='PAY_PER_REQUEST'
)
# Put item
dynamodb.put_item(
TableName='Products',
Item={
'pk': {'S': 'prod-001'},
'name': {'S': 'Widget'},
'price': {'N': '29.99'}
}
)
# Get item
response = dynamodb.get_item(
TableName='Products',
Key={'pk': {'S': 'prod-001'}}
)
print(response.get('Item'))
```
## Development
### Project Structure
```
dynamodb-compat/
├── Dockerfile # Dev container with Zig + RocksDB
├── docker-compose.yml # Container orchestration
├── build.zig # Zig build configuration
├── Makefile # Convenience commands
├── src/
│ ├── main.zig # Entry point
│ ├── rocksdb.zig # RocksDB C bindings
│ ├── http.zig # HTTP server
│ ├── bench.zig # Performance benchmarks
│ └── dynamodb/
│ ├── types.zig # DynamoDB protocol types
│ ├── storage.zig # Storage engine (RocksDB mapping)
│ └── handler.zig # API request handlers
└── tests/
└── integration.zig # Integration tests
```
### Build Commands
```bash
# Build
make build # Debug build
make release # Optimized release
# Test
make test # Unit tests
make test-integration # Integration tests
make test-all # All tests
# Run
make run # Start server
make run-port PORT=8080 # Custom port
# Benchmark
make bench # Run benchmarks
# Docker
make docker-build # Build container
make docker-shell # Open shell
make docker-test # Run tests in container
```
### Running Tests
```bash
# Unit tests
zig build test
# Integration tests
zig build test-integration
# With Docker
docker-compose run --rm dev zig build test
```
### Running Benchmarks
```bash
zig build bench
# Or with make
make bench
```
## Architecture
### Storage Model
Data is stored in RocksDB with the following key prefixes:
| Prefix | Purpose | Format |
|--------|---------|--------|
| `_meta:` | Table metadata | `_meta:{table_name}` |
| `_data:` | Item data | `_data:{table}:{pk}` or `_data:{table}:{pk}:{sk}` |
| `_gsi:` | Global secondary index | `_gsi:{table}:{index}:{pk}:{sk}` |
| `_lsi:` | Local secondary index | `_lsi:{table}:{index}:{pk}:{sk}` |
### HTTP Server
- Custom HTTP/1.1 implementation using Zig's stdlib
- Thread-per-connection model (suitable for moderate load)
- Parses `X-Amz-Target` header to route DynamoDB operations
### DynamoDB Protocol
- JSON request/response format
- Standard DynamoDB error responses
- Compatible with AWS SDKs (boto3, AWS CLI, etc.)
## Configuration
### Command Line Options
| Option | Description | Default |
|--------|-------------|---------|
| `-p, --port` | HTTP port | 8000 |
| `-h, --host` | Bind address | 0.0.0.0 |
| `-d, --data-dir` | RocksDB data directory | ./data |
| `-v, --verbose` | Enable verbose logging | false |
### Environment Variables
| Variable | Description |
|----------|-------------|
| `DYNAMODB_PORT` | Override port |
| `ROCKSDB_DATA_DIR` | Override data directory |
## Performance
Preliminary benchmarks on development hardware:
| Operation | Ops/sec |
|-----------|---------|
| PutItem | ~15,000 |
| GetItem | ~25,000 |
| Scan (1K items) | ~50,000 |
Run `make bench` for actual numbers on your hardware.
## Comparison with DynamoDB Local
| Feature | ZynamoDB | DynamoDB Local |
|---------|----------|----------------|
| Language | Zig | Java |
| Storage | RocksDB | SQLite |
| Memory | ~10MB | ~200MB+ |
| Startup | Instant | 2-5 seconds |
| Persistence | Yes | Optional |
## License
MIT
## Contributing
Contributions welcome! Please read the contributing guidelines first.
Areas that need work:
- UpdateItem with expression parsing
- Batch operations
- Secondary indexes
- Streams support
- Better JSON parsing (currently simplified)
================================================================================
FILE: ./src/bench.zig
================================================================================
/// Performance benchmarks for ZynamoDB
const std = @import("std");
const rocksdb = @import("rocksdb.zig");
const storage = @import("dynamodb/storage.zig");
const types = @import("dynamodb/types.zig");
/// Result of a single benchmark run: a label, how many operations were
/// performed, and the elapsed wall-clock time in nanoseconds.
const BenchResult = struct {
    name: []const u8,
    ops: u64,
    duration_ns: u64,

    /// Throughput in operations per second, derived from ops and duration.
    pub fn opsPerSec(self: BenchResult) f64 {
        const elapsed_s = @as(f64, @floatFromInt(self.duration_ns)) / 1_000_000_000.0;
        return @as(f64, @floatFromInt(self.ops)) / elapsed_s;
    }

    /// Writes one aligned result row to stderr.
    pub fn print(self: BenchResult) void {
        const elapsed_ms = @as(f64, @floatFromInt(self.duration_ns)) / 1_000_000.0;
        std.debug.print("{s:30} | {d:10} ops | {d:8.2} ms | {d:12.0} ops/sec\n", .{
            self.name,
            self.ops,
            elapsed_ms,
            self.opsPerSec(),
        });
    }
};
/// Times a single invocation of `func` called with the tuple `args` and
/// wraps the measurement in a BenchResult.
///
/// Bug fix: the original signature took no `args` parameter and invoked
/// `func()` with no arguments, yet every call site in this file passes a
/// 4th argument (an args tuple, e.g. `.{ &db, &key_buf, &val_buf, ops }`)
/// and the benchmark closures all require parameters — so the original
/// could not compile. `@call` forwards the tuple to the closure.
///
/// `ops` is only used for reporting; the closure performs its own loop.
fn runBench(name: []const u8, ops: u64, func: anytype, args: anytype) BenchResult {
    const start = std.time.nanoTimestamp();
    @call(.auto, func, args);
    const end = std.time.nanoTimestamp();
    return BenchResult{
        .name = name,
        .ops = ops,
        // nanoTimestamp returns i128; a benchmark's elapsed time fits in u64.
        .duration_ns = @intCast(end - start),
    };
}
/// Benchmark entry point: prints a banner, runs the raw RocksDB benchmarks,
/// then the storage-engine benchmarks. Each benchmark prints its own row.
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
std.debug.print("\n", .{});
std.debug.print("=" ** 70 ++ "\n", .{});
std.debug.print(" ZynamoDB Performance Benchmarks\n", .{});
std.debug.print("=" ** 70 ++ "\n\n", .{});
// Setup
// NOTE(review): every bench function below ignores this path and opens its
// own dedicated /tmp directory, so `path` and this cleanup look vestigial —
// confirm before removing.
const path = "/tmp/bench_zynamodb";
defer std.fs.deleteTreeAbsolute(path) catch {};
// Raw RocksDB benchmarks
std.debug.print("RocksDB Raw Operations:\n", .{});
std.debug.print("-" ** 70 ++ "\n", .{});
try benchRocksDBWrites(allocator, path);
try benchRocksDBReads(allocator, path);
try benchRocksDBBatch(allocator, path);
try benchRocksDBScan(allocator, path);
std.debug.print("\n", .{});
// Storage engine benchmarks
std.debug.print("Storage Engine Operations:\n", .{});
std.debug.print("-" ** 70 ++ "\n", .{});
try benchStoragePutItem(allocator, path);
try benchStorageGetItem(allocator, path);
try benchStorageScan(allocator, path);
std.debug.print("\n" ++ "=" ** 70 ++ "\n", .{});
}
/// Benchmarks sequential key-value puts against a throwaway RocksDB instance.
/// Formatting failures skip the iteration; put errors are ignored so a single
/// failure does not abort the benchmark.
fn benchRocksDBWrites(allocator: std.mem.Allocator, base_path: []const u8) !void {
    // This benchmark manages its own dedicated directory, so the shared
    // allocator and base_path parameters are unused.
    _ = allocator;
    _ = base_path;
    const db_path = "/tmp/bench_rocksdb_writes";
    defer std.fs.deleteTreeAbsolute(db_path) catch {};
    var db = try rocksdb.DB.open(db_path, true);
    defer db.close();
    const op_count: u64 = 10000;
    var key_storage: [32]u8 = undefined;
    var val_storage: [256]u8 = undefined;
    const result = runBench("Sequential Writes", op_count, struct {
        fn run(d: *rocksdb.DB, kb: *[32]u8, vb: *[256]u8, n: u64) void {
            var idx: u64 = 0;
            while (idx < n) : (idx += 1) {
                const key = std.fmt.bufPrint(kb, "key_{d:0>10}", .{idx}) catch continue;
                const val = std.fmt.bufPrint(vb, "value_{d}_padding_data_to_make_it_realistic", .{idx}) catch continue;
                d.put(key, val) catch {};
            }
        }
    }.run, .{ &db, &key_storage, &val_storage, op_count });
    result.print();
}
/// Benchmarks random-key point reads: writes `ops` sequential keys first,
/// then reads `ops` keys chosen uniformly at random.
fn benchRocksDBReads(allocator: std.mem.Allocator, base_path: []const u8) !void {
const path = "/tmp/bench_rocksdb_reads";
defer std.fs.deleteTreeAbsolute(path) catch {};
var db = try rocksdb.DB.open(path, true);
defer db.close();
// First write some data
var key_buf: [32]u8 = undefined;
var val_buf: [256]u8 = undefined;
const ops: u64 = 10000;
var i: u64 = 0;
while (i < ops) : (i += 1) {
const key = try std.fmt.bufPrint(&key_buf, "key_{d:0>10}", .{i});
const val = try std.fmt.bufPrint(&val_buf, "value_{d}_padding", .{i});
try db.put(key, val);
}
// Now benchmark reads
// Fixed seed so every run reads the same key sequence (reproducible timings).
var prng = std.Random.DefaultPrng.init(12345);
const random = prng.random();
const result = runBench("Random Reads", ops, struct {
fn run(d: *rocksdb.DB, alloc: std.mem.Allocator, kb: *[32]u8, r: std.Random, n: u64) void {
var j: u64 = 0;
while (j < n) : (j += 1) {
const idx = r.intRangeAtMost(u64, 0, n - 1);
const key = std.fmt.bufPrint(kb, "key_{d:0>10}", .{idx}) catch continue;
// get() allocates the returned value; free immediately to avoid growth.
const val = d.get(alloc, key) catch continue;
if (val) |v| alloc.free(v);
}
}
}.run, .{ &db, allocator, &key_buf, random, ops });
// base_path is unused; this benchmark manages its own /tmp directory.
_ = base_path;
result.print();
}
/// Benchmarks batched writes: `ops` puts accumulated into one WriteBatch
/// and committed with a single write() at the end. The timed region includes
/// both batch construction and the commit.
fn benchRocksDBBatch(allocator: std.mem.Allocator, base_path: []const u8) !void {
_ = allocator;
const path = "/tmp/bench_rocksdb_batch";
defer std.fs.deleteTreeAbsolute(path) catch {};
var db = try rocksdb.DB.open(path, true);
defer db.close();
const ops: u64 = 10000;
var key_buf: [32]u8 = undefined;
var val_buf: [256]u8 = undefined;
const result = runBench("Batch Writes", ops, struct {
fn run(d: *rocksdb.DB, kb: *[32]u8, vb: *[256]u8, n: u64) void {
// WriteBatch.init returns an optional; bail out silently on failure.
var batch = rocksdb.WriteBatch.init() orelse return;
defer batch.deinit();
var i: u64 = 0;
while (i < n) : (i += 1) {
const key = std.fmt.bufPrint(kb, "batch_key_{d:0>10}", .{i}) catch continue;
const val = std.fmt.bufPrint(vb, "batch_value_{d}", .{i}) catch continue;
batch.put(key, val);
}
// Single atomic commit of the whole batch; errors are ignored.
batch.write(d) catch {};
}
}.run, .{ &db, &key_buf, &val_buf, ops });
// base_path is unused; this benchmark manages its own /tmp directory.
_ = base_path;
result.print();
}
/// Benchmarks a full-table iterator scan over `ops` pre-written keys,
/// touching every key and value once.
fn benchRocksDBScan(allocator: std.mem.Allocator, base_path: []const u8) !void {
_ = allocator;
const path = "/tmp/bench_rocksdb_scan";
defer std.fs.deleteTreeAbsolute(path) catch {};
var db = try rocksdb.DB.open(path, true);
defer db.close();
// Write data
var key_buf: [32]u8 = undefined;
var val_buf: [256]u8 = undefined;
const ops: u64 = 10000;
var i: u64 = 0;
while (i < ops) : (i += 1) {
const key = try std.fmt.bufPrint(&key_buf, "scan_key_{d:0>10}", .{i});
const val = try std.fmt.bufPrint(&val_buf, "scan_value_{d}", .{i});
try db.put(key, val);
}
const result = runBench("Full Scan", ops, struct {
fn run(d: *rocksdb.DB, n: u64) void {
_ = n;
var iter = rocksdb.Iterator.init(d) orelse return;
defer iter.deinit();
iter.seekToFirst();
// NOTE(review): `count` is accumulated but never read — presumably kept
// so the loop body is not a no-op; confirm it isn't optimized away in
// ReleaseFast.
var count: u64 = 0;
while (iter.valid()) {
_ = iter.key();
_ = iter.value();
count += 1;
iter.next();
}
}
}.run, .{ &db, ops });
// base_path is unused; this benchmark manages its own /tmp directory.
_ = base_path;
result.print();
}
/// Benchmarks StorageEngine.putItem with pre-rendered DynamoDB-JSON items
/// against a table with a single string HASH key. Errors inside the timed
/// loop are deliberately swallowed so one failure cannot abort the run.
fn benchStoragePutItem(allocator: std.mem.Allocator, base_path: []const u8) !void {
_ = base_path;
const path = "/tmp/bench_storage_put";
defer std.fs.deleteTreeAbsolute(path) catch {};
var engine = try storage.StorageEngine.init(allocator, path);
defer engine.deinit();
const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH },
};
const attr_defs = [_]types.AttributeDefinition{
.{ .attribute_name = "pk", .attribute_type = .S },
};
_ = try engine.createTable("BenchTable", &key_schema, &attr_defs);
const ops: u64 = 5000;
var item_buf: [512]u8 = undefined;
// Timed manually (rather than via runBench) because the item JSON is
// rendered inside the loop and intentionally included in the measurement.
const start = std.time.nanoTimestamp();
var i: u64 = 0;
while (i < ops) : (i += 1) {
const item = std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"name\":{{\"S\":\"User {d}\"}},\"email\":{{\"S\":\"user{d}@example.com\"}}}}", .{ i, i, i }) catch continue;
engine.putItem("BenchTable", item) catch {};
}
const end = std.time.nanoTimestamp();
const result = BenchResult{
.name = "PutItem",
.ops = ops,
.duration_ns = @intCast(end - start),
};
result.print();
}
/// Benchmarks StorageEngine.getItem: seeds `ops` items into a fresh table,
/// then issues `ops` random-key lookups with a fixed PRNG seed.
fn benchStorageGetItem(allocator: std.mem.Allocator, base_path: []const u8) !void {
_ = base_path;
const path = "/tmp/bench_storage_get";
defer std.fs.deleteTreeAbsolute(path) catch {};
var engine = try storage.StorageEngine.init(allocator, path);
defer engine.deinit();
const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH },
};
const attr_defs = [_]types.AttributeDefinition{
.{ .attribute_name = "pk", .attribute_type = .S },
};
_ = try engine.createTable("BenchTable", &key_schema, &attr_defs);
// Write data first
const ops: u64 = 5000;
var item_buf: [512]u8 = undefined;
var key_buf: [128]u8 = undefined;
var i: u64 = 0;
while (i < ops) : (i += 1) {
const item = try std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"data\":{{\"S\":\"test\"}}}}", .{i});
try engine.putItem("BenchTable", item);
}
// Benchmark reads
// Fixed seed keeps the lookup sequence identical across runs.
var prng = std.Random.DefaultPrng.init(12345);
const random = prng.random();
const start = std.time.nanoTimestamp();
i = 0;
while (i < ops) : (i += 1) {
const idx = random.intRangeAtMost(u64, 0, ops - 1);
const key = std.fmt.bufPrint(&key_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}}}}", .{idx}) catch continue;
// getItem allocates the returned item; free immediately to keep memory flat.
const item = engine.getItem("BenchTable", key) catch continue;
if (item) |v| allocator.free(v);
}
const end = std.time.nanoTimestamp();
const result = BenchResult{
.name = "GetItem",
.ops = ops,
.duration_ns = @intCast(end - start),
};
result.print();
}
/// Benchmarks StorageEngine.scan: seeds `ops` items, then times a single
/// full-table scan. The reported ops/sec is therefore items scanned per
/// second, not scan calls per second.
fn benchStorageScan(allocator: std.mem.Allocator, base_path: []const u8) !void {
_ = base_path;
const path = "/tmp/bench_storage_scan";
defer std.fs.deleteTreeAbsolute(path) catch {};
var engine = try storage.StorageEngine.init(allocator, path);
defer engine.deinit();
const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH },
};
const attr_defs = [_]types.AttributeDefinition{
.{ .attribute_name = "pk", .attribute_type = .S },
};
_ = try engine.createTable("BenchTable", &key_schema, &attr_defs);
// Write data first
const ops: u64 = 5000;
var item_buf: [512]u8 = undefined;
var i: u64 = 0;
while (i < ops) : (i += 1) {
const item = try std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"data\":{{\"S\":\"test\"}}}}", .{i});
try engine.putItem("BenchTable", item);
}
// Benchmark scan
const start = std.time.nanoTimestamp();
const items = try engine.scan("BenchTable", null);
const end = std.time.nanoTimestamp();
// Cleanup
// scan() returns an allocated slice of allocated items; free both layers.
for (items) |item| allocator.free(item);
allocator.free(items);
const result = BenchResult{
.name = "Scan (full table)",
.ops = ops,
.duration_ns = @intCast(end - start),
};
result.print();
}
================================================================================
FILE: ./src/dynamodb/handler.zig
================================================================================
/// DynamoDB API request handlers
const std = @import("std");
const http = @import("../http.zig");
const storage = @import("storage.zig");
const types = @import("types.zig");
const json = @import("json.zig");
pub const ApiHandler = struct {
engine: *storage.StorageEngine,
allocator: std.mem.Allocator,
const Self = @This();
/// Creates a handler that serves DynamoDB API calls backed by `engine`.
/// The handler does not own the engine; the caller manages its lifetime.
pub fn init(allocator: std.mem.Allocator, engine: *storage.StorageEngine) Self {
return Self{
.allocator = allocator,
.engine = engine,
};
}
/// Routes an incoming HTTP request to the matching DynamoDB operation
/// handler based on the X-Amz-Target header, and returns the response.
/// Missing header or unrecognized operations yield a ValidationException.
pub fn handle(self: *Self, request: *const http.Request) http.Response {
var response = http.Response.init(self.allocator);
// Add standard DynamoDB headers
response.addHeader("Content-Type", "application/x-amz-json-1.0") catch {};
// Static request id placeholder; real DynamoDB issues a unique id per request.
response.addHeader("x-amzn-RequestId", "local-request-id") catch {};
// Get operation from X-Amz-Target header
const target = request.getHeader("X-Amz-Target") orelse {
return self.errorResponse(&response, .ValidationException, "Missing X-Amz-Target header");
};
const operation = types.Operation.fromTarget(target);
switch (operation) {
.CreateTable => self.handleCreateTable(request, &response),
.DeleteTable => self.handleDeleteTable(request, &response),
.DescribeTable => self.handleDescribeTable(request, &response),
.ListTables => self.handleListTables(request, &response),
.PutItem => self.handlePutItem(request, &response),
.GetItem => self.handleGetItem(request, &response),
.DeleteItem => self.handleDeleteItem(request, &response),
.Query => self.handleQuery(request, &response),
.Scan => self.handleScan(request, &response),
.Unknown => {
return self.errorResponse(&response, .ValidationException, "Unknown operation");
},
// Recognized operations without an implementation yet (batch ops, etc.).
else => {
return self.errorResponse(&response, .ValidationException, "Operation not implemented");
},
}
return response;
}
/// Handles CreateTable. Only TableName is read from the request; the
/// KeySchema/AttributeDefinitions in the request body are currently ignored
/// and a fixed single-HASH-key ("pk", type S) schema is used instead.
fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response) void {
// Parse the entire request body
const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, request.body, .{}) catch {
_ = self.errorResponse(response, .ValidationException, "Invalid JSON");
return;
};
defer parsed.deinit();
const root = switch (parsed.value) {
.object => |o| o,
else => {
_ = self.errorResponse(response, .ValidationException, "Request must be an object");
return;
},
};
// Extract TableName
const table_name_val = root.get("TableName") orelse {
_ = self.errorResponse(response, .ValidationException, "Missing TableName");
return;
};
const table_name = switch (table_name_val) {
.string => |s| s,
else => {
_ = self.errorResponse(response, .ValidationException, "TableName must be a string");
return;
},
};
// For now, use simplified key schema (you can enhance this later to parse from request)
const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH },
};
const attr_defs = [_]types.AttributeDefinition{
.{ .attribute_name = "pk", .attribute_type = .S },
};
const desc = self.engine.createTable(table_name, &key_schema, &attr_defs) catch |err| {
switch (err) {
storage.StorageError.TableAlreadyExists => {
_ = self.errorResponse(response, .ResourceInUseException, "Table already exists");
},
else => {
_ = self.errorResponse(response, .InternalServerError, "Failed to create table");
},
}
return;
};
// Build response
const resp_body = std.fmt.allocPrint(
self.allocator,
"{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d}}}}}",
.{ desc.table_name, desc.table_status.toString(), desc.creation_date_time },
) catch {
_ = self.errorResponse(response, .InternalServerError, "Serialization failed");
return;
};
// NOTE(review): resp_body is freed when this function returns; assumes
// Response.setBody copies the bytes — confirm against http.zig.
defer self.allocator.free(resp_body);
response.setBody(resp_body) catch {};
}
/// Handles DeleteTable: extracts TableName, deletes the table, and responds
/// with a DELETING table description.
fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse {
_ = self.errorResponse(response, .ValidationException, "Missing TableName");
return;
};
self.engine.deleteTable(table_name) catch |err| {
switch (err) {
storage.StorageError.TableNotFound => {
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
},
else => {
_ = self.errorResponse(response, .InternalServerError, "Failed to delete table");
},
}
return;
};
// NOTE(review): if allocPrint fails, this `catch return` sends the response
// with its default (empty) body instead of an error — confirm intended.
const resp_body = std.fmt.allocPrint(
self.allocator,
"{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"DELETING\"}}}}",
.{table_name},
) catch return;
defer self.allocator.free(resp_body);
response.setBody(resp_body) catch {};
}
/// Handles DescribeTable: returns name, status, item count, and size for an
/// existing table, or ResourceNotFoundException.
fn handleDescribeTable(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse {
_ = self.errorResponse(response, .ValidationException, "Missing TableName");
return;
};
const desc = self.engine.describeTable(table_name) catch |err| {
switch (err) {
storage.StorageError.TableNotFound => {
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
},
else => {
_ = self.errorResponse(response, .InternalServerError, "Failed to describe table");
},
}
return;
};
// NOTE(review): allocPrint failure falls through to an empty 200 body here
// (same pattern as handleDeleteTable) — confirm intended.
const resp_body = std.fmt.allocPrint(
self.allocator,
"{{\"Table\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"ItemCount\":{d},\"TableSizeBytes\":{d}}}}}",
.{ desc.table_name, desc.table_status.toString(), desc.item_count, desc.table_size_bytes },
) catch return;
defer self.allocator.free(resp_body);
response.setBody(resp_body) catch {};
}
/// Handles ListTables: serializes all table names as {"TableNames":[...]}.
/// The request body is ignored (Limit/ExclusiveStartTableName unsupported).
fn handleListTables(self: *Self, request: *const http.Request, response: *http.Response) void {
_ = request;
const tables = self.engine.listTables() catch {
_ = self.errorResponse(response, .InternalServerError, "Failed to list tables");
return;
};
// listTables returns owned names in an owned slice; free both layers.
defer {
for (tables) |t| self.allocator.free(t);
self.allocator.free(tables);
}
// NOTE(review): std.ArrayList(T).init / the managed ArrayList API was
// removed in Zig 0.15, yet build.zig.zon declares minimum_zig_version
// "0.15.1" — confirm which toolchain this is actually built with.
var buf = std.ArrayList(u8).init(self.allocator);
defer buf.deinit();
const writer = buf.writer();
writer.writeAll("{\"TableNames\":[") catch return;
for (tables, 0..) |table, i| {
if (i > 0) writer.writeByte(',') catch return;
writer.print("\"{s}\"", .{table}) catch return;
}
writer.writeAll("]}") catch return;
// NOTE(review): buf is freed on return; assumes setBody copies — confirm.
response.setBody(buf.items) catch {};
}
/// Handle the PutItem API action.
/// Validates TableName and Item in the request body, stores the raw item
/// JSON through the engine, and replies with an empty JSON object.
fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response) void {
    const table_name = extractJsonString(request.body, "TableName") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing TableName");
        return;
    };
    const item_json = extractJsonObject(request.body, "Item") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing or invalid Item");
        return;
    };
    if (self.engine.putItem(table_name, item_json)) |_| {
        response.setBody("{}") catch {};
    } else |err| switch (err) {
        storage.StorageError.TableNotFound => {
            _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
        },
        storage.StorageError.MissingKeyAttribute => {
            _ = self.errorResponse(response, .ValidationException, "Item missing required key attribute");
        },
        storage.StorageError.InvalidKey => {
            _ = self.errorResponse(response, .ValidationException, "Invalid item format");
        },
        else => {
            _ = self.errorResponse(response, .InternalServerError, "Failed to put item");
        },
    }
}
/// Handle the GetItem API action.
/// Validates TableName and Key, then replies with {"Item":...} when the
/// item exists or an empty JSON object when it does not.
fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response) void {
    const table_name = extractJsonString(request.body, "TableName") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing TableName");
        return;
    };
    const key_json = extractJsonObject(request.body, "Key") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing or invalid Key");
        return;
    };
    if (self.engine.getItem(table_name, key_json)) |maybe_item| {
        if (maybe_item) |found| {
            defer self.allocator.free(found);
            const wrapped = std.fmt.allocPrint(self.allocator, "{{\"Item\":{s}}}", .{found}) catch return;
            defer self.allocator.free(wrapped);
            response.setBody(wrapped) catch {};
        } else {
            // Absent item: reply with an empty object (no "Item" member).
            response.setBody("{}") catch {};
        }
    } else |err| switch (err) {
        storage.StorageError.TableNotFound => {
            _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
        },
        storage.StorageError.MissingKeyAttribute => {
            _ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
        },
        storage.StorageError.InvalidKey => {
            _ = self.errorResponse(response, .ValidationException, "Invalid key format");
        },
        else => {
            _ = self.errorResponse(response, .InternalServerError, "Failed to get item");
        },
    }
}
/// Handle the DeleteItem API action.
/// Validates TableName and Key and removes the item; the storage layer does
/// not distinguish a missing item, so deletion always succeeds when the
/// table exists.
fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response) void {
    const table_name = extractJsonString(request.body, "TableName") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing TableName");
        return;
    };
    const key_json = extractJsonObject(request.body, "Key") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing or invalid Key");
        return;
    };
    if (self.engine.deleteItem(table_name, key_json)) |_| {
        response.setBody("{}") catch {};
    } else |err| switch (err) {
        storage.StorageError.TableNotFound => {
            _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
        },
        storage.StorageError.MissingKeyAttribute => {
            _ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
        },
        storage.StorageError.InvalidKey => {
            _ = self.errorResponse(response, .ValidationException, "Invalid key format");
        },
        else => {
            _ = self.errorResponse(response, .InternalServerError, "Failed to delete item");
        },
    }
}
/// Handle the Query API action (simplified).
/// Only equality on the partition key is supported: the value is read
/// straight from the ":pk" entry of the request body; the
/// KeyConditionExpression itself is never parsed. When ":pk" is absent the
/// literal "default" is used as the partition key value (preserves current
/// behavior; arguably this should be a ValidationException instead).
fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response) void {
const table_name = extractJsonString(request.body, "TableName") orelse {
_ = self.errorResponse(response, .ValidationException, "Missing TableName");
return;
};
// Simplified: extract partition key value from ExpressionAttributeValues
const pk_value = extractJsonString(request.body, ":pk") orelse "default";
const items = self.engine.query(table_name, pk_value, null) catch |err| {
switch (err) {
storage.StorageError.TableNotFound => {
_ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
},
else => {
_ = self.errorResponse(response, .InternalServerError, "Query failed");
},
}
return;
};
// Items are owned by this handler: free each element and the outer slice
// after the response body has been written.
defer {
for (items) |item| self.allocator.free(item);
self.allocator.free(items);
}
self.writeItemsResponse(response, items);
}
/// Handle the Scan API action.
/// Returns every item in the table (no Limit / FilterExpression support).
fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void {
    const table_name = extractJsonString(request.body, "TableName") orelse {
        _ = self.errorResponse(response, .ValidationException, "Missing TableName");
        return;
    };
    if (self.engine.scan(table_name, null)) |items| {
        defer {
            for (items) |item| self.allocator.free(item);
            self.allocator.free(items);
        }
        self.writeItemsResponse(response, items);
    } else |err| switch (err) {
        storage.StorageError.TableNotFound => {
            _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
        },
        else => {
            _ = self.errorResponse(response, .InternalServerError, "Scan failed");
        },
    }
}
/// Write a Query/Scan-style response body:
/// {"Items":[...],"Count":N,"ScannedCount":N}.
/// Each element of `items` is assumed to already be a JSON object.
fn writeItemsResponse(self: *Self, response: *http.Response, items: []const []const u8) void {
    var out = std.ArrayList(u8).init(self.allocator);
    defer out.deinit();
    const w = out.writer();
    w.writeAll("{\"Items\":[") catch return;
    var first = true;
    for (items) |item| {
        if (!first) w.writeByte(',') catch return;
        first = false;
        w.writeAll(item) catch return;
    }
    w.print("],\"Count\":{d},\"ScannedCount\":{d}}}", .{ items.len, items.len }) catch return;
    response.setBody(out.items) catch {};
}
/// Populate `response` with a DynamoDB-style error payload.
/// Maps the error type onto an HTTP status, serializes the error body, and
/// returns a copy of the response by value (call sites discard it with `_ =`).
fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8) http.Response {
response.setStatus(switch (err_type) {
.ResourceNotFoundException => .not_found,
.ResourceInUseException => .conflict,
.ValidationException => .bad_request,
// Everything else is treated as a server-side failure.
else => .internal_server_error,
});
// If serialization fails, return with the status set but no body.
const body = err_type.toErrorResponse(message, self.allocator) catch return response.*;
response.setBody(body) catch {};
// Freeing after setBody assumes setBody copies the bytes; the handlers in
// this file follow the same pattern (free immediately after setBody).
self.allocator.free(body);
return response.*;
}
};
/// Extract a simple string value from JSON, e.g. the value of "TableName".
/// Fix: tolerates optional whitespace between the colon and the opening
/// quote (the previous version only matched `"key":"value"` with no space).
/// Returns a slice into `json_data`, or null when the key is absent or its
/// value is not a string. Escaped quotes inside the value are NOT handled;
/// this is a temporary helper until proper JSON parsing is in place.
fn extractJsonString(json_data: []const u8, key: []const u8) ?[]const u8 {
    var search_buf: [256]u8 = undefined;
    const search = std.fmt.bufPrint(&search_buf, "\"{s}\":", .{key}) catch return null;
    const key_pos = std.mem.indexOf(u8, json_data, search) orelse return null;
    var pos = key_pos + search.len;
    // Skip whitespace between ':' and the opening quote.
    while (pos < json_data.len and std.ascii.isWhitespace(json_data[pos])) pos += 1;
    if (pos >= json_data.len or json_data[pos] != '"') return null;
    const value_start = pos + 1;
    const value_end = std.mem.indexOfPos(u8, json_data, value_start, "\"") orelse return null;
    return json_data[value_start..value_end];
}
/// Extract a JSON object value from the request body.
/// Returns the slice containing the complete object including braces.
/// Fix: brace matching is now string-aware, so '{' and '}' characters that
/// appear inside string literals (e.g. {"S":"}"}) no longer corrupt the
/// count; '\r' is also accepted as whitespace after the colon.
fn extractJsonObject(json_data: []const u8, key: []const u8) ?[]const u8 {
    var search_buf: [256]u8 = undefined;
    const search = std.fmt.bufPrint(&search_buf, "\"{s}\":", .{key}) catch return null;
    const key_start = std.mem.indexOf(u8, json_data, search) orelse return null;
    var value_start = key_start + search.len;
    // Skip whitespace between ':' and the opening brace.
    while (value_start < json_data.len and std.ascii.isWhitespace(json_data[value_start])) {
        value_start += 1;
    }
    if (value_start >= json_data.len or json_data[value_start] != '{') {
        return null;
    }
    // Find the matching closing brace, ignoring braces inside strings.
    var depth: usize = 0;
    var in_string = false;
    var escaped = false;
    var pos = value_start;
    while (pos < json_data.len) : (pos += 1) {
        const c = json_data[pos];
        if (in_string) {
            if (escaped) {
                escaped = false;
            } else if (c == '\\') {
                escaped = true;
            } else if (c == '"') {
                in_string = false;
            }
            continue;
        }
        switch (c) {
            '"' => in_string = true,
            '{' => depth += 1,
            '}' => {
                depth -= 1;
                if (depth == 0) return json_data[value_start .. pos + 1];
            },
            else => {},
        }
    }
    return null;
}
// Global handler for use with http.Server.
// The server dispatch takes a plain function (httpHandler below), so the
// active ApiHandler is stashed in this module-level variable. No
// synchronization is done here -- assumes handler registration happens
// before the server starts serving (TODO confirm).
var global_handler: ?*ApiHandler = null;
/// Register the ApiHandler that httpHandler() dispatches to.
pub fn setGlobalHandler(handler: *ApiHandler) void {
global_handler = handler;
}
/// Entry point handed to http.Server: forwards the request to the globally
/// registered ApiHandler, or answers 500 if none has been registered yet.
pub fn httpHandler(request: *const http.Request, allocator: std.mem.Allocator) http.Response {
    const handler = global_handler orelse {
        var fallback = http.Response.init(allocator);
        fallback.setStatus(.internal_server_error);
        fallback.setBody("{\"error\":\"Handler not initialized\"}") catch {};
        return fallback;
    };
    return handler.handle(request);
}
================================================================================
FILE: ./src/dynamodb/json.zig
================================================================================
/// DynamoDB JSON parsing and serialization
/// Pure functions for converting between DynamoDB JSON format and internal types
const std = @import("std");
const types = @import("types.zig");
// ============================================================================
// Parsing (JSON → Types)
// ============================================================================
/// Parse DynamoDB JSON format into an Item.
/// The top level must be a JSON object mapping attribute names to typed
/// attribute values ({"attr":{"S":"..."}, ...}).
/// Caller owns returned Item and must call deinitItem() when done.
pub fn parseItem(allocator: std.mem.Allocator, json_bytes: []const u8) !types.Item {
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json_bytes, .{});
// The parsed tree is freed before returning; every name/value below is
// duplicated out of it first.
defer parsed.deinit();
const obj = switch (parsed.value) {
.object => |o| o,
else => return error.InvalidItemFormat,
};
var item = types.Item.init(allocator);
errdefer deinitItem(&item, allocator);
var iter = obj.iterator();
while (iter.next()) |entry| {
const attr_name = try allocator.dupe(u8, entry.key_ptr.*);
errdefer allocator.free(attr_name);
var attr_value = try parseAttributeValue(allocator, entry.value_ptr.*);
errdefer deinitAttributeValue(&attr_value, allocator);
// NOTE(review): a duplicate attribute name would overwrite the previous
// entry and leak its allocations -- confirm inputs cannot contain dupes.
try item.put(attr_name, attr_value);
}
return item;
}
/// Parse a single DynamoDB AttributeValue from JSON.
/// Format: {"S": "value"}, {"N": "123"}, {"M": {...}}, etc.
/// Fixes over the previous version:
///  - SS/NS/BS: when duplicating element i failed, elements 0..i leaked
///    (the errdefer freed only the outer array). A fill counter now tracks
///    how many elements are live so the errdefer frees exactly those.
///  - L: the errdefer deinitialized ALL slots including uninitialized ones
///    past the failure point (undefined behavior); it now walks only the
///    slots that were actually parsed.
pub fn parseAttributeValue(allocator: std.mem.Allocator, value: std.json.Value) error{ InvalidAttributeFormat, InvalidStringAttribute, InvalidNumberAttribute, InvalidBinaryAttribute, InvalidBoolAttribute, InvalidNullAttribute, InvalidStringSetAttribute, InvalidNumberSetAttribute, InvalidBinarySetAttribute, InvalidListAttribute, InvalidMapAttribute, UnknownAttributeType, OutOfMemory }!types.AttributeValue {
    const obj = switch (value) {
        .object => |o| o,
        else => return error.InvalidAttributeFormat,
    };
    // DynamoDB attribute must have exactly one key (the type indicator).
    if (obj.count() != 1) return error.InvalidAttributeFormat;
    var iter = obj.iterator();
    const entry = iter.next() orelse return error.InvalidAttributeFormat;
    const type_name = entry.key_ptr.*;
    const type_value = entry.value_ptr.*;
    // String
    if (std.mem.eql(u8, type_name, "S")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidStringAttribute,
        };
        return types.AttributeValue{ .S = try allocator.dupe(u8, str) };
    }
    // Number (kept as its string representation, like DynamoDB's wire format)
    if (std.mem.eql(u8, type_name, "N")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidNumberAttribute,
        };
        return types.AttributeValue{ .N = try allocator.dupe(u8, str) };
    }
    // Binary (base64 string, stored as-is)
    if (std.mem.eql(u8, type_name, "B")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidBinaryAttribute,
        };
        return types.AttributeValue{ .B = try allocator.dupe(u8, str) };
    }
    // Boolean
    if (std.mem.eql(u8, type_name, "BOOL")) {
        const b = switch (type_value) {
            .bool => |b_val| b_val,
            else => return error.InvalidBoolAttribute,
        };
        return types.AttributeValue{ .BOOL = b };
    }
    // Null (DynamoDB encodes it as {"NULL": true})
    if (std.mem.eql(u8, type_name, "NULL")) {
        const n = switch (type_value) {
            .bool => |b| b,
            else => return error.InvalidNullAttribute,
        };
        return types.AttributeValue{ .NULL = n };
    }
    // String Set
    if (std.mem.eql(u8, type_name, "SS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidStringSetAttribute,
        };
        const strings = try allocator.alloc([]const u8, arr.items.len);
        var filled: usize = 0;
        // Free only the elements that were actually duplicated, then the array.
        errdefer {
            for (strings[0..filled]) |s| allocator.free(s);
            allocator.free(strings);
        }
        for (arr.items) |item| {
            const str = switch (item) {
                .string => |s| s,
                else => return error.InvalidStringSetAttribute,
            };
            strings[filled] = try allocator.dupe(u8, str);
            filled += 1;
        }
        return types.AttributeValue{ .SS = strings };
    }
    // Number Set
    if (std.mem.eql(u8, type_name, "NS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidNumberSetAttribute,
        };
        const numbers = try allocator.alloc([]const u8, arr.items.len);
        var filled: usize = 0;
        errdefer {
            for (numbers[0..filled]) |n| allocator.free(n);
            allocator.free(numbers);
        }
        for (arr.items) |item| {
            const str = switch (item) {
                .string => |s| s,
                else => return error.InvalidNumberSetAttribute,
            };
            numbers[filled] = try allocator.dupe(u8, str);
            filled += 1;
        }
        return types.AttributeValue{ .NS = numbers };
    }
    // Binary Set
    if (std.mem.eql(u8, type_name, "BS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidBinarySetAttribute,
        };
        const binaries = try allocator.alloc([]const u8, arr.items.len);
        var filled: usize = 0;
        errdefer {
            for (binaries[0..filled]) |b| allocator.free(b);
            allocator.free(binaries);
        }
        for (arr.items) |item| {
            const str = switch (item) {
                .string => |s| s,
                else => return error.InvalidBinarySetAttribute,
            };
            binaries[filled] = try allocator.dupe(u8, str);
            filled += 1;
        }
        return types.AttributeValue{ .BS = binaries };
    }
    // List (recursive)
    if (std.mem.eql(u8, type_name, "L")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidListAttribute,
        };
        const list = try allocator.alloc(types.AttributeValue, arr.items.len);
        var done: usize = 0;
        // Deinit only the elements that were fully parsed; slots past `done`
        // are still undefined memory.
        errdefer {
            for (list[0..done]) |*elem| {
                deinitAttributeValue(elem, allocator);
            }
            allocator.free(list);
        }
        for (arr.items) |item| {
            list[done] = try parseAttributeValue(allocator, item);
            done += 1;
        }
        return types.AttributeValue{ .L = list };
    }
    // Map (recursive)
    if (std.mem.eql(u8, type_name, "M")) {
        const obj_val = switch (type_value) {
            .object => |o| o,
            else => return error.InvalidMapAttribute,
        };
        var map = std.StringHashMap(types.AttributeValue).init(allocator);
        // On failure, free everything already inserted into the map; the key
        // and value of a failed iteration are covered by their own errdefers.
        errdefer {
            var cleanup_iter = map.iterator();
            while (cleanup_iter.next()) |map_entry| {
                allocator.free(map_entry.key_ptr.*);
                deinitAttributeValue(map_entry.value_ptr, allocator);
            }
            map.deinit();
        }
        var map_iter = obj_val.iterator();
        while (map_iter.next()) |map_entry| {
            const key = try allocator.dupe(u8, map_entry.key_ptr.*);
            errdefer allocator.free(key);
            var val = try parseAttributeValue(allocator, map_entry.value_ptr.*);
            errdefer deinitAttributeValue(&val, allocator);
            try map.put(key, val);
        }
        return types.AttributeValue{ .M = map };
    }
    return error.UnknownAttributeType;
}
// ============================================================================
// Serialization (Types → JSON)
// ============================================================================
/// Serialize an Item to DynamoDB JSON format.
/// Caller owns the returned slice and must free it with the same allocator.
pub fn serializeItem(allocator: std.mem.Allocator, item: types.Item) ![]u8 {
    var out = std.ArrayList(u8).init(allocator);
    errdefer out.deinit();
    try types.json.serializeItem(out.writer(), item);
    return out.toOwnedSlice();
}
/// Serialize a single AttributeValue to DynamoDB JSON format.
/// Caller owns the returned slice and must free it with the same allocator.
pub fn serializeAttributeValue(allocator: std.mem.Allocator, attr: types.AttributeValue) ![]u8 {
    var out = std.ArrayList(u8).init(allocator);
    errdefer out.deinit();
    try types.json.serializeAttributeValue(out.writer(), attr);
    return out.toOwnedSlice();
}
// ============================================================================
// Storage Helpers
// ============================================================================
/// Extract just the key attributes from an item based on the key schema.
/// OWNERSHIP: the returned Item owns its attribute-NAME strings, but the
/// attribute VALUES are shallow copies still owned by `item`. Callers must
/// free only the names and the map (iterate, free each key_ptr, then call
/// deinit()) and must NOT call deinitItem(), which would double-free the
/// values through `item`.
/// Fix: the previous error path (`errdefer key.deinit()`) leaked the
/// attribute names already duplicated into the map.
pub fn extractKeyAttributes(
    allocator: std.mem.Allocator,
    item: types.Item,
    key_schema: []const types.KeySchemaElement,
) !types.Item {
    var key = types.Item.init(allocator);
    // On failure, release the duplicated names; values are borrowed from
    // `item` and are intentionally not freed here.
    errdefer {
        var it = key.iterator();
        while (it.next()) |e| allocator.free(e.key_ptr.*);
        key.deinit();
    }
    for (key_schema) |schema_element| {
        const attr_value = item.get(schema_element.attribute_name) orelse
            return error.MissingKeyAttribute;
        const attr_name = try allocator.dupe(u8, schema_element.attribute_name);
        errdefer allocator.free(attr_name);
        // Shallow copy of the value: `item` keeps ownership.
        try key.put(attr_name, attr_value);
    }
    return key;
}
/// Build a RocksDB storage key from table name and key attributes
/// Format: _data:{table}:{pk} or _data:{table}:{pk}:{sk}
/// Caller owns returned slice and must free it
/// NOTE(review): ':' is used as the delimiter with no escaping, so a table
/// name or key value containing ':' produces ambiguous storage keys --
/// confirm inputs are constrained or add escaping.
pub fn buildRocksDBKey(
allocator: std.mem.Allocator,
table_name: []const u8,
key_schema: []const types.KeySchemaElement,
key: types.Item,
) ![]u8 {
const KeyPrefix = struct {
const data = "_data:";
};
// Find partition key and sort key
var pk_value: ?[]const u8 = null;
var sk_value: ?[]const u8 = null;
for (key_schema) |schema_element| {
const attr = key.get(schema_element.attribute_name) orelse
return error.MissingKeyAttribute;
// Extract string value from attribute
// DynamoDB keys must be S (string), N (number), or B (binary)
const value = switch (attr) {
.S => |s| s,
.N => |n| n,
.B => |b| b,
else => return error.InvalidKeyType,
};
switch (schema_element.key_type) {
.HASH => pk_value = value,
.RANGE => sk_value = value,
}
}
const pk = pk_value orelse return error.MissingPartitionKey;
// A RANGE element in the schema appends the sort key segment.
if (sk_value) |sk| {
return std.fmt.allocPrint(
allocator,
"{s}{s}:{s}:{s}",
.{ KeyPrefix.data, table_name, pk, sk },
);
} else {
return std.fmt.allocPrint(
allocator,
"{s}{s}:{s}",
.{ KeyPrefix.data, table_name, pk },
);
}
}
/// Build the RocksDB storage key directly from item JSON.
/// Convenience wrapper: parses the JSON, derives the key, then frees the
/// temporary item. Caller owns the returned slice.
pub fn buildRocksDBKeyFromJson(
    allocator: std.mem.Allocator,
    table_name: []const u8,
    key_schema: []const types.KeySchemaElement,
    item_json: []const u8,
) ![]u8 {
    // Must be `var`, not `const`: deinitItem takes `*types.Item`, and taking
    // the address of a `const` binding yields `*const types.Item`, which
    // does not compile.
    var item = try parseItem(allocator, item_json);
    defer deinitItem(&item, allocator);
    return buildRocksDBKey(allocator, table_name, key_schema, item);
}
// ============================================================================
// Memory Management
// ============================================================================
/// Free all memory associated with an AttributeValue
/// Recursively frees nested structures (Maps, Lists)
/// Safe to call on scalar variants (BOOL/NULL), which own no heap memory.
pub fn deinitAttributeValue(attr: *types.AttributeValue, allocator: std.mem.Allocator) void {
switch (attr.*) {
// Single owned slice.
.S, .N, .B => |slice| allocator.free(slice),
// Sets: free each element, then the outer array.
.SS, .NS, .BS => |slices| {
for (slices) |s| allocator.free(s);
allocator.free(slices);
},
// Map: free each owned key string and recursively free each value.
.M => |*map| {
var iter = map.iterator();
while (iter.next()) |entry| {
allocator.free(entry.key_ptr.*);
deinitAttributeValue(entry.value_ptr, allocator);
}
map.deinit();
},
// List: recursively free each element, then the backing array.
.L => |list| {
for (list) |*item| {
deinitAttributeValue(item, allocator);
}
allocator.free(list);
},
// No heap allocations behind these variants.
.NULL, .BOOL => {},
}
}
/// Free all memory associated with an Item: every attribute name, every
/// attribute value (recursively), and finally the map itself.
pub fn deinitItem(item: *types.Item, allocator: std.mem.Allocator) void {
    var entries = item.iterator();
    while (entries.next()) |kv| {
        allocator.free(kv.key_ptr.*);
        deinitAttributeValue(kv.value_ptr, allocator);
    }
    item.deinit();
}
// ============================================================================
// Tests
// ============================================================================
// Verifies a bare {"S":...} attribute parses into the S variant.
test "parse simple string attribute" {
const allocator = std.testing.allocator;
const json_str = "{\"S\":\"hello world\"}";
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json_str, .{});
defer parsed.deinit();
var attr = try parseAttributeValue(allocator, parsed.value);
defer deinitAttributeValue(&attr, allocator);
try std.testing.expectEqualStrings("hello world", attr.S);
}
// Verifies a flat item with S and N attributes parses with all entries.
test "parse simple item" {
const allocator = std.testing.allocator;
const json_str =
\\{"pk":{"S":"user123"},"name":{"S":"Alice"},"age":{"N":"25"}}
;
var item = try parseItem(allocator, json_str);
defer deinitItem(&item, allocator);
try std.testing.expectEqual(@as(usize, 3), item.count());
const pk = item.get("pk").?;
try std.testing.expectEqualStrings("user123", pk.S);
const name = item.get("name").?;
try std.testing.expectEqualStrings("Alice", name.S);
const age = item.get("age").?;
try std.testing.expectEqualStrings("25", age.N);
}
// Verifies recursive parsing of an M (map) attribute.
test "parse nested map" {
const allocator = std.testing.allocator;
const json_str =
\\{"data":{"M":{"key1":{"S":"value1"},"key2":{"N":"42"}}}}
;
var item = try parseItem(allocator, json_str);
defer deinitItem(&item, allocator);
const data = item.get("data").?;
const inner = data.M.get("key1").?;
try std.testing.expectEqualStrings("value1", inner.S);
}
// Round-trips parse -> serialize -> parse and checks the attribute count
// (attribute order in the serialized form is not asserted).
test "serialize item round-trip" {
const allocator = std.testing.allocator;
const original =
\\{"pk":{"S":"test"},"num":{"N":"123"}}
;
var item = try parseItem(allocator, original);
defer deinitItem(&item, allocator);
const serialized = try serializeItem(allocator, item);
defer allocator.free(serialized);
// Parse again to verify
var item2 = try parseItem(allocator, serialized);
defer deinitItem(&item2, allocator);
try std.testing.expectEqual(@as(usize, 2), item2.count());
}
// HASH-only schema: key is _data:{table}:{pk}.
test "build rocksdb key with partition key only" {
const allocator = std.testing.allocator;
const item_json = "{\"pk\":{\"S\":\"user123\"},\"data\":{\"S\":\"test\"}}";
var item = try parseItem(allocator, item_json);
defer deinitItem(&item, allocator);
const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH },
};
const key = try buildRocksDBKey(allocator, "Users", &key_schema, item);
defer allocator.free(key);
try std.testing.expectEqualStrings("_data:Users:user123", key);
}
// HASH+RANGE schema: key is _data:{table}:{pk}:{sk}.
test "build rocksdb key with partition and sort keys" {
const allocator = std.testing.allocator;
const item_json = "{\"pk\":{\"S\":\"user123\"},\"sk\":{\"S\":\"metadata\"}}";
var item = try parseItem(allocator, item_json);
defer deinitItem(&item, allocator);
const key_schema = [_]types.KeySchemaElement{
.{ .attribute_name = "pk", .key_type = .HASH },
.{ .attribute_name = "sk", .key_type = .RANGE },
};
const key = try buildRocksDBKey(allocator, "Items", &key_schema, item);
defer allocator.free(key);
try std.testing.expectEqualStrings("_data:Items:user123:metadata", key);
}
================================================================================
FILE: ./src/dynamodb/storage.zig
================================================================================
/// Storage engine mapping DynamoDB operations to RocksDB
const std = @import("std");
const rocksdb = @import("../rocksdb.zig");
const types = @import("types.zig");
const json = @import("json.zig");
/// Errors surfaced by the storage layer. The API handlers map these onto
/// DynamoDB error types and HTTP statuses; OutOfMemory is included so
/// allocation failures can propagate through StorageError-typed functions.
pub const StorageError = error{
TableNotFound,
TableAlreadyExists,
ItemNotFound,
InvalidKey,
MissingKeyAttribute,
SerializationError,
RocksDBError,
OutOfMemory,
};
/// Key prefixes for different data types in RocksDB.
/// All records share one RocksDB keyspace; these prefixes keep the
/// metadata, item, and index namespaces disjoint for prefix scans.
const KeyPrefix = struct {
/// Table metadata: _meta:{table_name}
const meta = "_meta:";
/// Item data: _data:{table_name}:{partition_key}[:{sort_key}]
const data = "_data:";
/// Global secondary index: _gsi:{table_name}:{index_name}:{pk}:{sk}
const gsi = "_gsi:";
/// Local secondary index: _lsi:{table_name}:{index_name}:{pk}:{sk}
const lsi = "_lsi:";
};
/// In-memory representation of table metadata.
/// Instances returned by getTableMetadata own their slices; call deinit()
/// to release them.
const TableMetadata = struct {
table_name: []const u8,
key_schema: []types.KeySchemaElement,
attribute_definitions: []types.AttributeDefinition,
table_status: types.TableStatus,
creation_date_time: i64,
/// Free the owned name, key schema, and attribute definitions.
pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
allocator.free(self.table_name);
for (self.key_schema) |ks| {
allocator.free(ks.attribute_name);
}
allocator.free(self.key_schema);
for (self.attribute_definitions) |ad| {
allocator.free(ad.attribute_name);
}
allocator.free(self.attribute_definitions);
}
};
pub const StorageEngine = struct {
db: rocksdb.DB,
allocator: std.mem.Allocator,
const Self = @This();
/// Open the RocksDB database at `data_dir` and wrap it in a StorageEngine.
/// The `true` argument presumably enables create-if-missing -- confirm
/// against rocksdb.DB.open.
pub fn init(allocator: std.mem.Allocator, data_dir: [*:0]const u8) !Self {
// Any open failure is collapsed into the generic RocksDBError.
const db = rocksdb.DB.open(data_dir, true) catch return StorageError.RocksDBError;
return Self{
.db = db,
.allocator = allocator,
};
}
/// Close the underlying RocksDB handle.
pub fn deinit(self: *Self) void {
self.db.close();
}
// === Table Operations ===
/// Create a table: fails with TableAlreadyExists if its metadata key is
/// present, otherwise serializes and stores the metadata record.
/// The returned TableDescription borrows the caller's slices
/// (table_name/key_schema/attribute_definitions) -- valid only while the
/// caller keeps them alive.
pub fn createTable(
self: *Self,
table_name: []const u8,
key_schema: []const types.KeySchemaElement,
attribute_definitions: []const types.AttributeDefinition,
) StorageError!types.TableDescription {
// Check if table already exists
const meta_key = try self.buildMetaKey(table_name);
defer self.allocator.free(meta_key);
const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
if (existing) |e| {
self.allocator.free(e);
return StorageError.TableAlreadyExists;
}
// Create table metadata
const now = std.time.timestamp();
// NOTE(review): TableMetadata declares key_schema/attribute_definitions as
// mutable slices ([]T) while these parameters are []const T -- verify this
// assignment actually compiles, or relax the field types to []const T.
// The struct only borrows the caller's slices here; it is serialized
// immediately and never deinit'd, so no ownership transfer happens.
const metadata = TableMetadata{
.table_name = table_name,
.key_schema = key_schema,
.attribute_definitions = attribute_definitions,
.table_status = .ACTIVE,
.creation_date_time = now,
};
// Serialize and store
const meta_value = try self.serializeTableMetadata(metadata);
defer self.allocator.free(meta_value);
self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError;
return types.TableDescription{
.table_name = table_name,
.key_schema = key_schema,
.attribute_definitions = attribute_definitions,
.table_status = .ACTIVE,
.creation_date_time = now,
.item_count = 0,
.table_size_bytes = 0,
};
}
/// Delete a table: removes every item under the table's data prefix and the
/// metadata record in a single atomic WriteBatch.
pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
const meta_key = try self.buildMetaKey(table_name);
defer self.allocator.free(meta_key);
// Verify table exists
const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
if (existing == null) return StorageError.TableNotFound;
self.allocator.free(existing.?);
// Delete all items with this table's prefix
const data_prefix = try self.buildDataPrefix(table_name);
defer self.allocator.free(data_prefix);
var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError;
defer batch.deinit();
// Scan and delete all matching keys
var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
defer iter.deinit();
iter.seek(data_prefix);
while (iter.valid()) {
const key = iter.key() orelse break;
if (!std.mem.startsWith(u8, key, data_prefix)) break;
// Assumes WriteBatch copies the key bytes, since `key` is only valid
// until iter.next() -- TODO confirm against the RocksDB binding.
batch.delete(key);
iter.next();
}
// Delete metadata
batch.delete(meta_key);
batch.write(&self.db) catch return StorageError.RocksDBError;
}
/// Describe a table: verifies it exists and reports status, creation time,
/// item count, and approximate size (counted by walking the data prefix --
/// expensive, but matches DynamoDB's eventually-consistent ItemCount).
/// Fix: the previous version returned metadata.table_name/key_schema/
/// attribute_definitions AFTER `defer metadata.deinit(...)` freed them --
/// a use-after-free. The description now references the caller-owned
/// `table_name` argument and returns empty schema slices instead of
/// dangling ones (no caller in this codebase reads the schema from a
/// DescribeTable result).
pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
    // Verify the table exists; copy out the value-typed fields before
    // releasing the metadata's heap allocations.
    var metadata = try self.getTableMetadata(table_name);
    const table_status = metadata.table_status;
    const creation_date_time = metadata.creation_date_time;
    metadata.deinit(self.allocator);
    const data_prefix = try self.buildDataPrefix(table_name);
    defer self.allocator.free(data_prefix);
    var item_count: u64 = 0;
    var total_size: u64 = 0;
    var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
    defer iter.deinit();
    iter.seek(data_prefix);
    while (iter.valid()) {
        const key = iter.key() orelse break;
        if (!std.mem.startsWith(u8, key, data_prefix)) break;
        const value = iter.value() orelse break;
        item_count += 1;
        total_size += value.len;
        iter.next();
    }
    return types.TableDescription{
        // Caller-owned slice: valid at the call site, unlike the freed copy.
        .table_name = table_name,
        .key_schema = &.{},
        .attribute_definitions = &.{},
        .table_status = table_status,
        .creation_date_time = creation_date_time,
        .item_count = item_count,
        .table_size_bytes = total_size,
    };
}
/// List all table names by scanning the "_meta:" prefix.
/// Caller owns the returned slice and each name within it.
/// Fix: if `tables.append` failed, the freshly duplicated name leaked
/// (the outer errdefer frees only names already tracked by the list); an
/// inner errdefer now covers that window.
pub fn listTables(self: *Self) StorageError![][]const u8 {
    var tables = std.ArrayList([]const u8).init(self.allocator);
    errdefer {
        for (tables.items) |t| self.allocator.free(t);
        tables.deinit();
    }
    var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
    defer iter.deinit();
    iter.seek(KeyPrefix.meta);
    while (iter.valid()) {
        const key = iter.key() orelse break;
        if (!std.mem.startsWith(u8, key, KeyPrefix.meta)) break;
        // The table name is everything after the "_meta:" prefix.
        const table_name = key[KeyPrefix.meta.len..];
        const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory;
        // Not yet tracked by `tables`; free it if append fails.
        errdefer self.allocator.free(owned_name);
        tables.append(owned_name) catch return StorageError.OutOfMemory;
        iter.next();
    }
    return tables.toOwnedSlice() catch return StorageError.OutOfMemory;
}
// === Item Operations ===
/// Store an item (raw DynamoDB JSON) under its derived RocksDB key.
/// Validates that the item carries every key-schema attribute first.
/// NOTE(review): parse and key-build failures (including OutOfMemory) are
/// all collapsed into InvalidKey by the `catch` clauses below -- consider
/// preserving OutOfMemory.
pub fn putItem(self: *Self, table_name: []const u8, item_json: []const u8) StorageError!void {
// Get table metadata to retrieve key schema
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Parse the item to validate it
var item = json.parseItem(self.allocator, item_json) catch return StorageError.InvalidKey;
defer json.deinitItem(&item, self.allocator);
// Validate that item contains all required key attributes
for (metadata.key_schema) |key_elem| {
if (!item.contains(key_elem.attribute_name)) {
return StorageError.MissingKeyAttribute;
}
}
// Build storage key using the parsed item and actual key schema
const storage_key = json.buildRocksDBKey(
self.allocator,
table_name,
metadata.key_schema,
item,
) catch return StorageError.InvalidKey;
defer self.allocator.free(storage_key);
// Store the original JSON (for now - later we can optimize to binary)
self.db.put(storage_key, item_json) catch return StorageError.RocksDBError;
}
/// Fetch an item by its key JSON. Returns null when the item is absent.
/// Caller owns the returned slice (the stored item JSON) and must free it.
pub fn getItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!?[]u8 {
// Get table metadata
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Parse the key (any parse failure is reported as InvalidKey)
var key = json.parseItem(self.allocator, key_json) catch return StorageError.InvalidKey;
defer json.deinitItem(&key, self.allocator);
// Validate key has all required attributes
for (metadata.key_schema) |key_elem| {
if (!key.contains(key_elem.attribute_name)) {
return StorageError.MissingKeyAttribute;
}
}
// Build storage key
const storage_key = json.buildRocksDBKey(
self.allocator,
table_name,
metadata.key_schema,
key,
) catch return StorageError.InvalidKey;
defer self.allocator.free(storage_key);
return self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError;
}
/// Delete an item by its key JSON.
/// Deleting a nonexistent item is not an error (no existence check is made
/// before db.delete).
pub fn deleteItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!void {
// Get table metadata
var metadata = try self.getTableMetadata(table_name);
defer metadata.deinit(self.allocator);
// Parse the key (any parse failure is reported as InvalidKey)
var key = json.parseItem(self.allocator, key_json) catch return StorageError.InvalidKey;
defer json.deinitItem(&key, self.allocator);
// Validate key
for (metadata.key_schema) |key_elem| {
if (!key.contains(key_elem.attribute_name)) {
return StorageError.MissingKeyAttribute;
}
}
// Build storage key
const storage_key = json.buildRocksDBKey(
self.allocator,
table_name,
metadata.key_schema,
key,
) catch return StorageError.InvalidKey;
defer self.allocator.free(storage_key);
self.db.delete(storage_key) catch return StorageError.RocksDBError;
}
/// Scan every item in a table, optionally bounded by `limit`.
/// Caller owns the returned slice and every item-JSON element within it.
/// Fix: if `items.append` failed, the freshly duplicated value leaked (the
/// outer errdefer frees only values already tracked); an inner errdefer
/// now covers that window.
pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![][]const u8 {
    // Verify table exists
    var metadata = try self.getTableMetadata(table_name);
    defer metadata.deinit(self.allocator);
    const data_prefix = try self.buildDataPrefix(table_name);
    defer self.allocator.free(data_prefix);
    var items = std.ArrayList([]const u8).init(self.allocator);
    errdefer {
        for (items.items) |item| self.allocator.free(item);
        items.deinit();
    }
    var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
    defer iter.deinit();
    var count: usize = 0;
    const max_items = limit orelse std.math.maxInt(usize);
    iter.seek(data_prefix);
    while (iter.valid() and count < max_items) {
        const key = iter.key() orelse break;
        if (!std.mem.startsWith(u8, key, data_prefix)) break;
        const value = iter.value() orelse break;
        const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory;
        // Not yet tracked by `items`; free it if append fails.
        errdefer self.allocator.free(owned_value);
        items.append(owned_value) catch return StorageError.OutOfMemory;
        count += 1;
        iter.next();
    }
    return items.toOwnedSlice() catch return StorageError.OutOfMemory;
}
/// Query all items sharing a partition key value, optionally bounded by
/// `limit`. Caller owns the returned slice and each element.
/// Fix: same leak as scan() -- the duplicated value leaked when
/// `items.append` failed; an inner errdefer now frees it.
pub fn query(self: *Self, table_name: []const u8, partition_key_value: []const u8, limit: ?usize) StorageError![][]const u8 {
    // Verify table exists
    var metadata = try self.getTableMetadata(table_name);
    defer metadata.deinit(self.allocator);
    // Build prefix for this partition
    const prefix = try self.buildPartitionPrefix(table_name, partition_key_value);
    defer self.allocator.free(prefix);
    var items = std.ArrayList([]const u8).init(self.allocator);
    errdefer {
        for (items.items) |item| self.allocator.free(item);
        items.deinit();
    }
    var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
    defer iter.deinit();
    var count: usize = 0;
    const max_items = limit orelse std.math.maxInt(usize);
    iter.seek(prefix);
    while (iter.valid() and count < max_items) {
        const key = iter.key() orelse break;
        if (!std.mem.startsWith(u8, key, prefix)) break;
        const value = iter.value() orelse break;
        const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory;
        // Not yet tracked by `items`; free it if append fails.
        errdefer self.allocator.free(owned_value);
        items.append(owned_value) catch return StorageError.OutOfMemory;
        count += 1;
        iter.next();
    }
    return items.toOwnedSlice() catch return StorageError.OutOfMemory;
}
// === Internal Helpers ===
fn getTableMetadata(self: *Self, table_name: []const u8) StorageError!TableMetadata {
const meta_key = try self.buildMetaKey(table_name);
defer self.allocator.free(meta_key);
const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
if (meta_value == null) return StorageError.TableNotFound;
defer self.allocator.free(meta_value.?);
return self.deserializeTableMetadata(meta_value.?);
}
fn buildMetaKey(self: *Self, table_name: []const u8) StorageError![]u8 {
return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ KeyPrefix.meta, table_name }) catch return StorageError.OutOfMemory;
}
fn buildDataPrefix(self: *Self, table_name: []const u8) StorageError![]u8 {
return std.fmt.allocPrint(self.allocator, "{s}{s}:", .{ KeyPrefix.data, table_name }) catch return StorageError.OutOfMemory;
}
    /// Build the key prefix for one partition: "<data-prefix><table>:<pk>";
    /// caller frees.
    /// NOTE(review): unlike buildDataPrefix there is no trailing delimiter,
    /// so startsWith-based iteration in query() could also match partition
    /// keys that merely begin with `partition_key` (e.g. "user1" matching
    /// "user12") — verify against the key layout json.buildRocksDBKey emits.
    fn buildPartitionPrefix(self: *Self, table_name: []const u8, partition_key: []const u8) StorageError![]u8 {
        return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, partition_key }) catch return StorageError.OutOfMemory;
    }
// === Serialization ===
    /// Serialize table metadata to a JSON document shaped like DynamoDB's
    /// DescribeTable output; caller owns the returned slice.
    /// NOTE(review): names are interpolated without JSON escaping — assumes
    /// table/attribute names contain no `"` or `\`; confirm validation
    /// happens upstream.
    fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 {
        var buf = std.ArrayList(u8).init(self.allocator);
        errdefer buf.deinit();
        const writer = buf.writer();
        writer.writeAll("{\"TableName\":\"") catch return StorageError.SerializationError;
        writer.writeAll(metadata.table_name) catch return StorageError.SerializationError;
        writer.writeAll("\",\"TableStatus\":\"") catch return StorageError.SerializationError;
        writer.writeAll(metadata.table_status.toString()) catch return StorageError.SerializationError;
        writer.print("\",\"CreationDateTime\":{d},\"KeySchema\":[", .{metadata.creation_date_time}) catch return StorageError.SerializationError;
        // KeySchema: [{"AttributeName":"...","KeyType":"HASH|RANGE"}, ...]
        for (metadata.key_schema, 0..) |ks, i| {
            if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
            writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ks.attribute_name) catch return StorageError.SerializationError;
            writer.writeAll("\",\"KeyType\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ks.key_type.toString()) catch return StorageError.SerializationError;
            writer.writeAll("\"}") catch return StorageError.SerializationError;
        }
        writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError;
        // AttributeDefinitions: [{"AttributeName":"...","AttributeType":"S|N|B"}, ...]
        for (metadata.attribute_definitions, 0..) |ad, i| {
            if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError;
            writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ad.attribute_name) catch return StorageError.SerializationError;
            writer.writeAll("\",\"AttributeType\":\"") catch return StorageError.SerializationError;
            writer.writeAll(ad.attribute_type.toString()) catch return StorageError.SerializationError;
            writer.writeAll("\"}") catch return StorageError.SerializationError;
        }
        writer.writeAll("]}") catch return StorageError.SerializationError;
        return buf.toOwnedSlice() catch return StorageError.OutOfMemory;
    }
    /// Parse a metadata document (as produced by serializeTableMetadata)
    /// into an owned TableMetadata. All strings are duplicated out of the
    /// short-lived std.json arena; caller releases via metadata.deinit().
    fn deserializeTableMetadata(self: *Self, data: []const u8) StorageError!TableMetadata {
        const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, data, .{}) catch return StorageError.SerializationError;
        defer parsed.deinit();
        const root = switch (parsed.value) {
            .object => |o| o,
            else => return StorageError.SerializationError,
        };
        // Extract table name (duplicated: the parse arena dies with `parsed`).
        const table_name_val = root.get("TableName") orelse return StorageError.SerializationError;
        const table_name_str = switch (table_name_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        const table_name = self.allocator.dupe(u8, table_name_str) catch return StorageError.OutOfMemory;
        errdefer self.allocator.free(table_name);
        // Extract table status
        const status_val = root.get("TableStatus") orelse return StorageError.SerializationError;
        const status_str = switch (status_val) {
            .string => |s| s,
            else => return StorageError.SerializationError,
        };
        // Unrecognized status strings deliberately fall back to ACTIVE.
        const table_status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE"))
            .ACTIVE
        else if (std.mem.eql(u8, status_str, "CREATING"))
            .CREATING
        else if (std.mem.eql(u8, status_str, "DELETING"))
            .DELETING
        else
            .ACTIVE;
        // Extract creation time
        const creation_val = root.get("CreationDateTime") orelse return StorageError.SerializationError;
        const creation_date_time = switch (creation_val) {
            .integer => |i| i,
            else => return StorageError.SerializationError,
        };
        // Extract key schema
        const key_schema_val = root.get("KeySchema") orelse return StorageError.SerializationError;
        const key_schema_array = switch (key_schema_val) {
            .array => |a| a,
            else => return StorageError.SerializationError,
        };
        // Owns both the list and each duped attribute_name until success.
        var key_schema = std.ArrayList(types.KeySchemaElement).init(self.allocator);
        errdefer {
            for (key_schema.items) |ks| self.allocator.free(ks.attribute_name);
            key_schema.deinit();
        }
        for (key_schema_array.items) |item| {
            const obj = switch (item) {
                .object => |o| o,
                else => return StorageError.SerializationError,
            };
            const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError;
            const attr_name_str = switch (attr_name_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory;
            // Covers the window before the append below takes ownership.
            errdefer self.allocator.free(attr_name);
            const key_type_val = obj.get("KeyType") orelse return StorageError.SerializationError;
            const key_type_str = switch (key_type_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const key_type = types.KeyType.fromString(key_type_str) orelse return StorageError.SerializationError;
            key_schema.append(.{
                .attribute_name = attr_name,
                .key_type = key_type,
            }) catch return StorageError.OutOfMemory;
        }
        // Extract attribute definitions (same ownership pattern as key schema).
        const attr_defs_val = root.get("AttributeDefinitions") orelse return StorageError.SerializationError;
        const attr_defs_array = switch (attr_defs_val) {
            .array => |a| a,
            else => return StorageError.SerializationError,
        };
        var attr_defs = std.ArrayList(types.AttributeDefinition).init(self.allocator);
        errdefer {
            for (attr_defs.items) |ad| self.allocator.free(ad.attribute_name);
            attr_defs.deinit();
        }
        for (attr_defs_array.items) |item| {
            const obj = switch (item) {
                .object => |o| o,
                else => return StorageError.SerializationError,
            };
            const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError;
            const attr_name_str = switch (attr_name_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory;
            errdefer self.allocator.free(attr_name);
            const attr_type_val = obj.get("AttributeType") orelse return StorageError.SerializationError;
            const attr_type_str = switch (attr_type_val) {
                .string => |s| s,
                else => return StorageError.SerializationError,
            };
            const attr_type = types.ScalarAttributeType.fromString(attr_type_str) orelse return StorageError.SerializationError;
            attr_defs.append(.{
                .attribute_name = attr_name,
                .attribute_type = attr_type,
            }) catch return StorageError.OutOfMemory;
        }
        // NOTE(review): if the second toOwnedSlice fails after the first
        // succeeded, the first owned slice (and its duped strings) leaks —
        // the errdefers above only walk the ArrayLists' now-empty .items.
        return TableMetadata{
            .table_name = table_name,
            .key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory,
            .attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory,
            .table_status = table_status,
            .creation_date_time = creation_date_time,
        };
    }
};
// End-to-end check of table lifecycle: create, list, delete, list again.
// Statements are strictly order-dependent (each step observes the previous).
test "storage basic operations" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage";
    // Remove the on-disk RocksDB directory even if an expectation fails.
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();
    // Create table
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try engine.createTable("TestTable", &key_schema, &attr_defs);
    // List tables
    const tables = try engine.listTables();
    defer {
        for (tables) |t| allocator.free(t);
        allocator.free(tables);
    }
    try std.testing.expectEqual(@as(usize, 1), tables.len);
    try std.testing.expectEqualStrings("TestTable", tables[0]);
    // Delete table
    try engine.deleteTable("TestTable");
    // Verify deleted
    const tables2 = try engine.listTables();
    // NOTE(review): only the outer slice is freed here (fine while len == 0,
    // but would leak element strings if the expectation below ever failed
    // with a non-empty result).
    defer allocator.free(tables2);
    try std.testing.expectEqual(@as(usize, 0), tables2.len);
}
// putItem must reject items missing a declared key attribute and accept
// items that carry it.
test "putItem validates key presence" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_storage_validate";
    // Clean up the RocksDB directory regardless of test outcome.
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try StorageEngine.init(allocator, path);
    defer engine.deinit();
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "userId", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "userId", .attribute_type = .S },
    };
    _ = try engine.createTable("Users", &key_schema, &attr_defs);
    // This should fail - missing userId
    const bad_item = "{\"name\":{\"S\":\"Alice\"}}";
    const result = engine.putItem("Users", bad_item);
    try std.testing.expectError(StorageError.MissingKeyAttribute, result);
    // This should succeed
    const good_item = "{\"userId\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"}}";
    try engine.putItem("Users", good_item);
}
================================================================================
FILE: ./src/dynamodb/types.zig
================================================================================
/// DynamoDB protocol types and serialization
const std = @import("std");
/// DynamoDB AttributeValue - the core data type.
/// Tag names match the DynamoDB wire-format type codes ("S", "N", ...).
pub const AttributeValue = union(enum) {
    S: []const u8, // String
    N: []const u8, // Number (stored as its decimal string form, as DynamoDB does)
    B: []const u8, // Binary (base64-encoded text)
    SS: []const []const u8, // String Set
    NS: []const []const u8, // Number Set
    BS: []const []const u8, // Binary Set
    M: std.StringHashMap(AttributeValue), // Map (nested attributes)
    L: []const AttributeValue, // List
    NULL: bool, // Null marker (payload carries no information)
    BOOL: bool, // Boolean
};
pub const Item = std.StringHashMap(AttributeValue);
/// Role of an attribute within a table's primary key.
pub const KeyType = enum {
    HASH,
    RANGE,

    /// Wire-format name of this key role (identical to the tag name).
    pub fn toString(self: KeyType) []const u8 {
        return @tagName(self);
    }

    /// Parse a wire-format key role; null when unrecognized.
    pub fn fromString(s: []const u8) ?KeyType {
        return std.meta.stringToEnum(KeyType, s);
    }
};
/// Declared scalar type of a key attribute: String, Number, or Binary.
pub const ScalarAttributeType = enum {
    S,
    N,
    B,

    /// Wire-format name of this type (identical to the tag name).
    pub fn toString(self: ScalarAttributeType) []const u8 {
        return @tagName(self);
    }

    /// Parse a wire-format type code; null when unrecognized.
    pub fn fromString(s: []const u8) ?ScalarAttributeType {
        return std.meta.stringToEnum(ScalarAttributeType, s);
    }
};
/// One element of a table's primary key: attribute name plus HASH/RANGE role.
pub const KeySchemaElement = struct {
    attribute_name: []const u8,
    key_type: KeyType,
};
/// Declared type (S/N/B) for an attribute referenced by the key schema.
pub const AttributeDefinition = struct {
    attribute_name: []const u8,
    attribute_type: ScalarAttributeType,
};
/// Lifecycle state of a table, mirroring DynamoDB's TableStatus values.
pub const TableStatus = enum {
    CREATING,
    UPDATING,
    DELETING,
    ACTIVE,
    INACCESSIBLE_ENCRYPTION_CREDENTIALS,
    ARCHIVING,
    ARCHIVED,

    /// Wire-format name of this status. Every wire name is exactly the
    /// enum tag name, so @tagName suffices.
    pub fn toString(self: TableStatus) []const u8 {
        return @tagName(self);
    }
};
/// Snapshot of a table's definition and stats, as reported by DescribeTable.
pub const TableDescription = struct {
    table_name: []const u8,
    key_schema: []const KeySchemaElement,
    attribute_definitions: []const AttributeDefinition,
    table_status: TableStatus,
    creation_date_time: i64, // epoch timestamp; units not enforced here
    item_count: u64,
    table_size_bytes: u64,
};
/// DynamoDB operation types parsed from the X-Amz-Target header.
pub const Operation = enum {
    CreateTable,
    DeleteTable,
    DescribeTable,
    ListTables,
    UpdateTable,
    PutItem,
    GetItem,
    DeleteItem,
    UpdateItem,
    Query,
    Scan,
    BatchGetItem,
    BatchWriteItem,
    TransactGetItems,
    TransactWriteItems,
    Unknown,

    /// Parse "DynamoDB_20120810.<Op>" into an Operation. A missing/wrong
    /// prefix or an unrecognized operation name yields .Unknown.
    pub fn fromTarget(target: []const u8) Operation {
        const prefix = "DynamoDB_20120810.";
        if (!std.mem.startsWith(u8, target, prefix)) return .Unknown;
        // Enum tags are exactly the wire operation names, so a direct
        // name lookup replaces the hand-written string map.
        return std.meta.stringToEnum(Operation, target[prefix.len..]) orelse .Unknown;
    }
};
/// DynamoDB error categories and their wire-format rendering.
pub const DynamoDBErrorType = enum {
    ValidationException,
    ResourceNotFoundException,
    ResourceInUseException,
    ConditionalCheckFailedException,
    ProvisionedThroughputExceededException,
    ItemCollectionSizeLimitExceededException,
    InternalServerError,
    SerializationException,

    /// Render the DynamoDB-style JSON error body; caller owns the result.
    /// Every wire-format type name is the service namespace followed by
    /// the enum tag name, so the per-variant switch collapses to @tagName.
    /// NOTE(review): `message` is interpolated without JSON escaping —
    /// confirm callers only pass messages free of `"` and `\`.
    pub fn toErrorResponse(self: DynamoDBErrorType, message: []const u8, allocator: std.mem.Allocator) ![]u8 {
        return std.fmt.allocPrint(
            allocator,
            "{{\"__type\":\"com.amazonaws.dynamodb.v20120810#{s}\",\"message\":\"{s}\"}}",
            .{ @tagName(self), message },
        );
    }
};
// JSON serialization helpers for DynamoDB format.
// NOTE(review): strings are written verbatim (no JSON escaping of `"`/`\`)
// throughout this namespace — confirm inputs are restricted upstream.
pub const json = struct {
    /// Serialize AttributeValue to DynamoDB JSON format, e.g. {"S":"abc"}.
    /// Recurses for L (list) and M (map) payloads.
    pub fn serializeAttributeValue(writer: anytype, value: AttributeValue) !void {
        switch (value) {
            .S => |s| try writer.print("{{\"S\":\"{s}\"}}", .{s}),
            .N => |n| try writer.print("{{\"N\":\"{s}\"}}", .{n}),
            .B => |b| try writer.print("{{\"B\":\"{s}\"}}", .{b}),
            .BOOL => |b| try writer.print("{{\"BOOL\":{}}}", .{b}),
            .NULL => try writer.writeAll("{\"NULL\":true}"),
            .SS => |ss| {
                try writer.writeAll("{\"SS\":[");
                for (ss, 0..) |s, i| {
                    if (i > 0) try writer.writeByte(',');
                    try writer.print("\"{s}\"", .{s});
                }
                try writer.writeAll("]}");
            },
            .NS => |ns| {
                try writer.writeAll("{\"NS\":[");
                for (ns, 0..) |n, i| {
                    if (i > 0) try writer.writeByte(',');
                    try writer.print("\"{s}\"", .{n});
                }
                try writer.writeAll("]}");
            },
            .BS => |bs| {
                try writer.writeAll("{\"BS\":[");
                for (bs, 0..) |b, i| {
                    if (i > 0) try writer.writeByte(',');
                    try writer.print("\"{s}\"", .{b});
                }
                try writer.writeAll("]}");
            },
            .L => |list| {
                try writer.writeAll("{\"L\":[");
                for (list, 0..) |item, i| {
                    if (i > 0) try writer.writeByte(',');
                    try serializeAttributeValue(writer, item);
                }
                try writer.writeAll("]}");
            },
            .M => |map| {
                try writer.writeAll("{\"M\":{");
                var first = true;
                var iter = map.iterator();
                // Entry order follows hash-map iteration order (unspecified).
                while (iter.next()) |entry| {
                    if (!first) try writer.writeByte(',');
                    first = false;
                    try writer.print("\"{s}\":", .{entry.key_ptr.*});
                    try serializeAttributeValue(writer, entry.value_ptr.*);
                }
                try writer.writeAll("}}");
            },
        }
    }
    /// Serialize an Item (map of attribute name to AttributeValue) as a
    /// single JSON object; attribute order follows map iteration order.
    pub fn serializeItem(writer: anytype, item: Item) !void {
        try writer.writeByte('{');
        var first = true;
        var iter = item.iterator();
        while (iter.next()) |entry| {
            if (!first) try writer.writeByte(',');
            first = false;
            try writer.print("\"{s}\":", .{entry.key_ptr.*});
            try serializeAttributeValue(writer, entry.value_ptr.*);
        }
        try writer.writeByte('}');
    }
};
// Covers all three fromTarget outcomes: known op, unknown op behind a
// valid prefix, and wrong prefix entirely.
test "operation from target" {
    try std.testing.expectEqual(Operation.CreateTable, Operation.fromTarget("DynamoDB_20120810.CreateTable"));
    try std.testing.expectEqual(Operation.PutItem, Operation.fromTarget("DynamoDB_20120810.PutItem"));
    // Previously untested branch: valid prefix but unrecognized name.
    try std.testing.expectEqual(Operation.Unknown, Operation.fromTarget("DynamoDB_20120810.DoesNotExist"));
    try std.testing.expectEqual(Operation.Unknown, Operation.fromTarget("Invalid"));
}
================================================================================
FILE: ./src/http.zig
================================================================================
/// Simple HTTP server for DynamoDB API
const std = @import("std");
const net = std.net;
const mem = std.mem;
/// HTTP request methods this server recognizes.
pub const Method = enum {
    GET,
    POST,
    PUT,
    DELETE,
    OPTIONS,
    HEAD,
    PATCH,

    /// Parse a method token (case-sensitive, per HTTP); null if unsupported.
    pub fn fromString(s: []const u8) ?Method {
        // Tag names equal the wire tokens, so a direct enum lookup works.
        return std.meta.stringToEnum(Method, s);
    }
};
/// HTTP status codes emitted by this server, tagged with their numeric value.
pub const StatusCode = enum(u16) {
    ok = 200,
    created = 201,
    no_content = 204,
    bad_request = 400,
    unauthorized = 401,
    forbidden = 403,
    not_found = 404,
    method_not_allowed = 405,
    conflict = 409,
    internal_server_error = 500,
    service_unavailable = 503,
    /// Canonical reason phrase for the status line.
    pub fn phrase(self: StatusCode) []const u8 {
        return switch (self) {
            .ok => "OK",
            .created => "Created",
            .no_content => "No Content",
            .bad_request => "Bad Request",
            .unauthorized => "Unauthorized",
            .forbidden => "Forbidden",
            .not_found => "Not Found",
            .method_not_allowed => "Method Not Allowed",
            .conflict => "Conflict",
            .internal_server_error => "Internal Server Error",
            .service_unavailable => "Service Unavailable",
        };
    }
};
/// A single HTTP header. Both slices are borrowed views into the request
/// buffer (see parseRequest), not owned copies.
pub const Header = struct {
    name: []const u8,
    value: []const u8,
};
/// A parsed HTTP request. All slices borrow from the raw request buffer
/// and are only valid while that buffer lives.
pub const Request = struct {
    method: Method,
    path: []const u8,
    headers: []const Header,
    body: []const u8,
    raw_data: []const u8,

    /// Case-insensitive lookup of the first header named `name`;
    /// null when no such header exists.
    pub fn getHeader(self: *const Request, name: []const u8) ?[]const u8 {
        for (self.headers) |header| {
            if (std.ascii.eqlIgnoreCase(header.name, name)) return header.value;
        }
        return null;
    }
};
/// Mutable HTTP response under construction. Header name/value slices are
/// borrowed; only the body bytes are owned by this struct.
pub const Response = struct {
    status: StatusCode,
    headers: std.ArrayList(Header),
    body: std.ArrayList(u8),
    allocator: mem.Allocator,
    /// Start an empty 200 OK response.
    pub fn init(allocator: mem.Allocator) Response {
        return .{
            .status = .ok,
            .headers = std.ArrayList(Header){},
            .body = std.ArrayList(u8){},
            .allocator = allocator,
        };
    }
    /// Release the header list and body buffer.
    pub fn deinit(self: *Response) void {
        self.headers.deinit(self.allocator);
        self.body.deinit(self.allocator);
    }
    pub fn setStatus(self: *Response, status: StatusCode) void {
        self.status = status;
    }
    /// Append a header; slices are stored as-is and must outlive the response.
    pub fn addHeader(self: *Response, name: []const u8, value: []const u8) !void {
        try self.headers.append(self.allocator, .{ .name = name, .value = value });
    }
    /// Replace the body with a copy of `data`.
    pub fn setBody(self: *Response, data: []const u8) !void {
        self.body.clearRetainingCapacity();
        try self.body.appendSlice(self.allocator, data);
    }
    /// Append a copy of `data` to the body.
    pub fn appendBody(self: *Response, data: []const u8) !void {
        try self.body.appendSlice(self.allocator, data);
    }
    /// Render the full HTTP/1.1 wire form; caller owns the returned slice.
    /// NOTE(review): Content-Length is always emitted here, so a caller
    /// that also addHeader()s its own Content-Length would duplicate it.
    pub fn serialize(self: *Response, allocator: mem.Allocator) ![]u8 {
        var buf = std.ArrayList(u8){};
        errdefer buf.deinit(allocator);
        const writer = buf.writer(allocator);
        // Status line
        try writer.print("HTTP/1.1 {d} {s}\r\n", .{ @intFromEnum(self.status), self.status.phrase() });
        // Content-Length header
        try writer.print("Content-Length: {d}\r\n", .{self.body.items.len});
        // Custom headers
        for (self.headers.items) |h| {
            try writer.print("{s}: {s}\r\n", .{ h.name, h.value });
        }
        // End of headers
        try writer.writeAll("\r\n");
        // Body
        try writer.writeAll(self.body.items);
        return buf.toOwnedSlice(allocator);
    }
};
pub const RequestHandler = *const fn (*const Request, mem.Allocator) Response;
/// Minimal blocking HTTP/1.1 server: one detached OS thread per accepted
/// connection. `running` gates the accept loop; `stop()` may be called
/// from another thread.
pub const Server = struct {
    allocator: mem.Allocator,
    address: net.Address,
    handler: RequestHandler,
    running: std.atomic.Value(bool),
    listener: ?net.Server,
    const Self = @This();
    /// Resolve the bind address; does not open the socket yet (see start()).
    pub fn init(allocator: mem.Allocator, host: []const u8, port: u16, handler: RequestHandler) !Self {
        const address = try net.Address.parseIp(host, port);
        return Self{
            .allocator = allocator,
            .address = address,
            .handler = handler,
            .running = std.atomic.Value(bool).init(false),
            .listener = null,
        };
    }
    /// Bind, listen, and run the accept loop until stop() is called.
    pub fn start(self: *Self) !void {
        self.listener = try self.address.listen(.{
            .reuse_address = true,
        });
        self.running.store(true, .release);
        std.log.info("Server listening on {any}", .{self.address});
        while (self.running.load(.acquire)) {
            const conn = self.listener.?.accept() catch |err| {
                // stop() deinits the listener, which surfaces here.
                if (err == error.SocketNotListening) break;
                std.log.err("Accept error: {any}", .{err});
                continue;
            };
            // Spawn thread for each connection
            const thread = std.Thread.spawn(.{}, handleConnection, .{ self, conn }) catch |err| {
                std.log.err("Thread spawn error: {any}", .{err});
                conn.stream.close();
                continue;
            };
            thread.detach();
        }
    }
    /// Per-connection worker: read one request, dispatch, write the reply.
    /// NOTE(review): the 64KiB stack buffer caps request size — anything
    /// larger is truncated rather than rejected; confirm acceptable.
    fn handleConnection(self: *Self, conn: net.Server.Connection) void {
        defer conn.stream.close();
        var buf: [65536]u8 = undefined;
        var total_read: usize = 0;
        // Read request
        while (total_read < buf.len) {
            const n = conn.stream.read(buf[total_read..]) catch |err| {
                std.log.err("Read error: {any}", .{err});
                return;
            };
            if (n == 0) break;
            total_read += n;
            // Check if we have complete headers
            if (mem.indexOf(u8, buf[0..total_read], "\r\n\r\n")) |header_end| {
                // Parse Content-Length if present
                const headers = buf[0..header_end];
                var content_length: usize = 0;
                var lines = mem.splitSequence(u8, headers, "\r\n");
                while (lines.next()) |line| {
                    if (std.ascii.startsWithIgnoreCase(line, "content-length:")) {
                        const val = mem.trim(u8, line["content-length:".len..], " ");
                        content_length = std.fmt.parseInt(usize, val, 10) catch 0;
                        break;
                    }
                }
                const body_start = header_end + 4;
                const body_received = total_read - body_start;
                // Keep reading until the declared body has fully arrived.
                if (body_received >= content_length) break;
            }
        }
        if (total_read == 0) return;
        // Parse and handle request. Request slices alias `buf`, so they are
        // only valid within this function.
        const request = parseRequest(self.allocator, buf[0..total_read]) catch |err| {
            std.log.err("Parse error: {any}", .{err});
            const error_response = "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n";
            _ = conn.stream.write(error_response) catch {};
            return;
        };
        defer self.allocator.free(request.headers);
        var response = self.handler(&request, self.allocator);
        defer response.deinit();
        const response_data = response.serialize(self.allocator) catch |err| {
            std.log.err("Serialize error: {any}", .{err});
            return;
        };
        defer self.allocator.free(response_data);
        // NOTE(review): the write's byte count is discarded, so a short
        // write would silently truncate the response.
        _ = conn.stream.write(response_data) catch |err| {
            std.log.err("Write error: {any}", .{err});
        };
    }
    /// Signal the accept loop to exit and close the listening socket.
    pub fn stop(self: *Self) void {
        self.running.store(false, .release);
        if (self.listener) |*l| {
            l.deinit();
            self.listener = null;
        }
    }
};
/// Parse a raw HTTP/1.1 request. Path, header, and body slices all alias
/// `data`; the caller owns (and frees) only the returned `headers` slice.
fn parseRequest(allocator: mem.Allocator, data: []const u8) !Request {
    const header_end = mem.indexOf(u8, data, "\r\n\r\n") orelse return error.InvalidRequest;

    var line_iter = mem.splitSequence(u8, data[0..header_end], "\r\n");

    // Request line: "<METHOD> <PATH> <VERSION>".
    const first_line = line_iter.next() orelse return error.InvalidRequest;
    var tokens = mem.splitScalar(u8, first_line, ' ');
    const method_token = tokens.next() orelse return error.InvalidRequest;
    const path = tokens.next() orelse return error.InvalidRequest;
    const method = Method.fromString(method_token) orelse return error.InvalidMethod;

    // Header lines: "Name: value" — lines without a colon are skipped.
    var header_list = std.ArrayList(Header){};
    errdefer header_list.deinit(allocator);
    while (line_iter.next()) |line| {
        if (line.len == 0) break;
        const sep = mem.indexOfScalar(u8, line, ':') orelse continue;
        try header_list.append(allocator, .{
            .name = mem.trim(u8, line[0..sep], " "),
            .value = mem.trim(u8, line[sep + 1 ..], " "),
        });
    }

    // Everything after the blank line is the body.
    const body_start = header_end + 4;
    return Request{
        .method = method,
        .path = path,
        .headers = try header_list.toOwnedSlice(allocator),
        .body = if (body_start < data.len) data[body_start..] else "",
        .raw_data = data,
    };
}
// Tests
// A bodyless GET parses into the right method and path.
test "parse simple request" {
    const allocator = std.testing.allocator;
    const raw = "GET /health HTTP/1.1\r\nHost: localhost\r\n\r\n";
    const req = try parseRequest(allocator, raw);
    // Only the headers slice is heap-allocated; its entries borrow `raw`.
    defer allocator.free(req.headers);
    try std.testing.expectEqual(Method.GET, req.method);
    try std.testing.expectEqualStrings("/health", req.path);
}
// A POST with Content-Length exposes the bytes after the blank line as body.
test "parse request with body" {
    const allocator = std.testing.allocator;
    const raw = "POST /items HTTP/1.1\r\nHost: localhost\r\nContent-Length: 13\r\n\r\n{\"key\":\"val\"}";
    const req = try parseRequest(allocator, raw);
    defer allocator.free(req.headers);
    try std.testing.expectEqual(Method.POST, req.method);
    try std.testing.expectEqualStrings("{\"key\":\"val\"}", req.body);
}
================================================================================
FILE: ./src/main.zig
================================================================================
/// ZynamoDB - A DynamoDB-compatible database using RocksDB
const std = @import("std");
const http = @import("http.zig");
const rocksdb = @import("rocksdb.zig");
const storage = @import("dynamodb/storage.zig");
const handler = @import("dynamodb/handler.zig");
/// Runtime configuration assembled from CLI flags and environment variables.
const Config = struct {
    host: []const u8 = "0.0.0.0", // bind address
    port: u16 = 8000, // TCP listen port
    data_dir: [:0]const u8 = "./data", // sentinel-terminated for the RocksDB C API
    verbose: bool = false, // set by -v/--verbose; not read elsewhere in this file
};
/// Entry point: parse configuration, open the storage engine, then serve
/// the DynamoDB-compatible HTTP API until the process is stopped.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    // Parse command line args
    const config = try parseArgs(allocator);
    // Print banner
    printBanner(config);
    // Ensure data directory exists (a logged error here aborts startup).
    std.fs.cwd().makePath(config.data_dir) catch |err| {
        std.log.err("Failed to create data directory: {any}", .{err});
        return;
    };
    // Initialize storage engine
    var engine = storage.StorageEngine.init(allocator, config.data_dir) catch |err| {
        std.log.err("Failed to initialize storage: {any}", .{err});
        return;
    };
    defer engine.deinit();
    std.log.info("Storage engine initialized at {s}", .{config.data_dir});
    // Initialize API handler and register it for the free-function callback.
    var api_handler = handler.ApiHandler.init(allocator, &engine);
    handler.setGlobalHandler(&api_handler);
    // Start HTTP server; start() blocks in the accept loop.
    var server = try http.Server.init(allocator, config.host, config.port, handler.httpHandler);
    defer server.stop();
    std.log.info("Starting DynamoDB-compatible server on {s}:{d}", .{ config.host, config.port });
    std.log.info("Ready to accept connections!", .{});
    try server.start();
}
/// Parse CLI flags, then apply environment-variable overrides, into a Config.
/// Strings taken from the argument iterator are duplicated with `allocator`
/// because `args.deinit()` releases the iterator's backing memory before
/// this function returns; the duplicates live for the process lifetime and
/// are never freed.
fn parseArgs(allocator: std.mem.Allocator) !Config {
    var config = Config{};
    var args = try std.process.argsWithAllocator(allocator);
    defer args.deinit();
    // Skip program name
    _ = args.next();
    while (args.next()) |arg| {
        if (std.mem.eql(u8, arg, "--port") or std.mem.eql(u8, arg, "-p")) {
            if (args.next()) |port_str| {
                // Parsed immediately, so no copy of port_str is needed.
                config.port = std.fmt.parseInt(u16, port_str, 10) catch 8000;
            }
        } else if (std.mem.eql(u8, arg, "--host") or std.mem.eql(u8, arg, "-h")) {
            if (args.next()) |host| {
                // BUG FIX: the iterator's slice is freed by args.deinit(),
                // so storing it directly left config.host dangling.
                config.host = try allocator.dupe(u8, host);
            }
        } else if (std.mem.eql(u8, arg, "--data-dir") or std.mem.eql(u8, arg, "-d")) {
            if (args.next()) |dir| {
                // Need sentinel-terminated string for RocksDB
                config.data_dir = try allocator.dupeZ(u8, dir);
            }
        } else if (std.mem.eql(u8, arg, "--verbose") or std.mem.eql(u8, arg, "-v")) {
            config.verbose = true;
        } else if (std.mem.eql(u8, arg, "--help")) {
            printHelp();
            std.process.exit(0);
        }
    }
    // Environment variables are applied last, so they take precedence
    // over the corresponding CLI flags.
    if (std.posix.getenv("DYNAMODB_PORT")) |port_str| {
        config.port = std.fmt.parseInt(u16, port_str, 10) catch config.port;
    }
    if (std.posix.getenv("ROCKSDB_DATA_DIR")) |dir| {
        config.data_dir = std.mem.span(@as([*:0]const u8, @ptrCast(dir.ptr)));
    }
    return config;
}
/// Print CLI usage to stderr (invoked by --help before exiting).
fn printHelp() void {
    const help =
        \\ZynamoDB - DynamoDB-compatible database
        \\
        \\Usage: zynamodb [OPTIONS]
        \\
        \\Options:
        \\  -p, --port <PORT>      Port to listen on (default: 8000)
        \\  -h, --host <HOST>      Host to bind to (default: 0.0.0.0)
        \\  -d, --data-dir <DIR>   Data directory (default: ./data)
        \\  -v, --verbose          Enable verbose logging
        \\  --help                 Show this help message
        \\
        \\Environment Variables:
        \\  DYNAMODB_PORT          Override port
        \\  ROCKSDB_DATA_DIR       Override data directory
        \\
        \\Examples:
        \\  zynamodb                        # Start with defaults
        \\  zynamodb -p 8080 -d /var/lib/db # Custom port and data dir
        \\
    ;
    std.debug.print("{s}", .{help});
}
/// Print the startup banner plus the effective port and data directory.
fn printBanner(config: Config) void {
    const banner =
        \\
        \\  ╔═══════════════════════════════════════════════╗
        \\  ║                                               ║
        \\  ║     ███████╗██╗   ██╗███╗   ██╗ █████╗        ║
        \\  ║     ╚══███╔╝╚██╗ ██╔╝████╗  ██║██╔══██╗       ║
        \\  ║       ███╔╝  ╚████╔╝ ██╔██╗ ██║███████║       ║
        \\  ║      ███╔╝    ╚██╔╝  ██║╚██╗██║██╔══██║       ║
        \\  ║     ███████╗   ██║   ██║ ╚████║██║  ██║       ║
        \\  ║     ╚══════╝   ╚═╝   ╚═╝  ╚═══╝╚═╝  ╚═╝       ║
        \\  ║                                               ║
        \\  ║        DynamoDB-Compatible Database           ║
        \\  ║        Powered by RocksDB + Zig               ║
        \\  ║                                               ║
        \\  ╚═══════════════════════════════════════════════╝
        \\
    ;
    std.debug.print("{s}", .{banner});
    std.debug.print("  Port: {d} | Data Dir: {s}\n\n", .{ config.port, config.data_dir });
}
// Re-export modules for testing
pub const _rocksdb = rocksdb;
pub const _http = http;
pub const _storage = storage;
pub const _handler = handler;
// Reference every declaration recursively so `zig build test` discovers
// the test blocks in all imported modules.
test {
    std.testing.refAllDeclsRecursive(@This());
}
================================================================================
FILE: ./src/rocksdb.zig
================================================================================
/// RocksDB bindings for Zig via the C API
const std = @import("std");
pub const c = @cImport({
@cInclude("rocksdb/c.h");
});
/// Errors surfaced by this RocksDB wrapper. The C API reports failures as
/// strings; this layer maps them onto these coarse categories.
pub const RocksDBError = error{
    OpenFailed,
    WriteFailed,
    ReadFailed,
    DeleteFailed,
    InvalidArgument,
    Corruption,
    NotFound,
    IOError,
    Unknown,
};
/// Handle to an open RocksDB database plus the C option objects that must
/// outlive it. Acquire with `open`, release with `close`.
pub const DB = struct {
    handle: *c.rocksdb_t,
    options: *c.rocksdb_options_t,
    write_options: *c.rocksdb_writeoptions_t,
    read_options: *c.rocksdb_readoptions_t,
    const Self = @This();
    /// Open (and optionally create) the database at `path`.
    /// Every C-side object created here is destroyed again on each failure
    /// path via errdefer — the previous version leaked `options` when
    /// rocksdb_open itself failed.
    pub fn open(path: [*:0]const u8, create_if_missing: bool) RocksDBError!Self {
        const options = c.rocksdb_options_create() orelse return RocksDBError.Unknown;
        errdefer c.rocksdb_options_destroy(options);
        c.rocksdb_options_set_create_if_missing(options, if (create_if_missing) 1 else 0);
        // Performance options: background parallelism per CPU, level-style
        // compaction sized for a 512MB budget, LZ4 block compression.
        c.rocksdb_options_increase_parallelism(options, @as(c_int, @intCast(std.Thread.getCpuCount() catch 4)));
        c.rocksdb_options_optimize_level_style_compaction(options, 512 * 1024 * 1024); // 512MB
        c.rocksdb_options_set_compression(options, c.rocksdb_lz4_compression);
        var err: [*c]u8 = null;
        const db = c.rocksdb_open(options, path, &err);
        if (err != null) {
            defer c.rocksdb_free(err);
            return RocksDBError.OpenFailed;
        }
        errdefer c.rocksdb_close(db);
        const write_options = c.rocksdb_writeoptions_create() orelse return RocksDBError.Unknown;
        errdefer c.rocksdb_writeoptions_destroy(write_options);
        const read_options = c.rocksdb_readoptions_create() orelse return RocksDBError.Unknown;
        return Self{
            .handle = db.?,
            .options = options,
            .write_options = write_options,
            .read_options = read_options,
        };
    }
    /// Release all C-side resources. Call exactly once.
    pub fn close(self: *Self) void {
        c.rocksdb_readoptions_destroy(self.read_options);
        c.rocksdb_writeoptions_destroy(self.write_options);
        c.rocksdb_close(self.handle);
        c.rocksdb_options_destroy(self.options);
    }
    /// Store `value` under `key`, overwriting any existing value.
    pub fn put(self: *Self, key: []const u8, value: []const u8) RocksDBError!void {
        var err: [*c]u8 = null;
        c.rocksdb_put(
            self.handle,
            self.write_options,
            key.ptr,
            key.len,
            value.ptr,
            value.len,
            &err,
        );
        if (err != null) {
            defer c.rocksdb_free(err);
            return RocksDBError.WriteFailed;
        }
    }
    /// Fetch the value stored under `key`. Returns null when the key is
    /// absent; otherwise the caller owns the returned slice.
    pub fn get(self: *Self, allocator: std.mem.Allocator, key: []const u8) RocksDBError!?[]u8 {
        var err: [*c]u8 = null;
        var value_len: usize = 0;
        const value = c.rocksdb_get(
            self.handle,
            self.read_options,
            key.ptr,
            key.len,
            &value_len,
            &err,
        );
        if (err != null) {
            defer c.rocksdb_free(err);
            return RocksDBError.ReadFailed;
        }
        if (value == null) {
            return null;
        }
        defer c.rocksdb_free(value);
        // Copy out of the C buffer so the caller gets allocator-owned memory.
        const result = allocator.alloc(u8, value_len) catch return RocksDBError.Unknown;
        @memcpy(result, value[0..value_len]);
        return result;
    }
    /// Remove `key` if present.
    pub fn delete(self: *Self, key: []const u8) RocksDBError!void {
        var err: [*c]u8 = null;
        c.rocksdb_delete(
            self.handle,
            self.write_options,
            key.ptr,
            key.len,
            &err,
        );
        if (err != null) {
            defer c.rocksdb_free(err);
            return RocksDBError.DeleteFailed;
        }
    }
    /// Force memtable contents to disk.
    pub fn flush(self: *Self) RocksDBError!void {
        const flush_opts = c.rocksdb_flushoptions_create() orelse return RocksDBError.Unknown;
        defer c.rocksdb_flushoptions_destroy(flush_opts);
        var err: [*c]u8 = null;
        c.rocksdb_flush(self.handle, flush_opts, &err);
        if (err != null) {
            defer c.rocksdb_free(err);
            return RocksDBError.IOError;
        }
    }
};
/// Buffers several mutations so they can be committed atomically with a
/// single `rocksdb_write` call.
pub const WriteBatch = struct {
    handle: *c.rocksdb_writebatch_t,

    const Self = @This();

    /// Allocates an empty batch; returns null when the C allocation fails.
    pub fn init() ?Self {
        const raw = c.rocksdb_writebatch_create() orelse return null;
        return Self{ .handle = raw };
    }

    /// Frees the underlying C batch object.
    pub fn deinit(self: *Self) void {
        c.rocksdb_writebatch_destroy(self.handle);
    }

    /// Queues a put of `value` under `key`; nothing reaches the DB until `write`.
    pub fn put(self: *Self, key: []const u8, value: []const u8) void {
        c.rocksdb_writebatch_put(self.handle, key.ptr, key.len, value.ptr, value.len);
    }

    /// Queues a deletion of `key`.
    pub fn delete(self: *Self, key: []const u8) void {
        c.rocksdb_writebatch_delete(self.handle, key.ptr, key.len);
    }

    /// Discards every queued mutation, leaving the batch reusable.
    pub fn clear(self: *Self) void {
        c.rocksdb_writebatch_clear(self.handle);
    }

    /// Atomically applies all queued mutations to `db`.
    pub fn write(self: *Self, db: *DB) RocksDBError!void {
        var err_msg: [*c]u8 = null;
        c.rocksdb_write(db.handle, db.write_options, self.handle, &err_msg);
        if (err_msg != null) {
            defer c.rocksdb_free(err_msg);
            return RocksDBError.WriteFailed;
        }
    }
};
/// Forward/backward cursor over the whole key space of a DB, backed by
/// `rocksdb_iterator_t`. Returned key/value slices point into C-owned
/// memory and are only valid until the cursor moves.
pub const Iterator = struct {
    handle: *c.rocksdb_iterator_t,

    const Self = @This();

    /// Creates an iterator over `db`; returns null when the C allocation fails.
    pub fn init(db: *DB) ?Self {
        const raw = c.rocksdb_create_iterator(db.handle, db.read_options) orelse return null;
        return Self{ .handle = raw };
    }

    /// Frees the underlying C iterator.
    pub fn deinit(self: *Self) void {
        c.rocksdb_iter_destroy(self.handle);
    }

    /// Positions the cursor on the smallest key.
    pub fn seekToFirst(self: *Self) void {
        c.rocksdb_iter_seek_to_first(self.handle);
    }

    /// Positions the cursor on the largest key.
    pub fn seekToLast(self: *Self) void {
        c.rocksdb_iter_seek_to_last(self.handle);
    }

    /// Positions the cursor at the first entry whose key is >= `target`.
    pub fn seek(self: *Self, target: []const u8) void {
        c.rocksdb_iter_seek(self.handle, target.ptr, target.len);
    }

    /// Positions the cursor at the last entry whose key is <= `target`.
    pub fn seekForPrev(self: *Self, target: []const u8) void {
        c.rocksdb_iter_seek_for_prev(self.handle, target.ptr, target.len);
    }

    /// True while the cursor points at a live entry.
    pub fn valid(self: *Self) bool {
        return c.rocksdb_iter_valid(self.handle) != 0;
    }

    /// Advances to the next entry.
    pub fn next(self: *Self) void {
        c.rocksdb_iter_next(self.handle);
    }

    /// Steps back to the previous entry.
    pub fn prev(self: *Self) void {
        c.rocksdb_iter_prev(self.handle);
    }

    /// Current key bytes, or null when the cursor is invalid.
    pub fn key(self: *Self) ?[]const u8 {
        var key_len: usize = 0;
        const raw = c.rocksdb_iter_key(self.handle, &key_len);
        return if (raw == null) null else raw[0..key_len];
    }

    /// Current value bytes, or null when the cursor is invalid.
    pub fn value(self: *Self) ?[]const u8 {
        var val_len: usize = 0;
        const raw = c.rocksdb_iter_value(self.handle, &val_len);
        return if (raw == null) null else raw[0..val_len];
    }
};
// Tests
test "rocksdb basic operations" {
    const allocator = std.testing.allocator;
    // Work in a throwaway directory; remove it even if the test fails.
    const path = "/tmp/test_rocksdb_basic";
    defer {
        std.fs.deleteTreeAbsolute(path) catch {};
    }
    var db = try DB.open(path, true);
    defer db.close();
    // Round-trip a single key.
    try db.put("hello", "world");
    const val = try db.get(allocator, "hello");
    // Bug fix: free via defer so the buffer is released even when the
    // assertion below fails (the old manual free leaked on that path).
    defer if (val) |v| allocator.free(v);
    try std.testing.expectEqualStrings("world", val.?);
    // After deletion, reads must report the key as missing (null).
    try db.delete("hello");
    const deleted = try db.get(allocator, "hello");
    try std.testing.expect(deleted == null);
}
test "rocksdb write batch" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_rocksdb_batch";
    defer {
        std.fs.deleteTreeAbsolute(path) catch {};
    }
    var db = try DB.open(path, true);
    defer db.close();
    // Queue three puts, then commit them in one atomic write.
    var batch = WriteBatch.init() orelse unreachable;
    defer batch.deinit();
    batch.put("key1", "value1");
    batch.put("key2", "value2");
    batch.put("key3", "value3");
    try batch.write(&db);
    // The batched data must be visible through a normal point read.
    const fetched = try db.get(allocator, "key1");
    defer if (fetched) |v| allocator.free(v);
    try std.testing.expectEqualStrings("value1", fetched.?);
}
test "rocksdb iterator" {
    const path = "/tmp/test_rocksdb_iter";
    defer {
        std.fs.deleteTreeAbsolute(path) catch {};
    }
    var db = try DB.open(path, true);
    defer db.close();
    // Three entries to walk over.
    try db.put("a", "1");
    try db.put("b", "2");
    try db.put("c", "3");
    var iter = Iterator.init(&db) orelse unreachable;
    defer iter.deinit();
    // A full forward traversal visits each entry exactly once.
    iter.seekToFirst();
    var seen: usize = 0;
    while (iter.valid()) {
        seen += 1;
        iter.next();
    }
    try std.testing.expectEqual(@as(usize, 3), seen);
}
================================================================================
FILE: ./tests/integration.zig
================================================================================
//! Integration tests for ZynamoDB
const std = @import("std");
// Import modules from main source
const rocksdb = @import("../src/rocksdb.zig");
const storage = @import("../src/dynamodb/storage.zig");
const types = @import("../src/dynamodb/types.zig");
test "integration: full table lifecycle" {
    const allocator = std.testing.allocator;
    // Fresh on-disk engine under /tmp; wiped afterwards.
    const path = "/tmp/test_integration_lifecycle";
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try storage.StorageEngine.init(allocator, path);
    defer engine.deinit();
    // Single HASH-key schema over a string attribute "pk".
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    // Creation reports the table as ACTIVE immediately.
    const created = try engine.createTable("Users", &key_schema, &attr_defs);
    try std.testing.expectEqualStrings("Users", created.table_name);
    try std.testing.expectEqual(types.TableStatus.ACTIVE, created.table_status);
    // Insert three users.
    try engine.putItem("Users", "{\"pk\":{\"S\":\"user1\"},\"name\":{\"S\":\"Alice\"}}");
    try engine.putItem("Users", "{\"pk\":{\"S\":\"user2\"},\"name\":{\"S\":\"Bob\"}}");
    try engine.putItem("Users", "{\"pk\":{\"S\":\"user3\"},\"name\":{\"S\":\"Charlie\"}}");
    // Point read by primary key.
    const fetched = try engine.getItem("Users", "{\"pk\":{\"S\":\"user1\"}}");
    try std.testing.expect(fetched != null);
    defer allocator.free(fetched.?);
    // Unbounded scan returns every item.
    const scanned = try engine.scan("Users", null);
    defer {
        for (scanned) |row| allocator.free(row);
        allocator.free(scanned);
    }
    try std.testing.expectEqual(@as(usize, 3), scanned.len);
    // Remove one user and confirm via a second scan.
    try engine.deleteItem("Users", "{\"pk\":{\"S\":\"user2\"}}");
    const remaining = try engine.scan("Users", null);
    defer {
        for (remaining) |row| allocator.free(row);
        allocator.free(remaining);
    }
    try std.testing.expectEqual(@as(usize, 2), remaining.len);
    // Dropping the table leaves the catalog empty.
    try engine.deleteTable("Users");
    const names = try engine.listTables();
    defer allocator.free(names);
    try std.testing.expectEqual(@as(usize, 0), names.len);
}
test "integration: multiple tables" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_integration_multi_table";
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try storage.StorageEngine.init(allocator, path);
    defer engine.deinit();
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    // Three independent tables sharing one schema definition.
    _ = try engine.createTable("Table1", &key_schema, &attr_defs);
    _ = try engine.createTable("Table2", &key_schema, &attr_defs);
    _ = try engine.createTable("Table3", &key_schema, &attr_defs);
    // All three show up in the catalog.
    const names = try engine.listTables();
    defer {
        for (names) |n| allocator.free(n);
        allocator.free(names);
    }
    try std.testing.expectEqual(@as(usize, 3), names.len);
    // One item per table.
    try engine.putItem("Table1", "{\"pk\":{\"S\":\"item1\"}}");
    try engine.putItem("Table2", "{\"pk\":{\"S\":\"item2\"}}");
    try engine.putItem("Table3", "{\"pk\":{\"S\":\"item3\"}}");
    // Scanning Table1 must not leak rows from the other tables.
    const rows = try engine.scan("Table1", null);
    defer {
        for (rows) |r| allocator.free(r);
        allocator.free(rows);
    }
    try std.testing.expectEqual(@as(usize, 1), rows.len);
}
test "integration: table already exists error" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_integration_exists";
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try storage.StorageEngine.init(allocator, path);
    defer engine.deinit();
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    // The first creation succeeds; a duplicate name must be rejected.
    _ = try engine.createTable("DuplicateTest", &key_schema, &attr_defs);
    const second_attempt = engine.createTable("DuplicateTest", &key_schema, &attr_defs);
    try std.testing.expectError(storage.StorageError.TableAlreadyExists, second_attempt);
}
test "integration: table not found error" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_integration_notfound";
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try storage.StorageEngine.init(allocator, path);
    defer engine.deinit();
    // Writes against a table that was never created must fail cleanly.
    const attempt = engine.putItem("NonExistent", "{\"pk\":{\"S\":\"item1\"}}");
    try std.testing.expectError(storage.StorageError.TableNotFound, attempt);
}
test "integration: scan with limit" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_integration_scan_limit";
    defer std.fs.deleteTreeAbsolute(path) catch {};
    var engine = try storage.StorageEngine.init(allocator, path);
    defer engine.deinit();
    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };
    const attr_defs = [_]types.AttributeDefinition{
        .{ .attribute_name = "pk", .attribute_type = .S },
    };
    _ = try engine.createTable("LimitTest", &key_schema, &attr_defs);
    // Seed ten items: item0 .. item9.
    var idx: usize = 0;
    while (idx < 10) : (idx += 1) {
        var scratch: [128]u8 = undefined;
        const payload = try std.fmt.bufPrint(&scratch, "{{\"pk\":{{\"S\":\"item{d}\"}}}}", .{idx});
        try engine.putItem("LimitTest", payload);
    }
    // A limit of 5 caps how many rows the scan returns.
    const capped = try engine.scan("LimitTest", 5);
    defer {
        for (capped) |row| allocator.free(row);
        allocator.free(capped);
    }
    try std.testing.expectEqual(@as(usize, 5), capped.len);
}