Prhub

#42828 [KVConnector][DSV4] HMA support for Mooncake store connector

原始 PR 作者 ivanium 合并时间 2026-05-19 16:07 文件变更 10 提交数 8 评论 4 代码增减 +1835 / -446

执行摘要

MooncakeStore 新增混合注意力缓存支持

PR 旨在为 MooncakeStoreConnector 添加 hybrid KV cache 管理支持,以便使用混合注意力布局(如全注意力 + 滑动窗口)的模型(如 DSV4)可以利用 Mooncake Store 作为共享外部 KV 池。同时声明 SupportsHMA 接口并在初始化阶段验证 KVCacheConfig,拒绝不支持的配置组合。

建议仔细审阅 MooncakeStoreCoordinator.find_longest_cache_hit 的实现,确保与内部 HybridKVCacheCoordinator 的掩码计算逻辑一致。优先处理 review 中提出的 key_list 空检查和 segment 注册过滤问题,建议添加对应边界测试。

讨论亮点
  1. ZeroDivisionError 风险:gemini-code-assist 指出当 load_mask 过滤掉所有块时 key_list 可能为空,导致后续 rotation = self.tp_rank % len(key_list) 出现除零错误,建议添加空检查跳过处理。
  2. 缓存段注册错误:gemini-code-assist 指出 register_kv_caches 中所有 base address 被注册到每个组的 token_database,可能导致跨组数据污染,应按 layer 分组过滤。
  3. 注释补充建议:zhewenl 建议在 ExternalCachedBlockPool.get_cached_block 中更清晰地说明两种模式(确定 mask 与真实缓存命中)的用途。

实现拆解

  1. 连接器入口适配:在 connector.py 中声明 SupportsHMA 接口,新增 _validate_kv_cache_config 静态方法,在初始化时验证配置是否兼容(拒绝 CrossAttentionSpec、非 align 的 Mamba、多组且 PCP/DCP>1 组合)。
  2. 引入外部缓存协调器:新增 coordinator.py 文件,包含 ExternalCachedBlockPool(基于 (group_id, hash) 集合的 BlockPool 模拟)和 MooncakeStoreCoordinator(镜像 HybridKVCacheCoordinator.find_longest_cache_hit,根据外部存在集计算每组 load_mask 和 hit_length)。
  3. 工作线程改造:修改 worker.py 中的 KVTransferThread 及其子类,将单 token_database 改为 token_databases 列表,引入 coordinator 引用,在发送线程中根据 load_mask 过滤需要存储的块,并修复了 _handle_request 中的 token_len 对齐逻辑。
  4. 调度器调整:在 scheduler.py__init__ 中接收 kv_cache_config,将 block_ids 扩展为元组支持多组,更新 build_connector_metarequest_finished 等方法。
  5. 数据结构更新data.pyReqMetaPoolKey 等适配多组标识,MooncakeStoreConnectorMetadata 支持每组的状态。
  6. 测试覆盖:新增 4 个测试文件(coordinator 单元测试、e2e HMA 测试、scheduler 测试),修改 worker 和 connector 测试,覆盖单组/多组、全命中/部分命中/无命中、滑动窗口掩码等场景。
文件 模块 状态 重要度
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/coordinator.py 协调器 added 9.28
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py 工作线程 modified 8.84
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py 连接器 modified 7.89
tests/v1/kv_connector/unit/test_mooncake_store_coordinator.py 协调器测试 added 8.05
tests/v1/kv_connector/unit/test_mooncake_store_hma_e2e.py e2e 测试 added 8.05

关键符号

ExternalCachedBlockPool.get_cached_block MooncakeStoreCoordinator.find_longest_cache_hit MooncakeStoreConnector._validate_kv_cache_config MooncakeStoreConnector.request_finished_all_groups MooncakeStoreWorker.register_kv_caches KVCacheStoreSendingThread._handle_request MooncakeStoreScheduler.__init__

关键源码片段

vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/coordinator.py core-logic

新增的核心文件,包含 ExternalCachedBlockPool 和 MooncakeStoreCoordinator,实现外部缓存命中判断逻辑,是整个 HMA 支持的关键。

# coordinator.py – 核心类:ExternalCachedBlockPool 与 MooncakeStoreCoordinatorclass ExternalCachedBlockPool:
    """Duck-typed BlockPool backed by a ``(group_id, hash)`` exists set.    Used by MooncakeStoreCoordinator to simulate GPU-side BlockPool.
    """
​
    def __init__(self, exists: set[tuple[int, bytes]] | None = None) -> None:
        # ``exists=None`` 用于接收端,此时 hit_length 已确定,只需分配一个总是命中
        # 的假块(present_block)让每个 spec 的管理器应用自己的 mask。
        self._exists = exists
        self.null_block = KVCacheBlock(block_id=0)
        self._present_block = KVCacheBlock(block_id=1)
​
    def get_cached_block(
        self,
        block_hash: BlockHash,
        group_ids: list[int],
    ) -> list[KVCacheBlock] | None:
        # 只有当给定 group_ids 中的所有组都缓存了该 hash 时,视为命中
        # (这是 duck-typing 行为,与 BlockPool.get_cached_block 一致)
        if self._exists is None:
            return [self._present_block] * len(group_ids)
        h = bytes(block_hash)
        if all((g, h) in self._exists for g in group_ids):
            return [self._present_block] * len(group_ids)
        return None
​
​
class MooncakeStoreCoordinator:
    """Mirror of ``HybridKVCacheCoordinator.find_longest_cache_hit`` over an
    ``ExternalCachedBlockPool``."""
​
    def __init__(
        self,
        kv_cache_groups: list[KVCacheGroupSpec],
        scheduler_block_size: int,
        hash_block_size: int,
        use_eagle: bool = False,
    ) -> None:
        # 确保 block_size 关系兼容
        assert all(
            g.kv_cache_spec.block_size % hash_block_size == 0 for g in kv_cache_groups
        ), "block_size must be divisible by hash_block_size"
        assert scheduler_block_size % hash_block_size == 0
        assert all(
            scheduler_block_size % g.kv_cache_spec.block_size == 0
            for g in kv_cache_groups
        ), "scheduler_block_size must be a multiple of each group's block_size"
        self.kv_cache_groups = kv_cache_groups
        self.hash_block_size = hash_block_size
        self.lcm_block_size = scheduler_block_size
        self.use_eagle = use_eagle
        self._verify_and_split_kv_cache_groups()
​
    def _verify_and_split_kv_cache_groups(self) -> None:
        """将 kv_cache_groups 按 spec 分组(与内部 KVCacheCoordinator 类似)
        但不分配实际管理器,仅记录分组关系。"""
        attention_groups: list[
            tuple[KVCacheSpec, list[int], type[SingleTypeKVCacheManager]]
        ] = []
        for i, g in enumerate(self.kv_cache_groups):
            spec = g.kv_cache_spec
            manager_cls = spec_manager_map[type(spec)]
            for existing_spec, group_ids, existing_cls in attention_groups:
                if existing_spec == spec:
                    assert manager_cls is existing_cls
                    group_ids.append(i)
                    break
            else:
                attention_groups.append((spec, [i], manager_cls))
        # Full attention 优先(与上游收敛顺序一致)
        self.attention_groups = sorted(
            attention_groups,
            key=lambda x: not isinstance(x[0], FullAttentionSpec),
        )
        self.eagle_attn_group_indices = {
            i for i, (_, group_ids, _) in enumerate(self.attention_groups)
            if any(self.kv_cache_groups[gid].is_eagle_group for gid in group_ids)
        }
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py dependency-wiring

连接器入口声明 SupportsHMA,并实现配置验证,连接 coordinator、scheduler 和 worker。

# connector.py – 配置验证与多组 finish 接口class MooncakeStoreConnector(KVConnectorBase_V1, SupportsHMA):
    """KV connector using MooncakeDistributedStore as shared KV pool."""
​
    @staticmethod
    def _validate_kv_cache_config(
        vllm_config: VllmConfig, kv_cache_config: KVCacheConfig
    ) -> None:
        """在初始化时检查配置兼容性,避免运行时报错。"""
        from vllm.v1.kv_cache_interface import CrossAttentionSpec, MambaSpec
​
        unsupported: list[str] = []
        cache_block_size = vllm_config.cache_config.block_size
        for g_idx, g in enumerate(kv_cache_config.kv_cache_groups):
            spec = g.kv_cache_spec
            # 不支持 CrossAttentionSpec
            if isinstance(spec, CrossAttentionSpec):
                unsupported.append(f"group {g_idx}: CrossAttentionSpec")
            # Mamba 必须使用 align 模式(block_size 等于 cache block size)
            if isinstance(spec, MambaSpec) and spec.block_size != cache_block_size:
                unsupported.append(
                    f"group {g_idx}: MambaSpec with block_size="
                    f"{spec.block_size} != cache_config.block_size="
                    f"{cache_block_size} (mamba_cache_mode != 'align')"
                )
        pcp = vllm_config.parallel_config.prefill_context_parallel_size
        dcp = vllm_config.parallel_config.decode_context_parallel_size
        # 混合注意力下暂时不支持 CP
        if len(kv_cache_config.kv_cache_groups) > 1 and pcp * dcp > 1:
            unsupported.append(
                f"PCP/DCP > 1 (pcp={pcp}, dcp={dcp}) with hybrid attention"
            )
        if unsupported:
            raise ValueError(
                "MooncakeStoreConnector does not support: " + "; ".join(unsupported)
            )
​
    def request_finished_all_groups(
        self,
        request: Request,
        block_ids: tuple[list[int], ...],
    ) -> tuple[bool, dict[str, Any] | None]:
        # 扩展基类的 request_finished,支持多组 block_ids
        # 后续调用 scheduler 和 worker 进行每组的资源回收和 KV 存储
        ...

评论区精华

ZeroDivisionError when key_list is empty 正确性

gemini-code-assist 指出在 worker.py 的 _handle_request 中,如果 load_mask 过滤掉所有块导致 key_list 为空,后续 `rotation = self.tp_rank % len(key_list)` 会引发 ZeroDivisionError,建议添加空检查并跳过处理。

结论:review 已记录,但最终代码中未显式添加空检查,存在潜在风险,建议后续确认或修复。 · unresolved

Incorrect segment registration for hybrid models 正确性

gemini-code-assist 指出 register_kv_caches 中将所有 base address 注册到每个 ChunkedTokenDatabase,导致每组可能操作其他组的缓存段,造成存储键错误和潜在内存损坏,建议按 layer 分组过滤。

结论:review 指出问题,但最终代码未明确修正;后续需确保每个 token_db 只注册对应层的段。 · unresolved

Clarify two modes of cache hit in ExternalCachedBlockPool documentation

zhewenl 建议在 ExternalCachedBlockPool 的注释中更清楚地说明两种模式:一种用于确定 mask,一种用于获取真实缓存命中。

结论:该建议合理,已确认在最终代码的 docstring 和注释中有所体现,状态已解决。 · 已解决

风险与影响

  1. 核心缓存命中逻辑一致性MooncakeStoreCoordinator.find_longest_cache_hit 是外部缓存命中判断的核心,若与 GPU 端 HybridKVCacheCoordinator 行为不一致可能导致缓存误判或漏用,影响推理正确性。
  2. 多组配置兼容性_validate_kv_cache_config 拒绝了部分组合(如 CrossAttention、非 align Mamba、多组+CP>1),但若未来有新的混合配置可能遗漏,需要同步更新。
  3. 空 key_list 边界:worker 发送线程中未明确处理 key_list 为空的情况,可能引发除零错误(review 已指出但未在最终代码中确认修复)。
  4. 段注册跨组污染:若 register_kv_caches 未正确按 layer 过滤,不同组的 KV 数据可能错存到彼此的 token_database,导致读取时数据错乱(review 已指出需关注)。

用户与系统:MooncakeStoreConnector 现在支持混合注意力模型(如 DSV4),扩展了其应用范围,用户无需额外配置即可使用外部 KV 池。对现有单组场景完全兼容,改造对接口透明。
团队:新增约 1800 行代码(含测试),核心逻辑集中在 coordinator.py 和 worker.py,测试覆盖率大幅提升。review 中提出的两个潜在问题(除零、段注册)应在合并前或后续中修复。

核心缓存命中逻辑变更 多组兼容性验证不足 空 key_list 边界情况 缓存段注册可能跨组污染

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论