#!/usr/bin/env python3
"""
JormunDB SDK Compatibility Test Suite

Runs a comprehensive set of DynamoDB operations via boto3 against a live
JormunDB instance and reports pass/fail for each test.

Tests run in definition order and are order-dependent: later tests reuse the
tables and items that earlier tests create (e.g. ``T_HashOnly``).
"""

import functools
import os
import sys
import time
import traceback
from contextlib import contextmanager
from decimal import Decimal

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# ============================================================================
# Configuration
# ============================================================================

ENDPOINT = os.environ.get("JORMUN_ENDPOINT", "http://localhost:8002")


def _connection_kwargs():
    """Return the keyword arguments shared by the boto3 client and resource."""
    return {
        "endpoint_url": ENDPOINT,
        "region_name": "us-east-1",
        "aws_access_key_id": "local",
        "aws_secret_access_key": "local",
        # Retries are disabled so a failing call surfaces immediately.
        "config": Config(retries={"max_attempts": 0}),
    }


client = boto3.client("dynamodb", **_connection_kwargs())
resource = boto3.resource("dynamodb", **_connection_kwargs())

# Every table the suite may create; all are dropped at the end of main().
TEST_TABLES = [
    "T_HashOnly",
    "T_HashRange",
    "T_Batch",
    "T_Txn",
    "T_GSI",
    "T_NumKey",
    "T_DeleteMe",
]

# ============================================================================
# Test framework
# ============================================================================

# One ("PASS" | "FAIL", test name, error text or None) tuple per executed test.
results = []

# Test wrappers in definition order.  An explicit registry is required because
# every test function is named ``_``: each definition rebinds the same module
# global, so scanning globals() would only ever find the last test.
_REGISTERED_TESTS = []


def test(name):
    """Decorator that registers a test and records its outcome when run.

    The returned wrapper takes no arguments, never propagates ``Exception``
    subclasses, and appends exactly one tuple to ``results`` per call.
    """

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper():
            try:
                fn()
            except AssertionError as e:
                results.append(("FAIL", name, str(e)))
            except ClientError as e:
                # Use .get(): a non-conforming server reply may lack these
                # keys, and a KeyError here would abort the whole run.
                error = e.response.get("Error", {})
                code = error.get("Code", "Unknown")
                msg = error.get("Message", "")
                results.append(("FAIL", name, f"{code}: {msg}"))
            except Exception as e:
                results.append(("FAIL", name, f"{type(e).__name__}: {e}"))
            else:
                results.append(("PASS", name, None))

        wrapper._test_name = name
        wrapper._test_fn = fn
        _REGISTERED_TESTS.append(wrapper)
        return wrapper

    return decorator


def cleanup_table(table_name):
    """Delete a table, ignoring errors if it doesn't exist."""
    try:
        client.delete_table(TableName=table_name)
    except ClientError:
        # Deliberately best-effort: cleanup must never fail a test or the run.
        pass


def cleanup_key(table_name, key):
    """Delete a single item by key, ignoring any service error."""
    try:
        client.delete_item(TableName=table_name, Key=key)
    except ClientError:
        # Deliberately best-effort, same as cleanup_table().
        pass


def assert_eq(actual, expected, msg=""):
    """Assert ``actual == expected`` with a message showing both values."""
    label = f" — {msg}" if msg else ""
    assert actual == expected, f"expected {expected!r}, got {actual!r}{label}"


def assert_in(value, container, msg=""):
    """Assert ``value in container`` with a message showing both operands."""
    label = f" — {msg}" if msg else ""
    assert value in container, f"{value!r} not found in {container!r}{label}"


# ============================================================================
# 1. Table Operations
# ============================================================================

@test("CreateTable — hash only")
def _():
    cleanup_table("T_HashOnly")
    resp = client.create_table(
        TableName="T_HashOnly",
        KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    assert_eq(resp["TableDescription"]["TableName"], "T_HashOnly")


@test("CreateTable — hash + range")
def _():
    cleanup_table("T_HashRange")
    resp = client.create_table(
        TableName="T_HashRange",
        KeySchema=[
            {"AttributeName": "pk", "KeyType": "HASH"},
            {"AttributeName": "sk", "KeyType": "RANGE"},
        ],
        AttributeDefinitions=[
            {"AttributeName": "pk", "AttributeType": "S"},
            {"AttributeName": "sk", "AttributeType": "S"},
        ],
        BillingMode="PAY_PER_REQUEST",
    )
    assert_eq(resp["TableDescription"]["TableName"], "T_HashRange")


@test("CreateTable — duplicate should fail ResourceInUseException")
def _():
    try:
        client.create_table(
            TableName="T_HashOnly",
            KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
            BillingMode="PAY_PER_REQUEST",
        )
        assert False, "Should have raised"
    except ClientError as e:
        assert_eq(e.response["Error"]["Code"], "ResourceInUseException")


@test("ListTables — returns created tables")
def _():
    resp = client.list_tables()
    names = resp["TableNames"]
    assert_in("T_HashOnly", names)
    assert_in("T_HashRange", names)


@test("DescribeTable — returns key schema")
def _():
    resp = client.describe_table(TableName="T_HashRange")
    table = resp["Table"]
    assert_eq(table["TableName"], "T_HashRange")
    key_types = {ks["KeyType"] for ks in table["KeySchema"]}
    assert_in("HASH", key_types)
    assert_in("RANGE", key_types)


@test("DeleteTable — success")
def _():
    cleanup_table("T_DeleteMe")
    client.create_table(
        TableName="T_DeleteMe",
        KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    client.delete_table(TableName="T_DeleteMe")
    resp = client.list_tables()
    assert "T_DeleteMe" not in resp["TableNames"]


@test("DeleteTable — nonexistent should fail ResourceNotFoundException")
def _():
    try:
        client.delete_table(TableName="T_NoSuchTable_XYZ")
        assert False, "Should have raised"
    except ClientError as e:
        assert_eq(e.response["Error"]["Code"], "ResourceNotFoundException")


# ============================================================================
# 2. PutItem / GetItem / DeleteItem
# ============================================================================

@test("PutItem + GetItem — string key")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "user1"}, "name": {"S": "Alice"}, "age": {"N": "30"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "user1"}})
    item = resp["Item"]
    assert_eq(item["pk"]["S"], "user1")
    assert_eq(item["name"]["S"], "Alice")
    assert_eq(item["age"]["N"], "30")


@test("PutItem + GetItem — composite key")
def _():
    client.put_item(
        TableName="T_HashRange",
        Item={"pk": {"S": "order1"}, "sk": {"S": "item#1"}, "qty": {"N": "5"}},
    )
    resp = client.get_item(
        TableName="T_HashRange",
        Key={"pk": {"S": "order1"}, "sk": {"S": "item#1"}},
    )
    assert_eq(resp["Item"]["qty"]["N"], "5")


@test("PutItem — overwrite existing item")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "user1"}, "name": {"S": "Alice Updated"}, "age": {"N": "31"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "user1"}})
    assert_eq(resp["Item"]["name"]["S"], "Alice Updated")
    assert_eq(resp["Item"]["age"]["N"], "31")


@test("GetItem — nonexistent key returns empty")
def _():
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "no_such_key"}})
    assert "Item" not in resp


@test("DeleteItem — removes item")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "to_delete"}, "data": {"S": "bye"}},
    )
    client.delete_item(TableName="T_HashOnly", Key={"pk": {"S": "to_delete"}})
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "to_delete"}})
    assert "Item" not in resp


@test("DeleteItem — nonexistent key is no-op (no error)")
def _():
    client.delete_item(TableName="T_HashOnly", Key={"pk": {"S": "never_existed"}})


# ============================================================================
# 3. Data Types
# ============================================================================

@test("PutItem + GetItem — all scalar types (S, N, B, BOOL, NULL)")
def _():
    # boto3 base64-encodes blob values on the wire and decodes them on the way
    # back, so raw bytes are passed here; pre-encoding would double-encode.
    raw = b"binary data"
    client.put_item(
        TableName="T_HashOnly",
        Item={
            "pk": {"S": "types_test"},
            "str_val": {"S": "hello"},
            "num_val": {"N": "42.5"},
            "bin_val": {"B": raw},
            "bool_val": {"BOOL": True},
            "null_val": {"NULL": True},
        },
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "types_test"}})
    item = resp["Item"]
    assert_eq(item["str_val"]["S"], "hello")
    assert_eq(item["num_val"]["N"], "42.5")
    assert_eq(item["bin_val"]["B"], raw)
    assert item["bool_val"]["BOOL"] is True
    assert item["null_val"]["NULL"] is True


@test("PutItem + GetItem — list and map types")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={
            "pk": {"S": "complex_test"},
            "tags": {"L": [{"S": "a"}, {"S": "b"}, {"N": "3"}]},
            "meta": {"M": {"nested_key": {"S": "nested_val"}, "count": {"N": "10"}}},
        },
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "complex_test"}})
    item = resp["Item"]
    assert_eq(len(item["tags"]["L"]), 3)
    assert_eq(item["meta"]["M"]["nested_key"]["S"], "nested_val")


@test("PutItem + GetItem — string set and number set")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={
            "pk": {"S": "set_test"},
            "colors": {"SS": ["red", "blue", "green"]},
            "scores": {"NS": ["100", "200", "300"]},
        },
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "set_test"}})
    item = resp["Item"]
    # Sets are unordered, so compare as Python sets.
    assert_eq(set(item["colors"]["SS"]), {"red", "blue", "green"})
    assert_eq(set(item["scores"]["NS"]), {"100", "200", "300"})


# ============================================================================
# 4. UpdateItem
# ============================================================================

@test("UpdateItem — SET new attribute")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "update1"}, "name": {"S": "Bob"}},
    )
    client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "update1"}},
        UpdateExpression="SET age = :a",
        ExpressionAttributeValues={":a": {"N": "25"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "update1"}})
    assert_eq(resp["Item"]["age"]["N"], "25")
    assert_eq(resp["Item"]["name"]["S"], "Bob")


@test("UpdateItem — SET overwrite existing attribute")
def _():
    client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "update1"}},
        UpdateExpression="SET #n = :n",
        ExpressionAttributeNames={"#n": "name"},
        ExpressionAttributeValues={":n": {"S": "Bobby"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "update1"}})
    assert_eq(resp["Item"]["name"]["S"], "Bobby")


@test("UpdateItem — REMOVE attribute")
def _():
    client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "update1"}},
        UpdateExpression="REMOVE age",
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "update1"}})
    assert "age" not in resp["Item"]


@test("UpdateItem — SET with ADD arithmetic")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "counter"}, "count": {"N": "10"}},
    )
    client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "counter"}},
        UpdateExpression="SET #c = #c + :inc",
        ExpressionAttributeNames={"#c": "count"},
        ExpressionAttributeValues={":inc": {"N": "5"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "counter"}})
    assert_eq(resp["Item"]["count"]["N"], "15")


@test("UpdateItem — ReturnValues ALL_NEW")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "rv_test"}, "x": {"N": "1"}},
    )
    resp = client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "rv_test"}},
        UpdateExpression="SET x = :v",
        ExpressionAttributeValues={":v": {"N": "99"}},
        ReturnValues="ALL_NEW",
    )
    assert_eq(resp["Attributes"]["x"]["N"], "99")


@test("UpdateItem — ReturnValues ALL_OLD")
def _():
    # Relies on the previous test having left x = 99 on "rv_test".
    resp = client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "rv_test"}},
        UpdateExpression="SET x = :v",
        ExpressionAttributeValues={":v": {"N": "200"}},
        ReturnValues="ALL_OLD",
    )
    assert_eq(resp["Attributes"]["x"]["N"], "99")


@test("UpdateItem — upsert (item doesn't exist yet)")
def _():
    cleanup_key("T_HashOnly", {"pk": {"S": "upserted"}})
    client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "upserted"}},
        UpdateExpression="SET greeting = :g",
        ExpressionAttributeValues={":g": {"S": "hello"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "upserted"}})
    assert_eq(resp["Item"]["greeting"]["S"], "hello")


# ============================================================================
# 5. ConditionExpression
# ============================================================================

@test("PutItem — ConditionExpression attribute_not_exists succeeds on new item")
def _():
    cleanup_key("T_HashOnly", {"pk": {"S": "cond1"}})
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "cond1"}, "val": {"S": "first"}},
        ConditionExpression="attribute_not_exists(pk)",
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "cond1"}})
    assert_eq(resp["Item"]["val"]["S"], "first")


@test("PutItem — ConditionExpression attribute_not_exists fails on existing item")
def _():
    try:
        client.put_item(
            TableName="T_HashOnly",
            Item={"pk": {"S": "cond1"}, "val": {"S": "second"}},
            ConditionExpression="attribute_not_exists(pk)",
        )
        assert False, "Should have raised ConditionalCheckFailedException"
    except ClientError as e:
        assert_eq(e.response["Error"]["Code"], "ConditionalCheckFailedException")


@test("DeleteItem — ConditionExpression attribute_exists")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "cond_del"}, "status": {"S": "active"}},
    )
    client.delete_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "cond_del"}},
        ConditionExpression="attribute_exists(#s)",
        ExpressionAttributeNames={"#s": "status"},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "cond_del"}})
    assert "Item" not in resp


@test("UpdateItem — ConditionExpression comparison")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "cond_upd"}, "score": {"N": "50"}},
    )
    # Should succeed: score = 50 > 10
    client.update_item(
        TableName="T_HashOnly",
        Key={"pk": {"S": "cond_upd"}},
        UpdateExpression="SET score = :new",
        ConditionExpression="score > :min",
        ExpressionAttributeValues={":new": {"N": "100"}, ":min": {"N": "10"}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "cond_upd"}})
    assert_eq(resp["Item"]["score"]["N"], "100")


# ============================================================================
# 6. Query
# ============================================================================

@test("Query — partition key only")
def _():
    # Seed data
    for i in range(5):
        client.put_item(
            TableName="T_HashRange",
            Item={"pk": {"S": "q_test"}, "sk": {"S": f"item#{i:03d}"}, "val": {"N": str(i)}},
        )
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": {"S": "q_test"}},
    )
    assert_eq(resp["Count"], 5)


@test("Query — with sort key begins_with")
def _():
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk AND begins_with(sk, :prefix)",
        ExpressionAttributeValues={
            ":pk": {"S": "q_test"},
            ":prefix": {"S": "item#00"},
        },
    )
    # item#000 through item#009 — but we only have 000..004
    assert resp["Count"] >= 1


@test("Query — with sort key BETWEEN")
def _():
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk AND sk BETWEEN :lo AND :hi",
        ExpressionAttributeValues={
            ":pk": {"S": "q_test"},
            ":lo": {"S": "item#001"},
            ":hi": {"S": "item#003"},
        },
    )
    assert_eq(resp["Count"], 3)


@test("Query — with Limit")
def _():
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": {"S": "q_test"}},
        Limit=2,
    )
    assert_eq(resp["Count"], 2)
    assert "LastEvaluatedKey" in resp


@test("Query — pagination with ExclusiveStartKey")
def _():
    # Get first page
    resp1 = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": {"S": "q_test"}},
        Limit=2,
    )
    assert "LastEvaluatedKey" in resp1

    # Get second page
    resp2 = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": {"S": "q_test"}},
        Limit=2,
        ExclusiveStartKey=resp1["LastEvaluatedKey"],
    )

    # Second page items should differ from first page
    keys1 = {item["sk"]["S"] for item in resp1["Items"]}
    keys2 = {item["sk"]["S"] for item in resp2["Items"]}
    assert len(keys1 & keys2) == 0, "Pages should not overlap"


@test("Query — with FilterExpression")
def _():
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        FilterExpression="val > :min",
        ExpressionAttributeValues={
            ":pk": {"S": "q_test"},
            ":min": {"N": "2"},
        },
    )
    # items 3, 4 have val > 2
    assert_eq(resp["Count"], 2)
    assert resp["ScannedCount"] >= resp["Count"]


@test("Query — with ProjectionExpression")
def _():
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": {"S": "q_test"}},
        ProjectionExpression="pk, sk",
        Limit=1,
    )
    item = resp["Items"][0]
    assert "pk" in item
    assert "sk" in item
    assert "val" not in item


# ============================================================================
# 7. Scan
# ============================================================================

@test("Scan — returns all items")
def _():
    resp = client.scan(TableName="T_HashRange")
    assert resp["Count"] >= 5


@test("Scan — with Limit and pagination")
def _():
    resp = client.scan(TableName="T_HashRange", Limit=2)
    assert_eq(resp["Count"], 2)
    assert "LastEvaluatedKey" in resp


@test("Scan — with FilterExpression")
def _():
    resp = client.scan(
        TableName="T_HashRange",
        FilterExpression="val >= :min",
        ExpressionAttributeValues={":min": {"N": "3"}},
    )
    for item in resp["Items"]:
        assert int(item["val"]["N"]) >= 3


# ============================================================================
# 8. BatchWriteItem / BatchGetItem
# ============================================================================

@test("BatchWriteItem — put multiple items")
def _():
    cleanup_table("T_Batch")
    client.create_table(
        TableName="T_Batch",
        KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    client.batch_write_item(
        RequestItems={
            "T_Batch": [
                {"PutRequest": {"Item": {"pk": {"S": f"b{i}"}, "val": {"N": str(i)}}}}
                for i in range(10)
            ]
        }
    )
    resp = client.scan(TableName="T_Batch")
    assert_eq(resp["Count"], 10)


@test("BatchWriteItem — delete items")
def _():
    client.batch_write_item(
        RequestItems={
            "T_Batch": [
                {"DeleteRequest": {"Key": {"pk": {"S": f"b{i}"}}}}
                for i in range(5)
            ]
        }
    )
    resp = client.scan(TableName="T_Batch")
    assert_eq(resp["Count"], 5)


@test("BatchWriteItem — exceeds 25 limit should fail")
def _():
    try:
        client.batch_write_item(
            RequestItems={
                "T_Batch": [
                    {"PutRequest": {"Item": {"pk": {"S": f"overflow{i}"}}}}
                    for i in range(26)
                ]
            }
        )
        assert False, "Should have raised"
    except ClientError as e:
        assert_eq(e.response["Error"]["Code"], "ValidationException")


@test("BatchGetItem — retrieve multiple items")
def _():
    resp = client.batch_get_item(
        RequestItems={
            "T_Batch": {
                "Keys": [{"pk": {"S": f"b{i}"}} for i in range(5, 10)]
            }
        }
    )
    items = resp["Responses"]["T_Batch"]
    assert_eq(len(items), 5)


# ============================================================================
# 9. TransactWriteItems / TransactGetItems
# ============================================================================

@test("TransactWriteItems — put multiple items atomically")
def _():
    cleanup_table("T_Txn")
    client.create_table(
        TableName="T_Txn",
        KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    client.transact_write_items(
        TransactItems=[
            {"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx1"}, "v": {"S": "a"}}}},
            {"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx2"}, "v": {"S": "b"}}}},
        ]
    )
    resp = client.scan(TableName="T_Txn")
    assert_eq(resp["Count"], 2)


@test("TransactWriteItems — with ConditionCheck")
def _():
    # Should succeed: tx1 exists
    client.transact_write_items(
        TransactItems=[
            {
                "ConditionCheck": {
                    "TableName": "T_Txn",
                    "Key": {"pk": {"S": "tx1"}},
                    "ConditionExpression": "attribute_exists(pk)",
                }
            },
            {"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx3"}, "v": {"S": "c"}}}},
        ]
    )
    resp = client.get_item(TableName="T_Txn", Key={"pk": {"S": "tx3"}})
    assert "Item" in resp


@test("TransactWriteItems — condition failure rolls back")
def _():
    try:
        client.transact_write_items(
            TransactItems=[
                {
                    "ConditionCheck": {
                        "TableName": "T_Txn",
                        "Key": {"pk": {"S": "nonexistent"}},
                        "ConditionExpression": "attribute_exists(pk)",
                    }
                },
                {"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx_should_not_exist"}}}},
            ]
        )
        assert False, "Should have raised"
    except ClientError as e:
        assert_in(
            e.response["Error"]["Code"],
            ["TransactionCanceledException", "ConditionalCheckFailedException"],
        )

    # Verify rollback
    resp = client.get_item(TableName="T_Txn", Key={"pk": {"S": "tx_should_not_exist"}})
    assert "Item" not in resp


@test("TransactGetItems — retrieve multiple items atomically")
def _():
    resp = client.transact_get_items(
        TransactItems=[
            {"Get": {"TableName": "T_Txn", "Key": {"pk": {"S": "tx1"}}}},
            {"Get": {"TableName": "T_Txn", "Key": {"pk": {"S": "tx2"}}}},
        ]
    )
    items = [r["Item"] for r in resp["Responses"] if "Item" in r]
    assert_eq(len(items), 2)


# ============================================================================
# 10. Global Secondary Indexes (GSI)
# ============================================================================

@test("CreateTable with GSI")
def _():
    cleanup_table("T_GSI")
    resp = client.create_table(
        TableName="T_GSI",
        KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
        AttributeDefinitions=[
            {"AttributeName": "pk", "AttributeType": "S"},
            {"AttributeName": "email", "AttributeType": "S"},
        ],
        GlobalSecondaryIndexes=[
            {
                "IndexName": "email-index",
                "KeySchema": [{"AttributeName": "email", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            }
        ],
        BillingMode="PAY_PER_REQUEST",
    )
    assert_eq(resp["TableDescription"]["TableName"], "T_GSI")


@test("DescribeTable — shows GSI info")
def _():
    resp = client.describe_table(TableName="T_GSI")
    table = resp["Table"]
    assert "GlobalSecondaryIndexes" in table
    gsi = table["GlobalSecondaryIndexes"][0]
    assert_eq(gsi["IndexName"], "email-index")


@test("GSI — Query by secondary key")
def _():
    # Seed items
    for i, email in enumerate(["alice@test.com", "bob@test.com", "alice@test.com"]):
        client.put_item(
            TableName="T_GSI",
            Item={
                "pk": {"S": f"user{i}"},
                "email": {"S": email},
                "name": {"S": f"User{i}"},
            },
        )
    resp = client.query(
        TableName="T_GSI",
        IndexName="email-index",
        KeyConditionExpression="email = :e",
        ExpressionAttributeValues={":e": {"S": "alice@test.com"}},
    )
    assert_eq(resp["Count"], 2)


@test("GSI — Scan index")
def _():
    resp = client.scan(TableName="T_GSI", IndexName="email-index")
    assert resp["Count"] >= 3


@test("GSI — sparse index (item without GSI key not indexed)")
def _():
    client.put_item(
        TableName="T_GSI",
        Item={"pk": {"S": "no_email_user"}, "name": {"S": "Ghost"}},
    )
    resp = client.scan(TableName="T_GSI", IndexName="email-index")
    pks = {item["pk"]["S"] for item in resp["Items"]}
    assert "no_email_user" not in pks


@test("GSI — updated item reflected in index")
def _():
    client.update_item(
        TableName="T_GSI",
        Key={"pk": {"S": "user0"}},
        UpdateExpression="SET email = :e",
        ExpressionAttributeValues={":e": {"S": "newalice@test.com"}},
    )
    # Old email should no longer match user0
    resp = client.query(
        TableName="T_GSI",
        IndexName="email-index",
        KeyConditionExpression="email = :e",
        ExpressionAttributeValues={":e": {"S": "alice@test.com"}},
    )
    pks = {item["pk"]["S"] for item in resp["Items"]}
    assert "user0" not in pks

    # New email should match user0
    resp2 = client.query(
        TableName="T_GSI",
        IndexName="email-index",
        KeyConditionExpression="email = :e",
        ExpressionAttributeValues={":e": {"S": "newalice@test.com"}},
    )
    pks2 = {item["pk"]["S"] for item in resp2["Items"]}
    assert_in("user0", pks2)


@test("GSI — deleted item removed from index")
def _():
    client.delete_item(TableName="T_GSI", Key={"pk": {"S": "user1"}})
    resp = client.query(
        TableName="T_GSI",
        IndexName="email-index",
        KeyConditionExpression="email = :e",
        ExpressionAttributeValues={":e": {"S": "bob@test.com"}},
    )
    assert_eq(resp["Count"], 0)


# ============================================================================
# 11. Edge Cases
# ============================================================================

@test("PutItem — empty string attribute")
def _():
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "empty_str"}, "val": {"S": ""}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "empty_str"}})
    assert_eq(resp["Item"]["val"]["S"], "")


@test("PutItem — large item (~300KB)")
def _():
    big_val = "x" * 300_000
    client.put_item(
        TableName="T_HashOnly",
        Item={"pk": {"S": "big_item"}, "data": {"S": big_val}},
    )
    resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "big_item"}})
    assert_eq(len(resp["Item"]["data"]["S"]), 300_000)


@test("PutItem — numeric key")
def _():
    cleanup_table("T_NumKey")
    client.create_table(
        TableName="T_NumKey",
        KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "N"}],
        BillingMode="PAY_PER_REQUEST",
    )
    client.put_item(
        TableName="T_NumKey",
        Item={"id": {"N": "42"}, "data": {"S": "numeric key"}},
    )
    resp = client.get_item(TableName="T_NumKey", Key={"id": {"N": "42"}})
    assert_eq(resp["Item"]["data"]["S"], "numeric key")


@test("Query — nonexistent partition returns 0 items")
def _():
    resp = client.query(
        TableName="T_HashRange",
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": {"S": "no_such_partition"}},
    )
    assert_eq(resp["Count"], 0)


# ============================================================================
# Runner
# ============================================================================

def _wait_for_server(max_attempts=30, delay=1):
    """Poll ListTables until it succeeds; return False if it never does."""
    print("Waiting for JormunDB...", end="", flush=True)
    for _attempt in range(max_attempts):
        try:
            client.list_tables()
        except Exception:
            # Any failure counts as "not ready yet"; retry after a pause.
            print(".", end="", flush=True)
            time.sleep(delay)
        else:
            print(" connected!\n")
            return True
    return False


def _run_tests():
    """Run every registered test in definition order, printing each outcome."""
    for t in _REGISTERED_TESTS:
        sys.stdout.write(f" {t._test_name} ... ")
        sys.stdout.flush()
        t()
        # The wrapper always appends exactly one entry for the test it ran.
        status, _name, err = results[-1]
        if status == "PASS":
            print("\033[32mPASS\033[0m")
        else:
            print(f"\033[31mFAIL\033[0m {err}")


def _print_summary():
    """Print the pass/fail summary and return the number of failed tests."""
    passed = sum(1 for status, _, _ in results if status == "PASS")
    failed = sum(1 for status, _, _ in results if status == "FAIL")
    total = len(results)

    print(f"\n{'='*60}")
    if failed == 0:
        print(f" \033[32mAll {total} tests passed!\033[0m")
    else:
        print(f" \033[32m{passed} passed\033[0m, \033[31m{failed} failed\033[0m out of {total}")
        print()
        for status, name, err in results:
            if status == "FAIL":
                print(f" \033[31m✗ {name}\033[0m")
                print(f" {err}")
    print(f"{'='*60}\n")
    return failed


def main():
    """Wait for the server, run all tests, report, clean up, and exit.

    Exits with status 1 if the server is unreachable or any test failed,
    otherwise 0.
    """
    print(f"\n{'='*60}")
    print(" JormunDB Python SDK Test Suite")
    print(f" Endpoint: {ENDPOINT}")
    print(f"{'='*60}\n")

    # Wait for JormunDB to be reachable
    if not _wait_for_server():
        print("\n\nERROR: Could not connect to JormunDB at", ENDPOINT)
        sys.exit(1)

    _run_tests()
    failed = _print_summary()

    # Cleanup test tables
    print("Cleaning up test tables...")
    for tbl in TEST_TABLES:
        cleanup_table(tbl)
    print("Done.\n")

    sys.exit(1 if failed else 0)


if __name__ == "__main__":
    main()