flesh out the scan a bit
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
// Storage engine mapping DynamoDB operations to RocksDB
|
||||
package dynamodb
|
||||
|
||||
import "core:c"
|
||||
import "core:fmt"
|
||||
import "core:mem"
|
||||
import "core:slice"
|
||||
@@ -521,6 +522,113 @@ delete_item :: proc(engine: ^Storage_Engine, table_name: string, key: Item) -> S
|
||||
return .None
|
||||
}
|
||||
|
||||
scan :: proc(
|
||||
engine: ^Storage_Engine,
|
||||
table_name: string,
|
||||
exclusive_start_key: Maybe([]byte),
|
||||
limit: int,
|
||||
) -> (Scan_Result, Storage_Error) {
|
||||
table_lock := get_or_create_table_lock(engine, table_name)
|
||||
sync.rw_mutex_shared_lock(table_lock)
|
||||
defer sync.rw_mutex_shared_unlock(table_lock)
|
||||
|
||||
// Verify table exists
|
||||
metadata, meta_err := get_table_metadata(engine, table_name)
|
||||
if meta_err != .None {
|
||||
return {}, meta_err
|
||||
}
|
||||
defer table_metadata_destroy(&metadata, engine.allocator)
|
||||
|
||||
// Create iterator
|
||||
iter := rocksdb.rocksdb_create_iterator(engine.db.handle, engine.db.read_options)
|
||||
if iter == nil {
|
||||
return {}, .RocksDB_Error
|
||||
}
|
||||
defer rocksdb.rocksdb_iter_destroy(iter)
|
||||
|
||||
// Build table prefix to scan
|
||||
table_prefix := build_table_prefix(table_name)
|
||||
defer delete(table_prefix)
|
||||
|
||||
// Seek to start position
|
||||
if start_key, has_start := exclusive_start_key.?; has_start {
|
||||
// Resume from pagination token
|
||||
rocksdb.rocksdb_iter_seek(iter, raw_data(start_key), c.size_t(len(start_key)))
|
||||
// Skip the start key itself (it's exclusive)
|
||||
if rocksdb.rocksdb_iter_valid(iter) != 0 {
|
||||
rocksdb.rocksdb_iter_next(iter)
|
||||
}
|
||||
} else {
|
||||
// Start from beginning of table
|
||||
rocksdb.rocksdb_iter_seek(iter, raw_data(table_prefix), c.size_t(len(table_prefix)))
|
||||
}
|
||||
|
||||
// Collect items
|
||||
items := make([dynamic]Item, context.temp_allocator)
|
||||
count := 0
|
||||
last_key: Maybe([]byte) = nil
|
||||
|
||||
for rocksdb.rocksdb_iter_valid(iter) != 0 {
|
||||
// Get current key
|
||||
key_len: c.size_t
|
||||
key_ptr := rocksdb.rocksdb_iter_key(iter, &key_len)
|
||||
key_bytes := key_ptr[:key_len]
|
||||
|
||||
// Check if key still has table prefix
|
||||
if !has_prefix(key_bytes, table_prefix) {
|
||||
break
|
||||
}
|
||||
|
||||
// Check limit
|
||||
if count >= limit {
|
||||
// Save this key as pagination token
|
||||
last_key = slice.clone(key_bytes, engine.allocator)
|
||||
break
|
||||
}
|
||||
|
||||
// Get value
|
||||
value_len: c.size_t
|
||||
value_ptr := rocksdb.rocksdb_iter_value(iter, &value_len)
|
||||
value_bytes := value_ptr[:value_len]
|
||||
|
||||
// Decode item
|
||||
item, decode_ok := decode(value_bytes)
|
||||
if !decode_ok {
|
||||
// Skip corrupted items
|
||||
rocksdb.rocksdb_iter_next(iter)
|
||||
continue
|
||||
}
|
||||
|
||||
append(&items, item)
|
||||
count += 1
|
||||
|
||||
// Move to next
|
||||
rocksdb.rocksdb_iter_next(iter)
|
||||
}
|
||||
|
||||
// Convert to slice (owned by caller's allocator)
|
||||
result_items := make([]Item, len(items), engine.allocator)
|
||||
copy(result_items, items[:])
|
||||
|
||||
return Scan_Result{
|
||||
items = result_items,
|
||||
last_evaluated_key = last_key,
|
||||
}, .None
|
||||
}
|
||||
|
||||
// Helper to check if a byte slice has a prefix
|
||||
has_prefix :: proc(data: []byte, prefix: []byte) -> bool {
|
||||
if len(data) < len(prefix) {
|
||||
return false
|
||||
}
|
||||
for i in 0..<len(prefix) {
|
||||
if data[i] != prefix[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// List tables (simplified - returns empty list for now)
|
||||
list_tables :: proc(engine: ^Storage_Engine) -> []string {
|
||||
// TODO: Implement by iterating over meta keys
|
||||
|
||||
Reference in New Issue
Block a user