执行摘要
- 一句话:为Ray后端添加数据并行和DP注意力支持
- 推荐动作:值得精读,尤其关注
RayDataParallelController的设计模式(在子类构造函数中预先设置属性再调用父类构造器以覆盖方法)以及ZMQ安全绑定策略。建议后续将手动测试转化为CI任务(至少nightly),避免回归。
功能与动机
此前Ray后端不支持数据并行(dp_size > 1会抛出NotImplementedError),限制了Ray部署场景下的吞吐扩展能力。本PR填补了这一空白,使得用户可以在Ray集群上利用DP和DP注意力特性。
实现拆解
- 新增
RayDataParallelController (python/sglang/srt/ray/data_parallel_controller.py): 继承自DataParallelController,重写launch_dp_schedulers和launch_dp_attention_schedulers方法。在__init__中提前设置Ray相关的pg、bundle_for_node等属性,然后调用父类构造器触发启动逻辑。launch_dp_schedulers为每个DP rank预先分配端口和ZMQ socket,然后调用_launch_ray_tp_group创建SchedulerActor actor(每个DP rank内部可包含TP组)。launch_dp_attention_schedulers类似,但使用compute_dp_attention_world_info计算DP注意力世界信息,并绑定worker socket到rank0节点IP以增强安全性(CVE-2026-3060)。
- 修改
RayEngine._launch_scheduler_processes (python/sglang/srt/ray/engine.py): 移除原先的dp_size > 1检查,改为条件分支:当dp_size == 1时走原有的TP-only启动逻辑;当dp_size > 1时,调用新增的_launch_dp_scheduler_processes方法,该方法创建RayDataParallelController实例并启动。返回类型保持为tuple[SchedulerInitResult, None]以兼容基类。
- 更新手动测试 (
test/manual/test_ray_engine.py): 增加TestRayEngineOfflineDP2和TestRayEngineOfflineDPAttention两个测试类,分别测试dp_size=2和DP注意力场景。修改辅助函数_create_engine_on_pg以支持dp_size参数和enable_dp_attention开关,并调整GPU总数为dp_size * tp_size * pp_size(DP注意力时折叠为tp_size * pp_size)。同时删除了原有的test_dp_greater_than_1_raises测试。
关键文件:
python/sglang/srt/ray/data_parallel_controller.py(模块 数据并行控制;类别 source;类型 entrypoint;符号 RayDataParallelController, init, launch_dp_schedulers, launch_dp_attention_schedulers): 新增文件,核心组件,实现Ray-specific的DataParallelController,处理DP和DP注意力调度器actor的启动。
python/sglang/srt/ray/engine.py(模块 引擎;类别 source;类型 dependency-wiring;符号 wait_for_completion, _launch_dp_scheduler_processes): 修改入口点,移除NotImplementedError,添加dp_size分支逻辑,新增_launch_dp_scheduler_processes方法。
test/manual/test_ray_engine.py(模块 测试;类别 test;类型 test-coverage;符号 _create_engine_on_pg, TestRayEngineErrors, TestRayEngineOfflineDP2, test_dp_greater_than_1_raises): 新增DP和DP注意力测试类,验证新功能的正确性,同时修改辅助函数支持dp_size参数。
关键符号:RayDataParallelController.init, RayDataParallelController.launch_dp_schedulers, RayDataParallelController.launch_dp_attention_schedulers, RayDataParallelController._launch_ray_tp_group, RayEngine._launch_scheduler_processes, RayEngine._launch_dp_scheduler_processes
关键源码片段
python/sglang/srt/ray/data_parallel_controller.py
新增文件,核心组件,实现Ray-specific的DataParallelController,处理DP和DP注意力调度器actor的启动。
# python/sglang/srt/ray/data_parallel_controller.py
# RayDataParallelController: 使用 Ray actor 替代 mp.Process 的 DataParallelController
class RayDataParallelController(DataParallelController):
def __init__(
self,
server_args: ServerArgs,
port_args: PortArgs,
placement_group,
bundle_for_node: List[int],
rank0_node_ip: str,
):
# 必须在 super().__init__() 之前设置这些属性,因为父类构造器会调用我们重写的启动方法
self.pg = placement_group
self.bundle_for_node = bundle_for_node
self.rank0_node_ip = rank0_node_ip
self.scheduler_actors: List = []
self.event_loop_refs: List = []
# 调用父类构造器,传入 run_scheduler_process_func=None 以跳过进程启动
super().__init__(server_args, port_args, run_scheduler_process_func=None)
def launch_dp_schedulers(self, server_args: ServerArgs, port_args: PortArgs):
"""为每个DP rank启动一个Ray actor(内部可能包含TP组)"""
sockets = []
dp_port_args_list = []
for dp_rank in range(server_args.dp_size):
tmp_port_args = PortArgs.init_new(server_args)
tmp_port_args.tokenizer_ipc_name = port_args.tokenizer_ipc_name
tmp_port_args.detokenizer_ipc_name = port_args.detokenizer_ipc_name
# 绑定 NCCL 端口以保证唯一性
sockets.append(bind_port(tmp_port_args.nccl_port))
dp_port_args_list.append(tmp_port_args)
# 在 rank0 节点创建 ZMQ PUSH socket,用于 controller 向 scheduler 发送请求
if server_args.node_rank == 0:
self.workers[dp_rank] = get_zmq_socket(
self.context, zmq.PUSH,
tmp_port_args.scheduler_input_ipc_name, True)
for sock in sockets:
sock.close()
for dp_rank in range(server_args.dp_size):
self._launch_ray_tp_group(server_args, dp_port_args_list[dp_rank], dp_rank)
python/sglang/srt/ray/engine.py
修改入口点,移除NotImplementedError,添加dp_size分支逻辑,新增_launch_dp_scheduler_processes方法。
# python/sglang/srt/ray/engine.py
@classmethod
def _launch_scheduler_processes(cls, server_args, port_args, run_scheduler_process_func):
pg = ray.util.get_current_placement_group()
if pg is None:
raise RuntimeError("use_ray=True requires a placement group")
nnodes = server_args.nnodes
engine_bundle, engine_ip = _find_engine_bundle(pg, nnodes)
bundle_for_node = [engine_bundle] + [i for i in range(nnodes) if i != engine_bundle]
if server_args.dp_size == 1:
# 原有的 TP-only 启动逻辑(简化展示)
world_size = server_args.tp_size * server_args.pp_size
gpus_per_node = world_size // nnodes
# ... 创建 SchedulerActor actors ...
scheduler_actors = [...]
scheduler_init_result = RaySchedulerInitResult(
scheduler_actors=scheduler_actors,
gpu_ids=[...], ...)
return scheduler_init_result, None
else:
# dp_size > 1: 使用 RayDataParallelController
return cls._launch_dp_scheduler_processes(
pg, bundle_for_node, engine_ip,
server_args, port_args)
test/manual/test_ray_engine.py
新增DP和DP注意力测试类,验证新功能的正确性,同时修改辅助函数支持dp_size参数。
# test/manual/test_ray_engine.py
def _create_engine_on_pg(tp_size, pp_size=1, dp_size=1, model=_MODEL, extra_kwargs=None):
# ...
enable_dp_attention = (extra_kwargs or {}).get("enable_dp_attention", False)
if enable_dp_attention:
total_gpus = tp_size * pp_size # DP attention 折叠到 TP
else:
total_gpus = dp_size * tp_size * pp_size
pg = placement_group([{"CPU": 1, "GPU": total_gpus}], strategy="STRICT_PACK")
# ...
kwargs = dict(model_path=model, tp_size=tp_size, pp_size=pp_size, dp_size=dp_size)
# ...
评论区精华
该PR没有review评论,仅有合入者Qiaolin-Yu的批准。从提交历史可见多次迭代:对齐分支结构、内联方法、修复返回类型、修复DP注意力地址不匹配、以及端口绑定安全修复。说明在开发过程中有主动调整设计,但未形成公开讨论。
风险与影响
- 风险:主要风险来自Ray actor资源管理和端口分配逻辑。新的
RayDataParallelController与父类DataParallelController的交互(特别是__init__中设置属性后再调用super)需要确保覆盖的方法正确执行。DP注意力路径使用了get_zmq_socket_on_host绑定到rank0节点IP,若网络配置有误可能导致连接失败。此外,测试仅覆盖手动运行场景(需2 GPU),尚未集成到CI流水线,存在回归风险。
- 影响:正向影响:使用Ray部署的用户现在可以利用数据并行特性提升吞吐,DP注意力可进一步减少KV缓存占用。影响范围限于Ray后端,不影响多进程或其他后端。需要用户显式设置dp_size或enable_dp_attention。测试需手动执行,暂未加入CI,可能影响后续回归检测。
- 风险标记:核心路径变更, 缺少自动化测试(手动测试), ZMQ安全绑定可能引入网络依赖
关联脉络
- PR #17684 基础Ray actor后端支持: 本PR是PR #17684的后续,为Ray后端添加之前缺失的DP支持(dp_size > 1)。
参与讨论