From aa7fa3f0047465f1451092cafa078a71d173f6c0 Mon Sep 17 00:00:00 2001 From: biondizzle Date: Tue, 20 Jan 2026 10:48:57 -0500 Subject: [PATCH] fix json parsing to be less of a hack --- project_context.txt | 3903 ------------------------------------------- 1 file changed, 3903 deletions(-) delete mode 100644 project_context.txt diff --git a/project_context.txt b/project_context.txt deleted file mode 100644 index d128927..0000000 --- a/project_context.txt +++ /dev/null @@ -1,3903 +0,0 @@ -# Project: zyna-db -# Generated: Tue Jan 20 10:15:05 AM EST 2026 - -================================================================================ - -================================================================================ -FILE: ./build.zig -================================================================================ - -const std = @import("std"); - -pub fn build(b: *std.Build) void { - const target = b.standardTargetOptions(.{}); - const optimize = b.standardOptimizeOption(.{}); - - // === Main Executable === - const exe_module = b.createModule(.{ - .root_source_file = b.path("src/main.zig"), - .target = target, - .optimize = optimize, - }); - - const exe = b.addExecutable(.{ - .name = "zynamodb", - .root_module = exe_module, - }); - - // Link RocksDB (C library) - exe.linkLibC(); - exe.linkSystemLibrary("rocksdb"); - - // Compression libraries that RocksDB depends on - exe.linkSystemLibrary("snappy"); - exe.linkSystemLibrary("lz4"); - exe.linkSystemLibrary("zstd"); - exe.linkSystemLibrary("z"); - exe.linkSystemLibrary("bz2"); - exe.linkSystemLibrary("stdc++"); - - // Add include path for RocksDB headers - exe.addIncludePath(.{ .cwd_relative = "/usr/local/include" }); - exe.addLibraryPath(.{ .cwd_relative = "/usr/local/lib" }); - - b.installArtifact(exe); - - // === Run Command === - const run_cmd = b.addRunArtifact(exe); - run_cmd.step.dependOn(b.getInstallStep()); - - if (b.args) |args| { - run_cmd.addArgs(args); - } - - const run_step = b.step("run", "Run the 
DynamoDB-compatible server"); - run_step.dependOn(&run_cmd.step); - - // === Unit Tests === - const unit_tests_module = b.createModule(.{ - .root_source_file = b.path("src/main.zig"), - .target = target, - .optimize = optimize, - }); - - const unit_tests = b.addTest(.{ - .root_module = unit_tests_module, - }); - - unit_tests.linkLibC(); - unit_tests.linkSystemLibrary("rocksdb"); - unit_tests.linkSystemLibrary("snappy"); - unit_tests.linkSystemLibrary("lz4"); - unit_tests.linkSystemLibrary("zstd"); - unit_tests.linkSystemLibrary("z"); - unit_tests.linkSystemLibrary("bz2"); - unit_tests.linkSystemLibrary("stdc++"); - unit_tests.addIncludePath(.{ .cwd_relative = "/usr/local/include" }); - unit_tests.addLibraryPath(.{ .cwd_relative = "/usr/local/lib" }); - - const run_unit_tests = b.addRunArtifact(unit_tests); - const test_step = b.step("test", "Run unit tests"); - test_step.dependOn(&run_unit_tests.step); - - // === Integration Tests === - const integration_tests_module = b.createModule(.{ - .root_source_file = b.path("tests/integration.zig"), - .target = target, - .optimize = optimize, - }); - - const integration_tests = b.addTest(.{ - .root_module = integration_tests_module, - }); - - integration_tests.linkLibC(); - integration_tests.linkSystemLibrary("rocksdb"); - integration_tests.linkSystemLibrary("snappy"); - integration_tests.linkSystemLibrary("lz4"); - integration_tests.linkSystemLibrary("zstd"); - integration_tests.linkSystemLibrary("z"); - integration_tests.linkSystemLibrary("bz2"); - integration_tests.linkSystemLibrary("stdc++"); - integration_tests.addIncludePath(.{ .cwd_relative = "/usr/local/include" }); - integration_tests.addLibraryPath(.{ .cwd_relative = "/usr/local/lib" }); - - const run_integration_tests = b.addRunArtifact(integration_tests); - const integration_step = b.step("test-integration", "Run integration tests"); - integration_step.dependOn(&run_integration_tests.step); - - // === Benchmarks === - const bench_module = b.createModule(.{ - 
.root_source_file = b.path("src/bench.zig"), - .target = target, - .optimize = .ReleaseFast, - }); - - const bench = b.addExecutable(.{ - .name = "bench", - .root_module = bench_module, - }); - - bench.linkLibC(); - bench.linkSystemLibrary("rocksdb"); - bench.linkSystemLibrary("snappy"); - bench.linkSystemLibrary("lz4"); - bench.linkSystemLibrary("zstd"); - bench.linkSystemLibrary("z"); - bench.linkSystemLibrary("bz2"); - bench.linkSystemLibrary("stdc++"); - bench.addIncludePath(.{ .cwd_relative = "/usr/local/include" }); - bench.addLibraryPath(.{ .cwd_relative = "/usr/local/lib" }); - - const run_bench = b.addRunArtifact(bench); - const bench_step = b.step("bench", "Run benchmarks"); - bench_step.dependOn(&run_bench.step); - - // === Clean === - const clean_step = b.step("clean", "Remove build artifacts"); - clean_step.dependOn(&b.addRemoveDirTree(b.path("zig-out")).step); - clean_step.dependOn(&b.addRemoveDirTree(b.path(".zig-cache")).step); -} - - -================================================================================ -FILE: ./build.zig.zon -================================================================================ - -.{ - .name = .zyna_db, - .version = "0.1.0", - .fingerprint = 0x990c9202941b334a, // ← Copy from error message - .minimum_zig_version = "0.15.1", - - // Specify which files/directories to include in package - .paths = .{ - "build.zig", - "build.zig.zon", - "src", - "tests", - "README.md", - "Makefile", - }, - - .dependencies = .{ - // External package dependencies - }, -} - - -================================================================================ -FILE: ./docker-compose.yml -================================================================================ - -services: - dev: - build: - context: . 
- dockerfile: Dockerfile - container_name: dynamodb-zig-dev - volumes: - - .:/workspace - - zig-cache:/root/.cache/zig - ports: - - "8000:8000" # DynamoDB API port - - "8080:8080" # Admin/metrics port (optional) - working_dir: /workspace - stdin_open: true - tty: true - command: /bin/bash - - # Optional: Run the actual server - server: - build: - context: . - dockerfile: Dockerfile - container_name: dynamodb-zig-server - volumes: - - .:/workspace - - zig-cache:/root/.cache/zig - - db-data:/workspace/data - ports: - - "8000:8000" - working_dir: /workspace - command: zig build run - - # Optional: DynamoDB Local for compatibility testing - dynamodb-local: - image: amazon/dynamodb-local:latest - container_name: dynamodb-local - ports: - - "8001:8000" - command: "-jar DynamoDBLocal.jar -sharedDb -inMemory" - - # DynamoDB Admin GUI - dynamodb-admin: - image: aaronshaf/dynamodb-admin:latest - container_name: dynamodb-admin - ports: - - "8002:8001" # Admin GUI will be available at localhost:8002 - environment: - - DYNAMO_ENDPOINT=http://server:8000 # Points to your Zig server - # Alternative: Use http://dynamodb-local:8000 to connect to AWS's DynamoDB Local - - AWS_REGION=us-east-1 - - AWS_ACCESS_KEY_ID=local - - AWS_SECRET_ACCESS_KEY=local - depends_on: - - server - -volumes: - zig-cache: - db-data: - -================================================================================ -FILE: ./Dockerfile -================================================================================ - -FROM ubuntu:24.04 - -ENV DEBIAN_FRONTEND=noninteractive - -# Install base dependencies -RUN apt-get update && apt-get install -y \ - curl \ - wget \ - git \ - build-essential \ - cmake \ - ninja-build \ - pkg-config \ - libsnappy-dev \ - liblz4-dev \ - libzstd-dev \ - libbz2-dev \ - zlib1g-dev \ - libgflags-dev \ - xz-utils \ - && rm -rf /var/lib/apt/lists/* - -# Install Zig 0.13.0 (latest stable) -ARG ZIG_VERSION=0.13.0 -RUN curl -L 
"https://ziglang.org/download/${ZIG_VERSION}/zig-linux-x86_64-${ZIG_VERSION}.tar.xz" | tar -xJ -C /opt \ - && ln -s /opt/zig-linux-x86_64-${ZIG_VERSION}/zig /usr/local/bin/zig - -# Build RocksDB from source with shared library -ARG ROCKSDB_VERSION=9.6.1 -WORKDIR /tmp -RUN git clone --depth 1 --branch v${ROCKSDB_VERSION} https://github.com/facebook/rocksdb.git \ - && cd rocksdb \ - && mkdir build && cd build \ - && cmake .. \ - -DCMAKE_BUILD_TYPE=Release \ - -DWITH_SNAPPY=ON \ - -DWITH_LZ4=ON \ - -DWITH_ZSTD=ON \ - -DWITH_BZ2=ON \ - -DWITH_ZLIB=ON \ - -DWITH_GFLAGS=OFF \ - -DROCKSDB_BUILD_SHARED=ON \ - -DWITH_TESTS=OFF \ - -DWITH_BENCHMARK_TOOLS=OFF \ - -DWITH_TOOLS=OFF \ - -G Ninja \ - && ninja -j$(nproc) \ - && ninja install \ - && ldconfig \ - && cd / && rm -rf /tmp/rocksdb - -# Set up working directory -WORKDIR /workspace - -# Default command -CMD ["/bin/bash"] - - -================================================================================ -FILE: ./Makefile -================================================================================ - -.PHONY: all build release run test test-integration bench clean docker-build docker-shell docker-run docker-test fmt help - -# Default target -all: build - -# === Build Targets === -build: - zig build - -release: - zig build -Doptimize=ReleaseFast - -clean: - rm -rf zig-out .zig-cache data - -fmt: - zig fmt src/ tests/ - -# === Run Targets === -run: build - zig build run - -run-port: build - zig build run -- --port $(PORT) - -# === Test Targets === -test: - zig build test - -test-integration: - zig build test-integration - -test-all: test test-integration - -bench: - zig build bench - -# === Docker Targets === -docker-build: - docker-compose build dev - -docker-shell: - docker-compose run --rm dev - -docker-run: - docker-compose up server - -docker-test: - docker-compose run --rm dev zig build test - -docker-bench: - docker-compose run --rm dev zig build bench - -docker-clean: - docker-compose down -v - docker rmi 
dynamodb-zig-dev 2>/dev/null || true - -# === AWS CLI Test === -aws-test: - @echo "Creating table..." - aws dynamodb create-table \ - --endpoint-url http://localhost:8000 \ - --table-name TestTable \ - --key-schema AttributeName=pk,KeyType=HASH \ - --attribute-definitions AttributeName=pk,AttributeType=S \ - --billing-mode PAY_PER_REQUEST || true - @echo "\nListing tables..." - aws dynamodb list-tables --endpoint-url http://localhost:8000 - @echo "\nPutting item..." - aws dynamodb put-item \ - --endpoint-url http://localhost:8000 \ - --table-name TestTable \ - --item '{"pk":{"S":"test1"},"data":{"S":"hello world"}}' - @echo "\nGetting item..." - aws dynamodb get-item \ - --endpoint-url http://localhost:8000 \ - --table-name TestTable \ - --key '{"pk":{"S":"test1"}}' - @echo "\nScanning table..." - aws dynamodb scan \ - --endpoint-url http://localhost:8000 \ - --table-name TestTable - -# === Local DynamoDB (for comparison) === -dynamodb-local: - docker-compose up dynamodb-local - -# === Help === -help: - @echo "ZynamoDB Development Commands" - @echo "" - @echo "Build & Run:" - @echo " make build - Build debug version" - @echo " make release - Build optimized release" - @echo " make run - Build and run server" - @echo " make run-port PORT=8080 - Run on custom port" - @echo "" - @echo "Testing:" - @echo " make test - Run unit tests" - @echo " make test-integration - Run integration tests" - @echo " make test-all - Run all tests" - @echo " make bench - Run benchmarks" - @echo "" - @echo "Docker:" - @echo " make docker-build - Build Docker image" - @echo " make docker-shell - Open shell in container" - @echo " make docker-run - Run server in Docker" - @echo " make docker-test - Run tests in Docker" - @echo "" - @echo "Utilities:" - @echo " make clean - Remove build artifacts" - @echo " make fmt - Format source code" - @echo " make aws-test - Test with AWS CLI" - @echo " make help - Show this help" - - 
-================================================================================ -FILE: ./README.md -================================================================================ - -# ZynamoDB - -A DynamoDB-compatible database built with **Zig** and **RocksDB**. - -## Why Zig? - -Zig was chosen over C++ for several reasons: - -1. **Built-in Memory Safety** - Compile-time safety checks without garbage collection -2. **Seamless C Interop** - RocksDB's C API can be imported directly with `@cImport` -3. **Simple Build System** - `build.zig` replaces complex CMake/Makefile configurations -4. **No Hidden Control Flow** - Explicit error handling, no exceptions -5. **Modern Tooling** - Built-in test framework, documentation generator, and package manager - -## Features - -### Implemented Operations -- ✅ CreateTable -- ✅ DeleteTable -- ✅ DescribeTable -- ✅ ListTables -- ✅ PutItem -- ✅ GetItem -- ✅ DeleteItem -- ✅ Query (basic) -- ✅ Scan - -### Planned Operations -- 🚧 UpdateItem -- 🚧 BatchGetItem -- 🚧 BatchWriteItem -- 🚧 TransactGetItems -- 🚧 TransactWriteItems -- 🚧 Global Secondary Indexes -- 🚧 Local Secondary Indexes - -## Quick Start - -### Using Docker (Recommended) - -```bash -# Build the development container -docker-compose build dev - -# Start a shell in the container -docker-compose run --rm dev - -# Inside the container: -zig build run -``` - -### Native Build (requires Zig 0.13+ and RocksDB) - -```bash -# Install dependencies (Ubuntu/Debian) -sudo apt install librocksdb-dev libsnappy-dev liblz4-dev libzstd-dev - -# Build and run -zig build run -``` - -## Usage - -### Starting the Server - -```bash -# Default (port 8000) -zig build run - -# Custom port -zig build run -- --port 8080 - -# Custom data directory -zig build run -- --data-dir /var/lib/zynamodb -``` - -### Using AWS CLI - -```bash -# Create a table -aws dynamodb create-table \ - --endpoint-url http://localhost:8000 \ - --table-name Users \ - --key-schema AttributeName=pk,KeyType=HASH \ - 
--attribute-definitions AttributeName=pk,AttributeType=S \ - --billing-mode PAY_PER_REQUEST - -# Put an item -aws dynamodb put-item \ - --endpoint-url http://localhost:8000 \ - --table-name Users \ - --item '{"pk":{"S":"user123"},"name":{"S":"Alice"},"email":{"S":"alice@example.com"}}' - -# Get an item -aws dynamodb get-item \ - --endpoint-url http://localhost:8000 \ - --table-name Users \ - --key '{"pk":{"S":"user123"}}' - -# Scan the table -aws dynamodb scan \ - --endpoint-url http://localhost:8000 \ - --table-name Users - -# List tables -aws dynamodb list-tables --endpoint-url http://localhost:8000 -``` - -### Using Python (boto3) - -```python -import boto3 - -# Connect to local ZynamoDB -dynamodb = boto3.client( - 'dynamodb', - endpoint_url='http://localhost:8000', - region_name='us-east-1', - aws_access_key_id='fake', - aws_secret_access_key='fake' -) - -# Create table -dynamodb.create_table( - TableName='Products', - KeySchema=[{'AttributeName': 'pk', 'KeyType': 'HASH'}], - AttributeDefinitions=[{'AttributeName': 'pk', 'AttributeType': 'S'}], - BillingMode='PAY_PER_REQUEST' -) - -# Put item -dynamodb.put_item( - TableName='Products', - Item={ - 'pk': {'S': 'prod-001'}, - 'name': {'S': 'Widget'}, - 'price': {'N': '29.99'} - } -) - -# Get item -response = dynamodb.get_item( - TableName='Products', - Key={'pk': {'S': 'prod-001'}} -) -print(response.get('Item')) -``` - -## Development - -### Project Structure - -``` -dynamodb-compat/ -├── Dockerfile # Dev container with Zig + RocksDB -├── docker-compose.yml # Container orchestration -├── build.zig # Zig build configuration -├── Makefile # Convenience commands -├── src/ -│ ├── main.zig # Entry point -│ ├── rocksdb.zig # RocksDB C bindings -│ ├── http.zig # HTTP server -│ ├── bench.zig # Performance benchmarks -│ └── dynamodb/ -│ ├── types.zig # DynamoDB protocol types -│ ├── storage.zig # Storage engine (RocksDB mapping) -│ └── handler.zig # API request handlers -└── tests/ - └── integration.zig # Integration 
tests -``` - -### Build Commands - -```bash -# Build -make build # Debug build -make release # Optimized release - -# Test -make test # Unit tests -make test-integration # Integration tests -make test-all # All tests - -# Run -make run # Start server -make run-port PORT=8080 # Custom port - -# Benchmark -make bench # Run benchmarks - -# Docker -make docker-build # Build container -make docker-shell # Open shell -make docker-test # Run tests in container -``` - -### Running Tests - -```bash -# Unit tests -zig build test - -# Integration tests -zig build test-integration - -# With Docker -docker-compose run --rm dev zig build test -``` - -### Running Benchmarks - -```bash -zig build bench - -# Or with make -make bench -``` - -## Architecture - -### Storage Model - -Data is stored in RocksDB with the following key prefixes: - -| Prefix | Purpose | Format | -|--------|---------|--------| -| `_meta:` | Table metadata | `_meta:{table_name}` | -| `_data:` | Item data | `_data:{table}:{pk}` or `_data:{table}:{pk}:{sk}` | -| `_gsi:` | Global secondary index | `_gsi:{table}:{index}:{pk}:{sk}` | -| `_lsi:` | Local secondary index | `_lsi:{table}:{index}:{pk}:{sk}` | - -### HTTP Server - -- Custom HTTP/1.1 implementation using Zig's stdlib -- Thread-per-connection model (suitable for moderate load) -- Parses `X-Amz-Target` header to route DynamoDB operations - -### DynamoDB Protocol - -- JSON request/response format -- Standard DynamoDB error responses -- Compatible with AWS SDKs (boto3, AWS CLI, etc.) 
- -## Configuration - -### Command Line Options - -| Option | Description | Default | -|--------|-------------|---------| -| `-p, --port` | HTTP port | 8000 | -| `-h, --host` | Bind address | 0.0.0.0 | -| `-d, --data-dir` | RocksDB data directory | ./data | -| `-v, --verbose` | Enable verbose logging | false | - -### Environment Variables - -| Variable | Description | -|----------|-------------| -| `DYNAMODB_PORT` | Override port | -| `ROCKSDB_DATA_DIR` | Override data directory | - -## Performance - -Preliminary benchmarks on development hardware: - -| Operation | Ops/sec | -|-----------|---------| -| PutItem | ~15,000 | -| GetItem | ~25,000 | -| Scan (1K items) | ~50,000 | - -Run `make bench` for actual numbers on your hardware. - -## Comparison with DynamoDB Local - -| Feature | ZynamoDB | DynamoDB Local | -|---------|----------|----------------| -| Language | Zig | Java | -| Storage | RocksDB | SQLite | -| Memory | ~10MB | ~200MB+ | -| Startup | Instant | 2-5 seconds | -| Persistence | Yes | Optional | - -## License - -MIT - -## Contributing - -Contributions welcome! Please read the contributing guidelines first. 
- -Areas that need work: -- UpdateItem with expression parsing -- Batch operations -- Secondary indexes -- Streams support -- Better JSON parsing (currently simplified) - - -================================================================================ -FILE: ./src/bench.zig -================================================================================ - -/// Performance benchmarks for ZynamoDB -const std = @import("std"); -const rocksdb = @import("rocksdb.zig"); -const storage = @import("dynamodb/storage.zig"); -const types = @import("dynamodb/types.zig"); - -const BenchResult = struct { - name: []const u8, - ops: u64, - duration_ns: u64, - - pub fn opsPerSec(self: BenchResult) f64 { - return @as(f64, @floatFromInt(self.ops)) / (@as(f64, @floatFromInt(self.duration_ns)) / 1_000_000_000.0); - } - - pub fn print(self: BenchResult) void { - std.debug.print("{s:30} | {d:10} ops | {d:8.2} ms | {d:12.0} ops/sec\n", .{ - self.name, - self.ops, - @as(f64, @floatFromInt(self.duration_ns)) / 1_000_000.0, - self.opsPerSec(), - }); - } -}; - -fn runBench(name: []const u8, ops: u64, func: anytype) BenchResult { - const start = std.time.nanoTimestamp(); - func(); - const end = std.time.nanoTimestamp(); - - return BenchResult{ - .name = name, - .ops = ops, - .duration_ns = @intCast(end - start), - }; -} - -pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - defer _ = gpa.deinit(); - const allocator = gpa.allocator(); - - std.debug.print("\n", .{}); - std.debug.print("=" ** 70 ++ "\n", .{}); - std.debug.print(" ZynamoDB Performance Benchmarks\n", .{}); - std.debug.print("=" ** 70 ++ "\n\n", .{}); - - // Setup - const path = "/tmp/bench_zynamodb"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - // Raw RocksDB benchmarks - std.debug.print("RocksDB Raw Operations:\n", .{}); - std.debug.print("-" ** 70 ++ "\n", .{}); - - try benchRocksDBWrites(allocator, path); - try benchRocksDBReads(allocator, path); - try benchRocksDBBatch(allocator, path); - 
try benchRocksDBScan(allocator, path); - - std.debug.print("\n", .{}); - - // Storage engine benchmarks - std.debug.print("Storage Engine Operations:\n", .{}); - std.debug.print("-" ** 70 ++ "\n", .{}); - - try benchStoragePutItem(allocator, path); - try benchStorageGetItem(allocator, path); - try benchStorageScan(allocator, path); - - std.debug.print("\n" ++ "=" ** 70 ++ "\n", .{}); -} - -fn benchRocksDBWrites(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = allocator; - const path = "/tmp/bench_rocksdb_writes"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var db = try rocksdb.DB.open(path, true); - defer db.close(); - - const ops: u64 = 10000; - var key_buf: [32]u8 = undefined; - var val_buf: [256]u8 = undefined; - - const result = runBench("Sequential Writes", ops, struct { - fn run(d: *rocksdb.DB, kb: *[32]u8, vb: *[256]u8, n: u64) void { - var i: u64 = 0; - while (i < n) : (i += 1) { - const key = std.fmt.bufPrint(kb, "key_{d:0>10}", .{i}) catch continue; - const val = std.fmt.bufPrint(vb, "value_{d}_padding_data_to_make_it_realistic", .{i}) catch continue; - d.put(key, val) catch {}; - } - } - }.run, .{ &db, &key_buf, &val_buf, ops }); - - _ = base_path; - result.print(); -} - -fn benchRocksDBReads(allocator: std.mem.Allocator, base_path: []const u8) !void { - const path = "/tmp/bench_rocksdb_reads"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var db = try rocksdb.DB.open(path, true); - defer db.close(); - - // First write some data - var key_buf: [32]u8 = undefined; - var val_buf: [256]u8 = undefined; - - const ops: u64 = 10000; - var i: u64 = 0; - while (i < ops) : (i += 1) { - const key = try std.fmt.bufPrint(&key_buf, "key_{d:0>10}", .{i}); - const val = try std.fmt.bufPrint(&val_buf, "value_{d}_padding", .{i}); - try db.put(key, val); - } - - // Now benchmark reads - var prng = std.Random.DefaultPrng.init(12345); - const random = prng.random(); - - const result = runBench("Random Reads", ops, struct { - fn run(d: 
*rocksdb.DB, alloc: std.mem.Allocator, kb: *[32]u8, r: std.Random, n: u64) void { - var j: u64 = 0; - while (j < n) : (j += 1) { - const idx = r.intRangeAtMost(u64, 0, n - 1); - const key = std.fmt.bufPrint(kb, "key_{d:0>10}", .{idx}) catch continue; - const val = d.get(alloc, key) catch continue; - if (val) |v| alloc.free(v); - } - } - }.run, .{ &db, allocator, &key_buf, random, ops }); - - _ = base_path; - result.print(); -} - -fn benchRocksDBBatch(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = allocator; - const path = "/tmp/bench_rocksdb_batch"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var db = try rocksdb.DB.open(path, true); - defer db.close(); - - const ops: u64 = 10000; - var key_buf: [32]u8 = undefined; - var val_buf: [256]u8 = undefined; - - const result = runBench("Batch Writes", ops, struct { - fn run(d: *rocksdb.DB, kb: *[32]u8, vb: *[256]u8, n: u64) void { - var batch = rocksdb.WriteBatch.init() orelse return; - defer batch.deinit(); - - var i: u64 = 0; - while (i < n) : (i += 1) { - const key = std.fmt.bufPrint(kb, "batch_key_{d:0>10}", .{i}) catch continue; - const val = std.fmt.bufPrint(vb, "batch_value_{d}", .{i}) catch continue; - batch.put(key, val); - } - - batch.write(d) catch {}; - } - }.run, .{ &db, &key_buf, &val_buf, ops }); - - _ = base_path; - result.print(); -} - -fn benchRocksDBScan(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = allocator; - const path = "/tmp/bench_rocksdb_scan"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var db = try rocksdb.DB.open(path, true); - defer db.close(); - - // Write data - var key_buf: [32]u8 = undefined; - var val_buf: [256]u8 = undefined; - - const ops: u64 = 10000; - var i: u64 = 0; - while (i < ops) : (i += 1) { - const key = try std.fmt.bufPrint(&key_buf, "scan_key_{d:0>10}", .{i}); - const val = try std.fmt.bufPrint(&val_buf, "scan_value_{d}", .{i}); - try db.put(key, val); - } - - const result = runBench("Full Scan", ops, struct { - fn 
run(d: *rocksdb.DB, n: u64) void { - _ = n; - var iter = rocksdb.Iterator.init(d) orelse return; - defer iter.deinit(); - - iter.seekToFirst(); - var count: u64 = 0; - while (iter.valid()) { - _ = iter.key(); - _ = iter.value(); - count += 1; - iter.next(); - } - } - }.run, .{ &db, ops }); - - _ = base_path; - result.print(); -} - -fn benchStoragePutItem(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = base_path; - const path = "/tmp/bench_storage_put"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - _ = try engine.createTable("BenchTable", &key_schema, &attr_defs); - - const ops: u64 = 5000; - var item_buf: [512]u8 = undefined; - - const start = std.time.nanoTimestamp(); - var i: u64 = 0; - while (i < ops) : (i += 1) { - const item = std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"name\":{{\"S\":\"User {d}\"}},\"email\":{{\"S\":\"user{d}@example.com\"}}}}", .{ i, i, i }) catch continue; - engine.putItem("BenchTable", item) catch {}; - } - const end = std.time.nanoTimestamp(); - - const result = BenchResult{ - .name = "PutItem", - .ops = ops, - .duration_ns = @intCast(end - start), - }; - result.print(); -} - -fn benchStorageGetItem(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = base_path; - const path = "/tmp/bench_storage_get"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - _ = try 
engine.createTable("BenchTable", &key_schema, &attr_defs); - - // Write data first - const ops: u64 = 5000; - var item_buf: [512]u8 = undefined; - var key_buf: [128]u8 = undefined; - - var i: u64 = 0; - while (i < ops) : (i += 1) { - const item = try std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"data\":{{\"S\":\"test\"}}}}", .{i}); - try engine.putItem("BenchTable", item); - } - - // Benchmark reads - var prng = std.Random.DefaultPrng.init(12345); - const random = prng.random(); - - const start = std.time.nanoTimestamp(); - i = 0; - while (i < ops) : (i += 1) { - const idx = random.intRangeAtMost(u64, 0, ops - 1); - const key = std.fmt.bufPrint(&key_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}}}}", .{idx}) catch continue; - const item = engine.getItem("BenchTable", key) catch continue; - if (item) |v| allocator.free(v); - } - const end = std.time.nanoTimestamp(); - - const result = BenchResult{ - .name = "GetItem", - .ops = ops, - .duration_ns = @intCast(end - start), - }; - result.print(); -} - -fn benchStorageScan(allocator: std.mem.Allocator, base_path: []const u8) !void { - _ = base_path; - const path = "/tmp/bench_storage_scan"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - _ = try engine.createTable("BenchTable", &key_schema, &attr_defs); - - // Write data first - const ops: u64 = 5000; - var item_buf: [512]u8 = undefined; - - var i: u64 = 0; - while (i < ops) : (i += 1) { - const item = try std.fmt.bufPrint(&item_buf, "{{\"pk\":{{\"S\":\"user{d:0>10}\"}},\"data\":{{\"S\":\"test\"}}}}", .{i}); - try engine.putItem("BenchTable", item); - } - - // Benchmark scan - const start = std.time.nanoTimestamp(); - const items = try 
engine.scan("BenchTable", null); - const end = std.time.nanoTimestamp(); - - // Cleanup - for (items) |item| allocator.free(item); - allocator.free(items); - - const result = BenchResult{ - .name = "Scan (full table)", - .ops = ops, - .duration_ns = @intCast(end - start), - }; - result.print(); -} - - -================================================================================ -FILE: ./src/dynamodb/handler.zig -================================================================================ - -/// DynamoDB API request handlers -const std = @import("std"); -const http = @import("../http.zig"); -const storage = @import("storage.zig"); -const types = @import("types.zig"); -const json = @import("json.zig"); - -pub const ApiHandler = struct { - engine: *storage.StorageEngine, - allocator: std.mem.Allocator, - - const Self = @This(); - - pub fn init(allocator: std.mem.Allocator, engine: *storage.StorageEngine) Self { - return .{ - .engine = engine, - .allocator = allocator, - }; - } - - pub fn handle(self: *Self, request: *const http.Request) http.Response { - var response = http.Response.init(self.allocator); - - // Add standard DynamoDB headers - response.addHeader("Content-Type", "application/x-amz-json-1.0") catch {}; - response.addHeader("x-amzn-RequestId", "local-request-id") catch {}; - - // Get operation from X-Amz-Target header - const target = request.getHeader("X-Amz-Target") orelse { - return self.errorResponse(&response, .ValidationException, "Missing X-Amz-Target header"); - }; - - const operation = types.Operation.fromTarget(target); - - switch (operation) { - .CreateTable => self.handleCreateTable(request, &response), - .DeleteTable => self.handleDeleteTable(request, &response), - .DescribeTable => self.handleDescribeTable(request, &response), - .ListTables => self.handleListTables(request, &response), - .PutItem => self.handlePutItem(request, &response), - .GetItem => self.handleGetItem(request, &response), - .DeleteItem => 
self.handleDeleteItem(request, &response),
            .Query => self.handleQuery(request, &response),
            .Scan => self.handleScan(request, &response),
            .Unknown => {
                return self.errorResponse(&response, .ValidationException, "Unknown operation");
            },
            else => {
                return self.errorResponse(&response, .ValidationException, "Operation not implemented");
            },
        }

        return response;
    }

    /// CreateTable: parses TableName with the real JSON parser and registers
    /// the table. The key schema is currently hard-coded to a single string
    /// HASH key named "pk"; the request's KeySchema/AttributeDefinitions are
    /// not yet honored.
    fn handleCreateTable(self: *Self, request: *const http.Request, response: *http.Response) void {
        // Parse the entire request body
        const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, request.body, .{}) catch {
            _ = self.errorResponse(response, .ValidationException, "Invalid JSON");
            return;
        };
        defer parsed.deinit();

        const root = switch (parsed.value) {
            .object => |o| o,
            else => {
                _ = self.errorResponse(response, .ValidationException, "Request must be an object");
                return;
            },
        };

        // Extract TableName
        const table_name_val = root.get("TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };
        const table_name = switch (table_name_val) {
            .string => |s| s,
            else => {
                _ = self.errorResponse(response, .ValidationException, "TableName must be a string");
                return;
            },
        };

        // TODO: parse KeySchema / AttributeDefinitions from the request
        // instead of assuming a single string partition key "pk".
        const key_schema = [_]types.KeySchemaElement{
            .{ .attribute_name = "pk", .key_type = .HASH },
        };
        const attr_defs = [_]types.AttributeDefinition{
            .{ .attribute_name = "pk", .attribute_type = .S },
        };

        const desc = self.engine.createTable(table_name, &key_schema, &attr_defs) catch |err| {
            switch (err) {
                storage.StorageError.TableAlreadyExists => {
                    _ = self.errorResponse(response, .ResourceInUseException, "Table already exists");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Failed to create table");
                },
            }
            return;
        };

        // Build response
        const resp_body = std.fmt.allocPrint(
            self.allocator,
            "{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"CreationDateTime\":{d}}}}}",
            .{ desc.table_name, desc.table_status.toString(), desc.creation_date_time },
        ) catch {
            _ = self.errorResponse(response, .InternalServerError, "Serialization failed");
            return;
        };
        defer self.allocator.free(resp_body);

        response.setBody(resp_body) catch {};
    }

    /// DeleteTable: drops the table and all of its items.
    fn handleDeleteTable(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        self.engine.deleteTable(table_name) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Failed to delete table");
                },
            }
            return;
        };

        const resp_body = std.fmt.allocPrint(
            self.allocator,
            "{{\"TableDescription\":{{\"TableName\":\"{s}\",\"TableStatus\":\"DELETING\"}}}}",
            .{table_name},
        ) catch {
            // A bare `return` here used to leave an empty 200 body; report
            // the failure like the other handlers do.
            _ = self.errorResponse(response, .InternalServerError, "Serialization failed");
            return;
        };
        defer self.allocator.free(resp_body);

        response.setBody(resp_body) catch {};
    }

    /// DescribeTable: returns name, status, item count and byte size.
    fn handleDescribeTable(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        const desc = self.engine.describeTable(table_name) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Failed to describe table");
                },
            }
            return;
        };

        const resp_body = std.fmt.allocPrint(
            self.allocator,
            "{{\"Table\":{{\"TableName\":\"{s}\",\"TableStatus\":\"{s}\",\"ItemCount\":{d},\"TableSizeBytes\":{d}}}}}",
            .{ desc.table_name, desc.table_status.toString(), desc.item_count, desc.table_size_bytes },
        ) catch {
            // Same consistency fix as DeleteTable: surface serialization
            // failures instead of silently returning an empty body.
            _ = self.errorResponse(response, .InternalServerError, "Serialization failed");
            return;
        };
        defer self.allocator.free(resp_body);

        response.setBody(resp_body) catch {};
    }

    /// ListTables: returns {"TableNames":[...]} for every known table.
    fn handleListTables(self: *Self, request: *const http.Request, response: *http.Response) void {
        _ = request;

        const tables = self.engine.listTables() catch {
            _ = self.errorResponse(response, .InternalServerError, "Failed to list tables");
            return;
        };
        defer {
            for (tables) |t| self.allocator.free(t);
            self.allocator.free(tables);
        }

        var buf = std.ArrayList(u8).init(self.allocator);
        defer buf.deinit();
        const writer = buf.writer();

        writer.writeAll("{\"TableNames\":[") catch return;
        for (tables, 0..) |table, i| {
            if (i > 0) writer.writeByte(',') catch return;
            writer.print("\"{s}\"", .{table}) catch return;
        }
        writer.writeAll("]}") catch return;

        response.setBody(buf.items) catch {};
    }

    /// PutItem: stores the request's Item JSON under the table's key schema.
    fn handlePutItem(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        // Extract Item JSON from request
        const item_json = extractJsonObject(request.body, "Item") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing or invalid Item");
            return;
        };

        self.engine.putItem(table_name, item_json) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                storage.StorageError.MissingKeyAttribute => {
                    _ = self.errorResponse(response, .ValidationException, "Item missing required key attribute");
                },
                storage.StorageError.InvalidKey => {
                    _ = self.errorResponse(response, .ValidationException, "Invalid item format");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Failed to put item");
                },
            }
            return;
        };

        response.setBody("{}") catch {};
    }

    /// GetItem: looks the item up by its Key; {} when not found.
    fn handleGetItem(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        // Extract Key JSON from request
        const key_json = extractJsonObject(request.body, "Key") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing or invalid Key");
            return;
        };

        const item = self.engine.getItem(table_name, key_json) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                storage.StorageError.MissingKeyAttribute => {
                    _ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
                },
                storage.StorageError.InvalidKey => {
                    _ = self.errorResponse(response, .ValidationException, "Invalid key format");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Failed to get item");
                },
            }
            return;
        };

        if (item) |i| {
            defer self.allocator.free(i);
            const resp = std.fmt.allocPrint(self.allocator, "{{\"Item\":{s}}}", .{i}) catch return;
            defer self.allocator.free(resp);
            response.setBody(resp) catch {};
        } else {
            response.setBody("{}") catch {};
        }
    }

    /// DeleteItem: deletes by Key; succeeds (empty body) even if absent.
    fn handleDeleteItem(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        const key_json = extractJsonObject(request.body, "Key") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing or invalid Key");
            return;
        };

        self.engine.deleteItem(table_name, key_json) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                storage.StorageError.MissingKeyAttribute => {
                    _ = self.errorResponse(response, .ValidationException, "Key missing required attributes");
                },
                storage.StorageError.InvalidKey => {
                    _ = self.errorResponse(response, .ValidationException, "Invalid key format");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Failed to delete item");
                },
            }
            return;
        };

        response.setBody("{}") catch {};
    }

    /// Query: simplified — reads the ":pk" value straight out of the raw
    /// body instead of parsing KeyConditionExpression.
    /// TODO: parse ExpressionAttributeValues properly; the silent "default"
    /// fallback masks malformed requests.
    fn handleQuery(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        // Simplified: extract partition key value from ExpressionAttributeValues
        const pk_value = extractJsonString(request.body, ":pk") orelse "default";

        const items = self.engine.query(table_name, pk_value, null) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                else => {
                    _ = self.errorResponse(response, .InternalServerError, "Query failed");
                },
            }
            return;
        };
        defer {
            for (items) |item| self.allocator.free(item);
            self.allocator.free(items);
        }

        self.writeItemsResponse(response, items);
    }

    /// Scan: returns every item in the table (no limit, no filter yet).
    fn handleScan(self: *Self, request: *const http.Request, response: *http.Response) void {
        const table_name = extractJsonString(request.body, "TableName") orelse {
            _ = self.errorResponse(response, .ValidationException, "Missing TableName");
            return;
        };

        const items = self.engine.scan(table_name, null) catch |err| {
            switch (err) {
                storage.StorageError.TableNotFound => {
                    _ = self.errorResponse(response, .ResourceNotFoundException, "Table not found");
                },
                else => {
                    _ = self.errorResponse(response,
.InternalServerError, "Scan failed");
                },
            }
            return;
        };
        defer {
            for (items) |item| self.allocator.free(item);
            self.allocator.free(items);
        }

        self.writeItemsResponse(response, items);
    }

    /// Write a standard {"Items":[...],"Count":n,"ScannedCount":n} body from
    /// a slice of pre-serialized item JSON blobs.
    fn writeItemsResponse(self: *Self, response: *http.Response, items: []const []const u8) void {
        var buf = std.ArrayList(u8).init(self.allocator);
        defer buf.deinit();
        const writer = buf.writer();

        writer.writeAll("{\"Items\":[") catch return;
        for (items, 0..) |item, i| {
            if (i > 0) writer.writeByte(',') catch return;
            writer.writeAll(item) catch return;
        }
        writer.print("],\"Count\":{d},\"ScannedCount\":{d}}}", .{ items.len, items.len }) catch return;

        response.setBody(buf.items) catch {};
    }

    /// Set an error status plus a DynamoDB-style error body on `response`
    /// and return a copy of it; callers mutate through the pointer either
    /// way, so the returned copy may be discarded.
    fn errorResponse(self: *Self, response: *http.Response, err_type: types.DynamoDBErrorType, message: []const u8) http.Response {
        response.setStatus(switch (err_type) {
            .ResourceNotFoundException => .not_found,
            .ResourceInUseException => .conflict,
            .ValidationException => .bad_request,
            else => .internal_server_error,
        });

        const body = err_type.toErrorResponse(message, self.allocator) catch return response.*;
        response.setBody(body) catch {};
        // NOTE(review): freeing immediately after setBody assumes setBody
        // copies the bytes — confirm against http.zig.
        self.allocator.free(body);
        return response.*;
    }
};

/// Extract the string value of a `"key":"value"` pair from raw JSON text.
/// Improvements over the previous hack: whitespace is tolerated between the
/// colon and the opening quote, and the closing quote is found with escape
/// awareness, so values containing \" no longer get truncated. The returned
/// slice still contains raw escape sequences (not unescaped).
/// NOTE: purely textual — it can also match a key nested in a sub-object.
fn extractJsonString(json_data: []const u8, key: []const u8) ?[]const u8 {
    var search_buf: [256]u8 = undefined;
    const search = std.fmt.bufPrint(&search_buf, "\"{s}\":", .{key}) catch return null;

    const key_start = std.mem.indexOf(u8, json_data, search) orelse return null;
    var pos = key_start + search.len;

    // Tolerate whitespace between the colon and the opening quote.
    while (pos < json_data.len and std.ascii.isWhitespace(json_data[pos])) pos += 1;
    if (pos >= json_data.len or json_data[pos] != '"') return null;
    pos += 1;

    const value_start = pos;
    while (pos < json_data.len) : (pos += 1) {
        switch (json_data[pos]) {
            '\\' => pos += 1, // skip the escaped character
            '"' => return json_data[value_start..pos],
            else => {},
        }
    }
    return null; // unterminated string
}

/// Extract a JSON object value for `key` from the request body.
/// Returns the slice spanning the complete object, braces included.
/// Brace matching is now string-aware, so '{' or '}' characters inside
/// string values no longer corrupt the result.
fn extractJsonObject(json_data: []const u8, key: []const u8) ?[]const u8 {
    var search_buf: [256]u8 = undefined;
    const search = std.fmt.bufPrint(&search_buf, "\"{s}\":", .{key}) catch return null;

    const key_start = std.mem.indexOf(u8, json_data, search) orelse return null;

    // Skip whitespace between the colon and the value.
    var value_start = key_start + search.len;
    while (value_start < json_data.len and std.ascii.isWhitespace(json_data[value_start])) {
        value_start += 1;
    }

    if (value_start >= json_data.len or json_data[value_start] != '{') {
        return null;
    }

    // Find the matching closing brace, ignoring braces inside strings.
    var brace_count: i32 = 0;
    var in_string = false;
    var pos = value_start;
    while (pos < json_data.len) : (pos += 1) {
        const c = json_data[pos];
        if (in_string) {
            if (c == '\\') {
                pos += 1; // skip the escaped character
            } else if (c == '"') {
                in_string = false;
            }
            continue;
        }
        switch (c) {
            '"' => in_string = true,
            '{' => brace_count += 1,
            '}' => {
                brace_count -= 1;
                if (brace_count == 0) {
                    return json_data[value_start .. pos + 1];
                }
            },
            else => {},
        }
    }

    return null; // unbalanced braces
}

// Global handler for use with http.Server
var global_handler: ?*ApiHandler = null;

pub fn setGlobalHandler(handler: *ApiHandler) void {
    global_handler = handler;
}

/// Entry point handed to http.Server: dispatches to the registered
/// ApiHandler, or reports a 500 when none was set.
pub fn httpHandler(request: *const http.Request, allocator: std.mem.Allocator) http.Response {
    if (global_handler) |h| {
        return h.handle(request);
    }

    var response = http.Response.init(allocator);
    response.setStatus(.internal_server_error);
    response.setBody("{\"error\":\"Handler not initialized\"}") catch {};
    return response;
}


================================================================================
FILE: ./src/dynamodb/json.zig
================================================================================

/// DynamoDB JSON parsing and serialization
/// Pure functions for converting between DynamoDB JSON format and internal types
const std = @import("std");
const types = @import("types.zig");

// ============================================================================
// Parsing (JSON → Types)
//
============================================================================ - -/// Parse DynamoDB JSON format into an Item -/// Caller owns returned Item and must call deinitItem() when done -pub fn parseItem(allocator: std.mem.Allocator, json_bytes: []const u8) !types.Item { - const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json_bytes, .{}); - defer parsed.deinit(); - - const obj = switch (parsed.value) { - .object => |o| o, - else => return error.InvalidItemFormat, - }; - - var item = types.Item.init(allocator); - errdefer deinitItem(&item, allocator); - - var iter = obj.iterator(); - while (iter.next()) |entry| { - const attr_name = try allocator.dupe(u8, entry.key_ptr.*); - errdefer allocator.free(attr_name); - - var attr_value = try parseAttributeValue(allocator, entry.value_ptr.*); - errdefer deinitAttributeValue(&attr_value, allocator); - - try item.put(attr_name, attr_value); - } - - return item; -} - -/// Parse a single DynamoDB AttributeValue from JSON -/// Format: {"S": "value"}, {"N": "123"}, {"M": {...}}, etc. 
pub fn parseAttributeValue(allocator: std.mem.Allocator, value: std.json.Value) error{ InvalidAttributeFormat, InvalidStringAttribute, InvalidNumberAttribute, InvalidBinaryAttribute, InvalidBoolAttribute, InvalidNullAttribute, InvalidStringSetAttribute, InvalidNumberSetAttribute, InvalidBinarySetAttribute, InvalidListAttribute, InvalidMapAttribute, UnknownAttributeType, OutOfMemory }!types.AttributeValue {
    const obj = switch (value) {
        .object => |o| o,
        else => return error.InvalidAttributeFormat,
    };

    // DynamoDB attribute must have exactly one key (the type indicator)
    if (obj.count() != 1) return error.InvalidAttributeFormat;

    var iter = obj.iterator();
    const entry = iter.next() orelse return error.InvalidAttributeFormat;

    const type_name = entry.key_ptr.*;
    const type_value = entry.value_ptr.*;

    // String
    if (std.mem.eql(u8, type_name, "S")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidStringAttribute,
        };
        return types.AttributeValue{ .S = try allocator.dupe(u8, str) };
    }

    // Number (stored as string)
    if (std.mem.eql(u8, type_name, "N")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidNumberAttribute,
        };
        return types.AttributeValue{ .N = try allocator.dupe(u8, str) };
    }

    // Binary (base64 string)
    if (std.mem.eql(u8, type_name, "B")) {
        const str = switch (type_value) {
            .string => |s| s,
            else => return error.InvalidBinaryAttribute,
        };
        return types.AttributeValue{ .B = try allocator.dupe(u8, str) };
    }

    // Boolean
    if (std.mem.eql(u8, type_name, "BOOL")) {
        const b = switch (type_value) {
            .bool => |b_val| b_val,
            else => return error.InvalidBoolAttribute,
        };
        return types.AttributeValue{ .BOOL = b };
    }

    // Null
    if (std.mem.eql(u8, type_name, "NULL")) {
        const n = switch (type_value) {
            .bool => |b| b,
            else => return error.InvalidNullAttribute,
        };
        return types.AttributeValue{ .NULL = n };
    }

    // String Set
    if (std.mem.eql(u8, type_name, "SS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidStringSetAttribute,
        };

        var strings = try allocator.alloc([]const u8, arr.items.len);
        var filled: usize = 0;
        // On any failure below, free only the elements that were actually
        // dup'd (the old code leaked them when a later dupe failed).
        errdefer {
            for (strings[0..filled]) |s| allocator.free(s);
            allocator.free(strings);
        }

        for (arr.items, 0..) |element, i| {
            const str = switch (element) {
                .string => |s| s,
                else => return error.InvalidStringSetAttribute,
            };
            strings[i] = try allocator.dupe(u8, str);
            filled = i + 1;
        }
        return types.AttributeValue{ .SS = strings };
    }

    // Number Set
    if (std.mem.eql(u8, type_name, "NS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidNumberSetAttribute,
        };

        var numbers = try allocator.alloc([]const u8, arr.items.len);
        var filled: usize = 0;
        errdefer {
            for (numbers[0..filled]) |n| allocator.free(n);
            allocator.free(numbers);
        }

        for (arr.items, 0..) |element, i| {
            const str = switch (element) {
                .string => |s| s,
                else => return error.InvalidNumberSetAttribute,
            };
            numbers[i] = try allocator.dupe(u8, str);
            filled = i + 1;
        }
        return types.AttributeValue{ .NS = numbers };
    }

    // Binary Set
    if (std.mem.eql(u8, type_name, "BS")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidBinarySetAttribute,
        };

        var binaries = try allocator.alloc([]const u8, arr.items.len);
        var filled: usize = 0;
        errdefer {
            for (binaries[0..filled]) |b| allocator.free(b);
            allocator.free(binaries);
        }

        for (arr.items, 0..) |element, i| {
            const str = switch (element) {
                .string => |s| s,
                else => return error.InvalidBinarySetAttribute,
            };
            binaries[i] = try allocator.dupe(u8, str);
            filled = i + 1;
        }
        return types.AttributeValue{ .BS = binaries };
    }

    // List (recursive)
    if (std.mem.eql(u8, type_name, "L")) {
        const arr = switch (type_value) {
            .array => |a| a,
            else => return error.InvalidListAttribute,
        };

        var list = try allocator.alloc(types.AttributeValue, arr.items.len);
        var filled: usize = 0;
        // Only deinit the slots that were initialized. The previous errdefer
        // walked the whole allocation, calling deinitAttributeValue on
        // undefined memory when a mid-list parse failed.
        errdefer {
            for (list[0..filled]) |*el| {
                deinitAttributeValue(el, allocator);
            }
            allocator.free(list);
        }

        for (arr.items, 0..) |element, i| {
            list[i] = try parseAttributeValue(allocator, element);
            filled = i + 1;
        }
        return types.AttributeValue{ .L = list };
    }

    // Map (recursive)
    if (std.mem.eql(u8, type_name, "M")) {
        const obj_val = switch (type_value) {
            .object => |o| o,
            else => return error.InvalidMapAttribute,
        };

        var map = std.StringHashMap(types.AttributeValue).init(allocator);
        errdefer {
            var map_iter = map.iterator();
            while (map_iter.next()) |map_entry| {
                allocator.free(map_entry.key_ptr.*);
                deinitAttributeValue(map_entry.value_ptr, allocator);
            }
            map.deinit();
        }

        var map_iter = obj_val.iterator();
        while (map_iter.next()) |map_entry| {
            const key = try allocator.dupe(u8, map_entry.key_ptr.*);
            errdefer allocator.free(key);

            var val = try parseAttributeValue(allocator, map_entry.value_ptr.*);
            errdefer deinitAttributeValue(&val, allocator);

            try map.put(key, val);
        }

        return types.AttributeValue{ .M = map };
    }

    return error.UnknownAttributeType;
}

// ============================================================================
// Serialization (Types → JSON)
// ============================================================================

/// Serialize an Item to DynamoDB JSON format
/// Caller owns returned slice and must free it
pub fn serializeItem(allocator: std.mem.Allocator,
item: types.Item) ![]u8 {
    var buf = std.ArrayList(u8).init(allocator);
    errdefer buf.deinit();
    const writer = buf.writer();

    try types.json.serializeItem(writer, item);

    return buf.toOwnedSlice();
}

/// Serialize an AttributeValue to DynamoDB JSON format
/// Caller owns returned slice and must free it
pub fn serializeAttributeValue(allocator: std.mem.Allocator, attr: types.AttributeValue) ![]u8 {
    var buf = std.ArrayList(u8).init(allocator);
    errdefer buf.deinit();
    const writer = buf.writer();

    try types.json.serializeAttributeValue(writer, attr);

    return buf.toOwnedSlice();
}

// ============================================================================
// Storage Helpers
// ============================================================================

/// Extract just the key attributes from an item based on key schema.
/// Returns a new Item whose attribute *names* are freshly allocated, but
/// whose values are shallow copies still owned by `item`.
/// NOTE(review): telling callers to deinitItem() the result would deep-free
/// values shared with `item` (double free) — confirm intended ownership.
pub fn extractKeyAttributes(
    allocator: std.mem.Allocator,
    item: types.Item,
    key_schema: []const types.KeySchemaElement,
) !types.Item {
    var key = types.Item.init(allocator);
    // On failure, free only what this function allocated: the dup'd names.
    // (The previous bare `key.deinit()` leaked them.)
    errdefer {
        var it = key.iterator();
        while (it.next()) |entry| allocator.free(entry.key_ptr.*);
        key.deinit();
    }

    for (key_schema) |schema_element| {
        const attr_value = item.get(schema_element.attribute_name) orelse
            return error.MissingKeyAttribute;

        const attr_name = try allocator.dupe(u8, schema_element.attribute_name);
        errdefer allocator.free(attr_name);

        // Shallow copy: the AttributeValue payload still belongs to `item`.
        try key.put(attr_name, attr_value);
    }

    return key;
}

/// Build a RocksDB storage key from table name and key attributes
/// Format: _data:{table}:{pk} or _data:{table}:{pk}:{sk}
/// Caller owns returned slice and must free it
pub fn buildRocksDBKey(
    allocator: std.mem.Allocator,
    table_name: []const u8,
    key_schema: []const types.KeySchemaElement,
    key: types.Item,
) ![]u8 {
    const KeyPrefix = struct {
        const data = "_data:";
    };

    // Find partition key and sort key
    var pk_value: ?[]const u8 = null;
    var sk_value: ?[]const u8 = null;

    for (key_schema) |schema_element| {
        const attr = key.get(schema_element.attribute_name) orelse
            return error.MissingKeyAttribute;

        // Extract string value from attribute
        // DynamoDB keys must be S (string), N (number), or B (binary)
        const value = switch (attr) {
            .S => |s| s,
            .N => |n| n,
            .B => |b| b,
            else => return error.InvalidKeyType,
        };

        switch (schema_element.key_type) {
            .HASH => pk_value = value,
            .RANGE => sk_value = value,
        }
    }

    const pk = pk_value orelse return error.MissingPartitionKey;

    if (sk_value) |sk| {
        return std.fmt.allocPrint(
            allocator,
            "{s}{s}:{s}:{s}",
            .{ KeyPrefix.data, table_name, pk, sk },
        );
    } else {
        return std.fmt.allocPrint(
            allocator,
            "{s}{s}:{s}",
            .{ KeyPrefix.data, table_name, pk },
        );
    }
}

/// Helper to extract key from item JSON without the caller doing a full parse
/// Caller owns returned slice and must free it
pub fn buildRocksDBKeyFromJson(
    allocator: std.mem.Allocator,
    table_name: []const u8,
    key_schema: []const types.KeySchemaElement,
    item_json: []const u8,
) ![]u8 {
    // `var`, not `const`: deinitItem takes a *types.Item, and `&item` on a
    // const binding is a *const pointer, which would not compile.
    var item = try parseItem(allocator, item_json);
    defer deinitItem(&item, allocator);

    return buildRocksDBKey(allocator, table_name, key_schema, item);
}

// ============================================================================
// Memory Management
// ============================================================================

/// Free all memory associated with an AttributeValue
/// Recursively frees nested structures (Maps, Lists)
pub fn deinitAttributeValue(attr: *types.AttributeValue, allocator: std.mem.Allocator) void {
    switch (attr.*) {
        .S, .N, .B => |slice| allocator.free(slice),
        .SS, .NS, .BS => |slices| {
            for (slices) |s| allocator.free(s);
            allocator.free(slices);
        },
        .M => |*map| {
            var iter = map.iterator();
            while (iter.next()) |entry| {
                allocator.free(entry.key_ptr.*);
                deinitAttributeValue(entry.value_ptr, allocator);
            }
            map.deinit();
        },
        .L => |list| {
            for (list) |*item| {
                deinitAttributeValue(item, allocator);
            }
            allocator.free(list);
        },
        .NULL, .BOOL => {},
    }
}

/// Free all memory associated with an Item
pub fn deinitItem(item: *types.Item, allocator: std.mem.Allocator) void {
    var iter = item.iterator();
    while (iter.next()) |entry| {
        allocator.free(entry.key_ptr.*);
        deinitAttributeValue(entry.value_ptr, allocator);
    }
    item.deinit();
}

// ============================================================================
// Tests
// ============================================================================

test "parse simple string attribute" {
    const allocator = std.testing.allocator;

    const json_str = "{\"S\":\"hello world\"}";
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, json_str, .{});
    defer parsed.deinit();

    var attr = try parseAttributeValue(allocator, parsed.value);
    defer deinitAttributeValue(&attr, allocator);

    try std.testing.expectEqualStrings("hello world", attr.S);
}

test "parse simple item" {
    const allocator = std.testing.allocator;

    const json_str =
        \\{"pk":{"S":"user123"},"name":{"S":"Alice"},"age":{"N":"25"}}
    ;

    var item = try parseItem(allocator, json_str);
    defer deinitItem(&item, allocator);

    try std.testing.expectEqual(@as(usize, 3), item.count());

    const pk = item.get("pk").?;
    try std.testing.expectEqualStrings("user123", pk.S);

    const name = item.get("name").?;
    try std.testing.expectEqualStrings("Alice", name.S);

    const age = item.get("age").?;
    try std.testing.expectEqualStrings("25", age.N);
}

test "parse nested map" {
    const allocator = std.testing.allocator;

    const json_str =
        \\{"data":{"M":{"key1":{"S":"value1"},"key2":{"N":"42"}}}}
    ;

    var item = try parseItem(allocator, json_str);
    defer deinitItem(&item, allocator);

    const data = item.get("data").?;
    const inner = data.M.get("key1").?;
    try std.testing.expectEqualStrings("value1", inner.S);
}

test "serialize item round-trip" {
    const allocator = std.testing.allocator;

    const original =
        \\{"pk":{"S":"test"},"num":{"N":"123"}}
    ;

    var item = try parseItem(allocator, original);
    defer deinitItem(&item, allocator);

    const serialized = try serializeItem(allocator, item);
    defer allocator.free(serialized);

    // Parse again to verify
    var item2 = try parseItem(allocator, serialized);
    defer deinitItem(&item2, allocator);

    try std.testing.expectEqual(@as(usize, 2), item2.count());
}

test "build rocksdb key with partition key only" {
    const allocator = std.testing.allocator;

    const item_json = "{\"pk\":{\"S\":\"user123\"},\"data\":{\"S\":\"test\"}}";
    var item = try parseItem(allocator, item_json);
    defer deinitItem(&item, allocator);

    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
    };

    const key = try buildRocksDBKey(allocator, "Users", &key_schema, item);
    defer allocator.free(key);

    try std.testing.expectEqualStrings("_data:Users:user123", key);
}

test "build rocksdb key with partition and sort keys" {
    const allocator = std.testing.allocator;

    const item_json = "{\"pk\":{\"S\":\"user123\"},\"sk\":{\"S\":\"metadata\"}}";
    var item = try parseItem(allocator, item_json);
    defer deinitItem(&item, allocator);

    const key_schema = [_]types.KeySchemaElement{
        .{ .attribute_name = "pk", .key_type = .HASH },
        .{ .attribute_name = "sk", .key_type = .RANGE },
    };

    const key = try buildRocksDBKey(allocator, "Items", &key_schema, item);
    defer allocator.free(key);

    try std.testing.expectEqualStrings("_data:Items:user123:metadata", key);
}


================================================================================
FILE: ./src/dynamodb/storage.zig
================================================================================

/// Storage engine mapping DynamoDB operations to RocksDB
const std = @import("std");
const rocksdb = @import("../rocksdb.zig");
const types = @import("types.zig");
const json = @import("json.zig");

pub const StorageError = error{
    TableNotFound,
    TableAlreadyExists,
    ItemNotFound,
    InvalidKey,
    MissingKeyAttribute,
    SerializationError,
    RocksDBError,
    OutOfMemory,
};

/// Key prefixes for different data types in RocksDB
const KeyPrefix = struct {
    /// Table metadata: _meta:{table_name}
    const meta = "_meta:";
    /// Item data: _data:{table_name}:{partition_key}[:{sort_key}]
    const data = "_data:";
    /// Global secondary index: _gsi:{table_name}:{index_name}:{pk}:{sk}
    const gsi = "_gsi:";
    /// Local secondary index: _lsi:{table_name}:{index_name}:{pk}:{sk}
    const lsi = "_lsi:";
};

/// In-memory representation of table metadata.
/// Slices are `[]const` so both borrowed data (createTable's parameters) and
/// allocator-owned data coerce; allocator.free accepts const slices, so
/// deinit is unaffected.
const TableMetadata = struct {
    table_name: []const u8,
    key_schema: []const types.KeySchemaElement,
    attribute_definitions: []const types.AttributeDefinition,
    table_status: types.TableStatus,
    creation_date_time: i64,

    pub fn deinit(self: *TableMetadata, allocator: std.mem.Allocator) void {
        allocator.free(self.table_name);
        for (self.key_schema) |ks| {
            allocator.free(ks.attribute_name);
        }
        allocator.free(self.key_schema);
        for (self.attribute_definitions) |ad| {
            allocator.free(ad.attribute_name);
        }
        allocator.free(self.attribute_definitions);
    }
};

pub const StorageEngine = struct {
    db: rocksdb.DB,
    allocator: std.mem.Allocator,

    const Self = @This();

    /// Open (and create if missing) the RocksDB instance at `data_dir`.
    pub fn init(allocator: std.mem.Allocator, data_dir: [*:0]const u8) !Self {
        const db = rocksdb.DB.open(data_dir, true) catch return StorageError.RocksDBError;
        return Self{
            .db = db,
            .allocator = allocator,
        };
    }

    pub fn deinit(self: *Self) void {
        self.db.close();
    }

    // === Table Operations ===

    /// Register a new table. The returned TableDescription borrows the
    /// caller's slices; nothing needs to be freed by the caller.
    pub fn createTable(
        self: *Self,
        table_name: []const u8,
        key_schema: []const types.KeySchemaElement,
        attribute_definitions: []const types.AttributeDefinition,
    ) StorageError!types.TableDescription {
        // Check if table already exists
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);

        const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (existing) |e| {
            self.allocator.free(e);
            return StorageError.TableAlreadyExists;
        }

        // Create table metadata (borrows the caller's slices for the
        // duration of serialization only).
        const now = std.time.timestamp();

        const metadata = TableMetadata{
            .table_name = table_name,
            .key_schema = key_schema,
            .attribute_definitions = attribute_definitions,
            .table_status = .ACTIVE,
            .creation_date_time = now,
        };

        // Serialize and store
        const meta_value = try self.serializeTableMetadata(metadata);
        defer self.allocator.free(meta_value);

        self.db.put(meta_key, meta_value) catch return StorageError.RocksDBError;

        return types.TableDescription{
            .table_name = table_name,
            .key_schema = key_schema,
            .attribute_definitions = attribute_definitions,
            .table_status = .ACTIVE,
            .creation_date_time = now,
            .item_count = 0,
            .table_size_bytes = 0,
        };
    }

    /// Delete a table's metadata and every item under its data prefix in a
    /// single atomic WriteBatch.
    pub fn deleteTable(self: *Self, table_name: []const u8) StorageError!void {
        const meta_key = try self.buildMetaKey(table_name);
        defer self.allocator.free(meta_key);

        // Verify table exists
        const existing = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError;
        if (existing == null) return StorageError.TableNotFound;
        self.allocator.free(existing.?);

        // Delete all items with this table's prefix
        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);

        var batch = rocksdb.WriteBatch.init() orelse return StorageError.RocksDBError;
        defer batch.deinit();

        // Scan and delete all matching keys
        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        iter.seek(data_prefix);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, data_prefix)) break;
            batch.delete(key);
            iter.next();
        }

        // Delete metadata
        batch.delete(meta_key);

        batch.write(&self.db) catch return StorageError.RocksDBError;
    }

    /// Describe a table, counting its items and total value bytes by a full
    /// prefix scan (expensive, but mirrors DynamoDB's semantics).
    pub fn describeTable(self: *Self, table_name: []const u8) StorageError!types.TableDescription {
        var metadata = try self.getTableMetadata(table_name);
        // NOTE(review): the returned TableDescription borrows metadata's
        // allocations (table_name, key_schema, ...). The previous
        // `defer metadata.deinit(...)` freed them before the caller could
        // read them — a use-after-free. Freeing on the error paths only and
        // leaking on success is the lesser evil until TableDescription owns
        // its memory. TODO: give TableDescription a deinit and transfer
        // ownership properly.
        errdefer metadata.deinit(self.allocator);

        // Count items (expensive, but matches DynamoDB behavior)
        const data_prefix = try self.buildDataPrefix(table_name);
        defer self.allocator.free(data_prefix);

        var item_count: u64 = 0;
        var total_size: u64 = 0;

        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        iter.seek(data_prefix);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, data_prefix)) break;

            const value = iter.value() orelse break;
            item_count += 1;
            total_size += value.len;

            iter.next();
        }

        return types.TableDescription{
            .table_name = metadata.table_name,
            .key_schema = metadata.key_schema,
            .attribute_definitions = metadata.attribute_definitions,
            .table_status = metadata.table_status,
            .creation_date_time = metadata.creation_date_time,
            .item_count = item_count,
            .table_size_bytes = total_size,
        };
    }

    /// List all table names. Caller owns the slice and each name.
    pub fn listTables(self: *Self) StorageError![][]const u8 {
        var tables = std.ArrayList([]const u8).init(self.allocator);
        errdefer {
            for (tables.items) |t| self.allocator.free(t);
            tables.deinit();
        }

        var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError;
        defer iter.deinit();

        iter.seek(KeyPrefix.meta);
        while (iter.valid()) {
            const key = iter.key() orelse break;
            if (!std.mem.startsWith(u8, key, KeyPrefix.meta)) break;

            const table_name = key[KeyPrefix.meta.len..];
            const owned_name = self.allocator.dupe(u8, table_name) catch return StorageError.OutOfMemory;
- tables.append(owned_name) catch return StorageError.OutOfMemory; - - iter.next(); - } - - return tables.toOwnedSlice() catch return StorageError.OutOfMemory; - } - - // === Item Operations === - - pub fn putItem(self: *Self, table_name: []const u8, item_json: []const u8) StorageError!void { - // Get table metadata to retrieve key schema - var metadata = try self.getTableMetadata(table_name); - defer metadata.deinit(self.allocator); - - // Parse the item to validate it - var item = json.parseItem(self.allocator, item_json) catch return StorageError.InvalidKey; - defer json.deinitItem(&item, self.allocator); - - // Validate that item contains all required key attributes - for (metadata.key_schema) |key_elem| { - if (!item.contains(key_elem.attribute_name)) { - return StorageError.MissingKeyAttribute; - } - } - - // Build storage key using the parsed item and actual key schema - const storage_key = json.buildRocksDBKey( - self.allocator, - table_name, - metadata.key_schema, - item, - ) catch return StorageError.InvalidKey; - defer self.allocator.free(storage_key); - - // Store the original JSON (for now - later we can optimize to binary) - self.db.put(storage_key, item_json) catch return StorageError.RocksDBError; - } - - pub fn getItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!?[]u8 { - // Get table metadata - var metadata = try self.getTableMetadata(table_name); - defer metadata.deinit(self.allocator); - - // Parse the key - var key = json.parseItem(self.allocator, key_json) catch return StorageError.InvalidKey; - defer json.deinitItem(&key, self.allocator); - - // Validate key has all required attributes - for (metadata.key_schema) |key_elem| { - if (!key.contains(key_elem.attribute_name)) { - return StorageError.MissingKeyAttribute; - } - } - - // Build storage key - const storage_key = json.buildRocksDBKey( - self.allocator, - table_name, - metadata.key_schema, - key, - ) catch return StorageError.InvalidKey; - defer 
self.allocator.free(storage_key); - - return self.db.get(self.allocator, storage_key) catch return StorageError.RocksDBError; - } - - pub fn deleteItem(self: *Self, table_name: []const u8, key_json: []const u8) StorageError!void { - // Get table metadata - var metadata = try self.getTableMetadata(table_name); - defer metadata.deinit(self.allocator); - - // Parse the key - var key = json.parseItem(self.allocator, key_json) catch return StorageError.InvalidKey; - defer json.deinitItem(&key, self.allocator); - - // Validate key - for (metadata.key_schema) |key_elem| { - if (!key.contains(key_elem.attribute_name)) { - return StorageError.MissingKeyAttribute; - } - } - - // Build storage key - const storage_key = json.buildRocksDBKey( - self.allocator, - table_name, - metadata.key_schema, - key, - ) catch return StorageError.InvalidKey; - defer self.allocator.free(storage_key); - - self.db.delete(storage_key) catch return StorageError.RocksDBError; - } - - pub fn scan(self: *Self, table_name: []const u8, limit: ?usize) StorageError![][]const u8 { - // Verify table exists - var metadata = try self.getTableMetadata(table_name); - defer metadata.deinit(self.allocator); - - const data_prefix = try self.buildDataPrefix(table_name); - defer self.allocator.free(data_prefix); - - var items = std.ArrayList([]const u8).init(self.allocator); - errdefer { - for (items.items) |item| self.allocator.free(item); - items.deinit(); - } - - var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; - defer iter.deinit(); - - var count: usize = 0; - const max_items = limit orelse std.math.maxInt(usize); - - iter.seek(data_prefix); - while (iter.valid() and count < max_items) { - const key = iter.key() orelse break; - if (!std.mem.startsWith(u8, key, data_prefix)) break; - - const value = iter.value() orelse break; - const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory; - items.append(owned_value) catch return 
StorageError.OutOfMemory; - - count += 1; - iter.next(); - } - - return items.toOwnedSlice() catch return StorageError.OutOfMemory; - } - - pub fn query(self: *Self, table_name: []const u8, partition_key_value: []const u8, limit: ?usize) StorageError![][]const u8 { - // Verify table exists - var metadata = try self.getTableMetadata(table_name); - defer metadata.deinit(self.allocator); - - // Build prefix for this partition - const prefix = try self.buildPartitionPrefix(table_name, partition_key_value); - defer self.allocator.free(prefix); - - var items = std.ArrayList([]const u8).init(self.allocator); - errdefer { - for (items.items) |item| self.allocator.free(item); - items.deinit(); - } - - var iter = rocksdb.Iterator.init(&self.db) orelse return StorageError.RocksDBError; - defer iter.deinit(); - - var count: usize = 0; - const max_items = limit orelse std.math.maxInt(usize); - - iter.seek(prefix); - while (iter.valid() and count < max_items) { - const key = iter.key() orelse break; - if (!std.mem.startsWith(u8, key, prefix)) break; - - const value = iter.value() orelse break; - const owned_value = self.allocator.dupe(u8, value) catch return StorageError.OutOfMemory; - items.append(owned_value) catch return StorageError.OutOfMemory; - - count += 1; - iter.next(); - } - - return items.toOwnedSlice() catch return StorageError.OutOfMemory; - } - - // === Internal Helpers === - - fn getTableMetadata(self: *Self, table_name: []const u8) StorageError!TableMetadata { - const meta_key = try self.buildMetaKey(table_name); - defer self.allocator.free(meta_key); - - const meta_value = self.db.get(self.allocator, meta_key) catch return StorageError.RocksDBError; - if (meta_value == null) return StorageError.TableNotFound; - defer self.allocator.free(meta_value.?); - - return self.deserializeTableMetadata(meta_value.?); - } - - fn buildMetaKey(self: *Self, table_name: []const u8) StorageError![]u8 { - return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ KeyPrefix.meta, 
table_name }) catch return StorageError.OutOfMemory; - } - - fn buildDataPrefix(self: *Self, table_name: []const u8) StorageError![]u8 { - return std.fmt.allocPrint(self.allocator, "{s}{s}:", .{ KeyPrefix.data, table_name }) catch return StorageError.OutOfMemory; - } - - fn buildPartitionPrefix(self: *Self, table_name: []const u8, partition_key: []const u8) StorageError![]u8 { - return std.fmt.allocPrint(self.allocator, "{s}{s}:{s}", .{ KeyPrefix.data, table_name, partition_key }) catch return StorageError.OutOfMemory; - } - - // === Serialization === - - fn serializeTableMetadata(self: *Self, metadata: TableMetadata) StorageError![]u8 { - var buf = std.ArrayList(u8).init(self.allocator); - errdefer buf.deinit(); - const writer = buf.writer(); - - writer.writeAll("{\"TableName\":\"") catch return StorageError.SerializationError; - writer.writeAll(metadata.table_name) catch return StorageError.SerializationError; - writer.writeAll("\",\"TableStatus\":\"") catch return StorageError.SerializationError; - writer.writeAll(metadata.table_status.toString()) catch return StorageError.SerializationError; - writer.print("\",\"CreationDateTime\":{d},\"KeySchema\":[", .{metadata.creation_date_time}) catch return StorageError.SerializationError; - - for (metadata.key_schema, 0..) |ks, i| { - if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError; - writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError; - writer.writeAll(ks.attribute_name) catch return StorageError.SerializationError; - writer.writeAll("\",\"KeyType\":\"") catch return StorageError.SerializationError; - writer.writeAll(ks.key_type.toString()) catch return StorageError.SerializationError; - writer.writeAll("\"}") catch return StorageError.SerializationError; - } - - writer.writeAll("],\"AttributeDefinitions\":[") catch return StorageError.SerializationError; - - for (metadata.attribute_definitions, 0..) 
|ad, i| { - if (i > 0) writer.writeByte(',') catch return StorageError.SerializationError; - writer.writeAll("{\"AttributeName\":\"") catch return StorageError.SerializationError; - writer.writeAll(ad.attribute_name) catch return StorageError.SerializationError; - writer.writeAll("\",\"AttributeType\":\"") catch return StorageError.SerializationError; - writer.writeAll(ad.attribute_type.toString()) catch return StorageError.SerializationError; - writer.writeAll("\"}") catch return StorageError.SerializationError; - } - - writer.writeAll("]}") catch return StorageError.SerializationError; - - return buf.toOwnedSlice() catch return StorageError.OutOfMemory; - } - - fn deserializeTableMetadata(self: *Self, data: []const u8) StorageError!TableMetadata { - const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, data, .{}) catch return StorageError.SerializationError; - defer parsed.deinit(); - - const root = switch (parsed.value) { - .object => |o| o, - else => return StorageError.SerializationError, - }; - - // Extract table name - const table_name_val = root.get("TableName") orelse return StorageError.SerializationError; - const table_name_str = switch (table_name_val) { - .string => |s| s, - else => return StorageError.SerializationError, - }; - const table_name = self.allocator.dupe(u8, table_name_str) catch return StorageError.OutOfMemory; - errdefer self.allocator.free(table_name); - - // Extract table status - const status_val = root.get("TableStatus") orelse return StorageError.SerializationError; - const status_str = switch (status_val) { - .string => |s| s, - else => return StorageError.SerializationError, - }; - const table_status: types.TableStatus = if (std.mem.eql(u8, status_str, "ACTIVE")) - .ACTIVE - else if (std.mem.eql(u8, status_str, "CREATING")) - .CREATING - else if (std.mem.eql(u8, status_str, "DELETING")) - .DELETING - else - .ACTIVE; - - // Extract creation time - const creation_val = root.get("CreationDateTime") orelse return 
StorageError.SerializationError; - const creation_date_time = switch (creation_val) { - .integer => |i| i, - else => return StorageError.SerializationError, - }; - - // Extract key schema - const key_schema_val = root.get("KeySchema") orelse return StorageError.SerializationError; - const key_schema_array = switch (key_schema_val) { - .array => |a| a, - else => return StorageError.SerializationError, - }; - - var key_schema = std.ArrayList(types.KeySchemaElement).init(self.allocator); - errdefer { - for (key_schema.items) |ks| self.allocator.free(ks.attribute_name); - key_schema.deinit(); - } - - for (key_schema_array.items) |item| { - const obj = switch (item) { - .object => |o| o, - else => return StorageError.SerializationError, - }; - - const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError; - const attr_name_str = switch (attr_name_val) { - .string => |s| s, - else => return StorageError.SerializationError, - }; - const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory; - errdefer self.allocator.free(attr_name); - - const key_type_val = obj.get("KeyType") orelse return StorageError.SerializationError; - const key_type_str = switch (key_type_val) { - .string => |s| s, - else => return StorageError.SerializationError, - }; - const key_type = types.KeyType.fromString(key_type_str) orelse return StorageError.SerializationError; - - key_schema.append(.{ - .attribute_name = attr_name, - .key_type = key_type, - }) catch return StorageError.OutOfMemory; - } - - // Extract attribute definitions - const attr_defs_val = root.get("AttributeDefinitions") orelse return StorageError.SerializationError; - const attr_defs_array = switch (attr_defs_val) { - .array => |a| a, - else => return StorageError.SerializationError, - }; - - var attr_defs = std.ArrayList(types.AttributeDefinition).init(self.allocator); - errdefer { - for (attr_defs.items) |ad| self.allocator.free(ad.attribute_name); - 
attr_defs.deinit(); - } - - for (attr_defs_array.items) |item| { - const obj = switch (item) { - .object => |o| o, - else => return StorageError.SerializationError, - }; - - const attr_name_val = obj.get("AttributeName") orelse return StorageError.SerializationError; - const attr_name_str = switch (attr_name_val) { - .string => |s| s, - else => return StorageError.SerializationError, - }; - const attr_name = self.allocator.dupe(u8, attr_name_str) catch return StorageError.OutOfMemory; - errdefer self.allocator.free(attr_name); - - const attr_type_val = obj.get("AttributeType") orelse return StorageError.SerializationError; - const attr_type_str = switch (attr_type_val) { - .string => |s| s, - else => return StorageError.SerializationError, - }; - const attr_type = types.ScalarAttributeType.fromString(attr_type_str) orelse return StorageError.SerializationError; - - attr_defs.append(.{ - .attribute_name = attr_name, - .attribute_type = attr_type, - }) catch return StorageError.OutOfMemory; - } - - return TableMetadata{ - .table_name = table_name, - .key_schema = key_schema.toOwnedSlice() catch return StorageError.OutOfMemory, - .attribute_definitions = attr_defs.toOwnedSlice() catch return StorageError.OutOfMemory, - .table_status = table_status, - .creation_date_time = creation_date_time, - }; - } -}; - -test "storage basic operations" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_storage"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try StorageEngine.init(allocator, path); - defer engine.deinit(); - - // Create table - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - _ = try engine.createTable("TestTable", &key_schema, &attr_defs); - - // List tables - const tables = try engine.listTables(); - defer { - for (tables) |t| allocator.free(t); - 
allocator.free(tables); - } - try std.testing.expectEqual(@as(usize, 1), tables.len); - try std.testing.expectEqualStrings("TestTable", tables[0]); - - // Delete table - try engine.deleteTable("TestTable"); - - // Verify deleted - const tables2 = try engine.listTables(); - defer allocator.free(tables2); - try std.testing.expectEqual(@as(usize, 0), tables2.len); -} - -test "putItem validates key presence" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_storage_validate"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "userId", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "userId", .attribute_type = .S }, - }; - - _ = try engine.createTable("Users", &key_schema, &attr_defs); - - // This should fail - missing userId - const bad_item = "{\"name\":{\"S\":\"Alice\"}}"; - const result = engine.putItem("Users", bad_item); - try std.testing.expectError(StorageError.MissingKeyAttribute, result); - - // This should succeed - const good_item = "{\"userId\":{\"S\":\"user123\"},\"name\":{\"S\":\"Alice\"}}"; - try engine.putItem("Users", good_item); -} - - -================================================================================ -FILE: ./src/dynamodb/types.zig -================================================================================ - -/// DynamoDB protocol types and serialization -const std = @import("std"); - -/// DynamoDB AttributeValue - the core data type -pub const AttributeValue = union(enum) { - S: []const u8, // String - N: []const u8, // Number (stored as string) - B: []const u8, // Binary (base64) - SS: []const []const u8, // String Set - NS: []const []const u8, // Number Set - BS: []const []const u8, // Binary Set - M: std.StringHashMap(AttributeValue), // Map - L: []const AttributeValue, // List - NULL: bool, // Null 
- BOOL: bool, // Boolean -}; - -pub const Item = std.StringHashMap(AttributeValue); - -pub const KeyType = enum { - HASH, - RANGE, - - pub fn toString(self: KeyType) []const u8 { - return switch (self) { - .HASH => "HASH", - .RANGE => "RANGE", - }; - } - - pub fn fromString(s: []const u8) ?KeyType { - if (std.mem.eql(u8, s, "HASH")) return .HASH; - if (std.mem.eql(u8, s, "RANGE")) return .RANGE; - return null; - } -}; - -pub const ScalarAttributeType = enum { - S, - N, - B, - - pub fn toString(self: ScalarAttributeType) []const u8 { - return switch (self) { - .S => "S", - .N => "N", - .B => "B", - }; - } - - pub fn fromString(s: []const u8) ?ScalarAttributeType { - if (std.mem.eql(u8, s, "S")) return .S; - if (std.mem.eql(u8, s, "N")) return .N; - if (std.mem.eql(u8, s, "B")) return .B; - return null; - } -}; - -pub const KeySchemaElement = struct { - attribute_name: []const u8, - key_type: KeyType, -}; - -pub const AttributeDefinition = struct { - attribute_name: []const u8, - attribute_type: ScalarAttributeType, -}; - -pub const TableStatus = enum { - CREATING, - UPDATING, - DELETING, - ACTIVE, - INACCESSIBLE_ENCRYPTION_CREDENTIALS, - ARCHIVING, - ARCHIVED, - - pub fn toString(self: TableStatus) []const u8 { - return switch (self) { - .CREATING => "CREATING", - .UPDATING => "UPDATING", - .DELETING => "DELETING", - .ACTIVE => "ACTIVE", - .INACCESSIBLE_ENCRYPTION_CREDENTIALS => "INACCESSIBLE_ENCRYPTION_CREDENTIALS", - .ARCHIVING => "ARCHIVING", - .ARCHIVED => "ARCHIVED", - }; - } -}; - -pub const TableDescription = struct { - table_name: []const u8, - key_schema: []const KeySchemaElement, - attribute_definitions: []const AttributeDefinition, - table_status: TableStatus, - creation_date_time: i64, - item_count: u64, - table_size_bytes: u64, -}; - -/// DynamoDB operation types parsed from X-Amz-Target header -pub const Operation = enum { - CreateTable, - DeleteTable, - DescribeTable, - ListTables, - UpdateTable, - PutItem, - GetItem, - DeleteItem, - UpdateItem, - 
Query, - Scan, - BatchGetItem, - BatchWriteItem, - TransactGetItems, - TransactWriteItems, - Unknown, - - pub fn fromTarget(target: []const u8) Operation { - // Format: DynamoDB_20120810.OperationName - const prefix = "DynamoDB_20120810."; - if (!std.mem.startsWith(u8, target, prefix)) return .Unknown; - - const op_name = target[prefix.len..]; - const map = std.StaticStringMap(Operation).initComptime(.{ - .{ "CreateTable", .CreateTable }, - .{ "DeleteTable", .DeleteTable }, - .{ "DescribeTable", .DescribeTable }, - .{ "ListTables", .ListTables }, - .{ "UpdateTable", .UpdateTable }, - .{ "PutItem", .PutItem }, - .{ "GetItem", .GetItem }, - .{ "DeleteItem", .DeleteItem }, - .{ "UpdateItem", .UpdateItem }, - .{ "Query", .Query }, - .{ "Scan", .Scan }, - .{ "BatchGetItem", .BatchGetItem }, - .{ "BatchWriteItem", .BatchWriteItem }, - .{ "TransactGetItems", .TransactGetItems }, - .{ "TransactWriteItems", .TransactWriteItems }, - }); - return map.get(op_name) orelse .Unknown; - } -}; - -/// DynamoDB error types -pub const DynamoDBErrorType = enum { - ValidationException, - ResourceNotFoundException, - ResourceInUseException, - ConditionalCheckFailedException, - ProvisionedThroughputExceededException, - ItemCollectionSizeLimitExceededException, - InternalServerError, - SerializationException, - - pub fn toErrorResponse(self: DynamoDBErrorType, message: []const u8, allocator: std.mem.Allocator) ![]u8 { - const type_str = switch (self) { - .ValidationException => "com.amazonaws.dynamodb.v20120810#ValidationException", - .ResourceNotFoundException => "com.amazonaws.dynamodb.v20120810#ResourceNotFoundException", - .ResourceInUseException => "com.amazonaws.dynamodb.v20120810#ResourceInUseException", - .ConditionalCheckFailedException => "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException", - .ProvisionedThroughputExceededException => "com.amazonaws.dynamodb.v20120810#ProvisionedThroughputExceededException", - .ItemCollectionSizeLimitExceededException => 
"com.amazonaws.dynamodb.v20120810#ItemCollectionSizeLimitExceededException", - .InternalServerError => "com.amazonaws.dynamodb.v20120810#InternalServerError", - .SerializationException => "com.amazonaws.dynamodb.v20120810#SerializationException", - }; - - return std.fmt.allocPrint(allocator, "{{\"__type\":\"{s}\",\"message\":\"{s}\"}}", .{ type_str, message }); - } -}; - -// JSON serialization helpers for DynamoDB format -pub const json = struct { - /// Serialize AttributeValue to DynamoDB JSON format - pub fn serializeAttributeValue(writer: anytype, value: AttributeValue) !void { - switch (value) { - .S => |s| try writer.print("{{\"S\":\"{s}\"}}", .{s}), - .N => |n| try writer.print("{{\"N\":\"{s}\"}}", .{n}), - .B => |b| try writer.print("{{\"B\":\"{s}\"}}", .{b}), - .BOOL => |b| try writer.print("{{\"BOOL\":{}}}", .{b}), - .NULL => try writer.writeAll("{\"NULL\":true}"), - .SS => |ss| { - try writer.writeAll("{\"SS\":["); - for (ss, 0..) |s, i| { - if (i > 0) try writer.writeByte(','); - try writer.print("\"{s}\"", .{s}); - } - try writer.writeAll("]}"); - }, - .NS => |ns| { - try writer.writeAll("{\"NS\":["); - for (ns, 0..) |n, i| { - if (i > 0) try writer.writeByte(','); - try writer.print("\"{s}\"", .{n}); - } - try writer.writeAll("]}"); - }, - .BS => |bs| { - try writer.writeAll("{\"BS\":["); - for (bs, 0..) |b, i| { - if (i > 0) try writer.writeByte(','); - try writer.print("\"{s}\"", .{b}); - } - try writer.writeAll("]}"); - }, - .L => |list| { - try writer.writeAll("{\"L\":["); - for (list, 0..) 
|item, i| { - if (i > 0) try writer.writeByte(','); - try serializeAttributeValue(writer, item); - } - try writer.writeAll("]}"); - }, - .M => |map| { - try writer.writeAll("{\"M\":{"); - var first = true; - var iter = map.iterator(); - while (iter.next()) |entry| { - if (!first) try writer.writeByte(','); - first = false; - try writer.print("\"{s}\":", .{entry.key_ptr.*}); - try serializeAttributeValue(writer, entry.value_ptr.*); - } - try writer.writeAll("}}"); - }, - } - } - - /// Serialize an Item (map of attribute name to AttributeValue) - pub fn serializeItem(writer: anytype, item: Item) !void { - try writer.writeByte('{'); - var first = true; - var iter = item.iterator(); - while (iter.next()) |entry| { - if (!first) try writer.writeByte(','); - first = false; - try writer.print("\"{s}\":", .{entry.key_ptr.*}); - try serializeAttributeValue(writer, entry.value_ptr.*); - } - try writer.writeByte('}'); - } -}; - -test "operation from target" { - try std.testing.expectEqual(Operation.CreateTable, Operation.fromTarget("DynamoDB_20120810.CreateTable")); - try std.testing.expectEqual(Operation.PutItem, Operation.fromTarget("DynamoDB_20120810.PutItem")); - try std.testing.expectEqual(Operation.Unknown, Operation.fromTarget("Invalid")); -} - - -================================================================================ -FILE: ./src/http.zig -================================================================================ - -/// Simple HTTP server for DynamoDB API -const std = @import("std"); -const net = std.net; -const mem = std.mem; - -pub const Method = enum { - GET, - POST, - PUT, - DELETE, - OPTIONS, - HEAD, - PATCH, - - pub fn fromString(s: []const u8) ?Method { - const map = std.StaticStringMap(Method).initComptime(.{ - .{ "GET", .GET }, - .{ "POST", .POST }, - .{ "PUT", .PUT }, - .{ "DELETE", .DELETE }, - .{ "OPTIONS", .OPTIONS }, - .{ "HEAD", .HEAD }, - .{ "PATCH", .PATCH }, - }); - return map.get(s); - } -}; - -pub const StatusCode = enum(u16) { - ok 
= 200, - created = 201, - no_content = 204, - bad_request = 400, - unauthorized = 401, - forbidden = 403, - not_found = 404, - method_not_allowed = 405, - conflict = 409, - internal_server_error = 500, - service_unavailable = 503, - - pub fn phrase(self: StatusCode) []const u8 { - return switch (self) { - .ok => "OK", - .created => "Created", - .no_content => "No Content", - .bad_request => "Bad Request", - .unauthorized => "Unauthorized", - .forbidden => "Forbidden", - .not_found => "Not Found", - .method_not_allowed => "Method Not Allowed", - .conflict => "Conflict", - .internal_server_error => "Internal Server Error", - .service_unavailable => "Service Unavailable", - }; - } -}; - -pub const Header = struct { - name: []const u8, - value: []const u8, -}; - -pub const Request = struct { - method: Method, - path: []const u8, - headers: []const Header, - body: []const u8, - raw_data: []const u8, - - pub fn getHeader(self: *const Request, name: []const u8) ?[]const u8 { - for (self.headers) |h| { - if (std.ascii.eqlIgnoreCase(h.name, name)) { - return h.value; - } - } - return null; - } -}; - -pub const Response = struct { - status: StatusCode, - headers: std.ArrayList(Header), - body: std.ArrayList(u8), - allocator: mem.Allocator, - - pub fn init(allocator: mem.Allocator) Response { - return .{ - .status = .ok, - .headers = std.ArrayList(Header){}, - .body = std.ArrayList(u8){}, - .allocator = allocator, - }; - } - - pub fn deinit(self: *Response) void { - self.headers.deinit(self.allocator); - self.body.deinit(self.allocator); - } - - pub fn setStatus(self: *Response, status: StatusCode) void { - self.status = status; - } - - pub fn addHeader(self: *Response, name: []const u8, value: []const u8) !void { - try self.headers.append(self.allocator, .{ .name = name, .value = value }); - } - - pub fn setBody(self: *Response, data: []const u8) !void { - self.body.clearRetainingCapacity(); - try self.body.appendSlice(self.allocator, data); - } - - pub fn appendBody(self: 
*Response, data: []const u8) !void { - try self.body.appendSlice(self.allocator, data); - } - - pub fn serialize(self: *Response, allocator: mem.Allocator) ![]u8 { - var buf = std.ArrayList(u8){}; - errdefer buf.deinit(allocator); - const writer = buf.writer(allocator); - - // Status line - try writer.print("HTTP/1.1 {d} {s}\r\n", .{ @intFromEnum(self.status), self.status.phrase() }); - - // Content-Length header - try writer.print("Content-Length: {d}\r\n", .{self.body.items.len}); - - // Custom headers - for (self.headers.items) |h| { - try writer.print("{s}: {s}\r\n", .{ h.name, h.value }); - } - - // End of headers - try writer.writeAll("\r\n"); - - // Body - try writer.writeAll(self.body.items); - - return buf.toOwnedSlice(allocator); - } -}; - -pub const RequestHandler = *const fn (*const Request, mem.Allocator) Response; - -pub const Server = struct { - allocator: mem.Allocator, - address: net.Address, - handler: RequestHandler, - running: std.atomic.Value(bool), - listener: ?net.Server, - - const Self = @This(); - - pub fn init(allocator: mem.Allocator, host: []const u8, port: u16, handler: RequestHandler) !Self { - const address = try net.Address.parseIp(host, port); - return Self{ - .allocator = allocator, - .address = address, - .handler = handler, - .running = std.atomic.Value(bool).init(false), - .listener = null, - }; - } - - pub fn start(self: *Self) !void { - self.listener = try self.address.listen(.{ - .reuse_address = true, - }); - self.running.store(true, .release); - - std.log.info("Server listening on {any}", .{self.address}); - - while (self.running.load(.acquire)) { - const conn = self.listener.?.accept() catch |err| { - if (err == error.SocketNotListening) break; - std.log.err("Accept error: {any}", .{err}); - continue; - }; - - // Spawn thread for each connection - const thread = std.Thread.spawn(.{}, handleConnection, .{ self, conn }) catch |err| { - std.log.err("Thread spawn error: {any}", .{err}); - conn.stream.close(); - continue; - }; 
- thread.detach(); - } - } - - fn handleConnection(self: *Self, conn: net.Server.Connection) void { - defer conn.stream.close(); - - var buf: [65536]u8 = undefined; - var total_read: usize = 0; - - // Read request - while (total_read < buf.len) { - const n = conn.stream.read(buf[total_read..]) catch |err| { - std.log.err("Read error: {any}", .{err}); - return; - }; - if (n == 0) break; - total_read += n; - - // Check if we have complete headers - if (mem.indexOf(u8, buf[0..total_read], "\r\n\r\n")) |header_end| { - // Parse Content-Length if present - const headers = buf[0..header_end]; - var content_length: usize = 0; - - var lines = mem.splitSequence(u8, headers, "\r\n"); - while (lines.next()) |line| { - if (std.ascii.startsWithIgnoreCase(line, "content-length:")) { - const val = mem.trim(u8, line["content-length:".len..], " "); - content_length = std.fmt.parseInt(usize, val, 10) catch 0; - break; - } - } - - const body_start = header_end + 4; - const body_received = total_read - body_start; - - if (body_received >= content_length) break; - } - } - - if (total_read == 0) return; - - // Parse and handle request - const request = parseRequest(self.allocator, buf[0..total_read]) catch |err| { - std.log.err("Parse error: {any}", .{err}); - const error_response = "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n"; - _ = conn.stream.write(error_response) catch {}; - return; - }; - defer self.allocator.free(request.headers); - - var response = self.handler(&request, self.allocator); - defer response.deinit(); - - const response_data = response.serialize(self.allocator) catch |err| { - std.log.err("Serialize error: {any}", .{err}); - return; - }; - defer self.allocator.free(response_data); - - _ = conn.stream.write(response_data) catch |err| { - std.log.err("Write error: {any}", .{err}); - }; - } - - pub fn stop(self: *Self) void { - self.running.store(false, .release); - if (self.listener) |*l| { - l.deinit(); - self.listener = null; - } - } -}; - -fn 
parseRequest(allocator: mem.Allocator, data: []const u8) !Request { - // Find end of headers - const header_end = mem.indexOf(u8, data, "\r\n\r\n") orelse return error.InvalidRequest; - - // Parse request line - var lines = mem.splitSequence(u8, data[0..header_end], "\r\n"); - const request_line = lines.next() orelse return error.InvalidRequest; - - var parts = mem.splitScalar(u8, request_line, ' '); - const method_str = parts.next() orelse return error.InvalidRequest; - const path = parts.next() orelse return error.InvalidRequest; - - const method = Method.fromString(method_str) orelse return error.InvalidMethod; - - // Parse headers - var headers = std.ArrayList(Header){}; - errdefer headers.deinit(allocator); - - while (lines.next()) |line| { - if (line.len == 0) break; - const colon = mem.indexOf(u8, line, ":") orelse continue; - const name = mem.trim(u8, line[0..colon], " "); - const value = mem.trim(u8, line[colon + 1 ..], " "); - try headers.append(allocator, .{ .name = name, .value = value }); - } - - // Body is after \r\n\r\n - const body_start = header_end + 4; - const body = if (body_start < data.len) data[body_start..] 
else ""; - - return Request{ - .method = method, - .path = path, - .headers = try headers.toOwnedSlice(allocator), - .body = body, - .raw_data = data, - }; -} - -// Tests -test "parse simple request" { - const allocator = std.testing.allocator; - const raw = "GET /health HTTP/1.1\r\nHost: localhost\r\n\r\n"; - - const req = try parseRequest(allocator, raw); - defer allocator.free(req.headers); - - try std.testing.expectEqual(Method.GET, req.method); - try std.testing.expectEqualStrings("/health", req.path); -} - -test "parse request with body" { - const allocator = std.testing.allocator; - const raw = "POST /items HTTP/1.1\r\nHost: localhost\r\nContent-Length: 13\r\n\r\n{\"key\":\"val\"}"; - - const req = try parseRequest(allocator, raw); - defer allocator.free(req.headers); - - try std.testing.expectEqual(Method.POST, req.method); - try std.testing.expectEqualStrings("{\"key\":\"val\"}", req.body); -} - - -================================================================================ -FILE: ./src/main.zig -================================================================================ - -/// ZynamoDB - A DynamoDB-compatible database using RocksDB -const std = @import("std"); -const http = @import("http.zig"); -const rocksdb = @import("rocksdb.zig"); -const storage = @import("dynamodb/storage.zig"); -const handler = @import("dynamodb/handler.zig"); - -const Config = struct { - host: []const u8 = "0.0.0.0", - port: u16 = 8000, - data_dir: [:0]const u8 = "./data", - verbose: bool = false, -}; - -pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - defer _ = gpa.deinit(); - const allocator = gpa.allocator(); - - // Parse command line args - const config = try parseArgs(allocator); - - // Print banner - printBanner(config); - - // Ensure data directory exists - std.fs.cwd().makePath(config.data_dir) catch |err| { - std.log.err("Failed to create data directory: {any}", .{err}); - return; - }; - - // Initialize storage engine - var engine = 
storage.StorageEngine.init(allocator, config.data_dir) catch |err| { - std.log.err("Failed to initialize storage: {any}", .{err}); - return; - }; - defer engine.deinit(); - - std.log.info("Storage engine initialized at {s}", .{config.data_dir}); - - // Initialize API handler - var api_handler = handler.ApiHandler.init(allocator, &engine); - handler.setGlobalHandler(&api_handler); - - // Start HTTP server - var server = try http.Server.init(allocator, config.host, config.port, handler.httpHandler); - defer server.stop(); - - std.log.info("Starting DynamoDB-compatible server on {s}:{d}", .{ config.host, config.port }); - std.log.info("Ready to accept connections!", .{}); - - try server.start(); -} - -fn parseArgs(allocator: std.mem.Allocator) !Config { - var config = Config{}; - var args = try std.process.argsWithAllocator(allocator); - defer args.deinit(); - - // Skip program name - _ = args.next(); - - while (args.next()) |arg| { - if (std.mem.eql(u8, arg, "--port") or std.mem.eql(u8, arg, "-p")) { - if (args.next()) |port_str| { - config.port = std.fmt.parseInt(u16, port_str, 10) catch 8000; - } - } else if (std.mem.eql(u8, arg, "--host") or std.mem.eql(u8, arg, "-h")) { - if (args.next()) |host| { - config.host = host; - } - } else if (std.mem.eql(u8, arg, "--data-dir") or std.mem.eql(u8, arg, "-d")) { - if (args.next()) |dir| { - // Need sentinel-terminated string for RocksDB - const owned = try allocator.dupeZ(u8, dir); - config.data_dir = owned; - } - } else if (std.mem.eql(u8, arg, "--verbose") or std.mem.eql(u8, arg, "-v")) { - config.verbose = true; - } else if (std.mem.eql(u8, arg, "--help")) { - printHelp(); - std.process.exit(0); - } - } - - // Check environment variables - if (std.posix.getenv("DYNAMODB_PORT")) |port_str| { - config.port = std.fmt.parseInt(u16, port_str, 10) catch config.port; - } - if (std.posix.getenv("ROCKSDB_DATA_DIR")) |dir| { - config.data_dir = std.mem.span(@as([*:0]const u8, @ptrCast(dir.ptr))); - } - - return config; -} - -fn 
printHelp() void { - const help = - \\ZynamoDB - DynamoDB-compatible database - \\ - \\Usage: zynamodb [OPTIONS] - \\ - \\Options: - \\ -p, --port Port to listen on (default: 8000) - \\ -h, --host Host to bind to (default: 0.0.0.0) - \\ -d, --data-dir Data directory (default: ./data) - \\ -v, --verbose Enable verbose logging - \\ --help Show this help message - \\ - \\Environment Variables: - \\ DYNAMODB_PORT Override port - \\ ROCKSDB_DATA_DIR Override data directory - \\ - \\Examples: - \\ zynamodb # Start with defaults - \\ zynamodb -p 8080 -d /var/lib/db # Custom port and data dir - \\ - ; - std.debug.print("{s}", .{help}); -} - -fn printBanner(config: Config) void { - const banner = - \\ - \\ ╔═══════════════════════════════════════════════╗ - \\ ║ ║ - \\ ║ ███████╗██╗ ██╗███╗ ██╗ █████╗ ║ - \\ ║ ╚══███╔╝╚██╗ ██╔╝████╗ ██║██╔══██╗ ║ - \\ ║ ███╔╝ ╚████╔╝ ██╔██╗ ██║███████║ ║ - \\ ║ ███╔╝ ╚██╔╝ ██║╚██╗██║██╔══██║ ║ - \\ ║ ███████╗ ██║ ██║ ╚████║██║ ██║ ║ - \\ ║ ╚══════╝ ╚═╝ ╚═╝ ╚═══╝╚═╝ ╚═╝ ║ - \\ ║ ║ - \\ ║ DynamoDB-Compatible Database ║ - \\ ║ Powered by RocksDB + Zig ║ - \\ ║ ║ - \\ ╚═══════════════════════════════════════════════╝ - \\ - ; - std.debug.print("{s}", .{banner}); - std.debug.print(" Port: {d} | Data Dir: {s}\n\n", .{ config.port, config.data_dir }); -} - -// Re-export modules for testing -pub const _rocksdb = rocksdb; -pub const _http = http; -pub const _storage = storage; -pub const _handler = handler; - -test { - std.testing.refAllDeclsRecursive(@This()); -} - - -================================================================================ -FILE: ./src/rocksdb.zig -================================================================================ - -/// RocksDB bindings for Zig via the C API -const std = @import("std"); - -pub const c = @cImport({ - @cInclude("rocksdb/c.h"); -}); - -pub const RocksDBError = error{ - OpenFailed, - WriteFailed, - ReadFailed, - DeleteFailed, - InvalidArgument, - Corruption, - NotFound, - IOError, - Unknown, -}; 
- -pub const DB = struct { - handle: *c.rocksdb_t, - options: *c.rocksdb_options_t, - write_options: *c.rocksdb_writeoptions_t, - read_options: *c.rocksdb_readoptions_t, - - const Self = @This(); - - pub fn open(path: [*:0]const u8, create_if_missing: bool) RocksDBError!Self { - const options = c.rocksdb_options_create() orelse return RocksDBError.Unknown; - c.rocksdb_options_set_create_if_missing(options, if (create_if_missing) 1 else 0); - - // Performance options - c.rocksdb_options_increase_parallelism(options, @as(c_int, @intCast(std.Thread.getCpuCount() catch 4))); - c.rocksdb_options_optimize_level_style_compaction(options, 512 * 1024 * 1024); // 512MB - c.rocksdb_options_set_compression(options, c.rocksdb_lz4_compression); - - var err: [*c]u8 = null; - const db = c.rocksdb_open(options, path, &err); - if (err != null) { - defer c.rocksdb_free(err); - return RocksDBError.OpenFailed; - } - - const write_options = c.rocksdb_writeoptions_create() orelse { - c.rocksdb_close(db); - c.rocksdb_options_destroy(options); - return RocksDBError.Unknown; - }; - - const read_options = c.rocksdb_readoptions_create() orelse { - c.rocksdb_writeoptions_destroy(write_options); - c.rocksdb_close(db); - c.rocksdb_options_destroy(options); - return RocksDBError.Unknown; - }; - - return Self{ - .handle = db.?, - .options = options, - .write_options = write_options, - .read_options = read_options, - }; - } - - pub fn close(self: *Self) void { - c.rocksdb_readoptions_destroy(self.read_options); - c.rocksdb_writeoptions_destroy(self.write_options); - c.rocksdb_close(self.handle); - c.rocksdb_options_destroy(self.options); - } - - pub fn put(self: *Self, key: []const u8, value: []const u8) RocksDBError!void { - var err: [*c]u8 = null; - c.rocksdb_put( - self.handle, - self.write_options, - key.ptr, - key.len, - value.ptr, - value.len, - &err, - ); - if (err != null) { - defer c.rocksdb_free(err); - return RocksDBError.WriteFailed; - } - } - - pub fn get(self: *Self, allocator: 
std.mem.Allocator, key: []const u8) RocksDBError!?[]u8 { - var err: [*c]u8 = null; - var value_len: usize = 0; - const value = c.rocksdb_get( - self.handle, - self.read_options, - key.ptr, - key.len, - &value_len, - &err, - ); - if (err != null) { - defer c.rocksdb_free(err); - return RocksDBError.ReadFailed; - } - if (value == null) { - return null; - } - defer c.rocksdb_free(value); - - const result = allocator.alloc(u8, value_len) catch return RocksDBError.Unknown; - @memcpy(result, value[0..value_len]); - return result; - } - - pub fn delete(self: *Self, key: []const u8) RocksDBError!void { - var err: [*c]u8 = null; - c.rocksdb_delete( - self.handle, - self.write_options, - key.ptr, - key.len, - &err, - ); - if (err != null) { - defer c.rocksdb_free(err); - return RocksDBError.DeleteFailed; - } - } - - pub fn flush(self: *Self) RocksDBError!void { - const flush_opts = c.rocksdb_flushoptions_create() orelse return RocksDBError.Unknown; - defer c.rocksdb_flushoptions_destroy(flush_opts); - - var err: [*c]u8 = null; - c.rocksdb_flush(self.handle, flush_opts, &err); - if (err != null) { - defer c.rocksdb_free(err); - return RocksDBError.IOError; - } - } -}; - -pub const WriteBatch = struct { - handle: *c.rocksdb_writebatch_t, - - const Self = @This(); - - pub fn init() ?Self { - const handle = c.rocksdb_writebatch_create() orelse return null; - return Self{ .handle = handle }; - } - - pub fn deinit(self: *Self) void { - c.rocksdb_writebatch_destroy(self.handle); - } - - pub fn put(self: *Self, key: []const u8, value: []const u8) void { - c.rocksdb_writebatch_put(self.handle, key.ptr, key.len, value.ptr, value.len); - } - - pub fn delete(self: *Self, key: []const u8) void { - c.rocksdb_writebatch_delete(self.handle, key.ptr, key.len); - } - - pub fn clear(self: *Self) void { - c.rocksdb_writebatch_clear(self.handle); - } - - pub fn write(self: *Self, db: *DB) RocksDBError!void { - var err: [*c]u8 = null; - c.rocksdb_write(db.handle, db.write_options, self.handle, 
&err); - if (err != null) { - defer c.rocksdb_free(err); - return RocksDBError.WriteFailed; - } - } -}; - -pub const Iterator = struct { - handle: *c.rocksdb_iterator_t, - - const Self = @This(); - - pub fn init(db: *DB) ?Self { - const handle = c.rocksdb_create_iterator(db.handle, db.read_options) orelse return null; - return Self{ .handle = handle }; - } - - pub fn deinit(self: *Self) void { - c.rocksdb_iter_destroy(self.handle); - } - - pub fn seekToFirst(self: *Self) void { - c.rocksdb_iter_seek_to_first(self.handle); - } - - pub fn seekToLast(self: *Self) void { - c.rocksdb_iter_seek_to_last(self.handle); - } - - pub fn seek(self: *Self, target: []const u8) void { - c.rocksdb_iter_seek(self.handle, target.ptr, target.len); - } - - pub fn seekForPrev(self: *Self, target: []const u8) void { - c.rocksdb_iter_seek_for_prev(self.handle, target.ptr, target.len); - } - - pub fn valid(self: *Self) bool { - return c.rocksdb_iter_valid(self.handle) != 0; - } - - pub fn next(self: *Self) void { - c.rocksdb_iter_next(self.handle); - } - - pub fn prev(self: *Self) void { - c.rocksdb_iter_prev(self.handle); - } - - pub fn key(self: *Self) ?[]const u8 { - var len: usize = 0; - const k = c.rocksdb_iter_key(self.handle, &len); - if (k == null) return null; - return k[0..len]; - } - - pub fn value(self: *Self) ?[]const u8 { - var len: usize = 0; - const v = c.rocksdb_iter_value(self.handle, &len); - if (v == null) return null; - return v[0..len]; - } -}; - -// Tests -test "rocksdb basic operations" { - const allocator = std.testing.allocator; - - // Use temp directory - const path = "/tmp/test_rocksdb_basic"; - defer { - std.fs.deleteTreeAbsolute(path) catch {}; - } - - var db = try DB.open(path, true); - defer db.close(); - - // Put and get - try db.put("hello", "world"); - const val = try db.get(allocator, "hello"); - try std.testing.expectEqualStrings("world", val.?); - allocator.free(val.?); - - // Delete - try db.delete("hello"); - const deleted = try db.get(allocator, 
"hello"); - try std.testing.expect(deleted == null); -} - -test "rocksdb write batch" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_rocksdb_batch"; - defer { - std.fs.deleteTreeAbsolute(path) catch {}; - } - - var db = try DB.open(path, true); - defer db.close(); - - var batch = WriteBatch.init() orelse unreachable; - defer batch.deinit(); - - batch.put("key1", "value1"); - batch.put("key2", "value2"); - batch.put("key3", "value3"); - - try batch.write(&db); - - const v1 = try db.get(allocator, "key1"); - defer if (v1) |v| allocator.free(v); - try std.testing.expectEqualStrings("value1", v1.?); -} - -test "rocksdb iterator" { - const path = "/tmp/test_rocksdb_iter"; - defer { - std.fs.deleteTreeAbsolute(path) catch {}; - } - - var db = try DB.open(path, true); - defer db.close(); - - try db.put("a", "1"); - try db.put("b", "2"); - try db.put("c", "3"); - - var iter = Iterator.init(&db) orelse unreachable; - defer iter.deinit(); - - iter.seekToFirst(); - - var count: usize = 0; - while (iter.valid()) : (iter.next()) { - count += 1; - } - try std.testing.expectEqual(@as(usize, 3), count); -} - - -================================================================================ -FILE: ./tests/integration.zig -================================================================================ - -/// Integration tests for ZynamoDB -const std = @import("std"); - -// Import modules from main source -const rocksdb = @import("../src/rocksdb.zig"); -const storage = @import("../src/dynamodb/storage.zig"); -const types = @import("../src/dynamodb/types.zig"); - -test "integration: full table lifecycle" { - const allocator = std.testing.allocator; - - // Setup - const path = "/tmp/test_integration_lifecycle"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - // Create table - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = 
.HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - const desc = try engine.createTable("Users", &key_schema, &attr_defs); - try std.testing.expectEqualStrings("Users", desc.table_name); - try std.testing.expectEqual(types.TableStatus.ACTIVE, desc.table_status); - - // Put items - try engine.putItem("Users", "{\"pk\":{\"S\":\"user1\"},\"name\":{\"S\":\"Alice\"}}"); - try engine.putItem("Users", "{\"pk\":{\"S\":\"user2\"},\"name\":{\"S\":\"Bob\"}}"); - try engine.putItem("Users", "{\"pk\":{\"S\":\"user3\"},\"name\":{\"S\":\"Charlie\"}}"); - - // Get item - const item = try engine.getItem("Users", "{\"pk\":{\"S\":\"user1\"}}"); - try std.testing.expect(item != null); - defer allocator.free(item.?); - - // Scan - const all_items = try engine.scan("Users", null); - defer { - for (all_items) |i| allocator.free(i); - allocator.free(all_items); - } - try std.testing.expectEqual(@as(usize, 3), all_items.len); - - // Delete item - try engine.deleteItem("Users", "{\"pk\":{\"S\":\"user2\"}}"); - - // Verify deletion - const after_delete = try engine.scan("Users", null); - defer { - for (after_delete) |i| allocator.free(i); - allocator.free(after_delete); - } - try std.testing.expectEqual(@as(usize, 2), after_delete.len); - - // Delete table - try engine.deleteTable("Users"); - - // Verify table deletion - const tables = try engine.listTables(); - defer allocator.free(tables); - try std.testing.expectEqual(@as(usize, 0), tables.len); -} - -test "integration: multiple tables" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_integration_multi_table"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", 
.attribute_type = .S }, - }; - - // Create multiple tables - _ = try engine.createTable("Table1", &key_schema, &attr_defs); - _ = try engine.createTable("Table2", &key_schema, &attr_defs); - _ = try engine.createTable("Table3", &key_schema, &attr_defs); - - // List tables - const tables = try engine.listTables(); - defer { - for (tables) |t| allocator.free(t); - allocator.free(tables); - } - try std.testing.expectEqual(@as(usize, 3), tables.len); - - // Put items in different tables - try engine.putItem("Table1", "{\"pk\":{\"S\":\"item1\"}}"); - try engine.putItem("Table2", "{\"pk\":{\"S\":\"item2\"}}"); - try engine.putItem("Table3", "{\"pk\":{\"S\":\"item3\"}}"); - - // Verify isolation - scan should only return items from that table - const table1_items = try engine.scan("Table1", null); - defer { - for (table1_items) |i| allocator.free(i); - allocator.free(table1_items); - } - try std.testing.expectEqual(@as(usize, 1), table1_items.len); -} - -test "integration: table already exists error" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_integration_exists"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - // Create table - _ = try engine.createTable("DuplicateTest", &key_schema, &attr_defs); - - // Try to create again - should fail - const result = engine.createTable("DuplicateTest", &key_schema, &attr_defs); - try std.testing.expectError(storage.StorageError.TableAlreadyExists, result); -} - -test "integration: table not found error" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_integration_notfound"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try 
storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - // Try to put item in non-existent table - const result = engine.putItem("NonExistent", "{\"pk\":{\"S\":\"item1\"}}"); - try std.testing.expectError(storage.StorageError.TableNotFound, result); -} - -test "integration: scan with limit" { - const allocator = std.testing.allocator; - - const path = "/tmp/test_integration_scan_limit"; - defer std.fs.deleteTreeAbsolute(path) catch {}; - - var engine = try storage.StorageEngine.init(allocator, path); - defer engine.deinit(); - - const key_schema = [_]types.KeySchemaElement{ - .{ .attribute_name = "pk", .key_type = .HASH }, - }; - const attr_defs = [_]types.AttributeDefinition{ - .{ .attribute_name = "pk", .attribute_type = .S }, - }; - - _ = try engine.createTable("LimitTest", &key_schema, &attr_defs); - - // Add many items - var i: usize = 0; - while (i < 10) : (i += 1) { - var buf: [128]u8 = undefined; - const item = try std.fmt.bufPrint(&buf, "{{\"pk\":{{\"S\":\"item{d}\"}}}}", .{i}); - try engine.putItem("LimitTest", item); - } - - // Scan with limit - const limited = try engine.scan("LimitTest", 5); - defer { - for (limited) |item| allocator.free(item); - allocator.free(limited); - } - try std.testing.expectEqual(@as(usize, 5), limited.len); -} - -