Prhub

#22789 feat: emit per-iteration forward pass metrics via ZMQ PUB

原始 PR 作者 KrishnanPrash 合并时间 2026-05-13 01:28 文件变更 9 提交数 25 评论 33 代码增减 +905 / -5

执行摘要

新增基于 ZMQ PUB 的调度器前向传递指标实时推送

下游应用需要每轮迭代的调度遥测来基于负载做伸缩决策(TTFT/ITL 在线预测)。当前 Prometheus 拉取模式无法满足毫秒级实时性。PR body 明确说明 'Downstream applications needs per-iteration scheduling telemetry to make load-based scaling decisions (TTFT/ITL prediction via online regression).'

值得精读。关键设计决策:1)使用 IPC 而非 TCP 避免端口冲突和监听安全问题;2)复用 DeviceTimer 而非自行实现 GPU 计时,降低维护成本;3)通过 dp_rank 后缀解决多副本 IPC 冲突;4)waiting_queue 中所有请求视为 prefill 确保下游正确进行 TTFT 估计。建议同步关注下游 Dynamo 对 FPM 的消费逻辑。

讨论亮点

Review 中主要讨论了:wall_time 在事件循环重叠模式下的测量精度问题(tedzhouhk 指出 overlap 模式下 wall_time 会包含下一轮调度的开销);variance 应使用完整 prompt 长度还是 chunk 长度的争论(最终保留完整长度,因 attention 时间取决于全上下文);disaggregation 模式下 queued_requests 始终为零的 bug(需根据模式读取不同队列);IPC 端点冲突导致 dp_size>1 时所有进程绑定同一 IPC 路径(通过追加 dp_rank 后缀修复);是否重用 DeviceTimer 代替手动 CUDA 事件(最终实现 FPM 作为 DeviceTimer 的第二 reporter);代码组织建议抽离 _init_fpm 方法等。

实现拆解

实现过程分为以下几个步骤:

  1. 定义 FPM 数据模型:在 forward_pass_metrics.py 中定义了 WelfordAccumulator(用于在线计算方差)、ScheduledRequestMetricsQueuedRequestMetricsForwardPassMetrics,均使用 msgspec.Struct 实现零拷贝序列化。全局编解码器 encode/decode 用于与下游 Dynamo 兼容。

  2. 实现 ZMQ 发布线程_FpmPublisherThread 在后台通过 ZMQ PUB socket 发送序列化后的 ForwardPassMetrics 字节流。支持心跳机制(空闲时每秒发送空指标)。队列大小为 10,000,防止下游消费不及时阻塞调度器。

  3. 集成调度指标收集:在 SchedulerMetricsMixin 中新增类属性 enable_fpm 和方法 _init_fpm_emit_forward_pass_metrics 等。_init_fpm 在 tp_rank=0 且 pp_rank 为最后一个时启动 publisher,并根据 dp_rank 区分 IPC 端点避免冲突。

  4. 埋入调度器事件循环:在 scheduler.pyget_next_batch_to_run 中记录批次开始时间(fpm_start_time),在 process_batch_result 末尾调用 _emit_forward_pass_metrics 计算指标并投递给 publisher 线程。

  5. 复用 DeviceTimer 提升精度:利用已有 DeviceTimer(PR #24197 引入)替代手动 CUDA Event,FPM 注册为第二个 reporter 实现 GPU 精确的 wall_time。当环境变量 SGLANG_ENABLE_METRICS_DEVICE_TIMER 未定义时,FPM 自动创建独立的 DeviceTimer

  6. 支持 PD 分离部署:在 _build_queued_request_metrics 中根据 disaggregation_mode 读取正确队列(prefill 引擎读 bootstrap_queue,decode 引擎读 waiting_queue),并在 _build_scheduled_request_metrics 中通过 req.prefix_indices 准确计算前缀命中 token。

  7. 添加 CLI 配置:在 server_args.py 新增 enable_forward_pass_metricsforward_pass_metrics_worker_idforward_pass_metrics_ipc_name 三个参数,其中 IPC 名称由框架自动生成并注入。

  8. 编写测试套件:注册 CPU CI 的 test/registered/unit/observability/test_forward_pass_metrics.py 测试了 mixed batch 分离、disagg 分支、GPU timer 等核心逻辑;手动测试 test/manual/test_forward_pass_metrics.py 验证了 schema roundtrip、ZMQ PUB/SUB 端到端通信和心跳机制。

文件 模块 状态 重要度
python/sglang/srt/observability/forward_pass_metrics.py 遥测模块 added 9.04
python/sglang/srt/observability/scheduler_metrics_mixin.py 调度器集成 modified 8.92
test/registered/unit/observability/test_forward_pass_metrics.py 单元测试 added 8.05
test/manual/test_forward_pass_metrics.py 手动测试 added 7.45
python/sglang/srt/managers/scheduler.py 调度器 modified 6.25
python/sglang/srt/server_args.py 服务配置 modified 6.0
python/sglang/srt/utils/device_timer.py 设备计时 modified 6.37
python/sglang/srt/managers/schedule_batch.py 调度批次 modified 5.07
python/sglang/srt/managers/utils.py 工具 modified 5.07

关键符号

WelfordAccumulator.add WelfordAccumulator.variance ScheduledRequestMetrics QueuedRequestMetrics ForwardPassMetrics encode decode _FpmPublisherThread.__init__ _FpmPublisherThread.publish _FpmPublisherThread.shutdown SchedulerMetricsMixin._init_fpm SchedulerMetricsMixin._build_scheduled_request_metrics SchedulerMetricsMixin._build_queued_request_metrics SchedulerMetricsMixin._emit_forward_pass_metrics SchedulerMetricsMixin._shutdown_fpm DeviceTimer.add_reporter Scheduler.get_next_batch_to_run Scheduler.process_batch_result Scheduler.run_scheduler_process

关键源码片段

python/sglang/srt/observability/forward_pass_metrics.py core-logic

新增 FPM 核心模块,定义数据模型、Welford 累加器、ZMQ 发布线程和编解码器,是 PR 的核心内容。

# Welford 在线方差累加器
class WelfordAccumulator:
    __slots__ = ('count', 'total', '_mean', '_m2')
    def __init__(self):
        self.count = 0
        self.total = 0
        self._mean = 0.0
        self._m2 = 0.0
​
    def add(self, v: int) -> None:
        self.count += 1
        self.total += v
        delta = v - self._mean
        self._mean += delta / self.count
        delta2 = v - self._mean
        self._m2 += delta * delta2
​
    def variance(self) -> float:
        if self.count == 0:
            return 0.0
        return self._m2 / self.count# msgspec Struct,位置编码以便与下游 Dynamo 兼容
class ScheduledRequestMetrics(msgspec.Struct, frozen=True, gc=False):
    num_prefill_requests: int = 0
    sum_prefill_tokens: int = 0
    var_prefill_length: float = 0.0
    sum_prefill_kv_tokens: int = 0
    num_decode_requests: int = 0
    sum_decode_kv_tokens: int = 0
    var_decode_kv_tokens: float = 0.0class QueuedRequestMetrics(msgspec.Struct, frozen=True, gc=False):
    num_prefill_requests: int = 0
    sum_prefill_tokens: int = 0
    var_prefill_length: float = 0.0
    num_decode_requests: int = 0
    sum_decode_kv_tokens: int = 0
    var_decode_kv_tokens: float = 0.0class ForwardPassMetrics(msgspec.Struct, frozen=True, gc=False):
    version: int = 1
    worker_id: str = ''
    dp_rank: int = 0
    counter_id: int = 0
    wall_time: float = 0.0
    scheduled_requests: ScheduledRequestMetrics = ScheduledRequestMetrics()
    queued_requests: QueuedRequestMetrics = QueuedRequestMetrics()# 模块级编码器,线程安全
_encoder = msgspec.msgpack.Encoder()
_decoder = msgspec.msgpack.Decoder(ForwardPassMetrics)
def encode(metrics: ForwardPassMetrics) -> bytes:
    return _encoder.encode(metrics)
def decode(data: bytes) -> ForwardPassMetrics:
    return _decoder.decode(data)

评论区精华

wall_time 在 overlap 模式下的测量精度 设计

tedzhouhk 指出在 event_loop_overlap 里 wall_time 包含下一轮调度开销,建议调整测量位置或文档化行为。

结论:采纳文档化说明的方式,保留现有测量点,因 ~1ms 误差对下游可接受。 · 已解决

variance 使用完整 prompt 长度 vs chunk 长度 正确性

gemini-code-assist 建议使用 req.extend_input_len 匹配 chunk 工作;tedzhouhk 反驳认为方差用于建模 attention 时间,应使用完整长度。

结论:保留 len(req.origin_input_ids),与 sum_prefill_tokens 语义区分。 · 已解决

disaggregation 模式下 queued_requests 始终为零 正确性

tedzhouhk 和 PeaBrane 均指出 waiting_queue 在 disagg 模式为空,导致 queued_requests 始终为零。作者修复为根据 disaggregation_mode 读取不同队列。

结论:已修复:PREFILL 模式读 bootstrap_queue,DECODE 读 waiting_queue。 · 已解决

IPC 传输 vs TCP 设计

sufeng-buaa 建议使用 IPC 而非 TCP 以避免端口管理;作者改为 tempfile 生成 IPC 路径。

结论:改为 IPC 传输,通过 server_args.forward_pass_metrics_ipc_name 暴露路径。 · 已解决

复用 DeviceTimer 替代手写 CUDA Event 设计

sufeng-buaa 指出已有 DeviceTimer(PR #24197),可以复用而非手动记录 CUDA Event。作者最终注册为第二 reporter。

结论:FPM 不再自建 CUDA Event,通过 DeviceTimer.add_reporter 集成 GPU 计时。 · 已解决

IPC 端点 dp_rank 冲突 设计

tedzhouhk 发现 dp_size>1 时所有 DP 进程使用同一 IPC 路径导致冲突;作者追加 .dp_rank 后缀修复。

结论:已修复,endpoint 格式为 '{base}.{dp_rank}'。 · 已解决

风险与影响

  1. 墙时间测量精度:在 event_loop_overlap 模式下,wall_time 可能包含~1ms 的下一轮调度开销,可能引入噪声,需下游消费者理解此语义。
  2. schema 兼容性:ForwardPassMetrics 字段顺序必须与下游 Dynamo 严格一致,msgspec 使用位置编码,任何顺序错位都会静默数据损坏。已通过版本号和跨仓库协作文档约束,但部署时需确保两端同步。
  3. disagg 队列指标:已修复但生产环境中不同 disagg 实现的队列行为可能仍有差异,需持续验证。
  4. IPC 端点清理:finally 块确保 publisher 关闭,但若 scheduler 初始化中途异常退出,ZMQ 上下文可能未正确终结,遗留 IPC 文件。
  5. 默认关闭不会造成性能影响,但开启后 ZMQ 后台线程占用少量 CPU 和内存。

用户:默认无影响;开启后可在外部消费毫秒级调度指标,用于自动伸缩、TTFT/ITL 预测等。系统:每个调度器进程增加一个 ZMQ PUB 线程(空闲时 CPU 占用~0),队列最大 10k 条消息。团队:新增 forward_pass_metrics.py 模块需与下游 Dynamo 团队共同维护 schema;测试覆盖了 CI 注册测试和手动 PUB/SUB 测试。

核心路径变更 跨团队接口依赖 默认关闭回归风险 DP 多副本兼容性 IPC 清理安全

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论