diff --git a/dynamodb/storage.odin b/dynamodb/storage.odin index d0f4733..df51235 100644 --- a/dynamodb/storage.odin +++ b/dynamodb/storage.odin @@ -1,6 +1,7 @@ // Storage engine mapping DynamoDB operations to RocksDB package dynamodb +import "core:c" import "core:fmt" import "core:mem" import "core:slice" @@ -521,6 +522,113 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S return .None } +scan :: proc( + engine: ^Storage_Engine, + table_name: string, + exclusive_start_key: Maybe([]byte), + limit: int, +) -> (Scan_Result, Storage_Error) { + table_lock := get_or_create_table_lock(engine, table_name) + sync.rw_mutex_shared_lock(table_lock) + defer sync.rw_mutex_shared_unlock(table_lock) + + // Verify table exists + metadata, meta_err := get_table_metadata(engine, table_name) + if meta_err != .None { + return {}, meta_err + } + defer table_metadata_destroy(&metadata, engine.allocator) + + // Create iterator + iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options) + if iter == nil { + return {}, .RocksDB_Error + } + defer rocksdb.rocksdb_iter_destroy(iter) + + // Build table prefix to scan + table_prefix := build_table_prefix(table_name) + defer delete(table_prefix) + + // Seek to start position + if start_key, has_start := exclusive_start_key.?; has_start { + // Resume from pagination token + rocksdb.rocksdb_iter_seek(iter, raw_data(start_key), c.size_t(len(start_key))) + // Skip the start key itself (it's exclusive) + if rocksdb.rocksdb_iter_valid(iter) != 0 { + rocksdb.rocksdb_iter_next(iter) + } + } else { + // Start from beginning of table + rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix))) + } + + // Collect items + items := make([dynamic]Item, context.temp_allocator) + count := 0 + last_key: Maybe([]byte) = nil + + for rocksdb.rocksdb_iter_valid(iter) != 0 { + // Get current key + key_len: c.size_t + key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len) + key_bytes := key_ptr[:key_len] + + // Check if key still has table prefix + if !has_prefix(key_bytes, table_prefix) { + break + } + + // Check limit + if count >= limit { + // Save this key as pagination token + last_key = slice.clone(key_bytes, engine.allocator) + break + } + + // Get value + value_len: c.size_t + value_ptr := rocksdb.rocksdb_iter_value(iter, &value_len) + value_bytes := value_ptr[:value_len] + + // Decode item + item, decode_ok := decode(value_bytes) + if !decode_ok { + // Skip corrupted items + rocksdb.rocksdb_iter_next(iter) + continue + } + + append(&items, item) + count += 1 + + // Move to next + rocksdb.rocksdb_iter_next(iter) + } + + // Convert to slice (owned by caller's allocator) + result_items := make([]Item, len(items), engine.allocator) + copy(result_items, items[:]) + + return Scan_Result{ + items = result_items, + last_evaluated_key = last_key, + }, .None +} + +// Helper to check if a byte slice has a prefix +has_prefix :: proc(data: []byte, prefix: []byte) -> bool { + if len(data) < len(prefix) { + return false + } + for i in 0.. []string { // TODO: Implement by iterating over meta keys diff --git a/main.odin b/main.odin index f8e5523..7e8ea9d 100644 --- a/main.odin +++ b/main.odin @@ -398,11 +398,64 @@ handle_query :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, r } handle_scan :: proc(engine: ^dynamodb.Storage_Engine, request: ^HTTP_Request, response: ^HTTP_Response) { - _ = engine - _ = request + table_name, ok := dynamodb.parse_table_name(request.body) + if !ok { + make_error_response(response, .ValidationException, "Invalid request or missing TableName") + return + } - // TODO: Implement scan operation in storage.odin - make_error_response(response, .ValidationException, "Scan operation not yet implemented") + // Parse Limit (default to 100 if not specified) + limit := dynamodb.parse_limit(request.body) + if limit == 0 { + limit = 100 + } + + // Parse ExclusiveStartKey if present + // For now, we'll implement basic scan without ExclusiveStartKey parsing + // TODO: Parse ExclusiveStartKey from request body and convert to binary key + exclusive_start_key: Maybe([]byte) = nil + + // Perform scan + result, err := dynamodb.scan(engine, table_name, exclusive_start_key, limit) + if err != .None { + #partial switch err { + case .Table_Not_Found: + make_error_response(response, .ResourceNotFoundException, "Table not found") + case: + make_error_response(response, .InternalServerError, "Failed to scan table") + } + return + } + defer dynamodb.scan_result_destroy(&result) + + // Build response + builder := strings.builder_make() + strings.write_string(&builder, `{"Items":[`) + + for item, i in result.items { + if i > 0 do strings.write_string(&builder, ",") + item_json := dynamodb.serialize_item(item) + strings.write_string(&builder, item_json) + } + + strings.write_string(&builder, `],"Count":`) + fmt.sbprintf(&builder, "%d", len(result.items)) + strings.write_string(&builder, `,"ScannedCount":`) + fmt.sbprintf(&builder, "%d", len(result.items)) + + // Add LastEvaluatedKey if present (pagination) + if last_key, has_last := result.last_evaluated_key.?; has_last { + // TODO: Convert binary key back to DynamoDB JSON format + // For now, we'll just include it as base64 (not DynamoDB-compatible yet) + _ = last_key + // When fully implemented, this should decode the key and serialize as: + // ,"LastEvaluatedKey":{"pk":{"S":"value"},"sk":{"N":"123"}} + } + + strings.write_string(&builder, "}") + + resp_body := strings.to_string(builder) + response_set_body(response, transmute([]byte)resp_body) } // ============================================================================