Prhub

#24851 [Session R3] Add routed_experts_start_len for absolute routing slice control

原始 PR 作者 ByronHsu 合并时间 2026-05-11 01:04 文件变更 16 提交数 5 评论 5 代码增减 +288 / -11

执行摘要

添加 routed_experts_start_len 参数,支持路由数据绝对切片控制

多轮 RL rollout 中,return_routed_experts 返回整个对话长度的路由数据,包括已缓存的前缀部分。随对话增长,host gather + ZMQ 负载 O(seqlen) 导致约 1s 的 ITL 峰值,且因 DP 注意力同步机制,单个 rank 的延迟会阻塞整个 batch。PR body 中明确指出这是生产环境 RL 训练循环中的主要 decode 瓶颈。

值得精读。该 PR 展示了如何通过一个简单的小参数消除多轮推理场景中的线性瓶颈,设计思路清晰:在数据采集点引入切片控制,从源头减少不必要的 gather 和序列化。对于其他类似的数据收集(如 logprobs、hidden states)也可参考此模式。

讨论亮点

Review 过程中,zyzshishui 在 python/sglang/srt/managers/scheduler.py 中建议添加 if recv_req.return_routed_experts and 包裹校验逻辑,确保仅在需要路由专家数据时才进行参数校验。最终 PR 接受了该建议,在提交 cb7575d1639555319f6c5bf3deb770e943ce42d7 中设置了默认值 0 并拒绝负值。另外,Qiaolin-Yu 在最终评论中要求添加文档,PR 在提交 98c7055626d0ff59a9051a8ca734d004c7d049c5 中补充了文档。

实现拆解

  1. 参数定义与传递:在 GenerateReqInputTokenizedGenerateReqInputCompletionRequestChatCompletionRequest 等数据结构中新增 routed_experts_start_len: int = 0 字段,并在 Engine.generateEngine.async_generateTokenizerManagerScheduler.handle_generate_request 等入口函数中透传。

  2. 服务端核心逻辑:在 SchedulerOutputProcessorMixin.maybe_collect_routed_experts 中读取 req.routed_experts_start_len,并将其作为 start_len 参数传递给 BaseTopkCapturer.get_topk()。新增 if not req.return_routed_experts: return 提前退出,避免非可选请求的开销。采集后增加行数校验日志,帮助排查静默错误。

  3. 底层数据切片:在 BaseTopkCapturer.get_topk() 中新增 start_len: int = 0 参数,生成索引时从 start_len 开始切片:req_to_token_pool[req_pool_idx][start_len:seqlen-1],并添加防御性 clamp(min(start_len, seqlen-1))和非负校验。

  4. 参数校验:在 Scheduler.handle_generate_request 中,当 return_routed_experts 为 True 时,校验 routed_experts_start_len 是否在 [0, prompt_tokens] 范围内,越界则 abort 请求并提示。

  5. 测试覆盖:新增 TestRoutedExpertsStartLen 测试类,包含 4 个用例:默认行为(等效于 0)、行数正确性、越界 abort、缓存命中场景。测试使用 Qwen3-30B-A3B-FP8 模型,TP=2。

文件 模块 状态 重要度
python/sglang/srt/managers/scheduler_output_processor_mixin.py 调度器 modified 7.27
python/sglang/srt/state_capturer/base.py 状态捕获 modified 6.43
test/registered/rl/test_return_routed_experts.py 测试 modified 7.15
python/sglang/srt/managers/scheduler.py 调度器 modified 6.67
python/sglang/srt/entrypoints/engine.py 引擎 modified 5.28
python/sglang/srt/managers/io_struct.py 数据结构 modified 5.23
python/sglang/srt/entrypoints/openai/protocol.py API 协议 modified 4.49
python/sglang/srt/managers/schedule_batch.py 调度器 modified 4.49
python/sglang/srt/entrypoints/openai/serving_chat.py API 路由 modified 3.95
python/sglang/srt/entrypoints/openai/serving_completions.py API 路由 modified 3.95
python/sglang/srt/managers/tokenizer_manager.py 预处理 modified 3.95
python/sglang/srt/disaggregation/encode_receiver.py 解聚层 modified 3.95

关键符号

get_topk maybe_collect_routed_experts handle_generate_request

关键源码片段

python/sglang/srt/managers/scheduler_output_processor_mixin.py core-logic

核心逻辑变更:`maybe_collect_routed_experts` 增加切片参数传递、提前退出和行数校验。

def maybe_collect_routed_experts(self: Scheduler, req: Req):
    """为已完成的请求收集路由专家数据。    如果请求未设置 return_routed_experts,则立即返回,
    避免非可选请求产生 host gather 开销。    遵守调用方的绝对起始位置,返回 [start_len, seqlen - 1) 范围的数据。
    默认 start_len 为 0,即返回完整序列。    如果结果张量的行数与期望值不匹配,会记录软警告,
    以便捕获静默回归。
    """
    # 非选定请求直接返回,避免不必要的开销
    if not req.return_routed_experts:
        return
    capturer = get_global_experts_capturer()
    if capturer is None:
        return
    start_len = req.routed_experts_start_len
    req.routed_experts = capturer.get_topk(
        req_pool_idx=req.req_pool_idx,
        seqlen=req.seqlen,
        req_to_token_pool=self.req_to_token_pool,
        start_len=start_len,
    )
​
    # 行数校验:期望行数为 seqlen - 1 - start_len,至少为 0
    expected_rows = max(0, req.seqlen - 1 - start_len)
    if (
        req.routed_experts is not None
        and req.routed_experts.shape[0] != expected_rows
    ):
        logger.warning(
            "req %s 的路由专家行数不匹配: 实际 %d, "
            "期望 %d (seqlen=%d, cached_tokens=%d, start_len=%s). "
            "这可能是一个静默错误。",
            req.rid,
            req.routed_experts.shape[0],
            expected_rows,
            req.seqlen,
            req.cached_tokens,
            req.routed_experts_start_len,
        )
python/sglang/srt/state_capturer/base.py core-logic

底层数据采集核心:`get_topk` 新增 start_len 参数,实现内存索引的切片操作。

def get_topk(
    self,
    req_pool_idx: int,
    seqlen: int,
    req_to_token_pool: ReqToTokenPool,
    start_len: int = 0, # 新增:绝对起始位置,默认 0 表示从头开始
) -> torch.Tensor:
    # 防御性检查:拒绝负值
    if start_len < 0:
        raise ValueError(f"{start_len=} 必须为非负数")
    # 钳制到合法范围 [0, seqlen-1],防止 index out of bounds
    start_len = min(start_len, seqlen - 1)
    # 从 start_len 开始切片到最后一个有效位置(seqlen-1)
    # 注意:req_to_token_pool 存储了每个 token 对应的 cache pool 索引
    cache_pool_idx = (
        req_to_token_pool.req_to_token[req_pool_idx][start_len : seqlen - 1]
        .cpu()
        .clone() # 显式 clone 以避免后续修改影响
    )
    return self.host_cache.buffer[cache_pool_idx]

评论区精华

校验逻辑位置优化 设计

zyzshishui 在 scheduler.py 的代码审查中建议添加 `if recv_req.return_routed_experts and` 包裹校验,避免非路由专家请求的无谓校验。

结论:已采纳,后续提交中添加了外围条件判断。 · 已解决

文档补充要求 documentation

Qiaolin-Yu 在 final review 中要求补充文档,指出 'better to add some doc'。

结论:已通过单独提交补充了文档,PR 被批准并合并。 · 已解决

风险与影响

兼容性风险routed_experts_start_len 默认 0,与之前未使用该参数的行为一致,因此不会破坏现有 API。但需确保旧客户端未设置该字段时,服务端能够正确缺省为 0。

正确性风险:切片索引逻辑中的 start_len 需严格小于 seqlen-1,代码已有 min 防御;但若 start_len 过大导致返回空张量,调用方需要能够正确处理。测试中已覆盖越界场景。

性能风险:无,该变更仅在启用 return_routed_experts 的场景下生效,且默认路径(start_len=0)与之前完全一致。对于使用切片的请求,性能收益显著。

安全风险:无。

对用户的影响:为 RL 训练框架(如使用 Kimi-K2、Qwen3-30B-A3B 的 MoE 模型)提供显著的性能优化,在最差情况下(32k 上下文)将路由数据收集延迟从 121ms 降至 1.9ms。用户需在客户端设置 routed_experts_start_len 为已累积的 prompt 长度,以获得增量路由数据。

对系统的影响:减少不必要的 GPU->CPU 数据搬运和 ZMQ 序列化负载,降低 decode 阶段的尾延迟。由于 DP 注意力机制,所有 DP rank 同步,本优化可整体提升系统吞吐。

对团队的影响:代码改动涉及请求生命周期多个模块,但均为简单的参数透传,理解和维护成本低。测试覆盖完整,降低了回归风险。

向后兼容 参数边界验证 切片行为依赖调用方正确传参

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论