执行摘要
- 一句话:新增 MooncakeStoreConnector 实现跨实例 KV 缓存共享与卸载
- 推荐动作:建议本 PR 合并至主线,以便用户测试并提供反馈。重点关注 ZMQ 错误处理和 TP 分片缓存查找的修复,建议在后续迭代中统一 AttentionBackend 的布局检测接口以消除魔法检测。
功能与动机
在后端生产环境中,请求常共享公共前缀(系统提示、few-shot 示例、多轮对话历史等)。vLLM 的本地前缀缓存仅适用于单实例,无法处理跨实例复用、驱逐后重算或冷启动。通过引入 Mooncake 分布式存储作为共享 KV 池,可实现跨实例的内容寻址缓存,显著降低延迟与计算开销。详见 RFC #38474。
实现拆解
- 数据结构与配置:
data.py 定义 PoolKey、KeyMetadata、ChunkedTokenDatabase 等,用于构建内容寻址的缓存键和 GPU 地址映射。worker.py 中 MooncakeStoreConfig 负责从 JSON 文件或环境变量加载 Mooncake 分布式存储配置。
- 调度器端:外部缓存查询:
scheduler.py 实现 MooncakeStoreScheduler,通过 LookupKeyClient(ZMQ IPC)查询 Mooncake 存储中的前缀命中情况,生成 LoadSpec 元数据,指导后续分配与加载。
- 工作器端:异步传输线程:
worker.py 实现 MooncakeStoreWorker,管理后台 KVCacheStoreSendingThread 和 KVCacheStoreRecvingThread,通过 Mooncake 传输引擎异步执行 KV 块的 put/get。支持 stride-based 布局检测,兼容 FlashAttn 和 FlashInfer。
- 连接器整合与角色分发:
connector.py 实现 MooncakeStoreConnector,继承 KVConnectorBase_V1,根据 KVConnectorRole(SCHEDULER/WORKER)分派给调度器或工作器;start_load_kv / wait_for_save 为 no-op,实际操作在 get_finished 中完成以最大化重叠。
- 注册与文档:在
factory.py 中注册新连接器,并通过 MooncakeStoreConnector_usage.md 提供配置示例和部署指南。
关键文件:
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py(模块 连接器;类别 source;类型 core-logic;符号 MooncakeStoreKVEvents, MooncakeStoreConnector, init, start_load_kv): 核心连接器类,负责角色分发和与调度器/工作器的桥接,是此新连接器的入口点。
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py(模块 调度器;类别 source;类型 dependency-wiring;符号 _new_req_prefill_tokens, MooncakeStoreScheduler, init, get_num_new_matched_tokens): 调度器端核心逻辑,包含外部缓存查询(get_num_new_matched_tokens)、元数据构建(build_connector_meta)和请求状态跟踪。
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py(模块 工作器;类别 source;类型 dependency-wiring;符号 MooncakeStoreConfig, from_file, load_from_env, _parse_size): 工作器端实现,包括 Mooncake 存储配置加载、KV 缓存注册、后台传输线程和 ZMQ 查找服务。
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/data.py(模块 数据结构;类别 source;类型 dependency-wiring;符号 KeyMetadata, PoolKey, hash, to_string): 定义核心数据结构:PoolKey(内容寻址键)、KeyMetadata、ChunkedTokenDatabase(GPU地址映射),是缓存操作的基础。
tests/v1/kv_connector/unit/test_mooncake_store_connector.py(模块 连接器测试;类别 test;类型 test-coverage;符号 _make_vllm_config, _make_block_stored, test_scheduler_role_initializes_store_scheduler_only, test_worker_role_initializes_store_worker): 单元测试覆盖连接器角色初始化、ZMQ 路径计算和事件聚合,验证核心逻辑正确性。
关键符号:MooncakeStoreConnector, MooncakeStoreScheduler, MooncakeStoreWorker, MooncakeStoreConfig, ChunkedTokenDatabase, PoolKey, KVCacheStoreSendingThread, KVCacheStoreRecvingThread, LookupKeyClient, LookupKeyServer
关键源码片段
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py
核心连接器类,负责角色分发和与调度器/工作器的桥接,是此新连接器的入口点。
# vllm/.../mooncake/store/connector.py
class MooncakeStoreConnector(KVConnectorBase_V1):
'''KV 连接器,使用 Mooncake 分布式存储作为共享 KV 池。'''
def __init__(
self,
vllm_config: VllmConfig,
role: KVConnectorRole,
kv_cache_config: KVCacheConfig | None = None,
):
super().__init__(vllm_config=vllm_config, role=role,
kv_cache_config=kv_cache_config)
assert vllm_config.kv_transfer_config is not None
self.kv_role = vllm_config.kv_transfer_config.kv_role
self._kv_cache_events: MooncakeStoreKVEvents | None = None
# 根据角色创建调度器组件或工作器组件 ——
# 调度器运行在 CPU 进程,负责查询缓存命中并构建元数据;
# Worker 运行在 GPU 进程,负责实际的异步 KV 传输。
if role == KVConnectorRole.SCHEDULER:
self.connector_scheduler = MooncakeStoreScheduler(vllm_config)
else:
self.connector_worker = MooncakeStoreWorker(vllm_config)
def get_num_new_matched_tokens(
self,
request: Request,
num_computed_tokens: int,
) -> tuple[int, bool]:
'''委托给调度器组件,查询外部存储的缓存命中。'''
assert self.connector_scheduler is not None
return self.connector_scheduler.get_num_new_matched_tokens(
request, num_computed_tokens
)
def build_connector_meta(
self,
scheduler_output: SchedulerOutput,
) -> KVConnectorMetadata:
'''委托给调度器组件,构建连接器元数据。'''
assert self.connector_scheduler is not None
return self.connector_scheduler.build_connector_meta(scheduler_output)
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py
调度器端核心逻辑,包含外部缓存查询(get_num_new_matched_tokens)、元数据构建(build_connector_meta)和请求状态跟踪。
# vllm/.../mooncake/store/scheduler.py
class MooncakeStoreScheduler:
'''调度器端组件,查询外部存储缓存命中并构建加载规格。'''
def __init__(self, vllm_config: VllmConfig):
assert vllm_config.kv_transfer_config is not None
self.kv_role = vllm_config.kv_transfer_config.kv_role
self.load_async = vllm_config.kv_transfer_config.kv_connector_extra_config.get(
'load_async', True
)
# 初始化 ZMQ 客户端,用于向 Rank 0 的 LookupKeyServer 发送查询
self.client = LookupKeyClient(vllm_config)
# 考虑并行上下文的块大小调整
self._block_size = vllm_config.cache_config.block_size
if vllm_config.parallel_config.prefill_context_parallel_size > 1:
self._block_size *= vllm_config.parallel_config.prefill_context_parallel_size
if vllm_config.parallel_config.decode_context_parallel_size > 1:
self._block_size *= vllm_config.parallel_config.decode_context_parallel_size
# 是否丢弃不完整块
self._discard_partial_chunks = (
vllm_config.kv_transfer_config.get_from_extra_config(
'discard_partial_chunks', True
)
)
# 每个请求的加载规格和跟踪器
self.load_specs: dict[str, LoadSpec] = {}
self._request_trackers: dict[str, RequestTracker] = {}
self._unfinished_requests: dict[str, tuple[Request, list[int]]] = {}
def get_num_new_matched_tokens(
self,
request: Request,
num_computed_tokens: int,
) -> tuple[int, bool]:
'''查询外部存储的缓存命中,返回需要额外分配的 token 数及异步标志。'''
if self._discard_partial_chunks:
token_len = (request.num_tokens // self._block_size) * self._block_size
else:
token_len = request.num_tokens
if token_len < self._block_size:
return 0, False # 不足一个块,无法复用外部缓存
# 通过 ZMQ 向 LookupKeyServer 查询命中 token 数
num_external_hit_tokens = self.client.lookup(token_len, request.block_hashes)
if num_external_hit_tokens == request.num_tokens:
num_external_hit_tokens -= 1 # 至少保留一个 token 避免零开销
need_to_allocate = num_external_hit_tokens - num_computed_tokens
if need_to_allocate <= 0:
return 0, False
# 记录加载规格,供 worker 在 get_finished 中执行实际加载
self.load_specs[request.request_id] = LoadSpec(
vllm_cached_tokens=num_computed_tokens,
kvpool_cached_tokens=num_external_hit_tokens,
can_load=False,
)
return need_to_allocate, self.load_async
评论区精华
审查中主要讨论了以下问题:
风险与影响
- 风险:主要风险包括:
1) ZMQ 通信可靠性:LookupKeyServer 缺乏异常处理,一旦出错可能导致调度器挂起;
2) 缓存正确性:TP 分片下前缀查找可能错误命中其他分片,造成推理错误;
3) 线程安全与资源泄漏:KVCacheStoreSendingThread 未妥善处理异常,可能阻塞请求完成;
4) 外部依赖:Mooncake 分布式存储本身可能存在 bug 或性能瓶颈;
5) 磁盘卸载暂未启用:当前版本已剥离磁盘卸载逻辑,但设计中的预算分割可能留有隐患。
- 影响:该连接器作为可选新功能,不影响现有连接器。用户需配置
kv-transfer-config 并运行 Mooncake master 进程。对系统增加约 2600 行新代码,新增 10 个文件。对团队增加一个需要维护的 KV 连接器实现,但重用了已有接口(KVConnectorBase_V1)。无功能回归风险。
- 风险标记:ZMQ 可靠性, 缓存正确性, 线程安全, 外部依赖
关联脉络
参与讨论