make batch operations work
This commit is contained in:
199
dynamodb/batch.odin
Normal file
199
dynamodb/batch.odin
Normal file
@@ -0,0 +1,199 @@
|
||||
// BatchWriteItem and BatchGetItem storage operations
|
||||
//
|
||||
// BatchWriteItem: Puts or deletes multiple items across one or more tables.
|
||||
// - Up to 25 items per batch (DynamoDB limit)
|
||||
// - Each item is an independent PutRequest or DeleteRequest
|
||||
// - Partial failures are reported via UnprocessedItems
|
||||
//
|
||||
// BatchGetItem: Retrieves multiple items from one or more tables.
|
||||
// - Up to 100 items per batch (DynamoDB limit)
|
||||
// - Each table request contains a list of Keys
|
||||
// - Partial failures reported via UnprocessedKeys
|
||||
package dynamodb
|
||||
|
||||
// ============================================================================
|
||||
// BatchWriteItem Types
|
||||
// ============================================================================
|
||||
|
||||
// Discriminates the two operation kinds a BatchWriteItem entry can carry.
Write_Request_Type :: enum {
	Put,    // PutRequest: write (or replace) a full item.
	Delete, // DeleteRequest: remove the item identified by a key.
}
|
||||
|
||||
// One entry in a BatchWriteItem batch: either a put of a full item or a
// delete addressed by a key. The interpretation of `item` depends on `type`.
Write_Request :: struct {
	type: Write_Request_Type,
	item: Item, // For Put: the full item. For Delete: the key item.
}
|
||||
|
||||
// All write requests aimed at a single table within one BatchWriteItem call.
Batch_Write_Table_Request :: struct {
	table_name: string,          // Target table for every request below.
	requests: []Write_Request,   // Independent put/delete operations.
}
|
||||
|
||||
// Result of a BatchWriteItem call.
Batch_Write_Result :: struct {
	// UnprocessedItems — requests that failed and should be retried.
	// Each entry holds deep-copied items (see batch_write_item), so the
	// caller must release them via batch_write_result_destroy.
	// Typically empty; populated only on partial failures.
	unprocessed: [dynamic]Batch_Write_Table_Request,
}
|
||||
|
||||
// Release a Batch_Write_Result: every deep-copied item inside the
// unprocessed requests, each per-table request slice, and finally the
// dynamic array itself.
batch_write_result_destroy :: proc(result: ^Batch_Write_Result) {
	for i in 0 ..< len(result.unprocessed) {
		pending := &result.unprocessed[i]
		for j in 0 ..< len(pending.requests) {
			item_destroy(&pending.requests[j].item)
		}
		delete(pending.requests)
	}
	delete(result.unprocessed)
}
|
||||
|
||||
// ============================================================================
|
||||
// BatchWriteItem — Execute a batch of put/delete operations
|
||||
//
|
||||
// DynamoDB semantics:
|
||||
// - Operations within a batch are NOT atomic (some may succeed, some fail)
|
||||
// - Each operation is validated independently
|
||||
// - Failed operations go into UnprocessedItems
|
||||
// - Limit: 25 operations total across all tables
|
||||
// ============================================================================
|
||||
|
||||
// Execute a batch of put/delete operations against one or more tables.
//
// Mirrors DynamoDB BatchWriteItem semantics: operations are attempted
// independently (the batch is NOT atomic), and any operation that fails
// is deep-copied into `unprocessed` so the caller can retry it.
// NOTE(review): the documented 25-operation DynamoDB limit is not
// enforced in this proc — confirm validation happens at a higher layer.
batch_write_item :: proc(
	engine: ^Storage_Engine,
	table_requests: []Batch_Write_Table_Request,
) -> (Batch_Write_Result, Storage_Error) {
	result := Batch_Write_Result{
		unprocessed = make([dynamic]Batch_Write_Table_Request),
	}

	for table_req in table_requests {
		failed := make([dynamic]Write_Request)

		for req in table_req.requests {
			op_err: Storage_Error
			switch req.type {
			case .Put:
				op_err = put_item(engine, table_req.table_name, req.item)
			case .Delete:
				op_err = delete_item(engine, table_req.table_name, req.item)
			}

			if op_err == .None {
				continue
			}
			// Preserve the failed request (deep copy) so the caller can retry.
			append(&failed, Write_Request{
				type = req.type,
				item = item_deep_copy(req.item),
			})
		}

		if len(failed) == 0 {
			delete(failed)
			continue
		}
		append(&result.unprocessed, Batch_Write_Table_Request{
			table_name = table_req.table_name,
			requests = failed[:],
		})
	}

	return result, .None
}
|
||||
|
||||
// ============================================================================
|
||||
// BatchGetItem Types
|
||||
// ============================================================================
|
||||
|
||||
// All keys to fetch from a single table within one BatchGetItem call.
Batch_Get_Table_Request :: struct {
	table_name: string, // Table to read from.
	keys: []Item,       // Key items identifying the requested rows.
}
|
||||
|
||||
// Items successfully fetched from one table. Keys with no matching item
// are omitted (DynamoDB behavior), so len(items) may be less than the
// number of keys requested.
Batch_Get_Table_Result :: struct {
	table_name: string,
	items: []Item,
}
|
||||
|
||||
// Result of a BatchGetItem call. Release with batch_get_result_destroy.
Batch_Get_Result :: struct {
	responses: [dynamic]Batch_Get_Table_Result,          // Fetched items, grouped per table.
	unprocessed_keys: [dynamic]Batch_Get_Table_Request,  // Deep-copied keys that hit a storage error; retry these.
}
|
||||
|
||||
// Release a Batch_Get_Result: all fetched items and their per-table
// slices, then all deep-copied unprocessed keys and their slices, then
// both dynamic arrays.
batch_get_result_destroy :: proc(result: ^Batch_Get_Result) {
	for i in 0 ..< len(result.responses) {
		resp := &result.responses[i]
		for j in 0 ..< len(resp.items) {
			item_destroy(&resp.items[j])
		}
		delete(resp.items)
	}
	delete(result.responses)

	for i in 0 ..< len(result.unprocessed_keys) {
		pending := &result.unprocessed_keys[i]
		for j in 0 ..< len(pending.keys) {
			item_destroy(&pending.keys[j])
		}
		delete(pending.keys)
	}
	delete(result.unprocessed_keys)
}
|
||||
|
||||
// ============================================================================
|
||||
// BatchGetItem — Retrieve multiple items from one or more tables
|
||||
//
|
||||
// DynamoDB semantics:
|
||||
// - Each key is fetched independently
|
||||
// - Missing items are silently omitted (no error)
|
||||
// - Failed lookups go into UnprocessedKeys
|
||||
// - Limit: 100 keys total across all tables
|
||||
// ============================================================================
|
||||
|
||||
// Retrieve multiple items from one or more tables.
//
// Mirrors DynamoDB BatchGetItem semantics: each key is looked up
// independently; keys with no matching item are silently omitted, and
// keys that hit a genuine storage error are deep-copied into
// `unprocessed_keys` for the caller to retry.
// NOTE(review): the documented 100-key DynamoDB limit is not enforced
// in this proc — confirm validation happens at a higher layer.
batch_get_item :: proc(
	engine: ^Storage_Engine,
	table_requests: []Batch_Get_Table_Request,
) -> (Batch_Get_Result, Storage_Error) {
	result := Batch_Get_Result{
		responses = make([dynamic]Batch_Get_Table_Result),
		unprocessed_keys = make([dynamic]Batch_Get_Table_Request),
	}

	for table_req in table_requests {
		fetched := make([dynamic]Item)
		retriable := make([dynamic]Item)

		for key in table_req.keys {
			maybe_item, err := get_item(engine, table_req.table_name, key)

			// A genuine storage failure: remember the key for retry.
			// A plain miss (.Item_Not_Found) is silently omitted,
			// matching DynamoDB behavior.
			if err != .None && err != .Item_Not_Found {
				append(&retriable, item_deep_copy(key))
				continue
			}

			if found, present := maybe_item.?; present {
				append(&fetched, found)
			}
		}

		if len(fetched) > 0 {
			append(&result.responses, Batch_Get_Table_Result{
				table_name = table_req.table_name,
				items = fetched[:],
			})
		} else {
			delete(fetched)
		}

		if len(retriable) > 0 {
			append(&result.unprocessed_keys, Batch_Get_Table_Request{
				table_name = table_req.table_name,
				keys = retriable[:],
			})
		} else {
			delete(retriable)
		}
	}

	return result, .None
}
|
||||
118
dynamodb/condition.odin
Normal file
118
dynamodb/condition.odin
Normal file
@@ -0,0 +1,118 @@
|
||||
// ConditionExpression support for PutItem, DeleteItem, and UpdateItem
|
||||
//
|
||||
// ConditionExpression uses the same grammar as FilterExpression but is evaluated
|
||||
// against the *existing* item (before the mutation). If the condition evaluates
|
||||
// to false, the operation is rejected with ConditionalCheckFailedException.
|
||||
//
|
||||
// When there is no existing item:
|
||||
// - attribute_not_exists(path) → true (attribute doesn't exist on a non-existent item)
|
||||
// - attribute_exists(path) → false
|
||||
// - All comparisons → false (no attribute to compare)
|
||||
//
|
||||
// This file provides:
|
||||
// 1. parse_condition_expression_string — extract ConditionExpression from JSON body
|
||||
// 2. evaluate_condition — evaluate parsed condition against an item
|
||||
// 3. Condition_Result — result enum for condition evaluation
|
||||
package dynamodb
|
||||
|
||||
import "core:encoding/json"
|
||||
|
||||
// ============================================================================
|
||||
// Condition Evaluation Result
|
||||
// ============================================================================
|
||||
|
||||
// Outcome of evaluating a ConditionExpression against the existing item.
Condition_Result :: enum {
	Passed,      // Condition met (or no condition specified)
	Failed,      // Condition not met → ConditionalCheckFailedException
	Parse_Error, // Malformed ConditionExpression → ValidationException
}
|
||||
|
||||
// ============================================================================
|
||||
// Request Parsing
|
||||
// ============================================================================
|
||||
|
||||
// Extract the raw ConditionExpression string from the request body.
|
||||
// Pull the raw ConditionExpression string out of a JSON request body.
// ok is false when the body is not valid JSON, is not an object, lacks
// the "ConditionExpression" key, or that key's value is not a string.
// NOTE(review): the returned string aliases memory owned by
// context.temp_allocator, so it is only valid until the temp allocator
// is reset — confirm callers consume it before then.
parse_condition_expression_string :: proc(request_body: []byte) -> (expr: string, ok: bool) {
	parsed, err := json.parse(request_body, allocator = context.temp_allocator)
	if err != nil {
		return "", false
	}
	defer json.destroy_value(parsed)

	obj, is_obj := parsed.(json.Object)
	if !is_obj {
		return "", false
	}

	if raw, present := obj["ConditionExpression"]; present {
		if s, is_str := raw.(json.String); is_str {
			return string(s), true
		}
	}
	return "", false
}
|
||||
|
||||
// ============================================================================
|
||||
// Full Condition Evaluation Pipeline
|
||||
//
|
||||
// Parses ConditionExpression + ExpressionAttributeNames/Values from the
|
||||
// request body, then evaluates against the existing item.
|
||||
//
|
||||
// Parameters:
|
||||
// request_body — full JSON request body
|
||||
// existing_item — the item currently in the database (nil if no item exists)
|
||||
// attr_names — pre-parsed ExpressionAttributeNames (caller may already have these)
|
||||
// attr_values — pre-parsed ExpressionAttributeValues
|
||||
//
|
||||
// Returns Condition_Result:
|
||||
// .Passed — no ConditionExpression, or condition evaluated to true
|
||||
// .Failed — condition evaluated to false
|
||||
// .Parse_Error — ConditionExpression is malformed
|
||||
// ============================================================================
|
||||
|
||||
// Parse and evaluate a ConditionExpression from the request body against
// the existing item (the item state BEFORE the mutation).
//
// Returns:
//   .Passed      — no ConditionExpression present, or it evaluated true
//   .Failed      — the condition evaluated false
//   .Parse_Error — the expression could not be parsed
evaluate_condition_expression :: proc(
	request_body: []byte,
	existing_item: Maybe(Item),
	attr_names: Maybe(map[string]string),
	attr_values: map[string]Attribute_Value,
) -> Condition_Result {
	expr, present := parse_condition_expression_string(request_body)
	if !present {
		// Absent condition: the mutation is unconditional.
		return .Passed
	}

	// ConditionExpression shares the FilterExpression grammar.
	node, parsed := parse_filter_expression(expr, attr_names, attr_values)
	if !parsed || node == nil {
		return .Parse_Error
	}
	defer {
		filter_node_destroy(node)
		free(node)
	}

	// With no stored item, evaluate against an empty one so that
	// attribute_not_exists(...) is true, attribute_exists(...) is false,
	// and every comparison fails (nothing to compare against).
	target := existing_item.? or_else Item{}

	return evaluate_filter(target, node) ? .Passed : .Failed
}
|
||||
Reference in New Issue
Block a user