Prhub

#41269 [Bugfix][KV Transfer][NIXL] Notify P node on pre-admission rejection to free stranded KV blocks

原始 PR 作者 Dao007forever 合并时间 2026-05-10 13:52 文件变更 11 提交数 17 评论 9 代码增减 +156 / -2

执行摘要

新增提前拒绝通知机制,避免 P 节点 KV 块滞留

当请求在D节点准入前被拒绝(如渲染错误、模型检查失败、输入验证错误等),D从未打开NIXL传输,P节点无法收到传输完成信号,导致KV块滞留直到VLLM_NIXL_ABORT_REQUEST_TIMEOUT(默认480s)超时。该PR添加显式的提前拒绝通知,使P节点能立即释放块。

该PR值得详细审阅,特别是对于参与KV传输模块的工程师。主要看点包括:

  • 如何通过构造一个abort_immediately=True的合成请求来重用现有的request_finished钩子,避免了为清理而引入额外IPC。
  • _with_kv_transfer_rejection_cleanup中利用has_kv_connector+do_remote_prefill双重门控,确保只对启用connector的请求执行通知。
  • 对于社区用户,建议确认自定义EngineClient是否需要实现新方法以避免运行时错误。
讨论亮点
  • njhill提议重命名abort_immediately:在review vllm/v1/engine/__init__.py时,njhill建议将EngineCoreRequest的新字段从pre_aborted改为abort_immediately以提升可读性。Dao007forever同意并更新了代码。
  • NickLucche提出gating问题:NickLucche在review时指出,当前实现仅检查请求参数中的kv_transfer_params,而未验证服务器是否实际配置了KV connector;如果不存在connector,通知应当跳过。njhill随后添加了has_kv_connector检查,在OpenAIServing.__init__中从vllm_config.kv_transfer_config推导并缓存。
  • gemini-code-assist bot建议扩展异常覆盖:bot建议将所有pre-admission逻辑(包括adapter resolution、model name lookup等)纳入try-except块以确保所有拒绝路径都能触发通知。但该建议未被采纳,因为当前设计假设这些步骤不在'pre-admission'范围,且正常部署中此类失败应在P节点已有相同判断。该讨论有待进一步确认是否需要覆盖更多路径。

实现拆解

  1. 新增协议抽象方法:在vllm/engine/protocol.pyEngineClient接口中定义notify_kv_transfer_request_rejected抽象方法,所有EngineClient实现必须提供该能力。
  2. 核心包装器:在vllm/entrypoints/openai/engine/serving.pyOpenAIServing类中新增_with_kv_transfer_rejection_cleanup方法。它包装一个awaitable,在awaitable抛出异常或返回ErrorResponse时,检查请求携带了do_remote_prefill=True并且当前服务器配置了KV connector,然后调用self.engine_client.notify_kv_transfer_request_rejected发送通知。
  3. AsyncLLM实现:在vllm/v1/engine/async_llm.pyAsyncLLM类中实现该方法,构造一个EngineCoreRequest实例,设置abort_immediately=True,以及填写从原请求拷贝的kv_transfer_paramsdata_parallel_rank等,然后调用self.engine_core.add_request_async(request)将其提交给引擎核心。该请求会在调度器的waiting队列中立即被标识为要终止,从而触发request_finished钩子,完成connector侧的清理。
  4. EngineCoreRequest扩展:在vllm/v1/engine/__init__.py中为EngineCoreRequest添加abort_immediately字段(类型为bool),并在调度器中识别该标志,在request_finished后不再继续处理。
  5. serving入口迁移:在chat_completion/serving.py, completion/serving.py, responses/serving.py中,将原有的create_chat_completion等方法重命名为_create_chat_completion等私有方法,然后在原方法中调用_with_kv_transfer_rejection_cleanup包装私有方法,从而自动覆盖所有拒绝路径。
  6. NixlScheduler处理:在vllm/distributed/kv_transfer/kv_connector/v1/nixl/scheduler.py中,当收到携带abort_immediately的请求时,将其作为_reqs_need_recv的空条目处理,等待调度tick触发worker侧发送通知释放远端block。
  7. MultiConnector fan-out:在MultiConnector.request_rejected_before_admission中,遍历子connector,返回第一个识别出该参数并返回True的connector的结果,实现短接。
  8. 测试:在tests/v1/kv_connector/unit/test_nixl_connector.py中添加两个测试用例:验证abort_immediately空请求是否被正确入队、以及缺少必需元数据时忽略。在tests/v1/kv_connector/unit/test_multi_connector.py中验证MultiConnector的fan-out行为。
文件 模块 状态 重要度
vllm/entrypoints/openai/engine/serving.py 服务层 modified 7.55
vllm/v1/engine/async_llm.py 异步引擎 modified 7.13
vllm/engine/protocol.py 引擎协议 modified 6.46
vllm/entrypoints/openai/chat_completion/serving.py 聊天 API modified 6.28
vllm/entrypoints/openai/completion/serving.py 补全 API modified 6.28
vllm/entrypoints/openai/responses/serving.py 响应 API modified 6.17
tests/v1/kv_connector/unit/test_nixl_connector.py NIXL 测试 modified 5.4

关键符号

notify_kv_transfer_request_rejected (vllm/engine/protocol.py) notify_kv_transfer_request_rejected (vllm/v1/engine/async_llm.py) _with_kv_transfer_rejection_cleanup _create_chat_completion _create_completion _create_responses has_kv_connector (initialization) test_abort_immediately_remote_prefill_enqueues_empty_recv test_rejected_remote_prefill_request_missing_metadata_is_ignored

关键源码片段

vllm/entrypoints/openai/engine/serving.py core-logic

核心包装器 _with_kv_transfer_rejection_cleanup 的实现所在,负责检测拒绝并触发通知。

# 文件:vllm/entrypoints/openai/engine/serving.py
# OpenAIServing 类中新增的方法# 在 __init__ 中缓存 has_kv_connector 标志
vllm_config = getattr(engine_client, 'vllm_config', None)
kv_transfer_config = getattr(vllm_config, 'kv_transfer_config', None)
self.has_kv_connector = kv_transfer_config is not None
​
​
async def _with_kv_transfer_rejection_cleanup(
    self,
    awaitable: Awaitable[_T],
    request: ChatCompletionRequest | CompletionRequest | ResponsesRequest,
    raw_request: Request | None,
) -> _T:
    """
    包装一个 create_xxx 协程,当它抛出异常或返回 ErrorResponse
    (即请求从未到达引擎)时,通知 KV connector 释放预占的远程 prefill 块。
    """
    kv_transfer_params = self.has_kv_connector and request.kv_transfer_params
    if not kv_transfer_params or not kv_transfer_params.get('do_remote_prefill'):
        return await awaitable
    notify = True
    try:
        result = await awaitable
        if not isinstance(result, ErrorResponse):
            notify = False
        return result
    finally:
        if notify:
            try:
                await self.engine_client.notify_kv_transfer_request_rejected(
                    request.request_id,
                    kv_transfer_params,
                    data_parallel_rank=self._get_data_parallel_rank(raw_request),
                )
            except Exception:
                logger.warning(
                    'Failed to notify KV connector about rejected request %s',
                    request.request_id,
                    exc_info=True,
                )
vllm/v1/engine/async_llm.py core-logic

实现 notify_kv_transfer_request_rejected 方法,构造 abort_immediately 请求。

# 文件 : vllm/v1/engine/async_llm.py
# AsyncLLM.notify_kv_transfer_request_rejected 实现async def notify_kv_transfer_request_rejected(
    self,
    request_id: str,
    kv_transfer_params: dict[str, Any],
    *,
    data_parallel_rank: int | None = None,
) -> None:
    """
    提交一个 pre-aborted 请求,使得 connector 的 request_finished 钩子
    能够运行以释放任何 pre-admission KV-transfer 资源(如 P 节点上的
    NIXL prefill 块)。
    """
    request = EngineCoreRequest(
        request_id=request_id,
        prompt_token_ids=[0], # 占位
        mm_features=None,
        sampling_params=SamplingParams(
            max_tokens=1,
            extra_args={'kv_transfer_params': dict(kv_transfer_params)},
        ),
        pooling_params=None,
        arrival_time=time.time(),
        lora_request=None,
        cache_salt=None,
        data_parallel_rank=data_parallel_rank,
        abort_immediately=True, # 关键标志:调度器会立刻将其加工为已完成
    )
    await self.engine_core.add_request_async(request)
vllm/engine/protocol.py core-logic

新增 EngineClient 抽象方法 notify_kv_transfer_request_rejected,定义了接口契约。

# 文件 : vllm/engine/protocol.py
# EngineClient 新抽象方法@abstractmethod
async def notify_kv_transfer_request_rejected(
    self,
    request_id: str,
    kv_transfer_params: dict[str, Any],
    *,
    data_parallel_rank: int | None = None,
) -> None:
    """
    通知引擎,一个 KV-transfer 请求在引擎准入之前被拒绝了,
    让 connector 侧可以执行清理(如释放 P 节点上的 prefill 块)。
    """
    ...

评论区精华

重命名 abort_immediately 字段 设计

njhill 在 review vllm/v1/engine/__init__.py 时建议将 pre_aborted 改为 abort_immediately 以提高清晰度。

结论:Dao007forever 同意并修改了代码。 · 已解决

无 KV connector 配置时不应发送通知 正确性

NickLucche 指出当前实现仅检查请求参数,未检查服务器是否配置了 KV connector,建议 gating。

结论:njhill 在 OpenAIServing 中添加了 has_kv_connector 检查,并在 _with_kv_transfer_rejection_cleanup 中前置判断。 · 已解决

扩展异常覆盖以涵盖所有 pre-admission 路径 正确性

gemini-code-assist bot 建议将 _maybe_get_adapters 等调用纳入 try-except,以确保所有失败都能触发通知。

结论:该建议未在 PR 中采纳,可能留作后续改进。 · unresolved

风险与影响

  • 新增协议接口EngineClient新增抽象方法,所有子类必须实现。忽略了可能导致运行时 TypeError,对于使用自定义 EngineClient 的部署(如 DPLBAsyncMPClient)需要同步更新。
  • 请求状态机干扰abort_immediately字段改变了EngineCoreRequest的生命周期,原本正常请求不会在等待队列中被立即终止。必须确保调度器正确处理此标志,不影响其他请求的状态转换。
  • 性能影响:仅在拒绝路径触发额外调用,对于正常请求无影响。但由于使用了合成请求,增加了 engine_core 的消息传递,可能在高拒绝率场景下产生轻微开销。
  • 缺少端到端集成测试:PR 作者建议 reviewer 在真实 P/D NIXL 部署上验证,当前仅包含单元测试,未覆盖完整的端到端流程。
  • 安全性:无明显的安全问题。
  • 用户/业务影响:对于使用 NIXL KV 传输的 PD 分离部署用户,P 节点上因为提前拒绝导致的 KV 块滞留时间从数百秒降低到毫秒级,显著提升 prefill 节点的内存利用率和整体吞吐量。对于不使用 KV 传输的用户,变更不产生影响(依赖 has_kv_connector 门控)。
  • 系统影响:引入新的协议方法与消息路径,可能会影响引擎消息处理的性能边界,但仅拒绝时会触发。DPLBAsyncMPClient 等变体需要相应适配。
  • 团队维护:增加一个横跨协议层、引擎层、serving 层和 connector 层的协作机制,维护复杂度上升,但逻辑集中,可测试性较好。
新增接口方法 调度器逻辑变更 缺少端到端测试

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论