# model-tool-tests/run_suite.py
#!/usr/bin/env python3
"""
Universal model tool-call test suite.
Tests any OpenAI-compatible endpoint for:
1. Basic chat (non-streaming + streaming)
2. Tool calls (non-streaming + streaming)
3. Multi-turn tool response flow (non-streaming + streaming)
4. Nested/bad tool schema handling (SGLang compatibility)
5. Streaming tool call chunking (are args actually streamed?)
6. Param sweep (what vLLM params does the endpoint accept?)
Handles reasoning models (content in 'reasoning' field, null 'content'),
different finish_reason values, and empty/tool_calls arrays gracefully.
Usage:
TOOLTEST_API_BASE=... TOOLTEST_API_KEY=... TOOLTEST_MODEL=... python3 run_suite.py
python3 run_suite.py --all
python3 run_suite.py --model 1
python3 run_suite.py --filter Devstral
"""
import os
import sys
import json
import time
import httpx
import argparse
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field
# ── Helpers ──────────────────────────────────────────────────
def ts() -> str:
    """Return a wall-clock timestamp with millisecond precision, e.g. '14:03:07.123'."""
    now = datetime.now()
    return now.strftime("%H:%M:%S.%f")[:-3]
def safe_choice(body: dict, index: int = 0) -> dict:
    """Return choice *index* from a response body, or {} when missing/out of range."""
    available = body.get("choices") or []
    return available[index] if index < len(available) else {}
def safe_message(body: dict) -> dict:
    """Return the first choice's message dict, or {} if absent or null."""
    choices = body.get("choices") or []
    first = choices[0] if choices else {}
    return first.get("message") or {}
def safe_delta(chunk: dict) -> dict:
    """Return the first choice's delta from a streaming chunk, or {} when absent."""
    choice_list = chunk.get("choices") or []
    if not choice_list:
        return {}
    return choice_list[0].get("delta") or {}
def extract_content(msg: dict) -> tuple[str, str]:
    """Return (content, reasoning) from a message; null/missing fields become ''."""
    return (msg.get("content") or "", msg.get("reasoning") or "")
# ── Config ───────────────────────────────────────────────────
@dataclass
class ModelConfig:
    """Connection settings for one OpenAI-compatible model endpoint."""
    api_base: str
    api_key: str
    model: str

    @property
    def label(self) -> str:
        """Short display name: the final path component of the model id."""
        return self.model.rsplit("/", 1)[-1]
def load_models_env(path: Path) -> list[ModelConfig]:
    """Parse *path* as pipe-delimited lines: `api_base | api_key | model`.

    Blank lines and `#` comments are skipped; lines with fewer than three
    fields are ignored; extra fields beyond the third are discarded.
    """
    entries: list[ModelConfig] = []
    for raw in path.read_text().splitlines():
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue
        fields = [f.strip() for f in stripped.split("|")]
        if len(fields) >= 3:
            entries.append(ModelConfig(api_base=fields[0], api_key=fields[1], model=fields[2]))
    return entries
def config_from_env() -> ModelConfig | None:
    """Build a ModelConfig from TOOLTEST_* env vars; None if any is unset/empty."""
    base = os.environ.get("TOOLTEST_API_BASE")
    key = os.environ.get("TOOLTEST_API_KEY")
    model = os.environ.get("TOOLTEST_MODEL")
    if not (base and key and model):
        return None
    return ModelConfig(api_base=base, api_key=key, model=model)
# ── Test result types ────────────────────────────────────────
@dataclass
class TestResult:
    """Outcome of a single test case."""
    name: str  # short test identifier, e.g. "basic stream" or "param: seed"
    passed: bool  # True when the endpoint behaved as expected
    detail: str = ""  # human-readable explanation / response excerpt
    duration_s: float = 0.0  # wall-clock duration of the test in seconds
@dataclass
class SuiteResult:
    """Aggregated test results for one model run."""
    model: str
    results: list[TestResult] = field(default_factory=list)

    @property
    def passed(self) -> int:
        """Count of passing results (bool values sum to an int)."""
        return sum(r.passed for r in self.results)

    @property
    def total(self) -> int:
        """Total number of recorded results."""
        return len(self.results)
def make_client(cfg: ModelConfig) -> httpx.Client:
    """Create an httpx client with bearer auth, JSON content type and a 120s timeout."""
    auth_headers = {
        "Authorization": f"Bearer {cfg.api_key}",
        "Content-Type": "application/json",
    }
    return httpx.Client(timeout=120.0, headers=auth_headers)
# ── Shared tool definitions ──────────────────────────────────
# Minimal, valid single-parameter tool — used by the happy-path tool-call tests.
WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City, e.g. 'Tokyo'"}
            },
            "required": ["location"]
        }
    }
}
# Tool whose 'content' argument can be long — used to check whether a
# streaming endpoint actually chunks tool-call arguments or buffers them.
WRITE_FILE_TOOL = {
    "type": "function",
    "function": {
        "name": "write_file",
        "description": "Write content to a file.",
        "parameters": {
            "type": "object",
            "properties": {
                "filename": {"type": "string", "description": "Name of the file"},
                "content": {"type": "string", "description": "The content to write"}
            },
            "required": ["filename", "content"]
        }
    }
}
# Deliberately malformed: JSON Schema requires 'properties' to be an object.
# Probes endpoint-side schema validation / fix-up (SGLang compatibility).
BAD_SCHEMA_TOOL = {
    "type": "function",
    "function": {
        "name": "web_search",
        "description": "Search the web",
        "parameters": {
            "type": "object",
            "properties": []  # Invalid — should be {}
        }
    }
}
# Same malformation, but nested inside an array item's schema.
NESTED_BAD_SCHEMA_TOOL = {
    "type": "function",
    "function": {
        "name": "message",
        "description": "Send a message",
        "parameters": {
            "type": "object",
            "properties": {
                "fields": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": []  # Invalid — should be {}
                    }
                }
            }
        }
    }
}
# ── Test functions ───────────────────────────────────────────
def test_basic_nonstream(cfg: ModelConfig) -> TestResult:
    """1. Basic non-streaming chat.

    Sends a one-line prompt and passes when the reply has content, or
    reasoning-only output (reasoning models may leave 'content' null).
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Say hello in one word."}],
                "stream": False,
                "max_tokens": 64,
            })
            dur = time.time() - start
            # Fix: parse JSON defensively. The old code called r.json() before
            # the status check, so a non-JSON error body (proxy/gateway HTML)
            # raised and was reported as "Exception" instead of the HTTP status.
            try:
                body = r.json()
            except Exception:
                body = {}
            if r.status_code != 200:
                return TestResult("basic non-stream", False,
                                  f"HTTP {r.status_code}: {json.dumps(body)[:200]}", dur)
            content, reasoning = extract_content(safe_message(body))
            fr = safe_choice(body).get("finish_reason", "?")
            if content:
                return TestResult("basic non-stream", True, f"Got: {content[:80]}", dur)
            elif reasoning:
                return TestResult("basic non-stream", True,
                                  f"Reasoning-only (finish: {fr}): {reasoning[:80]}", dur)
            else:
                return TestResult("basic non-stream", False, f"Empty response (finish: {fr})", dur)
        except Exception as e:
            return TestResult("basic non-stream", False, f"Exception: {e}", time.time() - start)
def test_basic_stream(cfg: ModelConfig) -> TestResult:
    """2. Basic streaming chat.

    Streams a short prompt over SSE and accumulates 'content' and 'reasoning'
    deltas.  Passes when either accumulator is non-empty at the end.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Count from 1 to 5."}],
                "stream": True,
                "max_tokens": 64,
            }) as r:
                if r.status_code != 200:
                    # Drain the error body for the failure detail.
                    body = "".join(r.iter_lines())
                    dur = time.time() - start
                    return TestResult("basic stream", False, f"HTTP {r.status_code}: {body[:200]}", dur)
                full_content = ""
                full_reasoning = ""
                for line in r.iter_lines():
                    # Skip keep-alives and the SSE terminator sentinel.
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            if delta.get("content"):
                                full_content += delta["content"]
                            if delta.get("reasoning"):
                                full_reasoning += delta["reasoning"]
                        except json.JSONDecodeError:
                            # Tolerate malformed/partial SSE lines.
                            pass
            dur = time.time() - start
            if full_content:
                return TestResult("basic stream", True, f"Got: {full_content[:80]}", dur)
            elif full_reasoning:
                return TestResult("basic stream", True, f"Reasoning-only: {full_reasoning[:80]}", dur)
            else:
                return TestResult("basic stream", False, "No content or reasoning received", dur)
        except Exception as e:
            return TestResult("basic stream", False, f"Exception: {e}", time.time() - start)
def test_toolcall_nonstream(cfg: ModelConfig) -> TestResult:
    """3. Tool call — non-streaming.

    Asks a weather question with the weather tool attached; passes when the
    reply contains at least one tool_call entry.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "What's the weather in Tokyo?"}],
                "tools": [WEATHER_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 256,
            })
            dur = time.time() - start
            # Fix: guard r.json() — non-JSON error bodies previously raised before
            # the status check and surfaced as "Exception" instead of the HTTP code.
            try:
                body = r.json()
            except Exception:
                body = {}
            if r.status_code != 200:
                return TestResult("tool call non-stream", False,
                                  f"HTTP {r.status_code}: {json.dumps(body)[:200]}", dur)
            msg = safe_message(body)
            tool_calls = msg.get("tool_calls") or []
            if tool_calls:
                tc = tool_calls[0]
                fn = tc.get("function", {})
                return TestResult("tool call non-stream", True,
                                  f"Tool: {fn.get('name','?')}, args: {fn.get('arguments','')[:60]}", dur)
            else:
                # No tool call: report whatever text the model produced instead.
                content, reasoning = extract_content(msg)
                out = content or reasoning or "(empty)"
                return TestResult("tool call non-stream", False, f"No tool call. Response: {out[:100]}", dur)
        except Exception as e:
            return TestResult("tool call non-stream", False, f"Exception: {e}", time.time() - start)
def test_toolcall_stream(cfg: ModelConfig) -> TestResult:
    """4. Tool call — streaming.

    Streams a weather question with the weather tool attached, reassembling the
    tool name and argument fragments from SSE deltas.  Passes when a tool name
    was observed anywhere in the stream.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "What's the weather in Tokyo?"}],
                "tools": [WEATHER_TOOL],
                "tool_choice": "auto",
                "stream": True,
                "max_tokens": 256,
            }) as r:
                if r.status_code != 200:
                    body = "".join(r.iter_lines())
                    dur = time.time() - start
                    # Fix: the drained error body was previously discarded; include
                    # it in the failure detail so HTTP errors are diagnosable.
                    return TestResult("tool call stream", False,
                                      f"HTTP {r.status_code}: {body[:200]}", dur)
                tool_name = None
                accumulated_args = ""
                content_parts = ""
                reasoning_parts = ""
                for line in r.iter_lines():
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            tc_list = delta.get("tool_calls") or []
                            for tc in tc_list:
                                fn = tc.get("function") or {}
                                if fn.get("name"):
                                    tool_name = fn["name"]
                                if fn.get("arguments"):
                                    accumulated_args += fn["arguments"]
                            if delta.get("content"):
                                content_parts += delta["content"]
                            if delta.get("reasoning"):
                                reasoning_parts += delta["reasoning"]
                        except json.JSONDecodeError:
                            # Tolerate malformed/partial SSE lines.
                            pass
            dur = time.time() - start
            if tool_name:
                return TestResult("tool call stream", True,
                                  f"Tool: {tool_name}, args: {accumulated_args[:60]}", dur)
            else:
                out = content_parts or reasoning_parts or "(empty)"
                return TestResult("tool call stream", False, f"No tool call. Response: {out[:100]}", dur)
        except Exception as e:
            return TestResult("tool call stream", False, f"Exception: {e}", time.time() - start)
def test_tool_response_flow(cfg: ModelConfig, streaming: bool = False) -> TestResult:
    """5/6. Full tool call → response → follow-up flow.

    Three steps: (1) elicit a weather tool call (non-streaming, or streamed
    and reassembled from deltas when *streaming*); (2) append the assistant
    tool-call message plus a fabricated tool result; (3) re-ask non-streaming
    and check the model actually used the tool data — "22" from the fake
    temperature must appear and no canned "can't access" phrasing.
    """
    label = "tool response flow (stream)" if streaming else "tool response flow"
    with make_client(cfg) as c:
        start = time.time()
        try:
            messages = [{"role": "user", "content": "What's the weather in Tokyo?"}]
            # Step 1: Get tool call
            if not streaming:
                r = c.post(f"{cfg.api_base}/chat/completions", json={
                    "model": cfg.model,
                    "messages": messages,
                    "tools": [WEATHER_TOOL],
                    "tool_choice": "auto",
                    "stream": False,
                    "max_tokens": 256,
                })
                body = r.json()
                if r.status_code != 200:
                    return TestResult(label, False, f"Step 1 HTTP {r.status_code}", time.time() - start)
                msg = safe_message(body)
            else:
                # Streaming path: accumulate id/name/arguments across deltas.
                tool_name = None
                tool_id = None
                accumulated_args = ""
                with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                    "model": cfg.model,
                    "messages": messages,
                    "tools": [WEATHER_TOOL],
                    "tool_choice": "auto",
                    "stream": True,
                    "max_tokens": 256,
                }) as r:
                    if r.status_code != 200:
                        return TestResult(label, False, f"Step 1 HTTP {r.status_code}", time.time() - start)
                    for line in r.iter_lines():
                        if not line or line == "data: [DONE]":
                            continue
                        if line.startswith("data: "):
                            try:
                                chunk = json.loads(line[6:])
                                delta = safe_delta(chunk)
                                for tc in (delta.get("tool_calls") or []):
                                    if tc.get("id"):
                                        tool_id = tc["id"]
                                    fn = tc.get("function") or {}
                                    if fn.get("name"):
                                        tool_name = fn["name"]
                                    if fn.get("arguments"):
                                        accumulated_args += fn["arguments"]
                            except json.JSONDecodeError:
                                # Tolerate malformed/partial SSE lines.
                                pass
                if not tool_name:
                    return TestResult(label, False, "No tool call in step 1", time.time() - start)
                # Synthesize the assistant message the non-streaming path returns.
                msg = {
                    "role": "assistant",
                    "tool_calls": [{
                        "id": tool_id or "call_0",
                        "type": "function",
                        "function": {"name": tool_name, "arguments": accumulated_args}
                    }]
                }
            tool_calls = msg.get("tool_calls") or []
            if not tool_calls:
                return TestResult(label, False, "No tool call in step 1", time.time() - start)
            tc = tool_calls[0]
            tc_id = tc.get("id", "call_0")
            # Step 2: Send tool response
            messages.append(msg)
            messages.append({
                "role": "tool",
                "tool_call_id": tc_id,
                "content": json.dumps({"location": "Tokyo", "temperature": "22°C", "condition": "Partly cloudy"}),
            })
            # Step 3: Get follow-up
            r2 = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": messages,
                "tools": [WEATHER_TOOL],
                "stream": False,
                "max_tokens": 256,
            })
            body2 = r2.json()
            dur = time.time() - start
            if r2.status_code != 200:
                return TestResult(label, False, f"Step 3 HTTP {r2.status_code}", dur)
            final_msg = safe_message(body2)
            final_content, final_reasoning = extract_content(final_msg)
            final = final_content or final_reasoning or ""
            # Check the model actually used the tool data
            ok = "22" in final
            indicators = ["i don't have", "i cannot access", "don't have access", "cannot provide real-time"]
            for ind in indicators:
                if ind in final.lower():
                    ok = False
                    break
            if not final_content and final_reasoning:
                return TestResult(label, ok, f"Reasoning-only (used data: {'yes' if ok else 'no'}) — {final[:100]}", dur)
            return TestResult(label, ok, f"{'Used' if ok else 'Did NOT use'} tool result — {final[:100]}", dur)
        except Exception as e:
            return TestResult(label, False, f"Exception: {e}", time.time() - start)
def test_bad_tool_schema(cfg: ModelConfig) -> TestResult:
    """7. OpenClaw-style tool with properties=[] (tests schema validation/middleware).

    Passes when the endpoint accepts (or silently fixes) the malformed schema
    instead of returning an HTTP error.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Search for cats"}],
                "tools": [BAD_SCHEMA_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 128,
            })
            dur = time.time() - start
            # Fix: guard r.json() — this test expects 4xx responses, and non-JSON
            # error bodies used to raise here, masking the status as "Exception".
            try:
                body = r.json()
            except Exception:
                body = {}
            if r.status_code != 200:
                err = ""
                try:
                    err = body.get("error", {}).get("message", "")[:150]
                except Exception:
                    err = json.dumps(body)[:150]
                if not err:
                    # Fall back to the raw body text for non-JSON errors.
                    err = r.text[:150]
                return TestResult("bad tool schema (properties=[])", False, f"HTTP {r.status_code}: {err}", dur)
            return TestResult("bad tool schema (properties=[])", True, "Endpoint accepted/fixed bad schema", dur)
        except Exception as e:
            return TestResult("bad tool schema (properties=[])", False, f"Exception: {e}", time.time() - start)
def test_nested_bad_schema(cfg: ModelConfig) -> TestResult:
    """8. Nested properties=[] inside items (the Tool 21 bug).

    Same check as test_bad_tool_schema but with the malformation buried inside
    an array item's schema, which naive fix-up middleware often misses.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Send a message to Bob"}],
                "tools": [NESTED_BAD_SCHEMA_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 128,
            })
            dur = time.time() - start
            # Fix: guard r.json() — 4xx responses with non-JSON bodies used to raise
            # here and be reported as "Exception" instead of the HTTP status.
            try:
                body = r.json()
            except Exception:
                body = {}
            if r.status_code != 200:
                err = ""
                try:
                    err = body.get("error", {}).get("message", "")[:150]
                except Exception:
                    err = json.dumps(body)[:150]
                if not err:
                    # Fall back to raw body text for non-JSON errors.
                    err = r.text[:150]
                return TestResult("nested bad schema (items.properties=[])", False, f"HTTP {r.status_code}: {err}", dur)
            return TestResult("nested bad schema (items.properties=[])", True, "Endpoint accepted/fixed nested bad schema", dur)
        except Exception as e:
            return TestResult("nested bad schema (items.properties=[])", False, f"Exception: {e}", time.time() - start)
def test_streaming_tool_chunks(cfg: ModelConfig) -> TestResult:
    """9. Streaming tool call chunking — are args actually streamed in multiple chunks?

    Elicits a write_file tool call with a long 'content' argument and counts
    how many SSE deltas carried argument fragments.  >1 chunk = truly streamed;
    1 large chunk (>500 chars) = buffered server-side; 1 small chunk = plausibly
    too short to split, counted as a pass.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{
                    "role": "user",
                    "content": "Write a Python hello world and save it using the write_file tool."
                }],
                "tools": [WRITE_FILE_TOOL],
                "tool_choice": "auto",
                "stream": True,
                "max_tokens": 1024,
            }) as r:
                if r.status_code != 200:
                    dur = time.time() - start
                    return TestResult("streaming tool chunking", False, f"HTTP {r.status_code}", dur)
                tool_name = None
                arg_chunks = 0  # number of deltas carrying argument fragments
                accumulated_args = ""
                content_chunks = 0
                reasoning_chunks = 0
                for line in r.iter_lines():
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            for tc in (delta.get("tool_calls") or []):
                                fn = tc.get("function") or {}
                                if fn.get("name"):
                                    tool_name = fn["name"]
                                if fn.get("arguments"):
                                    arg_chunks += 1
                                    accumulated_args += fn["arguments"]
                            if delta.get("content"):
                                content_chunks += 1
                            if delta.get("reasoning"):
                                reasoning_chunks += 1
                        except json.JSONDecodeError:
                            # Tolerate malformed/partial SSE lines.
                            pass
            dur = time.time() - start
            if not tool_name:
                if content_chunks > 0 or reasoning_chunks > 0:
                    return TestResult("streaming tool chunking", False,
                                      f"No tool call — model produced {content_chunks} content + {reasoning_chunks} reasoning chunks", dur)
                return TestResult("streaming tool chunking", False, "No tool call and no content", dur)
            # Evaluate chunking quality
            if arg_chunks > 1:
                return TestResult("streaming tool chunking", True,
                                  f"Args streamed in {arg_chunks} chunks ({len(accumulated_args)} chars)", dur)
            elif arg_chunks == 1 and len(accumulated_args) > 500:
                return TestResult("streaming tool chunking", False,
                                  f"Args in 1 chunk but {len(accumulated_args)} chars — buffered, not streamed", dur)
            elif arg_chunks == 1:
                return TestResult("streaming tool chunking", True,
                                  f"Args in 1 chunk ({len(accumulated_args)} chars — may be too short to stream)", dur)
            else:
                return TestResult("streaming tool chunking", False, "Tool name only, no arg chunks", dur)
        except Exception as e:
            return TestResult("streaming tool chunking", False, f"Exception: {e}", time.time() - start)
def test_param_sweep(cfg: ModelConfig) -> list[TestResult]:
    """10. Parameter sweep — which vLLM params does the endpoint accept?

    Sends the base request plus one extra parameter at a time and records the
    HTTP status for each.  Returns one TestResult per parameter.
    """
    results = []
    base_req = {
        "model": cfg.model,
        "messages": [{"role": "user", "content": "Say hi."}],
        "stream": False,
        "max_tokens": 32,
    }
    # (name, value) pairs; a "+" in the name marks a combined entry whose value
    # dict holds SEVERAL top-level params to merge into the request.
    extra_params = [
        ("chat_template_kwargs", {"enable_thinking": False}),
        ("guided_json", None),
        ("guided_regex", None),
        ("response_format", {"type": "json_object"}),
        ("n", 1),
        ("presence_penalty", 0.0),
        ("frequency_penalty", 0.0),
        ("top_p", 1.0),
        ("temperature", 0.7),
        ("seed", 42),
        ("stop", ["\n"]),
        ("logprobs+top_logprobs", {"logprobs": True, "top_logprobs": 5}),
    ]
    with make_client(cfg) as c:
        for name, val in extra_params:
            start = time.time()
            try:
                if "+" in name and isinstance(val, dict):
                    # Combined entry: spread the dict as top-level request params.
                    req = {**base_req, **val}
                else:
                    # Fix: single params keep val as the VALUE under `name`, even
                    # when val is a dict. The old `isinstance(val, dict)` branch
                    # merged response_format/chat_template_kwargs into the request
                    # top level (sending `type`/`enable_thinking` as bogus params),
                    # so those two sweep entries never tested the real parameter.
                    req = {**base_req, name: val}
                r = c.post(f"{cfg.api_base}/chat/completions", json=req)
                dur = time.time() - start
                ok = r.status_code == 200
                detail = f"HTTP {r.status_code}"
                if not ok:
                    try:
                        detail += f": {r.json().get('error', {}).get('message', '')[:80]}"
                    except Exception:
                        # Error body was not JSON; keep the bare status.
                        pass
                results.append(TestResult(f"param: {name}", ok, detail, dur))
            except Exception as e:
                results.append(TestResult(f"param: {name}", False, f"Exception: {e}", time.time() - start))
    return results
# ── Suite runner ─────────────────────────────────────────────
def _test_tool_flow_nonstream(cfg: ModelConfig) -> TestResult:
    """5. Full tool call → response → follow-up flow."""
    return test_tool_response_flow(cfg, streaming=False)


def _test_tool_flow_stream(cfg: ModelConfig) -> TestResult:
    """6. Full tool call → response → follow-up flow (streaming)."""
    return test_tool_response_flow(cfg, streaming=True)


# Ordered suite. run_suite() derives each test's display name from the first
# docstring line, so entries must be real named functions — the previous
# lambdas had no docstring and printed as "<lambda>".
ALL_TESTS = [
    test_basic_nonstream,
    test_basic_stream,
    test_toolcall_nonstream,
    test_toolcall_stream,
    _test_tool_flow_nonstream,
    _test_tool_flow_stream,
    test_bad_tool_schema,
    test_nested_bad_schema,
    test_streaming_tool_chunks,
]
def run_suite(cfg: ModelConfig, verbose: bool = True) -> SuiteResult:
    """Run the full test suite against one model config.

    Prints per-test progress when *verbose*, appends the parameter sweep at
    the end, and returns the populated SuiteResult.
    """
    result = SuiteResult(model=cfg.model)
    print(f"\n{'='*60}")
    print(f"Testing: {cfg.model}")
    print(f"API: {cfg.api_base}")
    print(f"{'='*60}")
    for test_fn in ALL_TESTS:
        # Display name: first docstring line, falling back to the function name.
        name = (test_fn.__doc__ or "").strip().split("\n")[0] or test_fn.__name__
        if verbose:
            print(f"\n[{ts()}] Running: {name}...")
        tr = test_fn(cfg)
        # A test may return a single TestResult or a list of them.
        if isinstance(tr, list):
            result.results.extend(tr)
        else:
            result.results.append(tr)
        if verbose:
            if isinstance(tr, list):
                for r in tr:
                    # Fix: pass/fail markers were empty strings (stripped glyphs),
                    # so the output gave no visual cue which tests failed.
                    s = "PASS" if r.passed else "FAIL"
                    print(f"  {s} {r.name}: {r.detail} ({r.duration_s:.1f}s)")
            else:
                s = "PASS" if tr.passed else "FAIL"
                print(f"  {s} {tr.name}: {tr.detail} ({tr.duration_s:.1f}s)")
    # Param sweep runs last; it returns a list of TestResults.
    if verbose:
        print(f"\n[{ts()}] Running: parameter sweep...")
    sweep_results = test_param_sweep(cfg)
    result.results.extend(sweep_results)
    if verbose:
        for r in sweep_results:
            s = "PASS" if r.passed else "FAIL"
            print(f"  {s} {r.name}: {r.detail} ({r.duration_s:.1f}s)")
    return result
def print_summary(results: list[SuiteResult]):
    """Print a final summary across all models, then a cross-model comparison table."""
    print(f"\n\n{'='*60}")
    print("FINAL SUMMARY")
    print(f"{'='*60}")
    for sr in results:
        passed = sr.passed
        total = sr.total
        # Guard against division by zero when a suite produced no results.
        pct = (passed / total * 100) if total else 0
        label = sr.model.split("/")[-1]
        print(f"\n  {label}: {passed}/{total} passed ({pct:.0f}%)")
        for r in sr.results:
            if not r.passed:
                # Fix: failures were printed flush-left with no marker; indent
                # and tag them so they read as children of the model header.
                print(f"    FAIL {r.name}: {r.detail[:80]}")
    # Cross-model comparison for key tests.
    # Fix: the section separators were `''*60` and `"" * len(header)` — empty
    # strings — and the status/placeholder cells were empty too (stripped
    # glyphs), so the whole table printed blank columns with no dividers.
    print(f"\n{'-'*60}")
    print("CROSS-MODEL COMPARISON")
    print(f"{'-'*60}")
    key_tests = [
        "basic non-stream",
        "basic stream",
        "tool call non-stream",
        "tool call stream",
        "tool response flow",
        "tool response flow (stream)",
        "streaming tool chunking",
        "bad tool schema (properties=[])",
        "nested bad schema (items.properties=[])",
    ]
    # Column width: widest (truncated) model label, minimum 16.
    labels = [sr.model.split("/")[-1][:18] for sr in results]
    col_w = max(len(l) for l in labels) if labels else 16
    col_w = max(col_w, 16)
    header = f"{'Test':<40}"
    for l in labels:
        header += f" {l:>{col_w}}"
    print(header)
    print("-" * len(header))
    for test_name in key_tests:
        row = f"{test_name:<40}"
        for sr in results:
            match = [r for r in sr.results if r.name == test_name]
            if match:
                status = "PASS" if match[0].passed else "FAIL"
                row += f" {status:>{col_w}}"
            else:
                # Test not present in this suite's results.
                row += f" {'-':>{col_w}}"
        print(row)
    print(f"\n{'='*60}")
# ── CLI ──────────────────────────────────────────────────────
def main():
    """CLI entry point: select model configs, run suites, print the summary.

    Exits non-zero when selection fails or any test failed (CI-friendly).
    """
    parser = argparse.ArgumentParser(description="Universal model tool-call test suite")
    parser.add_argument("--all", action="store_true", help="Test all models from models.env")
    parser.add_argument("--model", type=int, help="Test model by 1-based index from models.env")
    parser.add_argument("--filter", type=str, help="Test models matching substring")
    parser.add_argument("--quiet", action="store_true", help="Less output per test")
    args = parser.parse_args()
    models_path = Path(__file__).parent / "models.env"

    def _require_models_file():
        # Shared guard for the three models.env-based selectors.
        if not models_path.exists():
            print("ERROR: models.env not found")
            sys.exit(1)

    configs: list[ModelConfig] = []
    if args.all:
        _require_models_file()
        configs = load_models_env(models_path)
    elif args.model is not None:
        # Fix: `elif args.model:` treated `--model 0` as "not given" (0 is falsy)
        # and silently fell through to the env-var path; an explicit 0 must reach
        # the range check below and report "out of range".
        _require_models_file()
        all_configs = load_models_env(models_path)
        if args.model < 1 or args.model > len(all_configs):
            print(f"ERROR: --model index {args.model} out of range (1-{len(all_configs)})")
            sys.exit(1)
        configs = [all_configs[args.model - 1]]
    elif args.filter:
        _require_models_file()
        all_configs = load_models_env(models_path)
        configs = [c for c in all_configs if args.filter.lower() in c.model.lower()]
        if not configs:
            print(f"No models matching '{args.filter}'")
            sys.exit(1)
    else:
        # No selector flags: fall back to TOOLTEST_* environment variables.
        cfg = config_from_env()
        if cfg:
            configs = [cfg]
        else:
            print("No model specified. Use --all, --model N, --filter NAME, or set TOOLTEST_* env vars.")
            if models_path.exists():
                print("\nAvailable models from models.env:")
                for i, c in enumerate(load_models_env(models_path), 1):
                    print(f"  {i}. {c.model} @ {c.api_base}")
            sys.exit(1)
    all_results: list[SuiteResult] = []
    for cfg in configs:
        sr = run_suite(cfg, verbose=not args.quiet)
        all_results.append(sr)
    print_summary(all_results)
    # Non-zero exit when any model had at least one failing test.
    if any(sr.passed < sr.total for sr in all_results):
        sys.exit(1)
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()