From 972e6ece5eb65f3e6dfb39460f27609e0a17d020 Mon Sep 17 00:00:00 2001 From: biondizzle Date: Mon, 16 Feb 2026 02:15:15 -0500 Subject: [PATCH] global secondary indexes --- README.md | 2 +- dynamodb/gsi.odin | 481 ++++++++++++++++++++++++++++++++++++ dynamodb/gsi_metadata.odin | 187 ++++++++++++++ dynamodb/key_codec_gsi.odin | 94 +++++++ dynamodb/storage.odin | 170 ++++++++++++- dynamodb/update_item.odin | 6 + gsi_handlers.odin | 276 +++++++++++++++++++++ main.odin | 175 +++++++++++-- 8 files changed, 1369 insertions(+), 22 deletions(-) create mode 100644 dynamodb/gsi.odin create mode 100644 dynamodb/gsi_metadata.odin create mode 100644 dynamodb/key_codec_gsi.odin create mode 100644 gsi_handlers.odin diff --git a/README.md b/README.md index 8c3ea62..7f648fe 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ JormunDB is a Self-Hosted DynamoDB replacement that speaks the DynamoDB wire protocol. Point your AWS SDK or CLI at it and use it as a drop-in replacement. -**Why Odin?** The original Zig implementation suffered from explicit allocator threading. Where every function ended up needing an `allocator` parameter and every allocation needed `errdefer` cleanup. Odin's implicit context allocator system eliminates this ceremony. Just one `context.allocator = arena_allocator` at the request handler entry and everything downstream just works. +**Why Odin?** The original Zig implementation suffered from explicit allocator threading. Where every function ended up needing an `allocator` parameter and every allocation needed `errdefer` cleanup. Odin's implicit context allocator system eliminates this ceremony. Just one `context.allocator = arena_allocator` at the request handler entry and it feels more like working with ctx in Go instead of filling out tax forms. ## Features diff --git a/dynamodb/gsi.odin b/dynamodb/gsi.odin new file mode 100644 index 0000000..d33a382 --- /dev/null +++ b/dynamodb/gsi.odin @@ -0,0 +1,481 @@ +// Global Secondary Index (GSI) support +// +// DynamoDB GSI semantics: +// - GSI entries are maintained automatically on every write (put/delete/update) +// - Each GSI has its own key schema (partition key + optional sort key) +// - GSI keys are built from item attributes; if an item doesn't have the GSI +// key attribute(s), NO GSI entry is written (sparse index) +// - Projection controls which non-key attributes are stored in the GSI entry: +// ALL → entire item is copied +// KEYS_ONLY → only table PK/SK + GSI PK/SK +// INCLUDE → table keys + GSI keys + specified non-key attributes +// - Query on a GSI uses IndexName to route to the correct key prefix +// +// Storage layout: +// GSI key: [0x03][table_name][index_name][gsi_pk_value][gsi_sk_value?] +// GSI value: TLV-encoded projected item (same binary format as regular items) +// +// Write path: +// put_item → for each GSI, extract GSI key attrs from the NEW item, write GSI entry +// delete → for each GSI, extract GSI key attrs from the OLD item, delete GSI entry +// update → delete OLD GSI entries, write NEW GSI entries +// +package dynamodb + +import "core:slice" +import "core:strings" +import "../rocksdb" + +// ============================================================================ +// GSI Key Extraction +// +// Extracts the GSI partition key (and optional sort key) raw bytes from an item. +// Returns false if the item doesn't have the required GSI PK attribute (sparse). +// ============================================================================ + +GSI_Key_Values :: struct { + pk: []byte, + sk: Maybe([]byte), +} + +// Extract GSI key values from an item based on the GSI's key schema. +// Returns ok=false if the required partition key attribute is missing (sparse index). +gsi_extract_key_values :: proc(item: Item, gsi_key_schema: []Key_Schema_Element) -> (GSI_Key_Values, bool) { + result: GSI_Key_Values + + for ks in gsi_key_schema { + attr, found := item[ks.attribute_name] + if !found { + if ks.key_type == .HASH { + return {}, false // PK missing → sparse, skip this GSI entry + } + continue // SK missing is OK, just no SK segment + } + + raw, raw_ok := attr_value_to_bytes(attr) + if !raw_ok { + if ks.key_type == .HASH { + return {}, false + } + continue + } + + switch ks.key_type { + case .HASH: + result.pk = raw + case .RANGE: + result.sk = raw + } + } + + return result, true +} + +// Convert a scalar attribute value to its raw byte representation (borrowed). +attr_value_to_bytes :: proc(attr: Attribute_Value) -> ([]byte, bool) { + #partial switch v in attr { + case String: + return transmute([]byte)string(v), true + case Number: + return transmute([]byte)string(v), true + case Binary: + return transmute([]byte)string(v), true + } + return nil, false +} + +// ============================================================================ +// GSI Projection +// +// Build a projected copy of an item for storage in a GSI entry. +// ============================================================================ + +// Build the projected item for a GSI entry. +// The result is a new Item that the caller owns. +gsi_project_item :: proc( + item: Item, + gsi: ^Global_Secondary_Index, + table_key_schema: []Key_Schema_Element, +) -> Item { + switch gsi.projection.projection_type { + case .ALL: + return item_deep_copy(item) + + case .KEYS_ONLY: + projected := make(Item) + // Include table key attributes + for ks in table_key_schema { + if val, found := item[ks.attribute_name]; found { + projected[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val) + } + } + // Include GSI key attributes + for ks in gsi.key_schema { + if _, already := projected[ks.attribute_name]; already { + continue // Already included as table key + } + if val, found := item[ks.attribute_name]; found { + projected[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val) + } + } + return projected + + case .INCLUDE: + projected := make(Item) + // Include table key attributes + for ks in table_key_schema { + if val, found := item[ks.attribute_name]; found { + projected[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val) + } + } + // Include GSI key attributes + for ks in gsi.key_schema { + if _, already := projected[ks.attribute_name]; already { + continue + } + if val, found := item[ks.attribute_name]; found { + projected[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val) + } + } + // Include specified non-key attributes + if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka { + for attr_name in nka { + if _, already := projected[attr_name]; already { + continue + } + if val, found := item[attr_name]; found { + projected[strings.clone(attr_name)] = attr_value_deep_copy(val) + } + } + } + return projected + } + + // Fallback: all + return item_deep_copy(item) +} + +// ============================================================================ +// GSI Write Maintenance +// +// Called after a successful data write to maintain GSI entries. +// Uses WriteBatch for atomicity (all GSI entries for one item in one batch). +// ============================================================================ + +// Write GSI entries for an item across all GSIs defined on the table. +// Should be called AFTER the main data key is written. +gsi_write_entries :: proc( + engine: ^Storage_Engine, + table_name: string, + item: Item, + metadata: ^Table_Metadata, +) -> Storage_Error { + gsis, has_gsis := metadata.global_secondary_indexes.? + if !has_gsis || len(gsis) == 0 { + return .None + } + + for &gsi in gsis { + // Extract GSI key from item + gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema) + if !kv_ok { + continue // Sparse: item doesn't have GSI PK, skip + } + + // Build GSI storage key + gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) + defer delete(gsi_storage_key) + + // Build projected item + projected := gsi_project_item(item, &gsi, metadata.key_schema) + defer item_destroy(&projected) + + // Encode projected item + encoded, encode_ok := encode(projected) + if !encode_ok { + return .Serialization_Error + } + defer delete(encoded) + + // Write to RocksDB + put_err := rocksdb.db_put(&engine.db, gsi_storage_key, encoded) + if put_err != .None { + return .RocksDB_Error + } + } + + return .None +} + +// Delete GSI entries for an item across all GSIs. +// Should be called BEFORE or AFTER the main data key is deleted. +// Needs the OLD item to know which GSI keys to remove. +gsi_delete_entries :: proc( + engine: ^Storage_Engine, + table_name: string, + old_item: Item, + metadata: ^Table_Metadata, +) -> Storage_Error { + gsis, has_gsis := metadata.global_secondary_indexes.? + if !has_gsis || len(gsis) == 0 { + return .None + } + + for &gsi in gsis { + gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema) + if !kv_ok { + continue // Item didn't have a GSI entry + } + + gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) + defer delete(gsi_storage_key) + + del_err := rocksdb.db_delete(&engine.db, gsi_storage_key) + if del_err != .None { + return .RocksDB_Error + } + } + + return .None +} + +// ============================================================================ +// GSI Query +// +// Queries a GSI by partition key with optional sort key condition. +// Mirrors the main table query() but uses GSI key prefix. +// ============================================================================ + +gsi_query :: proc( + engine: ^Storage_Engine, + table_name: string, + index_name: string, + partition_key_value: []byte, + exclusive_start_key: Maybe([]byte), + limit: int, + sk_condition: Maybe(Sort_Key_Condition) = nil, +) -> (Query_Result, Storage_Error) { + // Build GSI partition prefix + prefix := build_gsi_partition_prefix(table_name, index_name, partition_key_value) + defer delete(prefix) + + iter, iter_err := rocksdb.iter_create(&engine.db) + if iter_err != .None { + return {}, .RocksDB_Error + } + defer rocksdb.iter_destroy(&iter) + + max_items := limit if limit > 0 else 1_000_000 + + // Seek to start position + if start_key, has_start := exclusive_start_key.?; has_start { + if has_prefix(start_key, prefix) { + rocksdb.iter_seek(&iter, start_key) + if rocksdb.iter_valid(&iter) { + rocksdb.iter_next(&iter) + } + } else { + rocksdb.iter_seek(&iter, prefix) + } + } else { + rocksdb.iter_seek(&iter, prefix) + } + + items := make([dynamic]Item) + count := 0 + last_key: Maybe([]byte) = nil + has_more := false + + for rocksdb.iter_valid(&iter) { + key := rocksdb.iter_key(&iter) + if key == nil || !has_prefix(key, prefix) { + break + } + + if count >= max_items { + has_more = true + break + } + + value := rocksdb.iter_value(&iter) + if value == nil { + rocksdb.iter_next(&iter) + continue + } + + item, decode_ok := decode(value) + if !decode_ok { + rocksdb.iter_next(&iter) + continue + } + + // Sort key condition filtering + if skc, has_skc := sk_condition.?; has_skc { + if !evaluate_sort_key_condition(item, &skc) { + item_copy := item + item_destroy(&item_copy) + rocksdb.iter_next(&iter) + continue + } + } + + append(&items, item) + count += 1 + + // Track key of last returned item + if prev_key, had_prev := last_key.?; had_prev { + delete(prev_key) + } + last_key = slice.clone(key) + + rocksdb.iter_next(&iter) + } + + // Only emit LastEvaluatedKey if there are more items + if !has_more { + if lk, had_lk := last_key.?; had_lk { + delete(lk) + } + last_key = nil + } + + result_items := make([]Item, len(items)) + copy(result_items, items[:]) + + return Query_Result{ + items = result_items, + last_evaluated_key = last_key, + }, .None +} + +// ============================================================================ +// GSI Scan +// +// Scans all entries in a GSI (all partition keys under that index). +// ============================================================================ + +gsi_scan :: proc( + engine: ^Storage_Engine, + table_name: string, + index_name: string, + exclusive_start_key: Maybe([]byte), + limit: int, +) -> (Scan_Result, Storage_Error) { + prefix := build_gsi_prefix(table_name, index_name) + defer delete(prefix) + + iter, iter_err := rocksdb.iter_create(&engine.db) + if iter_err != .None { + return {}, .RocksDB_Error + } + defer rocksdb.iter_destroy(&iter) + + max_items := limit if limit > 0 else 1_000_000 + + if start_key, has_start := exclusive_start_key.?; has_start { + if has_prefix(start_key, prefix) { + rocksdb.iter_seek(&iter, start_key) + if rocksdb.iter_valid(&iter) { + rocksdb.iter_next(&iter) + } + } else { + rocksdb.iter_seek(&iter, prefix) + } + } else { + rocksdb.iter_seek(&iter, prefix) + } + + items := make([dynamic]Item) + count := 0 + last_key: Maybe([]byte) = nil + has_more := false + + for rocksdb.iter_valid(&iter) { + key := rocksdb.iter_key(&iter) + if key == nil || !has_prefix(key, prefix) { + break + } + + if count >= max_items { + has_more = true + break + } + + value := rocksdb.iter_value(&iter) + if value == nil { + rocksdb.iter_next(&iter) + continue + } + + item, decode_ok := decode(value) + if !decode_ok { + rocksdb.iter_next(&iter) + continue + } + + append(&items, item) + count += 1 + + if prev_key, had_prev := last_key.?; had_prev { + delete(prev_key) + } + last_key = slice.clone(key) + + rocksdb.iter_next(&iter) + } + + if !has_more { + if lk, had_lk := last_key.?; had_lk { + delete(lk) + } + last_key = nil + } + + result_items := make([]Item, len(items)) + copy(result_items, items[:]) + + return Scan_Result{ + items = result_items, + last_evaluated_key = last_key, + }, .None +} + +// ============================================================================ +// GSI Metadata Lookup Helpers +// ============================================================================ + +// Find a GSI definition by index name in the table metadata. +find_gsi :: proc(metadata: ^Table_Metadata, index_name: string) -> (^Global_Secondary_Index, bool) { + gsis, has_gsis := metadata.global_secondary_indexes.? + if !has_gsis { + return nil, false + } + + for &gsi in gsis { + if gsi.index_name == index_name { + return &gsi, true + } + } + + return nil, false +} + +// Get the GSI's sort key attribute name (if any). +gsi_get_sort_key_name :: proc(gsi: ^Global_Secondary_Index) -> Maybe(string) { + for ks in gsi.key_schema { + if ks.key_type == .RANGE { + return ks.attribute_name + } + } + return nil +} + +// Get the GSI's partition key attribute name. +gsi_get_partition_key_name :: proc(gsi: ^Global_Secondary_Index) -> Maybe(string) { + for ks in gsi.key_schema { + if ks.key_type == .HASH { + return ks.attribute_name + } + } + return nil +} diff --git a/dynamodb/gsi_metadata.odin b/dynamodb/gsi_metadata.odin new file mode 100644 index 0000000..ddf4b13 --- /dev/null +++ b/dynamodb/gsi_metadata.odin @@ -0,0 +1,187 @@ +// gsi_metadata.odin — GSI metadata parsing for serialize/deserialize_table_metadata +// +// Parses GSI definitions from the embedded JSON string stored in table metadata. +// This file lives in the dynamodb/ package. +package dynamodb + +import "core:encoding/json" +import "core:mem" +import "core:strings" + +// Parse GlobalSecondaryIndexes from a JSON string like: +// [{"IndexName":"email-index","KeySchema":[{"AttributeName":"email","KeyType":"HASH"}], +// "Projection":{"ProjectionType":"ALL"}}] +// +// Allocates all strings with the given allocator (engine.allocator for long-lived data). +parse_gsis_json :: proc(json_str: string, allocator: mem.Allocator) -> ([]Global_Secondary_Index, bool) { + data, parse_err := json.parse(transmute([]byte)json_str, allocator = context.temp_allocator) + if parse_err != nil { + return nil, false + } + defer json.destroy_value(data) + + arr, ok := data.(json.Array) + if !ok { + return nil, false + } + + if len(arr) == 0 { + return nil, true // Empty is valid + } + + result := make([]Global_Secondary_Index, len(arr), allocator) + + for elem, i in arr { + obj, obj_ok := elem.(json.Object) + if !obj_ok { + cleanup_gsis(result[:i], allocator) + delete(result, allocator) + return nil, false + } + + gsi, gsi_ok := parse_single_gsi_json(obj, allocator) + if !gsi_ok { + cleanup_gsis(result[:i], allocator) + delete(result, allocator) + return nil, false + } + + result[i] = gsi + } + + return result, true +} + +// Parse a single GSI object from JSON +@(private = "file") +parse_single_gsi_json :: proc(obj: json.Object, allocator: mem.Allocator) -> (Global_Secondary_Index, bool) { + gsi: Global_Secondary_Index + + // IndexName + idx_val, idx_found := obj["IndexName"] + if !idx_found { + return {}, false + } + idx_str, idx_ok := idx_val.(json.String) + if !idx_ok { + return {}, false + } + gsi.index_name = strings.clone(string(idx_str), allocator) + + // KeySchema + ks_val, ks_found := obj["KeySchema"] + if !ks_found { + delete(gsi.index_name, allocator) + return {}, false + } + ks_arr, ks_ok := ks_val.(json.Array) + if !ks_ok || len(ks_arr) == 0 || len(ks_arr) > 2 { + delete(gsi.index_name, allocator) + return {}, false + } + + key_schema := make([]Key_Schema_Element, len(ks_arr), allocator) + for ks_elem, j in ks_arr { + ks_obj, kobj_ok := ks_elem.(json.Object) + if !kobj_ok { + for k in 0.. 0 { + nka := make([]string, len(nka_arr), allocator) + for attr_val, k in nka_arr { + if attr_str, attr_ok := attr_val.(json.String); attr_ok { + nka[k] = strings.clone(string(attr_str), allocator) + } + } + gsi.projection.non_key_attributes = nka + } + } + } + } + + return gsi, true +} + +// Clean up partially-constructed GSI array +cleanup_gsis :: proc(gsis: []Global_Secondary_Index, allocator: mem.Allocator) { + for gsi in gsis { + delete(gsi.index_name, allocator) + for ks in gsi.key_schema { + delete(ks.attribute_name, allocator) + } + delete(gsi.key_schema, allocator) + if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka { + for attr in nka { + delete(attr, allocator) + } + delete(nka, allocator) + } + } +} diff --git a/dynamodb/key_codec_gsi.odin b/dynamodb/key_codec_gsi.odin new file mode 100644 index 0000000..f1c9d03 --- /dev/null +++ b/dynamodb/key_codec_gsi.odin @@ -0,0 +1,94 @@ +// key_codec_gsi.odin — Additional key codec functions for GSI support +// +// These procedures complement key_codec.odin with prefix builders needed +// for GSI scanning and querying. They follow the same encoding conventions: +// [entity_type][varint_len][segment_bytes]... +// +// Add the contents of this file to key_codec.odin (or keep as a separate file +// in the dynamodb/ package). +package dynamodb + +import "core:bytes" + +// Build GSI index prefix for scanning all entries in a GSI: +// [gsi][table_name][index_name] +build_gsi_prefix :: proc(table_name: string, index_name: string) -> []byte { + buf: bytes.Buffer + bytes.buffer_init_allocator(&buf, 0, 256, context.allocator) + + bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) + + encode_varint(&buf, len(table_name)) + bytes.buffer_write_string(&buf, table_name) + + encode_varint(&buf, len(index_name)) + bytes.buffer_write_string(&buf, index_name) + + return bytes.buffer_to_bytes(&buf) +} + +// Build GSI partition prefix for querying within a single partition: +// [gsi][table_name][index_name][pk_value] +build_gsi_partition_prefix :: proc(table_name: string, index_name: string, pk_value: []byte) -> []byte { + buf: bytes.Buffer + bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) + + bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) + + encode_varint(&buf, len(table_name)) + bytes.buffer_write_string(&buf, table_name) + + encode_varint(&buf, len(index_name)) + bytes.buffer_write_string(&buf, index_name) + + encode_varint(&buf, len(pk_value)) + bytes.buffer_write(&buf, pk_value) + + return bytes.buffer_to_bytes(&buf) +} + +// Decode a GSI key back into components +Decoded_GSI_Key :: struct { + table_name: string, + index_name: string, + pk_value: []byte, + sk_value: Maybe([]byte), +} + +decode_gsi_key :: proc(key: []byte) -> (result: Decoded_GSI_Key, ok: bool) { + decoder := Key_Decoder{data = key, pos = 0} + + entity_type := decoder_read_entity_type(&decoder) or_return + if entity_type != .GSI { + return {}, false + } + + table_name_bytes := decoder_read_segment(&decoder) or_return + result.table_name = string(table_name_bytes) + + index_name_bytes := decoder_read_segment(&decoder) or_return + result.index_name = string(index_name_bytes) + + result.pk_value = decoder_read_segment(&decoder) or_return + + if decoder_has_more(&decoder) { + sk := decoder_read_segment(&decoder) or_return + result.sk_value = sk + } + + return result, true +} + +// Build GSI prefix for deleting all GSI entries for a table (used by delete_table) +// [gsi][table_name] +build_gsi_table_prefix :: proc(table_name: string) -> []byte { + buf: bytes.Buffer + bytes.buffer_init_allocator(&buf, 0, 256, context.allocator) + + bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) + + encode_varint(&buf, len(table_name)) + bytes.buffer_write_string(&buf, table_name) + + return bytes.buffer_to_bytes(&buf) +} diff --git a/dynamodb/storage.odin b/dynamodb/storage.odin index 0da9d8e..b507ec9 100644 --- a/dynamodb/storage.odin +++ b/dynamodb/storage.odin @@ -84,7 +84,23 @@ table_metadata_destroy :: proc(metadata: ^Table_Metadata, allocator: mem.Allocat } delete(metadata.attribute_definitions, allocator) - // TODO: Free GSI/LSI if we implement them + // Free GSI definitions + if gsis, has_gsis := metadata.global_secondary_indexes.?; has_gsis { + for gsi in gsis { + delete(gsi.index_name, allocator) + for ks in gsi.key_schema { + delete(ks.attribute_name, allocator) + } + delete(gsi.key_schema, allocator) + if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka { + for attr in nka { + delete(attr, allocator) + } + delete(nka, allocator) + } + } + delete(gsis, allocator) + } } // Get the partition key attribute name @@ -187,7 +203,6 @@ remove_table_lock :: proc(engine: ^Storage_Engine, table_name: string) { // Serialize table metadata to binary format serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) { - // Create a temporary item to hold metadata meta_item := make(Item, context.temp_allocator) defer delete(meta_item) @@ -200,7 +215,7 @@ serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) { if i > 0 { strings.write_string(&ks_builder, ",") } - fmt.sbprintf(&ks_builder, `{"AttributeName":"%s","KeyType":"%s"}`, + fmt.sbprintf(&ks_builder, `{{"AttributeName":"%s","KeyType":"%s"}}`, ks.attribute_name, key_type_to_string(ks.key_type)) } strings.write_string(&ks_builder, "]") @@ -216,7 +231,7 @@ serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) { if i > 0 { strings.write_string(&ad_builder, ",") } - fmt.sbprintf(&ad_builder, `{"AttributeName":"%s","AttributeType":"%s"}`, + fmt.sbprintf(&ad_builder, `{{"AttributeName":"%s","AttributeType":"%s"}}`, ad.attribute_name, scalar_type_to_string(ad.attribute_type)) } strings.write_string(&ad_builder, "]") @@ -227,6 +242,48 @@ serialize_table_metadata :: proc(metadata: ^Table_Metadata) -> ([]byte, bool) { meta_item["TableStatus"] = String(strings.clone(table_status_to_string(metadata.table_status))) meta_item["CreationDateTime"] = Number(fmt.aprint(metadata.creation_date_time)) + // Encode GSI definitions as JSON string + if gsis, has_gsis := metadata.global_secondary_indexes.?; has_gsis && len(gsis) > 0 { + gsi_builder := strings.builder_make(context.temp_allocator) + defer strings.builder_destroy(&gsi_builder) + + strings.write_string(&gsi_builder, "[") + for gsi, i in gsis { + if i > 0 { + strings.write_string(&gsi_builder, ",") + } + fmt.sbprintf(&gsi_builder, `{{"IndexName":"%s","KeySchema":[`, gsi.index_name) + for ks, j in gsi.key_schema { + if j > 0 { + strings.write_string(&gsi_builder, ",") + } + fmt.sbprintf(&gsi_builder, `{{"AttributeName":"%s","KeyType":"%s"}}`, + ks.attribute_name, key_type_to_string(ks.key_type)) + } + strings.write_string(&gsi_builder, `],"Projection":{{"ProjectionType":"`) + switch gsi.projection.projection_type { + case .ALL: strings.write_string(&gsi_builder, "ALL") + case .KEYS_ONLY: strings.write_string(&gsi_builder, "KEYS_ONLY") + case .INCLUDE: strings.write_string(&gsi_builder, "INCLUDE") + } + strings.write_string(&gsi_builder, `"`) + if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka && len(nka) > 0 { + strings.write_string(&gsi_builder, `,"NonKeyAttributes":[`) + for attr, k in nka { + if k > 0 { + strings.write_string(&gsi_builder, ",") + } + fmt.sbprintf(&gsi_builder, `"%s"`, attr) + } + strings.write_string(&gsi_builder, "]") + } + strings.write_string(&gsi_builder, "}}") + } + strings.write_string(&gsi_builder, "]") + + meta_item["GlobalSecondaryIndexes"] = String(strings.clone(strings.to_string(gsi_builder))) + } + // Encode to binary return encode(meta_item) } @@ -282,6 +339,17 @@ deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (T } } + // Parse GlobalSecondaryIndexes from embedded JSON string + if gsi_val, gsi_found := meta_item["GlobalSecondaryIndexes"]; gsi_found { + #partial switch v in gsi_val { + case String: + gsis, gsi_ok := parse_gsis_json(string(v), allocator) + if gsi_ok && len(gsis) > 0 { + metadata.global_secondary_indexes = gsis + } + } + } + return metadata, true } @@ -463,6 +531,7 @@ create_table :: proc( table_name: string, key_schema: []Key_Schema_Element, attribute_definitions: []Attribute_Definition, + gsis: Maybe([]Global_Secondary_Index) = nil, ) -> (Table_Description, Storage_Error) { table_lock := get_or_create_table_lock(engine, table_name) sync.rw_mutex_lock(table_lock) @@ -500,6 +569,34 @@ create_table :: proc( ad.attribute_name = strings.clone(ad.attribute_name, engine.allocator) } + // Deep copy GSI definitions into engine allocator + if gsi_defs, has_gsis := gsis.?; has_gsis && len(gsi_defs) > 0 { + owned_gsis := make([]Global_Secondary_Index, len(gsi_defs), engine.allocator) + for gsi_def, i in gsi_defs { + owned_gsis[i] = Global_Secondary_Index{ + index_name = strings.clone(gsi_def.index_name, engine.allocator), + key_schema = make([]Key_Schema_Element, len(gsi_def.key_schema), engine.allocator), + projection = Projection{ + projection_type = gsi_def.projection.projection_type, + }, + } + for ks, j in gsi_def.key_schema { + owned_gsis[i].key_schema[j] = Key_Schema_Element{ + attribute_name = strings.clone(ks.attribute_name, engine.allocator), + key_type = ks.key_type, + } + } + if nka, has_nka := gsi_def.projection.non_key_attributes.?; has_nka { + owned_nka := make([]string, len(nka), engine.allocator) + for attr, k in nka { + owned_nka[k] = strings.clone(attr, engine.allocator) + } + owned_gsis[i].projection.non_key_attributes = owned_nka + } + } + metadata.global_secondary_indexes = owned_gsis + } + // Serialize and store meta_value, serialize_ok := serialize_table_metadata(&metadata) if !serialize_ok { @@ -522,6 +619,7 @@ create_table :: proc( creation_date_time = now, item_count = 0, table_size_bytes = 0, + global_secondary_indexes = gsis, } return desc, .None @@ -565,7 +663,6 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err break } - // Delete this item err: cstring rocksdb.rocksdb_delete( engine.db.handle, @@ -582,6 +679,41 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err } } + // Delete all GSI entries for this table + gsi_table_prefix := build_gsi_table_prefix(table_name) + defer delete(gsi_table_prefix) + + gsi_iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options) + if gsi_iter != nil { + defer rocksdb.rocksdb_iter_destroy(gsi_iter) + + rocksdb.rocksdb_iter_seek(gsi_iter, raw_data(gsi_table_prefix), c.size_t(len(gsi_table_prefix))) + + for rocksdb.rocksdb_iter_valid(gsi_iter) != 0 { + key_len: c.size_t + key_ptr := rocksdb.rocksdb_iter_key(gsi_iter, &key_len) + key_bytes := key_ptr[:key_len] + + if !has_prefix(key_bytes, gsi_table_prefix) { + break + } + + err: cstring + rocksdb.rocksdb_delete( + engine.db.handle, + engine.db.write_options, + raw_data(key_bytes), + c.size_t(len(key_bytes)), + &err, + ) + if err != nil { + rocksdb.rocksdb_free(rawptr(err)) + } + + rocksdb.rocksdb_iter_next(gsi_iter) + } + } + // Delete metadata del_err := rocksdb.db_delete(&engine.db, meta_key) if del_err != .None { @@ -639,6 +771,17 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto storage_key := build_data_key(table_name, key_values.pk, key_values.sk) defer delete(storage_key) + // --- GSI cleanup: delete OLD GSI entries if item already exists --- + existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key) + if existing_err == .None && existing_value != nil { + defer delete(existing_value) + old_item, decode_ok := decode(existing_value) + if decode_ok { + defer item_destroy(&old_item) + gsi_delete_entries(engine, table_name, old_item, &metadata) + } + } + // Encode item encoded_item, encode_ok := encode(item) if !encode_ok { @@ -652,6 +795,12 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto return .RocksDB_Error } + // --- GSI maintenance: write NEW GSI entries --- + gsi_err := gsi_write_entries(engine, table_name, item, &metadata) + if gsi_err != .None { + return gsi_err + } + return .None } @@ -748,6 +897,17 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S storage_key := build_data_key(table_name, key_values.pk, key_values.sk) defer delete(storage_key) + // --- GSI cleanup: read existing item to know which GSI entries to remove --- + existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key) + if existing_err == .None && existing_value != nil { + defer delete(existing_value) + old_item, decode_ok := decode(existing_value) + if decode_ok { + defer item_destroy(&old_item) + gsi_delete_entries(engine, table_name, old_item, &metadata) + } + } + // Delete from RocksDB del_err := rocksdb.db_delete(&engine.db, storage_key) if del_err != .None { diff --git a/dynamodb/update_item.odin b/dynamodb/update_item.odin index 97d0e4c..81fc503 100644 --- a/dynamodb/update_item.odin +++ b/dynamodb/update_item.odin @@ -120,6 +120,12 @@ update_item :: proc( return nil, nil, .RocksDB_Error } + // --- GSI maintenance: delete old entries, write new entries --- + if old, has := old_item.?; has { + gsi_delete_entries(engine, table_name, old, &metadata) + } + gsi_write_entries(engine, table_name, existing_item, &metadata) + new_item = existing_item return old_item, new_item, .None } diff --git a/gsi_handlers.odin b/gsi_handlers.odin new file mode 100644 index 0000000..8088caa --- /dev/null +++ b/gsi_handlers.odin @@ -0,0 +1,276 @@ +// gsi_handlers.odin — GSI-related HTTP handler helpers +// +// This file lives in the main package alongside main.odin. +// It provides: +// 1. parse_global_secondary_indexes — parse GSI definitions from CreateTable request +// 2. parse_index_name — extract IndexName from Query/Scan requests +// 3. Projection type helper for response building +package main + +import "core:encoding/json" +import "core:strings" +import "dynamodb" + +// ============================================================================ +// Parse GlobalSecondaryIndexes from CreateTable request body +// +// DynamoDB CreateTable request format for GSIs: +// { +// "GlobalSecondaryIndexes": [ +// { +// "IndexName": "email-index", +// "KeySchema": [ +// { "AttributeName": "email", "KeyType": "HASH" }, +// { "AttributeName": "timestamp", "KeyType": "RANGE" } +// ], +// "Projection": { +// "ProjectionType": "ALL" | "KEYS_ONLY" | "INCLUDE", +// "NonKeyAttributes": ["attr1", "attr2"] // only for INCLUDE +// } +// } +// ] +// } +// +// Returns nil if no GSI definitions are present (valid — GSIs are optional). +// ============================================================================ + +parse_global_secondary_indexes :: proc( + root: json.Object, + attr_defs: []dynamodb.Attribute_Definition, +) -> Maybe([]dynamodb.Global_Secondary_Index) { + gsi_val, found := root["GlobalSecondaryIndexes"] + if !found { + return nil + } + + gsi_arr, ok := gsi_val.(json.Array) + if !ok || len(gsi_arr) == 0 { + return nil + } + + gsis := make([]dynamodb.Global_Secondary_Index, len(gsi_arr)) + + for elem, i in gsi_arr { + elem_obj, elem_ok := elem.(json.Object) + if !elem_ok { + cleanup_parsed_gsis(gsis[:i]) + delete(gsis) + return nil + } + + gsi, gsi_ok := parse_single_gsi(elem_obj, attr_defs) + if !gsi_ok { + cleanup_parsed_gsis(gsis[:i]) + delete(gsis) + return nil + } + + gsis[i] = gsi + } + + return gsis +} + +@(private = "file") +parse_single_gsi :: proc( + obj: json.Object, + attr_defs: []dynamodb.Attribute_Definition, +) -> (dynamodb.Global_Secondary_Index, bool) { + gsi: dynamodb.Global_Secondary_Index + + // IndexName (required) + idx_val, idx_found := obj["IndexName"] + if !idx_found { + return {}, false + } + idx_str, idx_ok := idx_val.(json.String) + if !idx_ok { + return {}, false + } + gsi.index_name = strings.clone(string(idx_str)) + + // KeySchema (required) + ks_val, ks_found := obj["KeySchema"] + if !ks_found { + delete(gsi.index_name) + return {}, false + } + ks_arr, ks_ok := ks_val.(json.Array) + if !ks_ok || len(ks_arr) == 0 || len(ks_arr) > 2 { + delete(gsi.index_name) + return {}, false + } + + key_schema := make([]dynamodb.Key_Schema_Element, len(ks_arr)) + hash_count := 0 + + for ks_elem, j in ks_arr { + ks_obj, kobj_ok := ks_elem.(json.Object) + if !kobj_ok { + for k in 0.. 0 { + nka := make([]string, len(nka_arr)) + for attr_val, k in nka_arr { + if attr_str, attr_ok := attr_val.(json.String); attr_ok { + nka[k] = strings.clone(string(attr_str)) + } + } + gsi.projection.non_key_attributes = nka + } + } + } + } + + return gsi, true +} + +@(private = "file") +cleanup_parsed_gsis :: proc(gsis: []dynamodb.Global_Secondary_Index) { + for gsi in gsis { + delete(gsi.index_name) + for ks in gsi.key_schema { + delete(ks.attribute_name) + } + delete(gsi.key_schema) + if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka { + for attr in nka { delete(attr) } + delete(nka) + } + } +} + +// ============================================================================ +// Parse IndexName from Query/Scan request +// ============================================================================ + +parse_index_name :: proc(request_body: []byte) -> Maybe(string) { + data, parse_err := json.parse(request_body, allocator = context.temp_allocator) + if parse_err != nil { + return nil + } + defer json.destroy_value(data) + + root, root_ok := data.(json.Object) + if !root_ok { + return nil + } + + idx_val, found := root["IndexName"] + if !found { + return nil + } + + idx_str, ok := idx_val.(json.String) + if !ok { + return nil + } + + return string(idx_str) +} + +// ============================================================================ +// Projection type to string for DescribeTable response +// ============================================================================ + +projection_type_to_string :: proc(pt: dynamodb.Projection_Type) -> string { + switch pt { + case .ALL: return "ALL" + case .KEYS_ONLY: return "KEYS_ONLY" + case .INCLUDE: return "INCLUDE" + } + return "ALL" +} diff --git a/main.odin b/main.odin index 0636af1..ef81a34 100644 --- a/main.odin +++ b/main.odin @@ -173,8 +173,25 @@ handle_create_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Req return } + // Parse GlobalSecondaryIndexes (optional) + gsis := parse_global_secondary_indexes(root, attr_defs) + defer { + if gsi_list, has := gsis.?; has { + for &g in gsi_list { + delete(g.index_name) + for &ks in g.key_schema { delete(ks.attribute_name) } + delete(g.key_schema) + if nka, has_nka := g.projection.non_key_attributes.?; has_nka { + for a in nka { delete(a) } + delete(nka) + } + } + delete(gsi_list) + } + } + // Create the table - desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs) + desc, create_err := dynamodb.create_table(engine, string(table_name), key_schema, attr_defs, gsis) if create_err != .None { #partial switch create_err { case .Table_Already_Exists: @@ -261,7 +278,30 @@ handle_describe_table :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_R ad.attribute_name, dynamodb.scalar_type_to_string(ad.attribute_type)) } - strings.write_string(&builder, `]}}`) + strings.write_string(&builder, `]`) + + // Include GSI Info — INSIDE the Table object, before the closing braces + if gsis, has_gsis := metadata.global_secondary_indexes.?; has_gsis && len(gsis) > 0 { + strings.write_string(&builder, `,"GlobalSecondaryIndexes":[`) + for gsi, gi in gsis { + if gi > 0 do strings.write_string(&builder, ",") + strings.write_string(&builder, `{"IndexName":"`) + strings.write_string(&builder, gsi.index_name) + strings.write_string(&builder, `","KeySchema":[`) + for ks, ki in gsi.key_schema { + if ki > 0 do strings.write_string(&builder, ",") + fmt.sbprintf(&builder, `{"AttributeName":"%s","KeyType":"%s"}`, + ks.attribute_name, dynamodb.key_type_to_string(ks.key_type)) + } + strings.write_string(&builder, `],"Projection":{"ProjectionType":"`) + strings.write_string(&builder, projection_type_to_string(gsi.projection.projection_type)) + strings.write_string(&builder, `"},"IndexStatus":"ACTIVE"}`) + } + strings.write_string(&builder, "]") + } + + // Close Table object and root object + strings.write_string(&builder, `}}`) resp_body := strings.to_string(builder) response_set_body(response, transmute([]byte)resp_body) @@ -1054,6 +1094,9 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r return } + // Grab index name from request body + index_name := parse_index_name(request.body) + // Fetch table metadata early for ExclusiveStartKey parsing metadata, meta_err := dynamodb.get_table_metadata(engine, table_name) if meta_err != .None { @@ -1081,6 +1124,8 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r copy(pk_owned, pk_bytes) defer delete(pk_owned) + // ---- Parse shared parameters BEFORE the GSI/table branch ---- + // Parse Limit limit := dynamodb.parse_limit(request.body) if limit == 0 { @@ -1107,13 +1152,6 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r sk_condition = skc } - result, err := dynamodb.query(engine, table_name, pk_owned, exclusive_start_key, limit, sk_condition) - if err != .None { - handle_storage_error(response, err) - return - } - defer dynamodb.query_result_destroy(&result) - // ---- Parse ExpressionAttributeNames/Values for filter/projection ---- attr_names := dynamodb.parse_expression_attribute_names(request.body) defer { @@ -1137,6 +1175,62 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r delete(attr_values) } + // ---- GSI query path ---- + if idx_name, has_idx := index_name.?; has_idx { + _, gsi_found := dynamodb.find_gsi(&metadata, idx_name) + if !gsi_found { + make_error_response(response, .ValidationException, + fmt.tprintf("The table does not have the specified index: %s", idx_name)) + return + } + + result, err := dynamodb.gsi_query(engine, table_name, idx_name, + pk_owned, exclusive_start_key, limit, sk_condition) + if err != .None { + handle_storage_error(response, err) + return + } + defer dynamodb.query_result_destroy(&result) + + // Apply FilterExpression + filtered_items := apply_filter_to_items(request.body, result.items, attr_names, attr_values) + scanned_count := len(result.items) + + // Apply ProjectionExpression + projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names) + final_items: []dynamodb.Item + + if has_proj && len(projection) > 0 { + projected := make([]dynamodb.Item, len(filtered_items)) + for item, i in filtered_items { + projected[i] = dynamodb.apply_projection(item, projection) + } + final_items = projected + } else { + final_items = filtered_items + } + + write_items_response_with_pagination_ex( + response, final_items, result.last_evaluated_key, &metadata, scanned_count, + ) + + if has_proj && len(projection) > 0 { + for &item in final_items { + dynamodb.item_destroy(&item) + } + delete(final_items) + } + return + } + + // ---- Main table query path ---- + result, err := dynamodb.query(engine, table_name, pk_owned, exclusive_start_key, limit, sk_condition) + if err != .None { + handle_storage_error(response, err) + return + } + defer dynamodb.query_result_destroy(&result) + // ---- Apply FilterExpression (post-query filter) ---- filtered_items := apply_filter_to_items(request.body, result.items, attr_names, attr_values) scanned_count := len(result.items) @@ -1177,6 +1271,9 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re return } + // Grab index name from request body + index_name := parse_index_name(request.body) + metadata, meta_err := dynamodb.get_table_metadata(engine, table_name) if meta_err != .None { handle_storage_error(response, meta_err) @@ -1202,13 +1299,6 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re } } - result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit) - if err != .None { - handle_storage_error(response, err) - return - } - defer dynamodb.scan_result_destroy(&result) - // ---- Parse ExpressionAttributeNames/Values for filter/projection ---- attr_names := dynamodb.parse_expression_attribute_names(request.body) defer { @@ -1232,6 +1322,59 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re delete(attr_values) } + // ---- GSI scan path ---- + if idx_name, has_idx := index_name.?; has_idx { + _, gsi_found := dynamodb.find_gsi(&metadata, idx_name) + if !gsi_found { + make_error_response(response, .ValidationException, + fmt.tprintf("The table does not have the specified index: %s", idx_name)) + return + } + + result, err := dynamodb.gsi_scan(engine, table_name, idx_name, exclusive_start_key, limit) + if err != .None { + handle_storage_error(response, err) + return + } + defer dynamodb.scan_result_destroy(&result) + + filtered_items := apply_filter_to_items(request.body, result.items, attr_names, attr_values) + scanned_count := len(result.items) + + projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names) + final_items: []dynamodb.Item + + if has_proj && len(projection) > 0 { + projected := make([]dynamodb.Item, len(filtered_items)) + for item, i in filtered_items { + projected[i] = dynamodb.apply_projection(item, projection) + } + final_items = projected + } else { + final_items = filtered_items + } + + write_items_response_with_pagination_ex( + response, final_items, result.last_evaluated_key, &metadata, scanned_count, + ) + + if has_proj && len(projection) > 0 { + for &item in final_items { + dynamodb.item_destroy(&item) + } + delete(final_items) + } + return + } + + // ---- Main table scan path ---- + result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit) + if err != .None { + handle_storage_error(response, err) + return + } + defer dynamodb.scan_result_destroy(&result) + // ---- Apply FilterExpression ---- filtered_items := apply_filter_to_items(request.body, result.items, attr_names, attr_values) scanned_count := len(result.items)