consolidate

This commit is contained in:
2026-02-15 13:56:08 -05:00
parent 42db451349
commit cf352dde23
8 changed files with 1096 additions and 338 deletions

View File

@@ -14,7 +14,7 @@ SHIM_HDRS := $(SHIM_DIR)/rocksdb_shim.h
CXX := g++ CXX := g++
AR := ar AR := ar
CXXFLAGS := -O2 -fPIC -std=c++17 $(INCLUDE_PATH) CXXFLAGS := -O2 -fPIC -std=c++20 $(INCLUDE_PATH)
# RocksDB and compression libraries # RocksDB and compression libraries

View File

@@ -1,12 +1,10 @@
// Binary TLV (Type-Length-Value) encoding for DynamoDB items // Binary TLV (Type-Length-Value) encoding for DynamoDB items
// Replaces JSON storage with efficient binary format // Replaces JSON storage with efficient binary format
// Format: [attribute_count][name_len][name][type_tag][value_len][value]... // Format: [attribute_count][name_len][name][type_tag][value_len][value]...
package item_codec package dynamodb
import "core:bytes" import "core:bytes"
import "core:encoding/varint"
import "core:slice" import "core:slice"
import "../dynamodb"
// Type tags for binary encoding (1 byte each) // Type tags for binary encoding (1 byte each)
Type_Tag :: enum u8 { Type_Tag :: enum u8 {
@@ -16,12 +14,12 @@ Type_Tag :: enum u8 {
Binary = 0x03, // B (base64 string) Binary = 0x03, // B (base64 string)
Boolean = 0x04, // BOOL Boolean = 0x04, // BOOL
Null = 0x05, // NULL Null = 0x05, // NULL
// Set types // Set types
String_Set = 0x10, // SS String_Set = 0x10, // SS
Number_Set = 0x11, // NS Number_Set = 0x11, // NS
Binary_Set = 0x12, // BS Binary_Set = 0x12, // BS
// Complex types // Complex types
List = 0x20, // L List = 0x20, // L
Map = 0x21, // M Map = 0x21, // M
@@ -34,93 +32,93 @@ Type_Tag :: enum u8 {
// Encode an Item to binary TLV format // Encode an Item to binary TLV format
// Format: [attribute_count:varint][attributes...] // Format: [attribute_count:varint][attributes...]
// Each attribute: [name_len:varint][name:bytes][type_tag:u8][value_encoded:bytes] // Each attribute: [name_len:varint][name:bytes][type_tag:u8][value_encoded:bytes]
encode :: proc(item: dynamodb.Item) -> ([]byte, bool) { encode :: proc(item: Item) -> ([]byte, bool) {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 1024, context.allocator) bytes.buffer_init_allocator(&buf, 0, 1024, context.allocator)
defer bytes.buffer_destroy(&buf) defer bytes.buffer_destroy(&buf)
// Write attribute count // Write attribute count
encode_varint(&buf, len(item)) encode_varint(&buf, len(item))
// Collect and sort keys for deterministic encoding // Collect and sort keys for deterministic encoding
keys := make([dynamic]string, context.temp_allocator) keys := make([dynamic]string, context.temp_allocator)
for key in item { for key in item {
append(&keys, key) append(&keys, key)
} }
slice.sort_by(keys[:], proc(a, b: string) -> bool { slice.sort_by(keys[:], proc(a, b: string) -> bool {
return a < b return a < b
}) })
// Encode each attribute // Encode each attribute
for key in keys { for key in keys {
value := item[key] value := item[key]
// Write attribute name // Write attribute name
encode_varint(&buf, len(key)) encode_varint(&buf, len(key))
bytes.buffer_write_string(&buf, key) bytes.buffer_write_string(&buf, key)
// Encode attribute value // Encode attribute value
ok := encode_attribute_value(&buf, value) ok := encode_attribute_value(&buf, value)
if !ok { if !ok {
return nil, false return nil, false
} }
} }
return bytes.buffer_to_bytes(&buf), true return bytes.buffer_to_bytes(&buf), true
} }
// Encode an AttributeValue to binary format // Encode an AttributeValue to binary format
encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: dynamodb.Attribute_Value) -> bool { encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: Attribute_Value) -> bool {
switch v in attr { switch v in attr {
case dynamodb.String: case String:
bytes.buffer_write_byte(buf, u8(Type_Tag.String)) bytes.buffer_write_byte(buf, u8(Type_Tag.String))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
bytes.buffer_write_string(buf, string(v)) bytes.buffer_write_string(buf, string(v))
case dynamodb.Number: case Number:
bytes.buffer_write_byte(buf, u8(Type_Tag.Number)) bytes.buffer_write_byte(buf, u8(Type_Tag.Number))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
bytes.buffer_write_string(buf, string(v)) bytes.buffer_write_string(buf, string(v))
case dynamodb.Binary: case Binary:
bytes.buffer_write_byte(buf, u8(Type_Tag.Binary)) bytes.buffer_write_byte(buf, u8(Type_Tag.Binary))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
bytes.buffer_write_string(buf, string(v)) bytes.buffer_write_string(buf, string(v))
case dynamodb.Bool: case Bool:
bytes.buffer_write_byte(buf, u8(Type_Tag.Boolean)) bytes.buffer_write_byte(buf, u8(Type_Tag.Boolean))
bytes.buffer_write_byte(buf, 1 if bool(v) else 0) bytes.buffer_write_byte(buf, 1 if bool(v) else 0)
case dynamodb.Null: case Null:
bytes.buffer_write_byte(buf, u8(Type_Tag.Null)) bytes.buffer_write_byte(buf, u8(Type_Tag.Null))
// NULL has no value bytes // NULL has no value bytes
case dynamodb.String_Set: case String_Set:
bytes.buffer_write_byte(buf, u8(Type_Tag.String_Set)) bytes.buffer_write_byte(buf, u8(Type_Tag.String_Set))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
for s in v { for s in v {
encode_varint(buf, len(s)) encode_varint(buf, len(s))
bytes.buffer_write_string(buf, s) bytes.buffer_write_string(buf, s)
} }
case dynamodb.Number_Set: case Number_Set:
bytes.buffer_write_byte(buf, u8(Type_Tag.Number_Set)) bytes.buffer_write_byte(buf, u8(Type_Tag.Number_Set))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
for n in v { for n in v {
encode_varint(buf, len(n)) encode_varint(buf, len(n))
bytes.buffer_write_string(buf, n) bytes.buffer_write_string(buf, n)
} }
case dynamodb.Binary_Set: case Binary_Set:
bytes.buffer_write_byte(buf, u8(Type_Tag.Binary_Set)) bytes.buffer_write_byte(buf, u8(Type_Tag.Binary_Set))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
for b in v { for b in v {
encode_varint(buf, len(b)) encode_varint(buf, len(b))
bytes.buffer_write_string(buf, b) bytes.buffer_write_string(buf, b)
} }
case dynamodb.List: case List:
bytes.buffer_write_byte(buf, u8(Type_Tag.List)) bytes.buffer_write_byte(buf, u8(Type_Tag.List))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
for item in v { for item in v {
@@ -129,21 +127,21 @@ encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: dynamodb.Attribute_Valu
return false return false
} }
} }
case dynamodb.Map: case Map:
bytes.buffer_write_byte(buf, u8(Type_Tag.Map)) bytes.buffer_write_byte(buf, u8(Type_Tag.Map))
encode_varint(buf, len(v)) encode_varint(buf, len(v))
// Collect and sort keys for deterministic encoding // Collect and sort keys for deterministic encoding
keys := make([dynamic]string, context.temp_allocator) keys := make([dynamic]string, context.temp_allocator)
for key in v { for key in v {
append(&keys, key) append(&keys, key)
} }
slice.sort_by(keys[:], proc(a, b: string) -> bool { slice.sort_by(keys[:], proc(a, b: string) -> bool {
return a < b return a < b
}) })
// Encode each map entry // Encode each map entry
for key in keys { for key in keys {
value := v[key] value := v[key]
@@ -155,7 +153,7 @@ encode_attribute_value :: proc(buf: ^bytes.Buffer, attr: dynamodb.Attribute_Valu
} }
} }
} }
return true return true
} }
@@ -169,15 +167,11 @@ Binary_Decoder :: struct {
pos: int, pos: int,
} }
decoder_init :: proc(data: []byte) -> Binary_Decoder {
return Binary_Decoder{data = data, pos = 0}
}
decoder_read_byte :: proc(decoder: ^Binary_Decoder) -> (u8, bool) { decoder_read_byte :: proc(decoder: ^Binary_Decoder) -> (u8, bool) {
if decoder.pos >= len(decoder.data) { if decoder.pos >= len(decoder.data) {
return 0, false return 0, false
} }
byte := decoder.data[decoder.pos] byte := decoder.data[decoder.pos]
decoder.pos += 1 decoder.pos += 1
return byte, true return byte, true
@@ -187,7 +181,7 @@ decoder_read_bytes :: proc(decoder: ^Binary_Decoder, length: int) -> ([]byte, bo
if decoder.pos + length > len(decoder.data) { if decoder.pos + length > len(decoder.data) {
return nil, false return nil, false
} }
bytes := decoder.data[decoder.pos:decoder.pos + length] bytes := decoder.data[decoder.pos:decoder.pos + length]
decoder.pos += length decoder.pos += length
return bytes, true return bytes, true
@@ -196,143 +190,143 @@ decoder_read_bytes :: proc(decoder: ^Binary_Decoder, length: int) -> ([]byte, bo
decoder_read_varint :: proc(decoder: ^Binary_Decoder) -> (int, bool) { decoder_read_varint :: proc(decoder: ^Binary_Decoder) -> (int, bool) {
result: int = 0 result: int = 0
shift: uint = 0 shift: uint = 0
for decoder.pos < len(decoder.data) { for decoder.pos < len(decoder.data) {
byte := decoder.data[decoder.pos] byte := decoder.data[decoder.pos]
decoder.pos += 1 decoder.pos += 1
result |= int(byte & 0x7F) << shift result |= int(byte & 0x7F) << shift
if (byte & 0x80) == 0 { if (byte & 0x80) == 0 {
return result, true return result, true
} }
shift += 7 shift += 7
if shift >= 64 { if shift >= 64 {
return 0, false // Varint overflow return 0, false // Varint overflow
} }
} }
return 0, false // Unexpected end of data return 0, false // Unexpected end of data
} }
// Decode binary TLV format back into an Item // Decode binary TLV format back into an Item
decode :: proc(data: []byte) -> (dynamodb.Item, bool) { decode :: proc(data: []byte) -> (Item, bool) {
decoder := decoder_init(data) decoder := Binary_Decoder{data = data, pos = 0}
attr_count, count_ok := decoder_read_varint(&decoder) attr_count, count_ok := decoder_read_varint(&decoder)
if !count_ok { if !count_ok {
return {}, false return {}, false
} }
item := make(dynamodb.Item) item := make(Item)
for i in 0..<attr_count { for _ in 0..<attr_count {
// Read attribute name // Read attribute name
name_len, name_len_ok := decoder_read_varint(&decoder) name_len, name_len_ok := decoder_read_varint(&decoder)
if !name_len_ok { if !name_len_ok {
// Cleanup on error // Cleanup on error
dynamodb.item_destroy(&item) item_destroy(&item)
return {}, false return {}, false
} }
name_bytes, name_ok := decoder_read_bytes(&decoder, name_len) name_bytes, name_ok := decoder_read_bytes(&decoder, name_len)
if !name_ok { if !name_ok {
dynamodb.item_destroy(&item) item_destroy(&item)
return {}, false return {}, false
} }
owned_name := string(name_bytes) owned_name := string(name_bytes)
owned_name = transmute(string)slice.clone(transmute([]byte)owned_name) owned_name = transmute(string)slice.clone(transmute([]byte)owned_name)
// Read attribute value // Read attribute value
value, value_ok := decode_attribute_value(&decoder) value, value_ok := decode_attribute_value(&decoder)
if !value_ok { if !value_ok {
delete(owned_name) delete(owned_name)
dynamodb.item_destroy(&item) item_destroy(&item)
return {}, false return {}, false
} }
item[owned_name] = value item[owned_name] = value
} }
return item, true return item, true
} }
// Decode an AttributeValue from binary format // Decode an AttributeValue from binary format
decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_Value, bool) { decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (Attribute_Value, bool) {
type_byte, type_ok := decoder_read_byte(decoder) type_byte, type_ok := decoder_read_byte(decoder)
if !type_ok { if !type_ok {
return nil, false return nil, false
} }
type_tag := Type_Tag(type_byte) type_tag := Type_Tag(type_byte)
switch type_tag { switch type_tag {
case .String: case .String:
length, len_ok := decoder_read_varint(decoder) length, len_ok := decoder_read_varint(decoder)
if !len_ok { if !len_ok {
return nil, false return nil, false
} }
data, data_ok := decoder_read_bytes(decoder, length) data, data_ok := decoder_read_bytes(decoder, length)
if !data_ok { if !data_ok {
return nil, false return nil, false
} }
str := string(data) str := string(data)
owned := transmute(string)slice.clone(transmute([]byte)str) owned := transmute(string)slice.clone(transmute([]byte)str)
return dynamodb.String(owned), true return String(owned), true
case .Number: case .Number:
length, len_ok := decoder_read_varint(decoder) length, len_ok := decoder_read_varint(decoder)
if !len_ok { if !len_ok {
return nil, false return nil, false
} }
data, data_ok := decoder_read_bytes(decoder, length) data, data_ok := decoder_read_bytes(decoder, length)
if !data_ok { if !data_ok {
return nil, false return nil, false
} }
str := string(data) str := string(data)
owned := transmute(string)slice.clone(transmute([]byte)str) owned := transmute(string)slice.clone(transmute([]byte)str)
return dynamodb.Number(owned), true return Number(owned), true
case .Binary: case .Binary:
length, len_ok := decoder_read_varint(decoder) length, len_ok := decoder_read_varint(decoder)
if !len_ok { if !len_ok {
return nil, false return nil, false
} }
data, data_ok := decoder_read_bytes(decoder, length) data, data_ok := decoder_read_bytes(decoder, length)
if !data_ok { if !data_ok {
return nil, false return nil, false
} }
str := string(data) str := string(data)
owned := transmute(string)slice.clone(transmute([]byte)str) owned := transmute(string)slice.clone(transmute([]byte)str)
return dynamodb.Binary(owned), true return Binary(owned), true
case .Boolean: case .Boolean:
byte, byte_ok := decoder_read_byte(decoder) byte, byte_ok := decoder_read_byte(decoder)
if !byte_ok { if !byte_ok {
return nil, false return nil, false
} }
return dynamodb.Bool(byte != 0), true return Bool(byte != 0), true
case .Null: case .Null:
return dynamodb.Null(true), true return Null(true), true
case .String_Set: case .String_Set:
count, count_ok := decoder_read_varint(decoder) count, count_ok := decoder_read_varint(decoder)
if !count_ok { if !count_ok {
return nil, false return nil, false
} }
strings := make([]string, count) strings := make([]string, count)
for i in 0..<count { for i in 0..<count {
length, len_ok := decoder_read_varint(decoder) length, len_ok := decoder_read_varint(decoder)
if !len_ok { if !len_ok {
@@ -343,7 +337,7 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
delete(strings) delete(strings)
return nil, false return nil, false
} }
data, data_ok := decoder_read_bytes(decoder, length) data, data_ok := decoder_read_bytes(decoder, length)
if !data_ok { if !data_ok {
for j in 0..<i { for j in 0..<i {
@@ -352,21 +346,21 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
delete(strings) delete(strings)
return nil, false return nil, false
} }
str := string(data) str := string(data)
strings[i] = transmute(string)slice.clone(transmute([]byte)str) strings[i] = transmute(string)slice.clone(transmute([]byte)str)
} }
return dynamodb.String_Set(strings), true return String_Set(strings), true
case .Number_Set: case .Number_Set:
count, count_ok := decoder_read_varint(decoder) count, count_ok := decoder_read_varint(decoder)
if !count_ok { if !count_ok {
return nil, false return nil, false
} }
numbers := make([]string, count) numbers := make([]string, count)
for i in 0..<count { for i in 0..<count {
length, len_ok := decoder_read_varint(decoder) length, len_ok := decoder_read_varint(decoder)
if !len_ok { if !len_ok {
@@ -376,7 +370,7 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
delete(numbers) delete(numbers)
return nil, false return nil, false
} }
data, data_ok := decoder_read_bytes(decoder, length) data, data_ok := decoder_read_bytes(decoder, length)
if !data_ok { if !data_ok {
for j in 0..<i { for j in 0..<i {
@@ -385,21 +379,21 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
delete(numbers) delete(numbers)
return nil, false return nil, false
} }
str := string(data) str := string(data)
numbers[i] = transmute(string)slice.clone(transmute([]byte)str) numbers[i] = transmute(string)slice.clone(transmute([]byte)str)
} }
return dynamodb.Number_Set(numbers), true return Number_Set(numbers), true
case .Binary_Set: case .Binary_Set:
count, count_ok := decoder_read_varint(decoder) count, count_ok := decoder_read_varint(decoder)
if !count_ok { if !count_ok {
return nil, false return nil, false
} }
binaries := make([]string, count) binaries := make([]string, count)
for i in 0..<count { for i in 0..<count {
length, len_ok := decoder_read_varint(decoder) length, len_ok := decoder_read_varint(decoder)
if !len_ok { if !len_ok {
@@ -409,7 +403,7 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
delete(binaries) delete(binaries)
return nil, false return nil, false
} }
data, data_ok := decoder_read_bytes(decoder, length) data, data_ok := decoder_read_bytes(decoder, length)
if !data_ok { if !data_ok {
for j in 0..<i { for j in 0..<i {
@@ -418,47 +412,47 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
delete(binaries) delete(binaries)
return nil, false return nil, false
} }
str := string(data) str := string(data)
binaries[i] = transmute(string)slice.clone(transmute([]byte)str) binaries[i] = transmute(string)slice.clone(transmute([]byte)str)
} }
return dynamodb.Binary_Set(binaries), true return Binary_Set(binaries), true
case .List: case .List:
count, count_ok := decoder_read_varint(decoder) count, count_ok := decoder_read_varint(decoder)
if !count_ok { if !count_ok {
return nil, false return nil, false
} }
list := make([]dynamodb.Attribute_Value, count) list := make([]Attribute_Value, count)
for i in 0..<count { for i in 0..<count {
value, value_ok := decode_attribute_value(decoder) value, value_ok := decode_attribute_value(decoder)
if !value_ok { if !value_ok {
// Cleanup on error // Cleanup on error
for j in 0..<i { for j in 0..<i {
item := list[j] item := list[j]
dynamodb.attr_value_destroy(&item) attr_value_destroy(&item)
} }
delete(list) delete(list)
return nil, false return nil, false
} }
list[i] = value list[i] = value
} }
return dynamodb.List(list), true return List(list), true
case .Map: case .Map:
count, count_ok := decoder_read_varint(decoder) count, count_ok := decoder_read_varint(decoder)
if !count_ok { if !count_ok {
return nil, false return nil, false
} }
attr_map := make(map[string]dynamodb.Attribute_Value) attr_map := make(map[string]Attribute_Value)
for i in 0..<count { for _ in 0..<count {
// Read key // Read key
key_len, key_len_ok := decoder_read_varint(decoder) key_len, key_len_ok := decoder_read_varint(decoder)
if !key_len_ok { if !key_len_ok {
@@ -466,26 +460,26 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
for k, v in attr_map { for k, v in attr_map {
delete(k) delete(k)
v_copy := v v_copy := v
dynamodb.attr_value_destroy(&v_copy) attr_value_destroy(&v_copy)
} }
delete(attr_map) delete(attr_map)
return nil, false return nil, false
} }
key_bytes, key_ok := decoder_read_bytes(decoder, key_len) key_bytes, key_ok := decoder_read_bytes(decoder, key_len)
if !key_ok { if !key_ok {
for k, v in attr_map { for k, v in attr_map {
delete(k) delete(k)
v_copy := v v_copy := v
dynamodb.attr_value_destroy(&v_copy) attr_value_destroy(&v_copy)
} }
delete(attr_map) delete(attr_map)
return nil, false return nil, false
} }
key := string(key_bytes) key := string(key_bytes)
owned_key := transmute(string)slice.clone(transmute([]byte)key) owned_key := transmute(string)slice.clone(transmute([]byte)key)
// Read value // Read value
value, value_ok := decode_attribute_value(decoder) value, value_ok := decode_attribute_value(decoder)
if !value_ok { if !value_ok {
@@ -493,23 +487,23 @@ decode_attribute_value :: proc(decoder: ^Binary_Decoder) -> (dynamodb.Attribute_
for k, v in attr_map { for k, v in attr_map {
delete(k) delete(k)
v_copy := v v_copy := v
dynamodb.attr_value_destroy(&v_copy) attr_value_destroy(&v_copy)
} }
delete(attr_map) delete(attr_map)
return nil, false return nil, false
} }
attr_map[owned_key] = value attr_map[owned_key] = value
} }
return dynamodb.Map(attr_map), true return Map(attr_map), true
} }
return nil, false return nil, false
} }
// ============================================================================ // ============================================================================
// Varint Encoding // Varint Encoding (Encodes a varint length prefix)
// ============================================================================ // ============================================================================
encode_varint :: proc(buf: ^bytes.Buffer, value: int) { encode_varint :: proc(buf: ^bytes.Buffer, value: int) {
@@ -517,7 +511,7 @@ encode_varint :: proc(buf: ^bytes.Buffer, value: int) {
for { for {
byte := u8(v & 0x7F) byte := u8(v & 0x7F)
v >>= 7 v >>= 7
if v == 0 { if v == 0 {
bytes.buffer_write_byte(buf, byte) bytes.buffer_write_byte(buf, byte)
return return

View File

@@ -4,7 +4,6 @@ package dynamodb
import "core:encoding/json" import "core:encoding/json"
import "core:fmt" import "core:fmt"
import "core:mem"
import "core:slice" import "core:slice"
import "core:strings" import "core:strings"
@@ -20,7 +19,7 @@ parse_item :: proc(json_bytes: []byte) -> (Item, bool) {
return {}, false return {}, false
} }
defer json.destroy_value(data) defer json.destroy_value(data)
return parse_item_from_value(data) return parse_item_from_value(data)
} }
@@ -31,12 +30,12 @@ parse_item_from_value :: proc(value: json.Value) -> (Item, bool) {
if !ok { if !ok {
return {}, false return {}, false
} }
item := make(Item) item := make(Item)
for key, val in obj { for key, val in obj {
attr_name := strings.clone(key) attr_name := strings.clone(key)
attr_value, attr_ok := parse_attribute_value(val) attr_value, attr_ok := parse_attribute_value(val)
if !attr_ok { if !attr_ok {
// Cleanup on error // Cleanup on error
@@ -49,10 +48,10 @@ parse_item_from_value :: proc(value: json.Value) -> (Item, bool) {
delete(attr_name) delete(attr_name)
return {}, false return {}, false
} }
item[attr_name] = attr_value item[attr_name] = attr_value
} }
return item, true return item, true
} }
@@ -63,12 +62,12 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
if !ok { if !ok {
return nil, false return nil, false
} }
// DynamoDB attribute must have exactly one key (the type indicator) // DynamoDB attribute must have exactly one key (the type indicator)
if len(obj) != 1 { if len(obj) != 1 {
return nil, false return nil, false
} }
// Get the single key-value pair // Get the single key-value pair
for type_name, type_value in obj { for type_name, type_value in obj {
// String // String
@@ -79,7 +78,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
return String(strings.clone(string(str))), true return String(strings.clone(string(str))), true
} }
// Number (stored as string) // Number (stored as string)
if type_name == "N" { if type_name == "N" {
str, str_ok := type_value.(json.String) str, str_ok := type_value.(json.String)
@@ -88,7 +87,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
return Number(strings.clone(string(str))), true return Number(strings.clone(string(str))), true
} }
// Binary (base64 string) // Binary (base64 string)
if type_name == "B" { if type_name == "B" {
str, str_ok := type_value.(json.String) str, str_ok := type_value.(json.String)
@@ -97,7 +96,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
return Binary(strings.clone(string(str))), true return Binary(strings.clone(string(str))), true
} }
// Boolean // Boolean
if type_name == "BOOL" { if type_name == "BOOL" {
b, b_ok := type_value.(json.Boolean) b, b_ok := type_value.(json.Boolean)
@@ -106,7 +105,7 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
return Bool(b), true return Bool(b), true
} }
// Null // Null
if type_name == "NULL" { if type_name == "NULL" {
b, b_ok := type_value.(json.Boolean) b, b_ok := type_value.(json.Boolean)
@@ -115,16 +114,16 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
return Null(b), true return Null(b), true
} }
// String Set // String Set
if type_name == "SS" { if type_name == "SS" {
arr, arr_ok := type_value.(json.Array) arr, arr_ok := type_value.(json.Array)
if !arr_ok { if !arr_ok {
return nil, false return nil, false
} }
strings_arr := make([]string, len(arr)) strings_arr := make([]string, len(arr))
for item, i in arr { for item, i in arr {
str, str_ok := item.(json.String) str, str_ok := item.(json.String)
if !str_ok { if !str_ok {
@@ -137,19 +136,19 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
strings_arr[i] = strings.clone(string(str)) strings_arr[i] = strings.clone(string(str))
} }
return String_Set(strings_arr), true return String_Set(strings_arr), true
} }
// Number Set // Number Set
if type_name == "NS" { if type_name == "NS" {
arr, arr_ok := type_value.(json.Array) arr, arr_ok := type_value.(json.Array)
if !arr_ok { if !arr_ok {
return nil, false return nil, false
} }
numbers_arr := make([]string, len(arr)) numbers_arr := make([]string, len(arr))
for item, i in arr { for item, i in arr {
str, str_ok := item.(json.String) str, str_ok := item.(json.String)
if !str_ok { if !str_ok {
@@ -162,19 +161,19 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
numbers_arr[i] = strings.clone(string(str)) numbers_arr[i] = strings.clone(string(str))
} }
return Number_Set(numbers_arr), true return Number_Set(numbers_arr), true
} }
// Binary Set // Binary Set
if type_name == "BS" { if type_name == "BS" {
arr, arr_ok := type_value.(json.Array) arr, arr_ok := type_value.(json.Array)
if !arr_ok { if !arr_ok {
return nil, false return nil, false
} }
binaries_arr := make([]string, len(arr)) binaries_arr := make([]string, len(arr))
for item, i in arr { for item, i in arr {
str, str_ok := item.(json.String) str, str_ok := item.(json.String)
if !str_ok { if !str_ok {
@@ -187,19 +186,19 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
binaries_arr[i] = strings.clone(string(str)) binaries_arr[i] = strings.clone(string(str))
} }
return Binary_Set(binaries_arr), true return Binary_Set(binaries_arr), true
} }
// List // List
if type_name == "L" { if type_name == "L" {
arr, arr_ok := type_value.(json.Array) arr, arr_ok := type_value.(json.Array)
if !arr_ok { if !arr_ok {
return nil, false return nil, false
} }
list := make([]Attribute_Value, len(arr)) list := make([]Attribute_Value, len(arr))
for item, i in arr { for item, i in arr {
val, val_ok := parse_attribute_value(item) val, val_ok := parse_attribute_value(item)
if !val_ok { if !val_ok {
@@ -213,22 +212,22 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
} }
list[i] = val list[i] = val
} }
return List(list), true return List(list), true
} }
// Map // Map
if type_name == "M" { if type_name == "M" {
map_obj, map_ok := type_value.(json.Object) map_obj, map_ok := type_value.(json.Object)
if !map_ok { if !map_ok {
return nil, false return nil, false
} }
attr_map := make(map[string]Attribute_Value) attr_map := make(map[string]Attribute_Value)
for map_key, map_val in map_obj { for map_key, map_val in map_obj {
key := strings.clone(map_key) key := strings.clone(map_key)
val, val_ok := parse_attribute_value(map_val) val, val_ok := parse_attribute_value(map_val)
if !val_ok { if !val_ok {
// Cleanup on error // Cleanup on error
@@ -241,14 +240,14 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
delete(attr_map) delete(attr_map)
return nil, false return nil, false
} }
attr_map[key] = val attr_map[key] = val
} }
return Map(attr_map), true return Map(attr_map), true
} }
} }
return nil, false return nil, false
} }
@@ -261,9 +260,9 @@ parse_attribute_value :: proc(value: json.Value) -> (Attribute_Value, bool) {
serialize_item :: proc(item: Item) -> string { serialize_item :: proc(item: Item) -> string {
builder := strings.builder_make() builder := strings.builder_make()
defer strings.builder_destroy(&builder) defer strings.builder_destroy(&builder)
serialize_item_to_builder(&builder, item) serialize_item_to_builder(&builder, item)
return strings.clone(strings.to_string(builder)) return strings.clone(strings.to_string(builder))
} }
@@ -272,16 +271,16 @@ serialize_item_to_builder :: proc(b: ^strings.Builder, item: Item) {
// Collect and sort keys for deterministic output // Collect and sort keys for deterministic output
keys := make([dynamic]string, context.temp_allocator) keys := make([dynamic]string, context.temp_allocator)
defer delete(keys) defer delete(keys)
for key in item { for key in item {
append(&keys, key) append(&keys, key)
} }
// Sort keys alphabetically // Sort keys alphabetically
slice.sort_by(keys[:], proc(a, b: string) -> bool { slice.sort_by(keys[:], proc(a, b: string) -> bool {
return a < b return a < b
}) })
strings.write_string(b, "{") strings.write_string(b, "{")
for key, i in keys { for key, i in keys {
if i > 0 { if i > 0 {
@@ -299,19 +298,19 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
switch v in attr { switch v in attr {
case String: case String:
fmt.sbprintf(b, `{"S":"%s"}`, string(v)) fmt.sbprintf(b, `{"S":"%s"}`, string(v))
case Number: case Number:
fmt.sbprintf(b, `{"N":"%s"}`, string(v)) fmt.sbprintf(b, `{"N":"%s"}`, string(v))
case Binary: case Binary:
fmt.sbprintf(b, `{"B":"%s"}`, string(v)) fmt.sbprintf(b, `{"B":"%s"}`, string(v))
case Bool: case Bool:
fmt.sbprintf(b, `{"BOOL":%v}`, bool(v)) fmt.sbprintf(b, `{"BOOL":%v}`, bool(v))
case Null: case Null:
strings.write_string(b, `{"NULL":true}`) strings.write_string(b, `{"NULL":true}`)
case String_Set: case String_Set:
strings.write_string(b, `{"SS":[`) strings.write_string(b, `{"SS":[`)
for s, i in v { for s, i in v {
@@ -321,7 +320,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
fmt.sbprintf(b, `"%s"`, s) fmt.sbprintf(b, `"%s"`, s)
} }
strings.write_string(b, "]}") strings.write_string(b, "]}")
case Number_Set: case Number_Set:
strings.write_string(b, `{"NS":[`) strings.write_string(b, `{"NS":[`)
for n, i in v { for n, i in v {
@@ -331,7 +330,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
fmt.sbprintf(b, `"%s"`, n) fmt.sbprintf(b, `"%s"`, n)
} }
strings.write_string(b, "]}") strings.write_string(b, "]}")
case Binary_Set: case Binary_Set:
strings.write_string(b, `{"BS":[`) strings.write_string(b, `{"BS":[`)
for bin, i in v { for bin, i in v {
@@ -341,7 +340,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
fmt.sbprintf(b, `"%s"`, bin) fmt.sbprintf(b, `"%s"`, bin)
} }
strings.write_string(b, "]}") strings.write_string(b, "]}")
case List: case List:
strings.write_string(b, `{"L":[`) strings.write_string(b, `{"L":[`)
for item, i in v { for item, i in v {
@@ -351,20 +350,20 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
serialize_attribute_value(b, item) serialize_attribute_value(b, item)
} }
strings.write_string(b, "]}") strings.write_string(b, "]}")
case Map: case Map:
strings.write_string(b, `{"M":{`) strings.write_string(b, `{"M":{`)
// Collect and sort keys for deterministic output // Collect and sort keys for deterministic output
keys := make([dynamic]string, context.temp_allocator) keys := make([dynamic]string, context.temp_allocator)
for key in v { for key in v {
append(&keys, key) append(&keys, key)
} }
slice.sort_by(keys[:], proc(a, b: string) -> bool { slice.sort_by(keys[:], proc(a, b: string) -> bool {
return a < b return a < b
}) })
for key, i in keys { for key, i in keys {
if i > 0 { if i > 0 {
strings.write_string(b, ",") strings.write_string(b, ",")
@@ -373,7 +372,7 @@ serialize_attribute_value :: proc(b: ^strings.Builder, attr: Attribute_Value) {
value := v[key] value := v[key]
serialize_attribute_value(b, value) serialize_attribute_value(b, value)
} }
strings.write_string(b, "}}") strings.write_string(b, "}}")
} }
} }
@@ -389,22 +388,22 @@ parse_table_name :: proc(request_body: []byte) -> (string, bool) {
return "", false return "", false
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, ok := data.(json.Object) root, ok := data.(json.Object)
if !ok { if !ok {
return "", false return "", false
} }
table_name_val, found := root["TableName"] table_name_val, found := root["TableName"]
if !found { if !found {
return "", false return "", false
} }
table_name_str, str_ok := table_name_val.(json.String) table_name_str, str_ok := table_name_val.(json.String)
if !str_ok { if !str_ok {
return "", false return "", false
} }
return string(table_name_str), true return string(table_name_str), true
} }
@@ -416,17 +415,17 @@ parse_item_from_request :: proc(request_body: []byte) -> (Item, bool) {
return {}, false return {}, false
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, ok := data.(json.Object) root, ok := data.(json.Object)
if !ok { if !ok {
return {}, false return {}, false
} }
item_val, found := root["Item"] item_val, found := root["Item"]
if !found { if !found {
return {}, false return {}, false
} }
return parse_item_from_value(item_val) return parse_item_from_value(item_val)
} }
@@ -438,17 +437,17 @@ parse_key_from_request :: proc(request_body: []byte) -> (Item, bool) {
return {}, false return {}, false
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, ok := data.(json.Object) root, ok := data.(json.Object)
if !ok { if !ok {
return {}, false return {}, false
} }
key_val, found := root["Key"] key_val, found := root["Key"]
if !found { if !found {
return {}, false return {}, false
} }
return parse_item_from_value(key_val) return parse_item_from_value(key_val)
} }
@@ -464,17 +463,17 @@ parse_limit :: proc(request_body: []byte) -> int {
return 0 return 0
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, ok := data.(json.Object) root, ok := data.(json.Object)
if !ok { if !ok {
return 0 return 0
} }
limit_val, found := root["Limit"] limit_val, found := root["Limit"]
if !found { if !found {
return 0 return 0
} }
// JSON numbers can be either Integer or Float // JSON numbers can be either Integer or Float
#partial switch v in limit_val { #partial switch v in limit_val {
case json.Integer: case json.Integer:
@@ -482,7 +481,7 @@ parse_limit :: proc(request_body: []byte) -> int {
case json.Float: case json.Float:
return int(v) return int(v)
} }
return 0 return 0
} }
@@ -494,24 +493,24 @@ parse_exclusive_start_key :: proc(request_body: []byte) -> Maybe([]byte) {
return nil return nil
} }
defer json.destroy_value(data) defer json.destroy_value(data)
root, ok := data.(json.Object) root, ok := data.(json.Object)
if !ok { if !ok {
return nil return nil
} }
key_val, found := root["ExclusiveStartKey"] key_val, found := root["ExclusiveStartKey"]
if !found { if !found {
return nil return nil
} }
// Parse as Item first // Parse as Item first
key_item, item_ok := parse_item_from_value(key_val) key_item, item_ok := parse_item_from_value(key_val)
if !item_ok { if !item_ok {
return nil return nil
} }
defer item_destroy(&key_item) defer item_destroy(&key_item)
// Convert to binary key bytes (this will be done by the storage layer) // Convert to binary key bytes (this will be done by the storage layer)
// For now, just return nil - the storage layer will handle the conversion // For now, just return nil - the storage layer will handle the conversion
return nil return nil
@@ -521,6 +520,6 @@ parse_exclusive_start_key :: proc(request_body: []byte) -> Maybe([]byte) {
serialize_last_evaluated_key :: proc(key: Key) -> string { serialize_last_evaluated_key :: proc(key: Key) -> string {
item := key_to_item(key, {}) // Empty key_schema since we don't need validation here item := key_to_item(key, {}) // Empty key_schema since we don't need validation here
defer item_destroy(&item) defer item_destroy(&item)
return serialize_item(item) return serialize_item(item)
} }

View File

@@ -1,8 +1,6 @@
package key_codec package dynamodb
import "core:bytes" import "core:bytes"
import "core:encoding/varint"
import "core:mem"
// Entity type prefix bytes for namespacing // Entity type prefix bytes for namespacing
Entity_Type :: enum u8 { Entity_Type :: enum u8 {
@@ -12,40 +10,64 @@ Entity_Type :: enum u8 {
LSI = 0x04, // Local secondary index LSI = 0x04, // Local secondary index
} }
// Encode a varint length prefix
encode_varint :: proc(buf: ^bytes.Buffer, value: int) { // Decode a varint length prefix from a byte slice.
temp: [10]byte // Reads starting at data[offset^] and advances offset^ past the varint on success.
n := varint.encode_u64(temp[:], u64(value)) decode_varint :: proc(data: []byte, offset: ^int) -> (value: int, ok: bool) {
bytes.buffer_write(buf, temp[:n]) i := offset^
if i < 0 || i >= len(data) {
return 0, false
}
u: u64 = 0
shift: u32 = 0
for {
if i >= len(data) {
return 0, false // truncated
}
b := data[i]
i += 1
u |= u64(b & 0x7F) << shift
if (b & 0x80) == 0 {
break
}
shift += 7
if shift >= 64 {
return 0, false // malformed / overflow
}
}
// ensure it fits in int on this platform
max_int := int((~uint(0)) >> 1)
if u > u64(max_int) {
return 0, false
}
offset^ = i
return int(u), true
} }
// Decode a varint length prefix
decode_varint :: proc(data: []byte, offset: ^int) -> (value: int, ok: bool) {
if offset^ >= len(data) {
return 0, false
}
val, n := varint.decode_u64(data[offset^:])
if n <= 0 {
return 0, false
}
offset^ += n
return int(val), true
}
// Build metadata key: [meta][table_name] // Build metadata key: [meta][table_name]
build_meta_key :: proc(table_name: string) -> []byte { build_meta_key :: proc(table_name: string) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 256, context.allocator) bytes.buffer_init_allocator(&buf, 0, 256, context.allocator)
// Write entity type // Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.Meta)) bytes.buffer_write_byte(&buf, u8(Entity_Type.Meta))
// Write table name with length prefix // Write table name with length prefix
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
@@ -53,24 +75,24 @@ build_meta_key :: proc(table_name: string) -> []byte {
build_data_key :: proc(table_name: string, pk_value: []byte, sk_value: Maybe([]byte)) -> []byte { build_data_key :: proc(table_name: string, pk_value: []byte, sk_value: Maybe([]byte)) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) bytes.buffer_init_allocator(&buf, 0, 512, context.allocator)
// Write entity type // Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.Data)) bytes.buffer_write_byte(&buf, u8(Entity_Type.Data))
// Write table name // Write table name
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
// Write partition key // Write partition key
encode_varint(&buf, len(pk_value)) encode_varint(&buf, len(pk_value))
bytes.buffer_write(&buf, pk_value) bytes.buffer_write(&buf, pk_value)
// Write sort key if present // Write sort key if present
if sk, ok := sk_value.?; ok { if sk, ok := sk_value.?; ok {
encode_varint(&buf, len(sk)) encode_varint(&buf, len(sk))
bytes.buffer_write(&buf, sk) bytes.buffer_write(&buf, sk)
} }
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
@@ -78,14 +100,14 @@ build_data_key :: proc(table_name: string, pk_value: []byte, sk_value: Maybe([]b
build_table_prefix :: proc(table_name: string) -> []byte { build_table_prefix :: proc(table_name: string) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 256, context.allocator) bytes.buffer_init_allocator(&buf, 0, 256, context.allocator)
// Write entity type // Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.Data)) bytes.buffer_write_byte(&buf, u8(Entity_Type.Data))
// Write table name // Write table name
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
@@ -93,18 +115,18 @@ build_table_prefix :: proc(table_name: string) -> []byte {
build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte { build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) bytes.buffer_init_allocator(&buf, 0, 512, context.allocator)
// Write entity type // Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.Data)) bytes.buffer_write_byte(&buf, u8(Entity_Type.Data))
// Write table name // Write table name
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
// Write partition key // Write partition key
encode_varint(&buf, len(pk_value)) encode_varint(&buf, len(pk_value))
bytes.buffer_write(&buf, pk_value) bytes.buffer_write(&buf, pk_value)
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
@@ -112,28 +134,28 @@ build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte {
build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gsi_sk: Maybe([]byte)) -> []byte { build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gsi_sk: Maybe([]byte)) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) bytes.buffer_init_allocator(&buf, 0, 512, context.allocator)
// Write entity type // Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI))
// Write table name // Write table name
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
// Write index name // Write index name
encode_varint(&buf, len(index_name)) encode_varint(&buf, len(index_name))
bytes.buffer_write_string(&buf, index_name) bytes.buffer_write_string(&buf, index_name)
// Write GSI partition key // Write GSI partition key
encode_varint(&buf, len(gsi_pk)) encode_varint(&buf, len(gsi_pk))
bytes.buffer_write(&buf, gsi_pk) bytes.buffer_write(&buf, gsi_pk)
// Write GSI sort key if present // Write GSI sort key if present
if sk, ok := gsi_sk.?; ok { if sk, ok := gsi_sk.?; ok {
encode_varint(&buf, len(sk)) encode_varint(&buf, len(sk))
bytes.buffer_write(&buf, sk) bytes.buffer_write(&buf, sk)
} }
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
@@ -141,26 +163,26 @@ build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gs
build_lsi_key :: proc(table_name: string, index_name: string, pk: []byte, lsi_sk: []byte) -> []byte { build_lsi_key :: proc(table_name: string, index_name: string, pk: []byte, lsi_sk: []byte) -> []byte {
buf: bytes.Buffer buf: bytes.Buffer
bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) bytes.buffer_init_allocator(&buf, 0, 512, context.allocator)
// Write entity type // Write entity type
bytes.buffer_write_byte(&buf, u8(Entity_Type.LSI)) bytes.buffer_write_byte(&buf, u8(Entity_Type.LSI))
// Write table name // Write table name
encode_varint(&buf, len(table_name)) encode_varint(&buf, len(table_name))
bytes.buffer_write_string(&buf, table_name) bytes.buffer_write_string(&buf, table_name)
// Write index name // Write index name
encode_varint(&buf, len(index_name)) encode_varint(&buf, len(index_name))
bytes.buffer_write_string(&buf, index_name) bytes.buffer_write_string(&buf, index_name)
// Write partition key // Write partition key
encode_varint(&buf, len(pk)) encode_varint(&buf, len(pk))
bytes.buffer_write(&buf, pk) bytes.buffer_write(&buf, pk)
// Write LSI sort key // Write LSI sort key
encode_varint(&buf, len(lsi_sk)) encode_varint(&buf, len(lsi_sk))
bytes.buffer_write(&buf, lsi_sk) bytes.buffer_write(&buf, lsi_sk)
return bytes.buffer_to_bytes(&buf) return bytes.buffer_to_bytes(&buf)
} }
@@ -170,15 +192,11 @@ Key_Decoder :: struct {
pos: int, pos: int,
} }
decoder_init :: proc(data: []byte) -> Key_Decoder {
return Key_Decoder{data = data, pos = 0}
}
decoder_read_entity_type :: proc(decoder: ^Key_Decoder) -> (Entity_Type, bool) { decoder_read_entity_type :: proc(decoder: ^Key_Decoder) -> (Entity_Type, bool) {
if decoder.pos >= len(decoder.data) { if decoder.pos >= len(decoder.data) {
return .Meta, false return .Meta, false
} }
entity_type := Entity_Type(decoder.data[decoder.pos]) entity_type := Entity_Type(decoder.data[decoder.pos])
decoder.pos += 1 decoder.pos += 1
return entity_type, true return entity_type, true
@@ -187,32 +205,32 @@ decoder_read_entity_type :: proc(decoder: ^Key_Decoder) -> (Entity_Type, bool) {
decoder_read_segment :: proc(decoder: ^Key_Decoder) -> (segment: []byte, ok: bool) { decoder_read_segment :: proc(decoder: ^Key_Decoder) -> (segment: []byte, ok: bool) {
// Read length // Read length
length := decode_varint(decoder.data, &decoder.pos) or_return length := decode_varint(decoder.data, &decoder.pos) or_return
// Read data // Read data
if decoder.pos + length > len(decoder.data) { if decoder.pos + length > len(decoder.data) {
return nil, false return nil, false
} }
// Return slice (owned by caller via context.allocator) // Return slice (owned by caller via context.allocator)
segment = make([]byte, length, context.allocator) segment = make([]byte, length, context.allocator)
copy(segment, decoder.data[decoder.pos:decoder.pos + length]) copy(segment, decoder.data[decoder.pos:decoder.pos + length])
decoder.pos += length decoder.pos += length
return segment, true return segment, true
} }
decoder_read_segment_borrowed :: proc(decoder: ^Key_Decoder) -> (segment: []byte, ok: bool) { decoder_read_segment_borrowed :: proc(decoder: ^Key_Decoder) -> (segment: []byte, ok: bool) {
// Read length // Read length
length := decode_varint(decoder.data, &decoder.pos) or_return length := decode_varint(decoder.data, &decoder.pos) or_return
// Return borrowed slice // Return borrowed slice
if decoder.pos + length > len(decoder.data) { if decoder.pos + length > len(decoder.data) {
return nil, false return nil, false
} }
segment = decoder.data[decoder.pos:decoder.pos + length] segment = decoder.data[decoder.pos:decoder.pos + length]
decoder.pos += length decoder.pos += length
return segment, true return segment, true
} }
@@ -228,26 +246,26 @@ Decoded_Data_Key :: struct {
} }
decode_data_key :: proc(key: []byte) -> (result: Decoded_Data_Key, ok: bool) { decode_data_key :: proc(key: []byte) -> (result: Decoded_Data_Key, ok: bool) {
decoder := decoder_init(key) decoder := Key_Decoder{data = key, pos = 0}
// Read and verify entity type // Read and verify entity type
entity_type := decoder_read_entity_type(&decoder) or_return entity_type := decoder_read_entity_type(&decoder) or_return
if entity_type != .Data { if entity_type != .Data {
return {}, false return {}, false
} }
// Read table name // Read table name
table_name_bytes := decoder_read_segment(&decoder) or_return table_name_bytes := decoder_read_segment(&decoder) or_return
result.table_name = string(table_name_bytes) result.table_name = string(table_name_bytes)
// Read partition key // Read partition key
result.pk_value = decoder_read_segment(&decoder) or_return result.pk_value = decoder_read_segment(&decoder) or_return
// Read sort key if present // Read sort key if present
if decoder_has_more(&decoder) { if decoder_has_more(&decoder) {
sk := decoder_read_segment(&decoder) or_return sk := decoder_read_segment(&decoder) or_return
result.sk_value = sk result.sk_value = sk
} }
return result, true return result, true
} }

View File

@@ -1,15 +1,12 @@
// Storage engine mapping DynamoDB operations to RocksDB // Storage engine mapping DynamoDB operations to RocksDB
package dynamodb package dynamodb
import "core:encoding/json"
import "core:fmt" import "core:fmt"
import "core:mem" import "core:mem"
import "core:slice" import "core:slice"
import "core:strings" import "core:strings"
import "core:sync" import "core:sync"
import "core:time" import "core:time"
import "../key_codec"
import "../item_codec"
import "../rocksdb" import "../rocksdb"
Storage_Error :: enum { Storage_Error :: enum {
@@ -218,12 +215,12 @@ serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) {
meta_item["CreationDateTime"] = Number(fmt.aprint(metadata.creation_date_time)) meta_item["CreationDateTime"] = Number(fmt.aprint(metadata.creation_date_time))
// Encode to binary // Encode to binary
return item_codec.encode(meta_item) return encode(meta_item)
} }
// Deserialize table metadata from binary format // Deserialize table metadata from binary format
deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (Table_Metadata, bool) { deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (Table_Metadata, bool) {
meta_item, ok := item_codec.decode(data) meta_item, ok := decode(data)
if !ok { if !ok {
return {}, false return {}, false
} }
@@ -239,7 +236,7 @@ deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (T
// Get table metadata // Get table metadata
get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Table_Metadata, Storage_Error) { get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Table_Metadata, Storage_Error) {
meta_key := key_codec.build_meta_key(table_name) meta_key := build_meta_key(table_name)
defer delete(meta_key) defer delete(meta_key)
value, get_err := rocksdb.db_get(&engine.db, meta_key) value, get_err := rocksdb.db_get(&engine.db, meta_key)
@@ -275,7 +272,7 @@ create_table :: proc(
defer sync.rw_mutex_unlock(table_lock) defer sync.rw_mutex_unlock(table_lock)
// Check if table already exists // Check if table already exists
meta_key := key_codec.build_meta_key(table_name) meta_key := build_meta_key(table_name)
defer delete(meta_key) defer delete(meta_key)
existing, get_err := rocksdb.db_get(&engine.db, meta_key) existing, get_err := rocksdb.db_get(&engine.db, meta_key)
@@ -340,7 +337,7 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err
defer sync.rw_mutex_unlock(table_lock) defer sync.rw_mutex_unlock(table_lock)
// Check table exists // Check table exists
meta_key := key_codec.build_meta_key(table_name) meta_key := build_meta_key(table_name)
defer delete(meta_key) defer delete(meta_key)
existing, get_err := rocksdb.db_get(&engine.db, meta_key) existing, get_err := rocksdb.db_get(&engine.db, meta_key)
@@ -403,11 +400,11 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
} }
// Build storage key // Build storage key
storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk) storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
defer delete(storage_key) defer delete(storage_key)
// Encode item // Encode item
encoded_item, encode_ok := item_codec.encode(item) encoded_item, encode_ok := encode(item)
if !encode_ok { if !encode_ok {
return .Serialization_Error return .Serialization_Error
} }
@@ -456,7 +453,7 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May
} }
// Build storage key // Build storage key
storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk) storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
defer delete(storage_key) defer delete(storage_key)
// Get from RocksDB // Get from RocksDB
@@ -470,7 +467,7 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May
defer delete(value) defer delete(value)
// Decode item // Decode item
item, decode_ok := item_codec.decode(value) item, decode_ok := decode(value)
if !decode_ok { if !decode_ok {
return nil, .Serialization_Error return nil, .Serialization_Error
} }
@@ -512,7 +509,7 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
} }
// Build storage key // Build storage key
storage_key := key_codec.build_data_key(table_name, key_values.pk, key_values.sk) storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
defer delete(storage_key) defer delete(storage_key)
// Delete from RocksDB // Delete from RocksDB

View File

@@ -51,13 +51,13 @@ key_destroy :: proc(key: ^Key) {
key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, bool) { key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, bool) {
pk_value: Attribute_Value pk_value: Attribute_Value
sk_value: Maybe(Attribute_Value) sk_value: Maybe(Attribute_Value)
for schema_elem in key_schema { for schema_elem in key_schema {
attr, ok := item[schema_elem.attribute_name] attr, ok := item[schema_elem.attribute_name]
if !ok { if !ok {
return {}, false return {}, false
} }
// Validate that key is a scalar type (S, N, or B) // Validate that key is a scalar type (S, N, or B)
#partial switch _ in attr { #partial switch _ in attr {
case String, Number, Binary: case String, Number, Binary:
@@ -65,10 +65,10 @@ key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, boo
case: case:
return {}, false return {}, false
} }
// Deep copy the attribute value // Deep copy the attribute value
copied := attr_value_deep_copy(attr) copied := attr_value_deep_copy(attr)
switch schema_elem.key_type { switch schema_elem.key_type {
case .HASH: case .HASH:
pk_value = copied pk_value = copied
@@ -76,17 +76,17 @@ key_from_item :: proc(item: Item, key_schema: []Key_Schema_Element) -> (Key, boo
sk_value = copied sk_value = copied
} }
} }
return Key{pk = pk_value, sk = sk_value}, true return Key{pk = pk_value, sk = sk_value}, true
} }
// Convert key to item // Convert key to item
key_to_item :: proc(key: Key, key_schema: []Key_Schema_Element) -> Item { key_to_item :: proc(key: Key, key_schema: []Key_Schema_Element) -> Item {
item := make(Item) item := make(Item)
for schema_elem in key_schema { for schema_elem in key_schema {
attr_value: Attribute_Value attr_value: Attribute_Value
switch schema_elem.key_type { switch schema_elem.key_type {
case .HASH: case .HASH:
attr_value = key.pk attr_value = key.pk
@@ -97,10 +97,10 @@ key_to_item :: proc(key: Key, key_schema: []Key_Schema_Element) -> Item {
continue continue
} }
} }
item[schema_elem.attribute_name] = attr_value_deep_copy(attr_value) item[schema_elem.attribute_name] = attr_value_deep_copy(attr_value)
} }
return item return item
} }
@@ -112,8 +112,8 @@ Key_Values :: struct {
key_get_values :: proc(key: ^Key) -> (Key_Values, bool) { key_get_values :: proc(key: ^Key) -> (Key_Values, bool) {
pk_bytes: []byte pk_bytes: []byte
switch v in key.pk { #partial switch v in key.pk {
case String: case String:
pk_bytes = transmute([]byte)string(v) pk_bytes = transmute([]byte)string(v)
case Number: case Number:
@@ -121,12 +121,13 @@ key_get_values :: proc(key: ^Key) -> (Key_Values, bool) {
case Binary: case Binary:
pk_bytes = transmute([]byte)string(v) pk_bytes = transmute([]byte)string(v)
case: case:
// Keys should only be scalar types (S, N, or B)
return {}, false return {}, false
} }
sk_bytes: Maybe([]byte) sk_bytes: Maybe([]byte)
if sk, ok := key.sk.?; ok { if sk, ok := key.sk.?; ok {
switch v in sk { #partial switch v in sk {
case String: case String:
sk_bytes = transmute([]byte)string(v) sk_bytes = transmute([]byte)string(v)
case Number: case Number:
@@ -134,10 +135,11 @@ key_get_values :: proc(key: ^Key) -> (Key_Values, bool) {
case Binary: case Binary:
sk_bytes = transmute([]byte)string(v) sk_bytes = transmute([]byte)string(v)
case: case:
// Keys should only be scalar types
return {}, false return {}, false
} }
} }
return Key_Values{pk = pk_bytes, sk = sk_bytes}, true return Key_Values{pk = pk_bytes, sk = sk_bytes}, true
} }
@@ -289,9 +291,9 @@ operation_from_target :: proc(target: string) -> Operation {
if !strings.has_prefix(target, prefix) { if !strings.has_prefix(target, prefix) {
return .Unknown return .Unknown
} }
op_name := target[len(prefix):] op_name := target[len(prefix):]
switch op_name { switch op_name {
case "CreateTable": return .CreateTable case "CreateTable": return .CreateTable
case "DeleteTable": return .DeleteTable case "DeleteTable": return .DeleteTable
@@ -309,7 +311,7 @@ operation_from_target :: proc(target: string) -> Operation {
case "TransactGetItems": return .TransactGetItems case "TransactGetItems": return .TransactGetItems
case "TransactWriteItems": return .TransactWriteItems case "TransactWriteItems": return .TransactWriteItems
} }
return .Unknown return .Unknown
} }
@@ -327,7 +329,7 @@ DynamoDB_Error_Type :: enum {
error_to_response :: proc(err_type: DynamoDB_Error_Type, message: string) -> string { error_to_response :: proc(err_type: DynamoDB_Error_Type, message: string) -> string {
type_str: string type_str: string
switch err_type { switch err_type {
case .ValidationException: case .ValidationException:
type_str = "com.amazonaws.dynamodb.v20120810#ValidationException" type_str = "com.amazonaws.dynamodb.v20120810#ValidationException"
@@ -346,7 +348,7 @@ error_to_response :: proc(err_type: DynamoDB_Error_Type, message: string) -> str
case .SerializationException: case .SerializationException:
type_str = "com.amazonaws.dynamodb.v20120810#SerializationException" type_str = "com.amazonaws.dynamodb.v20120810#SerializationException"
} }
return fmt.aprintf(`{{"__type":"%s","message":"%s"}}`, type_str, message) return fmt.aprintf(`{{"__type":"%s","message":"%s"}}`, type_str, message)
} }
@@ -410,30 +412,35 @@ attr_value_destroy :: proc(attr: ^Attribute_Value) {
for s in v { for s in v {
delete(s) delete(s)
} }
delete([]string(v)) slice := v
delete(slice)
case Number_Set: case Number_Set:
for n in v { for n in v {
delete(n) delete(n)
} }
delete([]string(v)) slice := v
delete(slice)
case Binary_Set: case Binary_Set:
for b in v { for b in v {
delete(b) delete(b)
} }
delete([]string(v)) slice := v
delete(slice)
case List: case List:
for item in v { for item in v {
item_copy := item item_copy := item
attr_value_destroy(&item_copy) attr_value_destroy(&item_copy)
} }
delete([]Attribute_Value(v)) list := v
delete(list)
case Map: case Map:
for key, val in v { for key, val in v {
delete(key) delete(key)
val_copy := val val_copy := val
attr_value_destroy(&val_copy) attr_value_destroy(&val_copy)
} }
delete(map[string]Attribute_Value(v)) m := v
delete(m)
case Bool, Null: case Bool, Null:
// Nothing to free // Nothing to free
} }

636
main.odin
View File

@@ -1,11 +1,12 @@
package main package main
import "core:encoding/json"
import "core:fmt" import "core:fmt"
import "core:mem" import "core:mem"
import "core:os" import "core:os"
import "core:strconv" import "core:strconv"
//import "core:strings" // I know we'll use in future but because we're not right now, compiler is complaining import "core:strings"
import "rocksdb" import "dynamodb"
Config :: struct { Config :: struct {
host: string, host: string,
@@ -25,12 +26,12 @@ main :: proc() {
os.make_directory(config.data_dir) os.make_directory(config.data_dir)
// Initialize storage engine // Initialize storage engine
db, err := rocksdb.db_open(config.data_dir, true) engine, err := dynamodb.storage_engine_init(context.allocator, config.data_dir)
if err != .None { if err != .None {
fmt.eprintln("Failed to initialize storage:", err) fmt.eprintln("Failed to initialize storage:", err)
os.exit(1) os.exit(1)
} }
defer rocksdb.db_close(&db) defer dynamodb.storage_engine_destroy(engine)
fmt.printfln("Storage engine initialized at %s", config.data_dir) fmt.printfln("Storage engine initialized at %s", config.data_dir)
fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port) fmt.printfln("Starting DynamoDB-compatible server on %s:%d", config.host, config.port)
@@ -38,13 +39,12 @@ main :: proc() {
// Create HTTP server // Create HTTP server
server_config := default_server_config() server_config := default_server_config()
// For now, use a simple echo handler until we implement the full DynamoDB handler
server, server_ok := server_init( server, server_ok := server_init(
context.allocator, context.allocator,
config.host, config.host,
config.port, config.port,
handle_http_request, handle_dynamodb_request,
&db, engine,
server_config, server_config,
) )
@@ -63,30 +63,628 @@ main :: proc() {
} }
} }
// Temporary HTTP request handler // DynamoDB request handler - called for each HTTP request with request-scoped arena allocator
// TODO: Replace with full DynamoDB handler once dynamodb/handler.odin is implemented handle_dynamodb_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response {
handle_http_request :: proc(ctx: rawptr, request: ^HTTP_Request, request_alloc: mem.Allocator) -> HTTP_Response { engine := cast(^dynamodb.Storage_Engine)ctx
//db := cast(^rocksdb.DB)ctx // I know we'll use in future but because we're not right now, compiler is complaining
// All allocations in this function use the request arena automatically
response := response_init(request_alloc) response := response_init(request_alloc)
response_add_header(&response, "Content-Type", "application/x-amz-json-1.0") response_add_header(&response, "Content-Type", "application/x-amz-json-1.0")
response_add_header(&response, "x-amzn-RequestId", "local-request-id") response_add_header(&response, "x-amzn-RequestId", "local-request-id")
// Get X-Amz-Target header // Get X-Amz-Target header to determine operation
target := request_get_header(request, "X-Amz-Target") target := request_get_header(request, "X-Amz-Target")
if target == nil {
return make_error_response(&response, .ValidationException, "Missing X-Amz-Target header")
}
if t, ok := target.?; ok { operation := dynamodb.operation_from_target(target.?)
// Echo back the operation for now
body := fmt.aprintf("{{\"operation\":\"%s\",\"status\":\"not_implemented\"}}", t) // Route to appropriate handler
response_set_body(&response, transmute([]byte)body) #partial switch operation {
} else { case .CreateTable:
response_set_status(&response, .Bad_Request) handle_create_table(engine, request, &response)
response_set_body(&response, transmute([]byte)string("{\"error\":\"Missing X-Amz-Target header\"}")) case .DeleteTable:
handle_delete_table(engine, request, &response)
case .DescribeTable:
handle_describe_table(engine, request, &response)
case .ListTables:
handle_list_tables(engine, request, &response)
case .PutItem:
handle_put_item(engine, request, &response)
case .GetItem:
handle_get_item(engine, request, &response)
case .DeleteItem:
handle_delete_item(engine, request, &response)
case .Query:
handle_query(engine, request, &response)
case .Scan:
handle_scan(engine, request, &response)
case .Unknown:
return make_error_response(&response, .ValidationException, "Unknown operation")
case:
return make_error_response(&response, .ValidationException, "Operation not implemented")
} }
return response return response
} }
// ============================================================================
// Table Operations
// ============================================================================
// CreateTable handler: parses TableName, KeySchema and AttributeDefinitions
// from the JSON request body, validates them, creates the table in the
// engine, and writes a minimal TableDescription JSON response.
// All failures are reported through make_error_response (HTTP 400 body).
handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    // Parse JSON body
    data, parse_err := json.parse(request.body, allocator = context.allocator)
    if parse_err != nil {
        make_error_response(response, .ValidationException, "Invalid JSON")
        return
    }
    defer json.destroy_value(data)
    root, ok := data.(json.Object)
    if !ok {
        make_error_response(response, .ValidationException, "Request must be an object")
        return
    }
    // Extract TableName (required, must be a JSON string)
    table_name_val, found := root["TableName"]
    if !found {
        make_error_response(response, .ValidationException, "Missing TableName")
        return
    }
    table_name, name_ok := table_name_val.(json.String)
    if !name_ok {
        make_error_response(response, .ValidationException, "TableName must be a string")
        return
    }
    // Parse KeySchema (allocates cloned attribute names on success)
    key_schema, ks_err := parse_key_schema(root)
    if ks_err != .None {
        msg := key_schema_error_message(ks_err)
        make_error_response(response, .ValidationException, msg)
        return
    }
    // Parse AttributeDefinitions.
    // BUG FIX: on failure the already-allocated key_schema leaked in the
    // original; free it before returning.
    attr_defs, ad_err := parse_attribute_definitions(root)
    if ad_err != .None {
        for ks in key_schema do delete(ks.attribute_name)
        delete(key_schema)
        msg := attribute_definitions_error_message(ad_err)
        make_error_response(response, .ValidationException, msg)
        return
    }
    // Validate that key attributes are defined.
    // BUG FIX: both slices leaked on this error path in the original.
    if !validate_key_attributes_defined(key_schema, attr_defs) {
        for ks in key_schema do delete(ks.attribute_name)
        delete(key_schema)
        for ad in attr_defs do delete(ad.attribute_name)
        delete(attr_defs)
        make_error_response(response, .ValidationException, "Key attribute not defined in AttributeDefinitions")
        return
    }
    // Create the table.
    // NOTE(review): assumes dynamodb.create_table takes ownership of
    // key_schema/attr_defs (success or failure) — confirm against storage code.
    desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs)
    if create_err != .None {
        #partial switch create_err {
        case .Table_Already_Exists:
            make_error_response(response, .ResourceInUseException, "Table already exists")
        case:
            make_error_response(response, .InternalServerError, "Failed to create table")
        }
        return
    }
    // Build response (braces doubled: fmt treats `{`/`}` as placeholder syntax)
    resp_body := fmt.aprintf(
        `{{"TableDescription":{{"TableName":"%s","TableStatus":"%s","CreationDateTime":%d}}}}`,
        desc.table_name,
        dynamodb.table_status_to_string(desc.table_status),
        desc.creation_date_time,
    )
    response_set_body(response, transmute([]byte)resp_body)
}
// DeleteTable handler: removes the named table and reports DELETING status.
// Errors are mapped to ResourceNotFoundException / InternalServerError.
handle_delete_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    name, parsed := dynamodb.parse_table_name(request.body)
    if !parsed {
        make_error_response(response, .ValidationException, "Invalid request or missing TableName")
        return
    }
    if del_err := dynamodb.delete_table(engine, name); del_err != .None {
        if del_err == .Table_Not_Found {
            make_error_response(response, .ResourceNotFoundException, "Table not found")
        } else {
            make_error_response(response, .InternalServerError, "Failed to delete table")
        }
        return
    }
    body := fmt.aprintf(`{{"TableDescription":{{"TableName":"%s","TableStatus":"DELETING"}}}}`, name)
    response_set_body(response, transmute([]byte)body)
}
// DescribeTable handler: returns table name, status, creation time,
// key schema and attribute definitions as a JSON document.
handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    table_name, ok := dynamodb.parse_table_name(request.body)
    if !ok {
        make_error_response(response, .ValidationException, "Invalid request or missing TableName")
        return
    }
    metadata, err := dynamodb.get_table_metadata(engine, table_name)
    if err != .None {
        #partial switch err {
        case .Table_Not_Found:
            make_error_response(response, .ResourceNotFoundException, "Table not found")
        case:
            make_error_response(response, .InternalServerError, "Failed to describe table")
        }
        return
    }
    defer dynamodb.table_metadata_destroy(&metadata, context.allocator)
    // Build response with key schema
    builder := strings.builder_make()
    strings.write_string(&builder, `{"Table":{"TableName":"`)
    strings.write_string(&builder, metadata.table_name)
    strings.write_string(&builder, `","TableStatus":"`)
    strings.write_string(&builder, dynamodb.table_status_to_string(metadata.table_status))
    strings.write_string(&builder, `","CreationDateTime":`)
    fmt.sbprintf(&builder, "%d", metadata.creation_date_time)
    strings.write_string(&builder, `,"KeySchema":[`)
    for ks, i in metadata.key_schema {
        if i > 0 do strings.write_string(&builder, ",")
        // BUG FIX: braces in fmt format strings must be doubled (`{{`/`}}`),
        // as done elsewhere in this file; unescaped `{` is placeholder syntax
        // and corrupts the emitted JSON.
        fmt.sbprintf(&builder, `{{"AttributeName":"%s","KeyType":"%s"}}`,
            ks.attribute_name, dynamodb.key_type_to_string(ks.key_type))
    }
    strings.write_string(&builder, `],"AttributeDefinitions":[`)
    for ad, i in metadata.attribute_definitions {
        if i > 0 do strings.write_string(&builder, ",")
        // Same brace escaping fix as above.
        fmt.sbprintf(&builder, `{{"AttributeName":"%s","AttributeType":"%s"}}`,
            ad.attribute_name, dynamodb.scalar_type_to_string(ad.attribute_type))
    }
    strings.write_string(&builder, `]}}`)
    resp_body := strings.to_string(builder)
    response_set_body(response, transmute([]byte)resp_body)
}
// ListTables handler: emits every table name as {"TableNames":[...]}.
// The request body is ignored (no pagination support).
handle_list_tables :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    _ = request // Not using request body for ListTables
    names := dynamodb.list_tables(engine)
    // list_tables returns []string which may be empty, not an error
    sb := strings.builder_make()
    strings.write_string(&sb, `{"TableNames":[`)
    for name, idx in names {
        if idx > 0 {
            strings.write_string(&sb, ",")
        }
        strings.write_string(&sb, `"`)
        strings.write_string(&sb, name)
        strings.write_string(&sb, `"`)
    }
    strings.write_string(&sb, `]}`)
    response_set_body(response, transmute([]byte)strings.to_string(sb))
}
// ============================================================================
// Item Operations
// ============================================================================
// PutItem handler: parses TableName and Item from the body, stores the item,
// and replies with an empty JSON object on success.
handle_put_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    name, name_parsed := dynamodb.parse_table_name(request.body)
    if !name_parsed {
        make_error_response(response, .ValidationException, "Invalid request or missing TableName")
        return
    }
    new_item, item_parsed := dynamodb.parse_item_from_request(request.body)
    if !item_parsed {
        make_error_response(response, .ValidationException, "Invalid or missing Item")
        return
    }
    defer dynamodb.item_destroy(&new_item)
    put_err := dynamodb.put_item(engine, name, new_item)
    if put_err == .None {
        response_set_body(response, transmute([]byte)string("{}"))
        return
    }
    // Map storage errors onto DynamoDB error types.
    #partial switch put_err {
    case .Table_Not_Found:
        make_error_response(response, .ResourceNotFoundException, "Table not found")
    case .Missing_Key_Attribute:
        make_error_response(response, .ValidationException, "Item missing required key attribute")
    case .Invalid_Key:
        make_error_response(response, .ValidationException, "Invalid key format")
    case:
        make_error_response(response, .InternalServerError, "Failed to put item")
    }
}
// GetItem handler: looks up an item by primary key. On a hit the item is
// serialized into {"Item":...}; a miss yields an empty JSON object
// (matching DynamoDB's behavior of omitting "Item" when not found).
handle_get_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    table_name, ok := dynamodb.parse_table_name(request.body)
    if !ok {
        make_error_response(response, .ValidationException, "Invalid request or missing TableName")
        return
    }
    key, key_ok := dynamodb.parse_key_from_request(request.body)
    if !key_ok {
        make_error_response(response, .ValidationException, "Invalid or missing Key")
        return
    }
    defer dynamodb.item_destroy(&key)
    item, err := dynamodb.get_item(engine, table_name, key)
    if err != .None {
        #partial switch err {
        case .Table_Not_Found:
            make_error_response(response, .ResourceNotFoundException, "Table not found")
        case .Missing_Key_Attribute:
            make_error_response(response, .ValidationException, "Key missing required attributes")
        case .Invalid_Key:
            make_error_response(response, .ValidationException, "Invalid key format")
        case:
            make_error_response(response, .InternalServerError, "Failed to get item")
        }
        return
    }
    if item_val, has_item := item.?; has_item {
        defer dynamodb.item_destroy(&item_val)
        item_json := dynamodb.serialize_item(item_val)
        // BUG FIX: braces must be doubled in fmt format strings (as done
        // elsewhere in this file); the original `{"Item":%s}` is parsed as
        // placeholder syntax and corrupts the response body.
        resp := fmt.aprintf(`{{"Item":%s}}`, item_json)
        response_set_body(response, transmute([]byte)resp)
    } else {
        response_set_body(response, transmute([]byte)string("{}"))
    }
}
// DeleteItem handler: deletes an item by primary key; replies with an
// empty JSON object on success (deleting a missing item is not an error).
handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    name, name_parsed := dynamodb.parse_table_name(request.body)
    if !name_parsed {
        make_error_response(response, .ValidationException, "Invalid request or missing TableName")
        return
    }
    item_key, key_parsed := dynamodb.parse_key_from_request(request.body)
    if !key_parsed {
        make_error_response(response, .ValidationException, "Invalid or missing Key")
        return
    }
    defer dynamodb.item_destroy(&item_key)
    del_err := dynamodb.delete_item(engine, name, item_key)
    if del_err == .None {
        response_set_body(response, transmute([]byte)string("{}"))
        return
    }
    // Map storage errors onto DynamoDB error types.
    #partial switch del_err {
    case .Table_Not_Found:
        make_error_response(response, .ResourceNotFoundException, "Table not found")
    case .Missing_Key_Attribute:
        make_error_response(response, .ValidationException, "Key missing required attributes")
    case .Invalid_Key:
        make_error_response(response, .ValidationException, "Invalid key format")
    case:
        make_error_response(response, .InternalServerError, "Failed to delete item")
    }
}
// ============================================================================
// Query and Scan Operations
// ============================================================================
// Query handler: not yet implemented; always responds with a validation error.
handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    _, _ = engine, request
    // TODO: Implement KeyConditionExpression parsing and query logic
    make_error_response(response, .ValidationException, "Query operation not yet implemented")
}
// Scan handler: not yet implemented; always responds with a validation error.
handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
    _, _ = engine, request
    // TODO: Implement scan operation in storage.odin
    make_error_response(response, .ValidationException, "Scan operation not yet implemented")
}
// ============================================================================
// Schema Parsing Helpers
// ============================================================================
// Outcome of parse_key_schema. .None means success; every other value is a
// specific validation failure (user-facing text in key_schema_error_message).
Key_Schema_Error :: enum {
None,
Missing_Key_Schema, // "KeySchema" field absent from the request body
Invalid_Key_Schema, // wrong JSON shape, empty array, or more than two elements
No_Hash_Key, // schema contains no HASH key
Multiple_Hash_Keys, // more than one HASH key supplied
Multiple_Range_Keys, // more than one RANGE key supplied
Invalid_Key_Type, // KeyType string did not parse as HASH or RANGE
}
// Free a partially-built key schema: the first `count` cloned attribute
// names plus the backing slice itself. Used on every error path below.
destroy_partial_key_schema :: proc(key_schema: []dynamodb.Key_Schema_Element, count: int) {
    for j in 0..<count {
        delete(key_schema[j].attribute_name)
    }
    delete(key_schema)
}

// Parse the "KeySchema" array from a CreateTable request into owned
// Key_Schema_Element values (attribute names are cloned). Enforces
// DynamoDB's rule of exactly one HASH key and at most one RANGE key.
// On any error, all partial allocations are freed and nil is returned.
parse_key_schema :: proc(root: json.Object) -> ([]dynamodb.Key_Schema_Element, Key_Schema_Error) {
    key_schema_val, found := root["KeySchema"]
    if !found do return nil, .Missing_Key_Schema
    key_schema_array, ok := key_schema_val.(json.Array)
    if !ok do return nil, .Invalid_Key_Schema
    // At most HASH + optional RANGE, so 1 or 2 elements.
    if len(key_schema_array) == 0 || len(key_schema_array) > 2 {
        return nil, .Invalid_Key_Schema
    }
    key_schema := make([]dynamodb.Key_Schema_Element, len(key_schema_array))
    hash_count := 0
    range_count := 0
    for elem, i in key_schema_array {
        elem_obj, elem_ok := elem.(json.Object)
        if !elem_ok {
            destroy_partial_key_schema(key_schema, i)
            return nil, .Invalid_Key_Schema
        }
        // AttributeName: required string.
        attr_name_val, attr_found := elem_obj["AttributeName"]
        if !attr_found {
            destroy_partial_key_schema(key_schema, i)
            return nil, .Invalid_Key_Schema
        }
        attr_name, name_ok := attr_name_val.(json.String)
        if !name_ok {
            destroy_partial_key_schema(key_schema, i)
            return nil, .Invalid_Key_Schema
        }
        // KeyType: required string, must parse as HASH or RANGE.
        key_type_val, type_found := elem_obj["KeyType"]
        if !type_found {
            destroy_partial_key_schema(key_schema, i)
            return nil, .Invalid_Key_Schema
        }
        key_type_str, type_ok := key_type_val.(json.String)
        if !type_ok {
            destroy_partial_key_schema(key_schema, i)
            return nil, .Invalid_Key_Schema
        }
        key_type, kt_ok := dynamodb.key_type_from_string(string(key_type_str))
        if !kt_ok {
            destroy_partial_key_schema(key_schema, i)
            return nil, .Invalid_Key_Type
        }
        switch key_type {
        case .HASH:  hash_count += 1
        case .RANGE: range_count += 1
        }
        key_schema[i] = dynamodb.Key_Schema_Element{
            attribute_name = strings.clone(string(attr_name)),
            key_type       = key_type,
        }
    }
    // Exactly one HASH key; at most one RANGE key.
    if hash_count == 0 {
        destroy_partial_key_schema(key_schema, len(key_schema))
        return nil, .No_Hash_Key
    }
    if hash_count > 1 {
        destroy_partial_key_schema(key_schema, len(key_schema))
        return nil, .Multiple_Hash_Keys
    }
    if range_count > 1 {
        destroy_partial_key_schema(key_schema, len(key_schema))
        return nil, .Multiple_Range_Keys
    }
    return key_schema, .None
}
// Translate a Key_Schema_Error into its user-facing message.
// Returns "" for .None; unreachable values fall back to a generic message.
key_schema_error_message :: proc(err: Key_Schema_Error) -> string {
    msg := "Invalid KeySchema"
    switch err {
    case .None:                msg = ""
    case .Missing_Key_Schema:  msg = "Missing KeySchema"
    case .Invalid_Key_Schema:  msg = "Invalid KeySchema format"
    case .No_Hash_Key:         msg = "KeySchema must contain exactly one HASH key"
    case .Multiple_Hash_Keys:  msg = "KeySchema can only contain one HASH key"
    case .Multiple_Range_Keys: msg = "KeySchema can only contain one RANGE key"
    case .Invalid_Key_Type:    msg = "Invalid KeyType (must be HASH or RANGE)"
    }
    return msg
}
// Outcome of parse_attribute_definitions. .None means success; user-facing
// text for each value lives in attribute_definitions_error_message.
Attribute_Definitions_Error :: enum {
None,
Missing_Attribute_Definitions, // "AttributeDefinitions" field absent
Invalid_Attribute_Definitions, // wrong JSON shape or empty array
Invalid_Attribute_Type, // AttributeType did not parse as a scalar type
Duplicate_Attribute_Name, // same AttributeName defined twice
}
// Free a partially-built attribute-definition slice: the first `count`
// cloned names plus the backing slice itself. Used on every error path below.
destroy_partial_attr_defs :: proc(attr_defs: []dynamodb.Attribute_Definition, count: int) {
    for j in 0..<count {
        delete(attr_defs[j].attribute_name)
    }
    delete(attr_defs)
}

// Parse the "AttributeDefinitions" array from a CreateTable request into
// owned Attribute_Definition values (names are cloned). Rejects duplicate
// attribute names and unknown scalar types. On any error, all partial
// allocations are freed and nil is returned.
parse_attribute_definitions :: proc(root: json.Object) -> ([]dynamodb.Attribute_Definition, Attribute_Definitions_Error) {
    attr_defs_val, found := root["AttributeDefinitions"]
    if !found do return nil, .Missing_Attribute_Definitions
    attr_defs_array, ok := attr_defs_val.(json.Array)
    if !ok do return nil, .Invalid_Attribute_Definitions
    if len(attr_defs_array) == 0 {
        return nil, .Invalid_Attribute_Definitions
    }
    attr_defs := make([]dynamodb.Attribute_Definition, len(attr_defs_array))
    // Duplicate detection only; keys live in temp memory and reference the
    // parsed JSON strings, which outlive this procedure's use of the map.
    seen_names := make(map[string]bool, allocator = context.temp_allocator)
    defer delete(seen_names)
    for elem, i in attr_defs_array {
        elem_obj, elem_ok := elem.(json.Object)
        if !elem_ok {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Invalid_Attribute_Definitions
        }
        // AttributeName: required string, unique across the array.
        attr_name_val, name_found := elem_obj["AttributeName"]
        if !name_found {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Invalid_Attribute_Definitions
        }
        attr_name, name_ok := attr_name_val.(json.String)
        if !name_ok {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Invalid_Attribute_Definitions
        }
        if string(attr_name) in seen_names {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Duplicate_Attribute_Name
        }
        seen_names[string(attr_name)] = true
        // AttributeType: required string, must parse as a scalar type.
        attr_type_val, type_found := elem_obj["AttributeType"]
        if !type_found {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Invalid_Attribute_Definitions
        }
        attr_type_str, type_ok := attr_type_val.(json.String)
        if !type_ok {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Invalid_Attribute_Definitions
        }
        attr_type, at_ok := dynamodb.scalar_type_from_string(string(attr_type_str))
        if !at_ok {
            destroy_partial_attr_defs(attr_defs, i)
            return nil, .Invalid_Attribute_Type
        }
        attr_defs[i] = dynamodb.Attribute_Definition{
            attribute_name = strings.clone(string(attr_name)),
            attribute_type = attr_type,
        }
    }
    return attr_defs, .None
}
// Translate an Attribute_Definitions_Error into its user-facing message.
// Returns "" for .None; unreachable values fall back to a generic message.
attribute_definitions_error_message :: proc(err: Attribute_Definitions_Error) -> string {
    msg := "Invalid AttributeDefinitions"
    switch err {
    case .None:                          msg = ""
    case .Missing_Attribute_Definitions: msg = "Missing AttributeDefinitions"
    case .Invalid_Attribute_Definitions: msg = "Invalid AttributeDefinitions format"
    case .Invalid_Attribute_Type:        msg = "Invalid AttributeType (must be S, N, or B)"
    case .Duplicate_Attribute_Name:      msg = "Duplicate attribute name in AttributeDefinitions"
    }
    return msg
}
// Report whether every key-schema attribute appears (by name) in attr_defs.
validate_key_attributes_defined :: proc(key_schema: []dynamodb.Key_Schema_Element, attr_defs: []dynamodb.Attribute_Definition) -> bool {
    outer: for ks in key_schema {
        for ad in attr_defs {
            if ks.attribute_name == ad.attribute_name do continue outer
        }
        // No definition matched this key attribute.
        return false
    }
    return true
}
// ============================================================================
// Error Response Helper
// ============================================================================
// Fill `response` with a 400 status and a DynamoDB-style error body, then
// return a copy so callers may write `return make_error_response(...)`.
make_error_response :: proc(response: ^HTTP_Response, err_type: dynamodb.DynamoDB_Error_Type, message: string) -> HTTP_Response {
    body_text := dynamodb.error_to_response(err_type, message)
    response_set_status(response, .Bad_Request)
    response_set_body(response, transmute([]byte)body_text)
    return response^
}
// ============================================================================
// Configuration
// ============================================================================
parse_config :: proc() -> Config { parse_config :: proc() -> Config {
config := Config{ config := Config{
host = "0.0.0.0", host = "0.0.0.0",

View File

@@ -1,22 +1,167 @@
// TODO: In order to use RocksDB's WAL replication helpers, we need to import the C++ library so we use this shim #include "rocksdb_shim.h"
/** #include <rocksdb/db.h>
C++ shim implementation notes (the important bits) #include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <cstring>
#include <string>
In this rocksdb_shim.cc we'll need to use: // Internal structure wrapping rocksdb::DB
struct jormun_db {
rocksdb::DB* db;
};
rocksdb::DB::Open(...) // Placeholder for WAL iterator (not implemented yet)
struct jormun_wal_iter {
// TODO: Implement with TransactionLogIterator when needed
void* placeholder;
};
db->GetLatestSequenceNumber() // Open database
jormun_db* jormun_db_open(const char* path, int create_if_missing, char** err) {
rocksdb::Options options;
options.create_if_missing = create_if_missing != 0;
rocksdb::DB* db_ptr = nullptr;
rocksdb::Status status = rocksdb::DB::Open(options, path, &db_ptr);
if (!status.ok()) {
if (err) {
std::string error_msg = status.ToString();
*err = strdup(error_msg.c_str());
}
return nullptr;
}
jormun_db* jdb = new jormun_db;
jdb->db = db_ptr;
return jdb;
}
db->GetUpdatesSince(seq, &iter) // Close database
void jormun_db_close(jormun_db* db) {
if (db) {
delete db->db;
delete db;
}
}
from each TransactionLogIterator entry: // Put key-value pair
void jormun_db_put(jormun_db* db,
const void* key, size_t keylen,
const void* val, size_t vallen,
char** err) {
if (!db || !db->db) {
if (err) *err = strdup("Database is null");
return;
}
rocksdb::WriteOptions write_options;
rocksdb::Slice key_slice(static_cast<const char*>(key), keylen);
rocksdb::Slice val_slice(static_cast<const char*>(val), vallen);
rocksdb::Status status = db->db->Put(write_options, key_slice, val_slice);
if (!status.ok() && err) {
std::string error_msg = status.ToString();
*err = strdup(error_msg.c_str());
}
}
get WriteBatch and serialize via WriteBatch::Data() // Get value for key
unsigned char* jormun_db_get(jormun_db* db,
const void* key, size_t keylen,
size_t* vallen,
char** err) {
if (!db || !db->db) {
if (err) *err = strdup("Database is null");
return nullptr;
}
rocksdb::ReadOptions read_options;
rocksdb::Slice key_slice(static_cast<const char*>(key), keylen);
std::string value;
rocksdb::Status status = db->db->Get(read_options, key_slice, &value);
if (status.IsNotFound()) {
*vallen = 0;
return nullptr;
}
if (!status.ok()) {
if (err) {
std::string error_msg = status.ToString();
*err = strdup(error_msg.c_str());
}
return nullptr;
}
// Allocate and copy value
*vallen = value.size();
unsigned char* result = static_cast<unsigned char*>(malloc(value.size()));
if (result) {
memcpy(result, value.data(), value.size());
}
return result;
}
apply via rocksdb::WriteBatch wb(data); db->Write(write_options, &wb); // Free memory allocated by the shim
void jormun_free(void* p) {
free(p);
}
Also we must configure WAL retention so the followers dont fall off the end. RocksDB warns the iterator can become invalid if WAL is cleared aggressively; typical controls are WAL TTL / size limit. // ============================================================================
// WAL Replication Functions (Stubs for now - to be implemented)
// ============================================================================
https://github.com/facebook/rocksdb/issues/1565 // Get latest sequence number
*/ uint64_t jormun_latest_sequence(jormun_db* db) {
if (!db || !db->db) return 0;
return db->db->GetLatestSequenceNumber();
}
// Create WAL iterator (stub)
jormun_wal_iter* jormun_wal_iter_create(jormun_db* db, uint64_t seq, char** err) {
(void)db;
(void)seq;
if (err) {
*err = strdup("WAL iteration not yet implemented");
}
return nullptr;
}
// Destroy WAL iterator (stub)
void jormun_wal_iter_destroy(jormun_wal_iter* it) {
if (it) {
delete it;
}
}
// Get next batch from WAL (stub)
int jormun_wal_iter_next(jormun_wal_iter* it,
uint64_t* batch_start_seq,
unsigned char** out_data,
size_t* out_len,
char** err) {
(void)it;
(void)batch_start_seq;
(void)out_data;
(void)out_len;
if (err) {
*err = strdup("WAL iteration not yet implemented");
}
return 0;
}
// Apply write batch (stub)
void jormun_apply_writebatch(jormun_db* db,
const unsigned char* data, size_t len,
char** err) {
(void)db;
(void)data;
(void)len;
if (err) {
*err = strdup("WAL apply not yet implemented");
}
}