Prhub

#27205 Fix customized_info incremental streaming

原始 PR 作者 aurickq 合并时间 2026-06-05 21:55 文件变更 2 提交数 3 评论 8 代码增减 +207 / -9

执行摘要

修复 incremental streaming 下 customized_info 切分与累积逻辑

Custom sampler backends can attach per-token metadata through customized_info. With incremental streaming enabled, that metadata should follow the same chunking semantics as output_ids: streaming chunks should contain only newly emitted values, while non-streaming responses should contain the accumulated final sequence.

值得精读。该 PR 展示了如何将动态键纳入现有流式框架,以及通过继承 Engine 进行集成测试的模式。对于需要自定义采样器并依赖流式正确性的开发者有直接参考价值。

讨论亮点

Reviewer ispobock 建议将测试文件移动到 test/registered/unit/managers 目录,作者随后执行了移动(commit f692b4c5b3a5)。此外无其他有意义的讨论。

实现拆解

实现分为四步:

  1. python/sglang/srt/managers/tokenizer_manager.pyStreamingOutput 数据类中添加 customized_info_accumulated 字段,用于累积每个请求的定制信息。
  2. 修改 _slice_streaming_output_meta_info 函数,增加 customized_info_keys 参数,将定制信息键纳入切片集合,确保流式块中只包含新产生的数据。
  3. 修改 _coalesce_streaming_chunks 函数,接受 customized_info_keys 参数,在合并批量通知时同时合并定制信息键的值。
  4. _wait_one_response 中传入 customized_info_accumulated.keys() 给合并函数,在 _handle_batch_output 中将调度器返回的定制信息累积到状态中,并在 meta_info 中设置当前累积值。
  5. 新增测试文件 test/registered/unit/managers/test_customized_info_streaming.py,包含自定义采样器、引擎子类和多个测试场景。
文件 模块 状态 重要度
test/registered/unit/managers/test_customized_info_streaming.py 流式测试 added 7.35
python/sglang/srt/managers/tokenizer_manager.py 流式输出 modified 6.86

关键符号

_slice_streaming_output_meta_info _coalesce_streaming_chunks CustomizedInfoSampler.forward install_customized_info_sampler run_scheduler_process_with_customized_info_sampler

关键源码片段

test/registered/unit/managers/test_customized_info_streaming.py test-coverage

新增集成测试,验证 customized_info 在流式、非流式和 stream_interval 场景下与 output_ids 的行为一致性

# test/registered/unit/managers/test_customized_info_streaming.py
# 自定义采样器:将每个 step 的 batch_next_token_ids 镜像到 customized_info
class CustomizedInfoSampler(Sampler):
    def forward(
        self,
        logits_output: "LogitsProcessorOutput",
        sampling_info: "SamplingBatchInfo",
        return_logprob: bool,
        top_logprobs_nums: List[int],
        token_ids_logprobs: List[List[int]],
        positions: torch.Tensor,
    ) -> torch.Tensor:
        # 先通过父类获取采样 token id
        batch_next_token_ids = super().forward(
            logits_output,
            sampling_info,
            return_logprob,
            top_logprobs_nums,
            token_ids_logprobs,
            positions,
        )
        # 初始化 customized_info dict
        if logits_output.customized_info is None:
            logits_output.customized_info = {}
        # 将 token id list 存入自定义字段
        logits_output.customized_info[CUSTOMIZED_INFO_FIELD] = (
            batch_next_token_ids.detach().cpu().tolist()
        )
        return batch_next_token_ids
​
​
def install_customized_info_sampler() -> None:
    """在 sampler 构造前注册自定义后端。"""
    register_sampler_backend(CUSTOMIZED_INFO_SAMPLER_BACKEND, CustomizedInfoSampler)
python/sglang/srt/managers/tokenizer_manager.py core-logic

核心修复文件,修改流式输出切片与合并逻辑,添加累积字段支持动态 customized_info 键

# python/sglang/srt/managers/tokenizer_manager.py
# 修改 _slice_streaming_output_meta_info 支持动态定制键def _slice_streaming_output_meta_info(
    meta_info: Dict[Any, Any],
    last_output_offset: int,
    customized_info_keys: Optional[Iterable[str]] = None,
) -> None:
    """按 last_output_offset 对元数据进行切片。"""
    # 合并内置键与定制键
    streaming_meta_info_keys = set(_INCREMENTAL_STREAMING_META_INFO_KEYS)
    if customized_info_keys is not None:
        streaming_meta_info_keys.update(customized_info_keys)
    # 切片操作
    for key in meta_info.keys() & streaming_meta_info_keys:
        meta_info[key] = meta_info[key][last_output_offset:]
​
​
# 修改 _coalesce_streaming_chunks 合并定制信息def _coalesce_streaming_chunks(
    self,
    out_list: list,
    rid: str,
    customized_info_keys: Optional[Iterable[str]] = None,
) -> dict:
    """合并多条流式块为一条,并正确拼接定制信息列表。"""
    if "meta_info" in out:
        meta_info_list = [chunk["meta_info"] for chunk in out_list]
        meta_info = dict(meta_info_list[-1])
        incremental_streaming_keys = set(_INCREMENTAL_STREAMING_META_INFO_KEYS)
        if customized_info_keys is not None:
            incremental_streaming_keys.update(customized_info_keys)
        for key in incremental_streaming_keys:
            if any(key in m for m in meta_info_list):
                meta_info[key] = [
                    item
                    for m in meta_info_list
                    for item in m.get(key, [])
                ]
        out["meta_info"] = meta_info
    return out

评论区精华

测试文件位置 question

Reviewer ispobock 建议将测试文件移动到 test/registered/unit/managers 目录下

结论:作者执行移动并提交 · 已解决

风险与影响

核心风险在于 tokenizer_manager.py 的流式输出切分与合并逻辑,如果 customized_info_keys 处理不当,可能导致流式块中数据错乱或非流式响应中丢失累积数据。但已有集成测试覆盖流式、非流式和 stream_interval 场景,风险可控。

影响范围:使用自定义采样器后端并通过 customized_info 传递每 token 元数据的用户。影响程度:修复了 incremental streaming 下的正确性问题,使 customized_info 行为与 output_ids 一致,属于重要 bugfix,但 API 无变化,兼容性良好。

增量流式关键路径 动态键偶发性问题

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论