make GSI less shit
This commit is contained in:
@@ -20,6 +20,9 @@
|
|||||||
// delete → for each GSI, extract GSI key attrs from the OLD item, delete GSI entry
|
// delete → for each GSI, extract GSI key attrs from the OLD item, delete GSI entry
|
||||||
// update → delete OLD GSI entries, write NEW GSI entries
|
// update → delete OLD GSI entries, write NEW GSI entries
|
||||||
//
|
//
|
||||||
|
// ATOMICITY: All GSI operations use WriteBatch to ensure that GSI entries are
|
||||||
|
// maintained atomically with the base item write/delete.
|
||||||
|
//
|
||||||
package dynamodb
|
package dynamodb
|
||||||
|
|
||||||
import "core:slice"
|
import "core:slice"
|
||||||
@@ -156,14 +159,95 @@ gsi_project_item :: proc(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// GSI Write Maintenance
|
// GSI Write Maintenance - ATOMIC via WriteBatch
|
||||||
//
|
//
|
||||||
// Called after a successful data write to maintain GSI entries.
|
// These procedures add GSI operations to a WriteBatch instead of performing
|
||||||
// Uses WriteBatch for atomicity (all GSI entries for one item in one batch).
|
// direct database writes. This ensures atomicity with the base item operation.
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
|
// Add GSI write operations to a WriteBatch for an item across all GSIs.
|
||||||
|
// Called during put_item or update_item to maintain NEW GSI entries.
|
||||||
|
gsi_batch_write_entries :: proc(
|
||||||
|
batch: ^rocksdb.WriteBatch,
|
||||||
|
table_name: string,
|
||||||
|
item: Item,
|
||||||
|
metadata: ^Table_Metadata,
|
||||||
|
) -> Storage_Error {
|
||||||
|
gsis, has_gsis := metadata.global_secondary_indexes.?
|
||||||
|
if !has_gsis || len(gsis) == 0 {
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
for &gsi in gsis {
|
||||||
|
// Extract GSI key from item
|
||||||
|
gsi_kv, kv_ok := gsi_extract_key_values(item, gsi.key_schema)
|
||||||
|
if !kv_ok {
|
||||||
|
continue // Sparse: item doesn't have GSI PK, skip
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build GSI storage key
|
||||||
|
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
|
||||||
|
defer delete(gsi_storage_key)
|
||||||
|
|
||||||
|
// Build projected item
|
||||||
|
projected := gsi_project_item(item, &gsi, metadata.key_schema)
|
||||||
|
defer item_destroy(&projected)
|
||||||
|
|
||||||
|
// Encode projected item
|
||||||
|
encoded, encode_ok := encode(projected)
|
||||||
|
if !encode_ok {
|
||||||
|
return .Serialization_Error
|
||||||
|
}
|
||||||
|
defer delete(encoded)
|
||||||
|
|
||||||
|
// Add to batch (not written yet)
|
||||||
|
rocksdb.batch_put(batch, gsi_storage_key, encoded)
|
||||||
|
}
|
||||||
|
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add GSI delete operations to a WriteBatch for an item across all GSIs.
|
||||||
|
// Called during delete_item or update_item to remove OLD GSI entries.
|
||||||
|
// Needs the OLD item to know which GSI keys to remove.
|
||||||
|
gsi_batch_delete_entries :: proc(
|
||||||
|
batch: ^rocksdb.WriteBatch,
|
||||||
|
table_name: string,
|
||||||
|
old_item: Item,
|
||||||
|
metadata: ^Table_Metadata,
|
||||||
|
) -> Storage_Error {
|
||||||
|
gsis, has_gsis := metadata.global_secondary_indexes.?
|
||||||
|
if !has_gsis || len(gsis) == 0 {
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
for &gsi in gsis {
|
||||||
|
gsi_kv, kv_ok := gsi_extract_key_values(old_item, gsi.key_schema)
|
||||||
|
if !kv_ok {
|
||||||
|
continue // Item didn't have a GSI entry
|
||||||
|
}
|
||||||
|
|
||||||
|
gsi_storage_key := build_gsi_key(table_name, gsi.index_name, gsi_kv.pk, gsi_kv.sk)
|
||||||
|
defer delete(gsi_storage_key)
|
||||||
|
|
||||||
|
// Add to batch (not written yet)
|
||||||
|
rocksdb.batch_delete(batch, gsi_storage_key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// DEPRECATED - Non-atomic GSI maintenance
|
||||||
|
//
|
||||||
|
// These procedures are kept for backwards compatibility but should NOT be used.
|
||||||
|
// They perform individual database writes which is NOT atomic.
|
||||||
|
// Use gsi_batch_write_entries and gsi_batch_delete_entries instead.
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
// DEPRECATED: Use gsi_batch_write_entries instead for atomic operations.
|
||||||
// Write GSI entries for an item across all GSIs defined on the table.
|
// Write GSI entries for an item across all GSIs defined on the table.
|
||||||
// Should be called AFTER the main data key is written.
|
// WARNING: This performs individual writes which is NOT atomic!
|
||||||
gsi_write_entries :: proc(
|
gsi_write_entries :: proc(
|
||||||
engine: ^Storage_Engine,
|
engine: ^Storage_Engine,
|
||||||
table_name: string,
|
table_name: string,
|
||||||
@@ -207,9 +291,9 @@ gsi_write_entries :: proc(
|
|||||||
return .None
|
return .None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DEPRECATED: Use gsi_batch_delete_entries instead for atomic operations.
|
||||||
// Delete GSI entries for an item across all GSIs.
|
// Delete GSI entries for an item across all GSIs.
|
||||||
// Should be called BEFORE or AFTER the main data key is deleted.
|
// WARNING: This performs individual writes which is NOT atomic!
|
||||||
// Needs the OLD item to know which GSI keys to remove.
|
|
||||||
gsi_delete_entries :: proc(
|
gsi_delete_entries :: proc(
|
||||||
engine: ^Storage_Engine,
|
engine: ^Storage_Engine,
|
||||||
table_name: string,
|
table_name: string,
|
||||||
|
|||||||
@@ -729,6 +729,7 @@ delete_table :: proc(engine: ^Storage_Engine, table_name: string) -> Storage_Err
|
|||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
// Put item — uses EXCLUSIVE lock (write operation)
|
// Put item — uses EXCLUSIVE lock (write operation)
|
||||||
|
// ATOMICITY: Uses WriteBatch to ensure base item + all GSI updates are atomic
|
||||||
put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
|
put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Storage_Error {
|
||||||
table_lock := get_or_create_table_lock(engine, table_name)
|
table_lock := get_or_create_table_lock(engine, table_name)
|
||||||
sync.rw_mutex_lock(table_lock)
|
sync.rw_mutex_lock(table_lock)
|
||||||
@@ -771,34 +772,59 @@ put_item :: proc(engine: ^Storage_Engine, table_name: string, item: Item) -> Sto
|
|||||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||||
defer delete(storage_key)
|
defer delete(storage_key)
|
||||||
|
|
||||||
// --- GSI cleanup: delete OLD GSI entries if item already exists ---
|
// --- Check if item already exists (need old item for GSI cleanup) ---
|
||||||
|
old_item: Maybe(Item) = nil
|
||||||
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
||||||
if existing_err == .None && existing_value != nil {
|
if existing_err == .None && existing_value != nil {
|
||||||
defer delete(existing_value)
|
defer delete(existing_value)
|
||||||
old_item, decode_ok := decode(existing_value)
|
decoded_old, decode_ok := decode(existing_value)
|
||||||
if decode_ok {
|
if decode_ok {
|
||||||
defer item_destroy(&old_item)
|
old_item = decoded_old
|
||||||
gsi_delete_entries(engine, table_name, old_item, &metadata)
|
}
|
||||||
|
}
|
||||||
|
// Cleanup old_item at the end
|
||||||
|
defer {
|
||||||
|
if old, has_old := old_item.?; has_old {
|
||||||
|
old_copy := old
|
||||||
|
item_destroy(&old_copy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode item
|
// Encode new item
|
||||||
encoded_item, encode_ok := encode(item)
|
encoded_item, encode_ok := encode(item)
|
||||||
if !encode_ok {
|
if !encode_ok {
|
||||||
return .Serialization_Error
|
return .Serialization_Error
|
||||||
}
|
}
|
||||||
defer delete(encoded_item)
|
defer delete(encoded_item)
|
||||||
|
|
||||||
// Store in RocksDB
|
// --- ATOMIC WRITE BATCH: base item + all GSI updates ---
|
||||||
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
|
batch, batch_err := rocksdb.batch_create()
|
||||||
if put_err != .None {
|
if batch_err != .None {
|
||||||
return .RocksDB_Error
|
return .RocksDB_Error
|
||||||
}
|
}
|
||||||
|
defer rocksdb.batch_destroy(&batch)
|
||||||
|
|
||||||
// --- GSI maintenance: write NEW GSI entries ---
|
// Add base item write to batch
|
||||||
gsi_err := gsi_write_entries(engine, table_name, item, &metadata)
|
rocksdb.batch_put(&batch, storage_key, encoded_item)
|
||||||
if gsi_err != .None {
|
|
||||||
return gsi_err
|
// Add old GSI entry deletions to batch (if item existed)
|
||||||
|
if old, has_old := old_item.?; has_old {
|
||||||
|
gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old, &metadata)
|
||||||
|
if gsi_del_err != .None {
|
||||||
|
return gsi_del_err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new GSI entry writes to batch
|
||||||
|
gsi_write_err := gsi_batch_write_entries(&batch, table_name, item, &metadata)
|
||||||
|
if gsi_write_err != .None {
|
||||||
|
return gsi_write_err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write batch atomically - ALL or NOTHING
|
||||||
|
write_err := rocksdb.batch_write(&engine.db, &batch)
|
||||||
|
if write_err != .None {
|
||||||
|
return .RocksDB_Error
|
||||||
}
|
}
|
||||||
|
|
||||||
return .None
|
return .None
|
||||||
@@ -861,6 +887,7 @@ get_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> (May
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete item — uses EXCLUSIVE lock (write operation)
|
// Delete item — uses EXCLUSIVE lock (write operation)
|
||||||
|
// ATOMICITY: Uses WriteBatch to ensure base item + all GSI deletions are atomic
|
||||||
delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
|
delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> Storage_Error {
|
||||||
table_lock := get_or_create_table_lock(engine, table_name)
|
table_lock := get_or_create_table_lock(engine, table_name)
|
||||||
sync.rw_mutex_lock(table_lock)
|
sync.rw_mutex_lock(table_lock)
|
||||||
@@ -897,20 +924,50 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
|
|||||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||||
defer delete(storage_key)
|
defer delete(storage_key)
|
||||||
|
|
||||||
// --- GSI cleanup: read existing item to know which GSI entries to remove ---
|
// --- Read existing item to know which GSI entries to remove ---
|
||||||
|
old_item: Maybe(Item) = nil
|
||||||
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
existing_value, existing_err := rocksdb.db_get(&engine.db, storage_key)
|
||||||
if existing_err == .None && existing_value != nil {
|
if existing_err == .None && existing_value != nil {
|
||||||
defer delete(existing_value)
|
defer delete(existing_value)
|
||||||
old_item, decode_ok := decode(existing_value)
|
decoded_old, decode_ok := decode(existing_value)
|
||||||
if decode_ok {
|
if decode_ok {
|
||||||
defer item_destroy(&old_item)
|
old_item = decoded_old
|
||||||
gsi_delete_entries(engine, table_name, old_item, &metadata)
|
}
|
||||||
|
}
|
||||||
|
// Cleanup old_item at the end
|
||||||
|
defer {
|
||||||
|
if old, has_old := old_item.?; has_old {
|
||||||
|
old_copy := old
|
||||||
|
item_destroy(&old_copy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete from RocksDB
|
// If item doesn't exist, nothing to delete (not an error in DynamoDB)
|
||||||
del_err := rocksdb.db_delete(&engine.db, storage_key)
|
if _, has_old := old_item.?; !has_old {
|
||||||
if del_err != .None {
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- ATOMIC WRITE BATCH: base item deletion + all GSI deletions ---
|
||||||
|
batch, batch_err := rocksdb.batch_create()
|
||||||
|
if batch_err != .None {
|
||||||
|
return .RocksDB_Error
|
||||||
|
}
|
||||||
|
defer rocksdb.batch_destroy(&batch)
|
||||||
|
|
||||||
|
// Add base item delete to batch
|
||||||
|
rocksdb.batch_delete(&batch, storage_key)
|
||||||
|
|
||||||
|
// Add GSI entry deletions to batch
|
||||||
|
if old, has_old := old_item.?; has_old {
|
||||||
|
gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old, &metadata)
|
||||||
|
if gsi_del_err != .None {
|
||||||
|
return gsi_del_err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write batch atomically - ALL or NOTHING
|
||||||
|
write_err := rocksdb.batch_write(&engine.db, &batch)
|
||||||
|
if write_err != .None {
|
||||||
return .RocksDB_Error
|
return .RocksDB_Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import "../rocksdb"
|
|||||||
|
|
||||||
// UpdateItem — fetch existing item, apply update plan, write back
|
// UpdateItem — fetch existing item, apply update plan, write back
|
||||||
// Uses EXCLUSIVE lock (write operation)
|
// Uses EXCLUSIVE lock (write operation)
|
||||||
|
// ATOMICITY: Uses WriteBatch to ensure base item + all GSI updates are atomic
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
// - old_item: the item BEFORE mutations (if it existed), for ReturnValues
|
// - old_item: the item BEFORE mutations (if it existed), for ReturnValues
|
||||||
@@ -59,7 +60,7 @@ update_item :: proc(
|
|||||||
return nil, nil, .Serialization_Error
|
return nil, nil, .Serialization_Error
|
||||||
}
|
}
|
||||||
existing_item = decoded
|
existing_item = decoded
|
||||||
// Save old item for ReturnValues
|
// Save old item for ReturnValues (and for GSI cleanup)
|
||||||
old_item = item_deep_copy(existing_item)
|
old_item = item_deep_copy(existing_item)
|
||||||
} else if get_err == .NotFound || existing_encoded == nil {
|
} else if get_err == .NotFound || existing_encoded == nil {
|
||||||
// Item doesn't exist yet — start with just the key attributes
|
// Item doesn't exist yet — start with just the key attributes
|
||||||
@@ -109,9 +110,46 @@ update_item :: proc(
|
|||||||
}
|
}
|
||||||
defer delete(encoded_item)
|
defer delete(encoded_item)
|
||||||
|
|
||||||
// Write back to RocksDB
|
// --- ATOMIC WRITE BATCH: base item + all GSI updates ---
|
||||||
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
|
batch, batch_err := rocksdb.batch_create()
|
||||||
if put_err != .None {
|
if batch_err != .None {
|
||||||
|
item_destroy(&existing_item)
|
||||||
|
if old, has := old_item.?; has {
|
||||||
|
old_copy := old
|
||||||
|
item_destroy(&old_copy)
|
||||||
|
}
|
||||||
|
return nil, nil, .RocksDB_Error
|
||||||
|
}
|
||||||
|
defer rocksdb.batch_destroy(&batch)
|
||||||
|
|
||||||
|
// Add base item write to batch
|
||||||
|
rocksdb.batch_put(&batch, storage_key, encoded_item)
|
||||||
|
|
||||||
|
// Add old GSI entry deletions to batch (if item existed before)
|
||||||
|
if old, has := old_item.?; has {
|
||||||
|
gsi_del_err := gsi_batch_delete_entries(&batch, table_name, old, &metadata)
|
||||||
|
if gsi_del_err != .None {
|
||||||
|
item_destroy(&existing_item)
|
||||||
|
old_copy := old
|
||||||
|
item_destroy(&old_copy)
|
||||||
|
return nil, nil, gsi_del_err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new GSI entry writes to batch
|
||||||
|
gsi_write_err := gsi_batch_write_entries(&batch, table_name, existing_item, &metadata)
|
||||||
|
if gsi_write_err != .None {
|
||||||
|
item_destroy(&existing_item)
|
||||||
|
if old, has := old_item.?; has {
|
||||||
|
old_copy := old
|
||||||
|
item_destroy(&old_copy)
|
||||||
|
}
|
||||||
|
return nil, nil, gsi_write_err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write batch atomically - ALL or NOTHING
|
||||||
|
write_err := rocksdb.batch_write(&engine.db, &batch)
|
||||||
|
if write_err != .None {
|
||||||
item_destroy(&existing_item)
|
item_destroy(&existing_item)
|
||||||
if old, has := old_item.?; has {
|
if old, has := old_item.?; has {
|
||||||
old_copy := old
|
old_copy := old
|
||||||
@@ -120,12 +158,6 @@ update_item :: proc(
|
|||||||
return nil, nil, .RocksDB_Error
|
return nil, nil, .RocksDB_Error
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- GSI maintenance: delete old entries, write new entries ---
|
|
||||||
if old, has := old_item.?; has {
|
|
||||||
gsi_delete_entries(engine, table_name, old, &metadata)
|
|
||||||
}
|
|
||||||
gsi_write_entries(engine, table_name, existing_item, &metadata)
|
|
||||||
|
|
||||||
new_item = existing_item
|
new_item = existing_item
|
||||||
return old_item, new_item, .None
|
return old_item, new_item, .None
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user