Prhub

#25635 Move output streaming to SchedulerOutputStreamer

原始 PR 作者 fzyzcjy 合并时间 2026-05-18 18:44 文件变更 8 提交数 1 评论 1 代码增减 +459 / -481

执行摘要

将输出流逻辑从 Mixin 剥离到独立组件

PR body 描述为 'Mechanical cut + paste for the introduce-output-streamer mech move',目的是将输出流职责从 SchedulerOutputProcessorMixin 移出,使其与调度器主体解耦。该移动是前序一系列重构(#25636、#25637、#25638)的延续,旨在将 scheduler.py 中的不同关注点拆分到 scheduler_components/ 目录下的独立类。

值得精读。该 PR 展示了一个复杂的机械重构如何在保证行为不变的前提下完成核心职责分离。代码结构清晰,提交信息描述了每一步的变换(去 @staticmethod、前缀变换)。特别关注 output_streamer.py 中的 stream_output 方法如何统一处理生成和嵌入两种模式。此外,需要确认所有调用点是否均已覆盖(可 grep '.stream_output(' 验证)。

讨论亮点

该 PR 未产生有效 review 讨论,仅有一条 gemini-code-assist 机器人评论提示配额耗尽。由于 PR 处于重构链末端且描述明确为机械移动,团队未提出进一步审查意见。

实现拆解

  1. 在 output_streamer.py 中新增方法:将 Mixin 中的 _get_storage_backend_typeget_cached_tokens_detailsstream_output_trigger_crash_for_tests_stream_output_generation_stream_output_embedding 复制到 SchedulerOutputStreamer 类,去掉 @staticmethod 装饰器,将参数中的 self: "SchedulerOutputStreamer" 简化为 self,并对内部调用(如 _get_storage_backend_type)调整为直接调用。
  2. 从 Mixin 中删除方法并简化内联调用:删除 SchedulerOutputProcessorMixin 中上述方法的定义,将其内部对 stream_output 的调用(如 self.stream_output(self.output_streamer, ...))改为 self.output_streamer.stream_output(...),涉及 process_batch_result_prebuiltprocess_batch_result_prefill 等函数。
  3. 更新外部调用点:修改 scheduler.py 中的 __init__stream_output lambda)、handle_generate_request 以及 disaggregation/decode.pydisaggregation/prefill.pydllm/mixin/scheduler.py 中所有 self.scheduler.stream_output(...)self.stream_output(...) 的调用,使用 self.scheduler.output_streamer.stream_output(...)self.output_streamer.stream_output(...)
  4. 调整导入依赖:Mixin 不再需要 BatchEmbeddingOutputBatchTokenIDOutputGetLoadsReqInputBaseFinishReason 以及 SchedulerOutputStreamer 类型导入;output_streamer.py 补充这些导入以及 torch
  5. 更新测试文件:两个测试文件(test_priority_scheduling_disaggregation.pytest_decode_radix_lock_ref.py)中的模拟调用也同步修改为 output_streamer.stream_output
文件 模块 状态 重要度
python/sglang/srt/managers/scheduler_components/output_streamer.py 输出流 modified 9.05
python/sglang/srt/managers/scheduler_output_processor_mixin.py 输出处理 modified 8.86
python/sglang/srt/managers/scheduler.py 调度器 modified 5.8
python/sglang/srt/disaggregation/decode.py 解码分离 modified 5.35
python/sglang/srt/disaggregation/prefill.py 预填充分离 modified 5.2

关键符号

_get_storage_backend_type get_cached_tokens_details stream_output _trigger_crash_for_tests _stream_output_generation _stream_output_embedding process_batch_result_prebuilt process_batch_result_prefill process_batch_result_dllm

关键源码片段

python/sglang/srt/managers/scheduler_components/output_streamer.py dependency-wiring

新增了完整的输出流逻辑(stream_output 系列方法),成为输出职责的唯一载体。

def stream_output(
    self,
    reqs: List[Req],
    return_logprob: bool,
    skip_req: Optional[Req] = None,
):
    """Stream the output to detokenizer."""
    # 根据 is_generation 区分生成模式与 embedding 模式
    if self.is_generation:
        self._stream_output_generation(reqs, return_logprob, skip_req)
    else: # embedding or reward model
        self._stream_output_embedding(reqs)
​
    # 测试专用崩溃触发器:流输出达到阈值时崩溃
    if envs.SGLANG_TEST_CRASH_AFTER_STREAM_OUTPUTS.get() > 0:
        self._trigger_crash_for_tests(
            envs.SGLANG_TEST_CRASH_AFTER_STREAM_OUTPUTS.get()
        )
python/sglang/srt/managers/scheduler_output_processor_mixin.py dependency-wiring

删除了所有输出流方法,Mixin 只保留输出处理核心逻辑(如 process_batch_result_prebuilt),职责进一步收窄。

def process_batch_result_prebuilt(self: Scheduler, batch: ScheduleBatch):
    assert self.disaggregation_mode == DisaggregationMode.DECODE
    use_free_group = self.server_args.disaggregation_decode_enable_radix_cache
    if use_free_group:
        self.token_to_kv_pool_allocator.free_group_begin()
    for req in batch.reqs:
        req.time_stats.set_decode_prebuilt_finish_time()
        req.check_finished()
        if req.finished():
            req.time_stats.set_quick_finish_time()
            if self.enable_hisparse:
                self.hisparse_coordinator.request_finished(req)
            release_kv_cache(req, self.tree_cache)
​
    # Note: Logprobs should be handled on the prefill engine.
    # 调用方式从 self.stream_output(self.output_streamer, ...) 改为直接调用
    self.output_streamer.stream_output(batch.reqs, batch.return_logprob)
    if use_free_group:
        self.token_to_kv_pool_allocator.free_group_end()

评论区精华

无实质讨论 other

仅有一条 gemini-code-assist 机器人评论提示配额耗尽,无人工 review。

结论:PR 直接合并。 · 已解决

风险与影响

变更虽为机械移动,但涉及调度核心路径(stream_output 在每次批处理结果后调用)。如果调用点漏改或参数传递错误,可能导致请求输出丢失或崩溃。disaggregation 模块和 dllm 模块中的调用点需要特别关注。测试文件中修改了两个测试文件以适配新调用方式,但未增加专门针对 SchedulerOutputStreamer 的单元测试,存在回归风险。

对用户无感知,行为不变。对系统:解耦了输出流逻辑,使 Scheduler 类更精简,SchedulerOutputStreamer 成为可独立测试的组件。对团队:后续需要维护两个文件而非一个 mixin,但职责划分更清晰。推动后续可能引入输出流日志、监控或重试逻辑等增强。

核心路径变更 调用点完整度风险 缺少单元测试

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论