diff --git a/dynamodb/gsi.odin b/dynamodb/gsi.odin index 0949aa5..dc61f96 100644 --- a/dynamodb/gsi.odin +++ b/dynamodb/gsi.odin @@ -175,15 +175,31 @@ gsi_batch_write_entries :: proc( return .None } + base_key, base_ok := key_from_item(item, metadata.key_schema) + if !base_ok { + return .Missing_Key_Attribute + } + defer key_destroy(&base_key) + + base_vals, base_vals_ok := key_get_values(&base_key) + if !base_vals_ok { + return .Invalid_Key + } + for &gsi in gsis { // Extract GSI key from item gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema) - if !kv_ok { - continue // Sparse: item doesn't have GSI PK, skip - } + if !kv_ok do continue // item doesn't have GSI PK, skip // Build GSI storage key - gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) + gsi_storage_key := build_gsi_key( + table_name, + gsi.index_name, + gsi_kv.pk, + gsi_kv.sk, + base_vals.pk, + base_vals.sk, + ) defer delete(gsi_storage_key) // Build projected item @@ -218,13 +234,31 @@ gsi_batch_delete_entries :: proc( return .None } - for &gsi in gsis { - gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema) - if !kv_ok { - continue // Item didn't have a GSI entry - } + base_key, base_ok := key_from_item(old_item, metadata.key_schema) + if !base_ok { + return .Missing_Key_Attribute + } + defer key_destroy(&base_key) - gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk) + base_vals, base_vals_ok := key_get_values(&base_key) + if !base_vals_ok { + return .Invalid_Key + } + + for &gsi in gsis { + // Extract GSI key from item + gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema) + if !kv_ok do continue // old item doesn't have GSI PK, skip + + // Build GSI storage key + gsi_storage_key := build_gsi_key( + table_name, + gsi.index_name, + gsi_kv.pk, + gsi_kv.sk, + base_vals.pk, + base_vals.sk, + ) defer delete(gsi_storage_key) // Add to batch (not written yet) diff --git a/dynamodb/json.odin b/dynamodb/json.odin index ac3ff10..a9e2ff2 100644 --- a/dynamodb/json.odin +++ b/dynamodb/json.odin @@ -577,6 +577,56 @@ parse_exclusive_start_key :: proc( return } +// parse_exclusive_start_key_gsi ... Just a helper for GSI keys +parse_exclusive_start_key_gsi :: proc( + request_body: []byte, + table_name: string, + metadata: ^Table_Metadata, + gsi: ^Global_Secondary_Index, +) -> (Maybe([]byte), bool) { + root, ok := json.parse(request_body) + if ok != nil do return nil, false + defer json.destroy_value(root) + + // Assert the root Value as an Object before indexing + obj, obj_ok := root.(json.Object) + if !obj_ok do return nil, false + + esk_val, has := obj["ExclusiveStartKey"] + if !has do return nil, true + + key_item, key_ok := parse_item_from_value(esk_val) + if !key_ok do return nil, false + defer item_destroy(&key_item) + + // index key + idx_key, idx_ok := key_from_item(key_item, gsi.key_schema) + if !idx_ok do return nil, false + defer key_destroy(&idx_key) + + idx_vals, idx_vals_ok := key_get_values(&idx_key) + if !idx_vals_ok do return nil, false + + // base key + base_key, base_ok := key_from_item(key_item, metadata.key_schema) + if !base_ok do return nil, false + defer key_destroy(&base_key) + + base_vals, base_vals_ok := key_get_values(&base_key) + if !base_vals_ok do return nil, false + + // build the actual RocksDB GSI key + k := build_gsi_key( + table_name, + gsi.index_name, + idx_vals.pk, + idx_vals.sk, + base_vals.pk, + base_vals.sk, + ) + return k, true +} + // ============================================================================ // LastEvaluatedKey Generation (Pagination Output) // @@ -648,3 +698,122 @@ serialize_last_evaluated_key :: proc( return serialize_item(item), true } + +Decoded_GSI_Key_Full :: struct { + gsi_pk: []byte, + gsi_sk: Maybe([]byte), + base_pk: []byte, + base_sk: Maybe([]byte), +} + +// Decode binary GSI key: +// +// [gsi][table_name][index_name][gsi_pk][gsi_sk?][base_pk][base_sk?] +// +// Presence of gsi_sk/base_sk depends on whether the index/table has a RANGE key. +decode_gsi_key_full_borrowed :: proc( + binary_key: []byte, + gsi_has_sort_key: bool, + table_has_sort_key: bool, +) -> (result: Decoded_GSI_Key_Full, ok: bool) { + decoder := Key_Decoder{data = binary_key, pos = 0} + + et := decoder_read_entity_type(&decoder) or_return + if et != .GSI { + return {}, false + } + + // Skip table name + index name + _ = decoder_read_segment_borrowed(&decoder) or_return + _ = decoder_read_segment_borrowed(&decoder) or_return + + // Read GSI PK + result.gsi_pk = decoder_read_segment_borrowed(&decoder) or_return + + // Read GSI SK if index has one + if gsi_has_sort_key { + sk := decoder_read_segment_borrowed(&decoder) or_return + result.gsi_sk = sk + } + + // Read base PK + result.base_pk = decoder_read_segment_borrowed(&decoder) or_return + + // Read base SK if table has one + if table_has_sort_key { + sk := decoder_read_segment_borrowed(&decoder) or_return + result.base_sk = sk + } + + return result, true +} + + +// Serialize a binary *GSI* key into a DynamoDB LastEvaluatedKey JSON object. +// The output must include the *index* key attrs + the *base table* primary key attrs, +// so boto can round-trip ExclusiveStartKey correctly. +serialize_last_evaluated_key_gsi :: proc( + binary_key: []byte, + metadata: ^Table_Metadata, + gsi: ^Global_Secondary_Index, +) -> (result: string, ok: bool) { + + // Determine whether index/table have range keys + _, gsi_has_sk := gsi_get_sort_key_name(gsi).? + _, tbl_has_sk := table_metadata_get_sort_key_name(metadata).? + + decoded, dec_ok := decode_gsi_key_full_borrowed(binary_key, gsi_has_sk, tbl_has_sk) + if !dec_ok { + return "", false + } + + // Resolve key attribute names + types + idx_pk_name := gsi_get_partition_key_name(gsi).? or_return + idx_pk_type := table_metadata_get_attribute_type(metadata, idx_pk_name).? or_return + + idx_sk_name: Maybe(string) = gsi_get_sort_key_name(gsi) + idx_sk_type: Maybe(Scalar_Attribute_Type) = nil + if n, has := idx_sk_name.?; has { + idx_sk_type = table_metadata_get_attribute_type(metadata, n) + } + + base_pk_name := table_metadata_get_partition_key_name(metadata).? or_return + base_pk_type := table_metadata_get_attribute_type(metadata, base_pk_name).? or_return + + base_sk_name: Maybe(string) = table_metadata_get_sort_key_name(metadata) + base_sk_type: Maybe(Scalar_Attribute_Type) = nil + if n, has := base_sk_name.?; has { + base_sk_type = table_metadata_get_attribute_type(metadata, n) + } + + // Build LEK item + lek := make(Item) + defer item_destroy(&lek) + + add_attr_once :: proc(item: ^Item, name: string, raw: []byte, t: Scalar_Attribute_Type) { + if _, exists := item^[name]; exists { + return + } + item^[strings.clone(name)] = build_attribute_value_with_type(raw, t) + } + + // Index keys + add_attr_once(&lek, idx_pk_name, decoded.gsi_pk, idx_pk_type) + + if sk_raw, has := decoded.gsi_sk.?; has { + skn := idx_sk_name.? or_return + skt := idx_sk_type.? or_return + add_attr_once(&lek, skn, sk_raw, skt) + } + + // Base table keys + add_attr_once(&lek, base_pk_name, decoded.base_pk, base_pk_type) + + if sk_raw, has := decoded.base_sk.?; has { + skn := base_sk_name.? or_return + skt := base_sk_type.? or_return + add_attr_once(&lek, skn, sk_raw, skt) + } + + return serialize_item(lek), true +} diff --git a/dynamodb/key_codec.odin b/dynamodb/key_codec.odin index ea6a94a..e496b76 100644 --- a/dynamodb/key_codec.odin +++ b/dynamodb/key_codec.odin @@ -130,32 +130,43 @@ build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte { return bytes.buffer_to_bytes(&buf) } -// Build GSI key: [gsi][table_name][index_name][gsi_pk][gsi_sk?] -build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gsi_sk: Maybe([]byte)) -> []byte { +// Build GSI key: [gsi][table_name][index_name][gsi_pk][gsi_sk?][base_pk][base_sk?] +build_gsi_key :: proc( + table_name: string, + index_name: string, + gsi_pk: []byte, + gsi_sk: Maybe([]byte), + base_pk: []byte, + base_sk: Maybe([]byte), +) -> []byte { buf: bytes.Buffer bytes.buffer_init_allocator(&buf, 0, 512, context.allocator) - // Write entity type bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI)) - // Write table name encode_varint(&buf, len(table_name)) bytes.buffer_write_string(&buf, table_name) - // Write index name encode_varint(&buf, len(index_name)) bytes.buffer_write_string(&buf, index_name) - // Write GSI partition key encode_varint(&buf, len(gsi_pk)) bytes.buffer_write(&buf, gsi_pk) - // Write GSI sort key if present if sk, ok := gsi_sk.?; ok { encode_varint(&buf, len(sk)) bytes.buffer_write(&buf, sk) } + // tie-breaker: base table primary key + encode_varint(&buf, len(base_pk)) + bytes.buffer_write(&buf, base_pk) + + if sk, ok := base_sk.?; ok { + encode_varint(&buf, len(sk)) + bytes.buffer_write(&buf, sk) + } + return bytes.buffer_to_bytes(&buf) }