Prhub

#25588 perf(mimo-v2-epd): enable GPU image preprocess and parallel video decode

原始 PR 作者 Abatom 合并时间 2026-05-19 11:47 文件变更 3 提交数 5 评论 10 代码增减 +75 / -8

执行摘要

MIMO-v2 EPD: GPU 图像预处理与并行视频解码

提升 MIMO-v2 EPD(Encoder Prefill Decoder)编码器的性能,通过 GPU 图像预处理和多线程视频解码显著缩短视频编码时间。同时修复编码器初始化时缺少 hf_config 配置导致的视频音频元数据提取问题。

值得精读,尤其是线程数调优的 benchmark 数据和权衡过程。设计上配置灵活、默认上限保守,对类似优化有参考意义。建议补充分支测试并考虑将 import torch 移到模块顶层。

讨论亮点

review 中主要讨论集中在并行线程数上限:gemini-code-assist 指出 os.cpu_count() 可能过多,ShangmingCai 要求限制上限。作者提供了 benchmark(1小时视频,128线程26s,32线程28s,16线程32s,8线程40s,4线程56s,1线程156s),最终将自动上限从32调整为16。另一个讨论是关于将 import torch 移到模块顶层,作者未采纳。

实现拆解

  1. 多线程并行解码:在 VideoDecoderWrapper 中新增 num_decode_threads 参数,get_frames_as_tensor 方法根据条件启用并行路径,使用 ThreadPoolExecutor 分块解码。自动线程数上限 16。
  2. 配置集成:在 MimoV2Processor 中添加 video_decode_num_threads 参数,从配置或环境变量读取,并传递给 VideoDecoderWrapper
  3. GPU 图像预处理:修改 preprocess_for_encoder,在调用 get_visual_transform 时传递 self.device
  4. 修复元数据提取:在 tokenizer_manager.pyinit_disaggregation 中,创建 mm_receiver 时增加 hf_config 参数。
文件 模块 状态 重要度
python/sglang/srt/utils/video_decoder.py 视频解码 modified 8.09
python/sglang/srt/multimodal/processors/mimo_v2.py MIMO-v2 处理器 modified 6.7
python/sglang/srt/managers/tokenizer_manager.py 令牌化管理器 modified 4.35

关键符号

VideoDecoderWrapper.__init__ VideoDecoderWrapper._parallel_decode VideoDecoderWrapper._decode_chunk VideoDecoderWrapper.get_frames_as_tensor MimoV2Processor.__init__ MimoV2Processor._load_video_for_encoder MimoV2Processor.preprocess_for_encoder TokenizerManager.init_disaggregation

关键源码片段

python/sglang/srt/utils/video_decoder.py core-logic

视频解码器核心,新增多线程并行帧提取逻辑,显著提升解码性能。

# 文件 : python/sglang/srt/utils/video_decoder.pydef get_frames_as_tensor(self, indices: list):
    """Return frames at given indices as a torch tensor (NHWC, uint8, pinned memory)."""
    import torch
​
    # 并行解码条件:仅 torchcodec 后端,且线程数不为 1,且帧数 > 1
    if (
        _BACKEND == "torchcodec"
        and self._num_decode_threads != 1
        and len(indices) > 1
    ):
        num_threads = self._num_decode_threads
        if num_threads <= 0:
            # 自动模式:上限 16 线程
            num_threads = min(os.cpu_count() or 8, 16)
        num_threads = min(num_threads, len(indices))
        if num_threads > 1:
            return self._parallel_decode(indices, num_threads)
​
    # fallback 到单解码器路径
    if _BACKEND == "torchcodec":
        batch = self._decoder.get_frames_at(indices)
        return batch.data.pin_memory()
    else:
        arr = self._decoder.get_batch(indices).asnumpy()
        return torch.from_numpy(arr).pin_memory()
​
​
def _parallel_decode(self, indices, num_threads):
    """使用多个 VideoDecoder 实例并行解码帧。    将帧索引分块,每个块由一个独立解码器在独立线程中处理,
    最后按原始顺序合并返回。
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import torch
    import numpy as np
​
    # 将帧索引池均匀分组
    chunks = [list(c) for c in np.array_split(indices, num_threads) if len(c) > 0]
    source = self._source
    kwargs = self._tc_kwargs
​
    def _decode_chunk(chunk):
        # 每个线程新建一个解码器实例:避免共享状态
        d = VideoDecoder(source, **kwargs)
        return d.get_frames_at(chunk).data
​
    results = [None] * len(chunks)
    with ThreadPoolExecutor(max_workers=len(chunks)) as executor:
        future_to_idx = {
            executor.submit(_decode_chunk, chunk): idx
            for idx, chunk in enumerate(chunks)
        }
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            results[idx] = future.result()
​
    # 按原始顺序拼接并返回 pinned tensor
    return torch.cat(results, dim=0).pin_memory()
python/sglang/srt/multimodal/processors/mimo_v2.py dependency-wiring

MIMO-v2 处理器,整合并行解码参数和 GPU 图像预处理,连接配置系统与解码器。

# 文件 : python/sglang/srt/multimodal/processors/mimo_v2.py# 从配置或环境变量读取视频解码线程数
video_cfg = (mm_config or {}).get("video", {})
if "video_decode_num_threads" in video_cfg:
    kwargs["video_decode_num_threads"] = video_cfg["video_decode_num_threads"]
else:
    from sglang.srt.utils.common import get_int_env_var
    kwargs["video_decode_num_threads"] = get_int_env_var(
        "SGLANG_ENCODER_VIDEO_DECODE_NUM_THREADS", 0
    )

评论区精华

并行解码线程数上限设置 性能

gemini-code-assist 指出 os.cpu_count() 默认值在高核心机器上可能导致线程爆炸和资源耗尽。ShangmingCai 要求使用 min 限制上限,并建议使用更安全的值(16 或 8)。作者提供了不同线程数下的端到端基准测试(1 小时视频:128 线程 26s,32 线程 28s,16 线程 32s,8 线程 40s,4 线程 56s,1 线程 156s),确认 16 线程即可获得接近最优的性能。

结论:最终将自动上限从 os.cpu_count() 修改为 min(num_threads, 16),并保留可配置性。 · 已解决

本地 torch 导入移至模块顶层 style

gemini-code-assist 建议将 get_frames_as_tensor 内部的 import torch 移到文件顶部,以减少运行时开销并遵循 PEP 8。

结论:作者未进行修改(最终代码仍为本地导入),该建议未被采纳。 · unresolved

风险与影响

资源竞争风险:并行解码线程可能与其他 CPU 密集型模块(如 Hicache + PD per rank)竞争 CPU,在高负载下可能导致调度延迟。GPU 预处理依赖 device 配置,配置错误会引发运行时错误。缺少直接测试覆盖新路径。torchcodec CUDA 后端仍为 beta,稳定性不确定。

用户侧:使用 MIMO-v2 EPD 编码器的用户将获得视频编码加速,尤其是长视频(benchmark 显示 16 线程约 32s vs 单线程 156s)。系统侧:增加 CPU 线程使用,管理员需注意资源隔离。团队侧:提供多线程调优的 benchmark 数据作为参考。

多线程资源竞争 GPU 设备依赖 缺少测试覆盖

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论