Prhub

#22658 PD streaming: batch notify + SSE fast path

原始 PR 作者 inkcherry 合并时间 2026-04-22 17:21 文件变更 3 提交数 12 评论 37 代码增减 +134 / -76

执行摘要

批处理通知 +SSE 快速路径,PD streaming 吞吐 +20%

在高并发PD streaming(如2048并发)下,decode侧tokenizer_manager成为CPU瓶颈:asyncio wakeup storm导致频繁上下文切换,每个token的SSE chunk构建3个Pydantic对象并调用model_dump_json()产生大量开销。

强烈推荐合并。该PR提供了明确的性能收益和可衡量的基准结果。值得精读的部分包括:如何用msgspec替换Pydantic进行SSE序列化,以及批量通知模式的设计权衡。后续可关注IPC序列化迁移(PR #21643)以进一步释放性能。

讨论亮点

alexnails:建议使用msgspec struct并添加omit_defaults=True减少35% payload,移除tool_calls字段,类型化usage字段。作者已采纳。
alexnails:硬编码16应改为可配置参数。作者添加--batch-notify-size
alexnails:缓存batch_notify_size避免循环内属性链查找。作者已修改。
alexnails:统一SSE chunk构造路径。作者已将首chunk也通过_fast_sse_content生成。

实现拆解

  1. 批量通知机制 (tokenizer_manager.py): 将_handle_batch_output改为async,引入pending_notify字典,每积累batch_notify_size(默认16)个通知统一event.set()await asyncio.sleep(0)让出事件循环,循环结束后flush剩余。
  2. SSE快速路径 (serving_chat.py): 新增msgspec.Struct子类(_StreamDelta, _StreamChoice, _StreamChunk)和_fast_sse_content辅助函数,用预创建的msgspec.json.Encoder编码替代Pydantic model_dump_json(),仅在4个热路径yield点使用。
  3. 配置参数化 (server_args.py): 添加--batch-notify-sizeCLI参数,默认16,允许根据负载调整。
文件 模块 状态 重要度
python/sglang/srt/entrypoints/openai/serving_chat.py API 入口 modified 8.76
python/sglang/srt/managers/tokenizer_manager.py 分词管理器 modified 7.04
python/sglang/srt/server_args.py 配置 modified 5.45

关键符号

_handle_batch_output _fast_sse_content _StreamDelta _StreamChoice _StreamChunk

关键源码片段

python/sglang/srt/entrypoints/openai/serving_chat.py dependency-wiring

替换 Pydantic 序列化为 msgspec,新增 _fast_sse_content 热路径,是性能优化的核心

# 使用 msgspec.Struct 定义 SSE chunk 结构,omit_defaults=True 使输出更紧凑
class _StreamDelta(msgspec.Struct, omit_defaults=True):
    # reasoning_content 必须为必填(无默认值),因为 OpenAI SDK 的 ChoiceDelta 通过 extra 暴露该字段
    # omit_defaults=True 下若默认 None 会丢失 key,导致客户端 AttributeError
    reasoning_content: Optional[str]
    role: Optional[str] = None
    content: Optional[str] = Noneclass _StreamChoice(msgspec.Struct):
    index: int
    delta: _StreamDelta
    logprobs: Optional[dict] = None
    finish_reason: Optional[str] = None
    matched_stop: Union[None, int, str] = Noneclass _StreamChunk(msgspec.Struct, omit_defaults=True):
    id: str
    object: str
    created: int
    model: str
    choices: List[_StreamChoice]
    usage: Optional[dict] = None# 复用 msgspec.json.Encoder 实例避免重复创建
_stream_encoder = msgspec.json.Encoder()# 热路径 SSE 构造:直接构建 struct,用 msgspec 编码,替代 Pydantic model_dump_json
# 参数覆盖所有 hot-path 字段:role, content, reasoning_content, finish_reason 等
# 非热路径(hidden_states, tool_calls 等)仍走原 Pydantic 路径
_SSE_DATA_B = b"data: "
_SSE_NL_B = b"\n\n"def _fast_sse_content(
    chunk_id: str, created: int, model: str, index: int,
    role: Optional[str] = None, content: Optional[str] = None,
    reasoning_content: Optional[str] = None,
    finish_reason: Optional[str] = None,
    logprobs: Optional[dict] = None,
    matched_stop: Union[None, int, str] = None,
    usage: Optional[dict] = None,
) -> str:
    delta = _StreamDelta(role=role, content=content, reasoning_content=reasoning_content)
    choice = _StreamChoice(index=index, delta=delta, logprobs=logprobs,
                           finish_reason=finish_reason, matched_stop=matched_stop)
    chunk = _StreamChunk(id=chunk_id, object="chat.completion.chunk",
                         created=created, model=model, choices=[choice], usage=usage)
    # 直接拼接 bytes 再 decode,避免中间字符串
    return (_SSE_DATA_B + _stream_encoder.encode(chunk) + _SSE_NL_B).decode()
python/sglang/srt/managers/tokenizer_manager.py core-logic

将 _handle_batch_output 改为异步,引入批量通知机制,减少 asyncio 唤醒风暴

# 在 handle_loop 中,对 batch 输出类型提前过滤,直接 await 异步处理
async def handle_loop(self):
    while True:
        with self.soft_watchdog.disable():
            recv_obj = await self.recv_from_detokenizer.recv_pyobj()
        if isinstance(recv_obj, (BatchStrOutput, BatchEmbeddingOutput, BatchTokenIDOutput)):
            await self._handle_batch_output(recv_obj)
        else:
            self._result_dispatcher(recv_obj)
        self.last_receive_tstamp = real_time()
        self.soft_watchdog.feed()async def _handle_batch_output(self, recv_obj):
    pending_notify: dict[str, ReqState] = {}
    batch_notify_size = self.server_args.batch_notify_size # 缓存配置,避免循环内属性链查找
    for i, rid in enumerate(recv_obj.rids):
        state = self.rid_to_state.get(rid, None)
        if state is None:
            logger.error(...)
            continue
        # ... 构建 meta_info 等逻辑不变 ...
        # 不立即 event.set(),而是暂存
        pending_notify[rid] = state
        # 每积累 batch_notify_size 个通知,统一唤醒并 yield
        if len(pending_notify) >= batch_notify_size:
            for s in pending_notify.values():
                s.event.set()
            pending_notify = {}
            await asyncio.sleep(0) # 让出事件循环,允许其他协程处理已唤醒的流
        # ... 后续日志 /metrics 不变 ...
    # 循环结束,flush 剩余通知(handle_loop 会立即 await recv,无需额外 sleep)
    for s in pending_notify.values():
        s.event.set()
    # handle_loop awaits next recv immediately

评论区精华

使用 msgspec struct 替代 Pydantic 序列化 设计

alexnails 建议使用 msgspec struct 以避免原始 dict 缺乏类型约束,同时获得序列化性能提升。作者同意并修改为 msgspec.Struct。

结论:采用 msgspec.Struct 并添加 _fast_sse_content 辅助函数。 · 已解决

batch_notify_size 硬编码为 16,建议可配置 设计

alexnails 指出 magic number 应改为可配置参数,并询问为何选择 16。作者表示已测试多个值,并同意参数化。

结论:添加 --batch-notify-size CLI 参数,默认 16。 · 已解决

移除 _StreamDelta 中无用的 tool_calls 字段 性能

alexnails 指出 tool_calls 字段在 fast path 中从未设置,但导致每个 token 都输出 null,增加 payload 大小。

结论:作者同意并移除该字段。 · 已解决

使用 omit_defaults=True 减小 SSE payload 性能

alexnails 建议在 _StreamDelta 和 _StreamChunk 上使用 omit_defaults=True,可减少 35% payload 大小。作者添加后测试通过。

结论:添加 omit_defaults=True,同时保留首个 chunk 的 content="" 以匹配 OpenAI 预期。 · 已解决

类型化 usage 字段避免 Any 和编码回退 性能

alexnails 建议将 usage 类型从 Optional[Any] 改为 Optional[dict],并在调用点先 .model_dump(),或定义 _StreamUsage struct。作者选择 Option1 以避免同步风险。

结论:采用 Option1,调用侧先 .model_dump(),usage 类型改为 Optional[dict]。 · 已解决

缓存 batch_notify_size 避免循环内属性链查找 性能

alexnails 指出 self.server_args.batch_notify_size 在每次循环迭代中导致两次 dict 查找,建议缓存为局部变量。作者已修改。

结论:在循环前缓存 batch_notify_size = self.server_args.batch_notify_size。 · 已解决

最终 flush 后是否需要 asyncio.sleep(0) 正确性

alexnails 注意到最终 flush 没有 yield,但指出 handle_loop 接下来会 await recv,所以是安全的。建议添加注释说明。作者添加了注释。

结论:添加注释 '# handle_loop awaits next recv immediately'。 · 已解决

统一 SSE chunk 构造路径,将首 chunk 也通过 _fast_sse_content 生成 设计

alexnails 指出有三个不同路径构造 SSE chunk,建议统一。作者已将所有热路径 chunk 统一通过 _fast_sse_content 生成。

结论:三个路径已合并使用 _fast_sse_content。 · 已解决

风险与影响

1) 核心路径变更:_handle_batch_output改为async,但通过asyncio.sleep(0)保证公平调度,已通过CI。
2) 缺少测试覆盖:无新增单元测试,依赖现有CI。
3) 输出格式兼容性:omit_defaults=True可能改变wire format,但首个chunk的content=""已保留以匹配OpenAI SDK预期。
4) IPC序列化(pickle)仍为瓶颈,但PR #21643已在迁移中。

直接影响所有使用PD disaggregation streaming的部署,特别是高并发场景(如2048并发)。用户将体验到更低的TPOT和更高的吞吐量。对非PD streaming场景无影响。团队需注意新参数batch-notify-size的调优。

核心路径变更 缺少测试覆盖 输出格式兼容性

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论