Prhub

#41061 [DSV4] Enable Multi-stream for Pre-Attn GEMM

原始 PR 作者 zyongye 合并时间 2026-04-29 00:08 文件变更 4 提交数 7 评论 5 代码增减 +187 / -57

执行摘要

DeepSeek V4 pre-attn GEMM 多流并行优化

DeepSeek V4 模型在 prefill 阶段需要执行多个矩阵乘法(GEMM)来计算 Q、K、V 投影及 compressor、indexer 等,这些 GEMM 在原本实现中顺序执行,无法充分利用 GPU 并行能力。本 PR 旨在通过 CUDA 流并发这些 GEMM 以缩短 prefill 时间,特别是首个 Token 延迟。PR 中所附基准测试显示,TTFT 从 2282ms 降低至 278ms,总吞吐从 567.81 tok/s 提升至 695.04 tok/s。

对于优化 DeepSeek V4 模型推理性能的团队,此 PR 提供了显著的 prefill 加速,值得精读其实现模式和同步设计。代码引入了新的 execute_in_parallel 工具,可在其他场景复用。建议后续添加单元测试覆盖多流路径,并在 PyTorch 2.12 就绪后考虑简化实现。

讨论亮点

在代码审查中,围绕是否应等待 PyTorch 2.12 以减少不可读代码路径产生了讨论(LopezCastroRoberto 引用 issue #39309 和 PR #39943)。贡献者 ZJY0516 表示对于高优先级模型 DSV4,可以先合并再重构。另外,gemini-code-assist 自动检查发现了三个关于 wq_b 线性层返回值需解包的高优先级正确性缺陷,均涉及 attn_gemm_parallel_execute.view() 调用的潜在 AttributeError。

实现拆解

  1. 添加泛化多流执行工具:在 vllm/utils/multi_stream_utils.py 中新增 execute_in_parallel 函数,作为 maybe_execute_in_parallel 的扩展,支持任意数量的辅助流和可跳过的任务。

  2. 修改 Attention 模块接口:将 DeepseekV4MLAModules 中的 aux_stream 字段改为 aux_stream_list(类型为 list[torch.cuda.Stream]),以便容纳多个辅助流。在 DeepseekV4Attention.__init__ 中初始化 4 个 CUDA 事件(ln_events),分别用于同步各流。

  3. 实现并行 GEMM 方法:在 deepseek_v4_attention.py 中添加 attn_gemm_parallel_execute 方法。将最重的 fused_wqa_wkv 留在默认流,将 compressor 的 kv_score GEMM、indexer 的 weights_proj 和 compressor kv_score GEMM(若 indexer 存在)分配到三个辅助流上并行执行。

  4. 调整 Compressor 前向接口:在 deepseek_compressor.py 中,将 forward 方法的输入从原始 hidden_states 改为预计算的 kv_score 张量,将 GEMM 计算移出到调用方(多流并行部分)。

  5. 修改模型构造入口:在 deepseek_v4.py 中,DeepseekV4DecoderLayer 改为接收 aux_stream_list;在模型顶层初始化时创建 3 个 CUDA 流并传递,同时移除旧的 AuxStreamType 枚举引用。

  6. 调整注意力 forward 路径:将原先在 forward 内顺序执行的 fused_wqa_wkv 和输入 GEMM 拆分为调用 attn_gemm_parallel_execute,然后将结果送入后续的 RoPE、FlashMLA 等操作。

文件 模块 状态 重要度
vllm/model_executor/layers/deepseek_v4_attention.py 注意力层 modified 8.88
vllm/utils/multi_stream_utils.py 多流工具 modified 7.21
vllm/model_executor/models/deepseek_v4.py 模型层 modified 6.15
vllm/model_executor/layers/deepseek_compressor.py 压缩器 modified 5.69

关键符号

attn_gemm_parallel_execute execute_in_parallel DeepseekV4MultiHeadLatentAttentionWrapper.forward DeepseekCompressor.forward DeepseekV4Attention.__init__

关键源码片段

vllm/utils/multi_stream_utils.py core-logic

新增泛化多流执行工具 `execute_in_parallel`,支撑本 PR 的并发调度。

def execute_in_parallel(
    default_fn: Callable[[], Any],
    aux_fns: list[Callable[[], Any] | None],
    start_event: torch.cuda.Event,
    done_events: list[torch.cuda.Event],
    aux_streams: list[torch.cuda.Stream] | None = None,
) -> tuple[Any, list[Any]]:
    """在默认流上运行 default_fn,在 aux_streams 上并发运行 aux_fns。    将 maybe_execute_in_parallel 泛化为 N 个辅助可调用对象。
    对于 aux_fns[i] 为 None 的槽位跳过(不切换流、不记录事件),
    其对应的 aux_results 条目返回 None。    start_event 从当前流扇出到每个启动的辅助流;
    done_events[i] 在 aux_fns[i] 之后记录,当前流在返回前 join。
    当 aux_streams 为 None 时,所有 aux_fns 在当前流上顺序执行。    参数:
        default_fn: 在默认(当前)流上运行的可调用对象。
        aux_fns: 每个辅助流的可调用对象;条目可为 None 表示跳过。
        start_event: 在 default_fn 之前在当前流上记录的 CUDA 事件,
            每个启动的辅助流会等待此事件。
        done_events: 每个辅助槽位对应一个 CUDA 事件,在相应 aux_fn
            执行后记录。长度必须与 aux_fns 相同。
        aux_streams: 每个辅助流对应的 CUDA 流。长度必须与 aux_fns
            相同。当为 None 时禁用多流。    返回:
        (default_result, aux_results) 元组,其中 aux_results[i] 是
        aux_fns[i] 的结果(若跳过则为 None)。
    """
    aux_results: list[Any]
    if aux_streams is None:
        # 顺序执行:先 default_fn,再依次执行每个非 None 的 aux_fn
        default_result = default_fn()
        aux_results = [fn() if fn is not None else None for fn in aux_fns]
        return default_result, aux_results
​
    # 多流模式:验证列表长度一致
    assert len(aux_fns) == len(aux_streams) == len(done_events), (
        "aux_fns, aux_streams, and done_events must be the same length"
    )
​
    aux_results = [None] * len(aux_fns)
    pending: list[torch.cuda.Event] = []
​
    # 记录开始事件,然后启动所有辅助流上的任务
    start_event.record()
    for i, fn in enumerate(aux_fns):
        if fn is None:
            continue # 跳过空槽位
        with torch.cuda.stream(aux_streams[i]):
            start_event.wait()
            aux_results[i] = fn()
            done_events[i].record()
        pending.append(done_events[i])
​
    # 在默认流上执行主任务
    default_result = default_fn()
​
    # 等待所有辅助流完成
    for ev in pending:
        ev.wait()
​
    return default_result, aux_results

评论区精华

是否应等待 PyTorch 2.12 以避免不可读代码路径 设计

审查者 LopezCastroRoberto 提出之前共识是等待 PyTorch 2.12 以避免不可读代码路径(引用 #39309, #39943)。贡献者 ZJY0516 回应表示对于高优先级模型 DSV4,可以先合并再重构。

结论:决定先合并,后续在 PyTorch 2.12 到来时重构。 · 已解决

wq_b 返回值需解包以避免 AttributeError 正确性

gemini-code-assist 指出在 attn_gemm_parallel_execute 中三处 self.wq_b(qr) 直接调用 .view(),但 ColumnParallelLinear 返回元组,应解包。

结论:建议被采纳,提交历史中有相关 bug fix。 · 已解决

风险与影响

主要风险来自 CUDA 多流同步的正确性(如提交历史曾修正并发 bug);若 indexer 为 None 则辅助流可能空转,但代码已处理 None 跳过;接口变更(如 compressor 的 forward 签名改变)可能影响 OOT 后端的自定义实现;新 execute_in_parallel 假设所有列表长度一致,调用方需保证。当前没有测试覆盖多流路径,无法通过回归测试保证正确性。

直接影响 DeepSeek V4 用户的推理延迟和吞吐,prefill 加速显著(TTFT 降低约 88%)。负面影响是增加了 CUDA 流使用量(3 个额外流),可能对资源受限环境造成竞争。对非 DeepSeek 模型无影响。团队需要维护多流相关代码并与未来 PyTorch 2.12 的原生多流支持对齐。

核心路径变更 缺少测试覆盖 接口兼容性风险 CUDA 同步风险

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论