fix the stupid internal transactions
This commit is contained in:
@@ -124,13 +124,11 @@ transact_write_items :: proc(
|
|||||||
table_set[action.table_name] = true
|
table_set[action.table_name] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Acquire exclusive locks on all tables in deterministic order
|
|
||||||
// to prevent deadlocks
|
|
||||||
table_names := make([dynamic]string, allocator = context.temp_allocator)
|
table_names := make([dynamic]string, allocator = context.temp_allocator)
|
||||||
for name in table_set {
|
for name in table_set {
|
||||||
append(&table_names, name)
|
append(&table_names, name)
|
||||||
}
|
}
|
||||||
// Simple sort for deterministic lock ordering
|
// Sort for deterministic lock ordering
|
||||||
for i := 0; i < len(table_names); i += 1 {
|
for i := 0; i < len(table_names); i += 1 {
|
||||||
for j := i + 1; j < len(table_names); j += 1 {
|
for j := i + 1; j < len(table_names); j += 1 {
|
||||||
if table_names[j] < table_names[i] {
|
if table_names[j] < table_names[i] {
|
||||||
@@ -146,17 +144,15 @@ transact_write_items :: proc(
|
|||||||
append(&locks, lock)
|
append(&locks, lock)
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
// Release all locks in reverse order
|
|
||||||
for i := len(locks) - 1; i >= 0; i -= 1 {
|
for i := len(locks) - 1; i >= 0; i -= 1 {
|
||||||
sync.rw_mutex_unlock(locks[i])
|
sync.rw_mutex_unlock(locks[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Step 2: Pre-flight — fetch metadata and existing items, evaluate conditions ----
|
// ---- Step 2: Fetch metadata and evaluate conditions ----
|
||||||
reasons := make([]Cancellation_Reason, len(actions))
|
reasons := make([]Cancellation_Reason, len(actions))
|
||||||
any_failed := false
|
any_failed := false
|
||||||
|
|
||||||
// Cache table metadata to avoid redundant lookups
|
|
||||||
metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
|
metadata_cache := make(map[string]Table_Metadata, allocator = context.temp_allocator)
|
||||||
defer {
|
defer {
|
||||||
for _, meta in metadata_cache {
|
for _, meta in metadata_cache {
|
||||||
@@ -166,7 +162,6 @@ transact_write_items :: proc(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for action, idx in actions {
|
for action, idx in actions {
|
||||||
// Get table metadata (cached)
|
|
||||||
metadata: ^Table_Metadata
|
metadata: ^Table_Metadata
|
||||||
if cached, found := &metadata_cache[action.table_name]; found {
|
if cached, found := &metadata_cache[action.table_name]; found {
|
||||||
metadata = cached
|
metadata = cached
|
||||||
@@ -184,12 +179,11 @@ transact_write_items :: proc(
|
|||||||
metadata = &metadata_cache[action.table_name]
|
metadata = &metadata_cache[action.table_name]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine the key item for this action
|
|
||||||
key_item: Item
|
key_item: Item
|
||||||
switch action.type {
|
switch action.type {
|
||||||
case .Put:
|
case .Put:
|
||||||
if item, has := action.item.?; has {
|
if item, has := action.item.?; has {
|
||||||
key_item = item // For Put, key is extracted from the item
|
key_item = item
|
||||||
} else {
|
} else {
|
||||||
reasons[idx] = Cancellation_Reason{
|
reasons[idx] = Cancellation_Reason{
|
||||||
code = "ValidationError",
|
code = "ValidationError",
|
||||||
@@ -211,9 +205,8 @@ transact_write_items :: proc(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Evaluate ConditionExpression if present
|
// Evaluate ConditionExpression
|
||||||
if cond_str, has_cond := action.condition_expr.?; has_cond {
|
if cond_str, has_cond := action.condition_expr.?; has_cond {
|
||||||
// Fetch existing item
|
|
||||||
existing_item, get_err := get_item_internal(engine, action.table_name, key_item, metadata)
|
existing_item, get_err := get_item_internal(engine, action.table_name, key_item, metadata)
|
||||||
if get_err != .None && get_err != .Item_Not_Found {
|
if get_err != .None && get_err != .Item_Not_Found {
|
||||||
reasons[idx] = Cancellation_Reason{
|
reasons[idx] = Cancellation_Reason{
|
||||||
@@ -230,7 +223,6 @@ transact_write_items :: proc(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse and evaluate condition
|
|
||||||
filter_node, parse_ok := parse_filter_expression(
|
filter_node, parse_ok := parse_filter_expression(
|
||||||
cond_str, action.expr_attr_names, action.expr_attr_values,
|
cond_str, action.expr_attr_names, action.expr_attr_values,
|
||||||
)
|
)
|
||||||
@@ -263,13 +255,12 @@ transact_write_items :: proc(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConditionCheck actions only validate — they don't mutate
|
|
||||||
if action.type == .Condition_Check {
|
if action.type == .Condition_Check {
|
||||||
reasons[idx] = Cancellation_Reason{code = "None"}
|
reasons[idx] = Cancellation_Reason{code = "None"}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate key/item against schema
|
// Validate key/item
|
||||||
switch action.type {
|
switch action.type {
|
||||||
case .Put:
|
case .Put:
|
||||||
if item, has := action.item.?; has {
|
if item, has := action.item.?; has {
|
||||||
@@ -286,35 +277,87 @@ transact_write_items :: proc(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case .Delete, .Update:
|
case .Delete, .Update:
|
||||||
// Key validation happens during execution
|
// Key validation happens during batch building
|
||||||
case .Condition_Check:
|
case .Condition_Check:
|
||||||
// Already handled above
|
// Already handled
|
||||||
}
|
}
|
||||||
|
|
||||||
reasons[idx] = Cancellation_Reason{code = "None"}
|
reasons[idx] = Cancellation_Reason{code = "None"}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Step 3: If any condition failed, return cancellation ----
|
|
||||||
if any_failed {
|
if any_failed {
|
||||||
result.cancellation_reasons = reasons
|
result.cancellation_reasons = reasons
|
||||||
return result, .Cancelled
|
return result, .Cancelled
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Step 4: Apply all mutations ----
|
// ---- Step 3: Build atomic WriteBatch with all operations ----
|
||||||
for &action, idx in actions {
|
batch, batch_err := rocksdb.batch_create()
|
||||||
metadata := &metadata_cache[action.table_name]
|
if batch_err != .None {
|
||||||
|
|
||||||
apply_err := transact_apply_action(engine, &action, metadata)
|
|
||||||
if apply_err != .None {
|
|
||||||
// This shouldn't happen after pre-validation, but handle gracefully
|
|
||||||
reasons[idx] = Cancellation_Reason{
|
|
||||||
code = "InternalError",
|
|
||||||
message = "Failed to apply mutation",
|
|
||||||
}
|
|
||||||
// In a real impl we'd need to rollback. For now, report the failure.
|
|
||||||
result.cancellation_reasons = reasons
|
result.cancellation_reasons = reasons
|
||||||
return result, .Internal_Error
|
return result, .Internal_Error
|
||||||
}
|
}
|
||||||
|
defer rocksdb.batch_destroy(&batch)
|
||||||
|
|
||||||
|
// Read old items for GSI cleanup (must happen before batch write)
|
||||||
|
old_items := make([]Maybe(Item), len(actions), allocator = context.temp_allocator)
|
||||||
|
defer {
|
||||||
|
for old_item in old_items {
|
||||||
|
if old, has := old_item.?; has {
|
||||||
|
old_copy := old
|
||||||
|
item_destroy(&old_copy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for action, idx in actions {
|
||||||
|
if action.type == .Condition_Check {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
metadata := &metadata_cache[action.table_name]
|
||||||
|
|
||||||
|
// Read old item if needed for GSI cleanup
|
||||||
|
key_item: Item
|
||||||
|
#partial switch action.type {
|
||||||
|
case .Put:
|
||||||
|
if item, has := action.item.?; has {
|
||||||
|
key_item = item
|
||||||
|
}
|
||||||
|
case .Delete, .Update:
|
||||||
|
if key, has := action.key.?; has {
|
||||||
|
key_item = key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
existing, _ := get_item_internal(engine, action.table_name, key_item, metadata)
|
||||||
|
old_items[idx] = existing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add all operations to batch
|
||||||
|
for &action, idx in actions {
|
||||||
|
if action.type == .Condition_Check {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
metadata := &metadata_cache[action.table_name]
|
||||||
|
old_item := old_items[idx]
|
||||||
|
|
||||||
|
apply_err := transact_apply_action_batch(&batch, engine, &action, metadata, old_item)
|
||||||
|
if apply_err != .None {
|
||||||
|
reasons[idx] = Cancellation_Reason{
|
||||||
|
code = "InternalError",
|
||||||
|
message = "Failed to build mutation",
|
||||||
|
}
|
||||||
|
result.cancellation_reasons = reasons
|
||||||
|
return result, .Internal_Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- Step 4: Write batch atomically (ALL or NOTHING) ----
|
||||||
|
write_err := rocksdb.batch_write(&engine.db, &batch)
|
||||||
|
if write_err != .None {
|
||||||
|
result.cancellation_reasons = reasons
|
||||||
|
return result, .Internal_Error
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(reasons)
|
delete(reasons)
|
||||||
@@ -323,21 +366,23 @@ transact_write_items :: proc(
|
|||||||
|
|
||||||
// Apply a single transact write action (called after all conditions have passed)
|
// Apply a single transact write action (called after all conditions have passed)
|
||||||
@(private = "file")
|
@(private = "file")
|
||||||
transact_apply_action :: proc(
|
transact_apply_action_batch :: proc(
|
||||||
|
batch: ^rocksdb.WriteBatch,
|
||||||
engine: ^Storage_Engine,
|
engine: ^Storage_Engine,
|
||||||
action: ^Transact_Write_Action,
|
action: ^Transact_Write_Action,
|
||||||
metadata: ^Table_Metadata,
|
metadata: ^Table_Metadata,
|
||||||
|
old_item: Maybe(Item),
|
||||||
) -> Storage_Error {
|
) -> Storage_Error {
|
||||||
switch action.type {
|
switch action.type {
|
||||||
case .Put:
|
case .Put:
|
||||||
if item, has := action.item.?; has {
|
if item, has := action.item.?; has {
|
||||||
return put_item_internal(engine, action.table_name, item, metadata)
|
return put_item_batch(batch, engine, action.table_name, item, metadata, old_item)
|
||||||
}
|
}
|
||||||
return .Invalid_Key
|
return .Invalid_Key
|
||||||
|
|
||||||
case .Delete:
|
case .Delete:
|
||||||
if key, has := action.key.?; has {
|
if key, has := action.key.?; has {
|
||||||
return delete_item_internal(engine, action.table_name, key, metadata)
|
return delete_item_batch(batch, engine, action.table_name, key, metadata, old_item)
|
||||||
}
|
}
|
||||||
return .Invalid_Key
|
return .Invalid_Key
|
||||||
|
|
||||||
@@ -345,19 +390,177 @@ transact_apply_action :: proc(
|
|||||||
if key, has := action.key.?; has {
|
if key, has := action.key.?; has {
|
||||||
if plan, has_plan := action.update_plan.?; has_plan {
|
if plan, has_plan := action.update_plan.?; has_plan {
|
||||||
plan_copy := plan
|
plan_copy := plan
|
||||||
_, _, err := update_item_internal(engine, action.table_name, key, &plan_copy, metadata)
|
return update_item_batch(batch, engine, action.table_name, key, &plan_copy, metadata, old_item)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
return .Invalid_Key
|
return .Invalid_Key
|
||||||
}
|
}
|
||||||
return .Invalid_Key
|
return .Invalid_Key
|
||||||
|
|
||||||
case .Condition_Check:
|
case .Condition_Check:
|
||||||
return .None // No mutation
|
return .None
|
||||||
}
|
}
|
||||||
return .None
|
return .None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@(private = "file")
|
||||||
|
put_item_batch :: proc(
|
||||||
|
batch: ^rocksdb.WriteBatch,
|
||||||
|
engine: ^Storage_Engine,
|
||||||
|
table_name: string,
|
||||||
|
item: Item,
|
||||||
|
metadata: ^Table_Metadata,
|
||||||
|
old_item: Maybe(Item),
|
||||||
|
) -> Storage_Error {
|
||||||
|
key_struct, key_ok := key_from_item(item, metadata.key_schema)
|
||||||
|
if !key_ok {
|
||||||
|
return .Missing_Key_Attribute
|
||||||
|
}
|
||||||
|
defer key_destroy(&key_struct)
|
||||||
|
|
||||||
|
key_values, kv_ok := key_get_values(&key_struct)
|
||||||
|
if !kv_ok {
|
||||||
|
return .Invalid_Key
|
||||||
|
}
|
||||||
|
|
||||||
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||||
|
defer delete(storage_key)
|
||||||
|
|
||||||
|
encoded_item, encode_ok := encode(item)
|
||||||
|
if !encode_ok {
|
||||||
|
return .Serialization_Error
|
||||||
|
}
|
||||||
|
defer delete(encoded_item)
|
||||||
|
|
||||||
|
// Add base item to batch
|
||||||
|
rocksdb.batch_put(batch, storage_key, encoded_item)
|
||||||
|
|
||||||
|
// Add old GSI deletions to batch
|
||||||
|
if old, has_old := old_item.?; has_old {
|
||||||
|
gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
|
||||||
|
if gsi_del_err != .None {
|
||||||
|
return gsi_del_err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new GSI writes to batch
|
||||||
|
gsi_write_err := gsi_batch_write_entries(batch, table_name, item, metadata)
|
||||||
|
if gsi_write_err != .None {
|
||||||
|
return gsi_write_err
|
||||||
|
}
|
||||||
|
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add delete operation to batch (with GSI cleanup)
|
||||||
|
@(private = "file")
|
||||||
|
delete_item_batch :: proc(
|
||||||
|
batch: ^rocksdb.WriteBatch,
|
||||||
|
engine: ^Storage_Engine,
|
||||||
|
table_name: string,
|
||||||
|
key: Item,
|
||||||
|
metadata: ^Table_Metadata,
|
||||||
|
old_item: Maybe(Item),
|
||||||
|
) -> Storage_Error {
|
||||||
|
key_struct, key_ok := key_from_item(key, metadata.key_schema)
|
||||||
|
if !key_ok {
|
||||||
|
return .Missing_Key_Attribute
|
||||||
|
}
|
||||||
|
defer key_destroy(&key_struct)
|
||||||
|
|
||||||
|
key_values, kv_ok := key_get_values(&key_struct)
|
||||||
|
if !kv_ok {
|
||||||
|
return .Invalid_Key
|
||||||
|
}
|
||||||
|
|
||||||
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||||
|
defer delete(storage_key)
|
||||||
|
|
||||||
|
// Add base item delete to batch
|
||||||
|
rocksdb.batch_delete(batch, storage_key)
|
||||||
|
|
||||||
|
// Add GSI deletions to batch
|
||||||
|
if old, has_old := old_item.?; has_old {
|
||||||
|
gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
|
||||||
|
if gsi_del_err != .None {
|
||||||
|
return gsi_del_err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add update operation to batch (with GSI maintenance)
|
||||||
|
@(private = "file")
|
||||||
|
update_item_batch :: proc(
|
||||||
|
batch: ^rocksdb.WriteBatch,
|
||||||
|
engine: ^Storage_Engine,
|
||||||
|
table_name: string,
|
||||||
|
key_item: Item,
|
||||||
|
plan: ^Update_Plan,
|
||||||
|
metadata: ^Table_Metadata,
|
||||||
|
old_item_pre: Maybe(Item),
|
||||||
|
) -> Storage_Error {
|
||||||
|
key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
|
||||||
|
if !key_ok {
|
||||||
|
return .Missing_Key_Attribute
|
||||||
|
}
|
||||||
|
defer key_destroy(&key_struct)
|
||||||
|
|
||||||
|
key_values, kv_ok := key_get_values(&key_struct)
|
||||||
|
if !kv_ok {
|
||||||
|
return .Invalid_Key
|
||||||
|
}
|
||||||
|
|
||||||
|
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
||||||
|
defer delete(storage_key)
|
||||||
|
|
||||||
|
// Start with existing item or create new
|
||||||
|
existing_item: Item
|
||||||
|
if old, has_old := old_item_pre.?; has_old {
|
||||||
|
existing_item = item_deep_copy(old)
|
||||||
|
} else {
|
||||||
|
existing_item = make(Item)
|
||||||
|
for ks in metadata.key_schema {
|
||||||
|
if val, found := key_item[ks.attribute_name]; found {
|
||||||
|
existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer item_destroy(&existing_item)
|
||||||
|
|
||||||
|
// Apply update plan
|
||||||
|
if !execute_update_plan(&existing_item, plan) {
|
||||||
|
return .Invalid_Key
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode updated item
|
||||||
|
encoded_item, encode_ok := encode(existing_item)
|
||||||
|
if !encode_ok {
|
||||||
|
return .Serialization_Error
|
||||||
|
}
|
||||||
|
defer delete(encoded_item)
|
||||||
|
|
||||||
|
// Add base item to batch
|
||||||
|
rocksdb.batch_put(batch, storage_key, encoded_item)
|
||||||
|
|
||||||
|
// Add old GSI deletions to batch
|
||||||
|
if old, has_old := old_item_pre.?; has_old {
|
||||||
|
gsi_del_err := gsi_batch_delete_entries(batch, table_name, old, metadata)
|
||||||
|
if gsi_del_err != .None {
|
||||||
|
return gsi_del_err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add new GSI writes to batch
|
||||||
|
gsi_write_err := gsi_batch_write_entries(batch, table_name, existing_item, metadata)
|
||||||
|
if gsi_write_err != .None {
|
||||||
|
return gsi_write_err
|
||||||
|
}
|
||||||
|
|
||||||
|
return .None
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Internal storage operations that skip lock acquisition
|
// Internal storage operations that skip lock acquisition
|
||||||
// (Used by transact_write_items which manages its own locking)
|
// (Used by transact_write_items which manages its own locking)
|
||||||
@@ -400,146 +603,6 @@ get_item_internal :: proc(
|
|||||||
return item, .None
|
return item, .None
|
||||||
}
|
}
|
||||||
|
|
||||||
put_item_internal :: proc(
|
|
||||||
engine: ^Storage_Engine,
|
|
||||||
table_name: string,
|
|
||||||
item: Item,
|
|
||||||
metadata: ^Table_Metadata,
|
|
||||||
) -> Storage_Error {
|
|
||||||
key_struct, key_ok := key_from_item(item, metadata.key_schema)
|
|
||||||
if !key_ok {
|
|
||||||
return .Missing_Key_Attribute
|
|
||||||
}
|
|
||||||
defer key_destroy(&key_struct)
|
|
||||||
|
|
||||||
key_values, kv_ok := key_get_values(&key_struct)
|
|
||||||
if !kv_ok {
|
|
||||||
return .Invalid_Key
|
|
||||||
}
|
|
||||||
|
|
||||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
||||||
defer delete(storage_key)
|
|
||||||
|
|
||||||
encoded_item, encode_ok := encode(item)
|
|
||||||
if !encode_ok {
|
|
||||||
return .Serialization_Error
|
|
||||||
}
|
|
||||||
defer delete(encoded_item)
|
|
||||||
|
|
||||||
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
|
|
||||||
if put_err != .None {
|
|
||||||
return .RocksDB_Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return .None
|
|
||||||
}
|
|
||||||
|
|
||||||
delete_item_internal :: proc(
|
|
||||||
engine: ^Storage_Engine,
|
|
||||||
table_name: string,
|
|
||||||
key: Item,
|
|
||||||
metadata: ^Table_Metadata,
|
|
||||||
) -> Storage_Error {
|
|
||||||
key_struct, key_ok := key_from_item(key, metadata.key_schema)
|
|
||||||
if !key_ok {
|
|
||||||
return .Missing_Key_Attribute
|
|
||||||
}
|
|
||||||
defer key_destroy(&key_struct)
|
|
||||||
|
|
||||||
key_values, kv_ok := key_get_values(&key_struct)
|
|
||||||
if !kv_ok {
|
|
||||||
return .Invalid_Key
|
|
||||||
}
|
|
||||||
|
|
||||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
||||||
defer delete(storage_key)
|
|
||||||
|
|
||||||
del_err := rocksdb.db_delete(&engine.db, storage_key)
|
|
||||||
if del_err != .None {
|
|
||||||
return .RocksDB_Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return .None
|
|
||||||
}
|
|
||||||
|
|
||||||
update_item_internal :: proc(
|
|
||||||
engine: ^Storage_Engine,
|
|
||||||
table_name: string,
|
|
||||||
key_item: Item,
|
|
||||||
plan: ^Update_Plan,
|
|
||||||
metadata: ^Table_Metadata,
|
|
||||||
) -> (old_item: Maybe(Item), new_item: Maybe(Item), err: Storage_Error) {
|
|
||||||
key_struct, key_ok := key_from_item(key_item, metadata.key_schema)
|
|
||||||
if !key_ok {
|
|
||||||
return nil, nil, .Missing_Key_Attribute
|
|
||||||
}
|
|
||||||
defer key_destroy(&key_struct)
|
|
||||||
|
|
||||||
key_values, kv_ok := key_get_values(&key_struct)
|
|
||||||
if !kv_ok {
|
|
||||||
return nil, nil, .Invalid_Key
|
|
||||||
}
|
|
||||||
|
|
||||||
storage_key := build_data_key(table_name, key_values.pk, key_values.sk)
|
|
||||||
defer delete(storage_key)
|
|
||||||
|
|
||||||
// Fetch existing item
|
|
||||||
existing_encoded, get_err := rocksdb.db_get(&engine.db, storage_key)
|
|
||||||
existing_item: Item
|
|
||||||
|
|
||||||
if get_err == .None && existing_encoded != nil {
|
|
||||||
defer delete(existing_encoded)
|
|
||||||
decoded, decode_ok := decode(existing_encoded)
|
|
||||||
if !decode_ok {
|
|
||||||
return nil, nil, .Serialization_Error
|
|
||||||
}
|
|
||||||
existing_item = decoded
|
|
||||||
old_item = item_deep_copy(existing_item)
|
|
||||||
} else if get_err == .NotFound || existing_encoded == nil {
|
|
||||||
existing_item = make(Item)
|
|
||||||
for ks in metadata.key_schema {
|
|
||||||
if val, found := key_item[ks.attribute_name]; found {
|
|
||||||
existing_item[strings.clone(ks.attribute_name)] = attr_value_deep_copy(val)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return nil, nil, .RocksDB_Error
|
|
||||||
}
|
|
||||||
|
|
||||||
if !execute_update_plan(&existing_item, plan) {
|
|
||||||
item_destroy(&existing_item)
|
|
||||||
if old, has := old_item.?; has {
|
|
||||||
old_copy := old
|
|
||||||
item_destroy(&old_copy)
|
|
||||||
}
|
|
||||||
return nil, nil, .Invalid_Key
|
|
||||||
}
|
|
||||||
|
|
||||||
encoded_item, encode_ok := encode(existing_item)
|
|
||||||
if !encode_ok {
|
|
||||||
item_destroy(&existing_item)
|
|
||||||
if old, has := old_item.?; has {
|
|
||||||
old_copy := old
|
|
||||||
item_destroy(&old_copy)
|
|
||||||
}
|
|
||||||
return nil, nil, .Serialization_Error
|
|
||||||
}
|
|
||||||
defer delete(encoded_item)
|
|
||||||
|
|
||||||
put_err := rocksdb.db_put(&engine.db, storage_key, encoded_item)
|
|
||||||
if put_err != .None {
|
|
||||||
item_destroy(&existing_item)
|
|
||||||
if old, has := old_item.?; has {
|
|
||||||
old_copy := old
|
|
||||||
item_destroy(&old_copy)
|
|
||||||
}
|
|
||||||
return nil, nil, .RocksDB_Error
|
|
||||||
}
|
|
||||||
|
|
||||||
new_item = existing_item
|
|
||||||
return old_item, new_item, .None
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// TransactGetItems Types
|
// TransactGetItems Types
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|||||||
Reference in New Issue
Block a user