Prhub

#25625 Stand up SchedulerKvEventsPublisher; migrate KV-event state to it

原始 PR 作者 fzyzcjy 合并时间 2026-05-18 18:39 文件变更 3 提交数 1 评论 1 代码增减 +100 / -28

执行摘要

准备 SchedulerKvEventsPublisher 组件并迁移 KV 事件状态

作为调度器职责分离重构(refactor chain: introduce-kv-events-publisher)的预备步骤,将 KV 事件相关的状态和逻辑从合集中提取,为后续将方法实际迁移到独立组件铺平道路,提升代码可维护性。

建议精读此 PR 以理解调度器组件化重构的手法:分离状态、静态方法适配、内联计算消除循环依赖。可从中学习渐进式重构的技巧。

讨论亮点

无实质 review 讨论;仅有一条 gemini-code-assist 机器人的配额提示。

实现拆解

  1. 创建新组件文件 (scheduler_components/kv_events_publisher.py):将原 scheduler_metrics_mixin.py 中的 KvMetrics 数据类逐字节复制到新文件;添加 SchedulerKvEventsPublisher 类骨架,其 __post_init__ 委托给旧的 mixin 初始化方法,以便在不移动方法的情况下完成注入。
  2. 重构 mixin 方法 (scheduler_metrics_mixin.py):删除本地的 KvMetrics 定义,改为从新模块导入;将 init_kv_eventsemit_kv_metrics(原 _emit_kv_metrics)、publish_kv_events(原 _publish_kv_events)转换为 @staticmethod,第一个参数类型标注为 SchedulerKvEventsPublisheremit_kv_metrics 内部将 self.stats.X 读取替换为 self.get_stats().X
  3. 调度器依赖注入 (scheduler.py):导入 SchedulerKvEventsPublisher;在 Scheduler.__init__ 末尾(is_initializing 之前)创建 self.kv_events_publisher 实例,注入所有必要依赖(包括 get_stats=lambda: self.stats);将 on_idle 中对 _publish_kv_events 的调用改为 self.publish_kv_events(self.kv_events_publisher);同时将 kv_cache_builder.build_kv_cacheenable_kv_cache_events 参数从预先计算的属性改为内联计算,避免对未初始化 publisher 的依赖。
文件 模块 状态 重要度
python/sglang/srt/managers/scheduler_components/kv_events_publisher.py 调度器组件 added 8.77
python/sglang/srt/observability/scheduler_metrics_mixin.py 调度指标 modified 8.29
python/sglang/srt/managers/scheduler.py 调度器 modified 6.72

关键符号

SchedulerKvEventsPublisher.__post_init__ SchedulerMetricsMixin.init_kv_events SchedulerMetricsMixin.emit_kv_metrics SchedulerMetricsMixin.publish_kv_events Scheduler.__init__ Scheduler.on_idle

关键源码片段

python/sglang/srt/managers/scheduler_components/kv_events_publisher.py core-logic

新文件,定义 KvMetrics 数据类和 SchedulerKvEventsPublisher 组件骨架,是本次重构的核心产出。

# 文件 : python/sglang/srt/managers/scheduler_components/kv_events_publisher.pyfrom __future__ import annotationsimport dataclasses
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Optionalimport zmqif TYPE_CHECKING:
    from sglang.srt.distributed.parallel_state_wrapper import ParallelState
    from sglang.srt.mem_cache.base_prefix_cache import BasePrefixCache# 临时占位定义,实际类型从 metrics_collector 导入(避免循环导入)
class SchedulerStats: ... # type: ignore[no-redef]
​
​
@dataclasses.dataclass
class KvMetrics:
    """KV 缓存事件相关的度量数据,原定义于 scheduler_metrics_mixin,现迁移至独立组件。"""
    request_active_slots: int = 0
    request_total_slots: int = 0
    kv_active_blocks: int = 0
    kv_total_blocks: int = 0
    num_requests_waiting: int = 0
    gpu_cache_usage_perc: float = 0.0
    gpu_prefix_cache_hit_rate: float = 0.0
    data_parallel_rank: int = 0
​
​
@dataclass(kw_only=True, slots=True)
class SchedulerKvEventsPublisher:
    """KV 事件发布器组件,当前仅包含骨架和初始化委托,后续提交会将方法移入。"""
    kv_events_config: Optional[str]
    ps: "ParallelState"
    attn_tp_rank: int
    attn_cp_rank: int
    attn_dp_rank: int
    dp_rank: Optional[int]
    tree_cache: "BasePrefixCache"
    send_metrics_from_scheduler: Optional["zmq.Socket"]
    max_running_requests: int
    max_total_num_tokens: int
    get_stats: Callable # 运行时可变 scheduler stats 的 getter
    enable_kv_cache_events: bool = False
    kv_event_publisher: Any = None
​
    def __post_init__(self) -> None:
        # 委托给旧 mixin 完成初始化,方法内容后续会搬到这里
        from sglang.srt.observability.scheduler_metrics_mixin import (
            SchedulerMetricsMixin,
        )
        SchedulerMetricsMixin.init_kv_events(self, self.kv_events_config)
python/sglang/srt/observability/scheduler_metrics_mixin.py core-logic

核心修改点:迁移 KvMetrics 定义、将三个方法转换为静态方法并调整内部实现,是逻辑适配的关键文件。

# 文件 : python/sglang/srt/observability/scheduler_metrics_mixin.py ( 部分摘录 )# ... 导入部分新增:
from sglang.srt.managers.scheduler_components.kv_events_publisher import KvMetrics
if TYPE_CHECKING:
    from sglang.srt.managers.scheduler_components.kv_events_publisher import (
        SchedulerKvEventsPublisher,
    )# 删除了本地的 KvMetrics 类定义class SchedulerMetricsMixin:
    # ... 其他方法
​
    @staticmethod
    def init_kv_events(
        self: "SchedulerKvEventsPublisher", kv_events_config: Optional[str]
    ):
        """初始化 KV 事件相关配置,现在通过 publisher 实例调用。"""
        self.enable_kv_cache_events = bool(
            kv_events_config and self.ps.attn_tp_rank == 0 and self.ps.attn_cp_rank == 0
        )
        # ... 后续初始化
​
    # 方法已重命名为 emit_kv_events ( 去掉下划线前缀 ),并变为静态方法
    @staticmethod
    def emit_kv_metrics(self: "SchedulerKvEventsPublisher"):
        if not self.enable_kv_cache_events:
            return
        kv_metrics = KvMetrics()
        # 原为 self.stats.X,现在通过 get_stats() callable 获取
        kv_metrics.request_active_slots = self.get_stats().num_running_reqs.total
        # ... 其余填充逻辑不变
​
    @staticmethod
    def publish_kv_events(self: "SchedulerKvEventsPublisher"):
        # 实现与原 _publish_kv_events 相同,但通过 self 操作 publisher 属性
        ...
​
    def report_prefill_stats(self: Scheduler):
        # ... 调用方式改为:
        self.emit_kv_metrics(self.kv_events_publisher)
        self.publish_kv_events(self.kv_events_publisher)
​
    def report_decode_stats(self: Scheduler):
        # ... 同理
        self.emit_kv_metrics(self.kv_events_publisher)
        self.publish_kv_events(self.kv_events_publisher)
python/sglang/srt/managers/scheduler.py dependency-wiring

调度器主文件,完成依赖注入并更新调用逻辑,包含对 enable_kv_cache_events 计算的内联调整。

# 文件 : python/sglang/srt/managers/scheduler.py ( 部分摘录 )# 新增导入
from sglang.srt.managers.scheduler_components.kv_events_publisher import (
    SchedulerKvEventsPublisher,
)# 在 __init__ 方法中,在 self.is_initializing = False 之前创建 publisher
self.kv_events_publisher = SchedulerKvEventsPublisher(
    kv_events_config=self.server_args.kv_events_config,
    ps=self.ps,
    attn_tp_rank=self.ps.attn_tp_rank,
    attn_cp_rank=self.ps.attn_cp_rank,
    attn_dp_rank=self.ps.attn_dp_rank,
    dp_rank=self.ps.dp_rank,
    tree_cache=self.tree_cache,
    send_metrics_from_scheduler=self.send_metrics_from_scheduler,
    max_running_requests=self.max_running_requests,
    max_total_num_tokens=self.max_total_num_tokens,
    get_stats=lambda: self.stats, # 运行时可变 stats
)# 在 kv_cache_builder 调用处,将 enable_kv_cache_events 改为内联计算
result = kv_cache_builder.build_kv_cache(
    ...
    # 原为 enable_kv_cache_events=self.enable_kv_cache_events,
    enable_kv_cache_events=bool(
        self.server_args.kv_events_config
        and self.ps.attn_tp_rank == 0
        and self.ps.attn_cp_rank == 0
    ),
    ...
)# on_idle 方法中的调用
# 原 : self._publish_kv_events()
# 现 :
self.publish_kv_events(self.kv_events_publisher)

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

行为等价风险:虽然方法被静态化且调用方式改变,但经过参数透传后逻辑不变。enable_kv_cache_events 的内联计算与 init_kv_events 中的计算完全一致。但仍需关注:init_kv_events__post_init__ 中被调用,而 __post_init__self.ps.attn_tp_rank 等属性可能尚未赋值?实际上在 SchedulerKvEventsPublisher__post_init__ 中,这些属性已经通过 kw_only 传入,所以安全。
回归风险:涉及调度器核心初始化路径,但无测试覆盖(本 PR 未增加测试)。

影响范围:仅影响调度器内部,对用户无功能变化,不影响现有 API。
影响程度:低。属于内部重构,但为后续大规模组件迁移奠定基础。

核心路径变更 缺少测试覆盖 依赖循环调整

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论