From cf352dde23e70ca7c61d0783852b1d33ebd4c702 Mon Sep 17 00:00:00 2001 From: biondizzle Date: Sun, 15 Feb 2026 13:56:08 -0500 Subject: [PATCH] consolidate --- Makefile | 2 +- {item_codec => dynamodb}/item_codec.odin | 250 +++++---- dynamodb/json.odin | 145 +++--- {key_codec => dynamodb}/key_codec.odin | 152 +++--- dynamodb/storage.odin | 23 +- dynamodb/types.odin | 55 +- main.odin | 636 ++++++++++++++++++++++- rocksdb_shim/rocksdb_shim.cc | 171 +++++- 8 files changed, 1096 insertions(+), 338 deletions(-) rename {item_codec => dynamodb}/item_codec.odin (84%) rename {key_codec => dynamodb}/key_codec.odin (89%) diff --git a/Makefile b/Makefile index cd1f316..27c3223 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ SHIM_HDRS := $(SHIM_DIR)/rocksdb_shim.h CXX := g++ AR := ar -CXXFLAGS := -O2 -fPIC -std=c++17 $(INCLUDE_PATH) +CXXFLAGS := -O2 -fPIC -std=c++20 $(INCLUDE_PATH) # RocksDB and compression libraries diff --git a/item_codec/item_codec.odin b/dynamodb/item_codec.odin similarity index 84% rename from item_codec/item_codec.odin rename to dynamodb/item_codec.odin index 5b61659..76ee991 100644 --- a/item_codec/item_codec.odin +++ b/dynamodb/item_codec.odin @@ -1,12 +1,10 @@ // Binary TLV (Type-Length-Value) encoding for DynamoDB items // Replaces JSON storage with efficient binary format // Format: [attribute_count][name_len][name][type_tag][value_len][value]... -package item_codec +package dynamodb import "core:bytes" -import "core:encoding/varint" import "core:slice" -import "../dynamodb" // Type tags for binary encoding (1 byte each) Type_Tag :: enum u8 { @@ -16,12 +14,12 @@ Type_Tag :: enum u8 { Binary = 0x03, // B (base64 string) Boolean = 0x04, // BOOL Null = 0x05, // NULL - + // Set types String_Set = 0x10, // SS Number_Set = 0x11, // NS Binary_Set = 0x12, // BS - + // Complex types List = 0x20, // L Map = 0x21, // M @@ -34,93 +32,93 @@ Type_Tag :: enum u8 { // Encode an Item to binary TLV format // Format: [attribute_count:varint][attributes...] // Each attribute: [name_len:varint][name:bytes][type_tag:u8][value_encoded:bytes] -encode :: proc(item: dynamodb.Item) -> ([]byte, bool) { +encode :: proc(item: Item) -> ([]byte, bool) { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 1024, context.allocator) defer bytes.buffer_destroy(&buf) - + // Write attribute count encode_varint(&buf, len(item)) - + // Collect and sort keys for deterministic encoding keys := make([dynamic]string, context.temp_allocator) for key in item { append(&keys, key) } - + slice.sort_by(keys[:], proc(a, b: string) -> bool { return a < b }) - + // Encode each attribute for key in keys { value := item[key] - + // Write attribute name encode_varint(&buf, len(key)) bytes.buffer_write_string(&buf, key) - + // Encode attribute value ok := encode_attribute_value(&buf, value) if !ok { return nil, false } } - + return bytes.buffer_to_bytes(&buf), true } // Encode an AttributeValue to binary format -encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: dynamodb.Attribute_Value) -> bool { +encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: Attribute_Value) -> bool { switch v in attr { - case dynamodb.String: + case String: bytes.buffer_write_byte(buf, u8(Type_Tag.String)) encode_varint(buf, len(v)) bytes.buffer_write_string(buf, string(v)) - - case dynamodb.Number: + + case Number: bytes.buffer_write_byte(buf, u8(Type_Tag.Number)) encode_varint(buf, len(v)) bytes.buffer_write_string(buf, string(v)) - - case dynamodb.Binary: + + case Binary: bytes.buffer_write_byte(buf, u8(Type_Tag.Binary)) encode_varint(buf, len(v)) bytes.buffer_write_string(buf, string(v)) - - case dynamodb.Bool: + + case Bool: bytes.buffer_write_byte(buf, u8(Type_Tag.Boolean)) bytes.buffer_write_byte(buf, 1 if bool(v) else 0) - - case dynamodb.Null: + + case Null: bytes.buffer_write_byte(buf, u8(Type_Tag.Null)) // NULL has no value bytes - - case dynamodb.String_Set: + + case String_Set: bytes.buffer_write_byte(buf, u8(Type_Tag.String_Set)) encode_varint(buf, len(v)) for s in v { encode_varint(buf, len(s)) bytes.buffer_write_string(buf, s) } - - case dynamodb.Number_Set: + + case Number_Set: bytes.buffer_write_byte(buf, u8(Type_Tag.Number_Set)) encode_varint(buf, len(v)) for n in v { encode_varint(buf, len(n)) bytes.buffer_write_string(buf, n) } - - case dynamodb.Binary_Set: + + case Binary_Set: bytes.buffer_write_byte(buf, u8(Type_Tag.Binary_Set)) encode_varint(buf, len(v)) for b in v { encode_varint(buf, len(b)) bytes.buffer_write_string(buf, b) } - - case dynamodb.List: + + case List: bytes.buffer_write_byte(buf, u8(Type_Tag.List)) encode_varint(buf, len(v)) for item in v { @@ -129,21 +127,21 @@ encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: dynamodb.Attribute_Valu return false } } - - case dynamodb.Map: + + case Map: bytes.buffer_write_byte(buf, u8(Type_Tag.Map)) encode_varint(buf, len(v)) - + // Collect and sort keys for deterministic encoding keys := make([dynamic]string, context.temp_allocator) for key in v { append(&keys, key) } - + slice.sort_by(keys[:], proc(a, b: string) -> bool { return a < b }) - + // Encode each map entry for key in keys { value := v[key] @@ -155,7 +153,7 @@ encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: dynamodb.Attribute_Valu } } } - + return true } @@ -169,15 +167,11 @@ Binary_Decoder :: struct { pos: int, } -decoder_init :: proc(data: []byte) -> Binary_Decoder { - return Binary_Decoder{data = data, pos = 0} -} - decoder_read_byte :: proc(decoder: ^Binary_Decoder) -> (u8, bool) { if decoder.pos >= len(decoder.data) { return 0, false } - + byte := decoder.data[decoder.pos] decoder.pos += 1 return byte, true @@ -187,7 +181,7 @@ decoder_read_bytes :: proc(decoder: ^Binary_Decoder, length: int) -> ([]byte, bo if decoder.pos + length > len(decoder.data) { return nil, false } - + bytes := decoder.data[decoder.pos:decoder.pos + length] decoder.pos += length return bytes, true @@ -196,143 +190,143 @@ decoder_read_bytes :: proc(decoder: ^Binary_Decoder, length: int) -> ([]byte, bo decoder_read_varint :: proc(decoder: ^Binary_Decoder) -> (int, bool) { result: int = 0 shift: uint = 0 - + for decoder.pos < len(decoder.data) { byte := decoder.data[decoder.pos] decoder.pos += 1 - + result |= int(byte & 0x7F) << shift - + if (byte & 0x80) == 0 { return result, true } - + shift += 7 if shift >= 64 { return 0, false // Varint overflow } } - + return 0, false // Unexpected end of data } // Decode binary TLV format back into an Item -decode :: proc(data: []byte) -> (dynamodb.Item, bool) { - decoder := decoder_init(data) - +decode :: proc(data: []byte) -> (Item, bool) { + decoder := Binary_Decoder{data = data, pos = 0} + attr_count, count_ok := decoder_read_varint(&decoder) if !count_ok { return {}, false } - - item := make(dynamodb.Item) - - for i in 0.. (dynamodb.Attribute_Value, bool) { +decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (Attribute_Value, bool) { type_byte, type_ok := decoder_read_byte(decoder) if !type_ok { return nil, false } - + type_tag := Type_Tag(type_byte) - + switch type_tag { case .String: length, len_ok := decoder_read_varint(decoder) if !len_ok { return nil, false } - + data, data_ok := decoder_read_bytes(decoder, length) if !data_ok { return nil, false } - + str := string(data) owned := transmute(string)slice.clone(transmute([]byte)str) - return dynamodb.String(owned), true - + return String(owned), true + case .Number: length, len_ok := decoder_read_varint(decoder) if !len_ok { return nil, false } - + data, data_ok := decoder_read_bytes(decoder, length) if !data_ok { return nil, false } - + str := string(data) owned := transmute(string)slice.clone(transmute([]byte)str) - return dynamodb.Number(owned), true - + return Number(owned), true + case .Binary: length, len_ok := decoder_read_varint(decoder) if !len_ok { return nil, false } - + data, data_ok := decoder_read_bytes(decoder, length) if !data_ok { return nil, false } - + str := string(data) owned := transmute(string)slice.clone(transmute([]byte)str) - return dynamodb.Binary(owned), true - + return Binary(owned), true + case .Boolean: byte, byte_ok := decoder_read_byte(decoder) if !byte_ok { return nil, false } - - return dynamodb.Bool(byte != 0), true - + + return Bool(byte != 0), true + case .Null: - return dynamodb.Null(true), true - + return Null(true), true + case .String_Set: count, count_ok := decoder_read_varint(decoder) if !count_ok { return nil, false } - + strings := make([]string, count) - + for i in 0.. (dynamodb.Attribute_ delete(strings) return nil, false } - + data, data_ok := decoder_read_bytes(decoder, length) if !data_ok { for j in 0.. (dynamodb.Attribute_ delete(strings) return nil, false } - + str := string(data) strings[i] = transmute(string)slice.clone(transmute([]byte)str) } - - return dynamodb.String_Set(strings), true - + + return String_Set(strings), true + case .Number_Set: count, count_ok := decoder_read_varint(decoder) if !count_ok { return nil, false } - + numbers := make([]string, count) - + for i in 0.. (dynamodb.Attribute_ delete(numbers) return nil, false } - + data, data_ok := decoder_read_bytes(decoder, length) if !data_ok { for j in 0.. (dynamodb.Attribute_ delete(numbers) return nil, false } - + str := string(data) numbers[i] = transmute(string)slice.clone(transmute([]byte)str) } - - return dynamodb.Number_Set(numbers), true - + + return Number_Set(numbers), true + case .Binary_Set: count, count_ok := decoder_read_varint(decoder) if !count_ok { return nil, false } - + binaries := make([]string, count) - + for i in 0.. (dynamodb.Attribute_ delete(binaries) return nil, false } - + data, data_ok := decoder_read_bytes(decoder, length) if !data_ok { for j in 0.. (dynamodb.Attribute_ delete(binaries) return nil, false } - + str := string(data) binaries[i] = transmute(string)slice.clone(transmute([]byte)str) } - - return dynamodb.Binary_Set(binaries), true - + + return Binary_Set(binaries), true + case .List: count, count_ok := decoder_read_varint(decoder) if !count_ok { return nil, false } - - list := make([]dynamodb.Attribute_Value, count) - + + list := make([]Attribute_Value, count) + for i in 0.. (dynamodb.Attribute_ for k, v in attr_map { delete(k) v_copy := v - dynamodb.attr_value_destroy(&v_copy) + attr_value_destroy(&v_copy) } delete(attr_map) return nil, false } - + key_bytes, key_ok := decoder_read_bytes(decoder, key_len) if !key_ok { for k, v in attr_map { delete(k) v_copy := v - dynamodb.attr_value_destroy(&v_copy) + attr_value_destroy(&v_copy) } delete(attr_map) return nil, false } - + key := string(key_bytes) owned_key := transmute(string)slice.clone(transmute([]byte)key) - + // Read value value, value_ok := decode_attribute_value(decoder) if !value_ok { @@ -493,23 +487,23 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_ for k, v in attr_map { delete(k) v_copy := v - dynamodb.attr_value_destroy(&v_copy) + attr_value_destroy(&v_copy) } delete(attr_map) return nil, false } - + attr_map[owned_key] = value } - - return dynamodb.Map(attr_map), true + + return Map(attr_map), true } - + return nil, false } // ============================================================================ -// Varint Encoding +// Varint Encoding (Encodes a varint length prefix) // ============================================================================ encode_varint :: proc(buf: ^bytes.Buffer, value: int) { @@ -517,7 +511,7 @@ encode_varint :: proc(buf: ^bytes.Buffer, value: int) { for { byte := u8(v & 0x7F) v >>= 7 - + if v == 0 { bytes.buffer_write_byte(buf, byte) return diff --git a/dynamodb/json.odin b/dynamodb/json.odin index cc74f35..ea69140 100644 --- a/dynamodb/json.odin +++ b/dynamodb/json.odin @@ -4,7 +4,6 @@ package dynamodb import "core:encoding/json" import "core:fmt" -import "core:mem" import "core:slice" import "core:strings" @@ -20,7 +19,7 @@ parse_item :: proc(json_bytes: []byte) -> (Item, bool) { return {}, false } defer json.destroy_value(data) - + return parse_item_from_value(data) } @@ -31,12 +30,12 @@ parse_item_from_value :: proc(value: json.Value) -> (Item, bool) { if !ok { return {}, false } - + item := make(Item) - + for key, val in obj { attr_name := strings.clone(key) - + attr_value, attr_ok := parse_attribute_value(val) if !attr_ok { // Cleanup on error @@ -49,10 +48,10 @@ parse_item_from_value :: proc(value: json.Value) -> (Item, bool) { delete(attr_name) return {}, false } - + item[attr_name] = attr_value } - + return item, true } @@ -63,12 +62,12 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { if !ok { return nil, false } - + // DynamoDB attribute must have exactly one key (the type indicator) if len(obj) != 1 { return nil, false } - + // Get the single key-value pair for type_name, type_value in obj { // String @@ -79,7 +78,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } return String(strings.clone(string(str))), true } - + // Number (stored as string) if type_name == "N" { str, str_ok := type_value.(json.String) @@ -88,7 +87,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } return Number(strings.clone(string(str))), true } - + // Binary (base64 string) if type_name == "B" { str, str_ok := type_value.(json.String) @@ -97,7 +96,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } return Binary(strings.clone(string(str))), true } - + // Boolean if type_name == "BOOL" { b, b_ok := type_value.(json.Boolean) @@ -106,7 +105,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } return Bool(b), true } - + // Null if type_name == "NULL" { b, b_ok := type_value.(json.Boolean) @@ -115,16 +114,16 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } return Null(b), true } - + // String Set if type_name == "SS" { arr, arr_ok := type_value.(json.Array) if !arr_ok { return nil, false } - + strings_arr := make([]string, len(arr)) - + for item, i in arr { str, str_ok := item.(json.String) if !str_ok { @@ -137,19 +136,19 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } strings_arr[i] = strings.clone(string(str)) } - + return String_Set(strings_arr), true } - + // Number Set if type_name == "NS" { arr, arr_ok := type_value.(json.Array) if !arr_ok { return nil, false } - + numbers_arr := make([]string, len(arr)) - + for item, i in arr { str, str_ok := item.(json.String) if !str_ok { @@ -162,19 +161,19 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } numbers_arr[i] = strings.clone(string(str)) } - + return Number_Set(numbers_arr), true } - + // Binary Set if type_name == "BS" { arr, arr_ok := type_value.(json.Array) if !arr_ok { return nil, false } - + binaries_arr := make([]string, len(arr)) - + for item, i in arr { str, str_ok := item.(json.String) if !str_ok { @@ -187,19 +186,19 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } binaries_arr[i] = strings.clone(string(str)) } - + return Binary_Set(binaries_arr), true } - + // List if type_name == "L" { arr, arr_ok := type_value.(json.Array) if !arr_ok { return nil, false } - + list := make([]Attribute_Value, len(arr)) - + for item, i in arr { val, val_ok := parse_attribute_value(item) if !val_ok { @@ -213,22 +212,22 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { } list[i] = val } - + return List(list), true } - + // Map if type_name == "M" { map_obj, map_ok := type_value.(json.Object) if !map_ok { return nil, false } - + attr_map := make(map[string]Attribute_Value) - + for map_key, map_val in map_obj { key := strings.clone(map_key) - + val, val_ok := parse_attribute_value(map_val) if !val_ok { // Cleanup on error @@ -241,14 +240,14 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { delete(attr_map) return nil, false } - + attr_map[key] = val } - + return Map(attr_map), true } } - + return nil, false } @@ -261,9 +260,9 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) { serialize_item :: proc(item: Item) -> string { builder := strings.builder_make() defer strings.builder_destroy(&builder) - + serialize_item_to_builder(&builder, item) - + return strings.clone(strings.to_string(builder)) } @@ -272,16 +271,16 @@ serialize_item_to_builder :: proc(b: ^strings.Builder, item: Item) { // Collect and sort keys for deterministic output keys := make([dynamic]string, context.temp_allocator) defer delete(keys) - + for key in item { append(&keys, key) } - + // Sort keys alphabetically slice.sort_by(keys[:], proc(a, b: string) -> bool { return a < b }) - + strings.write_string(b, "{") for key, i in keys { if i > 0 { @@ -299,19 +298,19 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { switch v in attr { case String: fmt.sbprintf(b, `{"S":"%s"}`, string(v)) - + case Number: fmt.sbprintf(b, `{"N":"%s"}`, string(v)) - + case Binary: fmt.sbprintf(b, `{"B":"%s"}`, string(v)) - + case Bool: fmt.sbprintf(b, `{"BOOL":%v}`, bool(v)) - + case Null: strings.write_string(b, `{"NULL":true}`) - + case String_Set: strings.write_string(b, `{"SS":[`) for s, i in v { @@ -321,7 +320,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { fmt.sbprintf(b, `"%s"`, s) } strings.write_string(b, "]}") - + case Number_Set: strings.write_string(b, `{"NS":[`) for n, i in v { @@ -331,7 +330,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { fmt.sbprintf(b, `"%s"`, n) } strings.write_string(b, "]}") - + case Binary_Set: strings.write_string(b, `{"BS":[`) for bin, i in v { @@ -341,7 +340,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { fmt.sbprintf(b, `"%s"`, bin) } strings.write_string(b, "]}") - + case List: strings.write_string(b, `{"L":[`) for item, i in v { @@ -351,20 +350,20 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { serialize_attribute_value(b, item) } strings.write_string(b, "]}") - + case Map: strings.write_string(b, `{"M":{`) - + // Collect and sort keys for deterministic output keys := make([dynamic]string, context.temp_allocator) for key in v { append(&keys, key) } - + slice.sort_by(keys[:], proc(a, b: string) -> bool { return a < b }) - + for key, i in keys { if i > 0 { strings.write_string(b, ",") @@ -373,7 +372,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) { value := v[key] serialize_attribute_value(b, value) } - + strings.write_string(b, "}}") } } @@ -389,22 +388,22 @@ parse_table_name :: proc(request_body: []byte) -> (string, bool) { return "", false } defer json.destroy_value(data) - + root, ok := data.(json.Object) if !ok { return "", false } - + table_name_val, found := root["TableName"] if !found { return "", false } - + table_name_str, str_ok := table_name_val.(json.String) if !str_ok { return "", false } - + return string(table_name_str), true } @@ -416,17 +415,17 @@ parse_item_from_request :: proc(request_body: []byte) -> (Item, bool) { return {}, false } defer json.destroy_value(data) - + root, ok := data.(json.Object) if !ok { return {}, false } - + item_val, found := root["Item"] if !found { return {}, false } - + return parse_item_from_value(item_val) } @@ -438,17 +437,17 @@ parse_key_from_request :: proc(request_body: []byte) -> (Item, bool) { return {}, false } defer json.destroy_value(data) - + root, ok := data.(json.Object) if !ok { return {}, false } - + key_val, found := root["Key"] if !found { return {}, false } - + return parse_item_from_value(key_val) } @@ -464,17 +463,17 @@ parse_limit :: proc(request_body: []byte) -> int { return 0 } defer json.destroy_value(data) - + root, ok := data.(json.Object) if !ok { return 0 } - + limit_val, found := root["Limit"] if !found { return 0 } - + // JSON numbers can be either Integer or Float #partial switch v in limit_val { case json.Integer: @@ -482,7 +481,7 @@ parse_limit :: proc(request_body: []byte) -> int { case json.Float: return int(v) } - + return 0 } @@ -494,24 +493,24 @@ parse_exclusive_start_key :: proc(request_body: []byte) -> Maybe([]byte) { return nil } defer json.destroy_value(data) - + root, ok := data.(json.Object) if !ok { return nil } - + key_val, found := root["ExclusiveStartKey"] if !found { return nil } - + // Parse as Item first key_item, item_ok := parse_item_from_value(key_val) if !item_ok { return nil } defer item_destroy(&key_item) - + // Convert to binary key bytes (this will be done by the storage layer) // For now, just return nil - the storage layer will handle the conversion return nil @@ -521,6 +520,6 @@ parse_exclusive_start_key :: proc(request_body: []byte) -> Maybe([]byte) { serialize_last_evaluated_key :: proc(key: Key) -> string { item := key_to_item(key, {}) // Empty key_schema since we don't need validation here defer item_destroy(&item) - + return serialize_item(item) } diff --git a/key_codec/key_codec.odin b/dynamodb/key_codec.odin similarity index 89% rename from key_codec/key_codec.odin rename to dynamodb/key_codec.odin index ae42ca0..ea6a94a 100644 --- a/key_codec/key_codec.odin +++ b/dynamodb/key_codec.odin @@ -1,8 +1,6 @@ -package key_codec +package dynamodb import "core:bytes" -import "core:encoding/varint" -import "core:mem" // Entity type prefix bytes for namespacing Entity_Type :: enum u8 { @@ -12,40 +10,64 @@ Entity_Type :: enum u8 { LSI = 0x04, // Local secondary index } -// Encode a varint length prefix -encode_varint :: proc(buf: ^bytes.Buffer, value: int) { - temp: [10]byte - n := varint.encode_u64(temp[:], u64(value)) - bytes.buffer_write(buf, temp[:n]) + +// Decode a varint length prefix from a byte slice. +// Reads starting at data[offset^] and advances offset^ past the varint on success. +decode_varint :: proc(data: []byte, offset: ^int) -> (value: int, ok: bool) { + i := offset^ + if i < 0 || i >= len(data) { + return 0, false + } + + u: u64 = 0 + shift: u32 = 0 + + for { + if i >= len(data) { + return 0, false // truncated + } + + b := data[i] + i += 1 + + u |= u64(b & 0x7F) << shift + + if (b & 0x80) == 0 { + break + } + + shift += 7 + if shift >= 64 { + return 0, false // malformed / overflow + } + } + + // ensure it fits in int on this platform + max_int := int((~uint(0)) >> 1) + if u > u64(max_int) { + return 0, false + } + + offset^ = i + return int(u), true } -// Decode a varint length prefix -decode_varint :: proc(data: []byte, offset: ^int) -> (value: int, ok: bool) { - if offset^ >= len(data) { - return 0, false - } - - val, n := varint.decode_u64(data[offset^:]) - if n <= 0 { - return 0, false - } - - offset^ += n - return int(val), true -} + + + // Build metadata key: [meta][table_name] build_meta_key :: proc(table_name: string) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 256, context.allocator) - + // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.Meta)) - + // Write table name with length prefix encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - + return bytes.buffer_to_bytes(&buf) } @@ -53,24 +75,24 @@ build_meta_key :: proc(table_name: string) -> []byte { build_data_key :: proc(table_name: string, pk_value: []byte, sk_value: Maybe([]byte)) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) - + // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.Data)) - + // Write table name encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - + // Write partition key encode_varint(&buf, len(pk_value)) bytes.buffer_write(&buf, pk_value) - + // Write sort key if present if sk, ok := sk_value.?; ok { encode_varint(&buf, len(sk)) bytes.buffer_write(&buf, sk) } - + return bytes.buffer_to_bytes(&buf) } @@ -78,14 +100,14 @@ build_data_key :: proc(table_name: string, pk_value: []byte, sk_value: Maybe([]b build_table_prefix :: proc(table_name: string) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 256, context.allocator) - + // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.Data)) - + // Write table name encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - + return bytes.buffer_to_bytes(&buf) } @@ -93,18 +115,18 @@ build_table_prefix :: proc(table_name: string) -> []byte { build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) - + // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.Data)) - + // Write table name encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - + // Write partition key encode_varint(&buf, len(pk_value)) bytes.buffer_write(&buf, pk_value) - + return bytes.buffer_to_bytes(&buf) } @@ -112,28 +134,28 @@ build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte { build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gsi_sk: Maybe([]byte)) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) - + // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) - + // Write table name encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - + // Write index name encode_varint(&buf, len(index_name)) bytes.buffer_write_string(&buf, index_name) - + // Write GSI partition key encode_varint(&buf, len(gsi_pk)) bytes.buffer_write(&buf, gsi_pk) - + // Write GSI sort key if present if sk, ok := gsi_sk.?; ok { encode_varint(&buf, len(sk)) bytes.buffer_write(&buf, sk) } - + return bytes.buffer_to_bytes(&buf) } @@ -141,26 +163,26 @@ build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gs build_lsi_key :: proc(table_name: string, index_name: string, pk: []byte, lsi_sk: []byte) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) - + // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.LSI)) - + // Write table name encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - + // Write index name encode_varint(&buf, len(index_name)) bytes.buffer_write_string(&buf, index_name) - + // Write partition key encode_varint(&buf, len(pk)) bytes.buffer_write(&buf, pk) - + // Write LSI sort key encode_varint(&buf, len(lsi_sk)) bytes.buffer_write(&buf, lsi_sk) - + return bytes.buffer_to_bytes(&buf) } @@ -170,15 +192,11 @@ Key_Decoder :: struct { pos: int, } -decoder_init :: proc(data: []byte) -> Key_Decoder { - return Key_Decoder{data = data, pos = 0} -} - decoder_read_entity_type :: proc(decoder: ^Key_Decoder) -> (Entity_Type, bool) { if decoder.pos >= len(decoder.data) { return .Meta, false } - + entity_type := Entity_Type(decoder.data[decoder.pos]) decoder.pos += 1 return entity_type, true @@ -187,32 +205,32 @@ decoder_read_entity_type :: proc(decoder: ^Key_Decoder) -> (Entity_Type, bool) { decoder_read_segment :: proc(decoder: ^Key_Decoder) -> (segment: []byte, ok: bool) { // Read length length := decode_varint(decoder.data, &decoder.pos) or_return - + // Read data if decoder.pos + length > len(decoder.data) { return nil, false } - + // Return slice (owned by caller via context.allocator) segment = make([]byte, length, context.allocator) copy(segment, decoder.data[decoder.pos:decoder.pos + length]) decoder.pos += length - + return segment, true } decoder_read_segment_borrowed :: proc(decoder: ^Key_Decoder) -> (segment: []byte, ok: bool) { // Read length length := decode_varint(decoder.data, &decoder.pos) or_return - + // Return borrowed slice if decoder.pos + length > len(decoder.data) { return nil, false } - + segment = decoder.data[decoder.pos:decoder.pos + length] decoder.pos += length - + return segment, true } @@ -228,26 +246,26 @@ Decoded_Data_Key :: struct { } decode_data_key :: proc(key: []byte) -> (result: Decoded_Data_Key, ok: bool) { - decoder := decoder_init(key) - + decoder := Key_Decoder{data = key, pos = 0} + // Read and verify entity type entity_type := decoder_read_entity_type(&decoder) or_return if entity_type != .Data { return {}, false } - + // Read table name table_name_bytes := decoder_read_segment(&decoder) or_return result.table_name = string(table_name_bytes) - + // Read partition key result.pk_value = decoder_read_segment(&decoder) or_return - + // Read sort key if present if decoder_has_more(&decoder) { sk := decoder_read_segment(&decoder) or_return result.sk_value = sk } - + return result, true -} +} \ No newline at end of file diff --git a/dynamodb/storage.odin b/dynamodb/storage.odin index b1bf729..d0f4733 100644 --- a/dynamodb/storage.odin +++ b/dynamodb/storage.odin @@ -1,15 +1,12 @@ // Storage engine mapping DynamoDB operations to RocksDB package dynamodb -import "core:encoding/json" import "core:fmt" import "core:mem" import "core:slice" import "core:strings" import "core:sync" import "core:time" -import "../key_codec" -import "../item_codec" import "../rocksdb" Storage_Error :: enum { @@ -218,12 +215,12 @@ serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) { meta_item["CreationDateTime"] = Number(fmt.aprint(metadata.creation_date_time)) // Encode to binary - return item_codec.encode(meta_item) + return encode(meta_item) } // Deserialize table metadata from binary format deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (Table_Metadata, bool) { - meta_item, ok := item_codec.decode(data) + meta_item, ok := decode(data) if !ok { return {}, false } @@ -239,7 +236,7 @@ deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (T // Get table metadata get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Table_Metadata, Storage_Error) { - meta_key := key_codec.build_meta_key(table_name) + meta_key := build_meta_key(table_name) defer delete(meta_key) value, get_err := rocksdb.db_get(&engine.db, meta_key) @@ -275,7 +272,7 @@ create_table :: proc( defer sync.rw_mutex_unlock(table_lock) // Check if table already exists - meta_key := key_codec.build_meta_key(table_name) + meta_key := build_meta_key(table_name) defer delete(meta_key) existing, get_err := rocksdb.db_get(&engine.db, meta_key) @@ -340,7 +337,7 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err defer sync.rw_mutex_unlock(table_lock) // Check table exists - meta_key := key_codec.build_meta_key(table_name) + meta_key := build_meta_key(table_name) defer delete(meta_key) existing, get_err := rocksdb.db_get(&engine.db, meta_key) @@ -403,11 +400,11 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto } // Build storage key - storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk) + storage_key := build_data_key(table_name, key_values.pk, key_values.sk) defer delete(storage_key) // Encode item - encoded_item, encode_ok := item_codec.encode(item) + encoded_item, encode_ok := encode(item) if !encode_ok { return .Serialization_Error } @@ -456,7 +453,7 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May } // Build storage key - storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk) + storage_key := build_data_key(table_name, key_values.pk, key_values.sk) defer delete(storage_key) // Get from RocksDB @@ -470,7 +467,7 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May defer delete(value) // Decode item - item, decode_ok := item_codec.decode(value) + item, decode_ok := decode(value) if !decode_ok { return nil, .Serialization_Error } @@ -512,7 +509,7 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S } // Build storage key - storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk) + storage_key := build_data_key(table_name, key_values.pk, key_values.sk) defer delete(storage_key) // Delete from RocksDB diff --git a/dynamodb/types.odin b/dynamodb/types.odin index 531a868..cf85ca8 100644 --- a/dynamodb/types.odin +++ b/dynamodb/types.odin @@ -51,13 +51,13 @@ key_destroy :: proc(key: ^Key) { key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, bool) { pk_value: Attribute_Value sk_value: Maybe(Attribute_Value) - + for schema_elem in key_schema { attr, ok := item[schema_elem.attribute_name] if !ok { return {}, false } - + // Validate that key is a scalar type (S, N, or B) #partial switch _ in attr { case String, Number, Binary: @@ -65,10 +65,10 @@ key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, boo case: return {}, false } - + // Deep copy the attribute value copied := attr_value_deep_copy(attr) - + switch schema_elem.key_type { case .HASH: pk_value = copied @@ -76,17 +76,17 @@ key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, boo sk_value = copied } } - + return Key{pk = pk_value, sk = sk_value}, true } // Convert key to item key_to_item :: proc(key: Key, key_schema: []Key_Schema_Element) -> Item { item := make(Item) - + for schema_elem in key_schema { attr_value: Attribute_Value - + switch schema_elem.key_type { case .HASH: attr_value = key.pk @@ -97,10 +97,10 @@ key_to_item :: proc(key: Key, key_schema: []Key_Schema_Element) -> Item { continue } } - + item[schema_elem.attribute_name] = attr_value_deep_copy(attr_value) } - + return item } @@ -112,8 +112,8 @@ Key_Values :: struct { key_get_values :: proc(key: ^Key) -> (Key_Values, bool) { pk_bytes: []byte - - switch v in key.pk { + + #partial switch v in key.pk { case String: pk_bytes = transmute([]byte)string(v) case Number: @@ -121,12 +121,13 @@ key_get_values :: proc(key: ^Key) -> (Key_Values, bool) { case Binary: pk_bytes = transmute([]byte)string(v) case: + // Keys should only be scalar types (S, N, or B) return {}, false } - + sk_bytes: Maybe([]byte) if sk, ok := key.sk.?; ok { - switch v in sk { + #partial switch v in sk { case String: sk_bytes = transmute([]byte)string(v) case Number: @@ -134,10 +135,11 @@ key_get_values :: proc(key: ^Key) -> (Key_Values, bool) { case Binary: sk_bytes = transmute([]byte)string(v) case: + // Keys should only be scalar types return {}, false } } - + return Key_Values{pk = pk_bytes, sk = sk_bytes}, true } @@ -289,9 +291,9 @@ operation_from_target :: proc(target: string) -> Operation { if !strings.has_prefix(target, prefix) { return .Unknown } - + op_name := target[len(prefix):] - + switch op_name { case "CreateTable": return .CreateTable case "DeleteTable": return .DeleteTable @@ -309,7 +311,7 @@ operation_from_target :: proc(target: string) -> Operation { case "TransactGetItems": return .TransactGetItems case "TransactWriteItems": return .TransactWriteItems } - + return .Unknown } @@ -327,7 +329,7 @@ DynamoDB_Error_Type :: enum { error_to_response :: proc(err_type: DynamoDB_Error_Type, message: string) -> string { type_str: string - + switch err_type { case .ValidationException: type_str = "com.amazonaws.dynamodb.v20120810#ValidationException" @@ -346,7 +348,7 @@ error_to_response :: proc(err_type: DynamoDB_Error_Type, message: string) -> str case .SerializationException: type_str = "com.amazonaws.dynamodb.v20120810#SerializationException" } - + return fmt.aprintf(`{{"__type":"%s","message":"%s"}}`, type_str, message) } @@ -410,30 +412,35 @@ attr_value_destroy :: proc(attr: ^Attribute_Value) { for s in v { delete(s) } - delete([]string(v)) + slice := v + delete(slice) case Number_Set: for n in v { delete(n) } - delete([]string(v)) + slice := v + delete(slice) case Binary_Set: for b in v { delete(b) } - delete([]string(v)) + slice := v + delete(slice) case List: for item in v { item_copy := item attr_value_destroy(&item_copy) } - delete([]Attribute_Value(v)) + list := v + delete(list) case Map: for key, val in v { delete(key) val_copy := val attr_value_destroy(&val_copy) } - delete(map[string]Attribute_Value(v)) + m := v + delete(m) case Bool, Null: // Nothing to free } diff --git a/main.odin b/main.odin index 6bf2844..f8e5523 100644 --- a/main.odin +++ b/main.odin @@ -1,11 +1,12 @@ package main +import "core:encoding/json" import "core:fmt" import "core:mem" import "core:os" import "core:strconv" -//import "core:strings" // I know we'll use in future but because we're not right now, compiler is complaining -import "rocksdb" +import "core:strings" +import "dynamodb" Config :: struct { host: string, @@ -25,12 +26,12 @@ main :: proc() { os.make_directory(config.data_dir) // Initialize storage engine - db, err := rocksdb.db_open(config.data_dir, true) + engine, err := dynamodb.storage_engine_init(context.allocator, config.data_dir) if err != .None { fmt.eprintln("Failed to initialize storage:", err) os.exit(1) } - defer rocksdb.db_close(&db) + defer dynamodb.storage_engine_destroy(engine) fmt.printfln("Storage engine initialized at %s", config.data_dir) fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port) @@ -38,13 +39,12 @@ main :: proc() { // Create HTTP server server_config := default_server_config() - // For now, use a simple echo handler until we implement the full DynamoDB handler server, server_ok := server_init( context.allocator, config.host, config.port, - handle_http_request, - &db, + handle_dynamodb_request, + engine, server_config, ) @@ -63,30 +63,628 @@ main :: proc() { } } -// Temporary HTTP request handler -// TODO: Replace with full DynamoDB handler once dynamodb/handler.odin is implemented -handle_http_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response { - //db := cast(^rocksdb.DB)ctx // I know we'll use in future but because we're not right now, compiler is complaining +// DynamoDB request handler - called for each HTTP request with request-scoped arena allocator +handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response { + engine := cast(^dynamodb.Storage_Engine)ctx + // All allocations in this function use the request arena automatically response := response_init(request_alloc) response_add_header(&response, "Content-Type", "application/x-amz-json-1.0") response_add_header(&response, "x-amzn-RequestId", "local-request-id") - // Get X-Amz-Target header + // Get X-Amz-Target header to determine operation target := request_get_header(request, "X-Amz-Target") + if target == nil { + return make_error_response(&response, .ValidationException, "Missing X-Amz-Target header") + } - if t, ok := target.?; ok { - // Echo back the operation for now - body := fmt.aprintf("{{\"operation\":\"%s\",\"status\":\"not_implemented\"}}", t) - response_set_body(&response, transmute([]byte)body) - } else { - response_set_status(&response, .Bad_Request) - response_set_body(&response, transmute([]byte)string("{\"error\":\"Missing X-Amz-Target header\"}")) + operation := dynamodb.operation_from_target(target.?) + + // Route to appropriate handler + #partial switch operation { + case .CreateTable: + handle_create_table(engine, request, &response) + case .DeleteTable: + handle_delete_table(engine, request, &response) + case .DescribeTable: + handle_describe_table(engine, request, &response) + case .ListTables: + handle_list_tables(engine, request, &response) + case .PutItem: + handle_put_item(engine, request, &response) + case .GetItem: + handle_get_item(engine, request, &response) + case .DeleteItem: + handle_delete_item(engine, request, &response) + case .Query: + handle_query(engine, request, &response) + case .Scan: + handle_scan(engine, request, &response) + case .Unknown: + return make_error_response(&response, .ValidationException, "Unknown operation") + case: + return make_error_response(&response, .ValidationException, "Operation not implemented") } return response } +// ============================================================================ +// Table Operations +// ============================================================================ + +handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + // Parse JSON body + data, parse_err := json.parse(request.body, allocator = context.allocator) + if parse_err != nil { + make_error_response(response, .ValidationException, "Invalid JSON") + return + } + defer json.destroy_value(data) + + root, ok := data.(json.Object) + if !ok { + make_error_response(response, .ValidationException, "Request must be an object") + return + } + + // Extract TableName + table_name_val, found := root["TableName"] + if !found { + make_error_response(response, .ValidationException, "Missing TableName") + return + } + + table_name, name_ok := table_name_val.(json.String) + if !name_ok { + make_error_response(response, .ValidationException, "TableName must be a string") + return + } + + // Parse KeySchema + key_schema, ks_err := parse_key_schema(root) + if ks_err != .None { + msg := key_schema_error_message(ks_err) + make_error_response(response, .ValidationException, msg) + return + } + + // Parse AttributeDefinitions + attr_defs, ad_err := parse_attribute_definitions(root) + if ad_err != .None { + msg := attribute_definitions_error_message(ad_err) + make_error_response(response, .ValidationException, msg) + return + } + + // Validate that key attributes are defined + if !validate_key_attributes_defined(key_schema, attr_defs) { + make_error_response(response, .ValidationException, "Key attribute not defined in AttributeDefinitions") + return + } + + // Create the table + desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs) + if create_err != .None { + #partial switch create_err { + case .Table_Already_Exists: + make_error_response(response, .ResourceInUseException, "Table already exists") + case: + make_error_response(response, .InternalServerError, "Failed to create table") + } + return + } + + // Build response + resp_body := fmt.aprintf( + `{{"TableDescription":{{"TableName":"%s","TableStatus":"%s","CreationDateTime":%d}}}}`, + desc.table_name, + dynamodb.table_status_to_string(desc.table_status), + desc.creation_date_time, + ) + + response_set_body(response, transmute([]byte)resp_body) +} + +handle_delete_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + table_name, ok := dynamodb.parse_table_name(request.body) + if !ok { + make_error_response(response, .ValidationException, "Invalid request or missing TableName") + return + } + + err := dynamodb.delete_table(engine, table_name) + if err != .None { + #partial switch err { + case .Table_Not_Found: + make_error_response(response, .ResourceNotFoundException, "Table not found") + case: + make_error_response(response, .InternalServerError, "Failed to delete table") + } + return + } + + resp_body := fmt.aprintf(`{{"TableDescription":{{"TableName":"%s","TableStatus":"DELETING"}}}}`, table_name) + response_set_body(response, transmute([]byte)resp_body) +} + +handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + table_name, ok := dynamodb.parse_table_name(request.body) + if !ok { + make_error_response(response, .ValidationException, "Invalid request or missing TableName") + return + } + + metadata, err := dynamodb.get_table_metadata(engine, table_name) + if err != .None { + #partial switch err { + case .Table_Not_Found: + make_error_response(response, .ResourceNotFoundException, "Table not found") + case: + make_error_response(response, .InternalServerError, "Failed to describe table") + } + return + } + defer dynamodb.table_metadata_destroy(&metadata, context.allocator) + + // Build response with key schema + builder := strings.builder_make() + strings.write_string(&builder, `{"Table":{"TableName":"`) + strings.write_string(&builder, metadata.table_name) + strings.write_string(&builder, `","TableStatus":"`) + strings.write_string(&builder, dynamodb.table_status_to_string(metadata.table_status)) + strings.write_string(&builder, `","CreationDateTime":`) + fmt.sbprintf(&builder, "%d", metadata.creation_date_time) + strings.write_string(&builder, `,"KeySchema":[`) + + for ks, i in metadata.key_schema { + if i > 0 do strings.write_string(&builder, ",") + fmt.sbprintf(&builder, `{"AttributeName":"%s","KeyType":"%s"}`, + ks.attribute_name, dynamodb.key_type_to_string(ks.key_type)) + } + + strings.write_string(&builder, `],"AttributeDefinitions":[`) + + for ad, i in metadata.attribute_definitions { + if i > 0 do strings.write_string(&builder, ",") + fmt.sbprintf(&builder, `{"AttributeName":"%s","AttributeType":"%s"}`, + ad.attribute_name, dynamodb.scalar_type_to_string(ad.attribute_type)) + } + + strings.write_string(&builder, `]}}`) + + resp_body := strings.to_string(builder) + response_set_body(response, transmute([]byte)resp_body) +} + +handle_list_tables :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + _ = request // Not using request body for ListTables + + tables := dynamodb.list_tables(engine) + // list_tables returns []string which may be empty, not an error + + // Build response + builder := strings.builder_make() + strings.write_string(&builder, `{"TableNames":[`) + + for table, i in tables { + if i > 0 do strings.write_string(&builder, ",") + fmt.sbprintf(&builder, `"%s"`, table) + } + + strings.write_string(&builder, `]}`) + + resp_body := strings.to_string(builder) + response_set_body(response, transmute([]byte)resp_body) +} + +// ============================================================================ +// Item Operations +// ============================================================================ + +handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + table_name, ok := dynamodb.parse_table_name(request.body) + if !ok { + make_error_response(response, .ValidationException, "Invalid request or missing TableName") + return + } + + item, item_ok := dynamodb.parse_item_from_request(request.body) + if !item_ok { + make_error_response(response, .ValidationException, "Invalid or missing Item") + return + } + defer dynamodb.item_destroy(&item) + + err := dynamodb.put_item(engine, table_name, item) + if err != .None { + #partial switch err { + case .Table_Not_Found: + make_error_response(response, .ResourceNotFoundException, "Table not found") + case .Missing_Key_Attribute: + make_error_response(response, .ValidationException, "Item missing required key attribute") + case .Invalid_Key: + make_error_response(response, .ValidationException, "Invalid key format") + case: + make_error_response(response, .InternalServerError, "Failed to put item") + } + return + } + + response_set_body(response, transmute([]byte)string("{}")) +} + +handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + table_name, ok := dynamodb.parse_table_name(request.body) + if !ok { + make_error_response(response, .ValidationException, "Invalid request or missing TableName") + return + } + + key, key_ok := dynamodb.parse_key_from_request(request.body) + if !key_ok { + make_error_response(response, .ValidationException, "Invalid or missing Key") + return + } + defer dynamodb.item_destroy(&key) + + item, err := dynamodb.get_item(engine, table_name, key) + if err != .None { + #partial switch err { + case .Table_Not_Found: + make_error_response(response, .ResourceNotFoundException, "Table not found") + case .Missing_Key_Attribute: + make_error_response(response, .ValidationException, "Key missing required attributes") + case .Invalid_Key: + make_error_response(response, .ValidationException, "Invalid key format") + case: + make_error_response(response, .InternalServerError, "Failed to get item") + } + return + } + + if item_val, has_item := item.?; has_item { + defer dynamodb.item_destroy(&item_val) + item_json := dynamodb.serialize_item(item_val) + resp := fmt.aprintf(`{"Item":%s}`, item_json) + response_set_body(response, transmute([]byte)resp) + } else { + response_set_body(response, transmute([]byte)string("{}")) + } +} + +handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + table_name, ok := dynamodb.parse_table_name(request.body) + if !ok { + make_error_response(response, .ValidationException, "Invalid request or missing TableName") + return + } + + key, key_ok := dynamodb.parse_key_from_request(request.body) + if !key_ok { + make_error_response(response, .ValidationException, "Invalid or missing Key") + return + } + defer dynamodb.item_destroy(&key) + + err := dynamodb.delete_item(engine, table_name, key) + if err != .None { + #partial switch err { + case .Table_Not_Found: + make_error_response(response, .ResourceNotFoundException, "Table not found") + case .Missing_Key_Attribute: + make_error_response(response, .ValidationException, "Key missing required attributes") + case .Invalid_Key: + make_error_response(response, .ValidationException, "Invalid key format") + case: + make_error_response(response, .InternalServerError, "Failed to delete item") + } + return + } + + response_set_body(response, transmute([]byte)string("{}")) +} + +// ============================================================================ +// Query and Scan Operations +// ============================================================================ + +handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + _ = engine + _ = request + // For now, return not implemented + // TODO: Implement KeyConditionExpression parsing and query logic + make_error_response(response, .ValidationException, "Query operation not yet implemented") +} + +handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { + _ = engine + _ = request + + // TODO: Implement scan operation in storage.odin + make_error_response(response, .ValidationException, "Scan operation not yet implemented") +} + +// ============================================================================ +// Schema Parsing Helpers +// ============================================================================ + +Key_Schema_Error :: enum { + None, + Missing_Key_Schema, + Invalid_Key_Schema, + No_Hash_Key, + Multiple_Hash_Keys, + Multiple_Range_Keys, + Invalid_Key_Type, +} + +parse_key_schema :: proc(root: json.Object) -> ([]dynamodb.Key_Schema_Element, Key_Schema_Error) { + key_schema_val, found := root["KeySchema"] + if !found do return nil, .Missing_Key_Schema + + key_schema_array, ok := key_schema_val.(json.Array) + if !ok do return nil, .Invalid_Key_Schema + + if len(key_schema_array) == 0 || len(key_schema_array) > 2 { + return nil, .Invalid_Key_Schema + } + + key_schema := make([]dynamodb.Key_Schema_Element, len(key_schema_array)) + + hash_count := 0 + range_count := 0 + + for elem, i in key_schema_array { + elem_obj, elem_ok := elem.(json.Object) + if !elem_ok { + // Cleanup + for j in 0.. 1 { + for ks in key_schema { + delete(ks.attribute_name) + } + delete(key_schema) + return nil, .Multiple_Hash_Keys + } + + if range_count > 1 { + for ks in key_schema { + delete(ks.attribute_name) + } + delete(key_schema) + return nil, .Multiple_Range_Keys + } + + return key_schema, .None +} + +key_schema_error_message :: proc(err: Key_Schema_Error) -> string { + switch err { + case .None: return "" + case .Missing_Key_Schema: return "Missing KeySchema" + case .Invalid_Key_Schema: return "Invalid KeySchema format" + case .No_Hash_Key: return "KeySchema must contain exactly one HASH key" + case .Multiple_Hash_Keys: return "KeySchema can only contain one HASH key" + case .Multiple_Range_Keys: return "KeySchema can only contain one RANGE key" + case .Invalid_Key_Type: return "Invalid KeyType (must be HASH or RANGE)" + } + return "Invalid KeySchema" +} + +Attribute_Definitions_Error :: enum { + None, + Missing_Attribute_Definitions, + Invalid_Attribute_Definitions, + Invalid_Attribute_Type, + Duplicate_Attribute_Name, +} + +parse_attribute_definitions :: proc(root: json.Object) -> ([]dynamodb.Attribute_Definition, Attribute_Definitions_Error) { + attr_defs_val, found := root["AttributeDefinitions"] + if !found do return nil, .Missing_Attribute_Definitions + + attr_defs_array, ok := attr_defs_val.(json.Array) + if !ok do return nil, .Invalid_Attribute_Definitions + + if len(attr_defs_array) == 0 { + return nil, .Invalid_Attribute_Definitions + } + + attr_defs := make([]dynamodb.Attribute_Definition, len(attr_defs_array)) + seen_names := make(map[string]bool, allocator = context.temp_allocator) + defer delete(seen_names) + + for elem, i in attr_defs_array { + elem_obj, elem_ok := elem.(json.Object) + if !elem_ok { + for j in 0.. string { + switch err { + case .None: return "" + case .Missing_Attribute_Definitions: return "Missing AttributeDefinitions" + case .Invalid_Attribute_Definitions: return "Invalid AttributeDefinitions format" + case .Invalid_Attribute_Type: return "Invalid AttributeType (must be S, N, or B)" + case .Duplicate_Attribute_Name: return "Duplicate attribute name in AttributeDefinitions" + } + return "Invalid AttributeDefinitions" +} + +validate_key_attributes_defined :: proc(key_schema: []dynamodb.Key_Schema_Element, attr_defs: []dynamodb.Attribute_Definition) -> bool { + for ks in key_schema { + found := false + for ad in attr_defs { + if ks.attribute_name == ad.attribute_name { + found = true + break + } + } + if !found do return false + } + return true +} + +// ============================================================================ +// Error Response Helper +// ============================================================================ + +make_error_response :: proc(response: ^HTTP_Response, err_type: dynamodb.DynamoDB_Error_Type, message: string) -> HTTP_Response { + response_set_status(response, .Bad_Request) + error_body := dynamodb.error_to_response(err_type, message) + response_set_body(response, transmute([]byte)error_body) + return response^ +} + +// ============================================================================ +// Configuration +// ============================================================================ + parse_config :: proc() -> Config { config := Config{ host = "0.0.0.0", diff --git a/rocksdb_shim/rocksdb_shim.cc b/rocksdb_shim/rocksdb_shim.cc index 43c5b5c..f49f75a 100644 --- a/rocksdb_shim/rocksdb_shim.cc +++ b/rocksdb_shim/rocksdb_shim.cc @@ -1,22 +1,167 @@ -// TODO: In order to use RocksDB's WAL replication helpers, we need to import the C++ library so we use this shim -/** - C++ shim implementation notes (the important bits) +#include "rocksdb_shim.h" +#include +#include +#include +#include +#include - In this rocksdb_shim.cc we'll need to use: +// Internal structure wrapping rocksdb::DB +struct jormun_db { + rocksdb::DB* db; +}; - rocksdb::DB::Open(...) +// Placeholder for WAL iterator (not implemented yet) +struct jormun_wal_iter { + // TODO: Implement with TransactionLogIterator when needed + void* placeholder; +}; - db->GetLatestSequenceNumber() +// Open database +jormun_db* jormun_db_open(const char* path, int create_if_missing, char** err) { + rocksdb::Options options; + options.create_if_missing = create_if_missing != 0; + + rocksdb::DB* db_ptr = nullptr; + rocksdb::Status status = rocksdb::DB::Open(options, path, &db_ptr); + + if (!status.ok()) { + if (err) { + std::string error_msg = status.ToString(); + *err = strdup(error_msg.c_str()); + } + return nullptr; + } + + jormun_db* jdb = new jormun_db; + jdb->db = db_ptr; + return jdb; +} - db->GetUpdatesSince(seq, &iter) +// Close database +void jormun_db_close(jormun_db* db) { + if (db) { + delete db->db; + delete db; + } +} - from each TransactionLogIterator entry: +// Put key-value pair +void jormun_db_put(jormun_db* db, + const void* key, size_t keylen, + const void* val, size_t vallen, + char** err) { + if (!db || !db->db) { + if (err) *err = strdup("Database is null"); + return; + } + + rocksdb::WriteOptions write_options; + rocksdb::Slice key_slice(static_cast(key), keylen); + rocksdb::Slice val_slice(static_cast(val), vallen); + + rocksdb::Status status = db->db->Put(write_options, key_slice, val_slice); + + if (!status.ok() && err) { + std::string error_msg = status.ToString(); + *err = strdup(error_msg.c_str()); + } +} - get WriteBatch and serialize via WriteBatch::Data() +// Get value for key +unsigned char* jormun_db_get(jormun_db* db, + const void* key, size_t keylen, + size_t* vallen, + char** err) { + if (!db || !db->db) { + if (err) *err = strdup("Database is null"); + return nullptr; + } + + rocksdb::ReadOptions read_options; + rocksdb::Slice key_slice(static_cast(key), keylen); + std::string value; + + rocksdb::Status status = db->db->Get(read_options, key_slice, &value); + + if (status.IsNotFound()) { + *vallen = 0; + return nullptr; + } + + if (!status.ok()) { + if (err) { + std::string error_msg = status.ToString(); + *err = strdup(error_msg.c_str()); + } + return nullptr; + } + + // Allocate and copy value + *vallen = value.size(); + unsigned char* result = static_cast(malloc(value.size())); + if (result) { + memcpy(result, value.data(), value.size()); + } + + return result; +} - apply via rocksdb::WriteBatch wb(data); db->Write(write_options, &wb); +// Free memory allocated by the shim +void jormun_free(void* p) { + free(p); +} - Also we must configure WAL retention so the followers don’t fall off the end. RocksDB warns the iterator can become invalid if WAL is cleared aggressively; typical controls are WAL TTL / size limit. +// ============================================================================ +// WAL Replication Functions (Stubs for now - to be implemented) +// ============================================================================ - https://github.com/facebook/rocksdb/issues/1565 - */ \ No newline at end of file +// Get latest sequence number +uint64_t jormun_latest_sequence(jormun_db* db) { + if (!db || !db->db) return 0; + return db->db->GetLatestSequenceNumber(); +} + +// Create WAL iterator (stub) +jormun_wal_iter* jormun_wal_iter_create(jormun_db* db, uint64_t seq, char** err) { + (void)db; + (void)seq; + if (err) { + *err = strdup("WAL iteration not yet implemented"); + } + return nullptr; +} + +// Destroy WAL iterator (stub) +void jormun_wal_iter_destroy(jormun_wal_iter* it) { + if (it) { + delete it; + } +} + +// Get next batch from WAL (stub) +int jormun_wal_iter_next(jormun_wal_iter* it, + uint64_t* batch_start_seq, + unsigned char** out_data, + size_t* out_len, + char** err) { + (void)it; + (void)batch_start_seq; + (void)out_data; + (void)out_len; + if (err) { + *err = strdup("WAL iteration not yet implemented"); + } + return 0; +} + +// Apply write batch (stub) +void jormun_apply_writebatch(jormun_db* db, + const unsigned char* data, size_t len, + char** err) { + (void)db; + (void)data; + (void)len; + if (err) { + *err = strdup("WAL apply not yet implemented"); + } +}