Prhub

#27413 Add scripted-runtime unit, core integration, and chunked-prefill tests

原始 PR 作者 fzyzcjy 合并时间 2026-06-06 09:08 文件变更 10 提交数 1 评论 3 代码增减 +1912 / -0

执行摘要

为 scripted-runtime 添加单元 / 集成和 chunked-prefill 测试

从PR标题和内容看,作者(同时也是合并者)旨在为scripted-runtime填补测试空白。SCripted-runtime是sglang测试基础设施的一部分,此前缺乏系统的测试覆盖,导致调度器核心逻辑(如请求生命周期、chunked prefill、暂停恢复等)的稳定性依赖手动验证。本PR通过脚本驱动的方式,将调度器关键路径的验证自动化,防止后续重构引入回归。

本PR值得精读,特别是对sglang测试基础设施感兴趣的团队成员。test_scripted_runtime_core.py展示了如何通过生成器脚本驱动调度器步进测试,这种模式可复用于其他模块的集成测试。test_scripted_core_1gpu.py中的生命周期暂停测试设计精巧,覆盖了pause_generation(mode='retract')后的waiting_queue行为和输出冻结验证。建议所有scripted-runtime的相关修改都运行这些测试以确保不破坏语义。

讨论亮点

无review评论。PR为作者自行合并,未经过公开讨论。

实现拆解

  1. 核心API集成测试test_scripted_runtime_core.py):基于ScriptedTestCase编写9个测试,覆盖自动/显式rid生成、find_req_by_rid命中与缺失、is_finished语义、ReqHandle.req/finished属性、is_chunking状态检测、abort_all终止、finish状态传递等场景,每个测试通过生成器脚本与调度器交互。
  2. Chunked-prefill集成测试test_scripted_core_1gpu.pytest_scripted_core_4gpu.py):验证多chunk预填充的冒烟、不同边界偏移、生命周期各阶段的暂停/retract后恢复、abort_all以及流水线并行下chunk大小组合扫描。其中暂停测试严格检查调度器waiting_queue状态和output_ids冻结。
  3. 辅助组件单元测试test_background_http_poster.py(线程/协程生命周期、异常处理)、test_tokenizer_recv_proxy.py(消息队列/等待/过期处理)、test_http_server.py(执行脚本/回复匹配/超时)、test_scheduler_hook.py(生成器推进结果检测)、test_scripted_runtime_utils.py(函数路径解析)。均使用fake/Mock隔离外部依赖。
  4. 通用测试帮助函数scripted_runtime_chunked_helpers.py):提供run_until_finishedadvance_to_nth_chunkexhaust_row_pool等可复用生成器,封装调度推进逻辑。
  5. CI注册:通过register_cuda_ci/register_cpu_ci将每个测试分配给特定stage和GPU配置,确保并行测试效率。
文件 模块 状态 重要度
test/registered/scripted_runtime/test_scripted_runtime_core.py 运行核心测试 added 7.97
test/registered/chunked_prefill/test_scripted_core_1gpu.py 分块预填充测试 added 7.91
python/sglang/test/scripted_runtime_chunked_helpers.py 测试辅助库 added 7.28
test/registered/unit/scripted_runtime/test_background_http_poster.py HTTP 后台测试 added 7.08
test/registered/unit/scripted_runtime/test_tokenizer_recv_proxy.py 代理接收测试 added 7.02
test/registered/unit/scripted_runtime/test_http_server.py HTTP 服务测试 added 6.94
test/registered/unit/scripted_runtime/test_scheduler_hook.py 调度钩子测试 added 6.56
test/registered/unit/scripted_runtime/test_scripted_runtime_utils.py 工具函数测试 added 6.46
test/registered/chunked_prefill/test_scripted_core_4gpu.py 分块预填充测试 added 6.6
test/registered/chunked_prefill/test_scripted_swa_1gpu.py SWA 分块测试 added 6.59

关键符号

_script_noop start_req find_req_by_rid run_until_finished advance_to_nth_chunk advance_to_lifecycle_stage pause_generation continue_generation abort_all execute_script recv_pyobj wait_until_arrived submit_coro close resolve_fn

关键源码片段

test/registered/scripted_runtime/test_scripted_runtime_core.py test-coverage

核心集成测试,覆盖 scripted-runtime 主要 API 功能。

# test/registered/scripted_runtime/test_scripted_runtime_core.py
# 展示两个典型测试:自动 rid 与显式 ridclass TestScriptedRuntimeCore(ScriptedTestCase):
    ENGINE_KWARGS = _ENGINE_KWARGS
​
    def test_start_req_auto_rid_and_finishes(self):
        # 执行脚本,验证自动生成的 rid 以 "scripted-" 开头且请求最终完成
        self.server.execute_script(self._script_start_req_auto_rid_and_finishes)
​
    @staticmethod
    def _script_start_req_auto_rid_and_finishes(t: ScriptedContext):
        r = t.start_req(prompt_len=_SHORT_PROMPT_LEN, max_new_tokens=4)
        # 验证自动 rid 格式
        assert r.rid.startswith("scripted-"), f"unexpected auto rid {r.rid!r}"
        # 步进直到请求完成
        yield from run_until_finished(r)
        assert r.finished, "auto-rid req did not finish"
​
    def test_start_req_explicit_rid(self):
        self.server.execute_script(self._script_start_req_explicit_rid)
​
    @staticmethod
    def _script_start_req_explicit_rid(t: ScriptedContext):
        # 使用显式 rid 启动请求
        r = t.start_req(
            prompt_len=_SHORT_PROMPT_LEN, max_new_tokens=2, rid="explicit-rid-test"
        )
        # 验证 rid 被正确设置
        assert r.rid == "explicit-rid-test", f"explicit rid not honored: {r.rid!r}"
        yield
        # 一步之后,调度器应能看到该请求
        assert (
            t.find_req_by_rid("explicit-rid-test") is not None
        ), "explicit rid not visible to the scheduler after one step"
test/registered/chunked_prefill/test_scripted_core_1gpu.py test-coverage

单卡 chunked prefill 集成测试,包含边界偏移和生命周期暂停验证。

# test/registered/chunked_prefill/test_scripted_core_1gpu.py
# 展示暂停 / 恢复测试:在生命周期每个阶段执行 pause(retract),验证调度器状态冻结class TestScriptedCore(ScriptedTestCase):
    ENGINE_KWARGS = base_engine_kwargs(chunked_prefill_size=_CHUNK_SIZE)
​
    def test_pause_retract_at_lifecycle_points_then_resume(self):
        # 遍历定义的 5 个生命周期阶段:first_chunk, last_chunk, first_decode, mid_decode, last_decode
        for stage in LIFECYCLE_STAGES:
            with self.subTest(stage=stage):
                self.server.execute_script(
                    self._script_pause_retract_at_stage,
                    args=(stage,),
                )
​
    @staticmethod
    def _script_pause_retract_at_stage(t: ScriptedContext, stage: str):
        r = t.start_req(
            prompt_len=_PROMPT_LEN, max_new_tokens=_LIFECYCLE_MAX_NEW_TOKENS
        )
        # 步进到指定生命周期阶段
        yield from _advance_to_stage(r, stage)
        assert r.req is not None, f"stage={stage}: req vanished before pause"
​
        # 执行 pause(retract) —— 请求应回退到 waiting_queue
        t.pause_generation(mode="retract")
        yield
​
        # 如果请求尚未结束(最后解码阶段可能立即完成),验证调度器状态
        if not r.finished:
            req = r.req
            # 必须回到 waiting_queue
            assert req is not None and req in t.scheduler.waiting_queue, (
                f"stage={stage}: pause(retract) should park the req back in waiting_queue"
            )
            output_tokens_after_pause = len(req.output_ids)
            # 再推进 3 步,输出 tokens 不得增加
            for _ in range(3):
                yield
                req = r.req
                assert (
                    req is not None and len(req.output_ids) == output_tokens_after_pause
                ), (
                    f"stage={stage}: paused engine advanced the req "
                    f"({len(req.output_ids) if req is not None else None} output tokens, expected {output_tokens_after_pause})"
                )
​
        # 恢复生成,最终完成
        t.continue_generation()
        yield from run_until_finished(r)
        assert r.finished, f"stage={stage}: req did not finish after pause/continue"
python/sglang/test/scripted_runtime_chunked_helpers.py test-coverage

通用测试辅助函数,被所有集成测试使用。

# python/sglang/test/scripted_runtime_chunked_helpers.py
# 核心辅助函数:run_until 和 run_until_finished 驱动调度器步进直到条件满足from __future__ import annotations
from typing import Any, Dict, ListDEFAULT_CHUNK_SIZE: int = 256
DEFAULT_MAX_STEPS: int = 400
SMALL_MODEL: str = "Qwen/Qwen3-0.6B"def base_engine_kwargs(
    *,
    model_path: str = SMALL_MODEL,
    chunked_prefill_size: int = DEFAULT_CHUNK_SIZE,
    **overrides: Any,
) -> Dict[str, Any]:
    """生成引擎参数字典,默认使用小模型和指定chunk大小。"""
    kwargs: Dict[str, Any] = dict(
        model_path=model_path,
        chunked_prefill_size=chunked_prefill_size,
    )
    kwargs.update(overrides)
    return kwargsdef run_until(handle, predicate, *, max_steps: int = DEFAULT_MAX_STEPS):
    """生成器:每次yield后检查predicate(handle),满足则返回,否则继续直到max_steps后断言失败。"""
    for _ in range(max_steps):
        if predicate(handle):
            return
        yield
    raise AssertionError(
        f"run_until: predicate never satisfied after {max_steps} steps "
        f"(handle rid={handle.rid!r}, finished={handle.finished})"
    )def run_until_finished(handle, *, max_steps: int = DEFAULT_MAX_STEPS):
    """等待单个请求完成。"""
    yield from run_until(handle, lambda h: h.finished, max_steps=max_steps)def run_until_all_finished(handles: List[Any], *, max_steps: int = DEFAULT_MAX_STEPS):
    """等待多个请求全部完成。"""
    for _ in range(max_steps):
        if all(h.finished for h in handles):
            return
        yield
    raise AssertionError(
        f"run_until_all_finished: not all reqs finished after {max_steps} steps "
        f"(finished={[h.finished for h in handles]})"
    )
# ... 其他函数如 warmup_radix, exhaust_row_pool, advance_to_nth_chunk 等

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

低风险。所有变更均为测试代码,未修改生产源码。潜在风险包括:(1)测试可能因环境依赖(如GPU型号、CUDA版本)产生不稳定假阴性,尤其是依赖特定显存大小的exhaust_row_pooltest_scripted_core_4gpu.py;(2)新增CI测试约1550秒的时间开销,可能延长CI pipeline;(3)部分测试(如test_scripted_runtime_core.py中FINISH_ABORT的引入)需要确保与最新调度器行为一致,若调度器内部逻辑变化需同步更新测试。

  • 用户:无直接影响,用户不运行这些测试。
  • 系统:CI流水线新增约10个测试用例,总执行时间约26分钟(CUDA + CPU),但分布在多个stage和runner上,不会阻塞关键路径。
  • 团队:开发者修改scripted-runtime或调度器相关代码时,可通过这些测试快速验证正确性,降低手动测试成本。测试本身也是文档,展示了API的预期用法和边界条件。
新增测试可能影响 CI 稳定性 部分测试依赖具体 GPU 显存容量 测试与调度器行为耦合,需同步更新

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论