Files
jormun-db/test_sdk.py

1002 lines
31 KiB
Python

#!/usr/bin/env python3
"""
JormunDB SDK Compatibility Test Suite
Runs a comprehensive set of DynamoDB operations via boto3 against a live
JormunDB instance and reports pass/fail for each test.
"""
import os
import sys
import time
import traceback
from contextlib import contextmanager
from decimal import Decimal
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
# ============================================================================
# Configuration
# ============================================================================
ENDPOINT = os.environ.get("JORMUN_ENDPOINT", "http://localhost:8002")
client = boto3.client(
"dynamodb",
endpoint_url=ENDPOINT,
region_name="us-east-1",
aws_access_key_id="local",
aws_secret_access_key="local",
config=Config(retries={"max_attempts": 0}),
)
resource = boto3.resource(
"dynamodb",
endpoint_url=ENDPOINT,
region_name="us-east-1",
aws_access_key_id="local",
aws_secret_access_key="local",
config=Config(retries={"max_attempts": 0}),
)
# ============================================================================
# Test framework
# ============================================================================
results = []
def test(name):
"""Decorator that registers and runs a test, catching exceptions."""
def decorator(fn):
def wrapper():
try:
fn()
results.append(("PASS", name, None))
except AssertionError as e:
results.append(("FAIL", name, str(e)))
except ClientError as e:
code = e.response["Error"]["Code"]
msg = e.response["Error"]["Message"]
results.append(("FAIL", name, f"{code}: {msg}"))
except Exception as e:
results.append(("FAIL", name, f"{type(e).__name__}: {e}"))
wrapper._test_name = name
wrapper._test_fn = fn
return wrapper
return decorator
def cleanup_table(table_name):
"""Delete a table, ignoring errors if it doesn't exist."""
try:
client.delete_table(TableName=table_name)
except ClientError:
pass
def assert_eq(actual, expected, msg=""):
label = f"{msg}" if msg else ""
assert actual == expected, f"expected {expected!r}, got {actual!r}{label}"
def assert_in(value, container, msg=""):
label = f"{msg}" if msg else ""
assert value in container, f"{value!r} not found in {container!r}{label}"
# ============================================================================
# 1. Table Operations
# ============================================================================
@test("CreateTable — hash only")
def _():
cleanup_table("T_HashOnly")
resp = client.create_table(
TableName="T_HashOnly",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
assert_eq(resp["TableDescription"]["TableName"], "T_HashOnly")
@test("CreateTable — hash + range")
def _():
cleanup_table("T_HashRange")
resp = client.create_table(
TableName="T_HashRange",
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
assert_eq(resp["TableDescription"]["TableName"], "T_HashRange")
@test("CreateTable — duplicate should fail ResourceInUseException")
def _():
try:
client.create_table(
TableName="T_HashOnly",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
assert False, "Should have raised"
except ClientError as e:
assert_eq(e.response["Error"]["Code"], "ResourceInUseException")
@test("ListTables — returns created tables")
def _():
resp = client.list_tables()
names = resp["TableNames"]
assert_in("T_HashOnly", names)
assert_in("T_HashRange", names)
@test("DescribeTable — returns key schema")
def _():
resp = client.describe_table(TableName="T_HashRange")
table = resp["Table"]
assert_eq(table["TableName"], "T_HashRange")
key_types = {ks["KeyType"] for ks in table["KeySchema"]}
assert_in("HASH", key_types)
assert_in("RANGE", key_types)
@test("DeleteTable — success")
def _():
cleanup_table("T_DeleteMe")
client.create_table(
TableName="T_DeleteMe",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
client.delete_table(TableName="T_DeleteMe")
resp = client.list_tables()
assert "T_DeleteMe" not in resp["TableNames"]
@test("DeleteTable — nonexistent should fail ResourceNotFoundException")
def _():
try:
client.delete_table(TableName="T_NoSuchTable_XYZ")
assert False, "Should have raised"
except ClientError as e:
assert_eq(e.response["Error"]["Code"], "ResourceNotFoundException")
# ============================================================================
# 2. PutItem / GetItem / DeleteItem
# ============================================================================
@test("PutItem + GetItem — string key")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "user1"}, "name": {"S": "Alice"}, "age": {"N": "30"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "user1"}})
item = resp["Item"]
assert_eq(item["pk"]["S"], "user1")
assert_eq(item["name"]["S"], "Alice")
assert_eq(item["age"]["N"], "30")
@test("PutItem + GetItem — composite key")
def _():
client.put_item(
TableName="T_HashRange",
Item={"pk": {"S": "order1"}, "sk": {"S": "item#1"}, "qty": {"N": "5"}},
)
resp = client.get_item(
TableName="T_HashRange",
Key={"pk": {"S": "order1"}, "sk": {"S": "item#1"}},
)
assert_eq(resp["Item"]["qty"]["N"], "5")
@test("PutItem — overwrite existing item")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "user1"}, "name": {"S": "Alice Updated"}, "age": {"N": "31"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "user1"}})
assert_eq(resp["Item"]["name"]["S"], "Alice Updated")
assert_eq(resp["Item"]["age"]["N"], "31")
@test("GetItem — nonexistent key returns empty")
def _():
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "no_such_key"}})
assert "Item" not in resp
@test("DeleteItem — removes item")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "to_delete"}, "data": {"S": "bye"}},
)
client.delete_item(TableName="T_HashOnly", Key={"pk": {"S": "to_delete"}})
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "to_delete"}})
assert "Item" not in resp
@test("DeleteItem — nonexistent key is no-op (no error)")
def _():
client.delete_item(TableName="T_HashOnly", Key={"pk": {"S": "never_existed"}})
# ============================================================================
# 3. Data Types
# ============================================================================
@test("PutItem + GetItem — all scalar types (S, N, B, BOOL, NULL)")
def _():
import base64
b64 = base64.b64encode(b"binary data").decode()
client.put_item(
TableName="T_HashOnly",
Item={
"pk": {"S": "types_test"},
"str_val": {"S": "hello"},
"num_val": {"N": "42.5"},
"bin_val": {"B": b64},
"bool_val": {"BOOL": True},
"null_val": {"NULL": True},
},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "types_test"}})
item = resp["Item"]
assert_eq(item["str_val"]["S"], "hello")
assert_eq(item["num_val"]["N"], "42.5")
assert item["bool_val"]["BOOL"] is True
assert item["null_val"]["NULL"] is True
@test("PutItem + GetItem — list and map types")
def _():
client.put_item(
TableName="T_HashOnly",
Item={
"pk": {"S": "complex_test"},
"tags": {"L": [{"S": "a"}, {"S": "b"}, {"N": "3"}]},
"meta": {"M": {"nested_key": {"S": "nested_val"}, "count": {"N": "10"}}},
},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "complex_test"}})
item = resp["Item"]
assert_eq(len(item["tags"]["L"]), 3)
assert_eq(item["meta"]["M"]["nested_key"]["S"], "nested_val")
@test("PutItem + GetItem — string set and number set")
def _():
client.put_item(
TableName="T_HashOnly",
Item={
"pk": {"S": "set_test"},
"colors": {"SS": ["red", "blue", "green"]},
"scores": {"NS": ["100", "200", "300"]},
},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "set_test"}})
item = resp["Item"]
assert_eq(set(item["colors"]["SS"]), {"red", "blue", "green"})
assert_eq(set(item["scores"]["NS"]), {"100", "200", "300"})
# ============================================================================
# 4. UpdateItem
# ============================================================================
@test("UpdateItem — SET new attribute")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "update1"}, "name": {"S": "Bob"}},
)
client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "update1"}},
UpdateExpression="SET age = :a",
ExpressionAttributeValues={":a": {"N": "25"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "update1"}})
assert_eq(resp["Item"]["age"]["N"], "25")
assert_eq(resp["Item"]["name"]["S"], "Bob")
@test("UpdateItem — SET overwrite existing attribute")
def _():
client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "update1"}},
UpdateExpression="SET #n = :n",
ExpressionAttributeNames={"#n": "name"},
ExpressionAttributeValues={":n": {"S": "Bobby"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "update1"}})
assert_eq(resp["Item"]["name"]["S"], "Bobby")
@test("UpdateItem — REMOVE attribute")
def _():
client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "update1"}},
UpdateExpression="REMOVE age",
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "update1"}})
assert "age" not in resp["Item"]
@test("UpdateItem — SET with ADD arithmetic")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "counter"}, "count": {"N": "10"}},
)
client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "counter"}},
UpdateExpression="SET #c = #c + :inc",
ExpressionAttributeNames={"#c": "count"},
ExpressionAttributeValues={":inc": {"N": "5"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "counter"}})
assert_eq(resp["Item"]["count"]["N"], "15")
@test("UpdateItem — ReturnValues ALL_NEW")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "rv_test"}, "x": {"N": "1"}},
)
resp = client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "rv_test"}},
UpdateExpression="SET x = :v",
ExpressionAttributeValues={":v": {"N": "99"}},
ReturnValues="ALL_NEW",
)
assert_eq(resp["Attributes"]["x"]["N"], "99")
@test("UpdateItem — ReturnValues ALL_OLD")
def _():
resp = client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "rv_test"}},
UpdateExpression="SET x = :v",
ExpressionAttributeValues={":v": {"N": "200"}},
ReturnValues="ALL_OLD",
)
assert_eq(resp["Attributes"]["x"]["N"], "99")
@test("UpdateItem — upsert (item doesn't exist yet)")
def _():
cleanup_key("T_HashOnly", {"pk": {"S": "upserted"}})
client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "upserted"}},
UpdateExpression="SET greeting = :g",
ExpressionAttributeValues={":g": {"S": "hello"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "upserted"}})
assert_eq(resp["Item"]["greeting"]["S"], "hello")
# ============================================================================
# 5. ConditionExpression
# ============================================================================
@test("PutItem — ConditionExpression attribute_not_exists succeeds on new item")
def _():
cleanup_key("T_HashOnly", {"pk": {"S": "cond1"}})
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "cond1"}, "val": {"S": "first"}},
ConditionExpression="attribute_not_exists(pk)",
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "cond1"}})
assert_eq(resp["Item"]["val"]["S"], "first")
@test("PutItem — ConditionExpression attribute_not_exists fails on existing item")
def _():
try:
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "cond1"}, "val": {"S": "second"}},
ConditionExpression="attribute_not_exists(pk)",
)
assert False, "Should have raised ConditionalCheckFailedException"
except ClientError as e:
assert_eq(e.response["Error"]["Code"], "ConditionalCheckFailedException")
@test("DeleteItem — ConditionExpression attribute_exists")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "cond_del"}, "status": {"S": "active"}},
)
client.delete_item(
TableName="T_HashOnly",
Key={"pk": {"S": "cond_del"}},
ConditionExpression="attribute_exists(#s)",
ExpressionAttributeNames={"#s": "status"},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "cond_del"}})
assert "Item" not in resp
@test("UpdateItem — ConditionExpression comparison")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "cond_upd"}, "score": {"N": "50"}},
)
# Should succeed: score = 50 > 10
client.update_item(
TableName="T_HashOnly",
Key={"pk": {"S": "cond_upd"}},
UpdateExpression="SET score = :new",
ConditionExpression="score > :min",
ExpressionAttributeValues={":new": {"N": "100"}, ":min": {"N": "10"}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "cond_upd"}})
assert_eq(resp["Item"]["score"]["N"], "100")
# ============================================================================
# 6. Query
# ============================================================================
@test("Query — partition key only")
def _():
# Seed data
for i in range(5):
client.put_item(
TableName="T_HashRange",
Item={"pk": {"S": "q_test"}, "sk": {"S": f"item#{i:03d}"}, "val": {"N": str(i)}},
)
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_test"}},
)
assert_eq(resp["Count"], 5)
@test("Query — with sort key begins_with")
def _():
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk AND begins_with(sk, :prefix)",
ExpressionAttributeValues={
":pk": {"S": "q_test"},
":prefix": {"S": "item#00"},
},
)
# item#000 through item#009 — but we only have 000..004
assert resp["Count"] >= 1
@test("Query — with sort key BETWEEN")
def _():
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk AND sk BETWEEN :lo AND :hi",
ExpressionAttributeValues={
":pk": {"S": "q_test"},
":lo": {"S": "item#001"},
":hi": {"S": "item#003"},
},
)
assert_eq(resp["Count"], 3)
@test("Query — with Limit")
def _():
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_test"}},
Limit=2,
)
assert_eq(resp["Count"], 2)
assert "LastEvaluatedKey" in resp
@test("Query — pagination with ExclusiveStartKey")
def _():
# Get first page
resp1 = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_test"}},
Limit=2,
)
assert "LastEvaluatedKey" in resp1
# Get second page
resp2 = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_test"}},
Limit=2,
ExclusiveStartKey=resp1["LastEvaluatedKey"],
)
# Second page items should differ from first page
keys1 = {item["sk"]["S"] for item in resp1["Items"]}
keys2 = {item["sk"]["S"] for item in resp2["Items"]}
assert len(keys1 & keys2) == 0, "Pages should not overlap"
@test("Query — with FilterExpression")
def _():
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
FilterExpression="val > :min",
ExpressionAttributeValues={
":pk": {"S": "q_test"},
":min": {"N": "2"},
},
)
# items 3, 4 have val > 2
assert_eq(resp["Count"], 2)
assert resp["ScannedCount"] >= resp["Count"]
@test("Query — with ProjectionExpression")
def _():
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_test"}},
ProjectionExpression="pk, sk",
Limit=1,
)
item = resp["Items"][0]
assert "pk" in item
assert "sk" in item
assert "val" not in item
# ============================================================================
# 7. Scan
# ============================================================================
@test("Scan — returns all items")
def _():
resp = client.scan(TableName="T_HashRange")
assert resp["Count"] >= 5
@test("Scan — with Limit and pagination")
def _():
resp = client.scan(TableName="T_HashRange", Limit=2)
assert_eq(resp["Count"], 2)
assert "LastEvaluatedKey" in resp
@test("Scan — with FilterExpression")
def _():
resp = client.scan(
TableName="T_HashRange",
FilterExpression="val >= :min",
ExpressionAttributeValues={":min": {"N": "3"}},
)
for item in resp["Items"]:
assert int(item["val"]["N"]) >= 3
# ============================================================================
# 8. BatchWriteItem / BatchGetItem
# ============================================================================
@test("BatchWriteItem — put multiple items")
def _():
cleanup_table("T_Batch")
client.create_table(
TableName="T_Batch",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
client.batch_write_item(
RequestItems={
"T_Batch": [
{"PutRequest": {"Item": {"pk": {"S": f"b{i}"}, "val": {"N": str(i)}}}}
for i in range(10)
]
}
)
resp = client.scan(TableName="T_Batch")
assert_eq(resp["Count"], 10)
@test("BatchWriteItem — delete items")
def _():
client.batch_write_item(
RequestItems={
"T_Batch": [
{"DeleteRequest": {"Key": {"pk": {"S": f"b{i}"}}}}
for i in range(5)
]
}
)
resp = client.scan(TableName="T_Batch")
assert_eq(resp["Count"], 5)
@test("BatchWriteItem — exceeds 25 limit should fail")
def _():
try:
client.batch_write_item(
RequestItems={
"T_Batch": [
{"PutRequest": {"Item": {"pk": {"S": f"overflow{i}"}}}}
for i in range(26)
]
}
)
assert False, "Should have raised"
except ClientError as e:
assert_eq(e.response["Error"]["Code"], "ValidationException")
@test("BatchGetItem — retrieve multiple items")
def _():
resp = client.batch_get_item(
RequestItems={
"T_Batch": {
"Keys": [{"pk": {"S": f"b{i}"}} for i in range(5, 10)]
}
}
)
items = resp["Responses"]["T_Batch"]
assert_eq(len(items), 5)
# ============================================================================
# 9. TransactWriteItems / TransactGetItems
# ============================================================================
@test("TransactWriteItems — put multiple items atomically")
def _():
cleanup_table("T_Txn")
client.create_table(
TableName="T_Txn",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
client.transact_write_items(
TransactItems=[
{"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx1"}, "v": {"S": "a"}}}},
{"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx2"}, "v": {"S": "b"}}}},
]
)
resp = client.scan(TableName="T_Txn")
assert_eq(resp["Count"], 2)
@test("TransactWriteItems — with ConditionCheck")
def _():
# Should succeed: tx1 exists
client.transact_write_items(
TransactItems=[
{
"ConditionCheck": {
"TableName": "T_Txn",
"Key": {"pk": {"S": "tx1"}},
"ConditionExpression": "attribute_exists(pk)",
}
},
{"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx3"}, "v": {"S": "c"}}}},
]
)
resp = client.get_item(TableName="T_Txn", Key={"pk": {"S": "tx3"}})
assert "Item" in resp
@test("TransactWriteItems — condition failure rolls back")
def _():
try:
client.transact_write_items(
TransactItems=[
{
"ConditionCheck": {
"TableName": "T_Txn",
"Key": {"pk": {"S": "nonexistent"}},
"ConditionExpression": "attribute_exists(pk)",
}
},
{"Put": {"TableName": "T_Txn", "Item": {"pk": {"S": "tx_should_not_exist"}}}},
]
)
assert False, "Should have raised"
except ClientError as e:
assert_in(e.response["Error"]["Code"],
["TransactionCanceledException", "ConditionalCheckFailedException"])
# Verify rollback
resp = client.get_item(TableName="T_Txn", Key={"pk": {"S": "tx_should_not_exist"}})
assert "Item" not in resp
@test("TransactGetItems — retrieve multiple items atomically")
def _():
resp = client.transact_get_items(
TransactItems=[
{"Get": {"TableName": "T_Txn", "Key": {"pk": {"S": "tx1"}}}},
{"Get": {"TableName": "T_Txn", "Key": {"pk": {"S": "tx2"}}}},
]
)
items = [r["Item"] for r in resp["Responses"] if "Item" in r]
assert_eq(len(items), 2)
# ============================================================================
# 10. Global Secondary Indexes (GSI)
# ============================================================================
@test("CreateTable with GSI")
def _():
cleanup_table("T_GSI")
resp = client.create_table(
TableName="T_GSI",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "email", "AttributeType": "S"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "email-index",
"KeySchema": [{"AttributeName": "email", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
BillingMode="PAY_PER_REQUEST",
)
assert_eq(resp["TableDescription"]["TableName"], "T_GSI")
@test("DescribeTable — shows GSI info")
def _():
resp = client.describe_table(TableName="T_GSI")
table = resp["Table"]
assert "GlobalSecondaryIndexes" in table
gsi = table["GlobalSecondaryIndexes"][0]
assert_eq(gsi["IndexName"], "email-index")
@test("GSI — Query by secondary key")
def _():
# Seed items
for i, email in enumerate(["alice@test.com", "bob@test.com", "alice@test.com"]):
client.put_item(
TableName="T_GSI",
Item={
"pk": {"S": f"user{i}"},
"email": {"S": email},
"name": {"S": f"User{i}"},
},
)
resp = client.query(
TableName="T_GSI",
IndexName="email-index",
KeyConditionExpression="email = :e",
ExpressionAttributeValues={":e": {"S": "alice@test.com"}},
)
assert_eq(resp["Count"], 2)
@test("GSI — Scan index")
def _():
resp = client.scan(TableName="T_GSI", IndexName="email-index")
assert resp["Count"] >= 3
@test("GSI — sparse index (item without GSI key not indexed)")
def _():
client.put_item(
TableName="T_GSI",
Item={"pk": {"S": "no_email_user"}, "name": {"S": "Ghost"}},
)
resp = client.scan(TableName="T_GSI", IndexName="email-index")
pks = {item["pk"]["S"] for item in resp["Items"]}
assert "no_email_user" not in pks
@test("GSI — updated item reflected in index")
def _():
client.update_item(
TableName="T_GSI",
Key={"pk": {"S": "user0"}},
UpdateExpression="SET email = :e",
ExpressionAttributeValues={":e": {"S": "newalice@test.com"}},
)
# Old email should no longer match user0
resp = client.query(
TableName="T_GSI",
IndexName="email-index",
KeyConditionExpression="email = :e",
ExpressionAttributeValues={":e": {"S": "alice@test.com"}},
)
pks = {item["pk"]["S"] for item in resp["Items"]}
assert "user0" not in pks
# New email should match user0
resp2 = client.query(
TableName="T_GSI",
IndexName="email-index",
KeyConditionExpression="email = :e",
ExpressionAttributeValues={":e": {"S": "newalice@test.com"}},
)
pks2 = {item["pk"]["S"] for item in resp2["Items"]}
assert_in("user0", pks2)
@test("GSI — deleted item removed from index")
def _():
client.delete_item(TableName="T_GSI", Key={"pk": {"S": "user1"}})
resp = client.query(
TableName="T_GSI",
IndexName="email-index",
KeyConditionExpression="email = :e",
ExpressionAttributeValues={":e": {"S": "bob@test.com"}},
)
assert_eq(resp["Count"], 0)
# ============================================================================
# 11. Edge Cases
# ============================================================================
@test("PutItem — empty string attribute")
def _():
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "empty_str"}, "val": {"S": ""}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "empty_str"}})
assert_eq(resp["Item"]["val"]["S"], "")
@test("PutItem — large item (~300KB)")
def _():
big_val = "x" * 300_000
client.put_item(
TableName="T_HashOnly",
Item={"pk": {"S": "big_item"}, "data": {"S": big_val}},
)
resp = client.get_item(TableName="T_HashOnly", Key={"pk": {"S": "big_item"}})
assert_eq(len(resp["Item"]["data"]["S"]), 300_000)
@test("PutItem — numeric key")
def _():
cleanup_table("T_NumKey")
client.create_table(
TableName="T_NumKey",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "N"}],
BillingMode="PAY_PER_REQUEST",
)
client.put_item(
TableName="T_NumKey",
Item={"id": {"N": "42"}, "data": {"S": "numeric key"}},
)
resp = client.get_item(TableName="T_NumKey", Key={"id": {"N": "42"}})
assert_eq(resp["Item"]["data"]["S"], "numeric key")
@test("Query — nonexistent partition returns 0 items")
def _():
resp = client.query(
TableName="T_HashRange",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "no_such_partition"}},
)
assert_eq(resp["Count"], 0)
# ============================================================================
# Helpers
# ============================================================================
def cleanup_key(table_name, key):
try:
client.delete_item(TableName=table_name, Key=key)
except ClientError:
pass
# ============================================================================
# Runner
# ============================================================================
def main():
# Wait for JormunDB to be reachable
print(f"\n{'='*60}")
print(f" JormunDB Python SDK Test Suite")
print(f" Endpoint: {ENDPOINT}")
print(f"{'='*60}\n")
print("Waiting for JormunDB...", end="", flush=True)
for attempt in range(30):
try:
client.list_tables()
print(" connected!\n")
break
except Exception:
print(".", end="", flush=True)
time.sleep(1)
else:
print("\n\nERROR: Could not connect to JormunDB at", ENDPOINT)
sys.exit(1)
# Collect all test functions from this module
tests = [
obj for obj in globals().values()
if callable(obj) and hasattr(obj, "_test_name")
]
# Run tests in definition order
for t in tests:
sys.stdout.write(f" {t._test_name} ... ")
sys.stdout.flush()
t()
status, name, err = results[-1]
if status == "PASS":
print("\033[32mPASS\033[0m")
else:
print(f"\033[31mFAIL\033[0m {err}")
# Summary
passed = sum(1 for s, _, _ in results if s == "PASS")
failed = sum(1 for s, _, _ in results if s == "FAIL")
total = len(results)
print(f"\n{'='*60}")
if failed == 0:
print(f" \033[32mAll {total} tests passed!\033[0m")
else:
print(f" \033[32m{passed} passed\033[0m, \033[31m{failed} failed\033[0m out of {total}")
print()
for status, name, err in results:
if status == "FAIL":
print(f" \033[31m✗ {name}\033[0m")
print(f" {err}")
print(f"{'='*60}\n")
# Cleanup test tables
print("Cleaning up test tables...")
for tbl in ["T_HashOnly", "T_HashRange", "T_Batch", "T_Txn", "T_GSI", "T_NumKey", "T_DeleteMe"]:
cleanup_table(tbl)
print("Done.\n")
sys.exit(1 if failed else 0)
if __name__ == "__main__":
main()