Prhub

#35241 Create tests/distributed/test_mnnvl_alltoall.py

原始 PR 作者 puririshi98 合并时间 2026-04-30 05:56 文件变更 2 提交数 60 评论 32 代码增减 +779 / -0

执行摘要

新增 MNNVL AllToAll 分布式测试套件

作为 NVIDIA 将内部 CI 测试集成到 vLLM 上游仓库的持续努力的一部分,本 PR 针对关键的分布式通信原语 MNNVL alltoall 提供了测试覆盖。PR body 说明:'all 5 tests pass on 8xa100 w/ latest nvidia vllm container This is part of the NVIDIA effort to add CI to upstream github'

对于关心分布式通信和 CI 基础设施的工程师,值得精读测试框架设计,尤其是多进程环境管理和错误传播模式。对于主要关注模型推理逻辑的开发者,可快速浏览了解覆盖范围即可。

讨论亮点
  1. 测试验证有效性:gemini-code-assist[bot] 发现早期版本中 flashinfer_manager 被创建但未用于实际验证,仅使用了 AGRS 参考实现。审查者 hjjq 也指出 'Seems that this only validates AGRS? Where are the flashinfer backends being validated?'。作者后续提交 'adding coverage for flashinfer backend' 补充了相关测试用例。
  2. 端口硬编码风险:自动化审查工具指出硬编码端口可能导致 CI 不稳定,后期版本改用 get_open_port() 动态获取可用端口。
  3. SYS_PTRACE 检测策略:hjjq 质疑在 CI 环境中应直接要求 SYS_PTRACE 能力而非跳过测试,但作者保持了基于 /proc/self/status 检测并跳过的做法,可能考虑不同环境的灵活性需求。

实现拆解

  1. 测试基础设施搭建:在 tests/distributed/test_mnnvl_alltoall.py 中实现了多进程管理 helper,包括 _spawn_workers、_run_worker、_init_dp_environment。这些函数负责启动每个 GPU 上的子进程、设置分布式环境(支持 tp 或 DP 模式),并通过 mp.Queue 将所有错误传播回父进程,避免隐蔽的静默失败。
  2. 核心测试用例:涵盖 5 个主要场景——FlashInfer 模块导入、管理器初始化(验证 NVLink 连接)、工作区重初始化(在销毁后重试)、ensure_initialized 语义(确保延迟初始化只发生一次),以及 end-to-end 数据通信正确性(与 AGRS 参考实现对比)。
  3. CI 集成:修改 .buildkite/test_areas/distributed.yaml,在 'Distributed Tests (8 GPUs)(H100)' 的 source_file_dependencies 中添加本测试文件,确保任何相关变更都会触发该测试;同时在 'Distributed Tests (2 GPUs)(B200)' 的命令中添加 pytest 调用,确保在 B200 硬件上也运行这些测试。
文件 模块 状态 重要度
tests/distributed/test_mnnvl_alltoall.py 分布式通信 added 7.48
.buildkite/test_areas/distributed.yaml CI 配置 modified 3.14

关键符号

_has_sys_ptrace _spawn_workers _run_worker _init_dp_environment _make_forward_context _AttnMeta _two_sided_lifecycle_worker test_two_sided_manager_lifecycle

关键源码片段

tests/distributed/test_mnnvl_alltoall.py test-coverage

新测试文件,覆盖 MNNVL alltoall 核心操作

def _spawn_workers(worker_fn, world_size, *, dp_size=None):
    """Spawn one process per GPU, run worker_fn, assert all succeed.    Uses an mp.Queue to propagate worker tracebacks back to the parent
    so pytest shows the actual failure, not just an exit code.
    """
    # 确保使用 spawn 启动方法,这是跨平台分布式测试的必要条件
    if mp.get_start_method(allow_none=True) is None:
        mp.set_start_method("spawn")
​
    port = str(get_open_port())
    # 为 DP master 分配第二个端口,避免端口冲突,特别是在 xdist 下并行执行时
    dp_port = str(get_open_port()) if dp_size is not None else None
    err_queue: mp.Queue = mp.Queue()
    procs = []
    for rank in range(world_size):
        p = mp.Process(
            target=_run_worker,
            args=(rank, world_size, port, worker_fn, dp_size, dp_port, err_queue),
        )
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
​
    # 从队列收集所有 worker 的错误信息,然后统一断言,而不是只检查退出码
    errors = []
    while not err_queue.empty():
        errors.append(err_queue.get_nowait())
    err_queue.close()
    err_queue.join_thread()
    if errors:
        pytest.fail("Worker(s) failed:\n" + "\n---\n".join(errors))
​
​
def _run_worker(rank, world_size, port, worker_fn, dp_size, dp_port, err_queue):
    """Per-process setup: device, distributed env, then call worker_fn.    Args:
        dp_size: If set, initialize with tp=1 and data_parallel_size=dp_size.
                 Otherwise use tp=world_size (default for EP-based tests).
        dp_port: Separate port for the DP master (only used when dp_size is set).
        err_queue: Queue for propagating tracebacks to the parent process.
    """
    try:
        # 清除可能影响设备分配的 CUDA_VISIBLE_DEVICES 环境变量
        os.environ.pop("CUDA_VISIBLE_DEVICES", None)
        torch.accelerator.set_device_index(rank)
        if dp_size is not None:
            # 使用自定义的 DP 环境初始化,支持 tp=1、dp=dp_size
            _init_dp_environment(world_size, rank, port, dp_size, dp_port)
        else:
            # 默认使用 tp=world_size 的分布式环境(适合 EP 测试)
            init_test_distributed_environment(world_size, 1, rank, port)
        worker_fn(rank, world_size)
        torch.distributed.barrier()
    except Exception:
        # 将异常信息放入队列,避免子进程直接退出导致错误信息丢失
        err_queue.put(f"[Rank {rank}]\n{traceback.format_exc()}")
        import sys
        sys.exit(1)

评论区精华

测试未验证 flashinfer backend 输出 正确性

gemini-code-assist[bot] 发现 flashinfer_manager 被创建但未被用于实际验证,仅使用了 AGRS 参考实现。hjjq 也指出测试仅验证了 AGRS,未覆盖 flashinfer 后端。

结论:作者后续提交('adding coverage for flashinfer backend')补充了 flashinfer 相关测试用例,使测试同时覆盖两个后端。 · 已解决

使用硬编码端口风险 测试

gemini-code-assist[bot] 指出硬编码端口可能导致测试在 CI 中因端口冲突而 flaky,建议使用动态端口。

结论:最终代码改用 get_open_port() 动态获取可用端口,消除了硬编码风险。 · 已解决

SYS_PTRACE 检测策略 设计

hjjq 质疑在 CI 环境中应直接要求 SYS_PTRACE 能力而非跳过测试,认为跳过可能掩盖配置问题。

结论:作者保持了基于 /proc/self/status 检测并跳过的做法,未改为强制报错。该决定可能基于不同 CI 环境的灵活性需求。 · 待处理

风险与影响

  1. 硬件依赖:测试仅适用于 NVIDIA MNNVL 环境(特定 GPU+容器配置),在其他硬件上会自动跳过,可能导致覆盖盲区。
  2. 测试完整性:早期 review 发现 flashinfer 后端验证不足,虽然后续补充了测试,但仍需持续关注是否有遗漏场景。
  3. CI 配置变更:在 distributed.yaml 中添加依赖和命令可能影响其他 CI 步骤,但变更较小且经过验证。

对最终用户无直接功能影响。对开发者,增加了分布式通信回归测试覆盖,有助于及早发现 MNNVL 相关错误。对 CI 系统,新增的测试在 H100 和 B200 设备上运行,增加了测试时长和维护成本,但属于基础架构健全性的必要投入。

硬件依赖 ( 仅 MNNVL) 测试验证完整性待观察 CI 配置变更影响

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论