Prhub

#25677 [PD] Clean early abort logic in PD module

原始 PR 作者 ShangmingCai 合并时间 2026-05-20 11:17 文件变更 2 提交数 2 评论 6 代码增减 +1 / -5

执行摘要

PD 模块提前中止逻辑清理

PR body 指出:现在各个后端都已提供 conclude_state 属性,因此可以移除 getattr;此外 prepare_abort 会在内部设置 finished_reason,无需在 abort 路径上预先设置,这样在多 rank 同步完状态后再设置 finished_reason 更安全。

值得精读以了解 PD 模块的中止流程演进。该 PR 是 PD 代码清理的延续,体现了团队在推动 conclude_state 统一化后的冗余清理工作。

讨论亮点

无 review 讨论。PR 作者自行合并,CI passed。

实现拆解

  1. scheduler.py (abort_request 方法):在 Decode 模式的 prealloc 和 transfer 队列的中止循环中,删除了调用 decode_req.kv_receiver.abort() 后设置 decode_req.req.finished_reason = FINISH_ABORT() 的两处代码。此修改确保 finished_reason 的统一设置权交给后续的 prepare_abort,规避了多 rank 同步前错误设置状态的风险。
  2. decode.py (_update_handshake_waiters 方法):将 getattr(decode_req.kv_receiver, "conclude_state", None) == KVPoll.Failed 直接改为 decode_req.kv_receiver.conclude_state == KVPoll.Failed。由于所有后端均已实现 conclude_state 属性,无需 fallback 默认值。
  3. 测试与配套:无直接测试变更,所有已有的 PD 相关测试应继续通过(CI 已通过)。
文件 模块 状态 重要度
python/sglang/srt/managers/scheduler.py 调度器 modified 5.74
python/sglang/srt/disaggregation/decode.py 调度器 modified 4.82

关键源码片段

python/sglang/srt/managers/scheduler.py core-logic

核心调度器的 abort_request 方法,移除 decode 模式下两处多余的 finished_reason 设置,简化了中止流程。

# 位于 abort_request 方法的 Decode 分支
# 变更前:
# for decode_req in self.disagg_decode_prealloc_queue.queue:
# if ...:
# decode_req.kv_receiver.abort()
# if not isinstance(decode_req.req.finished_reason, FINISH_ABORT):
# decode_req.req.finished_reason = FINISH_ABORT() # 被删除,由 prepare_abort 统一处理# 变更后:
for decode_req in self.disagg_decode_prealloc_queue.queue:
    if recv_req.abort_all or decode_req.req.rid.startswith(recv_req.rid):
        logger.debug(f"Abort prealloc queue request. {decode_req.req.rid=}")
        decode_req.kv_receiver.abort()
        # finished_reason 不再在此处设置,由后续 prepare_abort 在多 rank 同步后统一设置# transfer_queue 部分同样改动
python/sglang/srt/disaggregation/decode.py core-logic

修改 _update_handshake_waiters 中 conclude_state 的访问方式,移除 getattr fallback。

# _update_handshake_waiters 方法中的条件判断
# 变更前:
# if all(decode_req.waiting_for_input for decode_req in self.queue) and not any(
# getattr(decode_req.kv_receiver, "conclude_state", None) == KVPoll.Failed
# for decode_req in self.queue
# ):# 变更后:
if all(decode_req.waiting_for_input for decode_req in self.queue) and not any(
    decode_req.kv_receiver.conclude_state == KVPoll.Failed
    for decode_req in self.queue
):
    return # 所有请求均等待输入且无失败状态,跳过本轮 poll

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

风险极低。变更仅移除冗余代码,未引入新逻辑。需确认所有后端确实已提供 conclude_state 属性(PR 作者声明已完备)。若某个后端遗漏,_update_handshake_waiters 中直接访问属性会引发 AttributeError。但 CI 通过说明现有后端均已更新。

影响范围小,仅限于 PD(Prefill-Decode)分解模式下 Decode 节点的中止与握手过程。不会影响正常请求路径。

缺少测试覆盖

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论