more GSI fixes
This commit is contained in:
@@ -175,15 +175,31 @@ gsi_batch_write_entries :: proc(
|
||||
return .None
|
||||
}
|
||||
|
||||
base_key, base_ok := key_from_item(item, metadata.key_schema)
|
||||
if !base_ok {
|
||||
return .Missing_Key_Attribute
|
||||
}
|
||||
defer key_destroy(&base_key)
|
||||
|
||||
base_vals, base_vals_ok := key_get_values(&base_key)
|
||||
if !base_vals_ok {
|
||||
return .Invalid_Key
|
||||
}
|
||||
|
||||
for &gsi in gsis {
|
||||
// Extract GSI key from item
|
||||
gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema)
|
||||
if !kv_ok {
|
||||
continue // Sparse: item doesn't have GSI PK, skip
|
||||
}
|
||||
if !kv_ok do continue // item doesn't have GSI PK, skip
|
||||
|
||||
// Build GSI storage key
|
||||
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
|
||||
gsi_storage_key := build_gsi_key(
|
||||
table_name,
|
||||
gsi.index_name,
|
||||
gsi_kv.pk,
|
||||
gsi_kv.sk,
|
||||
base_vals.pk,
|
||||
base_vals.sk,
|
||||
)
|
||||
defer delete(gsi_storage_key)
|
||||
|
||||
// Build projected item
|
||||
@@ -218,13 +234,31 @@ gsi_batch_delete_entries :: proc(
|
||||
return .None
|
||||
}
|
||||
|
||||
for &gsi in gsis {
|
||||
gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
|
||||
if !kv_ok {
|
||||
continue // Item didn't have a GSI entry
|
||||
base_key, base_ok := key_from_item(old_item, metadata.key_schema)
|
||||
if !base_ok {
|
||||
return .Missing_Key_Attribute
|
||||
}
|
||||
defer key_destroy(&base_key)
|
||||
|
||||
base_vals, base_vals_ok := key_get_values(&base_key)
|
||||
if !base_vals_ok {
|
||||
return .Invalid_Key
|
||||
}
|
||||
|
||||
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
|
||||
for &gsi in gsis {
|
||||
// Extract GSI key from item
|
||||
gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
|
||||
if !kv_ok do continue // old item doesn't have GSI PK, skip
|
||||
|
||||
// Build GSI storage key
|
||||
gsi_storage_key := build_gsi_key(
|
||||
table_name,
|
||||
gsi.index_name,
|
||||
gsi_kv.pk,
|
||||
gsi_kv.sk,
|
||||
base_vals.pk,
|
||||
base_vals.sk,
|
||||
)
|
||||
defer delete(gsi_storage_key)
|
||||
|
||||
// Add to batch (not written yet)
|
||||
|
||||
@@ -577,6 +577,56 @@ parse_exclusive_start_key :: proc(
|
||||
return
|
||||
}
|
||||
|
||||
// parse_exclusive_start_key_gsi ... Just a helper for GSI keys
|
||||
parse_exclusive_start_key_gsi :: proc(
|
||||
request_body: []byte,
|
||||
table_name: string,
|
||||
metadata: ^Table_Metadata,
|
||||
gsi: ^Global_Secondary_Index,
|
||||
) -> (Maybe([]byte), bool) {
|
||||
root, ok := json.parse(request_body)
|
||||
if ok != nil do return nil, false
|
||||
defer json.destroy_value(root)
|
||||
|
||||
// Assert the root Value as an Object before indexing
|
||||
obj, obj_ok := root.(json.Object)
|
||||
if !obj_ok do return nil, false
|
||||
|
||||
esk_val, has := obj["ExclusiveStartKey"]
|
||||
if !has do return nil, true
|
||||
|
||||
key_item, key_ok := parse_item_from_value(esk_val)
|
||||
if !key_ok do return nil, false
|
||||
defer item_destroy(&key_item)
|
||||
|
||||
// index key
|
||||
idx_key, idx_ok := key_from_item(key_item, gsi.key_schema)
|
||||
if !idx_ok do return nil, false
|
||||
defer key_destroy(&idx_key)
|
||||
|
||||
idx_vals, idx_vals_ok := key_get_values(&idx_key)
|
||||
if !idx_vals_ok do return nil, false
|
||||
|
||||
// base key
|
||||
base_key, base_ok := key_from_item(key_item, metadata.key_schema)
|
||||
if !base_ok do return nil, false
|
||||
defer key_destroy(&base_key)
|
||||
|
||||
base_vals, base_vals_ok := key_get_values(&base_key)
|
||||
if !base_vals_ok do return nil, false
|
||||
|
||||
// build the actual RocksDB GSI key
|
||||
k := build_gsi_key(
|
||||
table_name,
|
||||
gsi.index_name,
|
||||
idx_vals.pk,
|
||||
idx_vals.sk,
|
||||
base_vals.pk,
|
||||
base_vals.sk,
|
||||
)
|
||||
return k, true
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// LastEvaluatedKey Generation (Pagination Output)
|
||||
//
|
||||
@@ -648,3 +698,122 @@ serialize_last_evaluated_key :: proc(
|
||||
|
||||
return serialize_item(item), true
|
||||
}
|
||||
|
||||
Decoded_GSI_Key_Full :: struct {
|
||||
gsi_pk: []byte,
|
||||
gsi_sk: Maybe([]byte),
|
||||
base_pk: []byte,
|
||||
base_sk: Maybe([]byte),
|
||||
}
|
||||
|
||||
// Decode binary GSI key:
|
||||
//
|
||||
// [gsi][table_name][index_name][gsi_pk][gsi_sk?][base_pk][base_sk?]
|
||||
//
|
||||
// Presence of gsi_sk/base_sk depends on whether the index/table has a RANGE key.
|
||||
decode_gsi_key_full_borrowed :: proc(
|
||||
binary_key: []byte,
|
||||
gsi_has_sort_key: bool,
|
||||
table_has_sort_key: bool,
|
||||
) -> (result: Decoded_GSI_Key_Full, ok: bool) {
|
||||
decoder := Key_Decoder{data = binary_key, pos = 0}
|
||||
|
||||
et := decoder_read_entity_type(&decoder) or_return
|
||||
if et != .GSI {
|
||||
return {}, false
|
||||
}
|
||||
|
||||
// Skip table name + index name
|
||||
_ = decoder_read_segment_borrowed(&decoder) or_return
|
||||
_ = decoder_read_segment_borrowed(&decoder) or_return
|
||||
|
||||
// Read GSI PK
|
||||
result.gsi_pk = decoder_read_segment_borrowed(&decoder) or_return
|
||||
|
||||
// Read GSI SK if index has one
|
||||
if gsi_has_sort_key {
|
||||
sk := decoder_read_segment_borrowed(&decoder) or_return
|
||||
result.gsi_sk = sk
|
||||
}
|
||||
|
||||
// Read base PK
|
||||
result.base_pk = decoder_read_segment_borrowed(&decoder) or_return
|
||||
|
||||
// Read base SK if table has one
|
||||
if table_has_sort_key {
|
||||
sk := decoder_read_segment_borrowed(&decoder) or_return
|
||||
result.base_sk = sk
|
||||
}
|
||||
|
||||
return result, true
|
||||
}
|
||||
|
||||
|
||||
// Serialize a binary *GSI* key into a DynamoDB LastEvaluatedKey JSON object.
|
||||
// The output must include the *index* key attrs + the *base table* primary key attrs,
|
||||
// so boto can round-trip ExclusiveStartKey correctly.
|
||||
serialize_last_evaluated_key_gsi :: proc(
|
||||
binary_key: []byte,
|
||||
metadata: ^Table_Metadata,
|
||||
gsi: ^Global_Secondary_Index,
|
||||
) -> (result: string, ok: bool) {
|
||||
|
||||
// Determine whether index/table have range keys
|
||||
_, gsi_has_sk := gsi_get_sort_key_name(gsi).?
|
||||
_, tbl_has_sk := table_metadata_get_sort_key_name(metadata).?
|
||||
|
||||
decoded, dec_ok := decode_gsi_key_full_borrowed(binary_key, gsi_has_sk, tbl_has_sk)
|
||||
if !dec_ok {
|
||||
return "", false
|
||||
}
|
||||
|
||||
// Resolve key attribute names + types
|
||||
idx_pk_name := gsi_get_partition_key_name(gsi).? or_return
|
||||
idx_pk_type := table_metadata_get_attribute_type(metadata, idx_pk_name).? or_return
|
||||
|
||||
idx_sk_name: Maybe(string) = gsi_get_sort_key_name(gsi)
|
||||
idx_sk_type: Maybe(Scalar_Attribute_Type) = nil
|
||||
if n, has := idx_sk_name.?; has {
|
||||
idx_sk_type = table_metadata_get_attribute_type(metadata, n)
|
||||
}
|
||||
|
||||
base_pk_name := table_metadata_get_partition_key_name(metadata).? or_return
|
||||
base_pk_type := table_metadata_get_attribute_type(metadata, base_pk_name).? or_return
|
||||
|
||||
base_sk_name: Maybe(string) = table_metadata_get_sort_key_name(metadata)
|
||||
base_sk_type: Maybe(Scalar_Attribute_Type) = nil
|
||||
if n, has := base_sk_name.?; has {
|
||||
base_sk_type = table_metadata_get_attribute_type(metadata, n)
|
||||
}
|
||||
|
||||
// Build LEK item
|
||||
lek := make(Item)
|
||||
defer item_destroy(&lek)
|
||||
|
||||
add_attr_once :: proc(item: ^Item, name: string, raw: []byte, t: Scalar_Attribute_Type) {
|
||||
if _, exists := item^[name]; exists {
|
||||
return
|
||||
}
|
||||
item^[strings.clone(name)] = build_attribute_value_with_type(raw, t)
|
||||
}
|
||||
|
||||
// Index keys
|
||||
add_attr_once(&lek, idx_pk_name, decoded.gsi_pk, idx_pk_type)
|
||||
|
||||
if sk_raw, has := decoded.gsi_sk.?; has {
|
||||
skn := idx_sk_name.? or_return
|
||||
skt := idx_sk_type.? or_return
|
||||
add_attr_once(&lek, skn, sk_raw, skt)
|
||||
}
|
||||
|
||||
// Base table keys
|
||||
add_attr_once(&lek, base_pk_name, decoded.base_pk, base_pk_type)
|
||||
|
||||
if sk_raw, has := decoded.base_sk.?; has {
|
||||
skn := base_sk_name.? or_return
|
||||
skt := base_sk_type.? or_return
|
||||
add_attr_once(&lek, skn, sk_raw, skt)
|
||||
}
|
||||
|
||||
return serialize_item(lek), true
|
||||
}
|
||||
|
||||
@@ -130,32 +130,43 @@ build_partition_prefix :: proc(table_name: string, pk_value: []byte) -> []byte {
|
||||
return bytes.buffer_to_bytes(&buf)
|
||||
}
|
||||
|
||||
// Build GSI key: [gsi][table_name][index_name][gsi_pk][gsi_sk?]
|
||||
build_gsi_key :: proc(table_name: string, index_name: string, gsi_pk: []byte, gsi_sk: Maybe([]byte)) -> []byte {
|
||||
// Build GSI key: [gsi][table_name][index_name][gsi_pk][gsi_sk?][base_pk][base_sk?]
|
||||
build_gsi_key :: proc(
|
||||
table_name: string,
|
||||
index_name: string,
|
||||
gsi_pk: []byte,
|
||||
gsi_sk: Maybe([]byte),
|
||||
base_pk: []byte,
|
||||
base_sk: Maybe([]byte),
|
||||
) -> []byte {
|
||||
buf: bytes.Buffer
|
||||
bytes.buffer_init_allocator(&buf, 0, 512, context.allocator)
|
||||
|
||||
// Write entity type
|
||||
bytes.buffer_write_byte(&buf, u8(Entity_Type.GSI))
|
||||
|
||||
// Write table name
|
||||
encode_varint(&buf, len(table_name))
|
||||
bytes.buffer_write_string(&buf, table_name)
|
||||
|
||||
// Write index name
|
||||
encode_varint(&buf, len(index_name))
|
||||
bytes.buffer_write_string(&buf, index_name)
|
||||
|
||||
// Write GSI partition key
|
||||
encode_varint(&buf, len(gsi_pk))
|
||||
bytes.buffer_write(&buf, gsi_pk)
|
||||
|
||||
// Write GSI sort key if present
|
||||
if sk, ok := gsi_sk.?; ok {
|
||||
encode_varint(&buf, len(sk))
|
||||
bytes.buffer_write(&buf, sk)
|
||||
}
|
||||
|
||||
// tie-breaker: base table primary key
|
||||
encode_varint(&buf, len(base_pk))
|
||||
bytes.buffer_write(&buf, base_pk)
|
||||
|
||||
if sk, ok := base_sk.?; ok {
|
||||
encode_varint(&buf, len(sk))
|
||||
bytes.buffer_write(&buf, sk)
|
||||
}
|
||||
|
||||
return bytes.buffer_to_bytes(&buf)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user