Prhub

#24229 [diffusion] chore: clean scheduler

原始 PR 作者 mickqian 合并时间 2026-05-02 09:30 文件变更 2 提交数 1 评论 2 代码增减 +70 / -46

执行摘要

重构 Multimodal 调度器请求分发与 warmup 日志逻辑,拆分辅助方法

标题'chore: clean scheduler'表明这是一次代码清理,旨在简化请求处理逻辑,消除重复,降低后续维护成本。无关联Issue但有内部重构需求。

建议阅读以了解Multimodal调度器的请求处理架构。_first_generation_req_dispatch_request 的设计模式可以复用。但PR缺少单元测试,建议在后续合并前补充对辅助方法的测试,尤其是边界情况。

讨论亮点

代码评论(gemini-code-assist[bot]):建议将 _log_warmup_result 中的日志逻辑进一步合并,先构造公共前缀(含(processed/total)信息),再根据错误与否决定输出的文本,以减少代码重复。评论已验证但未被采纳,属于低优先级的代码风格优化。

实现拆解

  1. 归一化请求格式:在 scheduler.py 新增 _normalize_generation_reqs 静态方法,将 _handle_generation 中的嵌套列表展开逻辑独立,使类型提示更精确。
  2. 提取首请求辅助函数:新增 _first_generation_req_is_warmup_item 方法,统一获取第一个请求并判断是否warmup,替代原先 isinstancereq.is_warmup 的分散写法。
  3. 统一请求分发:新增 _dispatch_request 方法,根据请求类型查找注册的 handler,替代原本 if isinstance(req, list) 的硬编码分派,并移除 request_handlers 中多余的 list -> _handle_generation 映射。
  4. Warmup日志封装:新增 _log_warmup_result 方法,集中管理warmup成功/失败的日志输出,避免在多个位置重复构造字符串。
  5. GPUWorker注释增强:在 gpu_worker.pyexecute_forward_execute_forward_common 中补充注释,说明分组前向、disaggregation返回机制以及文件路径优化行为,提升代码可读性。
文件 模块 状态 重要度
python/sglang/multimodal_gen/runtime/managers/scheduler.py 调度器 modified 8.77
python/sglang/multimodal_gen/runtime/managers/gpu_worker.py 工作节点 modified 4.3

关键符号

_normalize_generation_reqs _first_generation_req _is_warmup_item _dispatch_request _log_warmup_result _handle_generation

关键源码片段

python/sglang/multimodal_gen/runtime/managers/scheduler.py core-logic

核心文件:拆分了 _handle_generation,新增 5 个辅助方法,移除 1 个 handler 映射,重构主事件循环中的 warmup 处理,变更量最大(+63/-45)。

# python/sglang/multimodal_gen/runtime/managers/scheduler.py@staticmethod
def _normalize_generation_reqs(reqs: list[Any]) -> list[Req]:
    """将可能嵌套的请求列表展平为 req 列表"""
    if len(reqs) == 1 and isinstance(reqs[0], list):
        return reqs[0]
    return reqs@staticmethod
def _first_generation_req(req_or_group: Any) -> Req | None:
    """从单个请求或列表中提取第一个 Req 对象"""
    if isinstance(req_or_group, Req):
        return req_or_group
    if isinstance(req_or_group, list) and req_or_group:
        first_req = req_or_group[0]
        if isinstance(first_req, Req):
            return first_req
    return None@classmethod
def _is_warmup_item(cls, req_or_group: Any) -> bool:
    """判断请求或其第一个元素是否为 warmup 请求"""
    req = cls._first_generation_req(req_or_group)
    return req.is_warmup if req is not None else Falsedef _dispatch_request(self, reqs: list[Any]) -> OutputBatch:
    """根据第一个请求的类型分发到对应的 handler"""
    req_or_group = reqs[0]
    # 如果第一个元素是 list,则表示批量生成请求
    if isinstance(req_or_group, list):
        return self._handle_generation(reqs)
    handler = self.request_handlers.get(type(req_or_group))
    if handler is None:
        return OutputBatch(error=f"Unknown request type: {type(req_or_group)}")
    return handler(reqs)def _log_warmup_result(self, output_batch: OutputBatch, is_warmup: bool) -> None:
    """统一记录 warmup 请求的结果(成功/失败)"""
    if not is_warmup:
        return
    if output_batch.error is None:
        if self._warmup_total > 0:
            logger.info(
                f"Warmup req ({self._warmup_processed}/{self._warmup_total}) processed in {GREEN}%.2f{RESET} seconds",
                output_batch.metrics.total_duration_s,
            )
        else:
            logger.info(
                f"Warmup req processed in {GREEN}%.2f{RESET} seconds",
                output_batch.metrics.total_duration_s,
            )
    else:
        if self._warmup_total > 0:
            logger.info(f"Warmup req ({self._warmup_processed}/{self._warmup_total}) processing failed")
        else:
            logger.info("Warmup req processing failed")def _handle_generation(self, reqs: list[Any]):
    """处理生成请求(包含 warmup 识别与分发)"""
    reqs = self._normalize_generation_reqs(reqs)
    warmup_reqs = [req for req in reqs if req.is_warmup]
    if warmup_reqs:
        self._warmup_processed += len(warmup_reqs)
        if self._warmup_total > 0:
            logger.info(
                f"Processing warmup req... ({self._warmup_processed}/{self._warmup_total})"
            )
        else:
            logger.info("Processing warmup req...")
    req = reqs[0]
    req.trace_ctx.rebuild_thread_context()
    with trace_slice(
        req.trace_ctx,
        DiffStage.SCHEDULER_DISPATCH,
        thread_finish_flag=True,
    ):
        return self.worker.execute_forward(reqs)

评论区精华

简化 _log_warmup_result 的日志逻辑 设计

gemini-code-assist[bot] 建议在 _log_warmup_result 中先构造公共前缀(包含 (processed/total) 信息),再根据 error 是否为 None 输出不同文本,以减少 if-else 分支。评审者提供了具体的代码示例。

结论:当前实现保留了四分支,未采纳简化建议。可能是考虑到日志格式清晰度而保留原样。 · unresolved

风险与影响

低风险重构:改动了调度器的核心请求处理路径,若辅助方法存在边界情况(如空列表、嵌套列表深度)可能暴露之前隐藏的错误。由于未增加新测试,现有测试覆盖可能不足,需依赖集成测试。_dispatch_request 移除了对 list 类型的 handler 注册,万一有其他位置的代码直接给 list 类型请求可能走错分支。

影响范围:仅影响 sglang/multimodal_gen 内部的调度器与worker交互逻辑;对外部API和行为无可见变化。团队影响:代码更模块化,后续维护和扩展更容易;但没有测试配套,需要开发者自行关注正确性。

核心路径变更 缺少测试覆盖

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论