Prhub

#21249 Support allreduce fusion with cp

原始 PR 作者 Shunkangz 合并时间 2026-04-20 12:06 文件变更 4 提交数 3 评论 25 代码增减 +201 / -27

执行摘要

支持上下文并行下的 AllReduce 融合,移除 CP 大小限制

从PR body中可见,动机是“Enable the allreduce_fusion with context parallel. This requires the allreduce_fusion can work with any sub comm group.”即让AllReduce融合功能在上下文并行中工作,扩展其适用场景。

该PR值得精读,特别是 flashinfer_comm_fusion.py 中自定义 _FixedTorchDistBackend 的设计,展示了如何修复第三方库的广播问题并适配子通信组。关注工作空间预初始化时机以避免CUDA图死锁。

讨论亮点
  • use_attn_tp_group 变更争议:Fridge003询问为何将 use_attn_tp_groupTrue 改为 False,Shunkangz回复这是修复之前的bug,AllReduce应跨MoE组而非注意力组进行。
  • 工作空间初始化优化:Fridge003建议惰性创建工作空间,Shunkangz解释现有逻辑已按需初始化MoE TP工作空间,并保持注意力TP工作空间预先初始化。
  • 测试覆盖与重命名:Fridge003询问测试用例是否覆盖CP场景,并建议重命名测试文件以更通用,Shunkangz确认测试启用并同意重命名。

实现拆解

  1. 修改通信融合层(flashinfer_comm_fusion.py):引入 _FixedTorchDistBackend 类修复FlashInfer广播问题,扩展 FlashInferWorkspaceManager 支持子通信组(如注意力TP组和MoE TP组),并新增 pre_initialize_workspaces 函数预初始化工作空间。
  2. 在模型运行器中添加预初始化(model_runner.py):新增 _pre_initialize_flashinfer_allreduce_workspace 方法,在CUDA图捕获前调用 pre_initialize_workspaces,避免集合操作死锁。
  3. 调整通信器逻辑(communicator.py):移除 apply_flashinfer_allreduce_fusion 函数中 attn_cp_size <= 1 的限制,并将 prepare_attn 中的 use_attn_tp_groupTrue 改为 False,确保AllReduce跨MoE组而非注意力组。
  4. 更新服务器参数配置(server_args.py):移除自动启用AllReduce融合时对 attn_cp_size 的限制,允许CP场景下默认启用。
文件 模块 状态 重要度
python/sglang/srt/layers/flashinfer_comm_fusion.py 通信融合层 modified 8.58
python/sglang/srt/model_executor/model_runner.py 模型执行器 modified 7.06
python/sglang/srt/layers/communicator.py 通信层 modified 5.36
python/sglang/srt/server_args.py 服务器参数 modified 4.58

关键符号

_FixedTorchDistBackend.__init__ _FixedTorchDistBackend.bcast FlashInferWorkspaceManager.initialize pre_initialize_workspaces _pre_initialize_flashinfer_allreduce_workspace apply_flashinfer_allreduce_fusion

关键源码片段

python/sglang/srt/layers/flashinfer_comm_fusion.py dependency-wiring

核心实现文件,引入自定义通信后端并扩展工作空间管理以支持子通信组

# 新增的 _FixedTorchDistBackend 类,修复 FlashInfer 广播问题并适配子通信组
class _FixedTorchDistBackend(TorchDistBackend):
    """Workaround for FlashInfer TorchDistBackend issues.    1. bcast fix: TorchDistBackend.bcast passes the in-group rank
       directly as `src` to broadcast_object_list, which expects a
       global rank.
    2. Graph-capture fix: initialize with NCCL device_group (so
       the backend derives correct device_idx / GPU mapping), but
       broadcast via GLOO cpu_group (to avoid NCCL collectives
       that interfere with CUDA graph capture).
    """
​
    def __init__(self, device_group, cpu_group):
        super().__init__(group=device_group) # 使用设备组初始化以获取 GPU 映射
        self._cpu_group = cpu_group # 保存 CPU 组用于广播,避免 NCCL 干扰 CUDA 图捕获
​
    def bcast(self, data, root):
        import torch.distributed as dist
​
        group_ranks = dist.get_process_group_ranks(self._cpu_group) # 获取 CPU 组全局排名
        global_root = group_ranks[root] # 将子组排名转换为全局排名
        object_list = [data]
        dist.broadcast_object_list(
            object_list, src=global_root, group=self._cpu_group # 使用 CPU 组进行广播
        )
        return object_list[0] # 返回广播后的数据
python/sglang/srt/model_executor/model_runner.py data-contract

新增预初始化方法,在 CUDA 图捕获前调用以避免死锁

# 新增的预初始化方法,确保在 CUDA 图捕获前初始化 AllReduce 工作空间
def _pre_initialize_flashinfer_allreduce_workspace(self):
    """Pre-initialize flashinfer allreduce fusion workspaces.    Must run before CUDA graph capture to avoid collective operations
    (broadcasts, barriers) inside the graph capture context, which can
    deadlock with custom_all_reduce.register_graph_buffers.
    """
    if not self.server_args.enable_flashinfer_allreduce_fusion: # 检查是否启用融合
        return
​
    from sglang.srt.layers.communicator import FUSE_ALLREDUCE_MAX_BATCH_SIZE
    from sglang.srt.layers.flashinfer_comm_fusion import pre_initialize_workspaces
​
    pre_initialize_workspaces(
        max_token_num=FUSE_ALLREDUCE_MAX_BATCH_SIZE, # 使用最大批次大小
        hidden_dim=self.model_config.hidden_size, # 模型隐藏维度
        dtype=self.dtype, # 数据类型
    )

评论区精华

use_attn_tp_group 从 True 改为 False 的原因 正确性

Fridge003 询问为什么在 communicator.py 中将 use_attn_tp_group 从 True 改为 False,Shunkangz 回复这是修复之前的 bug,AllReduce 应该跨 MoE 组而非注意力组进行。

结论:确认为正确修复,确保 AllReduce 在正确通信组中执行。 · 已解决

工作空间初始化的惰性创建优化 设计

Fridge003 建议惰性创建工作空间以避免不必要的内存开销,Shunkangz 解释现有逻辑已按需初始化 MoE TP 工作空间,并保持注意力 TP 工作空间预先初始化以维持一致性。

结论:保持当前设计,仅在需要时初始化 MoE TP 工作空间。 · 已解决

预初始化函数是否为死代码 正确性

Fridge003 质疑 pre_initialize_workspaces 是否为死代码,Shunkangz 澄清该方法在 model_runner.py 中被调用,用于预初始化工作空间。

结论:确认为有效代码,用于避免 CUDA 图捕获死锁。 · 已解决

风险与影响

  • 兼容性风险:新引入的 _FixedTorchDistBackend 类依赖FlashInfer版本,需确保上游PR(flashinfer PR #2662)合并,否则可能回退到默认实现。
  • 性能风险:预初始化工作空间可能增加内存开销,尤其在多个子通信组场景下;代码中已通过条件检查避免不必要初始化。
  • 正确性风险communicator.pyuse_attn_tp_group 的变更可能影响其他并行配置,需确保测试覆盖所有场景。
  • 用户影响:用户可在上下文并行配置下启用AllReduce融合,提升多GPU通信性能,尤其对DeepSeek、Qwen等模型有益。
  • 系统影响:扩展了AllReduce融合的适用场景,支持更灵活的并行策略;依赖FlashInfer更新,需同步升级。
  • 团队影响:工程师需关注CP与AllReduce融合的交互,后续测试需增加CP场景覆盖。
依赖外部库更新 工作空间内存开销 并行配置兼容性

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论