// Global Secondary Index (GSI) support
//
// DynamoDB GSI semantics:
// - GSI entries are maintained automatically on every write (put/delete/update)
// - Each GSI has its own key schema (partition key + optional sort key)
// - GSI keys are built from item attributes; if an item doesn't have the GSI
//   key attribute(s), NO GSI entry is written (sparse index)
// - Projection controls which non-key attributes are stored in the GSI entry:
//     ALL       → entire item is copied
//     KEYS_ONLY → only table PK/SK + GSI PK/SK
//     INCLUDE   → table keys + GSI keys + specified non-key attributes
// - Query on a GSI uses IndexName to route to the correct key prefix
//
// Storage layout:
//   GSI key:   [0x03][table_name][index_name][gsi_pk_value][gsi_sk_value?]
//   GSI value: TLV-encoded projected item (same binary format as regular items)
//
// Write path:
//   put_item → for each GSI, extract GSI key attrs from the NEW item, write GSI entry
//   delete   → for each GSI, extract GSI key attrs from the OLD item, delete GSI entry
//   update   → delete OLD GSI entries, write NEW GSI entries
//
// ATOMICITY: All GSI operations use WriteBatch to ensure that GSI entries are
// maintained atomically with the base item write/delete.
//
package dynamodb

import "core:slice"
import "core:strings"

import "../rocksdb"

// ============================================================================
// GSI Key Extraction
//
// Extracts the GSI partition key (and optional sort key) raw bytes from an item.
// Returns false if the item doesn't have the required GSI PK attribute (sparse).
// ============================================================================

// Raw key bytes for one item in one GSI.
// NOTE(review): pk/sk are produced by attr_value_to_bytes — the String/Binary
// paths view the attribute's own storage, while the number path goes through
// an encoder that presumably allocates; confirm ownership before freeing.
GSI_Key_Values :: struct {
	pk: []byte,           // GSI partition (HASH) key bytes
	sk: Maybe([]byte),    // GSI sort (RANGE) key bytes; nil when the index has no RANGE key
}

// Extract GSI key values from an item based on the GSI's key schema.
// Returns ok=false if ANY required key attribute is missing (sparse index).
// DynamoDB sparse index semantics: item must have ALL key attributes defined in the GSI schema.
gsi_extract_key_values :: proc(item: Item, gsi_key_schema: []Key_Schema_Element) -> (GSI_Key_Values, bool) {
	kv: GSI_Key_Values
	for element in gsi_key_schema {
		attr, has_attr := item[element.attribute_name]
		if !has_attr {
			// Sparse-index semantics: a missing key attribute means this
			// item simply has no entry in the index.
			return {}, false
		}
		raw_bytes, converted := attr_value_to_bytes(attr)
		if !converted {
			// Attribute type can't serve as a key → treat the item as
			// absent from the index, same as a missing attribute.
			return {}, false
		}
		switch element.key_type {
		case .HASH:
			kv.pk = raw_bytes
		case .RANGE:
			kv.sk = raw_bytes
		}
	}
	return kv, true
}

// Convert a scalar attribute value to its raw byte representation.
// Supports String, DDB_Number and Binary; every other variant reports failure.
// NOTE(review): the String/Binary cases view the value's existing storage,
// while the DDB_Number case goes through encode_ddb_number_for_sort —
// presumably that allocates; confirm who owns the returned bytes.
attr_value_to_bytes :: proc(attr: Attribute_Value) -> ([]byte, bool) {
	#partial switch value in attr {
	case DDB_Number:
		return encode_ddb_number_for_sort(value), true
	case String:
		return transmute([]byte)string(value), true
	case Binary:
		return transmute([]byte)string(value), true
	}
	return nil, false
}

// ============================================================================
// GSI Projection
//
// Build a projected copy of an item for storage in a GSI entry.
// ============================================================================

// Build the projected item for a GSI entry.
// The result is a new Item that the caller owns.
gsi_project_item :: proc(
	item: Item,
	gsi: ^Global_Secondary_Index,
	table_key_schema: []Key_Schema_Element,
) -> Item {
	// ALL: the GSI entry is a full copy of the item.
	if gsi.projection.projection_type == .ALL {
		return item_deep_copy(item)
	}

	// KEYS_ONLY and INCLUDE share the same base: table key attributes plus
	// GSI key attributes. (The original duplicated this whole section per
	// case; it is now a single shared path.)
	projected := make(Item)

	// Table key attributes are always projected.
	for ks in table_key_schema {
		if val, found := item[ks.attribute_name]; found {
			projected[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
		}
	}

	// GSI key attributes, skipping any that doubled as table keys.
	for ks in gsi.key_schema {
		if _, already := projected[ks.attribute_name]; already {
			continue
		}
		if val, found := item[ks.attribute_name]; found {
			projected[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
		}
	}

	// INCLUDE additionally projects the explicitly listed non-key attributes.
	if gsi.projection.projection_type == .INCLUDE {
		if nka, has_nka := gsi.projection.non_key_attributes.?; has_nka {
			for attr_name in nka {
				if _, already := projected[attr_name]; already {
					continue
				}
				if val, found := item[attr_name]; found {
					projected[strings.clone(attr_name)] = attr_value_deep_copy(val)
				}
			}
		}
	}

	return projected
}

// ============================================================================
// GSI Write Maintenance - ATOMIC via WriteBatch
//
// These procedures add GSI operations to a WriteBatch instead of performing
// direct database writes. This ensures atomicity with the base item operation.
// ============================================================================

// Add GSI write operations to a WriteBatch for an item across all GSIs.
// Called during put_item or update_item to maintain NEW GSI entries.
//
// Inputs:
//   batch      - WriteBatch the base-item put belongs to; GSI puts are
//                appended so the item and its index entries commit atomically.
//   table_name - owning table (part of every GSI storage key).
//   item       - NEW item image; GSI keys and projections are derived from it.
//   metadata   - table metadata holding GSI definitions and the table key schema.
//
// Returns .None on success, .Serialization_Error if a projected item cannot
// be encoded. On error, puts already queued for earlier GSIs remain in the
// batch — the caller is expected to discard the whole batch.
gsi_batch_write_entries :: proc(
	batch: ^rocksdb.WriteBatch,
	table_name: string,
	item: Item,
	metadata: ^Table_Metadata,
) -> Storage_Error {
	gsis, has_gsis := metadata.global_secondary_indexes.?
	if !has_gsis || len(gsis) == 0 {
		return .None
	}

	for &gsi in gsis {
		// Sparse-index rule: if the item lacks any GSI key attribute,
		// no entry is written for this index.
		gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema)
		if !kv_ok {
			continue
		}

		// Storage key layout: [0x03][table][index][gsi_pk][gsi_sk?]
		gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
		defer delete(gsi_storage_key) // runs at the end of each loop iteration

		// Projection decides which attributes the GSI entry carries.
		projected := gsi_project_item(item, &gsi, metadata.key_schema)
		defer item_destroy(&projected)

		encoded, encode_ok := encode(projected)
		if !encode_ok {
			return .Serialization_Error
		}
		defer delete(encoded)

		// Queued only; nothing reaches the DB until the caller commits the batch.
		rocksdb.batch_put(batch, gsi_storage_key, encoded)
	}
	return .None
}

// Add GSI delete operations to a WriteBatch for an item across all GSIs.
// Called during delete_item or update_item to remove OLD GSI entries.
// Needs the OLD item image: the GSI keys to remove are derived from the
// attribute values the item had when its entries were written.
gsi_batch_delete_entries :: proc(
	batch: ^rocksdb.WriteBatch,
	table_name: string,
	old_item: Item,
	metadata: ^Table_Metadata,
) -> Storage_Error {
	gsis, has_gsis := metadata.global_secondary_indexes.?
	if !has_gsis || len(gsis) == 0 {
		return .None
	}

	for &gsi in gsis {
		gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
		if !kv_ok {
			continue // Sparse: the old item never had an entry in this index
		}

		gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
		defer delete(gsi_storage_key)

		// Queued only; committed together with the base-item delete.
		rocksdb.batch_delete(batch, gsi_storage_key)
	}
	return .None
}

// ============================================================================
// DEPRECATED - Non-atomic GSI maintenance
//
// These procedures are kept for backwards compatibility but should NOT be used.
// They perform individual database writes which is NOT atomic.
// Use gsi_batch_write_entries and gsi_batch_delete_entries instead.
// ============================================================================

// DEPRECATED: Use gsi_batch_write_entries instead for atomic operations.
// Write GSI entries for an item across all GSIs defined on the table.
// WARNING: This performs individual writes which is NOT atomic — a crash
// between writes leaves the base item and its index entries inconsistent.
@(deprecated = "not atomic; use gsi_batch_write_entries")
gsi_write_entries :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	item: Item,
	metadata: ^Table_Metadata,
) -> Storage_Error {
	gsis, has_gsis := metadata.global_secondary_indexes.?
	if !has_gsis || len(gsis) == 0 {
		return .None
	}

	for &gsi in gsis {
		// Sparse: item doesn't have the GSI key attribute(s), skip.
		gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema)
		if !kv_ok {
			continue
		}

		gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
		defer delete(gsi_storage_key)

		projected := gsi_project_item(item, &gsi, metadata.key_schema)
		defer item_destroy(&projected)

		encoded, encode_ok := encode(projected)
		if !encode_ok {
			return .Serialization_Error
		}
		defer delete(encoded)

		// Direct write — visible immediately, independently of the base item.
		put_err := rocksdb.db_put(&engine.db, gsi_storage_key, encoded)
		if put_err != .None {
			return .RocksDB_Error
		}
	}
	return .None
}

// DEPRECATED: Use gsi_batch_delete_entries instead for atomic operations.
// Delete GSI entries for an item across all GSIs.
// WARNING: This performs individual writes which is NOT atomic!
@(deprecated = "not atomic; use gsi_batch_delete_entries")
gsi_delete_entries :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	old_item: Item,
	metadata: ^Table_Metadata,
) -> Storage_Error {
	gsis, has_gsis := metadata.global_secondary_indexes.?
	if !has_gsis || len(gsis) == 0 {
		return .None
	}

	for &gsi in gsis {
		gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
		if !kv_ok {
			continue // Item didn't have a GSI entry
		}

		gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
		defer delete(gsi_storage_key)

		del_err := rocksdb.db_delete(&engine.db, gsi_storage_key)
		if del_err != .None {
			return .RocksDB_Error
		}
	}
	return .None
}

// ============================================================================
// GSI Query
//
// Queries a GSI by partition key with optional sort key condition.
// Mirrors the main table query() but uses GSI key prefix.
// ============================================================================

// Query one GSI partition by raw partition-key bytes, with optional sort-key
// condition, exclusive start key (pagination cursor) and item limit.
//
// Returns a Query_Result whose items the caller owns; last_evaluated_key is
// set only when more items remain past the limit.
//
// NOTE(review): sort-key filtering happens after decode, so filtered-out
// entries still count toward iteration (not toward `limit`); the returned
// cursor points at the last *returned* item, which may re-scan filtered
// entries on the next page — harmless but worth confirming against callers.
gsi_query :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	index_name: string,
	partition_key_value: []byte,
	exclusive_start_key: Maybe([]byte),
	limit: int,
	sk_condition: Maybe(Sort_Key_Condition) = nil,
) -> (Query_Result, Storage_Error) {
	// All entries of this GSI partition share this key prefix.
	prefix := build_gsi_partition_prefix(table_name, index_name, partition_key_value)
	defer delete(prefix)

	iter, iter_err := rocksdb.iter_create(&engine.db)
	if iter_err != .None {
		return {}, .RocksDB_Error
	}
	defer rocksdb.iter_destroy(&iter)

	max_items := limit if limit > 0 else 1_000_000

	// Position the iterator. For an exclusive start key we must skip the
	// start key itself — but ONLY if the iterator actually landed on it.
	// Seek positions at the first key >= target, so if the start key was
	// deleted since the previous page, an unconditional advance would skip
	// a live item. (BUG FIX: the old code always advanced after the seek.)
	if start_key, has_start := exclusive_start_key.?; has_start && has_prefix(start_key, prefix) {
		rocksdb.iter_seek(&iter, start_key)
		if rocksdb.iter_valid(&iter) && slice.equal(rocksdb.iter_key(&iter), start_key) {
			rocksdb.iter_next(&iter)
		}
	} else {
		rocksdb.iter_seek(&iter, prefix)
	}

	items := make([dynamic]Item)
	// BUG FIX: the dynamic array's backing storage was leaked; the Items
	// themselves are moved into result_items below, so only the backing
	// needs freeing here.
	defer delete(items)

	count := 0
	last_key: Maybe([]byte) = nil
	has_more := false

	for rocksdb.iter_valid(&iter) {
		key := rocksdb.iter_key(&iter)
		if key == nil || !has_prefix(key, prefix) {
			break // Walked past this partition
		}
		if count >= max_items {
			has_more = true
			break
		}

		value := rocksdb.iter_value(&iter)
		if value == nil {
			rocksdb.iter_next(&iter)
			continue
		}

		item, decode_ok := decode(value)
		if !decode_ok {
			rocksdb.iter_next(&iter)
			continue
		}

		// Sort key condition filtering (applied post-decode).
		if skc, has_skc := sk_condition.?; has_skc {
			if !evaluate_sort_key_condition(item, &skc) {
				item_destroy(&item)
				rocksdb.iter_next(&iter)
				continue
			}
		}

		append(&items, item)
		count += 1

		// Track the key of the last item we actually returned.
		if prev_key, had_prev := last_key.?; had_prev {
			delete(prev_key)
		}
		last_key = slice.clone(key)

		rocksdb.iter_next(&iter)
	}

	// Only emit LastEvaluatedKey when there are more items to page through.
	if !has_more {
		if lk, had_lk := last_key.?; had_lk {
			delete(lk)
		}
		last_key = nil
	}

	result_items := make([]Item, len(items))
	copy(result_items, items[:])

	return Query_Result{
		items              = result_items,
		last_evaluated_key = last_key,
	}, .None
}

// ============================================================================
// GSI Scan
//
// Scans all entries in a GSI (all partition keys under that index).
// ============================================================================

// Scan every entry of a GSI, across all partition keys, with pagination.
// Same iteration/cursor contract as gsi_query, minus key-condition filtering.
gsi_scan :: proc(
	engine: ^Storage_Engine,
	table_name: string,
	index_name: string,
	exclusive_start_key: Maybe([]byte),
	limit: int,
) -> (Scan_Result, Storage_Error) {
	// Prefix covering the whole index (no partition key component).
	prefix := build_gsi_prefix(table_name, index_name)
	defer delete(prefix)

	iter, iter_err := rocksdb.iter_create(&engine.db)
	if iter_err != .None {
		return {}, .RocksDB_Error
	}
	defer rocksdb.iter_destroy(&iter)

	max_items := limit if limit > 0 else 1_000_000

	// Same exclusive-start positioning as gsi_query: only skip the start
	// key if the iterator landed exactly on it (BUG FIX — see gsi_query).
	if start_key, has_start := exclusive_start_key.?; has_start && has_prefix(start_key, prefix) {
		rocksdb.iter_seek(&iter, start_key)
		if rocksdb.iter_valid(&iter) && slice.equal(rocksdb.iter_key(&iter), start_key) {
			rocksdb.iter_next(&iter)
		}
	} else {
		rocksdb.iter_seek(&iter, prefix)
	}

	items := make([dynamic]Item)
	defer delete(items) // BUG FIX: backing storage was leaked

	count := 0
	last_key: Maybe([]byte) = nil
	has_more := false

	for rocksdb.iter_valid(&iter) {
		key := rocksdb.iter_key(&iter)
		if key == nil || !has_prefix(key, prefix) {
			break
		}
		if count >= max_items {
			has_more = true
			break
		}

		value := rocksdb.iter_value(&iter)
		if value == nil {
			rocksdb.iter_next(&iter)
			continue
		}

		item, decode_ok := decode(value)
		if !decode_ok {
			rocksdb.iter_next(&iter)
			continue
		}

		append(&items, item)
		count += 1

		if prev_key, had_prev := last_key.?; had_prev {
			delete(prev_key)
		}
		last_key = slice.clone(key)

		rocksdb.iter_next(&iter)
	}

	if !has_more {
		if lk, had_lk := last_key.?; had_lk {
			delete(lk)
		}
		last_key = nil
	}

	result_items := make([]Item, len(items))
	copy(result_items, items[:])

	return Scan_Result{
		items              = result_items,
		last_evaluated_key = last_key,
	}, .None
}

// ============================================================================
// GSI Metadata Lookup Helpers
//
// ============================================================================

// Look up a GSI definition by index name in the table metadata.
// Returns a pointer into the metadata's own storage, plus a found flag.
find_gsi :: proc(metadata: ^Table_Metadata, index_name: string) -> (^Global_Secondary_Index, bool) {
	gsis, has_gsis := metadata.global_secondary_indexes.?
	if !has_gsis {
		return nil, false
	}
	for i in 0 ..< len(gsis) {
		if gsis[i].index_name != index_name {
			continue
		}
		return &gsis[i], true
	}
	return nil, false
}

// Get the GSI's sort key attribute name, or nil if the index has no RANGE key.
gsi_get_sort_key_name :: proc(gsi: ^Global_Secondary_Index) -> Maybe(string) {
	for element in gsi.key_schema {
		if element.key_type != .RANGE {
			continue
		}
		return element.attribute_name
	}
	return nil
}

// Get the GSI's partition key attribute name, or nil if no HASH key is defined.
gsi_get_partition_key_name :: proc(gsi: ^Global_Secondary_Index) -> Maybe(string) {
	for element in gsi.key_schema {
		if element.key_type != .HASH {
			continue
		}
		return element.attribute_name
	}
	return nil
}