Prhub

#25629 Add SchedulerMetricsReporter and route metrics state through it

原始 PR 作者 fzyzcjy 合并时间 2026-05-18 18:41 文件变更 10 提交数 1 评论 1 代码增减 +406 / -220

执行摘要

引入指标记者组件并重构度量状态路由

PR是重构链条'introduce-metrics-reporter'的准备工作。描述中说'Inplace prep for the introduce-metrics-reporter mech move.'目的是逐步将度量逻辑从Mixin模式迁移到独立的组件类,以降低Scheduler类的复杂性,并为未来可能的功能扩展(如更丰富的度量收集)打下基础。

建议精读该PR以理解SGLang调度器重构的策略和实践。重点关注如何通过@staticmethod将mixin方法转换为静态方法并显式传入reporter实例,从而实现逐步剥离。此设计可以推广到其他类似的重构场景。

讨论亮点

PR无实质人工讨论,仅有一条gemini-code-assist的自动配额提示。由于作者也是合并者,且这是一个准备性提交,设计决策主要由作者单独决定。该PR是重构链条中的一环,因此设计权衡(如为什么选择Option b保持度量所有者)已在PR描述中说明:metrics_collector在Scheduler构造时仍由Scheduler持有,因为init_model_worker在reporter创建前需要读取它。

实现拆解

实现分为以下步骤:

  1. 创建SchedulerMetricsReporter组件scheduler_components/metrics_reporter.py):定义数据类,包含对Scheduler的引用、TP/PP/DP rank、metrics_collector_contextmetrics_collector等字段。__post_init__方法调用SchedulerMetricsMixin._init_metrics_install_device_timer_on_runners完成初始化。
  2. 引入SchedulerMetricsCollectorContextmetrics_collector.py):定义不可变数据类,封装enable_metricsis_stats_logging_rank等布尔标志以及一个可选的SchedulerMetricsCollector实例。新增init_new类方法构建该上下文,将原来在Scheduler.__init__中的计算逻辑迁移到工厂方法中。
  3. 改造SchedulerMetricsMixinobservability/scheduler_metrics_mixin.py):将init_metricsinstall_device_timer_on_runners_init_fpm等方法改为@staticmethod,第一个参数类型限定为SchedulerMetricsReporter,方法体内部原本访问self.server_args等改为self.scheduler.server_args。这样方法不再绑定到具体的mixin实例,而是操作传入的reporter对象。
  4. 修改Scheduler初始化scheduler.py):用SchedulerMetricsCollector.init_new替代原来的self.init_metrics,然后创建SchedulerMetricsReporter实例并赋值给self.metrics_reporter。原来直接调用的self.install_device_timer_on_runners()被移除(由reporter的__post_init__调用)。所有对self.statsself.num_generated_tokens等度量属性的访问改为self.metrics_reporter.stats等。
  5. 同步修改下游调用者:在scheduler_output_processor_mixin.pydisaggregation/prefill.pydisaggregation/decode.pydllm/mixin/scheduler.pyencode_receiver.py中,将原来直接访问self.enable_metricsself.num_generated_tokensself.kv_transfer_speed_gb_s等改为通过self.metrics_reporter访问。report_prefill_statsreport_decode_statsupdate_spec_metrics也增加metrics_reporter参数。
  6. 测试文件调整test_scheduler_chunked_req_gate.py增加少量import。
文件 模块 状态 重要度
python/sglang/srt/managers/scheduler_components/metrics_reporter.py 度量层 added 8.17
python/sglang/srt/observability/scheduler_metrics_mixin.py 度量层 modified 9.21
python/sglang/srt/observability/metrics_collector.py 度量层 modified 7.72
python/sglang/srt/managers/scheduler.py 调度器 modified 7.55

关键符号

_init_metrics _install_device_timer_on_runners init_new SchedulerMetricsReporter.__post_init__ update_spec_metrics

关键源码片段

python/sglang/srt/managers/scheduler_components/metrics_reporter.py core-logic

新引入的组件,承载度量状态和初始化逻辑,是重构的核心产物。

from __future__ import annotationsimport logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optionalfrom sglang.srt.observability.metrics_collector import (
    SchedulerMetricsCollector,
    SchedulerMetricsCollectorContext,
)
from sglang.srt.observability.scheduler_metrics_mixin import (
    SchedulerMetricsMixin,
)if TYPE_CHECKING:
    from sglang.srt.managers.scheduler import Scheduler
​
​
@dataclass(kw_only=True)
class SchedulerMetricsReporter:
    """将调度器的度量状态集中管理的组件。"""
    scheduler: "Scheduler"
    tp_rank: int
    pp_rank: int
    dp_rank: Optional[int]
    metrics_collector_context: SchedulerMetricsCollectorContext
    metrics_collector: Optional[SchedulerMetricsCollector]
    num_retracted_reqs: int = 0
    num_paused_reqs: int = 0
​
    def __post_init__(self) -> None:
        # 从上下文中解包供热路径快速访问的标志,转换为 Reporter 自身的属性
        self.enable_metrics = self.metrics_collector_context.enable_metrics
        self.is_stats_logging_rank = (
            self.metrics_collector_context.is_stats_logging_rank
        )
        self.current_scheduler_metrics_enabled = (
            self.metrics_collector_context.current_scheduler_metrics_enabled
        )
        self.enable_kv_cache_events = (
            self.metrics_collector_context.enable_kv_cache_events
        )
        # 委托给 Mixin 中的静态初始化方法,完成具体字段的赋值
        SchedulerMetricsMixin._init_metrics(
            self, self.tp_rank, self.pp_rank, self.dp_rank
        )
        SchedulerMetricsMixin._install_device_timer_on_runners(self)
python/sglang/srt/observability/scheduler_metrics_mixin.py core-logic

核心逻辑变更,将方法由普通实例方法改为 @staticmethod,第一参数变为 reporter 类型,并调整内部访问路径。

    @staticmethod
    def _init_metrics(
        self: "SchedulerMetricsReporter",
        tp_rank: int,
        pp_rank: int,
        dp_rank: Optional[int],
    ):
        # Basic stats
        self.forward_ct_decode = 0
        self.num_generated_tokens = 0
        # ... 省略相同部分 ...
        # 原 getattr(self, "device", "") 改为 getattr(self.scheduler, "device", "")
        self._graph_backend_label = {
            "cpu": "cpu graph",
            "npu": "npu graph",
            "musa": "musa graph",
        }.get(getattr(self.scheduler, "device", ""), "cuda graph")
​
        # ... 后续调整
        if self.enable_metrics:
            self.enable_mfu_metrics = self.scheduler.server_args.enable_mfu_metrics
            if self.enable_mfu_metrics:
                SchedulerMetricsMixin._init_estimated_perf_constants(self)
                # ...
python/sglang/srt/observability/metrics_collector.py core-logic

新增 SchedulerMetricsCollectorContext 数据类和 SchedulerMetricsCollector.init_new 工厂方法,将上下文构建集中化。

@dataclass(kw_only=True, frozen=True, slots=True)
class SchedulerMetricsCollectorContext:
    """封装调度器初始化时计算的度量配置。"""
    enable_metrics: bool
    is_stats_logging_rank: bool
    current_scheduler_metrics_enabled: bool
    enable_kv_cache_events: bool
    collector: Optional["SchedulerMetricsCollector"]
​
​
class SchedulerMetricsCollector:
    # ... 原有类 ...
    @classmethod
    def init_new(
        cls,
        *,
        server_args: "ServerArgs",
        ps: Any,
        tp_rank: int,
        pp_rank: int,
        dp_rank: Optional[int],
        enable_priority_scheduling: bool,
        enable_lora: bool,
        enable_hierarchical_cache: bool,
    ) -> "SchedulerMetricsCollectorContext":
        enable_metrics = server_args.enable_metrics
        is_stats_logging_rank = ps.attn_tp_rank == 0
        current_scheduler_metrics_enabled = enable_metrics and (
            is_stats_logging_rank or server_args.enable_metrics_for_all_schedulers
        )
        enable_kv_cache_events = bool(
            server_args.kv_events_config
            and ps.attn_tp_rank == 0
            and ps.attn_cp_rank == 0
        )
        collector: Optional["SchedulerMetricsCollector"] = None
        if enable_metrics:
            engine_type = DisaggregationMode.to_engine_type(
                server_args.disaggregation_mode
            )
            labels = {
                "model_name": server_args.served_model_name,
                "engine_type": engine_type,
                "tp_rank": tp_rank,
                "pp_rank": pp_rank,
                "moe_ep_rank": ps.moe_ep_rank,
            }
            if enable_priority_scheduling:
                labels["priority"] = ""
            if dp_rank is not None:
                labels["dp_rank"] = dp_rank
            if server_args.extra_metric_labels:
                labels.update(server_args.extra_metric_labels)
            collector = cls(
                labels=labels,
                enable_lora=enable_lora,
                enable_hierarchical_cache=enable_hierarchical_cache,
                enable_streaming_session=server_args.enable_streaming_session,
                server_args=server_args,
            )
        return SchedulerMetricsCollectorContext(
            enable_metrics=enable_metrics,
            is_stats_logging_rank=is_stats_logging_rank,
            current_scheduler_metrics_enabled=current_scheduler_metrics_enabled,
            enable_kv_cache_events=enable_kv_cache_events,
            collector=collector,
        )

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

将度量状态从Scheduler实例变量迁移到SchedulerMetricsReporter,改变了热路径上的数据访问方式(原先的直接属性访问变为通过self.metrics_reporter.X),可能引入性能微退化。由于涉及多个文件,如果新reporter初始化顺序出错可能导致属性未初始化的运行时错误。Scheduler.__init__中reporter的创建位置(在init_pub_sub_connectionsinit_diffusion_llm之后,但在init_schedule_policy之前)需确保所有依赖就绪。另外,Mixin方法改为@staticmethod后,若遗漏某个调用点的参数调整会引发TypeError。当前没有增加新的测试覆盖,回归风险需要通过现有CI验证。

无用户可见功能变化。对团队而言,该PR继续推进调度器重构,将SchedulerMetricsMixin的职责逐步剥离到独立组件,减轻了Scheduler类的负担,有利于后续功能扩展和维护。影响范围包括调度器核心路径(run_batchprocess_batch_result等)以及disaggregation预填充和解码模块。由于代码变化量较大(+406/-220行),但本质是机械的重构,功能等价性较高。

核心路径变更 依赖顺序敏感 缺少测试覆盖 多个调用点需要同步

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论