Prhub

#42694 [KVConnector][Mooncake] Wire reset_cache cascade end-to-end

原始 PR 作者 aoshen02 合并时间 2026-05-27 11:52 文件变更 8 提交数 18 评论 7 代码增减 +517 / -14

执行摘要

为 Mooncake 实现 connector reset_cache,修复 RL 权重更新后外部缓存静默过时问题

KVConnectorBase_V1.reset_cache (added in #27170) is currently a no-op for MooncakeStoreConnector. A caller hitting Scheduler.reset_prefix_cache(reset_connector=True) on a Mooncake engine gets the local prefix cache cleared but the external Mooncake master keeps all KV blocks computed against the previous weights. For RL post-training and other weight-update workflows this is a silent stale-cache correctness hole: the next request can read KV that was hashed against an old policy.

值得精读,尤其关注 ZMQ admin 通道从隐式约定演进为命名标签协议的设计决策,以及 drain 发送队列 + remove_all 的竞态处理。建议在未来 PR 中强化 process_request 的输入验证和异常捕获,避免后台线程静默失效。

讨论亮点
  1. 线程健壮性风险:gemini-code-assist[bot] 指出 process_request 循环缺乏稳健的错误处理和输入验证,空帧或消息格式错误会导致 IndexError 并终止后台线程,进而造成 prefix 查询和 reset 静默失效。该问题未被解决,属于已识别的潜在风险。
  2. 日志级别调整:ivanium 质疑无 connector 时 reset_connector_cachelogger.warning 降为 logger.debug 是否合理,作者回答大多数情况下(RL)会执行到这里,因此 debug 级别更合适。
  3. 注释精简:ivanium 指出 _reset_caches 注释过于冗长,作者随后在后续 commit 中进行了精简。

实现拆解

  1. 引入命名标签协议:在 protocol.py 中定义 LOOKUP_MSG/RESET_MSG 请求标签和 RESP_OK/RESP_ERR 响应常量,替代原 lookup 协议中隐式的帧位置假设,使 admin 通道自描述且可扩展。
  2. 改造 worker 端 LookupKeyServerprocess_request 循环根据首帧标签分发——LOOKUP_MSG 执行原有前缀查询,RESET_MSG 先 join 发送线程队列确保无 inflight put,再调用 store.remove_all(force=True),并返回 OK/ERR 响应。异常分支发送 ERR 并日志记录。
  3. 新增 LookupKeyClient.reset():复用现有 ZMQ 连接,发送 RESET_MSG(无负载),等待工作线程返回的 1 字节状态码,并转换为布尔返回。
  4. 实现 scheduler 端 reset 逻辑MooncakeStoreScheduler.reset_store() 调用 self.client.reset(),捕获异常返回 False,成功则返回 True。MooncakeStoreConnector.reset_cache() 在 SCHEDULER 角色时清除本地 load_specs_kv_cache_events,然后委托给 reset_store();WORKER 角色直接返回 None。
  5. 贯通引擎和调度器调用链EngineCore._reset_caches() 新增 reset_connector=True 参数,传递给 reset_prefix_cache()Scheduler.reset_connector_cache() 在无 connector 时不再 warning 并返回 False,改为 logger.debug 并返回 True,避免无 connector 场景下外部调用链断裂。
  6. 测试覆盖:新增 9 个单元/集成测试,覆盖 SCHEDULER 成功/失败、WORKER 空操作、client.reset 协议、reset_store 异常处理、无 connector 空转等场景。
文件 模块 状态 重要度
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py worker 层 modified 7.74
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py 协议层 added 6.69
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py 连接器 modified 6.77
tests/v1/kv_connector/unit/test_mooncake_store_connector.py 测试 modified 7.24
vllm/v1/engine/core.py 引擎 modified 6.06
vllm/v1/core/sched/scheduler.py 调度器 modified 5.44
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py 调度器端 modified 6.84
tests/v1/core/test_scheduler.py 测试 modified 5.12

关键符号

MooncakeStoreConnector.reset_cache MooncakeStoreScheduler.reset_store LookupKeyClient.reset LookupKeyServer.process_request Scheduler.reset_connector_cache EngineCore._reset_caches

关键源码片段

vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py core-logic

核心实现文件:LookupKeyServer 新增消息类型分发(LOOKUP_MSG/RESET_MSG),LookupKeyClient 新增 reset() 方法,是 reset 级联的 worker 端执行单元。

# vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py
# LookupKeyServer 的 process_request 循环(精简版)class LookupKeyServer:
    """ZMQ server on worker rank 0 for the LookupKey admin channel."""
    def __init__(self, store_worker, vllm_config):
        # ... 初始化 socket, decoder ...
        self.store_worker = store_worker
        self.running = True
​
        def process_request():
            while self.running:
                all_frames = self.socket.recv_multipart(copy=False)
                msg_type = bytes(all_frames[0])
​
                if msg_type == LOOKUP_MSG:
                    # 原 lookup 逻辑,帧索引后移(frame 0 是 tag)
                    token_len = int.from_bytes(all_frames[1], byteorder="big")
                    hash_frames = all_frames[2:]
                    hashes_str = self.decoder.decode(hash_frames)
                    block_hashes = [BlockHash(bytes.fromhex(s)) for s in hashes_str]
                    result = self.store_worker.lookup(token_len, block_hashes)
                    self.socket.send(result.to_bytes(4, "big"))
​
                elif msg_type == RESET_MSG:
                    try:
                        # 先 drain 发送线程队列,防止 put 在 reset 后重新写入
                        if self.store_worker.kv_send_thread is not None:
                            self.store_worker.kv_send_thread.request_queue.join()
                        self.store_worker.store.remove_all(force=True)
                        self.socket.send(RESP_OK)
                    except Exception as e:
                        self.socket.send(RESP_ERR)
                else:
                    self.socket.send(RESP_ERR)
​
        self.thread = threading.Thread(target=process_request, daemon=True)
        self.thread.start()
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py core-logic

新增的协议定义文件,是 admin 通道的单一事实来源,定义消息标签和响应状态码,使协议自描述、可扩展。

# vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py
"""Wire-format constants for the LookupKey ZMQ admin channel.Request: [msg_type: bytes] [payload_frames...]
  msg_type == LOOKUP_MSG:
    frame 1: token_len (u32 big-endian, 4 bytes)
    frame 2..n: msgpack-encoded list[str] of block-hash hex digests
    Response: [hit_count: u32 big-endian, 4 bytes]
  msg_type == RESET_MSG:
    (no payload frames)
    Response: [RESP_OK] or [RESP_ERR]
"""# 请求消息类型标签(帧 0)
LOOKUP_MSG: bytes = b"lookup"
RESET_MSG: bytes = b"reset"# 管理命令响应状态码(单字节)
RESP_OK: bytes = b"\x01"
RESP_ERR: bytes = b"\x00"
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py core-logic

暴露 reset_cache() 作为对外接口,根据角色进行调度器 /worker 分流,并清除本地状态。

# vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py
class MooncakeStoreConnector(KVConnectorBase_V1):
    # ... existing methods ...
​
    def reset_cache(self) -> bool | None:
        """Reset the external Mooncake store on prefix-cache reset.        Drains the worker send queue, then runs ``remove_all`` on the
        Mooncake master. Caller must first pause generation (e.g.
        ``pause_generation``) so no new puts are enqueued during drain.        Returns True on ack, False on failure, None for the worker role.
        """
        if self.role == KVConnectorRole.SCHEDULER:
            assert self.connector_scheduler is not None
            # 清除本地的 key 引用,避免 reset 后仍持有旧引用
            self.connector_scheduler.load_specs.clear()
            self._kv_cache_events = None
            return self.connector_scheduler.reset_store()
        return None # worker 不应触发 reset,由 ZMQ admin 驱动

评论区精华

process_request 循环缺乏错误和输入验证 正确性

gemini-code-assist[bot] 指出若 all_frames 为空或 LOOKUP_MSG 帧不足,IndexError 会使后台线程终止,导致 prefix 查询和 reset 静默失效。

结论:未解决,作者未在后续提交中增加防御性检查。 · unresolved

reset_connector_cache 日志级别从 warning 降为 debug style

ivanium 询问何时会进入无 connector 路径,作者回答大多数 RL 场景会进入,debug 级别合适。

结论:降级为 debug 已合入。 · 已解决

_reset_caches 注释冗长 style

ivanium 指出注释过于 verbose,作者表示 fix 并在后续 commit 中精简。

结论:已精简,后续 commit 中修改。 · 已解决

风险与影响

  1. 线程崩溃风险(高)worker.pyprocess_request 循环对消息帧数缺乏预验证,异常未捕获会导致后台线程退出,进而阻塞所有 admin 请求(lookup 和 reset)。虽不影响主推理路径(admin 通道独立),但会导致外部缓存无法查询或重置,性能或正确性退化。
  2. 调用顺序依赖:reset 前提条件是调用者必须暂停生成并确保无 inflight lookup/transfer,否则 drain 可能不彻底或 reset 与 put 并发。虽在代码注释中强调,但缺乏运行时检查,依赖方需要自行保证。
  3. 跨 HMA(异构内存访问)安全store.remove_all 会清空底层平坦键空间,适用于所有 HMA 区域,可能影响其他连接器共享的存储(如果存在)。但在当前 Mooncake 设计下每个 worker 独享 store,风险较低。

用户影响:使用 MooncakeStoreConnector 的 RL 工作流(如 verl)将自动获得正确的权重更新后外部缓存清空能力,无需手动 workaround;其他用户无感知。
系统影响:新增的 admin 通道复用已有 ZMQ 连接,无新端口或握手;reset 命令响应时间取决于 store.remove_all 耗时,可能引入短暂阻塞。
团队影响:为后续添加其他 admin 命令(如 dump stats)提供了可扩展的 tag 分发模式。

线程健壮性不足 调用顺序依赖 缺乏输入验证

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论