Prhub

#42683 [Bugfix][Frontend] streaming tool-call serializer drops first args chunk when name and args share a DeltaMessage

原始 PR 作者 ignaciosica 合并时间 2026-05-28 13:20 文件变更 4 提交数 3 评论 11 代码增减 +241 / -39

执行摘要

修复 streaming tool-call 序列化器丢弃第一个 args chunk 的 bug

根据 PR body,在 kimi 2.6 通过 /v1/responses (streaming) 部署时,约 60% 的工具调用产生的 done.arguments 无法通过 json.loads 解析,因为开头的 { 块从 SSE 流中丢失。这是因为 emit_delta 只检查 delta_message.tool_calls[0],而像 KimiK2ToolParserextract_tool_calls_streaming 会将 name 和 args 分成两个 DeltaToolCall 放在同一个 DeltaMessage 中,导致第一个 args chunk 被静默丢弃。

值得阅读,展示了一个高质量的 bugfix 设计:通过引入 split_delta 函数将复合 delta 原子化,简化了状态机设计,同时通过分组按 index 保持 name 和 args 的关联。测试覆盖完善。可以学习这种将复杂问题分解的思路。

讨论亮点

Review 中 Andreas Karatzas 建议将新测试文件 test_streaming_events.py 的内容合并到 test_serving_responses.pyTestAutoToolStreaming 中。sfeng33 回应保留了该文件,理由是对 split_delta 作为纯函数进行独立单元测试与集成验证 wiring 有不同目的。最终 Andreas 同意保留两份测试。

实现拆解

  1. 新增 split_delta 函数streaming_events.py):接受一个 DeltaMessage,根据它包含的字段(reasoning, content, tool_calls)分解为若干原子 DeltaMessage,每个只包含一个字段。
    - 对于 tool_calls,按 index 分组,同一 index 的多个 DeltaToolCall 合并到一个 DeltaMessage 中,确保 name 和 args 保持在同一原子 delta 中。
    - 如果只有一个字段且 tool_calls index 唯一,则直接返回原 delta。
  2. 简化 emit_delta 方法streaming_events.py):移除原来 reasoning→content 的特殊处理逻辑,因为这个场景现在由 split_delta 在外部保证不会出现复合字段。
  3. 集成到流式循环serving.py):在 _process_simple_streaming_events 中,对每个 delta_message 先调用 split_delta,然后对每个分解后的原子 delta 分别执行状态机步骤(resolve_target_state, needs_transition, emit_delta)。
  4. 测试配套
    • 新增 test_streaming_events.py:包含 TestSplitDelta 测试 split_delta 纯函数,以及 TestProcessorCompoundDeltas 测试通过 SimpleStreamingEventProcessor 处理复合 delta 的集成。
    • test_serving_responses.py 中新增 test_compound_content_and_tool_name_args_same_delta 端到端测试,验证 content 和 tool name/args 在同一个 DeltaMessage 中的正确事件序列。
文件 模块 状态 重要度
vllm/entrypoints/openai/responses/streaming_events.py 流式事件 modified 7.52
vllm/entrypoints/openai/responses/serving.py 服务编排 modified 6.0
tests/entrypoints/openai/responses/test_streaming_events.py 流式测试 added 7.24
tests/entrypoints/openai/responses/test_serving_responses.py 服务测试 modified 5.68

关键符号

split_delta SimpleStreamingEventProcessor.emit_delta OpenAIServingResponses._process_simple_streaming_events

关键源码片段

vllm/entrypoints/openai/responses/streaming_events.py core-logic

核心变更文件:新增 `split_delta` 函数,修改 `emit_delta` 方法,移除特殊处理逻辑。

# 文件 : vllm/entrypoints/openai/responses/streaming_events.pydef split_delta(delta: DeltaMessage) -> list[DeltaMessage]:
    # 将复合 `DeltaMessage` 分解为原子 delta,每个仅含一个字段。
    # Responses API 要求每个 SSE 事件只有一种类型,因此在进入状态机之前需拆分。
    # 固定顺序 : reasoning -> content -> tool_calls ( 按 index 分组 )。
    has_reasoning = delta.reasoning is not None
    has_content = delta.content is not None
    has_tools = bool(delta.tool_calls)
    parts = int(has_reasoning) + int(has_content) + int(has_tools)
​
    # 如果只有一个字段,且 tool_calls 的 index 不冲突 (≤1 个不同 index)
    if parts <= 1 and (
        not has_tools
        or len({tc.index for tc in delta.tool_calls if tc.index is not None}) <= 1
    ):
        return [delta]
​
    deltas: list[DeltaMessage] = []
    if has_reasoning:
        deltas.append(DeltaMessage(reasoning=delta.reasoning))
    if has_content:
        deltas.append(DeltaMessage(content=delta.content))
    if has_tools:
        # 按 index 分组,同一 index 的 tool_calls 合并在一起
        groups: dict[int | None, list[DeltaToolCall]] = {}
        for tc in delta.tool_calls:
            groups.setdefault(tc.index, []).append(tc)
        for tcs in groups.values():
            deltas.append(DeltaMessage(tool_calls=tcs))
    return deltas or [delta]
​
​
class SimpleStreamingEventProcessor:
    # ... 其他代码 ...
​
    def emit_delta(
        self,
        delta_message: DeltaMessage,
        output: Any | None,
        get_logprobs: Callable[..., list[Logprob] | None] | None = None,
    ) -> list[StreamingResponsesResponse]:
        # 发射当前状态的增量事件。现在假设 delta_message 已是原子 delta。
        handlers = self._STATE_HANDLERS[self.state.current_state]
        if self.state.current_state == _StateType.TOOL_CALL:
            assert delta_message.tool_calls is not None
            # 遍历所有 tool_calls ( 可能包含同一 index 的 name 和 args)
            for tc in delta_message.tool_calls:
                if tc.function is not None and tc.function.arguments:
                    return handlers.delta_fn(self.state, tc.function.arguments)
            return []
        # 对于 CONTENT / REASONING 状态,直接取对应字段
        field = (
            delta_message.content
            if self.state.current_state == _StateType.CONTENT
            else delta_message.reasoning
        )
        if field:
            logprobs = get_logprobs(output) if get_logprobs else []
            return handlers.delta_fn(self.state, field, logprobs)
        return []
vllm/entrypoints/openai/responses/serving.py core-logic

集成 `split_delta` 到流式循环中,修改控制流,是 bugfix 的另一个关键部分。

# 文件 : vllm/entrypoints/openai/responses/serving.py
# 在 _process_simple_streaming_events 方法中# 原来 : 直接使用 delta_message
# 现在 : 先拆分再逐个处理for dm in split_delta(delta_message):
    target_state, tool_call = processor.resolve_target_state(dm)
    if target_state == _StateType.NONE:
        continue
​
    if processor.needs_transition(target_state, tool_call):
        for event in processor.close_current():
            yield _increment_sequence_number_and_return(event)
        for event in processor.open(target_state, tool_call):
            yield _increment_sequence_number_and_return(event)
​
    for event in processor.emit_delta(dm, output, _get_logprobs):
        yield _increment_sequence_number_and_return(event)

评论区精华

测试文件结构:是否将新测试合并到现有文件 设计

Andreas Karatzas 建议将新测试文件 test_streaming_events.py 的内容合并到 test_serving_responses.py 的 TestAutoToolStreaming 中,认为不需要独立的测试文件。sfeng33 回应保留了该文件,理由是对 split_delta 作为纯函数进行独立单元测试与集成验证 wiring 有不同目的。

结论:双方同意保留两份测试文件,各自覆盖不同层次。 · 已解决

风险与影响

主要风险在于 split_delta 改变了 DeltaMessage 的处理流程,可能对尚未覆盖的 tool parser 或状态机行为产生影响。但测试覆盖了关键路径(复合字段、并行 tool calls、reasoning→content 转换),且 bugfix 本身增加的是通用拆分逻辑,风险可控。没有发现性能或安全风险。

直接影响使用 /v1/responses streaming endpoint 且 tool parser 会在同一 DeltaMessage 中发送 name 和 args 独立 DeltaToolCall 的场景(如 KimiK2ToolParser)。修复后,这些场景的工具调用参数不再丢失,SSE 流中 function_call_arguments.delta 序列完整。对其他场景无负面影响。

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论