执行摘要
- 一句话:PD 模块提前中止逻辑清理
- 推荐动作:值得精读以了解 PD 模块的中止流程演进。该 PR 是 PD 代码清理的延续,体现了团队在推动
conclude_state 统一化后的冗余清理工作。
功能与动机
PR body 指出:现在各个后端都已提供 conclude_state 属性,因此可以移除 getattr;此外 prepare_abort 会在内部设置 finished_reason,无需在 abort 路径上预先设置,这样在多 rank 同步完状态后再设置 finished_reason 更安全。
实现拆解
- scheduler.py (
abort_request 方法):在 Decode 模式的 prealloc 和 transfer 队列的中止循环中,删除了调用 decode_req.kv_receiver.abort() 后设置 decode_req.req.finished_reason = FINISH_ABORT() 的两处代码。此修改确保 finished_reason 的统一设置权交给后续的 prepare_abort,规避了多 rank 同步前错误设置状态的风险。
- decode.py (
_update_handshake_waiters 方法):将 getattr(decode_req.kv_receiver, "conclude_state", None) == KVPoll.Failed 直接改为 decode_req.kv_receiver.conclude_state == KVPoll.Failed。由于所有后端均已实现 conclude_state 属性,无需 fallback 默认值。
- 测试与配套:无直接测试变更,所有已有的 PD 相关测试应继续通过(CI 已通过)。
关键文件:
python/sglang/srt/managers/scheduler.py(模块 调度器;类别 source;类型 core-logic): 核心调度器的 abort_request 方法,移除 decode 模式下两处多余的 finished_reason 设置,简化了中止流程。
python/sglang/srt/disaggregation/decode.py(模块 调度器;类别 source;类型 core-logic): 修改 _update_handshake_waiters 中 conclude_state 的访问方式,移除 getattr fallback。
关键符号:未识别
关键源码片段
python/sglang/srt/managers/scheduler.py
核心调度器的 abort_request 方法,移除 decode 模式下两处多余的 finished_reason 设置,简化了中止流程。
# 位于 abort_request 方法的 Decode 分支
# 变更前:
# for decode_req in self.disagg_decode_prealloc_queue.queue:
# if ...:
# decode_req.kv_receiver.abort()
# if not isinstance(decode_req.req.finished_reason, FINISH_ABORT):
# decode_req.req.finished_reason = FINISH_ABORT() # 被删除,由 prepare_abort 统一处理
# 变更后:
for decode_req in self.disagg_decode_prealloc_queue.queue:
if recv_req.abort_all or decode_req.req.rid.startswith(recv_req.rid):
logger.debug(f"Abort prealloc queue request. {decode_req.req.rid=}")
decode_req.kv_receiver.abort()
# finished_reason 不再在此处设置,由后续 prepare_abort 在多 rank 同步后统一设置
# transfer_queue 部分同样改动
python/sglang/srt/disaggregation/decode.py
修改 _update_handshake_waiters 中 conclude_state 的访问方式,移除 getattr fallback。
# _update_handshake_waiters 方法中的条件判断
# 变更前:
# if all(decode_req.waiting_for_input for decode_req in self.queue) and not any(
# getattr(decode_req.kv_receiver, "conclude_state", None) == KVPoll.Failed
# for decode_req in self.queue
# ):
# 变更后:
if all(decode_req.waiting_for_input for decode_req in self.queue) and not any(
decode_req.kv_receiver.conclude_state == KVPoll.Failed
for decode_req in self.queue
):
return # 所有请求均等待输入且无失败状态,跳过本轮 poll
评论区精华
无 review 讨论。PR 作者自行合并,CI passed。
风险与影响
- 风险:风险极低。变更仅移除冗余代码,未引入新逻辑。需确认所有后端确实已提供
conclude_state 属性(PR 作者声明已完备)。若某个后端遗漏,_update_handshake_waiters 中直接访问属性会引发 AttributeError。但 CI 通过说明现有后端均已更新。
- 影响:影响范围小,仅限于 PD(Prefill-Decode)分解模式下 Decode 节点的中止与握手过程。不会影响正常请求路径。
- 风险标记:缺少测试覆盖
关联脉络
参与讨论