Prhub

#23145 integrate streaming session into UnifiedRadixCache

原始 PR 作者 hnyls2002 合并时间 2026-04-20 11:47 文件变更 7 提交数 4 评论 2 代码增减 +269 / -117

执行摘要

将流式会话原生集成到 UnifiedRadixCache,统一缓存管理接口并消除代码重复。

根据 PR body,主要动机是“集成流式会话支持到 UnifiedRadixCache 中,避免外部包装器模式和嵌入式模式之间的代码重复”。通过暴露统一的 try_* 合约,使调用方(如调度器)无需依赖 isinstance 检查,从而简化架构。

建议技术管理者和工程师精读此 PR,重点关注嵌入式 SessionAwareCache 的组合设计模式如何避免代码重复,以及统一接口如何简化调用方逻辑。同时注意递归风险的防范措施。

讨论亮点

Review 中仅有一个来自 bot 的评论,指出本 PR 通过集成 SessionAwareCache 简化了缓存管理逻辑,移除了显式类型检查,且无进一步反馈。无实质性争议或讨论。

实现拆解

  1. 扩展基类接口:在 base_prefix_cache.py 中添加了 supports_streaming_sessionrelease_sessionsession_held_tokens 等默认方法,为所有缓存提供统一接口,避免调用方进行类型判断。
  2. 改造会话感知缓存:在 session_aware_cache.py 中新增 try_inc_lock_reftry_dec_lock_reffind_active_slot 等“尝试处理”方法,支持嵌入式组合;同时调整 match_prefix 逻辑,确保非流式请求直接透传。
  3. 集成到统一缓存:在 unified_radix_cache.py__init__ 中,根据 params.enable_streaming_session 条件创建嵌入式 SessionAwareCache 实例;在 match_prefixinc_lock_refcache_finished_req 等方法中预检查会话状态,避免递归调用。
  4. 更新调用方代码:修改 scheduler.pyscheduler_runtime_checker_mixin.py,移除对 SessionAwareCache 的显式 isinstance 检查,直接调用基类方法;调整 session_controller.py 以使用统一接口释放会话。
文件 模块 状态 重要度
python/sglang/srt/session/session_aware_cache.py 会话缓存 modified 8.65
python/sglang/srt/mem_cache/unified_radix_cache.py 统一缓存 modified 7.96
python/sglang/srt/mem_cache/base_prefix_cache.py 缓存基类 modified 6.45
python/sglang/srt/managers/scheduler_runtime_checker_mixin.py 调度器 modified 5.49
python/sglang/srt/managers/scheduler.py 调度器 modified 5.16
python/sglang/srt/session/session_controller.py 会话控制 modified 4.83

关键符号

supports_streaming_session release_session try_inc_lock_ref try_dec_lock_ref find_active_slot match_prefix inc_lock_ref cache_finished_req

关键源码片段

python/sglang/srt/session/session_aware_cache.py core-logic

核心改造,新增了支持嵌入式组合的 try-handle 方法,并调整了流式会话的状态管理逻辑。

class SessionAwareCache(BasePrefixCache):
    """Adds streaming-session KV save/restore on top of any BasePrefixCache.    Works both as an external wrapper (``SessionAwareCache(RadixCache(...))``)
    and in embedded composition (``SessionAwareCache(inner=self)``). For the
    embedded case, the composing cache must pre-check dispatch conditions
    (``_is_streaming`` / ``find_active_slot`` / ``has_slot``) so the internal
    fall-through to ``self.inner.xxx`` never fires -- otherwise it recurses.
    """
​
    # -- Condition helpers (used by embedded-mode callers for pre-dispatch) --
    def has_slot(self, session_id: str) -> bool:
        return session_id in self.slots # 检查指定会话 ID 是否存在活动槽位
​
    def any_holding_kv(self) -> bool:
        return any(s.is_holding_kv for s in self.slots.values()) # 判断是否有槽位持有 KV 缓存
​
    # -- Try-handle entries for composition (see class docstring) --
    def try_inc_lock_ref(self, node: Any) -> Optional[IncLockRefResult]:
        """No-op lock if ``node`` is a session-internal sentinel; returns
        None to tell the caller to run its raw tree lock path."""
        if isinstance(node, _VirtualNode): # 如果是流式会话内部的虚拟节点,则执行无操作锁
            return IncLockRefResult()
        return None # 返回 None 表示调用方应继续执行原始树锁逻辑
​
    def try_dec_lock_ref(
        self, node: Any, params: Optional[DecLockRefParams] = None
    ) -> Optional[DecLockRefResult]:
        if isinstance(node, _VirtualNode): # 类似地,对虚拟节点执行无操作解锁
            return DecLockRefResult()
        return None
​
    def find_active_slot(self, req: Req) -> Optional[SessionSlot]:
        """Returns an active slot for this req, or None.        Side effect: if req is pre-aborted (to_finish set, e.g. input too
        long), detach it from the session so cache_finished_req treats it
        as a normal req. The slot stays intact for the next request.
        """
        # 如果请求不是流式会话或没有活动槽位,则返回 None;否则处理预中止情况并返回槽位
        if not _is_streaming(req):
            return None
        session_id = req.session.session_id
        slot = self.slots.get(session_id)
        if slot is None or slot.req_pool_idx is None:
            return None
        if req.to_finish is not None:
            req.session.abort_req()
            req.session = None
            return None
        return slot
python/sglang/srt/mem_cache/unified_radix_cache.py entrypoint

集成入口点,在初始化时根据配置嵌入 SessionAwareCache,并调整核心缓存方法以支持流式会话预检查。

class UnifiedRadixCache(BasePrefixCache):
    def __init__(self, params):
        # ... 其他初始化逻辑 ...
        # Streaming session: embedded SessionAwareCache with self as inner.
        # Dispatch methods below pre-check conditions so the session's
        # internal fall-through to self.inner.xxx never fires -- no recursion.
        self.session: Optional[SessionAwareCache] = (
            SessionAwareCache(inner=self) if params.enable_streaming_session else None # 根据参数决定是否创建嵌入式会话缓存
        )
        self.reset()
​
    def match_prefix(self, params: MatchPrefixParams) -> MatchResult:
        if self.session is not None:
            result = self.session.try_match_prefix(params) # 先尝试会话缓存处理
            if result is not None:
                return result # 如果会话缓存处理成功,则直接返回结果
        # 否则执行原始的统一缓存匹配逻辑
        key = params.key
        key, _ = maybe_bigram_convert(self.is_eagle, key)
        if self.disable or len(key) == 0:
            return MatchResult(
                device_indices=torch.empty((0,), dtype=torch.int64, device=self.device),
                last_device_node=self.root_node,
                last_host_node=self.root_node,
            )
        # ... 其余匹配逻辑 ...
​
    def inc_lock_ref(self, node: Any) -> IncLockRefResult:
        if self.session is not None:
            result = self.session.try_inc_lock_ref(node) # 预检查会话锁
            if result is not None:
                return result # 如果是虚拟节点,则返回无操作锁结果
        # 否则执行原始树锁逻辑
        if self.disable:
            return IncLockRefResult()
        result = IncLockRefResult()
        for component in self._components_tuple:
            component.inc_component_lock(node)
        return result

评论区精华

重构简化缓存管理逻辑 设计

Bot 评论指出本 PR 通过集成 SessionAwareCache 简化了逻辑,移除了显式类型检查。

结论:无实质性反馈,变更被认可。 · 已解决

风险与影响

  1. 递归风险:如 PR body 所述,嵌入式组合需确保调用方预检查条件(如 _is_streaming),否则 SessionAwareCache 的内部回退到 self.inner.xxx 可能导致无限递归;关键文件为 unified_radix_cache.py 中的 match_prefix 等方法。
  2. 兼容性风险:新增的默认方法可能影响其他继承 BasePrefixCache 的缓存类型(如 RadixCache),需确认它们的行为一致性;但默认实现为空操作,风险较低。
  3. 性能影响:在缓存核心路径中增加了条件检查(如 if self.session is not None),可能引入轻微开销,但影响有限。
  4. 测试覆盖:上下文未提供测试文件变更,但 PR 作者运行了相关单元测试(如 test_streaming_session_unit.py),暗示已有测试保障。
  1. 对用户:透明无影响,流式会话功能行为不变。
  2. 对系统:统一了缓存管理接口,简化了调度器和会话控制器的逻辑,提升代码可维护性;核心缓存路径增加轻微条件分支,性能影响可忽略。
  3. 对团队:减少了代码重复,使未来扩展流式会话支持更易于实现;需确保开发者理解嵌入式组合模式以避免误用。
递归风险 接口变更 核心路径变更

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论