[Bugfix] Fix missing token deltas in harmony streaming (#30437)

Signed-off-by: RioS <aa248424@gmail.com>
Signed-off-by: Ri0S <aa248424@gmail.com>
Co-authored-by: Chauncey <chaunceyjiang@gmail.com>
This commit is contained in:
RioS
2026-01-09 12:59:34 +09:00
committed by GitHub
parent 8413868dab
commit e2d49ec2a4
2 changed files with 13 additions and 7 deletions

View File

@@ -824,6 +824,7 @@ class StreamingHarmonyContext(HarmonyContext):
self.encoding = get_encoding()
self.last_tok = None
self.first_tok_of_message = True
self.last_content_delta = None
@property
def messages(self) -> list:
@@ -832,6 +833,7 @@ class StreamingHarmonyContext(HarmonyContext):
def append_output(self, output: RequestOutput) -> None:
# append_output is called for each output token in streaming case,
# so we only want to add the prompt tokens once for each message.
self.last_content_delta = None
if self.first_tok_of_message:
self._update_prefill_token_usage(output)
# Reset self.first_tok_of_message if needed:
@@ -839,8 +841,12 @@ class StreamingHarmonyContext(HarmonyContext):
# (finished=True), then the next token processed will mark the
# beginning of a new message
self.first_tok_of_message = output.finished
last_delta_text = ""
for tok in output.outputs[0].token_ids:
self.parser.process(tok)
last_delta_text += self.parser.last_content_delta or ""
if last_delta_text:
self.last_content_delta = last_delta_text
self._update_decode_token_usage(output)
# For streaming, update previous turn when message is complete

View File

@@ -1811,7 +1811,7 @@ class OpenAIServingResponses(OpenAIServing):
content_index=state.current_content_index,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.parser.last_content_delta,
delta=ctx.last_content_delta,
# TODO, use logprobs from ctx.last_request_output
logprobs=[],
)
@@ -1861,7 +1861,7 @@ class OpenAIServingResponses(OpenAIServing):
item_id=state.current_item_id,
output_index=state.current_output_index,
content_index=state.current_content_index,
delta=ctx.parser.last_content_delta,
delta=ctx.last_content_delta,
sequence_number=-1,
)
)
@@ -1908,7 +1908,7 @@ class OpenAIServingResponses(OpenAIServing):
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.parser.last_content_delta,
delta=ctx.last_content_delta,
)
)
return events
@@ -1952,7 +1952,7 @@ class OpenAIServingResponses(OpenAIServing):
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.parser.last_content_delta,
delta=ctx.last_content_delta,
)
)
return events
@@ -1999,7 +1999,7 @@ class OpenAIServingResponses(OpenAIServing):
sequence_number=-1,
output_index=state.current_output_index,
item_id=state.current_item_id,
delta=ctx.parser.last_content_delta,
delta=ctx.last_content_delta,
)
)
return events
@@ -2010,7 +2010,7 @@ class OpenAIServingResponses(OpenAIServing):
state: HarmonyStreamingState,
) -> list[StreamingResponsesResponse]:
"""Emit events for content delta streaming based on channel type."""
if not ctx.parser.last_content_delta:
if not ctx.last_content_delta:
return []
if (
@@ -2364,7 +2364,7 @@ class OpenAIServingResponses(OpenAIServing):
events.append(
ResponseFunctionCallArgumentsDeltaEvent(
item_id=state.current_item_id,
delta=ctx.parser.last_content_delta,
delta=ctx.last_content_delta,
output_index=state.current_output_index,
sequence_number=-1,
type="response.function_call_arguments.delta",