Prhub

#27411 Add scripted-runtime harness core and wire scheduler/IPC hooks

原始 PR 作者 fzyzcjy 合并时间 2026-06-06 09:07 文件变更 24 提交数 1 评论 1 代码增减 +1313 / -4

执行摘要

新增 scripted-runtime 测试框架核心与调度器 IPC 钩子

为支持可脚本化的运行时测试框架(scripted-runtime),需要构建核心调度器钩子和 IPC 通信层,以允许外部脚本控制调度器的行为(如启停请求、检查状态),从而实现自动化、确定性测试。(来自 PR 标题及系列先决 PR 的上下文)

值得对 scripted-runtime 感兴趣或有复杂调度测试需求的工程师阅读,尤其 ScriptedSchedulerHook 的 IPC 分发和 ScriptedHttpServer 的生命周期管理设计。

讨论亮点

无 review 评论,PR 由作者自行合并。

实现拆解

  1. 新增 Scheduler Hook:在 python/sglang/test/scripted_runtime/scheduler_hook.py 中定义 ScriptedSchedulerHook,通过 _run_dispatch_loop 监听 ZMQ 消息,收到 RunScript 命令后重置引擎状态并执行用户脚本的生成器。
  2. 暴露 Context API:在 context/api.py 中提供 ScriptedContext,封装 start_reqpause_generationabort 等操作,内部委托给 lifecyclequeriesengine 子模块。
  3. 构建 HTTP Server 包装ScriptedHttpServer 类封装 SGLang HTTP 服务器的启动、心跳等待、脚本执行和优雅关闭,使用 ZMQ PAIR 与 hook 通信。
  4. 连接调度器:修改 scheduler.py 新增 maybe_init_scripted_scheduler_hook 函数,在调度器初始化时按 SGLANG_TEST_SCRIPTED_RUNTIME_IPC_ADDR 环境变量判断是否创建 hook 并注入。
  5. 编写支持模块:包括 BackgroundHttpPoster(异步 POST)、ScriptedTokenizerRecvProxy(代理 token receipt)、io_struct(IPC 消息定义)、req_handlecontext/lifecycle.pycontext/queries.pycontext/radix.pycontext/req_starter.py 等。
文件 模块 状态 重要度
python/sglang/test/scripted_runtime/scheduler_hook.py 调度器钩子 added 8.36
python/sglang/test/scripted_runtime/context/api.py API 层 added 8.07
python/sglang/test/scripted_runtime/http_server.py HTTP 服务器 added 8.05
python/sglang/srt/managers/scheduler.py 调度器入口 modified 7.03
python/sglang/test/scripted_runtime/context/queries.py 查询模块 added 7.52
python/sglang/test/scripted_runtime/context/lifecycle.py 生命周期 added 7.35
python/sglang/test/scripted_runtime/background_http_poster.py 后台 HTTP added 7.32

关键符号

ScriptedSchedulerHook ScriptedContext ScriptedHttpServer maybe_init_scripted_scheduler_hook ScriptedContext.start_req ScriptedSchedulerHook.on_run_batch

关键源码片段

python/sglang/test/scripted_runtime/context/api.py test-coverage

用户可见 API 层:ScriptedContext 提供 start_req、pause_generation、abort、flush_cache 等高层操作,并整合查询和生命周期子模块。

# python/sglang/test/scripted_runtime/context/api.pyfrom __future__ import annotations
import logging
from typing import TYPE_CHECKING, Dict, List, Literal, Optionalfrom sglang.test.scripted_runtime.context import (
    engine,
    lifecycle,
    queries,
    radix,
)
from sglang.test.scripted_runtime.context.req_starter import ScriptedContextReqStarterif TYPE_CHECKING:
    from sglang.srt.managers.schedule_batch import Req
    from sglang.test.scripted_runtime.background_http_poster import BackgroundHttpPoster
    from sglang.test.scripted_runtime.req_handle import ScriptedReqHandle
    from sglang.test.scripted_runtime.scheduler_hook import ScriptedSchedulerHook
    from sglang.test.scripted_runtime.tokenizer_recv_proxy import ScriptedTokenizerRecvProxylogger = logging.getLogger(__name__)class ScriptedContext:
    """测试脚本上下文:提供对调度器的控制与查询接口。"""
​
    def __init__(
        self,
        *,
        scheduler_hook: "ScriptedSchedulerHook",
        tokenizer_recv_proxy: Optional["ScriptedTokenizerRecvProxy"],
        http_poster: "BackgroundHttpPoster",
    ) -> None:
        # 只能在 driver rank 上构造
        assert scheduler_hook._is_driver, "ScriptedContext only exists on the driver rank"
        self.scheduler = scheduler_hook.scheduler
        self._scheduler_hook = scheduler_hook
        self._tokenizer_recv_proxy = tokenizer_recv_proxy
        self._http_poster = http_poster
​
        self._seen_rids: set[str] = set() # 记录所有见过的 rid,用于判断已结束的请求
        self._req_starter = ScriptedContextReqStarter(self) # 请求启动器(负责实际 HTTP 调用)
​
    def start_req(
        self,
        *,
        prompt_len: int,
        max_new_tokens: int = 8,
        rid: Optional[str] = None,
        ignore_eos: bool = False,
        priority: Optional[int] = None,
        dp_rank: Optional[int] = None,
        prompt_token: int = 1,
        return_logprob: bool = False,
        logprob_start_len: Optional[int] = None,
        top_logprobs_num: Optional[int] = None,
        lora_path: Optional[str] = None,
    ) -> "ScriptedReqHandle":
        """启动一个新的请求,返回句柄用于后续控制。"""
        return self._req_starter.start_req(
            prompt_len=prompt_len,
            max_new_tokens=max_new_tokens,
            rid=rid,
            ignore_eos=ignore_eos,
            priority=priority,
            dp_rank=dp_rank,
            prompt_token=prompt_token,
            return_logprob=return_logprob,
            logprob_start_len=logprob_start_len,
            top_logprobs_num=top_logprobs_num,
            lora_path=lora_path,
        )
​
    def pause_generation(self, *, mode: Literal["retract", "in_place"]) -> None:
        return lifecycle.pause_generation(self, mode=mode)
​
    def continue_generation(self, *, torch_empty_cache: bool = False) -> None:
        return lifecycle.continue_generation(self, torch_empty_cache=torch_empty_cache)
​
    def abort_all(self) -> None:
        return lifecycle.abort_all(self)
​
    def abort(self, handle: "ScriptedReqHandle") -> None:
        return lifecycle.abort(self, rid=handle.rid)
​
    def flush_cache(self) -> None:
        return lifecycle.flush_cache(self)
​
    # ... 其他方法(evict_radix, get_all_node_hit_counts, 属性等)

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

scheduler.py 中新增的条件钩子初始化路径可能干扰原有调度流程,但钩子仅在测试环境通过 SGLANG_TEST_SCRIPTED_RUNTIME_IPC_ADDR 环境变量激活;新增的 IPC 通信依赖 ZMQ,若端口冲突或连接超时可能导致测试孤悬。

影响范围限于测试框架使用者:提供了一套完整的脚本化测试 API 和 HTTP 服务器包装。对生产调度器无影响(条件初始化)。团队可以获得更可靠的自动化回归测试能力。

新增调度器条件钩子 依赖 ZMQ IPC 通信

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论