flesh out the query stuff

This commit is contained in:
2026-02-15 15:04:43 -05:00
parent 94296ae925
commit 7a2f26b75d
5 changed files with 917 additions and 23 deletions

View File

@@ -2,9 +2,11 @@
package dynamodb
import "core:c"
import "core:encoding/json"
import "core:fmt"
import "core:mem"
import "core:slice"
import "core:strconv"
import "core:strings"
import "core:sync"
import "core:time"
@@ -95,6 +97,16 @@ table_metadata_get_partition_key_name :: proc(metadata: ^Table_Metadata) -> Mayb
return nil
}
// Get the attribute type for a given attribute name
table_metadata_get_attribute_type :: proc(metadata: ^Table_Metadata, attr_name: string) -> Maybe(Scalar_Attribute_Type) {
for ad in metadata.attribute_definitions {
if ad.attribute_name == attr_name {
return ad.attribute_type
}
}
return nil
}
// Get the sort key attribute name (if any)
table_metadata_get_sort_key_name :: proc(metadata: ^Table_Metadata) -> Maybe(string) {
for ks in metadata.key_schema {
@@ -229,12 +241,196 @@ deserialize_table_metadata :: proc(data: []byte, allocator: mem.Allocator) -> (T
metadata: Table_Metadata
// TODO: Parse KeySchema and AttributeDefinitions from JSON strings
// For now, return empty - this will be implemented when needed
// Parse table status
if status_val, found := meta_item["TableStatus"]; found {
#partial switch v in status_val {
case String:
metadata.table_status = table_status_from_string(string(v))
}
} else {
metadata.table_status = .ACTIVE
}
// Parse creation date time
if time_val, found := meta_item["CreationDateTime"]; found {
#partial switch v in time_val {
case Number:
val, parse_ok := strconv.parse_i64(string(v))
metadata.creation_date_time = val if parse_ok else 0
}
}
// Parse KeySchema from embedded JSON string
if ks_val, found := meta_item["KeySchema"]; found {
#partial switch v in ks_val {
case String:
ks, ks_ok := parse_key_schema_json(string(v), allocator)
if ks_ok {
metadata.key_schema = ks
}
}
}
// Parse AttributeDefinitions from embedded JSON string
if ad_val, found := meta_item["AttributeDefinitions"]; found {
#partial switch v in ad_val {
case String:
ad, ad_ok := parse_attr_defs_json(string(v), allocator)
if ad_ok {
metadata.attribute_definitions = ad
}
}
}
return metadata, true
}
// Parse key schema from JSON string like [{"AttributeName":"id","KeyType":"HASH"}]
parse_key_schema_json :: proc(json_str: string, allocator: mem.Allocator) -> ([]Key_Schema_Element, bool) {
data, parse_err := json.parse(transmute([]byte)json_str, allocator = context.temp_allocator)
if parse_err != nil {
return nil, false
}
defer json.destroy_value(data)
arr, ok := data.(json.Array)
if !ok {
return nil, false
}
result := make([]Key_Schema_Element, len(arr), allocator)
for elem, i in arr {
obj, obj_ok := elem.(json.Object)
if !obj_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name_val, name_found := obj["AttributeName"]
if !name_found {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name, name_ok := attr_name_val.(json.String)
if !name_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
key_type_val, type_found := obj["KeyType"]
if !type_found {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
key_type_str, type_ok := key_type_val.(json.String)
if !type_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
kt, kt_ok := key_type_from_string(string(key_type_str))
if !kt_ok {
cleanup_key_schema(result[:i], allocator)
delete(result, allocator)
return nil, false
}
result[i] = Key_Schema_Element{
attribute_name = strings.clone(string(attr_name), allocator),
key_type = kt,
}
}
return result, true
}
cleanup_key_schema :: proc(elems: []Key_Schema_Element, allocator: mem.Allocator) {
for ks in elems {
delete(ks.attribute_name, allocator)
}
}
// Parse attribute definitions from JSON string
parse_attr_defs_json :: proc(json_str: string, allocator: mem.Allocator) -> ([]Attribute_Definition, bool) {
data, parse_err := json.parse(transmute([]byte)json_str, allocator = context.temp_allocator)
if parse_err != nil {
return nil, false
}
defer json.destroy_value(data)
arr, ok := data.(json.Array)
if !ok {
return nil, false
}
result := make([]Attribute_Definition, len(arr), allocator)
for elem, i in arr {
obj, obj_ok := elem.(json.Object)
if !obj_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name_val, name_found := obj["AttributeName"]
if !name_found {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_name, name_ok := attr_name_val.(json.String)
if !name_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_type_val, type_found := obj["AttributeType"]
if !type_found {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
attr_type_str, type_ok := attr_type_val.(json.String)
if !type_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
at, at_ok := scalar_type_from_string(string(attr_type_str))
if !at_ok {
cleanup_attr_defs(result[:i], allocator)
delete(result, allocator)
return nil, false
}
result[i] = Attribute_Definition{
attribute_name = strings.clone(string(attr_name), allocator),
attribute_type = at,
}
}
return result, true
}
cleanup_attr_defs :: proc(elems: []Attribute_Definition, allocator: mem.Allocator) {
for ad in elems {
delete(ad.attribute_name, allocator)
}
}
// Get table metadata
get_table_metadata :: proc(engine: ^Storage_Engine, table_name: string) -> (Table_Metadata, Storage_Error) {
meta_key := build_meta_key(table_name)
@@ -616,6 +812,93 @@ scan :: proc(
}, .None
}
// Query items by partition key with optional pagination
query :: proc(
engine: ^Storage_Engine,
table_name: string,
partition_key_value: []byte,
exclusive_start_key: Maybe([]byte),
limit: int,
) -> (Query_Result, Storage_Error) {
table_lock := get_or_create_table_lock(engine, table_name)
sync.rw_mutex_shared_lock(table_lock)
defer sync.rw_mutex_shared_unlock(table_lock)
// Verify table exists
metadata, meta_err := get_table_metadata(engine, table_name)
if meta_err != .None {
return {}, meta_err
}
defer table_metadata_destroy(&metadata, engine.allocator)
// Build partition prefix
prefix := build_partition_prefix(table_name, partition_key_value)
defer delete(prefix)
iter, iter_err := rocksdb.iter_create(&engine.db)
if iter_err != .None {
return {}, .RocksDB_Error
}
defer rocksdb.iter_destroy(&iter)
max_items := limit if limit > 0 else 1_000_000
// Seek to start position
if start_key, has_start := exclusive_start_key.?; has_start {
if has_prefix(start_key, prefix) {
rocksdb.iter_seek(&iter, start_key)
if rocksdb.iter_valid(&iter) {
rocksdb.iter_next(&iter)
}
} else {
rocksdb.iter_seek(&iter, prefix)
}
} else {
rocksdb.iter_seek(&iter, prefix)
}
items := make([dynamic]Item)
count := 0
last_key: Maybe([]byte) = nil
for rocksdb.iter_valid(&iter) {
key := rocksdb.iter_key(&iter)
if key == nil || !has_prefix(key, prefix) {
break
}
// Hit limit — save this key as pagination token and stop
if count >= max_items {
last_key = slice.clone(key)
break
}
value := rocksdb.iter_value(&iter)
if value == nil {
rocksdb.iter_next(&iter)
continue
}
item, decode_ok := decode(value)
if !decode_ok {
rocksdb.iter_next(&iter)
continue
}
append(&items, item)
count += 1
rocksdb.iter_next(&iter)
}
result_items := make([]Item, len(items))
copy(result_items, items[:])
return Query_Result{
items = result_items,
last_evaluated_key = last_key,
}, .None
}
// Helper to check if a byte slice has a prefix
has_prefix :: proc(data: []byte, prefix: []byte) -> bool {
if len(data) < len(prefix) {
@@ -629,8 +912,39 @@ has_prefix :: proc(data: []byte, prefix: []byte) -> bool {
return true
}
// List tables (simplified - returns empty list for now)
list_tables :: proc(engine: ^Storage_Engine) -> []string {
// TODO: Implement by iterating over meta keys
return {}
}
// List tables by iterating over meta keys in RocksDB
list_tables :: proc(engine: ^Storage_Engine) -> ([]string, Storage_Error) {
iter, iter_err := rocksdb.iter_create(&engine.db)
if iter_err != .None {
return nil, .RocksDB_Error
}
defer rocksdb.iter_destroy(&iter)
meta_prefix := []byte{u8(Entity_Type.Meta)}
rocksdb.iter_seek(&iter, meta_prefix)
tables := make([dynamic]string)
for rocksdb.iter_valid(&iter) {
key := rocksdb.iter_key(&iter)
if key == nil || len(key) == 0 || key[0] != u8(Entity_Type.Meta) {
break
}
decoder := Key_Decoder{data = key, pos = 0}
_, et_ok := decoder_read_entity_type(&decoder)
if !et_ok {
break
}
tbl_name_bytes, seg_ok := decoder_read_segment_borrowed(&decoder)
if !seg_ok {
break
}
append(&tables, strings.clone(string(tbl_name_bytes)))
rocksdb.iter_next(&iter)
}
return tables[:], .None
}