[Bugfix][Model] Prevent special token leakage in KimiK2ToolParser streaming mode (#28543)

Signed-off-by: Jscaldwell55 <jay.s.caldwell@gmail.com>
This commit is contained in:
Jay Caldwell
2025-11-16 23:54:46 -06:00
committed by GitHub
parent 60e089f0b9
commit 6f37419244
2 changed files with 791 additions and 6 deletions

View File

@@ -34,8 +34,27 @@ class KimiK2ToolParser(ToolParser):
str
] = [] # map what has been streamed for each tool so far to a list
# Section-level state management to prevent token leakage
self.in_tool_section: bool = False
self.token_buffer: str = ""
# Buffer size: empirical worst-case for longest marker (~30 chars) * 2
# + safety margin for unicode + partial overlap. Prevents unbounded growth.
self.buffer_max_size: int = 1024
self.section_char_count: int = 0 # Track characters processed in tool section
self.max_section_chars: int = 8192 # Force exit if section exceeds this
self._buffer_overflow_logged: bool = False # Log overflow once per session
# Support both singular and plural variants
self.tool_calls_start_token: str = "<|tool_calls_section_begin|>"
self.tool_calls_end_token: str = "<|tool_calls_section_end|>"
self.tool_calls_start_token_variants: list[str] = [
"<|tool_calls_section_begin|>",
"<|tool_call_section_begin|>", # singular variant
]
self.tool_calls_end_token_variants: list[str] = [
"<|tool_calls_section_end|>",
"<|tool_call_section_end|>", # singular variant
]
self.tool_call_start_token: str = "<|tool_call_begin|>"
self.tool_call_end_token: str = "<|tool_call_end|>"
@@ -58,6 +77,18 @@ class KimiK2ToolParser(ToolParser):
self.tool_calls_start_token_id = self.vocab.get(self.tool_calls_start_token)
self.tool_calls_end_token_id = self.vocab.get(self.tool_calls_end_token)
# Get token IDs for all variants
self.tool_calls_start_token_ids: list[int] = [
tid
for variant in self.tool_calls_start_token_variants
if (tid := self.vocab.get(variant)) is not None
]
self.tool_calls_end_token_ids: list[int] = [
tid
for variant in self.tool_calls_end_token_variants
if (tid := self.vocab.get(variant)) is not None
]
self.tool_call_start_token_id = self.vocab.get(self.tool_call_start_token)
self.tool_call_end_token_id = self.vocab.get(self.tool_call_end_token)
@@ -70,6 +101,51 @@ class KimiK2ToolParser(ToolParser):
"tokens in the tokenizer!"
)
def _check_and_strip_markers(self, text: str) -> tuple[str, bool, bool]:
"""
Check for section begin/end markers in text and strip them.
Returns: (cleaned_text, found_section_begin, found_section_end)
"""
found_begin = False
found_end = False
cleaned = text
# Check for section begin markers (any variant)
for variant in self.tool_calls_start_token_variants:
if variant in cleaned:
cleaned = cleaned.replace(variant, "")
found_begin = True
# Check for section end markers (any variant)
for variant in self.tool_calls_end_token_variants:
if variant in cleaned:
cleaned = cleaned.replace(variant, "")
found_end = True
return cleaned, found_begin, found_end
def _reset_section_state(self) -> None:
"""Reset state when exiting tool section."""
self.in_tool_section = False
self.token_buffer = ""
self.section_char_count = 0
def reset_streaming_state(self) -> None:
"""
Reset all streaming state. Call this between requests to prevent
state leakage when parser instance is reused.
"""
# Reset section state
self._reset_section_state()
# Reset parent class state
self.current_tool_name_sent = False
self.prev_tool_call_arr = []
self.current_tool_id = -1
self.streamed_args_for_tool = []
logger.debug("Streaming state reset")
def extract_tool_calls(
self,
model_output: str,
@@ -131,13 +207,94 @@ class KimiK2ToolParser(ToolParser):
) -> DeltaMessage | None:
logger.debug("delta_text: %s", delta_text)
logger.debug("delta_token_ids: %s", delta_token_ids)
# check to see if we should be streaming a tool call - is there a
if self.tool_calls_start_token_id not in current_token_ids:
logger.debug("No tool call tokens found!")
return DeltaMessage(content=delta_text)
delta_text = delta_text.replace(self.tool_calls_start_token, "").replace(
self.tool_calls_end_token, ""
# Flag to defer section exit until after tool parsing completes
deferred_section_exit = False
# Add delta to buffer for split marker detection
self.token_buffer += delta_text
# Enforce buffer size limit to prevent memory issues
if len(self.token_buffer) > self.buffer_max_size:
if not self._buffer_overflow_logged:
logger.warning(
"Token buffer exceeded max size (%d bytes), flushing excess. "
"This may indicate very long markers or unusual tokenization.",
self.buffer_max_size,
)
self._buffer_overflow_logged = True
# Keep only the most recent content that might contain partial markers
self.token_buffer = self.token_buffer[-self.buffer_max_size // 2 :]
# Check buffer for section markers (handles split tokens)
buffered_text, found_section_begin, found_section_end = (
self._check_and_strip_markers(self.token_buffer)
)
# Track section state transitions
if found_section_begin and not self.in_tool_section:
logger.debug("Entering tool section")
self.in_tool_section = True
self.token_buffer = buffered_text # Use cleaned buffer
self.section_char_count = 0 # Reset counter for new section
if found_section_end and self.in_tool_section:
logger.debug("Detected section end marker")
# CRITICAL: Don't exit early if tool_call_end is in this chunk.
# Tool parser must emit final arguments/close first to avoid dropping
# the final tool update and leaking tokens into reasoning channel.
has_tool_end = self.tool_call_end_token_id in delta_token_ids
if has_tool_end:
# Defer exit until after tool parsing completes
deferred_section_exit = True
logger.debug("Deferring section exit: tool_call_end in same chunk")
self.token_buffer = buffered_text
else:
# No tool call ending, safe to exit immediately
logger.debug("Exiting tool section")
remaining = buffered_text
self._reset_section_state()
# Return remaining text as reasoning content if non-empty
if remaining.strip():
return DeltaMessage(content=remaining)
# Return empty delta to maintain function contract
# (always returns DeltaMessage)
return DeltaMessage(content="")
else:
self.token_buffer = buffered_text
# Check if any variant of section start token is in current_token_ids
has_section_token = any(
tid in current_token_ids for tid in self.tool_calls_start_token_ids
)
# Early return: if no section token detected yet, return as reasoning content
if not has_section_token and not self.in_tool_section:
logger.debug("No tool call tokens found!")
# Don't clear buffer - it needs to accumulate partial markers across deltas
# Buffer overflow is already protected by lines 215-224
return DeltaMessage(content=delta_text)
# Strip section markers from delta_text for subsequent processing
# NOTE: This preprocessing happens BEFORE the regex-based tool call
# parsing (from PR #24847) to ensure markers are removed cleanly
# before pattern matching. No double-stripping occurs because
# section markers and tool call markers are distinct.
delta_text, _, _ = self._check_and_strip_markers(delta_text)
# Error recovery: If in tool section for too long, force exit
if self.in_tool_section:
self.section_char_count += len(delta_text)
if self.section_char_count > self.max_section_chars:
logger.warning(
"Tool section exceeded max length (%d chars), forcing exit. "
"This may indicate malformed model output.",
self.max_section_chars,
)
self._reset_section_state()
# Deferred exit already handled by forced exit above
# Return remaining content as reasoning (or empty delta if no content)
return DeltaMessage(content=delta_text if delta_text.strip() else "")
try:
# figure out where we are in the parsing by counting tool call
# start & end tags
@@ -158,6 +315,16 @@ class KimiK2ToolParser(ToolParser):
and prev_tool_end_count == cur_tool_end_count
and self.tool_call_end_token not in delta_text
):
# CRITICAL FIX: Suppress content if in tool section but
# no tool calls started
if self.in_tool_section and cur_tool_start_count == 0:
logger.debug(
"In tool section but no tool calls started yet. "
"Suppressing: %s",
delta_text,
)
# Return empty delta to maintain iterator contract
return DeltaMessage(content="")
logger.debug("Generating text content! skipping tool parsing.")
return DeltaMessage(content=delta_text)
@@ -209,6 +376,9 @@ class KimiK2ToolParser(ToolParser):
):
if self.prev_tool_call_arr is None or len(self.prev_tool_call_arr) == 0:
logger.debug("attempting to close tool call, but no tool call")
# Handle deferred section exit before returning
if deferred_section_exit and self.in_tool_section:
self._reset_section_state()
return None
diff = self.prev_tool_call_arr[self.current_tool_id].get("arguments")
if diff:
@@ -218,6 +388,9 @@ class KimiK2ToolParser(ToolParser):
else diff
)
if '"}' not in delta_text:
# Handle deferred section exit before returning
if deferred_section_exit and self.in_tool_section:
self._reset_section_state()
return None
end_loc = delta_text.rindex('"}')
diff = delta_text[:end_loc] + '"}'
@@ -227,6 +400,10 @@ class KimiK2ToolParser(ToolParser):
diff,
)
self.streamed_args_for_tool[self.current_tool_id] += diff
# Handle deferred section exit before returning
if deferred_section_exit and self.in_tool_section:
logger.debug("Completing deferred section exit")
self._reset_section_state()
return DeltaMessage(
tool_calls=[
DeltaToolCall(
@@ -240,9 +417,19 @@ class KimiK2ToolParser(ToolParser):
# case -- otherwise we're just generating text
else:
# Check if we're in tool section - if so, suppress
if self.in_tool_section:
logger.debug("In tool section, suppressing text generation")
# Handle deferred section exit before returning
if deferred_section_exit:
self._reset_section_state()
return DeltaMessage(content="")
text = delta_text.replace(self.tool_call_start_token, "")
text = text.replace(self.tool_call_end_token, "")
delta = DeltaMessage(tool_calls=[], content=text)
# Handle deferred section exit before returning
if deferred_section_exit and self.in_tool_section:
self._reset_section_state()
return delta
current_tool_call = dict()
@@ -390,6 +577,11 @@ class KimiK2ToolParser(ToolParser):
else:
self.prev_tool_call_arr.append(current_tool_call)
# Handle deferred section exit after tool parsing completes
if deferred_section_exit and self.in_tool_section:
logger.debug("Completing deferred section exit")
self._reset_section_state()
return delta
except Exception: