Prhub

#21887 [Ray] Add data parallel (DP) and DP attention support to RayEngine

sgl-project/sglang · 作者 xyuzh · 合并时间 2026-04-16 06:00

分析状态 已生成
文件变更 3提交数 9 · 评论 2
代码增减 +505 / -104
feature ray scheduling run-ci

执行摘要

为 RayEngine 添加数据并行和 DP 注意力支持,扩展多 GPU 推理能力。

根据 PR body,动机是支持在 Ray 环境中使用数据并行,当 dp_size > 1 或 enable_dp_attention 设置时,之前会抛出 NotImplementedError,这限制了 RayEngine 在多 GPU 场景下的扩展性。

建议精读 RayDataParallelController 类的实现,了解如何将 Ray actors 集成到现有数据并行框架中,并覆盖基类方法。同时关注提交历史中的调整点,如返回类型修复和安全绑定,这些是重要的设计决策和陷阱规避。

讨论亮点

Review 中没有具体评论,但提交历史显示多次迭代:例如修复返回类型以匹配基类期望,避免运行时 ValueError;以及绑定 ZMQ 套接字到特定 IP 以提高安全性。这表明在实现过程中关注了正确性和安全细节,但未在 review 中形成显式讨论。

实现拆解

  1. 新增 RayDataParallelController 类:在 python/sglang/srt/ray/data_parallel_controller.py 中定义,继承自 DataParallelController,覆盖 launch_dp_schedulerslaunch_dp_attention_schedulers 方法,使用 Ray actors(SchedulerActor)替代多进程,处理端口绑定和 ZMQ 路由。
  2. 修改 RayEngine 初始化逻辑:在 python/sglang/srt/ray/engine.py 中,移除对 dp_size > 1NotImplementedError,根据 dp_size 分支处理:dp_size == 1 时沿用原有张量并行逻辑,否则调用 RayDataParallelController;添加 import threading 支持并发控制。
  3. 更新测试覆盖:在 test/manual/test_ray_engine.py 中添加测试类 TestRayEngineOfflineDP2TestRayEngineOfflineDPAttention,扩展 _create_engine_on_pg 函数以支持 dp_size 参数,验证 DP 和 DP-attention 功能。
  4. 安全修复:在提交历史中,修复 ZMQ 套接字绑定,使用 get_zmq_socket_on_host 绑定到 rank0_node_ip,避免暴露未认证套接字(CVE-2026-3060)。
  5. 代码结构调整:通过多个提交调整方法顺序和返回类型(如修复 _launch_scheduler_processes 返回 tuple[SchedulerInitResult, None]),以保持与基础引擎的一致性。
文件 模块 状态 重要度
python/sglang/srt/ray/data_parallel_controller.py Ray 控制器 added 9.25
python/sglang/srt/ray/engine.py Ray 引擎 modified 8.55
test/manual/test_ray_engine.py 测试套件 modified 7.82
python/sglang/srt/ray/data_parallel_controller.py entrypoint

新增 Ray 专用的数据并行控制器,核心实现 DP 支持,覆盖基类方法以使用 Ray actors 替代多进程。

class RayDataParallelController(DataParallelController):
    """DataParallelController that uses Ray actors for scheduler processes.    覆盖进程生成方法以创建 SchedulerActor Ray actors,而非 mp.Process。
    在进程中运行(非独立 mp.Process),复用父级的事件循环、分发和 ZMQ 路由。
    """
​
    def __init__(
        self,
        server_args: ServerArgs,
        port_args: PortArgs,
        placement_group,
        bundle_for_node: List[int],
        rank0_node_ip: str,
    ):
        # 在调用 super().__init__ 之前设置 Ray 特定属性,因为父构造函数会调用我们覆盖的 launch 方法
        self.pg = placement_group # Ray placement group,用于调度 actor
        self.bundle_for_node = bundle_for_node # 节点与 bundle 的映射
        self.rank0_node_ip = rank0_node_ip # 主节点 IP,用于安全绑定 ZMQ 套接字
        self.scheduler_actors: List = [] # 存储创建的 Ray actor 引用
        self.event_loop_refs: List = [] # 事件循环引用,用于异步控制
​
        # super().__init__ 会通过方法解析顺序(MRO)调用我们覆盖的方法
        # 传递 run_scheduler_process_func=None,因为不生成 mp.Process
        super().__init__(server_args, port_args, run_scheduler_process_func=None)
python/sglang/srt/ray/engine.py core-logic

修改 RayEngine 以支持 DP,移除 NotImplementedError 并集成新控制器,是功能启用的入口点。

@classmethod
def _launch_scheduler_processes(
    cls,
    server_args: ServerArgs,
    port_args: PortArgs,
    run_scheduler_process_func: Callable,
) -> tuple[SchedulerInitResult, None]:
    """启动调度器为 Ray actors。    返回:
        (RaySchedulerInitResult, None) 元组。
        scheduler_procs 为 None,因为 Ray 使用 actors 而非 mp.Process。
    """
    pg = ray.util.get_current_placement_group()
    if pg is None:
        raise RuntimeError(
            "use_ray=True 需要 placement group,但未检测到。"
            "请将 Engine actor 调度到 placement group 上。"
        )
​
    nnodes = server_args.nnodes
    engine_bundle, engine_ip = _find_engine_bundle(pg, nnodes)
    bundle_for_node = [engine_bundle] + [i for i in range(nnodes) if i != engine_bundle]
    rank0_node_ip = engine_ip
​
    if server_args.dp_size == 1:
        # dp_size == 1 时,启动张量并行调度器 actors
        world_size = server_args.tp_size * server_args.pp_size
        gpus_per_node = world_size // nnodes
        logger.info(f"Ray 集群: {nnodes} 节点,使用 {gpus_per_node} GPU/节点,world_size={world_size}")
        # 后续代码创建 SchedulerActor 并返回结果
        scheduler_actors = []
        # ... 省略具体 actor 创建逻辑
        return RaySchedulerInitResult(scheduler_actors=scheduler_actors), None
    else:
        # dp_size > 1 或 enable_dp_attention 设置时,使用 RayDataParallelController
        # 控制器会处理多 DP rank 的 actor 启动和路由
        controller = RayDataParallelController(
            server_args, port_args, pg, bundle_for_node, rank0_node_ip
        )
        return controller.wait_for_completion(), None

关键符号

RayDataParallelController.__init__ RayDataParallelController.launch_dp_schedulers RayDataParallelController.launch_dp_attention_schedulers RayEngine._launch_scheduler_processes

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

技术风险包括:新控制器 RayDataParallelController 可能引入并发或通信问题,特别是在多 DP rank 下的 actor 生命周期管理;ZMQ 套接字绑定变化可能影响网络配置,需确保跨节点通信稳定;DP 支持增加系统复杂性,可能带来性能开销,需监控推理延迟。具体到文件,data_parallel_controller.py 中的端口分配和 actor 启动逻辑,以及 engine.py 中的分支处理,是潜在错误点。

对用户影响:Ray 用户现在可以在多 GPU 上使用数据并行和 DP 注意力,提高推理吞吐量和模型扩展性。对系统影响:扩展了 RayEngine 的功能范围,增加了代码库复杂性,需要更多测试和维护。对团队影响:开发团队需熟悉新控制器设计,运维团队需关注部署时的资源分配和网络配置。

新控制器引入 ZMQ 安全变更 并发通信风险

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

执行摘要

  • 一句话:为 RayEngine 添加数据并行和 DP 注意力支持,扩展多 GPU 推理能力。
  • 推荐动作:建议精读 RayDataParallelController 类的实现,了解如何将 Ray actors 集成到现有数据并行框架中,并覆盖基类方法。同时关注提交历史中的调整点,如返回类型修复和安全绑定,这些是重要的设计决策和陷阱规避。

功能与动机

根据 PR body,动机是支持在 Ray 环境中使用数据并行,当 dp_size > 1 或 enable_dp_attention 设置时,之前会抛出 NotImplementedError,这限制了 RayEngine 在多 GPU 场景下的扩展性。

实现拆解

  1. 新增 RayDataParallelController 类:在 python/sglang/srt/ray/data_parallel_controller.py 中定义,继承自 DataParallelController,覆盖 launch_dp_schedulerslaunch_dp_attention_schedulers 方法,使用 Ray actors(SchedulerActor)替代多进程,处理端口绑定和 ZMQ 路由。
  2. 修改 RayEngine 初始化逻辑:在 python/sglang/srt/ray/engine.py 中,移除对 dp_size > 1NotImplementedError,根据 dp_size 分支处理:dp_size == 1 时沿用原有张量并行逻辑,否则调用 RayDataParallelController;添加 import threading 支持并发控制。
  3. 更新测试覆盖:在 test/manual/test_ray_engine.py 中添加测试类 TestRayEngineOfflineDP2TestRayEngineOfflineDPAttention,扩展 _create_engine_on_pg 函数以支持 dp_size 参数,验证 DP 和 DP-attention 功能。
  4. 安全修复:在提交历史中,修复 ZMQ 套接字绑定,使用 get_zmq_socket_on_host 绑定到 rank0_node_ip,避免暴露未认证套接字(CVE-2026-3060)。
  5. 代码结构调整:通过多个提交调整方法顺序和返回类型(如修复 _launch_scheduler_processes 返回 tuple[SchedulerInitResult, None]),以保持与基础引擎的一致性。

关键文件:

  • python/sglang/srt/ray/data_parallel_controller.py(模块 Ray控制器;类别 source;类型 entrypoint;符号 RayDataParallelController, init, launch_dp_schedulers, launch_dp_attention_schedulers): 新增 Ray 专用的数据并行控制器,核心实现 DP 支持,覆盖基类方法以使用 Ray actors 替代多进程。
  • python/sglang/srt/ray/engine.py(模块 Ray引擎;类别 source;类型 core-logic;符号 wait_for_completion, _launch_dp_scheduler_processes): 修改 RayEngine 以支持 DP,移除 NotImplementedError 并集成新控制器,是功能启用的入口点。
  • test/manual/test_ray_engine.py(模块 测试套件;类别 test;类型 test-coverage;符号 _create_engine_on_pg, TestRayEngineErrors, TestRayEngineOfflineDP2, test_dp_greater_than_1_raises): 添加手动测试验证 DP 和 DP-attention 功能,确保变更的正确性。

关键符号:RayDataParallelController.init, RayDataParallelController.launch_dp_schedulers, RayDataParallelController.launch_dp_attention_schedulers, RayEngine._launch_scheduler_processes

关键源码片段

python/sglang/srt/ray/data_parallel_controller.py

新增 Ray 专用的数据并行控制器,核心实现 DP 支持,覆盖基类方法以使用 Ray actors 替代多进程。

class RayDataParallelController(DataParallelController):
    """DataParallelController that uses Ray actors for scheduler processes.    覆盖进程生成方法以创建 SchedulerActor Ray actors,而非 mp.Process。
    在进程中运行(非独立 mp.Process),复用父级的事件循环、分发和 ZMQ 路由。
    """
​
    def __init__(
        self,
        server_args: ServerArgs,
        port_args: PortArgs,
        placement_group,
        bundle_for_node: List[int],
        rank0_node_ip: str,
    ):
        # 在调用 super().__init__ 之前设置 Ray 特定属性,因为父构造函数会调用我们覆盖的 launch 方法
        self.pg = placement_group # Ray placement group,用于调度 actor
        self.bundle_for_node = bundle_for_node # 节点与 bundle 的映射
        self.rank0_node_ip = rank0_node_ip # 主节点 IP,用于安全绑定 ZMQ 套接字
        self.scheduler_actors: List = [] # 存储创建的 Ray actor 引用
        self.event_loop_refs: List = [] # 事件循环引用,用于异步控制
​
        # super().__init__ 会通过方法解析顺序(MRO)调用我们覆盖的方法
        # 传递 run_scheduler_process_func=None,因为不生成 mp.Process
        super().__init__(server_args, port_args, run_scheduler_process_func=None)

python/sglang/srt/ray/engine.py

修改 RayEngine 以支持 DP,移除 NotImplementedError 并集成新控制器,是功能启用的入口点。

@classmethod
def _launch_scheduler_processes(
    cls,
    server_args: ServerArgs,
    port_args: PortArgs,
    run_scheduler_process_func: Callable,
) -> tuple[SchedulerInitResult, None]:
    """启动调度器为 Ray actors。    返回:
        (RaySchedulerInitResult, None) 元组。
        scheduler_procs 为 None,因为 Ray 使用 actors 而非 mp.Process。
    """
    pg = ray.util.get_current_placement_group()
    if pg is None:
        raise RuntimeError(
            "use_ray=True 需要 placement group,但未检测到。"
            "请将 Engine actor 调度到 placement group 上。"
        )
​
    nnodes = server_args.nnodes
    engine_bundle, engine_ip = _find_engine_bundle(pg, nnodes)
    bundle_for_node = [engine_bundle] + [i for i in range(nnodes) if i != engine_bundle]
    rank0_node_ip = engine_ip
​
    if server_args.dp_size == 1:
        # dp_size == 1 时,启动张量并行调度器 actors
        world_size = server_args.tp_size * server_args.pp_size
        gpus_per_node = world_size // nnodes
        logger.info(f"Ray 集群: {nnodes} 节点,使用 {gpus_per_node} GPU/节点,world_size={world_size}")
        # 后续代码创建 SchedulerActor 并返回结果
        scheduler_actors = []
        # ... 省略具体 actor 创建逻辑
        return RaySchedulerInitResult(scheduler_actors=scheduler_actors), None
    else:
        # dp_size > 1 或 enable_dp_attention 设置时,使用 RayDataParallelController
        # 控制器会处理多 DP rank 的 actor 启动和路由
        controller = RayDataParallelController(
            server_args, port_args, pg, bundle_for_node, rank0_node_ip
        )
        return controller.wait_for_completion(), None

评论区精华

Review 中没有具体评论,但提交历史显示多次迭代:例如修复返回类型以匹配基类期望,避免运行时 ValueError;以及绑定 ZMQ 套接字到特定 IP 以提高安全性。这表明在实现过程中关注了正确性和安全细节,但未在 review 中形成显式讨论。

  • 暂无高价值评论线程

风险与影响

  • 风险:技术风险包括:新控制器 RayDataParallelController 可能引入并发或通信问题,特别是在多 DP rank 下的 actor 生命周期管理;ZMQ 套接字绑定变化可能影响网络配置,需确保跨节点通信稳定;DP 支持增加系统复杂性,可能带来性能开销,需监控推理延迟。具体到文件,data_parallel_controller.py 中的端口分配和 actor 启动逻辑,以及 engine.py 中的分支处理,是潜在错误点。
  • 影响:对用户影响:Ray 用户现在可以在多 GPU 上使用数据并行和 DP 注意力,提高推理吞吐量和模型扩展性。对系统影响:扩展了 RayEngine 的功能范围,增加了代码库复杂性,需要更多测试和维护。对团队影响:开发团队需熟悉新控制器设计,运维团队需关注部署时的资源分配和网络配置。
  • 风险标记:新控制器引入, ZMQ安全变更, 并发通信风险

关联脉络

  • PR #22898 [Ray] Auto-create placement group in RayEngine when none is detected: 同属 Ray 模块改进,扩展 RayEngine 功能,涉及 placement group 自动创建,与本 PR 的 DP 支持相辅相成。

参与讨论