Prhub

#27011 [Bugfix] Clean up failed NIXL sender state

原始 PR 作者 Ronnie-Rui 合并时间 2026-06-03 12:15 文件变更 2 提交数 3 评论 4 代码增减 +79 / -1

执行摘要

修复 NIXL 发送器失败后房间状态未清理

NIXL传输工作线程失败会被记录并暴露为KVPoll.Failed,但NixlKVSender.failure_exception()在抛出异常前未清理per-room状态,导致请求终止后残留request_status、transfer_infos、req_to_decode_prefix_len、failure_records以及staging prefetch状态。这与Mooncake和Mori的sender失败路径不一致,需要修复。

该PR值得合并,修复了明确的状态泄漏bug。建议后续考虑对staging_ctx加锁或明确其线程安全模型。

讨论亮点

Reviewer ShangmingCai 批准了PR,并提及@YAMY1234 @ishandhanani,表明需要相关开发者关注或测试。无其他讨论。

实现拆解

变更集中在两个文件:

  1. python/sglang/srt/disaggregation/nixl/conn.py

    • 新增clear()方法:先调用父类CommonKVSender.clear()清理通用状态(request_status、transfer_infos等),再针对NIXL staging场景,从kv_mgr._staging_ctx.prefetched_rooms中移除当前room,并从prefetch_requested中删除所有以当前room为key的记录。
    • 重写failure_exception()方法:
      • 先弹出异常和失败原因(加锁保护),标记sender为失败状态(_send_failed = Trueconclude_state = KVPoll.Failed)。
      • 调用self.clear()清理房间状态。
      • 然后按优先顺序抛出异常:_send_error → 弹出的exception → 弹出的failure_reason → 默认RuntimeError。
  2. test/registered/unit/disaggregation/test_nixl_sender_failure_cleanup.py

    • 新增CPU单元测试test_failure_exception_cleans_room_state_before_raising,注册到base-a-test-cpu测试套件。
    • 通过SimpleNamespace模拟kv_mgr和staging上下文,验证:
      • 异常被正确抛出(assertRaises)。
      • sender的_send_failedconclude_state被正确设置。
      • 所有房间相关状态(request_status、req_to_decode_prefix_len、transfer_infos、exceptions、failure_records)被清除。
      • staging prefetch状态中当前room被清除,而不影响其他room。
文件 模块 状态 重要度
python/sglang/srt/disaggregation/nixl/conn.py 传输层 modified 6.91
test/registered/unit/disaggregation/test_nixl_sender_failure_cleanup.py 测试 added 6.49

关键符号

clear failure_exception

关键源码片段

python/sglang/srt/disaggregation/nixl/conn.py core-logic

核心修复文件,新增 clear() 方法并重写 failure_exception(),确保失败时清理 per-room 状态。

# python/sglang/srt/disaggregation/nixl/conn.py
# NixlKVSender 类新增 clear() 方法,重写 failure_exception() 方法def clear(self) -> None:
    # 先调用父类清理通用的 per-room 状态 (request_status, transfer_infos 等 )
    super().clear()
    # 如果启用了 staging,还需要清理 staging prefetch 状态
    if (
        getattr(self.kv_mgr, "enable_staging", False)
        and getattr(self.kv_mgr, "_staging_ctx", None) is not None
    ):
        # 从 prefetched_rooms 集合中移除当前 room
        self.kv_mgr._staging_ctx.prefetched_rooms.discard(self.bootstrap_room)
        # 从 prefetch_requested 集合中过滤掉所有以当前 room 开头的请求
        self.kv_mgr._staging_ctx.prefetch_requested = {
            key
            for key in self.kv_mgr._staging_ctx.prefetch_requested
            if key[0] != self.bootstrap_room
        }def failure_exception(self):
    # 先从 kv_mgr 中弹出异常和失败原因
    exc = self.kv_mgr.exceptions.pop(self.bootstrap_room, None)
    with self.kv_mgr.failure_lock:
        failure_reason = self.kv_mgr.failure_records.pop(self.bootstrap_room, None)
​
    # 标记 sender 为失败状态
    if self.conclude_state is None:
        self.conclude_state = KVPoll.Failed
    self._send_failed = True
​
    # 清理房间状态 —— 先于异常抛出,确保资源释放
    self.clear()
​
    # 按优先顺序抛出异常:自定义错误 > 传输异常 > 失败原因 > 默认 RuntimeError
    if self._send_error is not None:
        raise self._send_error
    if exc is not None:
        raise exc
    if failure_reason is not None:
        raise RuntimeError(failure_reason)
    raise RuntimeError("NIXL KVSender Exception")
test/registered/unit/disaggregation/test_nixl_sender_failure_cleanup.py test-coverage

新增 CPU 单元测试,验证 failure_exception() 中清理行为的正确性,覆盖 staging 场景。

# test/registered/unit/disaggregation/test_nixl_sender_failure_cleanup.py
# CPU 单元测试,注册到 base-a-test-cpu 套件
import threading
import unittest
from types import SimpleNamespacefrom sglang.srt.disaggregation.base.conn import KVPoll
from sglang.srt.disaggregation.nixl.conn import NixlKVSender
from sglang.test.ci.ci_register import register_cpu_ciregister_cpu_ci(est_time=2, suite="base-a-test-cpu")class TestNixlSenderFailureCleanup(unittest.TestCase):
    def test_failure_exception_cleans_room_state_before_raising(self):
        room = 7
        expected_exc = RuntimeError("transfer failed")
        # 直接通过 __new__ 创建 sender 实例,避免初始化复杂依赖
        sender = NixlKVSender.__new__(NixlKVSender)
        sender.bootstrap_room = room
        sender.conclude_state = None
        sender._send_failed = False
        sender._send_error = None
​
        # 构造 staging 上下文,包含当前 room 和其他 room 的数据
        staging_ctx = SimpleNamespace(
            prefetched_rooms={room, 8},
            prefetch_requested={(room, 0, "session-a"), (8, 0, "session-b")},
        )
        # 构造 kv_mgr 模拟器,包含所有需要清理的状态
        sender.kv_mgr = SimpleNamespace(
            enable_staging=True,
            _staging_ctx=staging_ctx,
            request_status={room: object()},
            req_to_decode_prefix_len={room: 3},
            transfer_infos={room: object()},
            exceptions={room: expected_exc},
            failure_records={room: "transfer failed"},
            failure_lock=threading.Lock(),
        )
​
        with self.assertRaises(RuntimeError) as cm:
            sender.failure_exception()
​
        # 验证异常被正确传播
        self.assertIs(cm.exception, expected_exc)
        # 验证 sender 被标记为失败
        self.assertTrue(sender._send_failed)
        self.assertEqual(sender.conclude_state, KVPoll.Failed)
        # 验证房间状态被清理
        self.assertNotIn(room, sender.kv_mgr.request_status)
        self.assertNotIn(room, sender.kv_mgr.req_to_decode_prefix_len)
        self.assertNotIn(room, sender.kv_mgr.transfer_infos)
        self.assertNotIn(room, sender.kv_mgr.exceptions)
        self.assertNotIn(room, sender.kv_mgr.failure_records)
        # 验证 staging 状态中当前 room 被清理,但其他 room 不受影响
        self.assertNotIn(room, staging_ctx.prefetched_rooms)
        self.assertNotIn((room, 0, "session-a"), staging_ctx.prefetch_requested)
        self.assertIn(8, staging_ctx.prefetched_rooms)
        self.assertIn((8, 0, "session-b"), staging_ctx.prefetch_requested)if __name__ == "__main__":
    unittest.main()

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

  1. 回归风险:clear()方法重用了父类清理逻辑,且在failure_exception()中先标记失败再清理,不会影响正常成功路径。
  2. 并发安全:清理failure_records时使用了failure_lock,但clear()中访问staging_ctx时未加锁,若staging_ctx被多个线程并发修改可能引入竞态。建议确认staging_ctx的线程安全设计。
  3. 测试覆盖:单元测试模拟了典型场景,但未覆盖并发场景或staging关闭时的路径(enable_staging=False)。

影响范围限于NIXL传输失败时的资源清理。修复后,失败请求的资源将被及时释放,不会干扰后续请求。对用户透明,系统资源利用率和稳定性提升。

staging_ctx 并发安全需确认

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论