#!/usr/bin/env python3
"""
Universal model tool-call test suite.

Tests any OpenAI-compatible endpoint for:
 1. Basic chat (non-streaming + streaming)
 2. Tool calls (non-streaming + streaming)
 3. Multi-turn tool response flow (non-streaming + streaming)
 4. Nested/bad tool schema handling (SGLang compatibility)
 5. Streaming tool call chunking (are args actually streamed?)
 6. Param sweep (what vLLM params does the endpoint accept?)

Handles reasoning models (content in 'reasoning' field, null 'content'),
different finish_reason values, and empty/tool_calls arrays gracefully.

Usage:
    TOOLTEST_API_BASE=... TOOLTEST_API_KEY=... TOOLTEST_MODEL=... python3 run_suite.py
    python3 run_suite.py --all
    python3 run_suite.py --model 1
    python3 run_suite.py --filter Devstral
"""

import os
import sys
import json
import time
import httpx
import argparse
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field


# ── Helpers ──────────────────────────────────────────────────

def ts() -> str:
    """Wall-clock timestamp with millisecond precision, for log lines."""
    return datetime.now().strftime("%H:%M:%S.%f")[:-3]


def safe_choice(body: dict, index: int = 0) -> dict:
    """Safely get a choice from a response body."""
    choices = body.get("choices") or []
    if index < len(choices):
        return choices[index]
    return {}


def safe_message(body: dict) -> dict:
    """Safely get the message from the first choice."""
    return safe_choice(body).get("message") or {}


def safe_delta(chunk: dict) -> dict:
    """Safely get the delta from the first choice of a streaming chunk."""
    choices = chunk.get("choices") or []
    if choices:
        return choices[0].get("delta") or {}
    return {}


def extract_content(msg: dict) -> tuple[str, str]:
    """Extract (content, reasoning) from a message, handling nulls."""
    content = msg.get("content") or ""
    reasoning = msg.get("reasoning") or ""
    return content, reasoning


# ── Config ───────────────────────────────────────────────────

@dataclass
class ModelConfig:
    """One endpoint under test: base URL, API key, and model name."""
    api_base: str
    api_key: str
    model: str

    @property
    def label(self) -> str:
        # Short display name: drop any org/namespace prefix from the model id.
        return self.model.split("/")[-1]


def load_models_env(path: Path) -> list[ModelConfig]:
    """Load models from the models.env file (pipe-delimited)."""
    configs = []
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = [p.strip() for p in line.split("|")]
        if len(parts) >= 3:
            configs.append(ModelConfig(api_base=parts[0], api_key=parts[1], model=parts[2]))
    return configs


def config_from_env() -> ModelConfig | None:
    """Get a single config from TOOLTEST_* environment variables."""
    base = os.environ.get("TOOLTEST_API_BASE")
    key = os.environ.get("TOOLTEST_API_KEY")
    model = os.environ.get("TOOLTEST_MODEL")
    if base and key and model:
        return ModelConfig(api_base=base, api_key=key, model=model)
    return None


# ── Test result types ────────────────────────────────────────

@dataclass
class TestResult:
    """Outcome of a single test: name, pass/fail, detail text, duration."""
    name: str
    passed: bool
    detail: str = ""
    duration_s: float = 0.0


@dataclass
class SuiteResult:
    """All test results for one model."""
    model: str
    results: list[TestResult] = field(default_factory=list)

    @property
    def passed(self) -> int:
        return sum(1 for r in self.results if r.passed)

    @property
    def total(self) -> int:
        return len(self.results)


def make_client(cfg: ModelConfig) -> httpx.Client:
    """Build an httpx client with auth headers and a generous timeout."""
    return httpx.Client(
        timeout=120.0,
        headers={
            "Authorization": f"Bearer {cfg.api_key}",
            "Content-Type": "application/json",
        },
    )


# ── Shared tool definitions ──────────────────────────────────

WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City, e.g. 'Tokyo'"}
            },
            "required": ["location"]
        }
    }
}

WRITE_FILE_TOOL = {
    "type": "function",
    "function": {
        "name": "write_file",
        "description": "Write content to a file.",
        "parameters": {
            "type": "object",
            "properties": {
                "filename": {"type": "string", "description": "Name of the file"},
                "content": {"type": "string", "description": "The content to write"}
            },
            "required": ["filename", "content"]
        }
    }
}

BAD_SCHEMA_TOOL = {
    "type": "function",
    "function": {
        "name": "web_search",
        "description": "Search the web",
        "parameters": {
            "type": "object",
            "properties": []  # Invalid — should be {}
        }
    }
}

NESTED_BAD_SCHEMA_TOOL = {
    "type": "function",
    "function": {
        "name": "message",
        "description": "Send a message",
        "parameters": {
            "type": "object",
            "properties": {
                "fields": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": []  # Invalid — should be {}
                    }
                }
            }
        }
    }
}


# ── Test functions ───────────────────────────────────────────

def test_basic_nonstream(cfg: ModelConfig) -> TestResult:
    """1. Basic non-streaming chat."""
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Say hello in one word."}],
                "stream": False,
                "max_tokens": 64,
            })
            body = r.json()
            dur = time.time() - start
            if r.status_code != 200:
                return TestResult("basic non-stream", False,
                                  f"HTTP {r.status_code}: {json.dumps(body)[:200]}", dur)
            content, reasoning = extract_content(safe_message(body))
            fr = safe_choice(body).get("finish_reason", "?")
            if content:
                return TestResult("basic non-stream", True, f"Got: {content[:80]}", dur)
            elif reasoning:
                # Reasoning models may put everything in 'reasoning' and leave content null.
                return TestResult("basic non-stream", True,
                                  f"Reasoning-only (finish: {fr}): {reasoning[:80]}", dur)
            else:
                return TestResult("basic non-stream", False, f"Empty response (finish: {fr})", dur)
        except Exception as e:
            return TestResult("basic non-stream", False, f"Exception: {e}", time.time() - start)


def test_basic_stream(cfg: ModelConfig) -> TestResult:
    """2. Basic streaming chat."""
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Count from 1 to 5."}],
                "stream": True,
                "max_tokens": 64,
            }) as r:
                if r.status_code != 200:
                    body = "".join(r.iter_lines())
                    dur = time.time() - start
                    return TestResult("basic stream", False,
                                      f"HTTP {r.status_code}: {body[:200]}", dur)
                full_content = ""
                full_reasoning = ""
                for line in r.iter_lines():
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            if delta.get("content"):
                                full_content += delta["content"]
                            if delta.get("reasoning"):
                                full_reasoning += delta["reasoning"]
                        except json.JSONDecodeError:
                            pass
            dur = time.time() - start
            if full_content:
                return TestResult("basic stream", True, f"Got: {full_content[:80]}", dur)
            elif full_reasoning:
                return TestResult("basic stream", True,
                                  f"Reasoning-only: {full_reasoning[:80]}", dur)
            else:
                return TestResult("basic stream", False, "No content or reasoning received", dur)
        except Exception as e:
            return TestResult("basic stream", False, f"Exception: {e}", time.time() - start)


def test_toolcall_nonstream(cfg: ModelConfig) -> TestResult:
    """3. Tool call — non-streaming."""
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "What's the weather in Tokyo?"}],
                "tools": [WEATHER_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 256,
            })
            body = r.json()
            dur = time.time() - start
            if r.status_code != 200:
                return TestResult("tool call non-stream", False,
                                  f"HTTP {r.status_code}: {json.dumps(body)[:200]}", dur)
            msg = safe_message(body)
            tool_calls = msg.get("tool_calls") or []
            if tool_calls:
                tc = tool_calls[0]
                fn = tc.get("function", {})
                return TestResult("tool call non-stream", True,
                                  f"Tool: {fn.get('name','?')}, args: {fn.get('arguments','')[:60]}", dur)
            else:
                content, reasoning = extract_content(msg)
                out = content or reasoning or "(empty)"
                return TestResult("tool call non-stream", False,
                                  f"No tool call. Response: {out[:100]}", dur)
        except Exception as e:
            return TestResult("tool call non-stream", False, f"Exception: {e}", time.time() - start)


def test_toolcall_stream(cfg: ModelConfig) -> TestResult:
    """4. Tool call — streaming."""
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "What's the weather in Tokyo?"}],
                "tools": [WEATHER_TOOL],
                "tool_choice": "auto",
                "stream": True,
                "max_tokens": 256,
            }) as r:
                if r.status_code != 200:
                    body = "".join(r.iter_lines())
                    dur = time.time() - start
                    return TestResult("tool call stream", False, f"HTTP {r.status_code}", dur)
                tool_name = None
                accumulated_args = ""
                content_parts = ""
                reasoning_parts = ""
                for line in r.iter_lines():
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            tc_list = delta.get("tool_calls") or []
                            for tc in tc_list:
                                fn = tc.get("function") or {}
                                if fn.get("name"):
                                    tool_name = fn["name"]
                                if fn.get("arguments"):
                                    accumulated_args += fn["arguments"]
                            if delta.get("content"):
                                content_parts += delta["content"]
                            if delta.get("reasoning"):
                                reasoning_parts += delta["reasoning"]
                        except json.JSONDecodeError:
                            pass
            dur = time.time() - start
            if tool_name:
                return TestResult("tool call stream", True,
                                  f"Tool: {tool_name}, args: {accumulated_args[:60]}", dur)
            else:
                out = content_parts or reasoning_parts or "(empty)"
                return TestResult("tool call stream", False,
                                  f"No tool call. Response: {out[:100]}", dur)
        except Exception as e:
            return TestResult("tool call stream", False, f"Exception: {e}", time.time() - start)


def test_tool_response_flow(cfg: ModelConfig, streaming: bool = False) -> TestResult:
    """5/6. Full tool call → response → follow-up flow."""
    label = "tool response flow (stream)" if streaming else "tool response flow"
    with make_client(cfg) as c:
        start = time.time()
        try:
            messages = [{"role": "user", "content": "What's the weather in Tokyo?"}]
            # Step 1: Get tool call
            if not streaming:
                r = c.post(f"{cfg.api_base}/chat/completions", json={
                    "model": cfg.model,
                    "messages": messages,
                    "tools": [WEATHER_TOOL],
                    "tool_choice": "auto",
                    "stream": False,
                    "max_tokens": 256,
                })
                body = r.json()
                if r.status_code != 200:
                    return TestResult(label, False, f"Step 1 HTTP {r.status_code}",
                                      time.time() - start)
                msg = safe_message(body)
            else:
                tool_name = None
                tool_id = None
                accumulated_args = ""
                with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                    "model": cfg.model,
                    "messages": messages,
                    "tools": [WEATHER_TOOL],
                    "tool_choice": "auto",
                    "stream": True,
                    "max_tokens": 256,
                }) as r:
                    if r.status_code != 200:
                        return TestResult(label, False, f"Step 1 HTTP {r.status_code}",
                                          time.time() - start)
                    for line in r.iter_lines():
                        if not line or line == "data: [DONE]":
                            continue
                        if line.startswith("data: "):
                            try:
                                chunk = json.loads(line[6:])
                                delta = safe_delta(chunk)
                                for tc in (delta.get("tool_calls") or []):
                                    if tc.get("id"):
                                        tool_id = tc["id"]
                                    fn = tc.get("function") or {}
                                    if fn.get("name"):
                                        tool_name = fn["name"]
                                    if fn.get("arguments"):
                                        accumulated_args += fn["arguments"]
                            except json.JSONDecodeError:
                                pass
                if not tool_name:
                    return TestResult(label, False, "No tool call in step 1", time.time() - start)
                # Reassemble an assistant message from the streamed fragments.
                msg = {
                    "role": "assistant",
                    "tool_calls": [{
                        "id": tool_id or "call_0",
                        "type": "function",
                        "function": {"name": tool_name, "arguments": accumulated_args}
                    }]
                }

            tool_calls = msg.get("tool_calls") or []
            if not tool_calls:
                return TestResult(label, False, "No tool call in step 1", time.time() - start)
            tc = tool_calls[0]
            tc_id = tc.get("id", "call_0")

            # Step 2: Send tool response
            messages.append(msg)
            messages.append({
                "role": "tool",
                "tool_call_id": tc_id,
                "content": json.dumps({"location": "Tokyo", "temperature": "22°C",
                                       "condition": "Partly cloudy"}),
            })

            # Step 3: Get follow-up
            r2 = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": messages,
                "tools": [WEATHER_TOOL],
                "stream": False,
                "max_tokens": 256,
            })
            body2 = r2.json()
            dur = time.time() - start
            if r2.status_code != 200:
                return TestResult(label, False, f"Step 3 HTTP {r2.status_code}", dur)
            final_msg = safe_message(body2)
            final_content, final_reasoning = extract_content(final_msg)
            final = final_content or final_reasoning or ""

            # Check the model actually used the tool data
            ok = "22" in final
            indicators = ["i don't have", "i cannot access", "don't have access",
                          "cannot provide real-time"]
            for ind in indicators:
                if ind in final.lower():
                    ok = False
                    break
            if not final_content and final_reasoning:
                return TestResult(label, ok,
                                  f"Reasoning-only (used data: {'yes' if ok else 'no'}) — {final[:100]}", dur)
            return TestResult(label, ok,
                              f"{'Used' if ok else 'Did NOT use'} tool result — {final[:100]}", dur)
        except Exception as e:
            return TestResult(label, False, f"Exception: {e}", time.time() - start)


def test_bad_tool_schema(cfg: ModelConfig) -> TestResult:
    """7. OpenClaw-style tool with properties=[] (tests schema validation/middleware)."""
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Search for cats"}],
                "tools": [BAD_SCHEMA_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 128,
            })
            body = r.json()
            dur = time.time() - start
            if r.status_code != 200:
                err = ""
                try:
                    err = body.get("error", {}).get("message", "")[:150]
                except Exception:
                    err = json.dumps(body)[:150]
                return TestResult("bad tool schema (properties=[])", False,
                                  f"HTTP {r.status_code}: {err}", dur)
            return TestResult("bad tool schema (properties=[])", True,
                              "Endpoint accepted/fixed bad schema", dur)
        except Exception as e:
            return TestResult("bad tool schema (properties=[])", False,
                              f"Exception: {e}", time.time() - start)


def test_nested_bad_schema(cfg: ModelConfig) -> TestResult:
    """8. Nested properties=[] inside items (the Tool 21 bug)."""
    with make_client(cfg) as c:
        start = time.time()
        try:
            r = c.post(f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{"role": "user", "content": "Send a message to Bob"}],
                "tools": [NESTED_BAD_SCHEMA_TOOL],
                "tool_choice": "auto",
                "stream": False,
                "max_tokens": 128,
            })
            body = r.json()
            dur = time.time() - start
            if r.status_code != 200:
                err = ""
                try:
                    err = body.get("error", {}).get("message", "")[:150]
                except Exception:
                    err = json.dumps(body)[:150]
                return TestResult("nested bad schema (items.properties=[])", False,
                                  f"HTTP {r.status_code}: {err}", dur)
            return TestResult("nested bad schema (items.properties=[])", True,
                              "Endpoint accepted/fixed nested bad schema", dur)
        except Exception as e:
            return TestResult("nested bad schema (items.properties=[])", False,
                              f"Exception: {e}", time.time() - start)


def test_streaming_tool_chunks(cfg: ModelConfig) -> TestResult:
    """9. Streaming tool call chunking — are args actually streamed in multiple chunks?"""
    with make_client(cfg) as c:
        start = time.time()
        try:
            with c.stream("POST", f"{cfg.api_base}/chat/completions", json={
                "model": cfg.model,
                "messages": [{
                    "role": "user",
                    "content": "Write a Python hello world and save it using the write_file tool."
                }],
                "tools": [WRITE_FILE_TOOL],
                "tool_choice": "auto",
                "stream": True,
                "max_tokens": 1024,
            }) as r:
                if r.status_code != 200:
                    dur = time.time() - start
                    return TestResult("streaming tool chunking", False,
                                      f"HTTP {r.status_code}", dur)
                tool_name = None
                arg_chunks = 0
                accumulated_args = ""
                content_chunks = 0
                reasoning_chunks = 0
                for line in r.iter_lines():
                    if not line or line == "data: [DONE]":
                        continue
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            delta = safe_delta(chunk)
                            for tc in (delta.get("tool_calls") or []):
                                fn = tc.get("function") or {}
                                if fn.get("name"):
                                    tool_name = fn["name"]
                                if fn.get("arguments"):
                                    arg_chunks += 1
                                    accumulated_args += fn["arguments"]
                            if delta.get("content"):
                                content_chunks += 1
                            if delta.get("reasoning"):
                                reasoning_chunks += 1
                        except json.JSONDecodeError:
                            pass
            dur = time.time() - start
            if not tool_name:
                if content_chunks > 0 or reasoning_chunks > 0:
                    return TestResult("streaming tool chunking", False,
                                      f"No tool call — model produced {content_chunks} content + {reasoning_chunks} reasoning chunks", dur)
                return TestResult("streaming tool chunking", False,
                                  "No tool call and no content", dur)
            # Evaluate chunking quality
            if arg_chunks > 1:
                return TestResult("streaming tool chunking", True,
                                  f"Args streamed in {arg_chunks} chunks ({len(accumulated_args)} chars)", dur)
            elif arg_chunks == 1 and len(accumulated_args) > 500:
                # A single large blob means the server buffered the whole call.
                return TestResult("streaming tool chunking", False,
                                  f"Args in 1 chunk but {len(accumulated_args)} chars — buffered, not streamed", dur)
            elif arg_chunks == 1:
                return TestResult("streaming tool chunking", True,
                                  f"Args in 1 chunk ({len(accumulated_args)} chars — may be too short to stream)", dur)
            else:
                return TestResult("streaming tool chunking", False,
                                  "Tool name only, no arg chunks", dur)
        except Exception as e:
            return TestResult("streaming tool chunking", False,
                              f"Exception: {e}", time.time() - start)


def test_param_sweep(cfg: ModelConfig) -> list[TestResult]:
    """10. Parameter sweep — which vLLM params does the endpoint accept?"""
    results = []
    base_req = {
        "model": cfg.model,
        "messages": [{"role": "user", "content": "Say hi."}],
        "stream": False,
        "max_tokens": 32,
    }
    # (name, value) pairs; a dict value is merged into the request wholesale
    # so multiple related params can be tested together.
    extra_params = [
        ("chat_template_kwargs", {"enable_thinking": False}),
        ("guided_json", None),
        ("guided_regex", None),
        ("response_format", {"type": "json_object"}),
        ("n", 1),
        ("presence_penalty", 0.0),
        ("frequency_penalty", 0.0),
        ("top_p", 1.0),
        ("temperature", 0.7),
        ("seed", 42),
        ("stop", ["\n"]),
        ("logprobs+top_logprobs", {"logprobs": True, "top_logprobs": 5}),
    ]
    with make_client(cfg) as c:
        for name, val in extra_params:
            start = time.time()
            try:
                if isinstance(val, dict):
                    req = {**base_req, **val}
                else:
                    req = {**base_req, name: val}
                r = c.post(f"{cfg.api_base}/chat/completions", json=req)
                dur = time.time() - start
                ok = r.status_code == 200
                detail = f"HTTP {r.status_code}"
                if not ok:
                    try:
                        detail += f": {r.json().get('error', {}).get('message', '')[:80]}"
                    except Exception:
                        pass
                results.append(TestResult(f"param: {name}", ok, detail, dur))
            except Exception as e:
                results.append(TestResult(f"param: {name}", False,
                                          f"Exception: {e}", time.time() - start))
    return results


# ── Suite runner ─────────────────────────────────────────────

ALL_TESTS = [
    test_basic_nonstream,
    test_basic_stream,
    test_toolcall_nonstream,
    test_toolcall_stream,
    lambda cfg: test_tool_response_flow(cfg, streaming=False),
    lambda cfg: test_tool_response_flow(cfg, streaming=True),
    test_bad_tool_schema,
    test_nested_bad_schema,
    test_streaming_tool_chunks,
]


def run_suite(cfg: ModelConfig, verbose: bool = True) -> SuiteResult:
    """Run the full test suite against one model config."""
    result = SuiteResult(model=cfg.model)
    print(f"\n{'='*60}")
    print(f"Testing: {cfg.model}")
    print(f"API: {cfg.api_base}")
    print(f"{'='*60}")
    for test_fn in ALL_TESTS:
        # Display name comes from the test's docstring first line.
        name = (test_fn.__doc__ or "").strip().split("\n")[0] or test_fn.__name__
        if verbose:
            print(f"\n[{ts()}] Running: {name}...")
        tr = test_fn(cfg)
        if isinstance(tr, list):
            result.results.extend(tr)
        else:
            result.results.append(tr)
        if verbose:
            if isinstance(tr, list):
                for r in tr:
                    s = "✓" if r.passed else "✗"
                    print(f"  {s} {r.name}: {r.detail} ({r.duration_s:.1f}s)")
            else:
                s = "✓" if tr.passed else "✗"
                print(f"  {s} {tr.name}: {tr.detail} ({tr.duration_s:.1f}s)")
    # Param sweep
    if verbose:
        print(f"\n[{ts()}] Running: parameter sweep...")
    sweep_results = test_param_sweep(cfg)
    result.results.extend(sweep_results)
    if verbose:
        for r in sweep_results:
            s = "✓" if r.passed else "✗"
            print(f"  {s} {r.name}: {r.detail} ({r.duration_s:.1f}s)")
    return result


def print_summary(results: list[SuiteResult]):
    """Print a final summary across all models."""
    print(f"\n\n{'='*60}")
    print("FINAL SUMMARY")
    print(f"{'='*60}")
    for sr in results:
        passed = sr.passed
        total = sr.total
        pct = (passed / total * 100) if total else 0
        label = sr.model.split("/")[-1]
        print(f"\n  {label}: {passed}/{total} passed ({pct:.0f}%)")
        for r in sr.results:
            if not r.passed:
                print(f"    ✗ {r.name}: {r.detail[:80]}")

    # Cross-model comparison for key tests
    print(f"\n{'─'*60}")
    print("CROSS-MODEL COMPARISON")
    print(f"{'─'*60}")
    key_tests = [
        "basic non-stream",
        "basic stream",
        "tool call non-stream",
        "tool call stream",
        "tool response flow",
        "tool response flow (stream)",
        "streaming tool chunking",
        "bad tool schema (properties=[])",
        "nested bad schema (items.properties=[])",
    ]
    # Calculate column width
    labels = [sr.model.split("/")[-1][:18] for sr in results]
    col_w = max(len(l) for l in labels) if labels else 16
    col_w = max(col_w, 16)
    header = f"{'Test':<40}"
    for l in labels:
        header += f" {l:>{col_w}}"
    print(header)
    print("─" * len(header))
    for test_name in key_tests:
        row = f"{test_name:<40}"
        for sr in results:
            match = [r for r in sr.results if r.name == test_name]
            if match:
                status = "✓" if match[0].passed else "✗"
                row += f" {status:>{col_w}}"
            else:
                row += f" {'—':>{col_w}}"
        print(row)
    print(f"\n{'='*60}")


# ── CLI ──────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="Universal model tool-call test suite")
    parser.add_argument("--all", action="store_true", help="Test all models from models.env")
    parser.add_argument("--model", type=int, help="Test model by 1-based index from models.env")
    parser.add_argument("--filter", type=str, help="Test models matching substring")
    parser.add_argument("--quiet", action="store_true", help="Less output per test")
    args = parser.parse_args()

    models_path = Path(__file__).parent / "models.env"
    configs: list[ModelConfig] = []

    if args.all:
        if not models_path.exists():
            print("ERROR: models.env not found")
            sys.exit(1)
        configs = load_models_env(models_path)
    elif args.model is not None:
        # NOTE: must compare against None — a plain truthiness test would treat
        # `--model 0` as unset and silently fall through to the env-var path.
        if not models_path.exists():
            print("ERROR: models.env not found")
            sys.exit(1)
        all_configs = load_models_env(models_path)
        if args.model < 1 or args.model > len(all_configs):
            print(f"ERROR: --model index {args.model} out of range (1-{len(all_configs)})")
            sys.exit(1)
        configs = [all_configs[args.model - 1]]
    elif args.filter:
        if not models_path.exists():
            print("ERROR: models.env not found")
            sys.exit(1)
        all_configs = load_models_env(models_path)
        configs = [c for c in all_configs if args.filter.lower() in c.model.lower()]
        if not configs:
            print(f"No models matching '{args.filter}'")
            sys.exit(1)
    else:
        cfg = config_from_env()
        if cfg:
            configs = [cfg]
        else:
            print("No model specified. Use --all, --model N, --filter NAME, or set TOOLTEST_* env vars.")
            if models_path.exists():
                print("\nAvailable models from models.env:")
                for i, c in enumerate(load_models_env(models_path), 1):
                    print(f"  {i}. {c.model} @ {c.api_base}")
            sys.exit(1)

    all_results: list[SuiteResult] = []
    for cfg in configs:
        sr = run_suite(cfg, verbose=not args.quiet)
        all_results.append(sr)

    print_summary(all_results)
    # Non-zero exit if any model had failures, for CI use.
    if any(sr.passed < sr.total for sr in all_results):
        sys.exit(1)


if __name__ == "__main__":
    main()