执行摘要
- 一句话:将 KV-cache 事件发射从 Mixin 迁移到独立组件
- 推荐动作:值得精读,作为大规模解耦重构的典型模式示范:通过机械性剪切粘贴将职责从 Mixin 转移到独立组件,保持方法体不变,大幅降低回归风险。后续开发者可参考此模式继续分解调度器模块。
功能与动机
作为调度器模块解耦重构的一部分,将 KV-cache 事件发射从 SchedulerMetricsMixin 中迁出,减少 Mixin 的职责,促进模块化。PR body 描述为机械性的剪切粘贴操作,方法体 byte-identical。
实现拆解
- 在
kv_events_publisher.py 中直接定义方法:将 init_kv_events、emit_kv_metrics、publish_kv_events 作为 SchedulerKvEventsPublisher 的实例方法添加,并在 __post_init__ 中调用 init_kv_events 替代之前的跨 Mixin 调用;同时新增对 time 和 KVEventBatch 的导入。
- 从
scheduler_metrics_mixin.py 中移除定义和导入:删除三个静态方法及相关导入(如 EventPublisherFactory、KVEventBatch、KvMetrics、SchedulerKvEventsPublisher),并将 report_prefill_stats 和 report_decode_stats 中的调用改为 self.kv_events_publisher.emit_kv_metrics() 和 self.kv_events_publisher.publish_kv_events()。
- 更新
scheduler.py 中的调用:在 on_idle 方法中,将 self.publish_kv_events(self.kv_events_publisher) 改为 self.kv_events_publisher.publish_kv_events(),符合新组件的方法签名。
关键文件:
python/sglang/srt/managers/scheduler_components/kv_events_publisher.py(模块 调度器;类别 source;类型 core-logic;符号 init_kv_events, emit_kv_metrics, publish_kv_events): KV-cache 事件发射的新主场:接收了三个方法(init_kv_events、emit_kv_metrics、publish_kv_events),成为独立组件。变更包括新增导入和内部调用调整。
python/sglang/srt/observability/scheduler_metrics_mixin.py(模块 可观测性;类别 source;类型 core-logic;符号 init_kv_events, emit_kv_metrics, publish_kv_events): Mixin 职责清理:删除三个静态方法及相关导入,调用改为委托给 kv_events_publisher 实例。
python/sglang/srt/managers/scheduler.py(模块 调度器;类别 source;类型 core-logic): 调用点微调:on_idle 中调用方式从 self.publish_kv_events(self.kv_events_publisher) 改为 self.kv_events_publisher.publish_kv_events()
关键符号:init_kv_events, emit_kv_metrics, publish_kv_events
关键源码片段
python/sglang/srt/managers/scheduler_components/kv_events_publisher.py
KV-cache 事件发射的新主场:接收了三个方法(init_kv_events、emit_kv_metrics、publish_kv_events),成为独立组件。变更包括新增导入和内部调用调整。
from __future__ import annotations
import dataclasses
import time # 新增,用于 KVEventBatch 时间戳
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Callable,
Optional,
)
import zmq
from sglang.srt.disaggregation.kv_events import ( # 新增导入
EventPublisherFactory,
KVEventBatch,
)
if TYPE_CHECKING:
from sglang.srt.distributed.parallel_state_wrapper import ParallelState
from sglang.srt.mem_cache.base_prefix_cache import BasePrefixCache
# ... (KvMetrics 和 SchedulerKvEventsPublisher dataclass 定义省略 ) ...
@dataclass(kw_only=True, slots=True)
class SchedulerKvEventsPublisher:
# ... 字段定义保持不变 ...
def __post_init__(self) -> None:
# 以前通过跨 Mixin 调用 init_kv_events,现在直接调用自身实例方法
self.init_kv_events(self.kv_events_config)
def init_kv_events(self, kv_events_config: Optional[str]):
"""根据配置和并行状态启用 KV-cache 事件发布器"""
self.enable_kv_cache_events = bool(
kv_events_config and self.ps.attn_tp_rank == 0 and self.ps.attn_cp_rank == 0
)
if self.enable_kv_cache_events:
self.kv_event_publisher = EventPublisherFactory.create(
kv_events_config, self.ps.attn_dp_rank
)
def emit_kv_metrics(self):
"""收集并发送 KV-cache 指标(通过 ZMQ socket)"""
if not self.enable_kv_cache_events:
return
kv_metrics = KvMetrics()
kv_metrics.request_active_slots = self.get_stats().num_running_reqs.total
kv_metrics.request_total_slots = self.max_running_requests
kv_metrics.kv_active_blocks = int(
self.get_stats().token_usage * self.max_total_num_tokens
)
kv_metrics.kv_total_blocks = self.max_total_num_tokens
kv_metrics.num_requests_waiting = self.get_stats().num_queue_reqs.total
kv_metrics.gpu_cache_usage_perc = self.get_stats().token_usage
kv_metrics.gpu_prefix_cache_hit_rate = self.get_stats().cache_hit_rate
kv_metrics.data_parallel_rank = (
self.ps.dp_rank if self.ps.dp_rank is not None else 0
)
if not self.send_metrics_from_scheduler.closed:
self.send_metrics_from_scheduler.send_pyobj(kv_metrics)
def publish_kv_events(self):
"""从 tree cache 获取事件并发布"""
if not self.enable_kv_cache_events:
return
events = self.tree_cache.take_events()
if events:
batch = KVEventBatch(ts=time.time(), events=events)
self.kv_event_publisher.publish(batch)
评论区精华
无独立 review 讨论,PR 为单人一次性提交,无争议点。
风险与影响
- 风险:风险极低。变更仅为纯代码移动,方法体完全不变,调用点以简单前缀变换更新,没有逻辑修改。由于涉及调度器核心路径,回归风险理论上存在,但方法体 byte-identical 且无依赖行为变化,实际风险几乎为零。
- 影响:对用户无影响,对外部行为透明。对内,
SchedulerMetricsMixin 职责减少,SchedulerKvEventsPublisher 成为独立的 KV-cache 事件发射组件,便于后续维护和测试。影响范围限于调度器模块,无其他系统依赖。
- 风险标记:核心路径变更, 无测试配套
关联脉络
- PR #25627 Carve out SchedulerLoadInquirer for queue-load state: 同一重构链:将队列负载查询逻辑从 Mixin 剥离到独立组件,与本 PR 类似的设计模式。
- PR #25628 Move queue-load reporting to SchedulerLoadInquirer: 同上链,继续迁移职责。
- PR #25635 Move output streaming to SchedulerOutputStreamer: 类似职责拆分:将输出流从 Mixin 迁移到独立组件,手法与本 PR 一致。
- PR #25637 Move batch-result processing to SchedulerBatchResultProcessor and retire output_processor mixin: 同类重构:batch 结果处理逻辑的独立化。
参与讨论