Prhub

#21887 [Ray] Add data parallel and DP attention support to Ray actor backend (DP > 1)

原始 PR 作者 xyuzh 合并时间 2026-04-16 06:00 文件变更 3 提交数 9 评论 2 代码增减 +505 / -104

执行摘要

为 Ray 后端添加数据并行和 DP 注意力支持

此前Ray后端不支持数据并行(dp_size > 1会抛出NotImplementedError),限制了Ray部署场景下的吞吐扩展能力。本PR填补了这一空白,使得用户可以在Ray集群上利用DP和DP注意力特性。

值得精读,尤其关注RayDataParallelController的设计模式(在子类构造函数中预先设置属性再调用父类构造器以覆盖方法)以及ZMQ安全绑定策略。建议后续将手动测试转化为CI任务(至少nightly),避免回归。

讨论亮点

该PR没有review评论,仅有合入者Qiaolin-Yu的批准。从提交历史可见多次迭代:对齐分支结构、内联方法、修复返回类型、修复DP注意力地址不匹配、以及端口绑定安全修复。说明在开发过程中有主动调整设计,但未形成公开讨论。

实现拆解

  1. 新增RayDataParallelController (python/sglang/srt/ray/data_parallel_controller.py): 继承自DataParallelController,重写launch_dp_schedulerslaunch_dp_attention_schedulers方法。在__init__中提前设置Ray相关的pg、bundle_for_node等属性,然后调用父类构造器触发启动逻辑。launch_dp_schedulers为每个DP rank预先分配端口和ZMQ socket,然后调用_launch_ray_tp_group创建SchedulerActor actor(每个DP rank内部可包含TP组)。launch_dp_attention_schedulers类似,但使用compute_dp_attention_world_info计算DP注意力世界信息,并绑定worker socket到rank0节点IP以增强安全性(CVE-2026-3060)。
  2. 修改RayEngine._launch_scheduler_processes (python/sglang/srt/ray/engine.py): 移除原先的dp_size > 1检查,改为条件分支:当dp_size == 1时走原有的TP-only启动逻辑;当dp_size > 1时,调用新增的_launch_dp_scheduler_processes方法,该方法创建RayDataParallelController实例并启动。返回类型保持为tuple[SchedulerInitResult, None]以兼容基类。
  3. 更新手动测试 (test/manual/test_ray_engine.py): 增加TestRayEngineOfflineDP2TestRayEngineOfflineDPAttention两个测试类,分别测试dp_size=2和DP注意力场景。修改辅助函数_create_engine_on_pg以支持dp_size参数和enable_dp_attention开关,并调整GPU总数为dp_size * tp_size * pp_size(DP注意力时折叠为tp_size * pp_size)。同时删除了原有的test_dp_greater_than_1_raises测试。
文件 模块 状态 重要度
python/sglang/srt/ray/data_parallel_controller.py 数据并行控制 added 9.08
python/sglang/srt/ray/engine.py 引擎 modified 8.36
test/manual/test_ray_engine.py 测试 modified 7.24

关键符号

RayDataParallelController.__init__ RayDataParallelController.launch_dp_schedulers RayDataParallelController.launch_dp_attention_schedulers RayDataParallelController._launch_ray_tp_group RayEngine._launch_scheduler_processes RayEngine._launch_dp_scheduler_processes

关键源码片段

python/sglang/srt/ray/data_parallel_controller.py entrypoint

新增文件,核心组件,实现 Ray-specific 的 DataParallelController,处理 DP 和 DP 注意力调度器 actor 的启动。

# python/sglang/srt/ray/data_parallel_controller.py
# RayDataParallelController: 使用 Ray actor 替代 mp.Process 的 DataParallelController
class RayDataParallelController(DataParallelController):
    def __init__(
        self,
        server_args: ServerArgs,
        port_args: PortArgs,
        placement_group,
        bundle_for_node: List[int],
        rank0_node_ip: str,
    ):
        # 必须在 super().__init__() 之前设置这些属性,因为父类构造器会调用我们重写的启动方法
        self.pg = placement_group
        self.bundle_for_node = bundle_for_node
        self.rank0_node_ip = rank0_node_ip
        self.scheduler_actors: List = []
        self.event_loop_refs: List = []
        # 调用父类构造器,传入 run_scheduler_process_func=None 以跳过进程启动
        super().__init__(server_args, port_args, run_scheduler_process_func=None)
​
    def launch_dp_schedulers(self, server_args: ServerArgs, port_args: PortArgs):
        """为每个DP rank启动一个Ray actor(内部可能包含TP组)"""
        sockets = []
        dp_port_args_list = []
        for dp_rank in range(server_args.dp_size):
            tmp_port_args = PortArgs.init_new(server_args)
            tmp_port_args.tokenizer_ipc_name = port_args.tokenizer_ipc_name
            tmp_port_args.detokenizer_ipc_name = port_args.detokenizer_ipc_name
            # 绑定 NCCL 端口以保证唯一性
            sockets.append(bind_port(tmp_port_args.nccl_port))
            dp_port_args_list.append(tmp_port_args)
            # 在 rank0 节点创建 ZMQ PUSH socket,用于 controller 向 scheduler 发送请求
            if server_args.node_rank == 0:
                self.workers[dp_rank] = get_zmq_socket(
                    self.context, zmq.PUSH,
                    tmp_port_args.scheduler_input_ipc_name, True)
        for sock in sockets:
            sock.close()
        for dp_rank in range(server_args.dp_size):
            self._launch_ray_tp_group(server_args, dp_port_args_list[dp_rank], dp_rank)
python/sglang/srt/ray/engine.py dependency-wiring

修改入口点,移除 NotImplementedError,添加 dp_size 分支逻辑,新增 `_launch_dp_scheduler_processes` 方法。

# python/sglang/srt/ray/engine.py
@classmethod
def _launch_scheduler_processes(cls, server_args, port_args, run_scheduler_process_func):
    pg = ray.util.get_current_placement_group()
    if pg is None:
        raise RuntimeError("use_ray=True requires a placement group")
    nnodes = server_args.nnodes
    engine_bundle, engine_ip = _find_engine_bundle(pg, nnodes)
    bundle_for_node = [engine_bundle] + [i for i in range(nnodes) if i != engine_bundle]
    if server_args.dp_size == 1:
        # 原有的 TP-only 启动逻辑(简化展示)
        world_size = server_args.tp_size * server_args.pp_size
        gpus_per_node = world_size // nnodes
        # ... 创建 SchedulerActor actors ...
        scheduler_actors = [...]
        scheduler_init_result = RaySchedulerInitResult(
            scheduler_actors=scheduler_actors,
            gpu_ids=[...], ...)
        return scheduler_init_result, None
    else:
        # dp_size > 1: 使用 RayDataParallelController
        return cls._launch_dp_scheduler_processes(
            pg, bundle_for_node, engine_ip,
            server_args, port_args)
test/manual/test_ray_engine.py test-coverage

新增 DP 和 DP 注意力测试类,验证新功能的正确性,同时修改辅助函数支持 dp_size 参数。

# test/manual/test_ray_engine.py
def _create_engine_on_pg(tp_size, pp_size=1, dp_size=1, model=_MODEL, extra_kwargs=None):
    # ...
    enable_dp_attention = (extra_kwargs or {}).get("enable_dp_attention", False)
    if enable_dp_attention:
        total_gpus = tp_size * pp_size # DP attention 折叠到 TP
    else:
        total_gpus = dp_size * tp_size * pp_size
    pg = placement_group([{"CPU": 1, "GPU": total_gpus}], strategy="STRICT_PACK")
    # ...
    kwargs = dict(model_path=model, tp_size=tp_size, pp_size=pp_size, dp_size=dp_size)
    # ...

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

主要风险来自Ray actor资源管理和端口分配逻辑。新的RayDataParallelController与父类DataParallelController的交互(特别是__init__中设置属性后再调用super)需要确保覆盖的方法正确执行。DP注意力路径使用了get_zmq_socket_on_host绑定到rank0节点IP,若网络配置有误可能导致连接失败。此外,测试仅覆盖手动运行场景(需2 GPU),尚未集成到CI流水线,存在回归风险。

正向影响:使用Ray部署的用户现在可以利用数据并行特性提升吞吐,DP注意力可进一步减少KV缓存占用。影响范围限于Ray后端,不影响多进程或其他后端。需要用户显式设置dp_size或enable_dp_attention。测试需手动执行,暂未加入CI,可能影响后续回归检测。

核心路径变更 缺少自动化测试(手动测试) ZMQ 安全绑定可能引入网络依赖

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论