816 lines
31 KiB
Python
816 lines
31 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Universal model tool-call test suite.
|
|
|
|
Tests any OpenAI-compatible endpoint for:
|
|
1. Basic chat (non-streaming + streaming)
|
|
2. Tool calls (non-streaming + streaming)
|
|
3. Multi-turn tool response flow (non-streaming + streaming)
|
|
4. Nested/bad tool schema handling (SGLang compatibility)
|
|
5. Streaming tool call chunking (are args actually streamed?)
|
|
6. Param sweep (what vLLM params does the endpoint accept?)
|
|
|
|
Handles reasoning models (content in 'reasoning' field, null 'content'),
|
|
different finish_reason values, and empty/tool_calls arrays gracefully.
|
|
|
|
Usage:
|
|
TOOLTEST_API_BASE=... TOOLTEST_API_KEY=... TOOLTEST_MODEL=... python3 run_suite.py
|
|
python3 run_suite.py --all
|
|
python3 run_suite.py --model 1
|
|
python3 run_suite.py --filter Devstral
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import httpx
|
|
import argparse
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────
|
|
|
|
def ts():
    """Current wall-clock time formatted as HH:MM:SS.mmm (millisecond precision)."""
    now = datetime.now()
    # %f gives microseconds (6 digits); drop the last three to keep milliseconds.
    return now.strftime("%H:%M:%S.%f")[:-3]
|
|
|
|
|
|
def safe_choice(body: dict, index: int = 0) -> dict:
    """Return the choice at *index* from a response body, or {} if absent.

    Tolerates a missing or null "choices" key and an out-of-range index.
    """
    choice_list = body.get("choices") or []
    return choice_list[index] if index < len(choice_list) else {}
|
|
|
|
|
|
def safe_message(body: dict) -> dict:
    """Return the message dict of the first choice, or {} when missing/null."""
    choices = body.get("choices") or []
    first = choices[0] if choices else {}
    return first.get("message") or {}
|
|
|
|
|
|
def safe_delta(chunk: dict) -> dict:
    """Return the delta of the first choice in a streaming chunk, or {}.

    Handles chunks with no "choices" key, an empty list, or a null delta.
    """
    choices = chunk.get("choices") or []
    if not choices:
        return {}
    return choices[0].get("delta") or {}
|
|
|
|
|
|
def extract_content(msg: dict) -> tuple[str, str]:
    """Return (content, reasoning) from *msg*, mapping None/missing to "".

    Reasoning models may put text in a 'reasoning' field with null 'content';
    both fields are normalized to strings so callers can test truthiness.
    """
    return (msg.get("content") or "", msg.get("reasoning") or "")
|
|
|
|
|
|
# ── Config ───────────────────────────────────────────────────
|
|
|
|
@dataclass
class ModelConfig:
    """Connection details for one OpenAI-compatible endpoint."""

    api_base: str  # API root; callers build URLs as f"{api_base}/chat/completions"
    api_key: str   # sent as a Bearer token
    model: str     # full model id, possibly namespaced with '/'

    @property
    def label(self):
        """Short display name: the final '/'-separated segment of the model id."""
        return self.model.rsplit("/", 1)[-1]
|
|
|
|
|
|
def load_models_env(path: Path) -> list[ModelConfig]:
    """Parse models.env: one "api_base | api_key | model" entry per line.

    Blank lines and '#' comment lines are skipped; fields beyond the third
    are ignored and lines with fewer than three fields are dropped.
    """
    configs: list[ModelConfig] = []
    for raw in path.read_text().splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#"):
            continue
        cells = [cell.strip() for cell in entry.split("|")]
        if len(cells) < 3:
            continue
        configs.append(ModelConfig(api_base=cells[0], api_key=cells[1], model=cells[2]))
    return configs
|
|
|
|
|
|
def config_from_env() -> ModelConfig | None:
    """Build a ModelConfig from TOOLTEST_* env vars; None if any is unset/empty."""
    values = [os.environ.get(f"TOOLTEST_{suffix}")
              for suffix in ("API_BASE", "API_KEY", "MODEL")]
    if not all(values):
        return None
    base, key, model = values
    return ModelConfig(api_base=base, api_key=key, model=model)
|
|
|
|
|
|
# ── Test result types ────────────────────────────────────────
|
|
|
|
@dataclass
class TestResult:
    """Outcome of a single test case."""

    name: str            # short test identifier, e.g. "basic non-stream"
    passed: bool
    detail: str = ""     # human-readable explanation shown in summaries
    duration_s: float = 0.0  # wall-clock duration of the test
|
|
|
|
|
|
@dataclass
class SuiteResult:
    """Aggregated results for one model's full suite run."""

    model: str
    results: list[TestResult] = field(default_factory=list)

    @property
    def passed(self):
        """Number of tests that passed (bool values sum as 0/1)."""
        return sum(r.passed for r in self.results)

    @property
    def total(self):
        """Total number of tests recorded."""
        return len(self.results)
|
|
|
|
|
|
def make_client(cfg: ModelConfig) -> httpx.Client:
    """Build an httpx client preconfigured with bearer auth and JSON headers.

    The generous 120 s timeout accommodates slow model endpoints.
    """
    default_headers = {
        "Authorization": f"Bearer {cfg.api_key}",
        "Content-Type": "application/json",
    }
    return httpx.Client(timeout=120.0, headers=default_headers)
|
|
|
|
|
|
# ── Shared tool definitions ──────────────────────────────────
|
|
|
|
# Minimal well-formed tool: one required string parameter.  Used by the
# basic tool-call and tool-response-flow tests.
WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City, e.g. 'Tokyo'"}
            },
            "required": ["location"]
        }
    }
}
|
|
|
|
# Tool with two required string parameters.  Its potentially long "content"
# argument is what test_streaming_tool_chunks uses to check whether tool-call
# arguments are actually streamed across multiple chunks.
WRITE_FILE_TOOL = {
    "type": "function",
    "function": {
        "name": "write_file",
        "description": "Write content to a file.",
        "parameters": {
            "type": "object",
            "properties": {
                "filename": {"type": "string", "description": "Name of the file"},
                "content": {"type": "string", "description": "The content to write"}
            },
            "required": ["filename", "content"]
        }
    }
}
|
|
|
|
# Deliberately malformed JSON Schema: "properties" must be an object, but is
# a list here.  Tests whether the endpoint rejects, accepts, or repairs it.
BAD_SCHEMA_TOOL = {
    "type": "function",
    "function": {
        "name": "web_search",
        "description": "Search the web",
        "parameters": {
            "type": "object",
            "properties": []  # Invalid — should be {}
        }
    }
}
|
|
|
|
# Same properties=[] defect as BAD_SCHEMA_TOOL, but buried one level deeper
# inside an array's "items" schema — catches validators/middleware that only
# sanitize the top level.
NESTED_BAD_SCHEMA_TOOL = {
    "type": "function",
    "function": {
        "name": "message",
        "description": "Send a message",
        "parameters": {
            "type": "object",
            "properties": {
                "fields": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": []  # Invalid — should be {}
                    }
                }
            }
        }
    }
}
|
|
|
|
|
|
# ── Test functions ───────────────────────────────────────────
|
|
|
|
def test_basic_nonstream(cfg: ModelConfig) -> TestResult:
    """1. Basic non-streaming chat.

    Passes when the model returns content, or reasoning-only output
    (reasoning models may leave 'content' null).
    """
    name = "basic non-stream"
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Say hello in one word."}],
                "stream": False,
                "max_tokens": 64,
            })
            dur = time.time() - start
            # Parse defensively: gateways can return non-JSON error pages.
            # Previously r.json() ran before the status check, so such a
            # response surfaced as a raw "Exception:" with the HTTP status lost.
            try:
                body = r.json()
            except Exception:
                body = {}
            if r.status_code != 200:
                detail = json.dumps(body)[:200] if body else r.text[:200]
                return TestResult(name, False, f"HTTP {r.status_code}: {detail}", dur)
            content, reasoning = extract_content(safe_message(body))
            fr = safe_choice(body).get("finish_reason", "?")
            if content:
                return TestResult(name, True, f"Got: {content[:80]}", dur)
            elif reasoning:
                return TestResult(name, True, f"Reasoning-only (finish: {fr}): {reasoning[:80]}", dur)
            else:
                return TestResult(name, False, f"Empty response (finish: {fr})", dur)
        except Exception as e:
            return TestResult(name, False, f"Exception: {e}", time.time() - start)
|
|
|
|
|
|
def test_basic_stream(cfg: ModelConfig) -> TestResult:
    """2. Basic streaming chat.

    Accumulates SSE deltas and passes when either content or reasoning
    text was streamed.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Count from 1 to 5."}],
                "stream": True,
                "max_tokens": 64,
            }) as r:
                if r.status_code != 200:
                    # Drain the (error) body so it can be shown in the detail.
                    body = "".join(r.iter_lines())
                    dur = time.time() - start
                    return TestResult("basic stream", False, f"HTTP {r.status_code}: {body[:200]}", dur)
                full_content = ""
                full_reasoning = ""
                for line in r.iter_lines():
                    # Skip SSE keep-alive blanks and the terminal sentinel.
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])  # strip "data: " prefix
                            delta = safe_delta(chunk)
                            if delta.get("content"):
                                full_content += delta["content"]
                            if delta.get("reasoning"):
                                full_reasoning += delta["reasoning"]
                        except json.JSONDecodeError:
                            # Tolerate malformed payload lines rather than failing the test.
                            pass
                dur = time.time() - start
                if full_content:
                    return TestResult("basic stream", True, f"Got: {full_content[:80]}", dur)
                elif full_reasoning:
                    return TestResult("basic stream", True, f"Reasoning-only: {full_reasoning[:80]}", dur)
                else:
                    return TestResult("basic stream", False, "No content or reasoning received", dur)
        except Exception as e:
            return TestResult("basic stream", False, f"Exception: {e}", time.time() - start)
|
|
|
|
|
|
def test_toolcall_nonstream(cfg: ModelConfig) -> TestResult:
    """3. Tool call — non-streaming.

    Passes when the model emits at least one tool call; otherwise reports
    whatever content/reasoning it produced instead.
    """
    with make_client(cfg) as client:
        started = time.time()
        try:
            resp = client.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "What's the weather in Tokyo?"}],
                "tools": [WEATHER_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 256,
            })
            body = resp.json()
            elapsed = time.time() - started
            if resp.status_code != 200:
                return TestResult("tool call non-stream", False,
                                  f"HTTP {resp.status_code}: {json.dumps(body)[:200]}", elapsed)
            msg = safe_message(body)
            calls = msg.get("tool_calls") or []
            if not calls:
                content, reasoning = extract_content(msg)
                fallback = content or reasoning or "(empty)"
                return TestResult("tool call non-stream", False,
                                  f"No tool call. Response: {fallback[:100]}", elapsed)
            fn = calls[0].get("function", {})
            return TestResult("tool call non-stream", True,
                              f"Tool: {fn.get('name','?')}, args: {fn.get('arguments','')[:60]}", elapsed)
        except Exception as e:
            return TestResult("tool call non-stream", False, f"Exception: {e}", time.time() - started)
|
|
|
|
|
|
def test_toolcall_stream(cfg: ModelConfig) -> TestResult:
    """4. Tool call — streaming.

    Accumulates tool name/arguments across SSE deltas; falls back to
    reporting streamed content/reasoning when no tool call appears.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "What's the weather in Tokyo?"}],
                "tools": [WEATHER_TOOL],
                "tool_choice": "auto",
                "stream": True,
                "max_tokens": 256,
            }) as r:
                if r.status_code != 200:
                    # Include the drained error body in the detail — it was
                    # previously collected but dropped, unlike test_basic_stream.
                    body = "".join(r.iter_lines())
                    dur = time.time() - start
                    return TestResult("tool call stream", False,
                                      f"HTTP {r.status_code}: {body[:200]}", dur)
                tool_name = None
                accumulated_args = ""
                content_parts = ""
                reasoning_parts = ""
                for line in r.iter_lines():
                    # Skip SSE keep-alives and the terminal sentinel.
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            tc_list = delta.get("tool_calls") or []
                            for tc in tc_list:
                                fn = tc.get("function") or {}
                                if fn.get("name"):
                                    tool_name = fn["name"]
                                if fn.get("arguments"):
                                    accumulated_args += fn["arguments"]
                            if delta.get("content"):
                                content_parts += delta["content"]
                            if delta.get("reasoning"):
                                reasoning_parts += delta["reasoning"]
                        except json.JSONDecodeError:
                            # Tolerate malformed payload lines.
                            pass
                dur = time.time() - start
                if tool_name:
                    return TestResult("tool call stream", True,
                                      f"Tool: {tool_name}, args: {accumulated_args[:60]}", dur)
                else:
                    out = content_parts or reasoning_parts or "(empty)"
                    return TestResult("tool call stream", False, f"No tool call. Response: {out[:100]}", dur)
        except Exception as e:
            return TestResult("tool call stream", False, f"Exception: {e}", time.time() - start)
|
|
|
|
|
|
def test_tool_response_flow(cfg: ModelConfig, streaming: bool = False) -> TestResult:
    """5/6. Full tool call → response → follow-up flow.

    Step 1 elicits a tool call (non-streaming or streaming per *streaming*),
    step 2 appends a synthetic tool result, step 3 asks for a follow-up and
    checks whether the model actually used the returned data.
    """
    label = "tool response flow (stream)" if streaming else "tool response flow"
    with make_client(cfg) as c:
        start = time.time()
        try:
            messages = [{"role": "user", "content": "What's the weather in Tokyo?"}]

            # Step 1: Get tool call
            if not streaming:
                r = c.post(f"{cfg.api_base}/chat/completions", json={
                    "model": cfg.model,
                    "messages": messages,
                    "tools": [WEATHER_TOOL],
                    "tool_choice": "auto",
                    "stream": False,
                    "max_tokens": 256,
                })
                body = r.json()
                if r.status_code != 200:
                    return TestResult(label, False, f"Step 1 HTTP {r.status_code}", time.time() - start)
                msg = safe_message(body)
            else:
                # Streaming: the assistant message must be reconstructed from
                # the SSE deltas before it can be appended to the history.
                tool_name = None
                tool_id = None
                accumulated_args = ""
                with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                    "model": cfg.model,
                    "messages": messages,
                    "tools": [WEATHER_TOOL],
                    "tool_choice": "auto",
                    "stream": True,
                    "max_tokens": 256,
                }) as r:
                    if r.status_code != 200:
                        return TestResult(label, False, f"Step 1 HTTP {r.status_code}", time.time() - start)
                    for line in r.iter_lines():
                        if not line or line == "data: [DONE]":
                            continue
                        if line.startswith("data: "):
                            try:
                                chunk = json.loads(line[6:])
                                delta = safe_delta(chunk)
                                for tc in (delta.get("tool_calls") or []):
                                    if tc.get("id"):
                                        tool_id = tc["id"]
                                    fn = tc.get("function") or {}
                                    if fn.get("name"):
                                        tool_name = fn["name"]
                                    if fn.get("arguments"):
                                        accumulated_args += fn["arguments"]
                            except json.JSONDecodeError:
                                pass
                if not tool_name:
                    return TestResult(label, False, "No tool call in step 1", time.time() - start)
                msg = {
                    "role": "assistant",
                    "tool_calls": [{
                        "id": tool_id or "call_0",  # some servers omit ids in deltas
                        "type": "function",
                        "function": {"name": tool_name, "arguments": accumulated_args}
                    }]
                }

            tool_calls = msg.get("tool_calls") or []
            if not tool_calls:
                return TestResult(label, False, "No tool call in step 1", time.time() - start)

            tc = tool_calls[0]
            tc_id = tc.get("id", "call_0")

            # Step 2: Send tool response
            messages.append(msg)
            messages.append({
                "role": "tool",
                "tool_call_id": tc_id,
                "content": json.dumps({"location": "Tokyo", "temperature": "22°C", "condition": "Partly cloudy"}),
            })

            # Step 3: Get follow-up
            r2 = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": messages,
                "tools": [WEATHER_TOOL],
                "stream": False,
                "max_tokens": 256,
            })
            body2 = r2.json()
            dur = time.time() - start
            if r2.status_code != 200:
                return TestResult(label, False, f"Step 3 HTTP {r2.status_code}", dur)

            final_msg = safe_message(body2)
            final_content, final_reasoning = extract_content(final_msg)
            final = final_content or final_reasoning or ""

            # Check the model actually used the tool data: "22" comes from the
            # synthetic temperature; the indicator phrases mean it ignored it.
            ok = "22" in final
            indicators = ["i don't have", "i cannot access", "don't have access", "cannot provide real-time"]
            for ind in indicators:
                if ind in final.lower():
                    ok = False
                    break
            if not final_content and final_reasoning:
                return TestResult(label, ok, f"Reasoning-only (used data: {'yes' if ok else 'no'}) — {final[:100]}", dur)
            return TestResult(label, ok, f"{'Used' if ok else 'Did NOT use'} tool result — {final[:100]}", dur)
        except Exception as e:
            return TestResult(label, False, f"Exception: {e}", time.time() - start)
|
|
|
|
|
|
def test_bad_tool_schema(cfg: ModelConfig) -> TestResult:
    """7. OpenClaw-style tool with properties=[] (tests schema validation/middleware)."""
    test_name = "bad tool schema (properties=[])"
    with make_client(cfg) as client:
        started = time.time()
        try:
            resp = client.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Search for cats"}],
                "tools": [BAD_SCHEMA_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 128,
            })
            body = resp.json()
            elapsed = time.time() - started
            if resp.status_code == 200:
                # Any 2xx means the endpoint tolerated (or repaired) the schema.
                return TestResult(test_name, True, "Endpoint accepted/fixed bad schema", elapsed)
            err = ""
            try:
                err = body.get("error", {}).get("message", "")[:150]
            except Exception:
                err = json.dumps(body)[:150]
            return TestResult(test_name, False, f"HTTP {resp.status_code}: {err}", elapsed)
        except Exception as e:
            return TestResult(test_name, False, f"Exception: {e}", time.time() - started)
|
|
|
|
|
|
def test_nested_bad_schema(cfg: ModelConfig) -> TestResult:
    """8. Nested properties=[] inside items (the Tool 21 bug)."""
    test_name = "nested bad schema (items.properties=[])"
    with make_client(cfg) as client:
        started = time.time()
        try:
            resp = client.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Send a message to Bob"}],
                "tools": [NESTED_BAD_SCHEMA_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 128,
            })
            body = resp.json()
            elapsed = time.time() - started
            if resp.status_code == 200:
                # Any 2xx means the endpoint tolerated (or repaired) the schema.
                return TestResult(test_name, True, "Endpoint accepted/fixed nested bad schema", elapsed)
            err = ""
            try:
                err = body.get("error", {}).get("message", "")[:150]
            except Exception:
                err = json.dumps(body)[:150]
            return TestResult(test_name, False, f"HTTP {resp.status_code}: {err}", elapsed)
        except Exception as e:
            return TestResult(test_name, False, f"Exception: {e}", time.time() - started)
|
|
|
|
|
|
def test_streaming_tool_chunks(cfg: ModelConfig) -> TestResult:
    """9. Streaming tool call chunking — are args actually streamed in multiple chunks?

    Requests a write_file call (long "content" argument) and counts how many
    SSE deltas carried argument fragments.  >1 chunk = genuinely streamed;
    1 large chunk = buffered server-side.
    """
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{
                    "role": "user",
                    "content": "Write a Python hello world and save it using the write_file tool."
                }],
                "tools": [WRITE_FILE_TOOL],
                "tool_choice": "auto",
                "stream": True,
                "max_tokens": 1024,
            }) as r:
                if r.status_code != 200:
                    dur = time.time() - start
                    return TestResult("streaming tool chunking", False, f"HTTP {r.status_code}", dur)

                tool_name = None
                arg_chunks = 0          # number of deltas carrying argument text
                accumulated_args = ""
                content_chunks = 0
                reasoning_chunks = 0
                for line in r.iter_lines():
                    # Skip SSE keep-alives and the terminal sentinel.
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            for tc in (delta.get("tool_calls") or []):
                                fn = tc.get("function") or {}
                                if fn.get("name"):
                                    tool_name = fn["name"]
                                if fn.get("arguments"):
                                    arg_chunks += 1
                                    accumulated_args += fn["arguments"]
                            if delta.get("content"):
                                content_chunks += 1
                            if delta.get("reasoning"):
                                reasoning_chunks += 1
                        except json.JSONDecodeError:
                            # Tolerate malformed payload lines.
                            pass

                dur = time.time() - start
                if not tool_name:
                    if content_chunks > 0 or reasoning_chunks > 0:
                        return TestResult("streaming tool chunking", False,
                                          f"No tool call — model produced {content_chunks} content + {reasoning_chunks} reasoning chunks", dur)
                    return TestResult("streaming tool chunking", False, "No tool call and no content", dur)

                # Evaluate chunking quality
                if arg_chunks > 1:
                    return TestResult("streaming tool chunking", True,
                                      f"Args streamed in {arg_chunks} chunks ({len(accumulated_args)} chars)", dur)
                elif arg_chunks == 1 and len(accumulated_args) > 500:
                    # A single delta with >500 chars means the server buffered
                    # the whole call before emitting it.
                    return TestResult("streaming tool chunking", False,
                                      f"Args in 1 chunk but {len(accumulated_args)} chars — buffered, not streamed", dur)
                elif arg_chunks == 1:
                    return TestResult("streaming tool chunking", True,
                                      f"Args in 1 chunk ({len(accumulated_args)} chars — may be too short to stream)", dur)
                else:
                    return TestResult("streaming tool chunking", False, "Tool name only, no arg chunks", dur)
        except Exception as e:
            return TestResult("streaming tool chunking", False, f"Exception: {e}", time.time() - start)
|
|
|
|
|
|
def test_param_sweep(cfg: ModelConfig) -> list[TestResult]:
    """10. Parameter sweep — which vLLM params does the endpoint accept?

    Sends one minimal request per extra parameter and records whether the
    endpoint returns HTTP 200.  Returns one TestResult per parameter.
    """
    results = []
    base_req = {
        "model": cfg.model,
        "messages": [{"role": "user", "content": "Say hi."}],
        "stream": False,
        "max_tokens": 32,
    }
    extra_params = [
        ("chat_template_kwargs", {"enable_thinking": False}),
        ("guided_json", None),
        ("guided_regex", None),
        ("response_format", {"type": "json_object"}),
        ("n", 1),
        ("presence_penalty", 0.0),
        ("frequency_penalty", 0.0),
        ("top_p", 1.0),
        ("temperature", 0.7),
        ("seed", 42),
        ("stop", ["\n"]),
        ("logprobs+top_logprobs", {"logprobs": True, "top_logprobs": 5}),
    ]

    with make_client(cfg) as c:
        for name, val in extra_params:
            start = time.time()
            try:
                # Only composite entries (name contains "+") bundle several
                # top-level params in a dict and are merged in directly.
                # Every other entry is a single named parameter — including
                # dict-valued ones like chat_template_kwargs/response_format,
                # which must be sent *under their own key*.  (Previously any
                # dict value was splatted into the request, so those two
                # entries silently tested the wrong thing, e.g. a top-level
                # "enable_thinking" instead of "chat_template_kwargs".)
                if "+" in name and isinstance(val, dict):
                    req = {**base_req, **val}
                else:
                    req = {**base_req, name: val}
                r = c.post(f"{cfg.api_base}/chat/completions", json=req)
                dur = time.time() - start
                ok = r.status_code == 200
                detail = f"HTTP {r.status_code}"
                if not ok:
                    try:
                        detail += f": {r.json().get('error', {}).get('message', '')[:80]}"
                    except Exception:
                        pass
                results.append(TestResult(f"param: {name}", ok, detail, dur))
            except Exception as e:
                results.append(TestResult(f"param: {name}", False, f"Exception: {e}", time.time() - start))

    return results
|
|
|
|
|
|
# ── Suite runner ─────────────────────────────────────────────
|
|
|
|
# Ordered list of per-model test callables; each takes a ModelConfig and
# returns a TestResult.  test_param_sweep is run separately by run_suite
# because it returns a list.  NOTE(review): the two lambdas have no __doc__,
# so run_suite's name fallback prints "<lambda>" for them.
ALL_TESTS = [
    test_basic_nonstream,
    test_basic_stream,
    test_toolcall_nonstream,
    test_toolcall_stream,
    lambda cfg: test_tool_response_flow(cfg, streaming=False),
    lambda cfg: test_tool_response_flow(cfg, streaming=True),
    test_bad_tool_schema,
    test_nested_bad_schema,
    test_streaming_tool_chunks,
]
|
|
|
|
|
|
def run_suite(cfg: ModelConfig, verbose: bool = True) -> SuiteResult:
    """Run the full test suite against one model config.

    Executes every callable in ALL_TESTS, then the parameter sweep, printing
    per-test progress when *verbose*.  Returns the aggregated SuiteResult.
    """
    result = SuiteResult(model=cfg.model)

    print(f"\n{'='*60}")
    print(f"Testing: {cfg.model}")
    print(f"API: {cfg.api_base}")
    print(f"{'='*60}")

    for test_fn in ALL_TESTS:
        # Display name: first docstring line, falling back to __name__
        # (lambdas in ALL_TESTS have neither a docstring nor a useful name).
        name = (test_fn.__doc__ or "").strip().split("\n")[0] or test_fn.__name__
        if verbose:
            print(f"\n[{ts()}] Running: {name}...")

        tr = test_fn(cfg)
        # A test may return a single TestResult or a list of them.
        if isinstance(tr, list):
            result.results.extend(tr)
        else:
            result.results.append(tr)

        if verbose:
            if isinstance(tr, list):
                for r in tr:
                    s = "✓" if r.passed else "✗"
                    print(f"  {s} {r.name}: {r.detail} ({r.duration_s:.1f}s)")
            else:
                s = "✓" if tr.passed else "✗"
                print(f"  {s} {tr.name}: {tr.detail} ({tr.duration_s:.1f}s)")

    # Param sweep (returns a list of per-parameter results)
    if verbose:
        print(f"\n[{ts()}] Running: parameter sweep...")
    sweep_results = test_param_sweep(cfg)
    result.results.extend(sweep_results)
    if verbose:
        for r in sweep_results:
            s = "✓" if r.passed else "✗"
            print(f"  {s} {r.name}: {r.detail} ({r.duration_s:.1f}s)")

    return result
|
|
|
|
|
|
def print_summary(results: list[SuiteResult]):
    """Print a final summary across all models.

    Shows per-model pass counts with failing-test details, followed by a
    pass/fail matrix of the key tests across all models.
    """
    print(f"\n\n{'='*60}")
    print("FINAL SUMMARY")
    print(f"{'='*60}")

    for sr in results:
        passed = sr.passed
        total = sr.total
        pct = (passed / total * 100) if total else 0  # avoid div-by-zero on empty suites
        label = sr.model.split("/")[-1]
        print(f"\n  {label}: {passed}/{total} passed ({pct:.0f}%)")

        for r in sr.results:
            if not r.passed:
                print(f"    ✗ {r.name}: {r.detail[:80]}")

    # Cross-model comparison for key tests
    print(f"\n{'─'*60}")
    print("CROSS-MODEL COMPARISON")
    print(f"{'─'*60}")
    # Names must match the TestResult.name strings the test functions emit.
    key_tests = [
        "basic non-stream",
        "basic stream",
        "tool call non-stream",
        "tool call stream",
        "tool response flow",
        "tool response flow (stream)",
        "streaming tool chunking",
        "bad tool schema (properties=[])",
        "nested bad schema (items.properties=[])",
    ]

    # Calculate column width (model labels truncated to 18 chars, min 16)
    labels = [sr.model.split("/")[-1][:18] for sr in results]
    col_w = max(len(l) for l in labels) if labels else 16
    col_w = max(col_w, 16)

    header = f"{'Test':<40}"
    for l in labels:
        header += f" {l:>{col_w}}"
    print(header)
    print("─" * len(header))

    for test_name in key_tests:
        row = f"{test_name:<40}"
        for sr in results:
            match = [r for r in sr.results if r.name == test_name]
            if match:
                status = "✓" if match[0].passed else "✗"
                row += f" {status:>{col_w}}"
            else:
                # "—" marks a test that never ran for this model.
                row += f" {'—':>{col_w}}"
        print(row)

    print(f"\n{'='*60}")
|
|
|
|
|
|
# ── CLI ──────────────────────────────────────────────────────
|
|
|
|
def main():
    """CLI entry point: select model configs, run the suite on each, summarize.

    Exits non-zero when models.env is needed but missing, when selection
    yields no models, or when any test failed (for CI integration).
    """
    parser = argparse.ArgumentParser(description="Universal model tool-call test suite")
    parser.add_argument("--all", action="store_true", help="Test all models from models.env")
    parser.add_argument("--model", type=int, help="Test model by 1-based index from models.env")
    parser.add_argument("--filter", type=str, help="Test models matching substring")
    parser.add_argument("--quiet", action="store_true", help="Less output per test")
    args = parser.parse_args()

    models_path = Path(__file__).parent / "models.env"

    def require_models_env() -> list[ModelConfig]:
        # Shared guard for the three models.env-based selection modes.
        if not models_path.exists():
            print("ERROR: models.env not found")
            sys.exit(1)
        return load_models_env(models_path)

    configs: list[ModelConfig] = []

    if args.all:
        configs = require_models_env()
    elif args.model is not None:
        # "is not None" (not truthiness): previously "--model 0" was falsy and
        # fell through to the env-var branch instead of reporting out-of-range.
        all_configs = require_models_env()
        if args.model < 1 or args.model > len(all_configs):
            print(f"ERROR: --model index {args.model} out of range (1-{len(all_configs)})")
            sys.exit(1)
        configs = [all_configs[args.model - 1]]
    elif args.filter:
        all_configs = require_models_env()
        configs = [c for c in all_configs if args.filter.lower() in c.model.lower()]
        if not configs:
            print(f"No models matching '{args.filter}'")
            sys.exit(1)
    else:
        cfg = config_from_env()
        if cfg:
            configs = [cfg]
        else:
            print("No model specified. Use --all, --model N, --filter NAME, or set TOOLTEST_* env vars.")
            if models_path.exists():
                print("\nAvailable models from models.env:")
                for i, c in enumerate(load_models_env(models_path), 1):
                    print(f"  {i}. {c.model} @ {c.api_base}")
            sys.exit(1)

    all_results: list[SuiteResult] = []
    for cfg in configs:
        sr = run_suite(cfg, verbose=not args.quiet)
        all_results.append(sr)

    print_summary(all_results)

    # Non-zero exit when any test failed, so CI pipelines can gate on it.
    if any(sr.passed < sr.total for sr in all_results):
        sys.exit(1)
|
|
|
|
|
|
# Script entry point — run the CLI only when executed directly.
if __name__ == "__main__":
    main()
|