Prhub

#25444 Bundle Scheduler rank/size fields into a frozen ParallelState

原始 PR 作者 fzyzcjy 合并时间 2026-05-16 09:23 文件变更 14 提交数 1 评论 1 代码增减 +209 / -153

执行摘要

将调度器 17 个 rank/size 字段封装为冻结的 ParallelState 值对象

调度器中分散着大量与并行拓扑相关的整型字段(tp_rank, pp_size, attn_cp_rank 等),它们总是成组出现且初始化后不应改变。将它们捆绑进一个冻结的 dataclass,能明确表示它们属于同一概念模块,防止误修改,简化构造函数签名,并为后续将 ParallelState 作为参数传递给其他组件提供单一事实来源。

建议仔细阅读 parallel_state_wrapper.pyParallelState 的定义和 scheduler.py 中构建它的逻辑,理解作者如何通过值对象模式统一管理并行拓扑信息。对于代码评审者,重点关注是否有任何 self.<rank/size field> 仍留在未修改的文件中(尤其是条件编译或特定后端路径)。

讨论亮点

本 PR 无常规 review 讨论。作者在 PR body 中说明了使用 frozen=True, slots=True, kw_only=True 的设计选择,强调值对象的不可变性,并指出通过 squash 修复 disagg 遗漏来保持 commit 与逻辑映射的清晰。

实现拆解

  1. 定义 ParallelState dataclass:在新增文件 python/sglang/srt/distributed/parallel_state_wrapper.py 中创建 @dataclass(frozen=True, slots=True, kw_only=True) 类,包含 17 个字段,涵盖所有 rank/size 属性。
  2. 修改 Scheduler.init:移除原先逐个 self.tp_rank = tp_rank 的赋值方式,改为调用 ParallelState(...) 构造并赋值给 self.ps;从函数参数中删除曾经冗余的 gpu_id(现在由 ps.gpu_id 承载),同时调整 compute_dp_attention_world_info 调用以使用局部变量。
  3. 逐个替换 Mixin 与消费者:在 scheduler_pp_mixin.pyscheduler_profiler_mixin.pyscheduler_metrics_mixin.pyscheduler_output_processor_mixin.pyscheduler_dp_attn_mixin.pydisaggregation/prefill.pydisaggregation/decode.pyray/scheduler_actor.pylayers/dp_attention.py 中,将所有 self.<field> 替换为 self.ps.<field>,涉及日志、条件判断、构造函数参数以及数据处理逻辑。
  4. 修复 Disaggregation 遗漏:在 scheduler_profiler_mixin.pyinit_profilerstart_profile 中,将 self.tp_rankself.gpu_id 改为 self.ps.tp_rankself.ps.gpu_id,同时更新了 RPD profile 路径构建及 TP rank 判断逻辑,确保与新版 ParallelState 一致。
  5. 更新测试:在 test/registered/unit/observability/test_forward_pass_metrics.py 中添加辅助函数 _make_ps,用于快速构造具有合理默认值的 ParallelState 实例,调整现有测试用例使用 scheduler.ps = _make_ps(...) 替代直接设置属性。
文件 模块 状态 重要度
python/sglang/srt/distributed/parallel_state_wrapper.py 分布式状态 added 7.39
python/sglang/srt/managers/scheduler.py 调度器 modified 7.53
test/registered/unit/observability/test_forward_pass_metrics.py 测试 modified 5.49

关键符号

ParallelState _make_ps

关键源码片段

python/sglang/srt/managers/scheduler.py dependency-wiring

调度器主入口,修改了 __init__ 构造函数:移除分散的字段赋值,引入 self.ps 构造。

# 在 __init__ 中替换了原先零散的 self.tp_rank = tp_rank 等赋值
# 改为统一构造 ParallelState 值对象
self.ps = ParallelState(
    tp_rank=tp_rank,
    tp_size=server_args.tp_size,
    pp_rank=pp_rank,
    pp_size=server_args.pp_size,
    dp_rank=dp_rank,
    dp_size=server_args.dp_size,
    attn_tp_rank=attn_tp_rank,
    attn_tp_size=attn_tp_size,
    attn_cp_rank=attn_cp_rank,
    attn_cp_size=server_args.attn_cp_size,
    attn_dp_rank=attn_dp_rank,
    attn_dp_size=attn_dp_size,
    moe_ep_rank=moe_ep_rank,
    moe_ep_size=server_args.ep_size,
    moe_dp_rank=moe_dp_rank,
    moe_dp_size=server_args.moe_dp_size,
    gpu_id=gpu_id,
)
# 之后所有消费者通过 self.ps.xxx 访问并行状态

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

  • 遗漏引用:重构涉及 14 个文件的机械替换,虽然 diff 覆盖了所有显式出现的字段,但隐式或动态访问(如 getattr(self, 'tp_rank') 或通过字符串拼接)可能未被完全替换。PR 作者已专门扫描并修复了 disagg 中的两处遗漏,但其他非常用路径(如条件编译 NPU/ROCM 分支)仍需回归测试验证。
  • 初始化顺序self.ps__init__ 中构造较靠后(在第 400 行附近),若此前有任何方法使用了 self.ps 将引发 AttributeError。通过检查代码,__init__ 中在赋值 self.ps 之前未调用其他访问并行字段的方法,风险可控。
  • 兼容性:对用户透明,无 API 或配置变更。
  • 开发团队:代码可读性与可维护性提升,并行状态现在具有显式类型和不可变性,减少了误用可能;但所有涉及并行字段的代码需要适应 self.ps. 前缀。
  • 功能影响:无功能变化,所有行为应与之前一致。
  • 测试覆盖:核心调度路径由现有多组 CI 覆盖(等待 run-ci 标签),测试文件已同步更新。
  • 重构链:本 PR 是“parallel-state”重构链的基础步骤,后续 PR(如 #25445)将依赖此 ParallelState 类。
大规模机械替换 核心路径变更

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论