Prhub

#25626 Move KV-cache event emission to SchedulerKvEventsPublisher

原始 PR 作者 fzyzcjy 合并时间 2026-05-18 18:40 文件变更 3 提交数 1 评论 1 代码增减 +50 / -58

执行摘要

将 KV-cache 事件发射从 Mixin 迁移到独立组件

作为调度器模块解耦重构的一部分,将 KV-cache 事件发射从 SchedulerMetricsMixin 中迁出,减少 Mixin 的职责,促进模块化。PR body 描述为机械性的剪切粘贴操作,方法体 byte-identical。

值得精读,作为大规模解耦重构的典型模式示范:通过机械性剪切粘贴将职责从 Mixin 转移到独立组件,保持方法体不变,大幅降低回归风险。后续开发者可参考此模式继续分解调度器模块。

讨论亮点

无独立 review 讨论,PR 为单人一次性提交,无争议点。

实现拆解

  1. kv_events_publisher.py 中直接定义方法:将 init_kv_eventsemit_kv_metricspublish_kv_events 作为 SchedulerKvEventsPublisher 的实例方法添加,并在 __post_init__ 中调用 init_kv_events 替代之前的跨 Mixin 调用;同时新增对 timeKVEventBatch 的导入。
  2. scheduler_metrics_mixin.py 中移除定义和导入:删除三个静态方法及相关导入(如 EventPublisherFactoryKVEventBatchKvMetricsSchedulerKvEventsPublisher),并将 report_prefill_statsreport_decode_stats 中的调用改为 self.kv_events_publisher.emit_kv_metrics()self.kv_events_publisher.publish_kv_events()
  3. 更新 scheduler.py 中的调用:在 on_idle 方法中,将 self.publish_kv_events(self.kv_events_publisher) 改为 self.kv_events_publisher.publish_kv_events(),符合新组件的方法签名。
文件 模块 状态 重要度
python/sglang/srt/managers/scheduler_components/kv_events_publisher.py 调度器 modified 7.78
python/sglang/srt/observability/scheduler_metrics_mixin.py 可观测性 modified 7.86
python/sglang/srt/managers/scheduler.py 调度器 modified 4.13

关键符号

init_kv_events emit_kv_metrics publish_kv_events

关键源码片段

python/sglang/srt/managers/scheduler_components/kv_events_publisher.py core-logic

KV-cache 事件发射的新主场:接收了三个方法(init_kv_events、emit_kv_metrics、publish_kv_events),成为独立组件。变更包括新增导入和内部调用调整。

from __future__ import annotationsimport dataclasses
import time # 新增,用于 KVEventBatch 时间戳
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Optional,
)import zmqfrom sglang.srt.disaggregation.kv_events import ( # 新增导入
    EventPublisherFactory,
    KVEventBatch,
)if TYPE_CHECKING:
    from sglang.srt.distributed.parallel_state_wrapper import ParallelState
    from sglang.srt.mem_cache.base_prefix_cache import BasePrefixCache# ... (KvMetrics 和 SchedulerKvEventsPublisher dataclass 定义省略 ) ...@dataclass(kw_only=True, slots=True)
class SchedulerKvEventsPublisher:
    # ... 字段定义保持不变 ...
​
    def __post_init__(self) -> None:
        # 以前通过跨 Mixin 调用 init_kv_events,现在直接调用自身实例方法
        self.init_kv_events(self.kv_events_config)
​
    def init_kv_events(self, kv_events_config: Optional[str]):
        """根据配置和并行状态启用 KV-cache 事件发布器"""
        self.enable_kv_cache_events = bool(
            kv_events_config and self.ps.attn_tp_rank == 0 and self.ps.attn_cp_rank == 0
        )
        if self.enable_kv_cache_events:
            self.kv_event_publisher = EventPublisherFactory.create(
                kv_events_config, self.ps.attn_dp_rank
            )
​
    def emit_kv_metrics(self):
        """收集并发送 KV-cache 指标(通过 ZMQ socket)"""
        if not self.enable_kv_cache_events:
            return
        kv_metrics = KvMetrics()
        kv_metrics.request_active_slots = self.get_stats().num_running_reqs.total
        kv_metrics.request_total_slots = self.max_running_requests
        kv_metrics.kv_active_blocks = int(
            self.get_stats().token_usage * self.max_total_num_tokens
        )
        kv_metrics.kv_total_blocks = self.max_total_num_tokens
        kv_metrics.num_requests_waiting = self.get_stats().num_queue_reqs.total
        kv_metrics.gpu_cache_usage_perc = self.get_stats().token_usage
        kv_metrics.gpu_prefix_cache_hit_rate = self.get_stats().cache_hit_rate
        kv_metrics.data_parallel_rank = (
            self.ps.dp_rank if self.ps.dp_rank is not None else 0
        )
        if not self.send_metrics_from_scheduler.closed:
            self.send_metrics_from_scheduler.send_pyobj(kv_metrics)
​
    def publish_kv_events(self):
        """从 tree cache 获取事件并发布"""
        if not self.enable_kv_cache_events:
            return
        events = self.tree_cache.take_events()
        if events:
            batch = KVEventBatch(ts=time.time(), events=events)
            self.kv_event_publisher.publish(batch)

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

风险极低。变更仅为纯代码移动,方法体完全不变,调用点以简单前缀变换更新,没有逻辑修改。由于涉及调度器核心路径,回归风险理论上存在,但方法体 byte-identical 且无依赖行为变化,实际风险几乎为零。

对用户无影响,对外部行为透明。对内,SchedulerMetricsMixin 职责减少,SchedulerKvEventsPublisher 成为独立的 KV-cache 事件发射组件,便于后续维护和测试。影响范围限于调度器模块,无其他系统依赖。

核心路径变更 无测试配套

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论