Prhub

#24278 [SMG][CI] Add K8s integration tests + wire into pr-test-rust

原始 PR 作者 Kangyan-Zhou 合并时间 2026-05-05 01:27 文件变更 12 提交数 5 评论 5 代码增减 +1652 / -3

执行摘要

为 SMG 添加端到端 K8s 集成测试并接入 CI

确保 SMG 网关在 K8s 环境中的可靠性,尤其是服务发现和 PD 模式下的滚动更新场景。PR body 提到:"Adds an end-to-end K8s integration test suite for SMG ... exercising the K8s watcher, the reconciliation loop, and the UID-based eviction path used during hostNetwork rollouts."

值得精读。本 PR 展示了如何为网关组件构建端到端集成测试,尤其是 fake worker 的设计、基于 UID 的驱逐验证以及 CI 轻量构建策略,对类似组件的测试设计有参考价值。

讨论亮点
  • Rust 版本无效:gemini-code-assist[bot] 指出 Dockerfile.gateway 中 FROM rust:1.901.90 尚未发布(当前稳定版约 1.85),导致构建失败,建议改为 rust:1.85 或通用标签如 rust:bookworm
  • 未使用函数:gemini-code-assist[bot] 发现 conftest.py 中的 _kubectl_jsontest_reconciliation.py 中的 _get_worker_urls 定义但未被调用,建议移除以保持代码整洁。
  • Pod 清理逻辑可以简化:gemini-code-assist[bot] 认为 _cleanup_pods 函数可以使用 kubectl delete pods -l app=fake-worker --ignore-not-found 替代列举和循环,并避免固定 time.sleep(5)

实现拆解

  1. 测试套件与基础结构:创建 sgl-model-gateway/e2e_test/k8s_integration/conftest.py 提供与 kind 集群交互的核心辅助函数(_kubectl_wait_for_pod_ready_poll_until 等),定义可安全重试的传输层异常集合 _TRANSIENT_ERRORS,以及 reconciliation 等待时间常量。同时新增 fake_worker.py 实现一个轻量 HTTP 服务器,模拟 SGLang worker 的 /health/v1/models/server_info 等端点。
  2. 服务发现与 reconciliation 测试:在 test_reconciliation.py 中实现 TestWatcherDiscovery 测试类,通过程序化地部署和删除 Pod,验证 K8s watcher 能否正确发现新 Pod、worker 注册与注销是否及时、Prometheus 指标是否准确,以及多次 reconciliation 周期内系统保持稳定。
  3. PD 类型转换测试:在 test_pd_type_change.py 中专门模拟 hostNetwork 场景下的 Prefill -> Decode 滚动更新:当旧 prefill Pod 被删除、新 decode Pod 以相同 IP 不同 UID 启动时,网关必须基于 UID 驱逐旧 worker 并注册新 worker。测试通过检查 /workers 端点的 worker_type 字段确保转换正确。
  4. 部署清单与集群设置:在 manifests/ 下提供 namespace、RBAC、基础网关和 PD 模式网关的 YAML 文件;setup.sh 自动化 kind 集群创建、镜像构建/加载、基础清单应用,并支持 SKIP_DOCKER_BUILD=1 环境变量以在 CI 中复用预构建镜像。
  5. CI 集成:在 .github/workflows/pr-test-rust.yml 中新增 k8s-integration 作业,使用 ubuntu-22.04 运行器,通过 docker/build-push-action 并使用 GHA 缓存构建测试镜像,运行完整测试套件,并加入 finish.needs 作为 PR 合并门禁。同时,在 sgl-model-gateway/e2e_test/conftest.py 中调整了导入逻辑,避免在缺少 sglang_router 依赖时破坏测试收集。
文件 模块 状态 重要度
sgl-model-gateway/e2e_test/k8s_integration/test_reconciliation.py 服务发现测试 added 8.02
sgl-model-gateway/e2e_test/k8s_integration/conftest.py 测试配置 added 8.02
sgl-model-gateway/e2e_test/k8s_integration/test_pd_type_change.py PD 类型测试 added 8.02
sgl-model-gateway/e2e_test/k8s_integration/fake_worker.py Fake 工作节点 added 7.01
sgl-model-gateway/e2e_test/k8s_integration/setup.sh 集群设置 added 6.02
.github/workflows/pr-test-rust.yml CI 工作流 modified 5.07
sgl-model-gateway/e2e_test/k8s_integration/Dockerfile.gateway 构建配置 added 4.83
sgl-model-gateway/e2e_test/k8s_integration/manifests/gateway-pd.yaml PD 网关部署 added 5.38
sgl-model-gateway/e2e_test/k8s_integration/manifests/gateway.yaml 基础网关部署 added 5.38
sgl-model-gateway/e2e_test/conftest.py 测试配置 modified 4.22

关键符号

_kubectl _kubectl_json _wait_for_pod_ready _wait_for_deployment_ready _get_gateway_url _get_metrics_url _wait_for_port _poll_until _get_workers _get_worker_count _cleanup_port_forward _get_metrics _get_worker_urls _parse_metric_value _deploy_worker_pod _delete_worker_pod _safe_delete_worker_pod _wait_for_pod_gone _get_workers_by_type _deploy_pd_worker _safe_delete_pod pd_gateway FakeWorkerHandler.do_GET FakeWorkerHandler.log_message TestWatcherDiscovery TestPDRolloutTypeChange

关键源码片段

sgl-model-gateway/e2e_test/k8s_integration/conftest.py test-coverage

所有测试的基座,提供 kubectl 封装、等待函数、端口转发管理、轮询工具和可重试错误分类。

# 只重试传输层异常(连接级别),不重试 HTTP 状态码错误。
# httpx.HTTPStatusError (4xx/5xx) 故意排除,
# 网关返回 5xx 应被视为回归而暴露,不应被静默吞掉。
_TRANSIENT_ERRORS = (
    httpx.TransportError,
    httpx.TimeoutException,
    ConnectionError,
    OSError,
)def _poll_until(
    predicate: Callable[[], bool],
    description: str,
    timeout: int,
    interval: float = 5,
) -> bool:
    """轮询直到 predicate 返回 True,超时时引发 TimeoutError。"""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if predicate():
                return True
        except _TRANSIENT_ERRORS:
            logger.debug("Transient error while polling %s, retrying", description)
        time.sleep(interval)
    raise TimeoutError(
        f"Timed out after {timeout}s waiting for: {description}"
    )
sgl-model-gateway/e2e_test/k8s_integration/fake_worker.py test-coverage

轻量级 HTTP 服务器模拟真实 worker 端点,支撑服务发现测试。

"""Minimal fake worker that mimics an SGLang worker for integration testing."""
import json
from http.server import BaseHTTPRequestHandler, HTTPServerPORT = 8000class FakeWorkerHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 健康检查端点: always return OK
        if self.path == "/health":
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"OK")
        # 模型列表端点: 返回固定的 fake-model 信息
        elif self.path == "/v1/models":
            body = json.dumps({"object": "list", "data": [{"id": "fake-model", "object": "model", "created": 0, "owned_by": "sglang"}]})
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(body.encode())
        # 服务器信息端点: 返回 fake-model 的元信息
        elif self.path in ("/server_info", "/get_server_info"):
            body = json.dumps({"model_path": "fake-model", "version": "0.0.0-test", "tp_size": 1, "dp_size": 1})
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(body.encode())
        # 模型信息端点: 标记为生成模型
        elif self.path in ("/model_info", "/get_model_info"):
            body = json.dumps({"model_path": "fake-model", "is_generation": True})
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(body.encode())
        else: # 未匹配路径返回 404
            self.send_response(404)
            self.end_headers()
​
    def log_message(self, format, *args):
        pass # 静默日志if __name__ == "__main__":
    server = HTTPServer(("0.0.0.0", PORT), FakeWorkerHandler)
    print(f"Fake worker listening on port {PORT}", flush=True)
    server.serve_forever()

评论区精华

Dockerfile 中 Rust 版本无效 正确性

gemini-code-assist[bot] 指出 Rust 版本 1.90 尚未发布,建议使用有效版本如 1.85 或通用标签如 rust:bookworm。

结论:未在后续提交中明确看到修复,但 PR 已合并,推测已修正或标记为待处理。 · unresolved

未使用函数 _kubectl_json 和 _get_worker_urls style

gemini-code-assist[bot] 建议移除未使用的辅助函数以保持代码整洁。

结论:未发现移除这些函数的后续提交,建议仍未处理。 · unresolved

简化 Pod 清理逻辑 设计

gemini-code-assist[bot] 建议使用 kubectl delete 标签选择器替代手动列举和循环,并移除固定 time.sleep(5)。

结论:未明确解决。 · unresolved

风险与影响

  1. Rust 版本错误:Dockerfile.gateway 中 rust:1.90 镜像标签不存在,可能导致测试镜像构建失败,阻塞 CI。
  2. 外部运行时依赖:测试依赖 kind 集群和 kubectl,要求运行器支持 Docker daemon;当前配置为 ubuntu-22.04 满足条件,但未来环境变更可能导致兼容性问题。
  3. 测试耗时较长:reconciliation 测试需等待完整的 60 秒 tick 加上 30 秒缓冲,约 90 秒的轮询等待在 CI 中可能引发超时或 flaky 失败。
  4. 未使用函数遗留_kubectl_json_get_worker_urls 未使用但保留,增加维护负担和代码歧义。

用户:无直接影响,仅 CI 流程变化。系统:新增 K8s 集成测试作业,首次构建镜像约 5-10 分钟,后续利用 GHA 缓存可缩短;总 CI 时间增加约 5 分钟。团队:为 SMG 核心的 K8s 服务发现和 PD 模式提供自动化回归防护,提升对 hostNetwork 滚动更新场景的信心。可维护性:Dockerfile.gateway 分离了测试与生产构建,减少构建时间;但需确保版本号同步更新。

Rust 版本无效 依赖外部运行时环境 reconciliation 测试等待较长 存在未使用函数

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论