Prhub

#25634 Stand up SchedulerOutputStreamer; migrate output-streaming state to it

原始 PR 作者 fzyzcjy 合并时间 2026-05-18 18:43 文件变更 6 提交数 1 评论 1 代码增减 +119 / -34

执行摘要

创建 SchedulerOutputStreamer 组件,迁移输出流状态

根据 PR body 和依赖文件意图,本变更属于 'introduce-output-streamer' 机制移动的现场准备(inplace prep)。目的是将调度器中分散的输出流状态集中到独立组件,从而降低 scheduler.py 的复杂度、增强模块化,并为后续的进一步拆分(如物理移动方法体到 SchedulerOutputStreamer)铺平道路。

建议阅读以了解调度器组件化模式的演进方向。特别关注静态方法+参数注入的设计思路,以及通过 lambda 实现延迟绑定的技巧。此 PR 为系列重构的基石,后续 PR 将物理移动方法体,值得跟踪。

讨论亮点

无,此 PR 为独立提交,未触发讨论。

实现拆解

  1. 创建新组件:新增 python/sglang/srt/managers/scheduler_components/output_streamer.py,使用 @dataclass 定义 SchedulerOutputStreamer,包含输出流所需的所有依赖(send_to_detokenizertree_cacheserver_args 等)以及两个可调用对象(enable_hicache_storageload_inquirer_get_loads)。
  2. 实例化和注入:在 Scheduler.__init__ 中导入 SchedulerOutputStreamer,在 logprob_computer 初始化之后实例化 self.output_streamer,通过 lambda 将 self.enable_hicache_storageself.load_inquirer.get_loads 包装为可调用对象传入。同时,将传递给 SchedulerRequestReceiverstream_output 参数改为 lambda,确保延迟绑定到 self.output_streamer
  3. 方法转换:在 SchedulerOutputProcessorMixin 中,将 _get_storage_backend_type_get_cached_tokens_detailsstream_output_generationstream_output_embedding_trigger_crash_for_tests 五个方法转换为 @staticmethod,并将第一个参数类型注解改为 "SchedulerOutputStreamer"。方法内部原通过 self 访问的属性(如 self.enable_hicache_storageself.tree_cache)改为通过参数实例访问。
  4. 可见性调整_get_cached_tokens_details 更名为公开方法 get_cached_tokens_detailsstream_output_generationstream_output_embedding 更名为私有方法 _stream_output_generation_stream_output_embedding,并更新 mixin 内部调用。
  5. 调用点更新:修改 mixin 内部其他方法、Scheduler.handle_generate_request、以及外部模块(disaggregation/decode.pydisaggregation/prefill.pydllm/mixin/scheduler.py)中对 stream_output 的调用,显式传递 self.output_streamer 作为第一个参数。
文件 模块 状态 重要度
python/sglang/srt/managers/scheduler_components/output_streamer.py 输出流组件 added 7.73
python/sglang/srt/managers/scheduler_output_processor_mixin.py 输出处理器 Mixin modified 8.61
python/sglang/srt/managers/scheduler.py 调度器 modified 6.36
python/sglang/srt/disaggregation/decode.py 解码分离 modified 5.0

关键符号

SchedulerOutputStreamer _get_storage_backend_type get_cached_tokens_details _stream_output_generation _stream_output_embedding

关键源码片段

python/sglang/srt/managers/scheduler_components/output_streamer.py core-logic

新文件,定义了 `SchedulerOutputStreamer` 数据类,集中持有输出流所需的依赖和配置,是本次重构的核心目标组件。

from __future__ import annotationsimport logging
from dataclasses import dataclass
from typing import Any, Callableimport zmqfrom sglang.srt.disaggregation.utils import DisaggregationMode
from sglang.srt.distributed.parallel_state_wrapper import ParallelState
from sglang.srt.environ import envs
from sglang.srt.mem_cache.base_prefix_cache import BasePrefixCache
from sglang.srt.server_args import ServerArgs
from sglang.srt.speculative.spec_info import SpeculativeAlgorithmlogger = logging.getLogger(__name__)# 全局配置常量,与 mixin 中一致
DEFAULT_FORCE_STREAM_INTERVAL = envs.SGLANG_FORCE_STREAM_INTERVAL.get()
​
​
@dataclass(kw_only=True, slots=True)
class SchedulerOutputStreamer:
    """
    输出流处理器,封装所有输出流所需的状态和依赖。
    后续将接收从 mixin 迁移过来的具体流处理方法。
    """
​
    send_to_detokenizer: zmq.Socket # 向 detokenizer 发送消息的 socket
    tree_cache: BasePrefixCache # 树缓存,用于缓存状态查询
    ps: ParallelState # 并行状态
    server_args: ServerArgs # 服务器参数
    is_generation: bool # 是否为生成模式
    spec_algorithm: SpeculativeAlgorithm # 推测解码算法
    disaggregation_mode: DisaggregationMode # 分离模式
​
    # 可调用对象:通过 lambda 注入,保持与 Scheduler 实例的延迟绑定
    enable_hicache_storage: Callable[[], bool]
    load_inquirer_get_loads: Callable[..., Any]
​
    # 测试用计数器
    _test_stream_output_count: int = 0
python/sglang/srt/managers/scheduler_output_processor_mixin.py core-logic

核心修改文件,五个流相关方法被转换为静态方法并调整参数,内部属性访问方式改变,为后续物理移动至 `SchedulerOutputStreamer` 做准备。

# 以下展示 _get_storage_backend_type 和 get_cached_tokens_details 的转换后代码
# 原为实例方法,现为静态方法,第一个参数 self 实际接收 SchedulerOutputStreamer 实例@staticmethod
def _get_storage_backend_type(self: "SchedulerOutputStreamer") -> str:
    """从 tree_cache 获取存储后端类型。"""
    storage_backend_type = "none"
    cache_controller = getattr(self.tree_cache, "cache_controller", None)
    if cache_controller and hasattr(cache_controller, "storage_backend"):
        storage_backend = cache_controller.storage_backend
        if storage_backend is not None:
            storage_backend_type = type(storage_backend).__name__
    return storage_backend_type@staticmethod
def get_cached_tokens_details(
    self: "SchedulerOutputStreamer", req: Req
) -> Optional[dict]:
    """获取请求的缓存详情,如果可用。"""
    if (
        req.cached_tokens_device > 0
        or req.cached_tokens_host > 0
        or req.cached_tokens_storage > 0
    ):
        details = {
            "device": req.cached_tokens_device,
            "host": req.cached_tokens_host,
        }
        # 若 L3 存储启用,则包含存储字段
        if self.enable_hicache_storage(): # 原为 self.enable_hicache_storage (bool 属性 )
            details["storage"] = req.cached_tokens_storage
            details["storage_backend"] = (
                SchedulerOutputProcessorMixin._get_storage_backend_type(self)
            )
        return details
​
    if req.cached_tokens > 0:
        return {"device": req.cached_tokens, "host": 0}
​
    return None
python/sglang/srt/managers/scheduler.py dependency-wiring

调度器主文件,导入新组件并在 `__init__` 中实例化,修改传递给 `SchedulerRequestReceiver` 的 `stream_output` 为 lambda,更新热路径调用。

# scheduler.py 中新增的导入和实例化代码
from sglang.srt.managers.scheduler_components.output_streamer import (
    SchedulerOutputStreamer,
)# 在 __init__ 方法中,logprob_computer 初始化后添加:
self.output_streamer = SchedulerOutputStreamer(
    send_to_detokenizer=self.send_to_detokenizer,
    tree_cache=self.tree_cache,
    ps=self.ps,
    server_args=self.server_args,
    is_generation=self.is_generation,
    spec_algorithm=self.spec_algorithm,
    disaggregation_mode=self.disaggregation_mode,
    # 使用 lambda 延迟绑定,避免循环引用或顺序问题
    enable_hicache_storage=lambda: self.enable_hicache_storage,
    load_inquirer_get_loads=lambda req: self.load_inquirer.get_loads(req),
)# 在 request_receiver 创建时,将 stream_output 也通过 lambda 指向 output_streamer
self.request_receiver = SchedulerRequestReceiver(
    ...
    stream_output=lambda *a, **kw: self.stream_output(
        self.output_streamer, *a, **kw
    ),
    ...
)

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

主要风险是核心路径变更:Scheduler.stream_output 的调用签名改变,所有调用点需传递 output_streamer 参数,若遗漏则导致运行时错误。由于变更本身为纯重构,逻辑行为不变,回归风险较低。但缺少测试配套(无测试文件修改),无法自动验证迁移正确性,存在隐藏 bug 风险。此外,lambda 延迟绑定可能使调试时堆栈更复杂。

对开发者:引入新组件 SchedulerOutputStreamer,后续维护需理解其职责;对调度器相关开发人员需适应新的方法签名。对用户:无功能影响,输出流行为完全一致。对系统:性能无变化,因仅为重构。

核心路径变更 缺少测试覆盖

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论