Prhub

#42611 [KV Connector][Offloading] Flush all pending jobs on last step

原始 PR 作者 liranschour 合并时间 2026-05-18 20:59 文件变更 3 提交数 18 评论 6 代码增减 +71 / -14

执行摘要

末步 flush 所有待定 KV 转移作业

PR 描述指出:'In case we know that there will not be another step. Tell the worker to flush all jobs to allow offloading to happen in this step.' 这解决了 KV offloading 中末步可能无法完成转移的问题。

建议关注 build_connector_meta 中的 flush 触发逻辑,以及其与 is_finished() 的关联。对于维护 KV offloading 的读者,这个 PR 的 review 讨论具有参考价值。

讨论亮点

gemini-code-assist 指出初始实现中使用 if not self._req_status 是 no-op,因为 _req_status_jobs 同步,改为通过 all(rs.req.is_finished()) 触发。orozery 建议使用 self._jobs.keys() 而非 self._jobs 来更新 flush set。最终实现采纳了这些建议。

实现拆解

  1. 在 scheduler.py 的 build_connector_meta 方法中,遍历完 preempted 请求后,增加对所有请求状态的检查:如果所有 _req_status 中的请求都已结束,则将 _jobs 中所有 job 的 key 加入 _current_batch_jobs_to_flush
  2. 在测试工具 _parse_transfers 中,处理 flush 时区分 store 和 load 方向,之前只处理了 store 方向。
  3. 在测试文件 test_scheduler.py 中,新增 test_flush_all_jobs_when_no_requests_remain 测试,并且为现有测试增加 expected_flushed 参数以验证 flush 行为。
文件 模块 状态 重要度
vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py 卸载调度 modified 5.8
tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py 调度测试 modified 6.44
tests/v1/kv_connector/unit/offloading_connector/utils.py 测试工具 modified 4.83

关键符号

build_connector_meta test_flush_all_jobs_when_no_requests_remain _parse_transfers

关键源码片段

vllm/distributed/kv_transfer/kv_connector/v1/offloading/scheduler.py core-logic

核心源码变更,新增末步 flush 检测逻辑

def build_connector_meta(self, scheduler_output: SchedulerOutput) -> KVConnectorMetadata:
    # 处理 preempted 请求的转移 jobs
    for req_id in scheduler_output.preempted_req_ids or ():
        req_status = self._req_status.get(req_id)
        if req_status is None or not req_status.transfer_jobs:
            continue
        any_jid = next(iter(req_status.transfer_jobs))
        assert self._jobs[any_jid].is_store
        self._current_batch_jobs_to_flush.update(req_status.transfer_jobs)
​
    # 新增:如果所有请求都已结束,则 flush 所有 pending jobs
    # 这是 PR 的核心变更,确保末步时所有作业都能完成
    if self._req_status and all(
        rs.req.is_finished() for rs in self._req_status.values()
    ):
        self._current_batch_jobs_to_flush.update(self._jobs.keys())
​
    # 构造元数据并清空当前 batch 状态
    meta = OffloadingConnectorMetadata(
        load_jobs=self._current_batch_load_jobs,
        store_jobs=self._build_store_jobs(scheduler_output),
        jobs_to_flush=self._current_batch_jobs_to_flush,
    )
    self._current_batch_load_jobs = {}
    self._current_batch_jobs_to_flush = set()
    return meta

评论区精华

检测末步的条件 正确性

初始使用 if not self._req_status 是 no-op,建议改为迭代 scheduler_output.finished_req_ids

结论:最终实现改为检查 all rs.req.is_finished(),并使用了 orozery 建议的 self._jobs.keys() · 已解决

测试场景的有效性 测试

初始测试模拟了违反类不变量的状态(_req_status 空但 _jobs 非空),建议使用现实场景

结论:测试被更新为基于请求结束状态触发 flush · 已解决

flush set 更新的写法 style

orozery 建议将 self._jobs 改为 self._jobs.keys()

结论:被采纳 · 已解决

风险与影响

核心风险在于 flush 检测条件可能误判:如果 is_finished() 的实现与期望语义不一致,可能导致 flush 过早或过晚。另外,新增的 load 方向 flush 可能引入未预期的 GPU 块释放。但测试覆盖了同步和异步调度两种模式,降低了风险。

影响范围限于 KV offloading 调度器模块,只影响末步的 flush 行为。对正常 step 的调度无影响。用户会观察到 offloading 完成更加及时,尤其在高负载场景避免残留 jobs。

条件检测误判 新测试可能遗漏边界

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论