Prhub

#41429 [Perf][1/n] Eliminate various GPU<->CPU syncs

原始 PR 作者 njhill 合并时间 2026-05-12 04:36 文件变更 7 提交数 7 评论 3 代码增减 +49 / -33

执行摘要

消除多路径 GPU-CPU 同步,提升推理性能

PR body 指出:'Fix first batch of unnecessary gpu/cpu syncs, found via #40561'。作者通过 profiling 工具定位到多个路径存在不必要的同步,这些同步来源于在 GPU 上直接构造张量(如 torch.tensor(list, device=cuda))或使用 .item() 等隐式 D2H 同步操作,拖累了异步调度性能。该 PR 旨在消除这些同步点,使 GPU 流水线更流畅。

值得精读。该 PR 展示了如何通过 profiling 识别隐式同步并给出消除模式,对理解 GPU 异步编程有参考价值。

讨论亮点
  • pin_memory 兼容性:gemini-code-assist 建议在 sampler.py 中使用 self.pin_memory 替代硬编码 True 以适应 WSL,已采纳。同样在 gpu_model_runner.py 中建议显式 pin 内存后做非阻塞传输,已采纳。
  • 注释冗余:yewentao256 指出 penalties.py 中关于 index_fill_ 的注释可能冗余,作者保留未修改。

实现拆解

  1. 模型运行器(vllm/v1/worker/gpu_model_runner.py:在 _init_model_kwargs 中,使用 CPU 上的 optimistic_seq_lens_cpu 替换 GPU 上的 seq_lens,避免 torch.arange(seq_lens[i]) 触发的 GPU 同步;在 _prepare_kv_sharing_fast_prefill 中,用切片赋值 logits_indices[-1] 替代 fill_(logits_indices[-1].item()),消除 .item() 带来的 D2H 同步。
  2. 采样器(vllm/v1/sample/sampler.pygather_specific_token_logprobs 将 logprob token ID 矩阵的构造从 GPU 直接分配改为先在固定 CPU 内存上填充,再通过 non_blocking=True 异步上传到 GPU。
  3. 数据并行工具(vllm/v1/worker/dp_utils.py_run_ar 中每个 rank 的贡献数据先在 CPU 上组装成 tensor_cpu,然后 non_blocking=True 上传 GPU 后再执行 dist.all_reduce
  4. GPU N-gram 提议器(vllm/v1/spec_decode/ngram_proposer_gpu.py:使用新增的 async_tensor_h2d 函数替代 torch.tensor 以非阻塞方式上传重排序索引。
  5. 惩罚模块(vllm/v1/worker/gpu/sample/penalties.pybincount 中使用 index_fill_ 替代直接索引赋值以消除隐式同步。
  6. Mamba 混合器(vllm/model_executor/layers/mamba/mamba_mixer.py:添加 metadata=attn_metadata 参数传递(变更较小)。
    此外,vllm/v1/sample/ops/bad_words.py 也有轻微调整以配合非阻塞传输。
文件 模块 状态 重要度
vllm/v1/worker/gpu_model_runner.py 模型运行器 modified 6.24
vllm/v1/sample/sampler.py 采样器 modified 6.11
vllm/v1/worker/dp_utils.py 数据并行 modified 5.57
vllm/v1/spec_decode/ngram_proposer_gpu.py 推测解码 modified 5.3
vllm/v1/worker/gpu/sample/penalties.py 惩罚 modified 5.19
vllm/model_executor/layers/mamba/mamba_mixer.py Mamba 层 modified 4.56
vllm/v1/sample/ops/bad_words.py 采样器 modified 2.48

关键符号

_init_model_kwargs _prepare_kv_sharing_fast_prefill gather_specific_token_logprobs _run_ar update_ngram_gpu_tensors_incremental bincount forward_impl

关键源码片段

vllm/v1/worker/gpu_model_runner.py core-logic

移除了池化模型初始化中的 GPU 同步:使用 CPU-resident seq_lens 上限并采用 pin_memory + non_blocking 传输;在 KV 共享预填充中避免 .item() 同步。

def _init_model_kwargs(self):
    model_kwargs = dict[str, Any]()
​
    if not self.is_pooling_model:
        return model_kwargs
​
    num_reqs = self.input_batch.num_reqs
    pooling_params = self.input_batch.get_pooling_params()
​
    token_type_id_requests = dict[int, Any]()
    for i, param in enumerate(pooling_params):
        if (
            param.extra_kwargs is not None
            and (token_types := param.extra_kwargs.get('compressed_token_type_ids'))
            is not None
        ):
            token_type_id_requests[i] = token_types
​
    if len(token_type_id_requests) == 0:
        return model_kwargs
​
    # 使用 CPU 上的乐观序列长度上限,避免 GPU 上的 seq_lens 引起的同步
    seq_lens_cpu = self.optimistic_seq_lens_cpu[:num_reqs].tolist()
    token_type_ids = []
​
    for i in range(num_reqs):
        seq_len_i = seq_lens_cpu[i]
        pos = token_type_id_requests.get(i, seq_len_i)
        ids = (torch.arange(seq_len_i) >= pos).int()
        token_type_ids.append(ids)
​
    # 在固定的 CPU 内存中拼接再异步传输到 GPU
    token_type_ids_cpu = torch.empty(
        sum(seq_lens_cpu), dtype=torch.int32, pin_memory=self.pin_memory
    )
    torch.cat(token_type_ids, out=token_type_ids_cpu)
    model_kwargs['token_type_ids'] = token_type_ids_cpu.to(
        device=self.device, non_blocking=True
    )
    return model_kwargs
vllm/v1/sample/sampler.py core-logic

将 logprob token ID 矩阵的构造移至固定的 CPU 内存,然后通过 non_blocking 上传到 GPU,避免 Python 列表到 GPU 同步。

def gather_specific_token_logprobs(
    self,
    logits: torch.Tensor,
    logprob_token_ids: dict[int, list[int]],
    sampled: torch.Tensor,
) -> LogprobsTensors | None:
    if not logprob_token_ids:
        return None
​
    batch_size = logits.shape[0]
    device = logits.device
    max_num_tokens = max(len(tids) for tids in logprob_token_ids.values())
    pin = self.pin_memory # 平台感知的 pin_memory 标志,在 WSL 上为 False
​
    # 在固定的 CPU 内存上分配张量,避免 GPU 直接构造带来的同步
    token_ids_cpu = torch.zeros(
        batch_size, max_num_tokens + 1, dtype=torch.int64, pin_memory=pin
    )
    valid_mask_cpu = torch.zeros(
        batch_size, max_num_tokens + 1, dtype=torch.bool, pin_memory=pin
    )
    valid_mask_cpu[:, 0] = True # 采样 token 始终有效
​
    # 在 CPU 上填充每个请求的 token ID
    for req_idx, token_ids in logprob_token_ids.items():
        num_tokens = len(token_ids)
        token_ids_cpu[req_idx, 1 : num_tokens + 1] = torch.as_tensor(
            token_ids, dtype=torch.int64
        )
        valid_mask_cpu[req_idx, 1 : num_tokens + 1] = True
​
    # 通过 non_blocking 传输到 GPU,不阻塞主机线程
    token_ids_tensor = token_ids_cpu.to(device, non_blocking=True)
    valid_mask = valid_mask_cpu.to(device, non_blocking=True)
    # 采样 token 列直接在 GPU 上赋值(sampled 已在 GPU 上),避免 D2H 回传
    token_ids_tensor[:, 0] = sampled
​
    logprobs = compute_token_logprobs(logits, token_ids_tensor)
    logprobs = logprobs.masked_fill(~valid_mask, float('-inf'))
​
    sampled_logits = logits.gather(-1, sampled.unsqueeze(-1))
    token_ranks = (logits > sampled_logits).sum(dim=-1)
​
    return LogprobsTensors(
        logprob_token_ids=token_ids_tensor.to(torch.int32),
        logprobs=logprobs,
        selected_token_ranks=token_ranks,
    )

评论区精华

sampler.py 中 pin_memory 硬编码应使用 self.pin_memory 正确性

gemini-code-assist 指出在 WSL 上 pin_memory=True 可能失败,建议使用 self.pin_memory 属性。

结论:代码已修改为使用 self.pin_memory。 · 已解决

gpu_model_runner.py 中非阻塞传输需确保源张量已 pin memory 正确性

gemini-code-assist 建议使用 self.pin_memory 条件性地 pin 张量,确保 non_blocking 有效。

结论:代码已采纳,使用了 pin_memory=self.pin_memory。 · 已解决

penalties.py 注释可能冗余 style

yewentao256 认为注释可能冗余。

结论:作者保留注释,未修改。 · unresolved

风险与影响

  1. 改动涉及多个核心路径,但缺少对应测试(无测试文件变更),存在回归风险。
  2. pin_memory 在 WSL 环境下可能不可用,代码已通过 self.pin_memory 标志处理,风险可控。
  3. 新增的 async_tensor_h2d 函数依赖 pin_memory 和正确的 CUDA 流同步,若未正确实现可能导致数据竞争。
  4. 非阻塞传输后若无后续同步点,可能出现读未完成,但现有代码已依赖 GPU 同步(如 all_reducecudagraph 同步)保证正确性。

对用户:无功能变化,推理延迟可能降低。对系统:减少 CPU-GPU 交互,降低延迟抖动。对团队:提供了可复用的消除同步模式(CPU 构造 + non_blocking 传输、index_fill_、切片赋值替代 item 同步),为后续优化奠定基础。

缺少测试覆盖 WSL pin_memory 兼容性 非阻塞传输依赖正确的 cuda 流同步

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论