执行摘要
- 一句话:MooncakeStore 新增混合注意力缓存支持
- 推荐动作:建议仔细审阅
MooncakeStoreCoordinator.find_longest_cache_hit 的实现,确保与内部 HybridKVCacheCoordinator 的掩码计算逻辑一致。优先处理 review 中提出的 key_list 空检查和 segment 注册过滤问题,建议添加对应边界测试。
功能与动机
PR 旨在为 MooncakeStoreConnector 添加 hybrid KV cache 管理支持,以便使用混合注意力布局(如全注意力 + 滑动窗口)的模型(如 DSV4)可以利用 Mooncake Store 作为共享外部 KV 池。同时声明 SupportsHMA 接口并在初始化阶段验证 KVCacheConfig,拒绝不支持的配置组合。
实现拆解
- 连接器入口适配:在
connector.py 中声明 SupportsHMA 接口,新增 _validate_kv_cache_config 静态方法,在初始化时验证配置是否兼容(拒绝 CrossAttentionSpec、非 align 的 Mamba、多组且 PCP/DCP>1 组合)。
- 引入外部缓存协调器:新增
coordinator.py 文件,包含 ExternalCachedBlockPool(基于 (group_id, hash) 集合的 BlockPool 模拟)和 MooncakeStoreCoordinator(镜像 HybridKVCacheCoordinator.find_longest_cache_hit,根据外部存在集计算每组 load_mask 和 hit_length)。
- 工作线程改造:修改
worker.py 中的 KVTransferThread 及其子类,将单 token_database 改为 token_databases 列表,引入 coordinator 引用,在发送线程中根据 load_mask 过滤需要存储的块,并修复了 _handle_request 中的 token_len 对齐逻辑。
- 调度器调整:在
scheduler.py 的 __init__ 中接收 kv_cache_config,将 block_ids 扩展为元组支持多组,更新 build_connector_meta 和 request_finished 等方法。
- 数据结构更新:
data.py 中 ReqMeta、PoolKey 等适配多组标识,MooncakeStoreConnectorMetadata 支持每组的状态。
- 测试覆盖:新增 4 个测试文件(coordinator 单元测试、e2e HMA 测试、scheduler 测试),修改 worker 和 connector 测试,覆盖单组/多组、全命中/部分命中/无命中、滑动窗口掩码等场景。
关键文件:
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/coordinator.py(模块 协调器;类别 source;类型 core-logic;符号 ExternalCachedBlockPool, init, get_cached_block, MooncakeStoreCoordinator): 新增的核心文件,包含 ExternalCachedBlockPool 和 MooncakeStoreCoordinator,实现外部缓存命中判断逻辑,是整个 HMA 支持的关键。
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py(模块 工作线程;类别 source;类型 dependency-wiring;符号 init, register_kv_caches, _repr_tensor, lookup): 工作线程核心实现被大幅改造,支持多组 token_databases 和协调器,store/load 逻辑需按组掩码进行。
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py(模块 连接器;类别 source;类型 dependency-wiring;符号 MooncakeStoreConnector, _validate_kv_cache_config, request_finished_all_groups): 连接器入口声明 SupportsHMA,并实现配置验证,连接 coordinator、scheduler 和 worker。
tests/v1/kv_connector/unit/test_mooncake_store_coordinator.py(模块 协调器测试;类别 test;类型 test-coverage;符号 _make_coord, test_external_cached_block_pool_tautological_returns_present_for_any_hash, test_external_cached_block_pool_hit_all_groups, test_external_cached_block_pool_miss_one_group): 新增的协调器单元测试,覆盖 ExternalCachedBlockPool 各种命中/未命中组合以及 MooncakeStoreCoordinator 的单组/混合场景。
tests/v1/kv_connector/unit/test_mooncake_store_hma_e2e.py(模块 e2e 测试;类别 test;类型 test-coverage;符号 _DictStore, init, setup, register_buffer): 端到端集成测试,使用 dict 模拟 MooncakeStore,验证混合注意力配置下存储/读取的正确性。
关键符号:ExternalCachedBlockPool.get_cached_block, MooncakeStoreCoordinator.find_longest_cache_hit, MooncakeStoreConnector._validate_kv_cache_config, MooncakeStoreConnector.request_finished_all_groups, MooncakeStoreWorker.register_kv_caches, KVCacheStoreSendingThread._handle_request, MooncakeStoreScheduler.init
关键源码片段
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/coordinator.py
新增的核心文件,包含 ExternalCachedBlockPool 和 MooncakeStoreCoordinator,实现外部缓存命中判断逻辑,是整个 HMA 支持的关键。
# coordinator.py – 核心类:ExternalCachedBlockPool 与 MooncakeStoreCoordinator
class ExternalCachedBlockPool:
"""Duck-typed BlockPool backed by a ``(group_id, hash)`` exists set.
Used by MooncakeStoreCoordinator to simulate GPU-side BlockPool.
"""
def __init__(self, exists: set[tuple[int, bytes]] | None = None) -> None:
# ``exists=None`` 用于接收端,此时 hit_length 已确定,只需分配一个总是命中
# 的假块(present_block)让每个 spec 的管理器应用自己的 mask。
self._exists = exists
self.null_block = KVCacheBlock(block_id=0)
self._present_block = KVCacheBlock(block_id=1)
def get_cached_block(
self,
block_hash: BlockHash,
group_ids: list[int],
) -> list[KVCacheBlock] | None:
# 只有当给定 group_ids 中的所有组都缓存了该 hash 时,视为命中
# (这是 duck-typing 行为,与 BlockPool.get_cached_block 一致)
if self._exists is None:
return [self._present_block] * len(group_ids)
h = bytes(block_hash)
if all((g, h) in self._exists for g in group_ids):
return [self._present_block] * len(group_ids)
return None
class MooncakeStoreCoordinator:
"""Mirror of ``HybridKVCacheCoordinator.find_longest_cache_hit`` over an
``ExternalCachedBlockPool``."""
def __init__(
self,
kv_cache_groups: list[KVCacheGroupSpec],
scheduler_block_size: int,
hash_block_size: int,
use_eagle: bool = False,
) -> None:
# 确保 block_size 关系兼容
assert all(
g.kv_cache_spec.block_size % hash_block_size == 0 for g in kv_cache_groups
), "block_size must be divisible by hash_block_size"
assert scheduler_block_size % hash_block_size == 0
assert all(
scheduler_block_size % g.kv_cache_spec.block_size == 0
for g in kv_cache_groups
), "scheduler_block_size must be a multiple of each group's block_size"
self.kv_cache_groups = kv_cache_groups
self.hash_block_size = hash_block_size
self.lcm_block_size = scheduler_block_size
self.use_eagle = use_eagle
self._verify_and_split_kv_cache_groups()
def _verify_and_split_kv_cache_groups(self) -> None:
"""将 kv_cache_groups 按 spec 分组(与内部 KVCacheCoordinator 类似)
但不分配实际管理器,仅记录分组关系。"""
attention_groups: list[
tuple[KVCacheSpec, list[int], type[SingleTypeKVCacheManager]]
] = []
for i, g in enumerate(self.kv_cache_groups):
spec = g.kv_cache_spec
manager_cls = spec_manager_map[type(spec)]
for existing_spec, group_ids, existing_cls in attention_groups:
if existing_spec == spec:
assert manager_cls is existing_cls
group_ids.append(i)
break
else:
attention_groups.append((spec, [i], manager_cls))
# Full attention 优先(与上游收敛顺序一致)
self.attention_groups = sorted(
attention_groups,
key=lambda x: not isinstance(x[0], FullAttentionSpec),
)
self.eagle_attn_group_indices = {
i for i, (_, group_ids, _) in enumerate(self.attention_groups)
if any(self.kv_cache_groups[gid].is_eagle_group for gid in group_ids)
}
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py
连接器入口声明 SupportsHMA,并实现配置验证,连接 coordinator、scheduler 和 worker。
# connector.py – 配置验证与多组 finish 接口
class MooncakeStoreConnector(KVConnectorBase_V1, SupportsHMA):
"""KV connector using MooncakeDistributedStore as shared KV pool."""
@staticmethod
def _validate_kv_cache_config(
vllm_config: VllmConfig, kv_cache_config: KVCacheConfig
) -> None:
"""在初始化时检查配置兼容性,避免运行时报错。"""
from vllm.v1.kv_cache_interface import CrossAttentionSpec, MambaSpec
unsupported: list[str] = []
cache_block_size = vllm_config.cache_config.block_size
for g_idx, g in enumerate(kv_cache_config.kv_cache_groups):
spec = g.kv_cache_spec
# 不支持 CrossAttentionSpec
if isinstance(spec, CrossAttentionSpec):
unsupported.append(f"group {g_idx}: CrossAttentionSpec")
# Mamba 必须使用 align 模式(block_size 等于 cache block size)
if isinstance(spec, MambaSpec) and spec.block_size != cache_block_size:
unsupported.append(
f"group {g_idx}: MambaSpec with block_size="
f"{spec.block_size} != cache_config.block_size="
f"{cache_block_size} (mamba_cache_mode != 'align')"
)
pcp = vllm_config.parallel_config.prefill_context_parallel_size
dcp = vllm_config.parallel_config.decode_context_parallel_size
# 混合注意力下暂时不支持 CP
if len(kv_cache_config.kv_cache_groups) > 1 and pcp * dcp > 1:
unsupported.append(
f"PCP/DCP > 1 (pcp={pcp}, dcp={dcp}) with hybrid attention"
)
if unsupported:
raise ValueError(
"MooncakeStoreConnector does not support: " + "; ".join(unsupported)
)
def request_finished_all_groups(
self,
request: Request,
block_ids: tuple[list[int], ...],
) -> tuple[bool, dict[str, Any] | None]:
# 扩展基类的 request_finished,支持多组 block_ids
# 后续调用 scheduler 和 worker 进行每组的资源回收和 KV 存储
...
评论区精华
- ZeroDivisionError 风险:gemini-code-assist 指出当
load_mask 过滤掉所有块时 key_list 可能为空,导致后续 rotation = self.tp_rank % len(key_list) 出现除零错误,建议添加空检查跳过处理。
- 缓存段注册错误:gemini-code-assist 指出
register_kv_caches 中所有 base address 被注册到每个组的 token_database,可能导致跨组数据污染,应按 layer 分组过滤。
- 注释补充建议:zhewenl 建议在
ExternalCachedBlockPool.get_cached_block 中更清晰地说明两种模式(确定 mask 与真实缓存命中)的用途。
- ZeroDivisionError when key_list is empty (correctness): review 已记录,但最终代码中未显式添加空检查,存在潜在风险,建议后续确认或修复。
- Incorrect segment registration for hybrid models (correctness): review 指出问题,但最终代码未明确修正;后续需确保每个 token_db 只注册对应层的段。
- Clarify two modes of cache hit in ExternalCachedBlockPool (documentation): 该建议合理,已确认在最终代码的 docstring 和注释中有所体现,状态已解决。
风险与影响
关联脉络
参与讨论