执行摘要
- 一句话:引入指标记者组件并重构度量状态路由
- 推荐动作:建议精读该PR以理解SGLang调度器重构的策略和实践。重点关注如何通过
@staticmethod将mixin方法转换为静态方法并显式传入reporter实例,从而实现逐步剥离。此设计可以推广到其他类似的重构场景。
功能与动机
PR是重构链条'introduce-metrics-reporter'的准备工作。描述中说'Inplace prep for the introduce-metrics-reporter mech move.'目的是逐步将度量逻辑从Mixin模式迁移到独立的组件类,以降低Scheduler类的复杂性,并为未来可能的功能扩展(如更丰富的度量收集)打下基础。
实现拆解
实现分为以下步骤:
- 创建
SchedulerMetricsReporter组件(scheduler_components/metrics_reporter.py):定义数据类,包含对Scheduler的引用、TP/PP/DP rank、metrics_collector_context和metrics_collector等字段。__post_init__方法调用SchedulerMetricsMixin._init_metrics和_install_device_timer_on_runners完成初始化。
- 引入
SchedulerMetricsCollectorContext(metrics_collector.py):定义不可变数据类,封装enable_metrics、is_stats_logging_rank等布尔标志以及一个可选的SchedulerMetricsCollector实例。新增init_new类方法构建该上下文,将原来在Scheduler.__init__中的计算逻辑迁移到工厂方法中。
- 改造
SchedulerMetricsMixin(observability/scheduler_metrics_mixin.py):将init_metrics、install_device_timer_on_runners、_init_fpm等方法改为@staticmethod,第一个参数类型限定为SchedulerMetricsReporter,方法体内部原本访问self.server_args等改为self.scheduler.server_args。这样方法不再绑定到具体的mixin实例,而是操作传入的reporter对象。
- 修改
Scheduler初始化(scheduler.py):用SchedulerMetricsCollector.init_new替代原来的self.init_metrics,然后创建SchedulerMetricsReporter实例并赋值给self.metrics_reporter。原来直接调用的self.install_device_timer_on_runners()被移除(由reporter的__post_init__调用)。所有对self.stats、self.num_generated_tokens等度量属性的访问改为self.metrics_reporter.stats等。
- 同步修改下游调用者:在
scheduler_output_processor_mixin.py、disaggregation/prefill.py、disaggregation/decode.py、dllm/mixin/scheduler.py、encode_receiver.py中,将原来直接访问self.enable_metrics、self.num_generated_tokens、self.kv_transfer_speed_gb_s等改为通过self.metrics_reporter访问。report_prefill_stats、report_decode_stats、update_spec_metrics也增加metrics_reporter参数。
- 测试文件调整:
test_scheduler_chunked_req_gate.py增加少量import。
关键文件:
python/sglang/srt/managers/scheduler_components/metrics_reporter.py(模块 度量层;类别 source;类型 core-logic;符号 SchedulerMetricsReporter, post_init): 新引入的组件,承载度量状态和初始化逻辑,是重构的核心产物。
python/sglang/srt/observability/scheduler_metrics_mixin.py(模块 度量层;类别 source;类型 core-logic;符号 _init_metrics, _install_device_timer_on_runners, _init_fpm, update_spec_metrics): 核心逻辑变更,将方法由普通实例方法改为@staticmethod,第一参数变为reporter类型,并调整内部访问路径。
python/sglang/srt/observability/metrics_collector.py(模块 度量层;类别 source;类型 core-logic;符号 SchedulerMetricsCollectorContext, init_new): 新增SchedulerMetricsCollectorContext数据类和SchedulerMetricsCollector.init_new工厂方法,将上下文构建集中化。
python/sglang/srt/managers/scheduler.py(模块 调度器;类别 source;类型 dependency-wiring): 主调度器类接入新组件,导入、初始化、代理访问的变更集中于此。
关键符号:_init_metrics, _install_device_timer_on_runners, init_new, SchedulerMetricsReporter.post_init, update_spec_metrics
关键源码片段
python/sglang/srt/managers/scheduler_components/metrics_reporter.py
新引入的组件,承载度量状态和初始化逻辑,是重构的核心产物。
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional
from sglang.srt.observability.metrics_collector import (
SchedulerMetricsCollector,
SchedulerMetricsCollectorContext,
)
from sglang.srt.observability.scheduler_metrics_mixin import (
SchedulerMetricsMixin,
)
if TYPE_CHECKING:
from sglang.srt.managers.scheduler import Scheduler
@dataclass(kw_only=True)
class SchedulerMetricsReporter:
"""将调度器的度量状态集中管理的组件。"""
scheduler: "Scheduler"
tp_rank: int
pp_rank: int
dp_rank: Optional[int]
metrics_collector_context: SchedulerMetricsCollectorContext
metrics_collector: Optional[SchedulerMetricsCollector]
num_retracted_reqs: int = 0
num_paused_reqs: int = 0
def __post_init__(self) -> None:
# 从上下文中解包供热路径快速访问的标志,转换为 Reporter 自身的属性
self.enable_metrics = self.metrics_collector_context.enable_metrics
self.is_stats_logging_rank = (
self.metrics_collector_context.is_stats_logging_rank
)
self.current_scheduler_metrics_enabled = (
self.metrics_collector_context.current_scheduler_metrics_enabled
)
self.enable_kv_cache_events = (
self.metrics_collector_context.enable_kv_cache_events
)
# 委托给 Mixin 中的静态初始化方法,完成具体字段的赋值
SchedulerMetricsMixin._init_metrics(
self, self.tp_rank, self.pp_rank, self.dp_rank
)
SchedulerMetricsMixin._install_device_timer_on_runners(self)
python/sglang/srt/observability/scheduler_metrics_mixin.py
核心逻辑变更,将方法由普通实例方法改为@staticmethod,第一参数变为reporter类型,并调整内部访问路径。
@staticmethod
def _init_metrics(
self: "SchedulerMetricsReporter",
tp_rank: int,
pp_rank: int,
dp_rank: Optional[int],
):
# Basic stats
self.forward_ct_decode = 0
self.num_generated_tokens = 0
# ... 省略相同部分 ...
# 原 getattr(self, "device", "") 改为 getattr(self.scheduler, "device", "")
self._graph_backend_label = {
"cpu": "cpu graph",
"npu": "npu graph",
"musa": "musa graph",
}.get(getattr(self.scheduler, "device", ""), "cuda graph")
# ... 后续调整
if self.enable_metrics:
self.enable_mfu_metrics = self.scheduler.server_args.enable_mfu_metrics
if self.enable_mfu_metrics:
SchedulerMetricsMixin._init_estimated_perf_constants(self)
# ...
python/sglang/srt/observability/metrics_collector.py
新增SchedulerMetricsCollectorContext数据类和SchedulerMetricsCollector.init_new工厂方法,将上下文构建集中化。
@dataclass(kw_only=True, frozen=True, slots=True)
class SchedulerMetricsCollectorContext:
"""封装调度器初始化时计算的度量配置。"""
enable_metrics: bool
is_stats_logging_rank: bool
current_scheduler_metrics_enabled: bool
enable_kv_cache_events: bool
collector: Optional["SchedulerMetricsCollector"]
class SchedulerMetricsCollector:
# ... 原有类 ...
@classmethod
def init_new(
cls,
*,
server_args: "ServerArgs",
ps: Any,
tp_rank: int,
pp_rank: int,
dp_rank: Optional[int],
enable_priority_scheduling: bool,
enable_lora: bool,
enable_hierarchical_cache: bool,
) -> "SchedulerMetricsCollectorContext":
enable_metrics = server_args.enable_metrics
is_stats_logging_rank = ps.attn_tp_rank == 0
current_scheduler_metrics_enabled = enable_metrics and (
is_stats_logging_rank or server_args.enable_metrics_for_all_schedulers
)
enable_kv_cache_events = bool(
server_args.kv_events_config
and ps.attn_tp_rank == 0
and ps.attn_cp_rank == 0
)
collector: Optional["SchedulerMetricsCollector"] = None
if enable_metrics:
engine_type = DisaggregationMode.to_engine_type(
server_args.disaggregation_mode
)
labels = {
"model_name": server_args.served_model_name,
"engine_type": engine_type,
"tp_rank": tp_rank,
"pp_rank": pp_rank,
"moe_ep_rank": ps.moe_ep_rank,
}
if enable_priority_scheduling:
labels["priority"] = ""
if dp_rank is not None:
labels["dp_rank"] = dp_rank
if server_args.extra_metric_labels:
labels.update(server_args.extra_metric_labels)
collector = cls(
labels=labels,
enable_lora=enable_lora,
enable_hierarchical_cache=enable_hierarchical_cache,
enable_streaming_session=server_args.enable_streaming_session,
server_args=server_args,
)
return SchedulerMetricsCollectorContext(
enable_metrics=enable_metrics,
is_stats_logging_rank=is_stats_logging_rank,
current_scheduler_metrics_enabled=current_scheduler_metrics_enabled,
enable_kv_cache_events=enable_kv_cache_events,
collector=collector,
)
评论区精华
PR无实质人工讨论,仅有一条gemini-code-assist的自动配额提示。由于作者也是合并者,且这是一个准备性提交,设计决策主要由作者单独决定。该PR是重构链条中的一环,因此设计权衡(如为什么选择Option b保持度量所有者)已在PR描述中说明:metrics_collector在Scheduler构造时仍由Scheduler持有,因为init_model_worker在reporter创建前需要读取它。
风险与影响
- 风险:将度量状态从
Scheduler实例变量迁移到SchedulerMetricsReporter,改变了热路径上的数据访问方式(原先的直接属性访问变为通过self.metrics_reporter.X),可能引入性能微退化。由于涉及多个文件,如果新reporter初始化顺序出错可能导致属性未初始化的运行时错误。Scheduler.__init__中reporter的创建位置(在init_pub_sub_connections和init_diffusion_llm之后,但在init_schedule_policy之前)需确保所有依赖就绪。另外,Mixin方法改为@staticmethod后,若遗漏某个调用点的参数调整会引发TypeError。当前没有增加新的测试覆盖,回归风险需要通过现有CI验证。
- 影响:无用户可见功能变化。对团队而言,该PR继续推进调度器重构,将
SchedulerMetricsMixin的职责逐步剥离到独立组件,减轻了Scheduler类的负担,有利于后续功能扩展和维护。影响范围包括调度器核心路径(run_batch、process_batch_result等)以及disaggregation预填充和解码模块。由于代码变化量较大(+406/-220行),但本质是机械的重构,功能等价性较高。
- 风险标记:核心路径变更, 依赖顺序敏感, 缺少测试覆盖, 多个调用点需要同步
关联脉络
- PR #25630 Move metrics reporting to SchedulerMetricsReporter and retire metrics mixin: 同为指标重构链的后续提交,将方法物理迁移到Reporter类并移除Mixin。
- PR #25631 Move idle-metrics logging to SchedulerMetricsReporter: 同系列重构,进一步将空闲指标日志迁移到Reporter。
参与讨论