actually make filters work right
This commit is contained in:
328
main.odin
328
main.odin
@@ -362,18 +362,136 @@ handle_delete_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Requ
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
}
|
||||
|
||||
// UpdateItem — minimal stub: supports SET for scalar attributes
|
||||
// UpdateItem ...
|
||||
handle_update_item :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
|
||||
// TODO: Implement UpdateExpression parsing (SET x = :val, REMOVE y, etc.)
|
||||
// For now, return a clear error so callers know it's not yet supported.
|
||||
make_error_response(response, .ValidationException,
|
||||
"UpdateItem is not yet supported. Use PutItem to replace the full item.")
|
||||
// Parse TableName
|
||||
table_name, ok := dynamodb.parse_table_name(request.body)
|
||||
if !ok {
|
||||
make_error_response(response, .ValidationException, "Invalid request or missing TableName")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse Key
|
||||
key_item, key_ok := dynamodb.parse_key_from_request(request.body)
|
||||
if !key_ok {
|
||||
make_error_response(response, .ValidationException, "Invalid or missing Key")
|
||||
return
|
||||
}
|
||||
defer dynamodb.item_destroy(&key_item)
|
||||
|
||||
// Parse UpdateExpression
|
||||
update_expr, ue_ok := dynamodb.parse_update_expression_string(request.body)
|
||||
if !ue_ok {
|
||||
make_error_response(response, .ValidationException, "Missing or invalid UpdateExpression")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse ExpressionAttributeNames and ExpressionAttributeValues
|
||||
attr_names := dynamodb.parse_expression_attribute_names(request.body)
|
||||
defer {
|
||||
if names, has_names := attr_names.?; has_names {
|
||||
for k, v in names {
|
||||
delete(k)
|
||||
delete(v)
|
||||
}
|
||||
names_copy := names
|
||||
delete(names_copy)
|
||||
}
|
||||
}
|
||||
|
||||
attr_values, vals_ok := dynamodb.parse_expression_attribute_values(request.body)
|
||||
if !vals_ok {
|
||||
make_error_response(response, .ValidationException, "Invalid ExpressionAttributeValues")
|
||||
return
|
||||
}
|
||||
defer {
|
||||
for k, v in attr_values {
|
||||
delete(k)
|
||||
v_copy := v
|
||||
dynamodb.attr_value_destroy(&v_copy)
|
||||
}
|
||||
delete(attr_values)
|
||||
}
|
||||
|
||||
// Parse update plan
|
||||
plan, plan_ok := dynamodb.parse_update_expression(update_expr, attr_names, attr_values)
|
||||
if !plan_ok {
|
||||
make_error_response(response, .ValidationException, "Failed to parse UpdateExpression")
|
||||
return
|
||||
}
|
||||
defer dynamodb.update_plan_destroy(&plan)
|
||||
|
||||
// Parse ReturnValues
|
||||
return_values := dynamodb.parse_return_values(request.body)
|
||||
|
||||
// Execute update
|
||||
old_item, new_item, err := dynamodb.update_item(engine, table_name, key_item, &plan)
|
||||
if err != .None {
|
||||
handle_storage_error(response, err)
|
||||
return
|
||||
}
|
||||
defer {
|
||||
if old, has := old_item.?; has {
|
||||
old_copy := old
|
||||
dynamodb.item_destroy(&old_copy)
|
||||
}
|
||||
if new_val, has := new_item.?; has {
|
||||
new_copy := new_val
|
||||
dynamodb.item_destroy(&new_copy)
|
||||
}
|
||||
}
|
||||
|
||||
// Build response based on ReturnValues
|
||||
switch return_values {
|
||||
case "ALL_NEW":
|
||||
if new_val, has := new_item.?; has {
|
||||
item_json := dynamodb.serialize_item(new_val)
|
||||
resp := fmt.aprintf(`{"Attributes":%s}`, item_json)
|
||||
response_set_body(response, transmute([]byte)resp)
|
||||
} else {
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
}
|
||||
|
||||
case "ALL_OLD":
|
||||
if old, has := old_item.?; has {
|
||||
item_json := dynamodb.serialize_item(old)
|
||||
resp := fmt.aprintf(`{"Attributes":%s}`, item_json)
|
||||
response_set_body(response, transmute([]byte)resp)
|
||||
} else {
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
}
|
||||
|
||||
case "UPDATED_NEW":
|
||||
// Return only the attributes that were updated (in the new item)
|
||||
// For simplicity, return the full new item (DynamoDB returns affected attributes)
|
||||
if new_val, has := new_item.?; has {
|
||||
item_json := dynamodb.serialize_item(new_val)
|
||||
resp := fmt.aprintf(`{"Attributes":%s}`, item_json)
|
||||
response_set_body(response, transmute([]byte)resp)
|
||||
} else {
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
}
|
||||
|
||||
case "UPDATED_OLD":
|
||||
if old, has := old_item.?; has {
|
||||
item_json := dynamodb.serialize_item(old)
|
||||
resp := fmt.aprintf(`{"Attributes":%s}`, item_json)
|
||||
response_set_body(response, transmute([]byte)resp)
|
||||
} else {
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
}
|
||||
|
||||
case:
|
||||
// "NONE" or default
|
||||
response_set_body(response, transmute([]byte)string("{}"))
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Query and Scan Operations
|
||||
// ============================================================================
|
||||
|
||||
// handle_query ...
|
||||
handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
|
||||
table_name, ok := dynamodb.parse_table_name(request.body)
|
||||
if !ok {
|
||||
@@ -381,7 +499,7 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
||||
return
|
||||
}
|
||||
|
||||
// ---- Fetch table metadata early so we can parse ExclusiveStartKey ----
|
||||
// Fetch table metadata early for ExclusiveStartKey parsing
|
||||
metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
|
||||
if meta_err != .None {
|
||||
handle_storage_error(response, meta_err)
|
||||
@@ -404,7 +522,6 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
||||
return
|
||||
}
|
||||
|
||||
// Clone pk_bytes so it survives kc cleanup (kc borrows from the parsed value)
|
||||
pk_owned := make([]byte, len(pk_bytes))
|
||||
copy(pk_owned, pk_bytes)
|
||||
defer delete(pk_owned)
|
||||
@@ -415,7 +532,7 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
||||
limit = 100
|
||||
}
|
||||
|
||||
// ---- Parse ExclusiveStartKey with proper type handling ----
|
||||
// Parse ExclusiveStartKey
|
||||
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
|
||||
request.body, table_name, metadata.key_schema,
|
||||
)
|
||||
@@ -429,7 +546,7 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Pass sort key condition through to storage layer ----
|
||||
// Pass sort key condition through
|
||||
sk_condition: Maybe(dynamodb.Sort_Key_Condition) = nil
|
||||
if skc, has_skc := kc.sk_condition.?; has_skc {
|
||||
sk_condition = skc
|
||||
@@ -442,10 +559,62 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r
|
||||
}
|
||||
defer dynamodb.query_result_destroy(&result)
|
||||
|
||||
// Build response with proper pagination
|
||||
write_items_response_with_pagination(response, result.items, result.last_evaluated_key, &metadata)
|
||||
// ---- Parse ExpressionAttributeNames/Values for filter/projection ----
|
||||
attr_names := dynamodb.parse_expression_attribute_names(request.body)
|
||||
defer {
|
||||
if names, has_names := attr_names.?; has_names {
|
||||
for k, v in names {
|
||||
delete(k)
|
||||
delete(v)
|
||||
}
|
||||
names_copy := names
|
||||
delete(names_copy)
|
||||
}
|
||||
}
|
||||
|
||||
attr_values, _ := dynamodb.parse_expression_attribute_values(request.body)
|
||||
defer {
|
||||
for k, v in attr_values {
|
||||
delete(k)
|
||||
v_copy := v
|
||||
dynamodb.attr_value_destroy(&v_copy)
|
||||
}
|
||||
delete(attr_values)
|
||||
}
|
||||
|
||||
// ---- Apply FilterExpression (post-query filter) ----
|
||||
filtered_items := apply_filter_to_items(request.body, result.items, attr_names, attr_values)
|
||||
scanned_count := len(result.items)
|
||||
|
||||
// ---- Apply ProjectionExpression ----
|
||||
projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names)
|
||||
final_items: []dynamodb.Item
|
||||
|
||||
if has_proj && len(projection) > 0 {
|
||||
projected := make([]dynamodb.Item, len(filtered_items))
|
||||
for item, i in filtered_items {
|
||||
projected[i] = dynamodb.apply_projection(item, projection)
|
||||
}
|
||||
final_items = projected
|
||||
} else {
|
||||
final_items = filtered_items
|
||||
}
|
||||
|
||||
// Build response
|
||||
write_items_response_with_pagination_ex(
|
||||
response, final_items, result.last_evaluated_key, &metadata, scanned_count,
|
||||
)
|
||||
|
||||
// Cleanup projected items if we created them
|
||||
if has_proj && len(projection) > 0 {
|
||||
for &item in final_items {
|
||||
dynamodb.item_destroy(&item)
|
||||
}
|
||||
delete(final_items)
|
||||
}
|
||||
}
|
||||
|
||||
// handle_scan ...
|
||||
handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) {
|
||||
table_name, ok := dynamodb.parse_table_name(request.body)
|
||||
if !ok {
|
||||
@@ -453,7 +622,6 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
|
||||
return
|
||||
}
|
||||
|
||||
// ---- Fetch table metadata early so we can parse ExclusiveStartKey ----
|
||||
metadata, meta_err := dynamodb.get_table_metadata(engine, table_name)
|
||||
if meta_err != .None {
|
||||
handle_storage_error(response, meta_err)
|
||||
@@ -461,13 +629,11 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
|
||||
}
|
||||
defer dynamodb.table_metadata_destroy(&metadata, engine.allocator)
|
||||
|
||||
// Parse Limit (default to 100 if not specified)
|
||||
limit := dynamodb.parse_limit(request.body)
|
||||
if limit == 0 {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
// ---- Parse ExclusiveStartKey with proper type handling ----
|
||||
exclusive_start_key, esk_ok := dynamodb.parse_exclusive_start_key(
|
||||
request.body, table_name, metadata.key_schema,
|
||||
)
|
||||
@@ -481,7 +647,6 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
|
||||
}
|
||||
}
|
||||
|
||||
// Perform scan
|
||||
result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit)
|
||||
if err != .None {
|
||||
handle_storage_error(response, err)
|
||||
@@ -489,10 +654,139 @@ handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, re
|
||||
}
|
||||
defer dynamodb.scan_result_destroy(&result)
|
||||
|
||||
// Build response with proper pagination
|
||||
write_items_response_with_pagination(response, result.items, result.last_evaluated_key, &metadata)
|
||||
// ---- Parse ExpressionAttributeNames/Values for filter/projection ----
|
||||
attr_names := dynamodb.parse_expression_attribute_names(request.body)
|
||||
defer {
|
||||
if names, has_names := attr_names.?; has_names {
|
||||
for k, v in names {
|
||||
delete(k)
|
||||
delete(v)
|
||||
}
|
||||
names_copy := names
|
||||
delete(names_copy)
|
||||
}
|
||||
}
|
||||
|
||||
attr_values, _ := dynamodb.parse_expression_attribute_values(request.body)
|
||||
defer {
|
||||
for k, v in attr_values {
|
||||
delete(k)
|
||||
v_copy := v
|
||||
dynamodb.attr_value_destroy(&v_copy)
|
||||
}
|
||||
delete(attr_values)
|
||||
}
|
||||
|
||||
// ---- Apply FilterExpression ----
|
||||
filtered_items := apply_filter_to_items(request.body, result.items, attr_names, attr_values)
|
||||
scanned_count := len(result.items)
|
||||
|
||||
// ---- Apply ProjectionExpression ----
|
||||
projection, has_proj := dynamodb.parse_projection_expression(request.body, attr_names)
|
||||
final_items: []dynamodb.Item
|
||||
|
||||
if has_proj && len(projection) > 0 {
|
||||
projected := make([]dynamodb.Item, len(filtered_items))
|
||||
for item, i in filtered_items {
|
||||
projected[i] = dynamodb.apply_projection(item, projection)
|
||||
}
|
||||
final_items = projected
|
||||
} else {
|
||||
final_items = filtered_items
|
||||
}
|
||||
|
||||
// Build response
|
||||
write_items_response_with_pagination_ex(
|
||||
response, final_items, result.last_evaluated_key, &metadata, scanned_count,
|
||||
)
|
||||
|
||||
if has_proj && len(projection) > 0 {
|
||||
for &item in final_items {
|
||||
dynamodb.item_destroy(&item)
|
||||
}
|
||||
delete(final_items)
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Shared helper: apply FilterExpression to a set of items
|
||||
// ============================================================================
|
||||
|
||||
apply_filter_to_items :: proc(
|
||||
request_body: []byte,
|
||||
items: []dynamodb.Item,
|
||||
attr_names: Maybe(map[string]string),
|
||||
attr_values: map[string]dynamodb.Attribute_Value,
|
||||
) -> []dynamodb.Item {
|
||||
filter_expr, has_filter := dynamodb.parse_filter_expression_string(request_body)
|
||||
if !has_filter {
|
||||
return items // no filter, return as-is
|
||||
}
|
||||
|
||||
filter_node, filter_ok := dynamodb.parse_filter_expression(filter_expr, attr_names, attr_values)
|
||||
if !filter_ok || filter_node == nil {
|
||||
return items // failed to parse, return unfiltered
|
||||
}
|
||||
defer {
|
||||
dynamodb.filter_node_destroy(filter_node)
|
||||
free(filter_node)
|
||||
}
|
||||
|
||||
// Filter items
|
||||
filtered := make([dynamic]dynamodb.Item)
|
||||
for item in items {
|
||||
if dynamodb.evaluate_filter(item, filter_node) {
|
||||
append(&filtered, item)
|
||||
}
|
||||
}
|
||||
|
||||
return filtered[:]
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Extended pagination response builder (includes ScannedCount vs Count)
|
||||
//
|
||||
// DynamoDB distinguishes:
|
||||
// Count = number of items AFTER FilterExpression
|
||||
// ScannedCount = number of items BEFORE FilterExpression
|
||||
// ============================================================================
|
||||
|
||||
write_items_response_with_pagination_ex :: proc(
|
||||
response: ^HTTP_Response,
|
||||
items: []dynamodb.Item,
|
||||
last_evaluated_key_binary: Maybe([]byte),
|
||||
metadata: ^dynamodb.Table_Metadata,
|
||||
scanned_count: int,
|
||||
) {
|
||||
builder := strings.builder_make()
|
||||
strings.write_string(&builder, `{"Items":[`)
|
||||
|
||||
for item, i in items {
|
||||
if i > 0 do strings.write_string(&builder, ",")
|
||||
item_json := dynamodb.serialize_item(item)
|
||||
strings.write_string(&builder, item_json)
|
||||
}
|
||||
|
||||
strings.write_string(&builder, `],"Count":`)
|
||||
fmt.sbprintf(&builder, "%d", len(items))
|
||||
strings.write_string(&builder, `,"ScannedCount":`)
|
||||
fmt.sbprintf(&builder, "%d", scanned_count)
|
||||
|
||||
if binary_key, has_last := last_evaluated_key_binary.?; has_last {
|
||||
lek_json, lek_ok := dynamodb.serialize_last_evaluated_key(binary_key, metadata)
|
||||
if lek_ok {
|
||||
strings.write_string(&builder, `,"LastEvaluatedKey":`)
|
||||
strings.write_string(&builder, lek_json)
|
||||
}
|
||||
}
|
||||
|
||||
strings.write_string(&builder, "}")
|
||||
|
||||
resp_body := strings.to_string(builder)
|
||||
response_set_body(response, transmute([]byte)resp_body)
|
||||
}
|
||||
|
||||
|
||||
// ============================================================================
|
||||
// Shared Pagination Response Builder
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user