执行摘要
- 一句话:创建 SchedulerInvariantChecker 组件以持有不变量检查状态
- 推荐动作:建议阅读以理解大型类拆分的渐进式重构模式。这一模式(先引入状态持有组件、再迁移方法体)值得在类似场景中复现。特别关注
SchedulerInvariantChecker 的数据类设计以及调用点的统一更新方式。
功能与动机
PR body 明确说明这是 introduce-invariant-checker 移动的 inplace prep,目的是将不变量检查状态集中到独立组件中,从而解耦调度器核心逻辑。
实现拆解
-
创建 SchedulerInvariantChecker 数据类(python/sglang/srt/managers/scheduler_components/invariant_checker.py):定义了一个 @dataclass(kw_only=True, slots=True) 类,包含调度器配置、tree cache、pool allocator、Callable getter(get_last_batch/get_running_batch)以及两个计数器字段。该组件负责维护不变量检查所需的所有状态。
-
重构 SchedulerRuntimeCheckerMixin(python/sglang/srt/managers/scheduler_runtime_checker_mixin.py):将 _check_full_pool、_check_swa_pool、_check_mamba_pool、_get_total_uncached_sizes、self_check_during_busy、_check_req_pool、_report_leak、_check_tree_cache 等方法从 self: Scheduler 实例方法改为 @staticmethod,且 self 参数类型变为 SchedulerInvariantChecker。方法体中对 Scheduler 运行时状态的读取(如 self.last_batch)改为通过传入的 Callable 属性间接访问,解除了对 Scheduler 实例的直接依赖。
-
在 Scheduler.init 中实例化 SchedulerInvariantChecker(python/sglang/srt/managers/scheduler.py):在 self.pool_stats_observer 初始化之后构造 self.invariant_checker,传入所有必要字段和两个 lambda 包装的 getter。同时更新所有调用点:将 self._check_all_pools(...) 改为 self._check_all_pools(self.invariant_checker, ...),self._report_leak(...) 改为 self._report_leak(self.invariant_checker, ...),self._check_req_pool() 改为 self._check_req_pool(self.invariant_checker),self._check_tree_cache() 改为 self._check_tree_cache(self.invariant_checker),以及 self.self_check_during_busy() 改为 self.self_check_during_busy(self.invariant_checker)。此外,create_scheduler_watchdog 中的调用也同步更新。
-
配套导入更新:在 mixin 和 scheduler 中添加了对 SchedulerInvariantChecker 的导入(TYPE_CHECKING 块或顶层),确保类型检查无误。
-
测试配套:本次变更未添加新测试文件,但现有测试通过依赖注入模式得以继续工作(因为运行时行为未变)。
关键文件:
python/sglang/srt/managers/scheduler_components/invariant_checker.py(模块 调度器;类别 source;类型 core-logic;符号 SchedulerInvariantChecker): 新组件:容纳不变量检查所需的所有状态,作为后续迁移的目标容器。
python/sglang/srt/managers/scheduler_runtime_checker_mixin.py(模块 调度器;类别 source;类型 core-logic;符号 _check_mamba_pool, _get_total_uncached_sizes, self_check_during_busy, _check_req_pool): 核心变更:mixin 中的方法被重构为静态方法,为后续物理移动到 invariant_checker 做准备。
python/sglang/srt/managers/scheduler.py(模块 调度器;类别 source;类型 dependency-wiring): 导入并实例化 SchedulerInvariantChecker,更新所有调用点。
关键符号:SchedulerInvariantChecker.init, SchedulerRuntimeCheckerMixin._check_full_pool, SchedulerRuntimeCheckerMixin._check_swa_pool, SchedulerRuntimeCheckerMixin._check_mamba_pool, SchedulerRuntimeCheckerMixin._get_total_uncached_sizes, SchedulerRuntimeCheckerMixin.self_check_during_busy, SchedulerRuntimeCheckerMixin._check_req_pool, SchedulerRuntimeCheckerMixin._report_leak, SchedulerRuntimeCheckerMixin._check_tree_cache
关键源码片段
python/sglang/srt/managers/scheduler_components/invariant_checker.py
新组件:容纳不变量检查所需的所有状态,作为后续迁移的目标容器。
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Callable, Optional
from sglang.srt.disaggregation.utils import DisaggregationMode
from sglang.srt.managers.scheduler_components.pool_stats_observer import (
SchedulerPoolStatsObserver,
)
from sglang.srt.mem_cache.allocator import BaseTokenToKVPoolAllocator
from sglang.srt.mem_cache.base_prefix_cache import BasePrefixCache
from sglang.srt.mem_cache.memory_pool import ReqToTokenPool
from sglang.srt.server_args import ServerArgs
logger = logging.getLogger(__name__)
@dataclass(kw_only=True, slots=True)
class SchedulerInvariantChecker:
# 静态配置字段:直接从 Scheduler 构造函数中拷贝
is_hybrid_swa: bool
is_hybrid_ssm: bool
disaggregation_mode: DisaggregationMode
page_size: int
full_tokens_per_layer: Optional[int]
swa_tokens_per_layer: Optional[int]
max_total_num_tokens: int
server_args: ServerArgs
# 共享依赖对象:通过依赖注入传入的已有组件
tree_cache: BasePrefixCache
token_to_kv_pool_allocator: BaseTokenToKVPoolAllocator
req_to_token_pool: ReqToTokenPool
pool_stats_observer: SchedulerPoolStatsObserver
# Callable getter:用于访问 Scheduler 运行时的可变状态
# 通过 lambda 从 Scheduler 转为 getter,解除了对 Scheduler 类的直接引用
get_last_batch: Callable
get_running_batch: Callable
# 可变状态:不变量检查中用到的计数器,初始为 0
count_req_pool_leak_warnings: int = 0
count_memory_leak_warnings: int = 0
评论区精华
Review 评论包含两个主要关注点:
风险与影响
- 风险:
- 类型导入缺失:PoolStats 类型未在
scheduler_runtime_checker_mixin.py 的 TYPE_CHECKING 块中导入,可能导致静态分析警告或潜在的类型检查错误。
- 空值风险:
_get_total_uncached_sizes 中假设 self.get_last_batch() 不会返回 None,虽然当前调用时机避开了这一点,但将 getter 通过 lambda 传入后无法保证调用时 last_batch 非空,存在潜在的 AttributeError 风险。
- 运行时开销:
SchedulerInvariantChecker 存储的 Callable getter 每次调用都会触发 lambda 间接调用,频率较高(每次 idle 检查都会使用),可能带来微小性能损耗。
- 影响:影响范围限定在
sglang/srt 调度器模块内部。对外部 API(REST 接口、模型推理)无直接用户可见影响。对内部开发影响显著:SchedulerRuntimeCheckerMixin 中的方法不再直接访问 Scheduler 实例,而是通过 SchedulerInvariantChecker 对象间接访问,开发者需要理解这种新的依赖注入模式。后续迁移完成后,SchedulerInvariantChecker 将完全独立,使得调度器核心类更加精简。
- 风险标记:类型导入缺失, get_last_batch 空值风险, Callable getter 性能开销
关联脉络
- PR #25624 Move invariant checks to SchedulerInvariantChecker and retire runtime_checker mixin: 直接后续 PR,将不变量检查方法体从 mixin 物理移动到 SchedulerInvariantChecker 组件,并移除旧的 SchedulerRuntimeCheckerMixin。
参与讨论