Prhub

#26816 Add the KV-canary perturb framework for fault-injection self-tests

原始 PR 作者 fzyzcjy 合并时间 2026-05-31 09:58 文件变更 9 提交数 1 评论 1 代码增减 +331 / -2

执行摘要

引入 KV-canary 扰动框架用于故障注入自测试

通过注入可控的 KV 损坏,验证 KV-canary 的校验机制确实能够检测到异常,提升系统的可测试性和可靠性。

该 PR 是 KV-canary 测试框架的重要基础,可精读以理解设计意图。重点关注 WarmupGate 和配置解析模式,未来扩展其他扰动类型时可以参考。

讨论亮点

本 PR 无 review 评论,由作者自行合并。

实现拆解

  1. 配置层 (perturb/config.py):定义 TargetGroupKind 枚举(FULL/SWA)和 PerturbConfig 冻结数据类,通过 from_env 类方法从环境变量 SGLANG_KV_CANARY_PERTURB_TARGET_GROUPSGLANG_KV_CANARY_PERTURB_WARMUP_STEPS 解析。解析函数进行大小写不敏感检查并严格拒绝非法值。
  2. 工具模块 (perturb/utils.py):实现 WarmupGate 类,根据全局步数判断是否处于预热窗口(前 N 步不扰动),并在首次进入/退出时记录日志。提供 should_run_perturbation 函数检查概率和前置条件,以及 pick_target_group 函数根据目标类型从 buffer groups 中随机选取一个目标。
  3. 管理器 (perturb/manager.py):PerturbManager 类聚合配置、buffer groups、预热门控,并预留 perturb 方法(当前为空实现),后续具体扰动子类将重写。
  4. 集成:在 canary_manager.py__init__ 中创建 PerturbManager 实例,在 _pre_ops_outside_graph 中调用 perturb,在 attach_radix_cache 中传递 radix cache;在 api.pyinstall_canary 入口创建 PerturbConfig 并传递给 CanaryManager;在 environ.py 注册两个新的环境变量。
  5. 测试配套test_self_unit_perturb.py 覆盖配置解析的各种边界情况(有效值、非法值、缺失环境变量);runner_test_base.py 添加 make_perturb_config 工厂方法;violation_assert_mixin.py 扩展断言辅助。
文件 模块 状态 重要度
python/sglang/srt/kv_canary/perturb/utils.py 扰动框架 added 8.87
python/sglang/srt/kv_canary/perturb/config.py 配置 added 8.65
python/sglang/srt/kv_canary/perturb/manager.py 管理器 added 8.45
test/registered/kv_canary/test_self_unit_perturb.py 单元测试 added 7.45
python/sglang/srt/kv_canary/runner/canary_manager.py 管理器集成 modified 6.03
python/sglang/srt/kv_canary/api.py 入口 modified 5.94
python/sglang/srt/environ.py 环境配置 modified 4.58
python/sglang/test/kv_canary/violation_assert_mixin.py 测试辅助 modified 5.54
python/sglang/test/kv_canary/runner_test_base.py 测试基类 modified 5.29

关键符号

WarmupGate should_run_perturbation pick_target_group PerturbConfig.from_env _parse_target_group_kind PerturbManager.__init__ PerturbManager.perturb CanaryManager.__init__ install_canary

关键源码片段

python/sglang/srt/kv_canary/perturb/utils.py dependency-wiring

实现了 WarmupGate 类、should_run_perturbation 和 pick_target_group 工具函数,是扰动框架的核心工具模块。

# WarmupGate 控制扰动启用的预热窗口
class WarmupGate:
    def __init__(self, *, config: PerturbConfig, outer_step_counter_getter: Callable[[], int]) -> None:
        self._config = config
        self._outer_step_counter_getter = outer_step_counter_getter
        self._warmup_disable_logged = False
        self._warmup_enable_logged = False
​
    def is_in_warmup(self) -> bool:
        step = self._outer_step_counter_getter()
        if step < self._config.warmup_steps:
            if not self._warmup_disable_logged:
                logger.info("kv_canary perturb: disabled during warmup window (first %d forward steps)", self._config.warmup_steps)
                self._warmup_disable_logged = True
            return True
        if not self._warmup_enable_logged:
            logger.info("kv_canary perturb: enabled after warmup window at step=%d", step)
            self._warmup_enable_logged = True
        return False# 根据目标类型从 buffer groups 中随机选择一个组
def pick_target_group(*, buffer_groups: tuple[CanaryBufferGroup, ...], target_kind: TargetGroupKind) -> Optional[CanaryBufferGroup]:
    if target_kind == TargetGroupKind.FULL:
        want = PoolKind.FULL
    elif target_kind == TargetGroupKind.SWA:
        want = PoolKind.SWA
    else:
        raise ValueError(f"Unsupported target_group_kind: {target_kind!r}")
    filtered = [g for g in buffer_groups if g.kind == want]
    if not filtered:
        return None
    return random.choice(filtered)
python/sglang/srt/kv_canary/perturb/manager.py core-logic

PerturbManager 类,协调扰动执行,但目前 perturb 方法是空实现。

# PerturbManager 类
class PerturbManager:
    def __init__(self, *, config: PerturbConfig, buffer_groups: tuple[CanaryBufferGroup, ...],
                 outer_step_counter_getter: Callable[[], int], swa_window_size: int = 0, sweep_interval: int = 0) -> None:
        self._config = config
        self._buffer_groups = buffer_groups
        self._outer_step_counter_getter = outer_step_counter_getter
        self._swa_window_size = swa_window_size
        self._sweep_interval = sweep_interval
        self._radix_cache: Optional["BasePrefixCache"] = None
        self._warmup_gate = WarmupGate(config=config, outer_step_counter_getter=outer_step_counter_getter)
​
    def attach_radix_cache(self, radix_cache: "BasePrefixCache") -> None:
        self._radix_cache = radix_cache
​
    def perturb(self, *, maybe_inaccurate_forward_batch: Optional["ForwardBatch"]) -> None:
        pass # 当前为空实现,后续子类将重写

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

当前 perturb 方法为空实现,未来实现可能引入性能开销,需关注 forward 路径的调用频率。环境变量未设置时不会启用扰动,对正常用户无影响。测试配置解析较为完善,降低了配置错误风险。整体影响面有限,主要在于可测试性的提升。

对用户:无直接影响,扰动仅在环境变量明确设置时启用。对系统:增加了少量启动开销。对团队:为测试提供了基础设施,未来实现具体扰动类型后能更全面验证 canary。

核心路径新增空实现可能被忽略 环境变量配置可能导致意外启用

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论