Prhub

#43720 [KVConnector][1/N] PP-aware handshake aggregation and intermediate-PP output plumbing

原始 PR 作者 zixi-qi 合并时间 2026-06-05 13:04 文件变更 9 提交数 4 评论 11 代码增减 +459 / -26

执行摘要

PP-aware KV connector 握手聚合与中间输出

PR 目的明确:让 KV connector / engine / model runner / gpu worker PP-aware,为 PP + PD 分解提供基础设施。本 PR 是连接器无关的,变更对于 NIXL connector 和 Mooncake connector 支持 PP + PD 都是必要的,纯 PP-aware 重构,不引入行为变化。

建议精读,尤其关注 EngineTransferInfoTransferTopology 的键变更,以及基类默认实现中的校验逻辑。设计简洁,没有过度抽象,值得借鉴。

讨论亮点
  • 设计简化:njhill 认为独立的 SupportsPP 标记接口过于重量级,建议直接在基类中添加方法。作者采纳并移除了标记类。
  • 非 PP-aware 连接器的保护:njhill 提出疑问:非 PP-aware 连接器用于 PP 上下文是否安全?作者在默认实现中添加了校验,当遇到 pp_rank>0 时抛出 ValueError。

实现拆解

  1. 扩展 EngineTransferInfo 和 TransferTopology._engines 键:在 vllm/distributed/kv_transfer/kv_connector/utils.py 中,为 EngineTransferInfo 数据类添加 remote_pp_rankstart_layerend_layer 字段(默认 0),将 TransferTopology._engines 的键类型从 EngineId 改为 (EngineId, int)(pp_rank),并修改 register_remote_engineget_engine_infois_kv_replicatedreplicates_kv_cachetarget_remote_ranksdescribe 方法的签名或内部实现以接受可选的 remote_pp_rank 参数。
  2. 添加 PP-aware 握手元数据设置方法:在 vllm/distributed/kv_transfer/kv_connector/v1/base.pyKVConnectorBase_V1 基类中新增 set_xfer_handshake_metadata_pp_aware 方法,默认实现检查字典中是否存在 pp_rank != 0,若有则抛出 ValueError;否则将 (pp_rank, tp_rank) 键字典转换为 {tp_rank: metadata} 并委托给 set_xfer_handshake_metadata。在 multi_connector.py 中覆盖此方法,转发给所有子连接器。
  3. Worker 握手元数据生产端适配:在 vllm/v1/worker/gpu_worker.py 中,修改 get_kv_connector_handshake_metadata 的返回类型为 dict[tuple[int, int], KVConnectorHandshakeMetadata] | None,并改为以 (pp_rank, tp_rank) 为键构造字典,pp_rank 来自 get_pp_group().rank_in_group
  4. Engine Core 握手消费端适配:在 vllm/v1/engine/core.py 中,将握手合并字典的类型从 dict[int, Any] 改为 dict[tuple[int, int], Any],以接收新键结构;在调用连接器方法时根据是否支持 PP 选择 set_xfer_handshake_metadataset_xfer_handshake_metadata_pp_aware
  5. 配套测试:新增 test_transfer_topology_sharded.py 覆盖 PP 感知的引擎注册、pp_rank 分离存储、辅助方法的 pp_rank 参数及向后兼容性;新增 test_handshake_pp_aggregation.py 使用 fake executor 验证 PP 感知握手聚合流程;修改 test_multi_connector.py 同步接口变更。
文件 模块 状态 重要度
vllm/distributed/kv_transfer/kv_connector/utils.py 传输拓扑 modified 8.11
tests/v1/kv_connector/unit/test_handshake_pp_aggregation.py 握手测试 added 7.45
tests/v1/kv_connector/unit/test_transfer_topology_sharded.py 拓扑测试 added 7.29
vllm/distributed/kv_transfer/kv_connector/v1/base.py 连接器基类 modified 6.77
vllm/v1/worker/gpu_worker.py GPU 工作器 modified 6.44
vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py 多连接器 modified 5.41
vllm/v1/engine/core.py 引擎核心 modified 5.4
vllm/v1/executor/abstract.py 执行器抽象 modified 3.92
tests/v1/kv_connector/unit/test_multi_connector.py 多连接测试 modified 4.07

关键符号

register_remote_engine get_engine_info is_kv_replicated replicates_kv_cache target_remote_ranks describe set_xfer_handshake_metadata_pp_aware get_kv_connector_handshake_metadata

关键源码片段

vllm/distributed/kv_transfer/kv_connector/utils.py core-logic

核心数据结构变更:EngineTransferInfo 新增 PP 相关字段,TransferTopology._engines 键扩展为 (engine_id, pp_rank),所有相关方法适配 PP 维度。

@dataclass(frozen=True)
class EngineTransferInfo:
    """Common per-remote-engine transfer state, computed at handshake.    Stored per ``(engine_id, pp_rank)`` inside ``TransferTopology._engines``.
    """
    remote_tp_size: int
    remote_block_len: int # Block length (bytes)
    remote_block_size: int # Tokens per block.
    remote_physical_blocks_per_logical: int # Physical blocks per logical block.
    remote_pp_rank: int = 0 # Remote producer PP rank for this engine.
    start_layer: int = 0 # Global index of first layer owned by this PP rank.
    end_layer: int = 0 # Exclusive global index after last layer owned by this PP rank.
​
​
@dataclass
class TransferTopology:
    # ... other fields ...
    _engines: dict[tuple[EngineId, int], EngineTransferInfo] # keyed by (engine_id, pp_rank)
​
    def register_remote_engine(self, remote_engine_id: EngineId, info: EngineTransferInfo) -> EngineTransferInfo:
        engine_key = (remote_engine_id, info.remote_pp_rank)
        if engine_key in self._engines:
            return self._engines[engine_key]
        self._engines[engine_key] = info
        return info
​
    def get_engine_info(self, remote_engine_id: EngineId, remote_pp_rank: int = 0) -> EngineTransferInfo:
        return self._engines[(remote_engine_id, remote_pp_rank)]
​
    def is_kv_replicated(self, remote_engine_id: EngineId, remote_pp_rank: int = 0) -> bool:
        return self._engines[(remote_engine_id, remote_pp_rank)].remote_tp_size > self.total_num_kv_heads
​
    def replicates_kv_cache(self, remote_engine_id: EngineId, remote_pp_rank: int = 0) -> bool:
        return self.is_mla or self.is_kv_replicated(remote_engine_id, remote_pp_rank)

评论区精华

去除 SupportsPP 标记接口,采用基类默认方法 设计

njhill 认为 SupportsPP 接口过于重量级,建议直接在 KVConnectorBase_V1 中添加 pp_aware 方法作为默认实现,而不是使用单独标记类。

结论:zixi-qi 采纳建议,移除了 SupportsPP,在基类中添加了 set_xfer_handshake_metadata_pp_aware 默认方法。 · 已解决

非 PP-aware 连接器在 PP 上下文中的安全性 正确性

njhill 提问:若非 PP-aware 连接器用于 PP 上下文能否正常工作?若不能,应在何处校验。

结论:zixi-qi 在默认实现中添加校验,当 metadata 中出现 pp_rank>0 时抛出 ValueError,确保早期失败。 · 已解决

风险与影响

  • 向后兼容风险EngineTransferInfo 新字段有默认 0,现有非 PP 场景调用时行为一致;但需确认所有 get_engine_info 调用点是否都正确提供了 pp_rank(默认 0 在非 PP 场景是安全的)。
  • 测试覆盖风险:新增测试覆盖了核心注册和辅助方法,但 engine core 中根据连接器类型选择调用哪个方法的分支逻辑未被充分测试(通过 mock 覆盖较弱)。
  • 跨模块耦合:变更涉及 distributed、v1/worker、v1/engine 三大模块,后续重构需注意边界。
  • 用户:无直接可见变化。
  • 系统:为 KV Connector 支持 Pipeline Parallelism + PD 分解打下基础,未来连接器可按 PP rank 区分 producer,提升跨节点 KV 传输灵活性。
  • 团队:连接器开发者需关心 set_xfer_handshake_metadata_pp_aware 方法;后续 PP 相关功能开发需理解本变更。
核心数据结构变更 向后兼容依赖默认值 跨模块耦合

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论