Prhub

#24513 Add e2e test with log snapshot in dumper grafter

原始 PR 作者 fzyzcjy 合并时间 2026-05-06 17:00 文件变更 2 提交数 10 评论 2 代码增减 +898 / -432

执行摘要

为 Grafter 添加端到端测试与日志快照

Grafter 功能允许跨系统张量移植,但缺乏端到端测试和日志快照,导致调试困难。本 PR 通过添加 E2E 测试和标准化日志,填补了这一空白,为后续功能迭代提供坚实基础。

建议团队关注 review 中的版本兼容性建议并在后续 PR 中修复;_Grafter 的设计模式(GraftTransformInput 数据类、_default_transform 可插拔回调)值得在类似跨进程通信场景中复用。

讨论亮点

Review 评论中,gemini-code-assist[bot] 指出 dumper.py 中检测 PyTorch 版本的字符串分割方式脆弱,建议改用 hasattr 判断 backend_options 属性是否存在,以提高跨版本兼容性。该建议未被作者合并前解决(评论时 PR 已合并)。

实现拆解

  1. 源码重构(dumper.py):在 _Grafter 类中添加 enabled 属性,使 maybe_intercept 入口处可提前短路;统一将 print 调用替换为 _log 前缀函数,便于日志过滤和快照。
  2. 功能增强:优化参数类型标注(value: Any),完善 docstring 并引用示例测试类;移除冗余的 Optional[dist.ProcessGroup] 类型声明。
  3. 测试扩展(test_dumper.py):新增 TestGrafterConfig 类,包含 test_from_env_parses_filterstest_from_env_parses_int_fieldstest_from_env_enable_flagtest_disabled_does_not_validate_other_fields 等单元测试,覆盖 DumperConfig.from_env 的字段解析逻辑。
  4. E2E 测试:添加 TestGrafterE2eExample 类的端到端测试,验证基线/目标角色间的张量移植与 _default_transform 回调。
  5. 日志快照:通过 _log 统一日志格式,并新增 TestLogtest_log_format 确保日志输出符合预期格式。
文件 模块 状态 重要度
python/sglang/srt/debug_utils/dumper.py 调试工具 modified 8.56
test/registered/debug_utils/test_dumper.py 调试测试 modified 7.24

关键符号

_Grafter.__init__ _Grafter.enabled _Grafter.maybe_intercept _sender_slice _apply_transform _default_transform _default_transform_error test_from_env_parses_filters test_from_env_parses_int_fields test_from_env_enable_flag test_disabled_does_not_validate_other_fields

关键源码片段

python/sglang/srt/debug_utils/dumper.py core-logic

核心源码,包含 Grafter 类的重构与功能增强,涉及 `enabled` 属性、`_log` 统一、类型标注改进等。

class _Grafter:
    """Cross-system tensor transplant. Triggered silently from dumper.dump.    Both sides set the SAME grafter_b2t_filter and grafter_t2b_filter. The
    only per-side difference is grafter_role ("baseline" | "target"), which
    determines whether a name match means send or recv.    Graft global rank layout: baseline occupies ranks 0..baseline_world-1;
    target occupies ranks baseline_world..baseline_world+target_world-1.
    """
​
    def __init__(self, *, config: DumperConfig):
        self._config = config
        self._pg = None
​
    @property
    def enabled(self) -> bool:
        # 新增属性:允许外部在调用 maybe_intercept 前快速判断是否启用
        return self._config.grafter_enable
​
    def maybe_intercept(
        self, *, value: Any, tags: dict, extras: Optional[dict] = None
    ) -> None:
        """Intercept a dumper.dump call. `extras` is per-call auxiliary data
        (e.g., shard layout, dtype hint) that the sender attaches and the
        recv side's transform receives as `received_extras_list`."""
        cfg = self._config
        if not cfg.grafter_enable:
            return
​
        direction = self._classify_direction(tags)
        if direction is None:
            return
​
        if not isinstance(value, torch.Tensor):
            # 非 tensor 值直接跳过,并记录日志
            _log(
                f"[Grafter] tags={tags} matched grafter_{direction.value}_filter but "
                f"value is not a torch.Tensor (got type={type(value).__name__}); "
                f"skipping graft."
            )
            return
        # ... 后续发送 / 接收逻辑
test/registered/debug_utils/test_dumper.py test-coverage

测试文件,新增 Grafter 配置解析单元测试、E2E 测试和日志格式测试,覆盖关键功能路径。

class TestGrafterConfig:
    def test_from_env_parses_filters(self):
        # 验证环境变量 DUMPER_GRAFTER_B2T_FILTER 和 DUMPER_GRAFTER_T2B_FILTER 被正确解析
        with temp_set_env(
            DUMPER_GRAFTER_B2T_FILTER="name == 'x'",
            DUMPER_GRAFTER_T2B_FILTER="name == 'y'",
        ):
            cfg = DumperConfig.from_env()
            assert cfg.grafter_b2t_filter == "name == 'x'"
            assert cfg.grafter_t2b_filter == "name == 'y'"
​
    def test_from_env_parses_int_fields(self):
        # 验证整数字段(world_size, port, timeout)从环境变量正确解析并保持类型
        with temp_set_env(
            DUMPER_GRAFTER_BASELINE_WORLD_SIZE="8",
            DUMPER_GRAFTER_TARGET_WORLD_SIZE="8",
            DUMPER_GRAFTER_MASTER_PORT="29999",
            DUMPER_GRAFTER_TIMEOUT="120",
        ):
            cfg = DumperConfig.from_env()
            assert cfg.grafter_baseline_world_size == 8
            assert type(cfg.grafter_baseline_world_size) is int
            assert cfg.grafter_timeout == 120
​
    def test_from_env_enable_flag(self):
        # 验证 enable 标志在提供所有必需字段时被正确设置为 True
        with temp_set_env(
            DUMPER_GRAFTER_ENABLE="1",
            DUMPER_GRAFTER_ROLE="baseline",
            DUMPER_GRAFTER_MASTER_ADDRESS="127.0.0.1",
            DUMPER_GRAFTER_MASTER_PORT="29999",
            DUMPER_GRAFTER_BASELINE_WORLD_SIZE="1",
            DUMPER_GRAFTER_TARGET_WORLD_SIZE="1",
            DUMPER_GRAFTER_B2T_FILTER="name == 'x'",
        ):
            assert DumperConfig.from_env().grafter_enable is True
        # 验证 "false" 字符串能正确解析为 False
        with temp_set_env(DUMPER_GRAFTER_ENABLE="false"):
            assert DumperConfig.from_env().grafter_enable is False
​
    def test_disabled_does_not_validate_other_fields(self):
        # 当 grafter_enable 为 False 时,其他 grafter_* 字段即使为不合理默认值也不会触发断言
        cfg = DumperConfig(grafter_enable=False)
        assert cfg.grafter_enable is False

评论区精华

PyTorch 版本兼容性检查 正确性

Review 指出使用 `torch.__version__` 字符串分割判断版本号脆弱,建议改用 `hasattr` 检测 `backend_options` 属性。

结论:作者未在合并前采纳,目前仍使用版本号解析。 · unresolved

风险与影响

  1. 兼容性风险dumper.pytorch.__version__ 解析方式可能在某些 PyTorch 构建版本(如 dev 版)出错,导致 ProcessGroup 初始化失败。建议按 review 意见修复。
  2. 回归风险_Grafterenabled 属性新增可能改变外部调用逻辑,但 maybe_intercept 入口已保留 if not cfg.grafter_enable 检查,风险较低。
  3. 测试覆盖:新增测试主要集中在配置解析,E2E 测试依赖分布式环境,可能因环境问题跳过,存在覆盖盲区。

影响范围限于 dumper 模块,主要面向调试场景,不影响生产推理路径。对用户:更易追踪跨系统张量传递问题;对团队:测试覆盖率提升有助于防止回归。

torch 版本兼容性风险 分布式测试覆盖不足

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论