Prhub

#25964 [EPD] Cross-request batching for image/audio encoder

原始 PR 作者 Abatom 合并时间 2026-05-26 15:39 文件变更 3 提交数 15 评论 21 代码增减 +505 / -66

执行摘要

EPD 编码器跨请求批处理图像 / 音频

在 EPD 部署中,编码器服务器以前独立处理每个 /encode HTTP 请求:每个请求一次 HF processor 调用 + 一次 encoder forward + 一次 ZMQ broadcast。在并发流量下,对于每个请求较小的 mm_items,编码器吞吐受启动开销主导,GPU 利用率低。本 PR 在编码器服务器内引入跨请求批处理。

建议精读 encode_server.py 中的 EncoderScheduler 设计,特别是积累窗口机制和预验证逻辑。值得关注的决策包括 tile 扩展适配、音频采样率修复、环境变量统一声明。如果需要扩展批处理到视频或其他模态,可参考此设计模式。

讨论亮点
  • ZhengWG 询问批处理积累窗口是否有效,Abatom 解释由于前一批下载时事件循环交出控制权,新请求能够积累,批处理大小通常超过 1。
  • ZhengWG 指出 Kimi-VL/K25 的 tile 扩展会破坏批处理切片,Abatom 添加了 _grid_count_per_leaf 在 grid 空间切片。
  • ShangmingCai 建议广播前验证批处理是否可处理,Abatom 添加了 _validate_request_shape。
  • gemini-code-assist[bot] 指出 aux_data 可能带整个批次的数据给每个请求,Abatom 移除了 batch_encode 中的 aux_data 构建(因为 batch_encode 只处理 IMAGE/AUDIO,不涉及 video-meta)。
  • ZhengWG 建议将环境变量集中到 environ.py,完成。

实现拆解

  1. 新增 PendingRequest 类和 EncoderScheduler 类。FastAPI lifespan 启动后台批处理 worker,从 asyncio.Queue 拉取待处理的 /encode 请求,按模态分组,分发到 batch_encode。
  2. 实现 MMEncoder.batch_encode 方法,将同模态请求的 mm_items 拼接为单一 processor 输入,执行一次 encoder forward,再按 items_per_req 将结果切片回各请求。
  3. 添加预广播验证 _validate_request_shape,在 rank 0 广播 batch_encode 任务前检查请求字段合法性,避免因单条无效请求导致 TP 死锁。
  4. 修复 Kimi-VL/K25 的 tile 扩展对齐问题:新增 _grid_count_per_leaf 计算每个 leaf 的 grid 数,batch slicing 在 grid 空间进行而非 leaf 空间。
  5. 将编码器相关环境变量(SGLANG_ENCODER_IMAGE_PROCESSOR_USE_GPU / MAX_BATCH_SIZE / REQ_TIMEOUT)从分散的 os.getenv 迁移到 environ.py 的 EnvBool/EnvInt/EnvFloat 声明。
  6. 修正 MiMo-V2 音频采样率推理:添加 _resolve_audio_sr 从模型配置中读取预期采样率,传递给 load_audio,避免采样率不匹配导致波形扭曲。同时修正音频 hash 计算和 encode-missing 路径,区分音频与图像/视频的布局。
文件 模块 状态 重要度
python/sglang/srt/disaggregation/encode_server.py 编码器 modified 8.84
python/sglang/srt/multimodal/processors/mimo_v2.py 处理器 modified 5.3
python/sglang/srt/environ.py 环境配置 modified 4.99

关键符号

PendingRequest EncoderScheduler batch_encode _validate_request_shape _grid_count_per_leaf _resolve_audio_sr

关键源码片段

python/sglang/srt/disaggregation/encode_server.py core-logic

核心实现文件,引入跨请求批处理调度器 EncoderScheduler、batch_encode 方法、预验证、tile 扩展修复、音频采样率推理等所有核心逻辑。

# encode_server.py — 跨请求批处理核心类import asyncio
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutorfrom sglang.srt.environ import envs
from sglang.srt.managers.schedule_batch import Modality# 从 typed envs 读取编码器批处理配置
ENCODER_MAX_BATCH_SIZE = envs.SGLANG_ENCODER_MAX_BATCH_SIZE.get()
ENCODER_REQ_TIMEOUT = envs.SGLANG_ENCODER_REQ_TIMEOUT.get()
​
​
class PendingRequest:
    """包装待处理编码请求及其 asyncio.Future"""
    __slots__ = ("request", "future", "submit_time")
​
    def __init__(self, request: dict, loop: asyncio.AbstractEventLoop):
        self.request = request
        self.future: asyncio.Future = loop.create_future()
        self.submit_time = time.time()
​
​
# 可批处理的模态(图像 / 音频),视频因预处理参数多变被排除
_BATCHABLE_MODALITIES = {Modality.IMAGE, Modality.AUDIO}
​
​
class EncoderScheduler:
    """跨请求编码批处理调度器"""
​
    def __init__(self, pending_queue: asyncio.Queue, mm_encoder: "MMEncoder"):
        self.pending_queue = pending_queue
        self.mm_encoder = mm_encoder
        self.io_executor = ThreadPoolExecutor(
            max_workers=envs.SGLANG_ENCODER_MM_LOAD_WORKERS.get()
        )
​
    async def _collect_batch(self) -> list:
        """收集最多 ENCODER_MAX_BATCH_SIZE 个请求"""
        batch = [await self.pending_queue.get()] # 第一个会等待
        while len(batch) < ENCODER_MAX_BATCH_SIZE:
            try:
                batch.append(self.pending_queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        return batch
​
    async def run_batch_worker(self):
        """后台主循环:收集批次、按模态分组、分发"""
        while True:
            batch = await self._collect_batch()
            if not batch:
                continue
            groups = defaultdict(list)
            for pr in batch:
                mod = Modality.from_str(pr.request.get("modality", ""))
                if mod in _BATCHABLE_MODALITIES:
                    groups[mod].append(pr)
                else:
                    # 视频等不可批处理请求回退到单请求路径
                    asyncio.ensure_future(self._process_single(pr))
            for mod, group in groups.items():
                asyncio.ensure_future(self._dispatch_group(group, mod))

评论区精华

批处理积累窗口仅取 1 个请求 性能

ZhengWG 质疑第一个请求后 get_nowait 导致批次几乎始终大小为 1。

结论:Abatom 解释前一批下载时事件循环释放,新请求可积累,批次大小通常大于 1。 · 已解决

Kimi-VL tile 扩展破坏切片 正确性

ZhengWG 指出假设 1 leaf → 1 grid entry 对 Kimi-VL/K25 不成立。

结论:Abatom 添加 _grid_count_per_leaf 返回每个 leaf 的 grid 计数,切片在 grid 空间进行。 · 已解决

广播前验证批次有效性 正确性

ShangmingCai 建议只确保批次可处理才广播。

结论:Abatom 添加 _validate_request_shape 进行预验证,拒绝无效请求避免 TP 死锁。 · 已解决

批次级 aux_data 透传给各请求 正确性

gemini-code-assist[bot] 指出 batch_encode 中 aux_data 整个批次授予各请求。

结论:Abatom 移除了 batch_encode 中的 aux_data 构建(因为 IMAGE/AUDIO 不涉及其内容)。 · 已解决

环境变量集中到 environ.py refactor

ZhengWG 建议将编码器环境变量声明到 environ.py。

结论:完成迁移。 · 已解决

风险与影响

  • 超时机制(SGLANG_ENCODER_REQ_TIMEOUT)若设置过短,可能在高负载下返回 504;若 NCCL 真正 hang,超时前仍会阻塞。
  • 批次内任何一个请求在 processor-time 出错会导致整个批次失败(目前有预验证隔离结构性问题,但 processor 内错误仍会广播失败)。
  • 移除了 mm_global_cache 路径,依赖该缓存的用户会退化为无缓存路径。
  • 音频采样率回退到 16000 Hz 若不符合模型预期,将导致波形失真。
  • 用户:批处理带来吞吐提升,但个别请求可能因积累延迟增加;超时返回 504,需客户端容错。
  • 系统:减少编码器启动开销,提高 GPU 利用率;新增三个环境变量默认值安全,可配置。
  • 团队:核心代码集中在 encode_server.py,易于扩展其他模态批处理;需关注批次失败处理和超时配置。
批次失败影响所有请求 音频采样率回退可能失真 全局缓存路径被移除 超时可能掩盖 NCCL 死锁

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论