#!/usr/bin/env python3 """ Prepare tool-calling training data for SmolLM3-3B LoRA fine-tuning. Combines three datasets: 1. interstellarninja/tool-calls-multiturn 2. NousResearch/Hermes-Function-Calling-V1 3. Salesforce/xLAM-function-calling-60k Converts all to SmolLM3's native chat format with proper special tokens: - Tool calls wrapped in startPos/endPos tokens (IDs 128002/128016) - Tool responses wrapped in eni/eni_result tokens (IDs 128013/128014) - Thinking wrapped in think_start/think_end tags Output: train.jsonl, val.jsonl (tokenized & raw) """ import json import random import re from pathlib import Path from datasets import load_dataset # SmolLM3 special tokens (match the fixed chat_template.jinja) TOOL_CALL_START = "<|tool_call_start|>" # token 128002 TOOL_CALL_END = "<|tool_call_end|>" # token 128016 TOOL_RESP_START = "<|tool_response_start|>" # token 128013 TOOL_RESP_END = "<|tool_response_end|>" # token 128014 THINK_START = "" THINK_END = "" VAL_FRACTION = 0.05 SEED = 42 def render_tool_calls(tool_calls: list[dict]) -> str: """Render tool_calls list into SmolLM3's native format.""" parts = [] for tc in tool_calls: name = tc["function"]["name"] args = tc["function"]["arguments"] if isinstance(args, str): args_str = args else: args_str = json.dumps(args, ensure_ascii=False) parts.append(f'{{"name": "{name}", "arguments": {args_str}}}') body = "\n".join(parts) return f"{TOOL_CALL_START}\n{body}\n{TOOL_CALL_END}" def render_tool_response(content: str) -> str: """Wrap tool response content in SmolLM3's tool_response tokens.""" return f"{TOOL_RESP_START}\n{content}\n{TOOL_RESP_END}" def convert_openai_messages(messages: list[dict], tools: list[dict] | None = None) -> list[dict]: """Convert standard OpenAI-format messages to SmolLM3 native format. Transforms: - assistant.tool_calls → content with startPos/endPos tokens - tool role messages → user role with eni/eni_result tokens - Adds system prompt with tool definitions if tools present """ converted = [] # Build system message with tool defs if present if tools: tool_defs = "\n".join(json.dumps(t, ensure_ascii=False) for t in tools) system_content = ( "You are a helpful AI assistant named SmolLM, trained by Hugging Face.\n\n" "### Tools\n\n" "You may call one or more functions to assist with the user query.\n" "You are provided with function signatures within XML tags:\n\n" f"\n{tool_defs}\n\n\n" 'For each function call, return a json object with function name and arguments within ' f'{TOOL_CALL_START} {TOOL_CALL_END} tags:\n' f'{TOOL_CALL_START}\n{{"name": , "arguments": }}\n{TOOL_CALL_END}' ) converted.append({"role": "system", "content": system_content}) elif messages and messages[0].get("role") == "system": converted.append({"role": "system", "content": messages[0]["content"]}) messages = messages[1:] else: converted.append({ "role": "system", "content": "You are a helpful AI assistant named SmolLM, trained by Hugging Face." }) for msg in messages: role = msg.get("role", "user") if role == "user": converted.append({"role": "user", "content": msg["content"]}) elif role == "assistant": content = msg.get("content") or "" tool_calls = msg.get("tool_calls") if tool_calls: tc_text = render_tool_calls(tool_calls) full_content = f"{content}\n{tc_text}" if content else tc_text converted.append({"role": "assistant", "content": full_content}) else: converted.append({"role": "assistant", "content": content}) elif role == "tool": # Tool responses become user messages with eni/eni_result tokens content = msg.get("content", "") if isinstance(content, list): content = " ".join(c.get("text", "") for c in content if isinstance(c, dict)) converted.append({ "role": "user", "content": render_tool_response(str(content)) }) return converted def load_multiturn_dataset() -> list[dict]: """Load interstellarninja/tool-calls-multiturn.""" print("Loading interstellarninja/tool-calls-multiturn ...") ds = load_dataset("interstellarninja/tool-calls-multiturn", split="train") samples = [] for row in ds: messages = row.get("messages", []) tools = row.get("tools") if not messages or not any(m.get("tool_calls") for m in messages if m.get("role") == "assistant"): continue # skip conversations with no tool calls converted = convert_openai_messages(messages, tools) samples.append({"messages": converted}) print(f" → {len(samples)} samples with tool calls") return samples def load_hermes_fc_dataset() -> list[dict]: """Load NousResearch/Hermes-Function-Calling-V1.""" print("Loading NousResearch/Hermes-Function-Calling-V1 ...") ds = load_dataset("NousResearch/Hermes-Function-Calling-V1", split="train") samples = [] for row in ds: messages = row.get("messages", []) tools = row.get("tools") if not messages or not any(m.get("tool_calls") for m in messages if m.get("role") == "assistant"): continue converted = convert_openai_messages(messages, tools) samples.append({"messages": converted}) print(f" → {len(samples)} samples with tool calls") return samples def load_xlam_dataset() -> list[dict]: """Load Salesforce/xLAM-function-calling-60k. This dataset uses a different format: each row has 'tools', 'instruction', and 'outputs'. We convert to conversation format. """ print("Loading Salesforce/xLAM-function-calling-60k ...") ds = load_dataset("Salesforce/xLAM-function-calling-60k", split="train") samples = [] for row in ds: tools_raw = row.get("tools", "[]") instruction = row.get("instruction", "") outputs = row.get("answers", row.get("outputs", "")) if not instruction or not outputs: continue try: tools_list = json.loads(tools_raw) if isinstance(tools_raw, str) else tools_raw except json.JSONDecodeError: continue if not tools_list: continue # Parse the model output — may contain one or more tool calls try: output_parsed = json.loads(outputs) if isinstance(outputs, str) else outputs except json.JSONDecodeError: continue # Build messages messages = [{"role": "user", "content": instruction}] if isinstance(output_parsed, list): # Multiple tool calls tool_calls = [] for item in output_parsed: if isinstance(item, dict) and "name" in item: tool_calls.append({ "function": { "name": item["name"], "arguments": item.get("arguments", item.get("parameters", {})) } }) if tool_calls: messages.append({"role": "assistant", "tool_calls": tool_calls, "content": ""}) elif isinstance(output_parsed, dict) and "name" in output_parsed: messages.append({ "role": "assistant", "tool_calls": [{ "function": { "name": output_parsed["name"], "arguments": output_parsed.get("arguments", output_parsed.get("parameters", {})) } }], "content": "" }) else: continue converted = convert_openai_messages(messages, tools_list) samples.append({"messages": converted}) print(f" → {len(samples)} samples with tool calls") return samples def tokenize_sample(sample: dict, tokenizer) -> dict | None: """Tokenize a sample using the model's chat template. Returns dict with input_ids, attention_mask, labels (with system/user masked to -100). """ messages = sample["messages"] try: text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=False, ) enc = tokenizer(text, truncation=True, max_length=4096) except Exception as e: print(f" ⚠ Tokenization failed: {e}") return None input_ids = enc["input_ids"] attention_mask = enc["attention_mask"] # Build labels: mask system + user tokens, only train on assistant responses labels = [-100] * len(input_ids) # Find assistant turn boundaries in the raw text # We'll use a simpler approach: decode chunks and find assistant markers ASSISTANT_START = "<|im_start|>assistant\n" IM_END = "<|im_end|>" # Find all assistant spans in the tokenized text by decoding ranges text_for_search = text pos = 0 while True: start_idx = text_for_search.find(ASSISTANT_START, pos) if start_idx == -1: break end_idx = text_for_search.find(IM_END, start_idx + len(ASSISTANT_START)) if end_idx == -1: end_idx = len(text_for_search) # Map character offsets to token offsets # Approximate: count characters up to start/end, find token boundaries char_to_start = start_idx + len(ASSISTANT_START) # skip the marker itself char_to_end = end_idx + len(IM_END) # Use tokenizer offset mapping if available enc_with_offsets = tokenizer(text, truncation=True, max_length=4096, return_offsets_mapping=True) offsets = enc_with_offsets.get("offset_mapping", None) if offsets: tok_start = None tok_end = None for ti, (cs, ce) in enumerate(offsets): if cs >= char_to_start and tok_start is None: tok_start = ti if ce >= char_to_end: tok_end = ti + 1 break if tok_start is not None and tok_end is not None: for i in range(tok_start, min(tok_end, len(labels))): labels[i] = input_ids[i] pos = end_idx + 1 return { "input_ids": input_ids, "attention_mask": attention_mask, "labels": labels, } def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument("--output-dir", type=str, default="/data/processed") parser.add_argument("--max-samples", type=int, default=0, help="Limit total samples (0=all)") parser.add_argument("--tokenize", action="store_true", help="Also produce tokenized versions") parser.add_argument("--model", type=str, default="HuggingFaceTB/SmolLM3-3B") args = parser.parse_args() output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) # Load all datasets all_samples = [] all_samples.extend(load_multiturn_dataset()) all_samples.extend(load_hermes_fc_dataset()) all_samples.extend(load_xlam_dataset()) print(f"\nTotal raw samples: {len(all_samples)}") # Shuffle & split random.seed(SEED) random.shuffle(all_samples) if args.max_samples > 0: all_samples = all_samples[:args.max_samples] val_count = max(1, int(len(all_samples) * VAL_FRACTION)) val_samples = all_samples[:val_count] train_samples = all_samples[val_count:] print(f"Train: {len(train_samples)}, Val: {len(val_samples)}") # Write raw JSONL for split_name, split_data in [("train", train_samples), ("val", val_samples)]: path = output_dir / f"{split_name}.jsonl" with open(path, "w") as f: for s in split_data: f.write(json.dumps(s, ensure_ascii=False) + "\n") print(f"Wrote {path}") # Optionally tokenize if args.tokenize: print(f"\nTokenizing with {args.model} ...") from transformers import AutoTokenizer tokenizer = AutoTokenizer.from_pretrained(args.model) for split_name, split_data in [("train", train_samples), ("val", val_samples)]: tok_path = output_dir / f"{split_name}_tokenized.jsonl" count = 0 with open(tok_path, "w") as f: for s in split_data: tok = tokenize_sample(s, tokenizer) if tok: f.write(json.dumps(tok) + "\n") count += 1 print(f"Wrote {tok_path} ({count} samples)") if __name__ == "__main__": main()