执行摘要
- 一句话:EPD 编码器跨请求批处理图像/音频
- 推荐动作:建议精读 encode_server.py 中的 EncoderScheduler 设计,特别是积累窗口机制和预验证逻辑。值得关注的决策包括 tile 扩展适配、音频采样率修复、环境变量统一声明。如果需要扩展批处理到视频或其他模态,可参考此设计模式。
功能与动机
在 EPD 部署中,编码器服务器以前独立处理每个 /encode HTTP 请求:每个请求一次 HF processor 调用 + 一次 encoder forward + 一次 ZMQ broadcast。在并发流量下,对于每个请求较小的 mm_items,编码器吞吐受启动开销主导,GPU 利用率低。本 PR 在编码器服务器内引入跨请求批处理。
实现拆解
- 新增 PendingRequest 类和 EncoderScheduler 类。FastAPI lifespan 启动后台批处理 worker,从 asyncio.Queue 拉取待处理的 /encode 请求,按模态分组,分发到 batch_encode。
- 实现 MMEncoder.batch_encode 方法,将同模态请求的 mm_items 拼接为单一 processor 输入,执行一次 encoder forward,再按 items_per_req 将结果切片回各请求。
- 添加预广播验证 _validate_request_shape,在 rank 0 广播 batch_encode 任务前检查请求字段合法性,避免因单条无效请求导致 TP 死锁。
- 修复 Kimi-VL/K25 的 tile 扩展对齐问题:新增 _grid_count_per_leaf 计算每个 leaf 的 grid 数,batch slicing 在 grid 空间进行而非 leaf 空间。
- 将编码器相关环境变量(SGLANG_ENCODER_IMAGE_PROCESSOR_USE_GPU / MAX_BATCH_SIZE / REQ_TIMEOUT)从分散的 os.getenv 迁移到 environ.py 的 EnvBool/EnvInt/EnvFloat 声明。
- 修正 MiMo-V2 音频采样率推理:添加 _resolve_audio_sr 从模型配置中读取预期采样率,传递给 load_audio,避免采样率不匹配导致波形扭曲。同时修正音频 hash 计算和 encode-missing 路径,区分音频与图像/视频的布局。
关键文件:
python/sglang/srt/disaggregation/encode_server.py(模块 编码器;类别 source;类型 core-logic;符号 PendingRequest, EncoderScheduler, batch_encode, encode_request): 核心实现文件,引入跨请求批处理调度器 EncoderScheduler、batch_encode 方法、预验证、tile 扩展修复、音频采样率推理等所有核心逻辑。
python/sglang/srt/multimodal/processors/mimo_v2.py(模块 处理器;类别 source;类型 dependency-wiring;符号 init): 将图像处理器 GPU 标志从 os.getenv 切换为 envs 访问,统一环境变量管理。
python/sglang/srt/environ.py(模块 环境配置;类别 source;类型 configuration): 声明编码器批处理相关的三个环境变量,统一配置声明。
关键符号:PendingRequest, EncoderScheduler, batch_encode, _validate_request_shape, _grid_count_per_leaf, _resolve_audio_sr
关键源码片段
python/sglang/srt/disaggregation/encode_server.py
核心实现文件,引入跨请求批处理调度器 EncoderScheduler、batch_encode 方法、预验证、tile 扩展修复、音频采样率推理等所有核心逻辑。
# encode_server.py — 跨请求批处理核心类
import asyncio
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from sglang.srt.environ import envs
from sglang.srt.managers.schedule_batch import Modality
# 从 typed envs 读取编码器批处理配置
ENCODER_MAX_BATCH_SIZE = envs.SGLANG_ENCODER_MAX_BATCH_SIZE.get()
ENCODER_REQ_TIMEOUT = envs.SGLANG_ENCODER_REQ_TIMEOUT.get()
class PendingRequest:
"""包装待处理编码请求及其 asyncio.Future"""
__slots__ = ("request", "future", "submit_time")
def __init__(self, request: dict, loop: asyncio.AbstractEventLoop):
self.request = request
self.future: asyncio.Future = loop.create_future()
self.submit_time = time.time()
# 可批处理的模态(图像 / 音频),视频因预处理参数多变被排除
_BATCHABLE_MODALITIES = {Modality.IMAGE, Modality.AUDIO}
class EncoderScheduler:
"""跨请求编码批处理调度器"""
def __init__(self, pending_queue: asyncio.Queue, mm_encoder: "MMEncoder"):
self.pending_queue = pending_queue
self.mm_encoder = mm_encoder
self.io_executor = ThreadPoolExecutor(
max_workers=envs.SGLANG_ENCODER_MM_LOAD_WORKERS.get()
)
async def _collect_batch(self) -> list:
"""收集最多 ENCODER_MAX_BATCH_SIZE 个请求"""
batch = [await self.pending_queue.get()] # 第一个会等待
while len(batch) < ENCODER_MAX_BATCH_SIZE:
try:
batch.append(self.pending_queue.get_nowait())
except asyncio.QueueEmpty:
break
return batch
async def run_batch_worker(self):
"""后台主循环:收集批次、按模态分组、分发"""
while True:
batch = await self._collect_batch()
if not batch:
continue
groups = defaultdict(list)
for pr in batch:
mod = Modality.from_str(pr.request.get("modality", ""))
if mod in _BATCHABLE_MODALITIES:
groups[mod].append(pr)
else:
# 视频等不可批处理请求回退到单请求路径
asyncio.ensure_future(self._process_single(pr))
for mod, group in groups.items():
asyncio.ensure_future(self._dispatch_group(group, mod))
评论区精华
风险与影响
- 风险:
- 超时机制(SGLANG_ENCODER_REQ_TIMEOUT)若设置过短,可能在高负载下返回 504;若 NCCL 真正 hang,超时前仍会阻塞。
- 批次内任何一个请求在 processor-time 出错会导致整个批次失败(目前有预验证隔离结构性问题,但 processor 内错误仍会广播失败)。
- 移除了 mm_global_cache 路径,依赖该缓存的用户会退化为无缓存路径。
- 音频采样率回退到 16000 Hz 若不符合模型预期,将导致波形失真。
- 影响:
- 用户:批处理带来吞吐提升,但个别请求可能因积累延迟增加;超时返回 504,需客户端容错。
- 系统:减少编码器启动开销,提高 GPU 利用率;新增三个环境变量默认值安全,可配置。
- 团队:核心代码集中在 encode_server.py,易于扩展其他模态批处理;需关注批次失败处理和超时配置。
- 风险标记:批次失败影响所有请求, 音频采样率回退可能失真, 全局缓存路径被移除, 超时可能掩盖 NCCL 死锁
关联脉络
- PR #26281 [CI] Enable EPD CI for EPD architecture enhancements: 构架 EPD CI 测试,本 PR 实现了编码器批处理,两者均属于 EPD 架构演进。
参与讨论