执行摘要
- 一句话:编码器健康检查升级为真实功能验证
- 推荐动作:值得精读,特别是健康检查何时跳过、模态选择策略以及分布式 TP 一致性处理的权衡。适合作为类似 health check 模式(忙时跳过、最小负载)的参考实现。
功能与动机
编码器服务器的 /health 端点之前只检查 encoder 对象是否初始化(encoder is not None),即使真实编码管道故障也返回 200,不适合作为生产环境的 readiness/liveness 探针。PR 旨在与主 http_server.py 模式对齐,执行轻量但真实的前向传递来验证编码器可以实际服务请求。
实现拆解
- 添加常量和导入:在
python/sglang/srt/disaggregation/encode_server.py 中定义 HEALTH_CHECK_TIMEOUT、MINIMUM_PNG_PICTURE_BASE64(32x32 黑色 PNG 的 Base64)、MINIMUM_WAV_SILENCE_BASE64(0.01s 无声 WAV),并从 sglang.srt.constants 导入 HEALTH_CHECK_RID_PREFIX。
- 重写 health_generate 函数:该函数现在:
- 若
encoder 为 None 则返回 503。
- 若
encoder.embedding_to_send 非空(有正在处理的真实请求)则跳过虚拟编码直接返回 200,因为已证明存活。
- 按优先级选择首个可用模态(
image_processor → audio_processor → 无处理器时仅返回 200)。
- 构造包含对应最小 Base64 数据的虚拟请求,设置
req_id 为 HEALTH_CHECK_RID_PREFIX + 时间戳。
- 将请求广播到所有 TP rank 的
send_sockets 以保持分布式状态同步。
- 调用
encoder.encode(dummy_request) 执行实际编码,成功返回 200,异常返回 503。
- 配套调整:无新增测试文件,但 CI 运行了现有的 EPD 分布式测试
test_epd_disaggregation.py。
关键文件:
python/sglang/srt/disaggregation/encode_server.py(模块 编码服务器;类别 source;类型 core-logic;符号 health_generate, HEALTH_CHECK_TIMEOUT, MINIMUM_PNG_PICTURE_BASE64, MINIMUM_WAV_SILENCE_BASE64): 唯一修改文件,新增真实健康检查逻辑,定义最小负载常量和重写 health_generate 端点
关键符号:health_generate
关键源码片段
python/sglang/srt/disaggregation/encode_server.py
唯一修改文件,新增真实健康检查逻辑,定义最小负载常量和重写 health_generate 端点
@app.get("/health")
@app.get("/health_generate")
async def health_generate():
"""
Health check endpoint for the encoder server.
通过执行虚拟编码验证编码器管道是否真正可用,而不是仅检查对象存在。
返回 200 表示健康,503 表示不可用。
"""
if encoder is None:
return Response(status_code=503)
# 当有真实请求在飞行时,跳过虚拟编码——实时流量已经证明存活,
# 这与调度器 `is_fully_idle` 的跳过模式一致。
if encoder.embedding_to_send:
return Response(status_code=200)
# 选择第一个可用的模态用于虚拟编码:优先使用图像处理器,
# 然后是音频处理器,都没有则仅进行存在性检查。
if encoder.image_processor is not None:
mm_items = [f"data:image/png;base64,{MINIMUM_PNG_PICTURE_BASE64}"]
modality = Modality.IMAGE
elif encoder.audio_processor is not None:
mm_items = [f"data:audio/wav;base64,{MINIMUM_WAV_SILENCE_BASE64}"]
modality = Modality.AUDIO
else:
return Response(status_code=200)
try:
req_id = f"{HEALTH_CHECK_RID_PREFIX}_{time.time()}"
dummy_request = {
"mm_items": mm_items,
"modality": modality.name,
"req_id": req_id,
"num_parts": 1,
"part_idx": 0,
}
# 广播到所有 TP rank 以确保分布式操作同步
for socket in send_sockets:
socket.send_pyobj(dummy_request)
await encoder.encode(dummy_request)
return Response(status_code=200)
except Exception:
logger.exception("Health check dummy encode failed")
return Response(status_code=503)
评论区精华
风险与影响
- 风险:
- 分布式同步风险:虚拟编码需广播到所有 TP rank,若
mm_global_cache 启用但各 rank 编码方法不一致,可能导致挂起(gemini-code-assist 指出,但未见修复)。
- GPU 资源争用:虚拟编码虽轻量,但高并发时仍可能干扰真实请求。当前通过
embedding_to_send 非空时跳过缓解。
- 模态覆盖不全:仅支持 image 和 audio,若模型使用其他模态(如 video),虚拟编码可能无法真正验证管道,但 fallback 到无处理器时返回 200 可接受。
- 缺少测试覆盖:无新增单元测试,仅依靠现有 E2E 测试。
- 影响:对用户:启用真正的健康检查,提高生产环境可靠性,使编码器更适合容器编排的 readiness/liveness 探测。对系统:增加极小的额外开销(仅在空闲时),对性能影响可忽略。对团队:无维护负担,代码简洁。
- 风险标记:分布式同步风险, 模态覆盖不全, 缺少测试覆盖
关联脉络
参与讨论