Prhub

#40150 [CPU][BugFix] Fix inter-node pipeline parallel

原始 PR 作者 fadara01 合并时间 2026-04-20 17:21 文件变更 2 提交数 2 评论 3 代码增减 +17 / -2

执行摘要

修复跨节点流水线并行中 CPU 通信器因 torch.distributed 后端不支持张量字典传输而失败的问题。

修复 Issue #37933(v0.18.0 跨节点流水线并行失败)。在跨节点环境中,CPU 通信器使用 torch.distributed 后端,但该后端不支持 send/recv_tensor_dict 操作,而流水线并行需要这些操作。PR body 明确指出:“torch.distributed does not support send/recieve tensor_dict which are needed for pipeline-parallel。”

该 PR 值得精读,特别是对于从事分布式 CPU 推理或流水线并行开发的工程师。关注 supports_tensor_dict 属性的引入和 use_cpu_custom_send_recv 逻辑的更新,这些设计决策明确了后端支持的条件。同时,注意 review 中提到的 ARM 性能风险和属性访问安全问题,这些是未来需要关注的潜在改进点。

讨论亮点

gemini-code-assist[bot] 指出两个潜在问题:

  1. ARM 架构检查被注释可能导致性能回归:在 cpu_communicator.py 中,ARM 架构检查被注释(# or current_platform.get_cpu_architecture() == CpuArchEnum.ARM),这可能强制 ARM 平台使用较慢的 torch.distributed 后端,建议明确是否故意禁用 SHM 后端。
  2. 直接访问 supports_tensor_dict 属性存在风险:在 parallel_state.py 中,直接访问 self.device_communicator.supports_tensor_dict 可能在其他通信器(如 CudaCommunicator)上引发 AttributeError,建议使用 getattr 更安全。bigPYJ1151 批准了 PR,但未直接回应这些评论,因此这些疑虑可能未解决。

实现拆解

  1. 在 CPU 通信器中添加后端支持检测:修改 vllm/distributed/device_communicators/cpu_communicator.py,在 __init__ 方法中添加 self.supports_tensor_dict 属性,仅当使用 SHM 后端(_CPUSHMDistributed)时为 True。
  2. 禁用不支持的张量字典传输:在 send_tensor_dictrecv_tensor_dict 方法中添加检查,如果 supports_tensor_dict 为 False(即使用 torch.distributed 后端),则抛出 NotImplementedError,阻止使用不支持的快速路径。
  3. 更新并行状态中的自定义发送/接收判断逻辑:修改 vllm/distributed/parallel_state.py 中的 use_cpu_custom_send_recv 属性,从检查 torch.ops._C.init_shm_manager 存在性,改为检查 CPU 平台且设备通信器的 supports_tensor_dict 属性为 True,确保仅当 SHM 后端可用时才启用自定义发送/接收。
  4. 无测试或配置配套改动:本次变更仅涉及核心逻辑修复,未包含测试文件或配置文件的修改。
文件 模块 状态 重要度
vllm/distributed/device_communicators/cpu_communicator.py 分布式通信 modified 6.45
vllm/distributed/parallel_state.py 并行状态 modified 5.59

关键符号

__init__ send_tensor_dict recv_tensor_dict

关键源码片段

vllm/distributed/device_communicators/cpu_communicator.py core-logic

核心变更文件,添加了后端支持检测并禁用不支持的张量字典传输。

class CpuCommunicator(DeviceCommunicatorBase):
    def __init__(
        self,
        cpu_group: dist.ProcessGroup,
        device: torch.device,
        device_group: dist.ProcessGroup,
        unique_name: str,
    ):
        super().__init__(cpu_group, device, device_group, unique_name)
        self.dist_module = torch.distributed
        # 根据条件选择 SHM 后端或回退到 torch.distributed
        if (
            (
                current_platform.get_cpu_architecture() == CpuArchEnum.X86
                # or current_platform.get_cpu_architecture() == CpuArchEnum.ARM # ARM 检查被注释
            )
            and hasattr(torch.ops._C, "init_shm_manager")
            and (unique_name.startswith("tp") or unique_name.startswith("pp"))
            and self._all_group_ranks_share_shm_group_name()
        ):
            self.dist_module = _CPUSHMDistributed(self)
        elif unique_name.startswith("tp") or unique_name.startswith("pp"):
            logger.info(
                "CPU SHM communicator disabled for group %s: ranks do not share "
                "the same SHM group name, falling back to torch.distributed.",
                unique_name,
            )
        # 关键新增:标记张量字典传输仅支持 SHM 后端
        self.supports_tensor_dict = isinstance(self.dist_module, _CPUSHMDistributed)
        # 后续 all2all 初始化逻辑 ...
​
    def send_tensor_dict(
        self,
        tensor_dict: dict[str, torch.Tensor | Any],
        dst: int,
    ) -> None:
        # 关键新增:检查后端支持,如果不支持则抛出异常
        if not self.supports_tensor_dict:
            raise NotImplementedError(
                "CpuCommunicator does not support tensor dict fastpath with "
                "torch.distributed backend."
            )
        return self.dist_module.send_tensor_dict(tensor_dict, dst)
​
    def recv_tensor_dict(
        self,
        src: int,
    ) -> dict[str, torch.Tensor | Any]:
        # 关键新增:检查后端支持,如果不支持则抛出异常
        if not self.supports_tensor_dict:
            raise NotImplementedError(
                "CpuCommunicator does not support tensor dict fastpath with "
                "torch.distributed backend."
            )
        return self.dist_module.recv_tensor_dict(src)
vllm/distributed/parallel_state.py core-logic

更新了自定义发送 / 接收的判断逻辑,以依赖 CPU 通信器的 supports_tensor_dict 属性。

class ProcessGroup:
    def __init__(
        self,
        cpu_group: dist.ProcessGroup,
        device: torch.device,
        device_group: dist.ProcessGroup,
        unique_name: str,
        use_message_queue_broadcaster: bool = False,
    ):
        # ... 设备通信器初始化 ...
        self.device_communicator = device_comm_cls(
            cpu_group=self.cpu_group,
            device=self.device,
            device_group=self.device_group,
            unique_name=self.unique_name,
        )
        # ... 其他初始化 ...
        # 关键变更:更新自定义发送 / 接收支持判断
        self.use_cpu_custom_send_recv = (
            current_platform.is_cpu()
            and self.device_communicator
            and getattr(self.device_communicator, "supports_tensor_dict", False) # 使用 getattr 避免 AttributeError
        )
        # 注意:原代码直接访问 self.device_communicator.supports_tensor_dict,
        # review 建议使用 getattr 以提高鲁棒性,防止其他通信器缺少此属性。

评论区精华

ARM 架构检查注释可能导致性能回归 性能

gemini-code-assist[bot] 指出 ARM 架构检查被注释,可能强制 ARM 平台使用较慢的 torch.distributed 后端,建议明确是否故意禁用 SHM 后端。

结论:未在 PR 中直接回应,疑虑可能未解决。 · unresolved

直接访问 supports_tensor_dict 属性存在风险 正确性

gemini-code-assist[bot] 建议在 parallel_state.py 中使用 getattr 访问 supports_tensor_dict,以避免其他通信器(如 CudaCommunicator)可能缺少此属性而引发 AttributeError。

结论:PR 已采纳建议,将直接访问改为 getattr(self.device_communicator, "supports_tensor_dict", False)。 · 已解决

风险与影响

  1. 回归风险:ARM 架构检查被注释可能导致 ARM 平台无法使用 SHM 后端,性能下降(如 gemini-code-assist[bot] 所述)。
  2. 兼容性风险:直接访问 supports_tensor_dict 属性(在 parallel_state.py 中)可能在其他通信器上引发 AttributeError,虽然当前有 is_cpu() 检查,但未来扩展可能出错。
  3. 功能风险:如果 SHM 后端不可用,send_tensor_dictrecv_tensor_dict 将抛出 NotImplementedError,依赖这些方法的流水线并行操作可能回退到通用实现,但通用实现可能未充分测试或性能较差。
  1. 用户影响:修复了跨节点流水线并行在 CPU 上的失败问题,使多节点 CPU 推理能够正常工作。
  2. 系统影响:CPU 通信器现在能正确区分后端支持,避免使用不支持的 torch.distributed 张量字典传输,提升系统稳定性。
  3. 团队影响:为分布式 CPU 流水线并行提供了明确的后端支持边界,便于后续开发和调试。
ARM 性能风险 属性访问安全 缺少测试覆盖

关联 Issue

#37933 [Bug]: v0.18.0 fails to run pipeline parallel across nodes

完整报告

参与讨论