执行摘要
将队列负载查询逻辑迁移到独立组件
继续执行调度器重构链,将队列负载度量报告职责从 SchedulerMetricsMixin 移至独立组件 SchedulerLoadInquirer,以实现关注点分离和模块化,为后续简化调度器提供基础。
值得精读,以理解如何将静态辅助方法从混入类迁移到组合组件,同时保持调用方最小改动。该模式可推广到其他混入类。
该 PR 无人工审核评论,仅有一条机器人生成的每日配额警告。变更本身是机械性移动,未产生设计争议。
继续执行调度器重构链,将队列负载度量报告职责从 SchedulerMetricsMixin 移至独立组件 SchedulerLoadInquirer,以实现关注点分离和模块化,为后续简化调度器提供基础。
值得精读,以理解如何将静态辅助方法从混入类迁移到组合组件,同时保持调用方最小改动。该模式可推广到其他混入类。
该 PR 无人工审核评论,仅有一条机器人生成的每日配额警告。变更本身是机械性移动,未产生设计争议。
load_inquirer.py 中新增方法:将 _get_num_pending_tokens 和 get_loads 从 SchedulerMetricsMixin 复制到 SchedulerLoadInquirer 类,移除 @staticmethod 装饰器,将 self: 'SchedulerLoadInquirer' 类型注解简化为 self,并添加必要的导入(io_struct 类型等)。scheduler_metrics_mixin.py 中删除方法及相关导入:删除这两个方法及其所有关联代码,同时移除不再需要的导入(如 GetLoadsReqInput、GetLoadsReqOutput 以及类型检查块中的 SchedulerLoadInquirer 引用)。scheduler.py 中的调用:在 init_request_dispatcher 的 RPC 分发 lambda 中,将 lambda req: self.get_loads(self.load_inquirer, req) 改为 lambda req: self.load_inquirer.get_loads(req);在 _get_new_batch_prefill_raw 中,将 self._get_num_pending_tokens(self.load_inquirer, ...) 改为 self.load_inquirer._get_num_pending_tokens(...)。scheduler_output_processor_mixin.py 中的调用:在 stream_output_generation 中,将 self.get_loads(self.load_inquirer, GetLoadsReqInput(...)) 改为 self.load_inquirer.get_loads(GetLoadsReqInput(...))。| 文件 | 模块 | 状态 | 重要度 |
|---|---|---|---|
python/sglang/srt/managers/scheduler_components/load_inquirer.py |
调度器组件 | modified | 7.98 |
python/sglang/srt/observability/scheduler_metrics_mixin.py |
可观测性 | modified | 8.0 |
python/sglang/srt/managers/scheduler.py |
调度器 | modified | 5.34 |
python/sglang/srt/managers/scheduler_output_processor_mixin.py |
调度器 | modified | 5.2 |
python/sglang/srt/managers/scheduler_components/load_inquirer.py
dependency-wiring
接收了两个核心方法,新增了大量导入,是本次移动的目标组件。
# python/sglang/srt/managers/scheduler_components/load_inquirer.py
# 新增的方法:计算待填充 token 数量
def _get_num_pending_tokens(self, chunk_deduct: int = 0) -> int:
"""Get the total number of tokens pending prefill.
This includes tokens from waiting queue requests plus remaining tokens
from the currently chunked request.
Args:
chunk_deduct: extra tokens to subtract from the chunked request's
remaining count. At batch-scheduling time the current chunk
has been planned but ``prefix_indices`` does not yet include it,
so callers pass ``extend_input_len`` here. At load-reporting
time ``prefix_indices`` is already up-to-date, so the default
0 is correct.
"""
# 等待队列中所有请求的序列长度之和
num_pending_tokens = sum(req.seqlen for req in self.get_waiting_queue())
# 如果存在分块请求,则加上未处理的 token 数
if self.get_chunked_req() is not None:
req = self.get_chunked_req()
num_pending_tokens += req.seqlen - len(req.prefix_indices) - chunk_deduct
return num_pending_tokens
# 新增的方法:获取综合负载度量(用于 /v1/loads 端点)
def get_loads(self, req: GetLoadsReqInput = None) -> GetLoadsReqOutput:
# 默认只包含核心字段
if req is None:
req = GetLoadsReqInput()
include = set(req.include) if req.include else {"core"}
include_all = "all" in include
num_running_reqs = len(self.get_running_batch().reqs)
# 根据分离模式收集不同的等待队列
waiting_queues = [self.get_waiting_queue()]
if self.disaggregation_mode == DisaggregationMode.PREFILL:
waiting_queues.append(self.get_disagg_prefill_bootstrap_queue().queue)
elif self.disaggregation_mode == DisaggregationMode.DECODE:
waiting_queues.append(self.get_disagg_decode_prealloc_queue().queue)
waiting_queues.append(self.get_disagg_decode_transfer_queue().queue)
waiting_queues.append(
self.get_disagg_decode_prealloc_queue().retracted_queue
)
num_waiting_reqs = sum(len(queue) for queue in waiting_queues)
# 获取 token 池统计
num_used_tokens, kv_token_usage = (
self.pool_stats_observer.get_pool_stats().get_kv_token_stats()
)
num_total_tokens = num_used_tokens + sum(
req.seqlen for queue in waiting_queues for req in queue
)
# ... 构建 GetLoadsReqOutput ...
当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。
变更纯属重构,逻辑零改动,但存在以下风险:
load_inquirer.py 新增了对 io_struct 多个类的导入,若遗漏可能导致运行时 NameError。scheduler.py 和 scheduler_output_processor_mixin.py 中的调用方式改变,若出现拼写错误或参数错位会导致崩溃。用户/系统:无功能影响,/v1/loads 端点和调度逻辑行为完全一致。
开发者:改善了调度器模块化,未来维护负载查询逻辑只需关注 SchedulerLoadInquirer,无需修改混入类。
团队:延续了大规模重构链,需确保后续 PR 兼容。
当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。
参与讨论