Prhub

#25851 sgl-router: experimental Rust HTTP router for SGLang worker pools

原始 PR 作者 Kangyan-Zhou 合并时间 2026-05-25 15:34 文件变更 131 提交数 158 评论 2 代码增减 +28623 / -55

执行摘要

实验性 Rust HTTP 路由代理,面向 SGLang 推理节点池

PR 说明指出,sgl-router 的目标是扮演与 model-gateway 侧车(SMG)相同的角色,但以单一直立 Rust 二进制程序代替 Python 服务。它旨在提供更轻量、自包含的路由层,通过 ZMQ 订阅 SGLang 工作节点的 KV 缓存事件来实现缓存感知路由,支持 K8s 环境下的 PD 分离部署。

该 PR 为草案状态,不推荐立即合并。但值得所有关注 SGLang 路由层和 PD 分离部署的工程师精读其设计文档和关键源码片段,尤其是 cache_aware_zmq.rs 的选择算法、k8s.rs 的 EndpointSlice 处理、以及 active_load.rs 的 RAII 守卫与 janitor 实现。PR 的分层监督和每 commit 可审查的做法值得参考。

讨论亮点

PR 为草案状态,无正式代码审查评论。技术讨论主要反映在作者自述的提交消息和 PR body 的已知限制部分:作者提到测试布局重组、超时默认值提升至 300s/600s、K8s+PD 流水线集成测试的覆盖策略等。此外,作者在 M4 review-findings 提交中列出了 9 项自审修复,涵盖断路器分配、过时请求处理、PD 池隔离错误码区分等。关键权衡包括:使用 Arc<HashTree> 的读写锁粗粒度设计(写侧用写锁,读侧用读锁+原子最后使用时间);缓存感知策略首先检查负载不均衡以避免热点集中;K8s 发现模式下 PD 角色分类基于客户端侧子集标签匹配。未解决的疑虑包括:/server_info 获取失败时的回退行为(目前仅记录并依赖发现后端的默认分类);节点模式在注册后改变(如 worker 从 prefill 切换到 decode 无通知)的处理未覆盖。

实现拆解

  1. 项目脚手架与配置:建立 Cargo workspace、工具链锁定、Cargo.lock;在 experimental/sgl-router/src/config/types.rs 定义顶层配置结构 Config(包含 server、observability、models、discovery、proxy、active_load 等区块),支持 YAML/TOML 加载。PolicyKind 枚举将路由策略序列化为蛇形字符串,拒绝未知值。
  2. 发现与工作节点注册:实现 Discovery trait,定义 WorkerIdWorkerSpecDiscoveryEvent 等类型。后端 static_urls(取代早期 static_file)直接由配置的 URL 列表提供节点;K8s 后端(experimental/sgl-router/src/discovery/k8s.rs)通过 kube-rs 监听 EndpointSlice 资源,使用客户端子集标签匹配区分 Plain/PD 模式。WorkerManager 消费发现事件流,维护 WorkerRegistry(DashMap 实现),并在注册时通过 /server_info 内省来补充模型名、事件配置和分离角色。
  3. 路由策略与断连器:实现 Policy trait,现有 RoundRobinRandomPowerOfTwoChoicesCacheAwareZmqexperimental/sgl-router/src/policies/cache_aware_zmq.rs)。缓存感知策略结合哈希树匹配得分与动态负载:先检查负载是否严重不均衡,若不均衡则直接选最低负载节点;否则解析请求体、提取 prompt 文本、计算块哈希、查询哈希树匹配前缀并选匹配率超阈值的节点;最终回退为最低负载选择。每种策略在 PolicyRegistry 中按模型绑定。断连器实现三状态机(Closed/Open/HalfOpen),支持冷却时间与会话探测。
  4. KV 事件管道与缓存感知路由:在 experimental/sgl-router/src/policies/kv_events/ 下,wire.rs 解析 SGLang ZmqEventPublisher 的 msgpack 编码事件;subscriber.rs 为每个 (worker, dp_rank) 创建 ZMQ SUB 连接,将消息转为 WorkerEvent 并送入共享 channel;tree.rs 实现哈希键基数树,维护块哈希到持有节点的映射,支持匹配最长前缀;index.rsKvEventIndex 串联订阅者与树,处理 worker 生命周期。ActiveLoadRegistryactive_load.rs)提供 RAII 守卫和过时清理(janitor),为缓存感知策略提供双轴负载信号(预填 token 计数与解码块计数)。
  5. HTTP 代理与聊天处理experimental/sgl-router/src/proxy/mod.rs 实现基于 reqwest 的 HTTP 转发与 SSE 流泵(64 槽有界通道)。chat.rs 处理 /v1/chat/completions,支持非流式和流式,解析请求体中的 model 字段,通过 PolicyRegistry 选择策略、按模式获取候选节点(普通模式或 PD 隔离池),在 PD 模式下注入引导端口并生成 room ID,发起预填请求后触发解码请求并添加节点亲和性。流式响应保持 text/event-stream 类型。
  6. 观测性与运维/metrics 端点暴露 Prometheus 格式的关键指标(请求总数、重叠块数、活跃负载、过期请求、解码亲和性)。支持 JSON 日志格式(适合 K8s 日志聚合器)。实现优雅关闭 drain:安装信号处理器,在收到 SIGTERM 后停止接受新请求并等待正在执行的请求完成。Docker 镜像构建与 Helm chart(后续已移除)。测试布局分为单元/组件/代理/端到端,包括面向 K8s 的 kind 集成测试与 4×H200 真实 GPU 端到端验证。
文件 模块 状态 重要度
experimental/sgl-router/src/discovery/k8s.rs 发现层 added 9.17
experimental/sgl-router/src/policies/active_load.rs 负载跟踪 added 9.17
experimental/sgl-router/src/policies/cache_aware_zmq.rs 路由策略 added 9.17
experimental/sgl-router/src/policies/kv_events/tree.rs KV 索引 added 9.17
experimental/sgl-router/src/config/types.rs 配置定义 added 9.28
experimental/sgl-router/src/server/routes/chat.rs 代理处理 added 9.28

关键符号

classify_mode extract_workers process_events select is_imbalanced pick_min_load register sweep_stale spawn_janitor run_subscriber add_worker remove_worker insert match_prefix decode_event_batch chat_completions resolve fetch

关键源码片段

experimental/sgl-router/src/discovery/k8s.rs core-logic

K8s 发现后端核心实现,包含 client-side 角色分类、EndpointSlice 到 WorkerSpec 的转换逻辑。

// files: experimental/sgl-router/src/discovery/k8s.rs
// 根据 K8s 发现模式将 EndpointSlice 分类为 WorkerMode
fn classify_mode(es: &EndpointSlice, mode: &K8sDiscoveryMode) -> Option<WorkerMode> {
    match mode {
        K8sDiscoveryMode::Plain { .. } => Some(WorkerMode::Plain),
        K8sDiscoveryMode::PdDisaggregation {
            prefill_selector,
            decode_selector,
        } => {
            let labels = es.metadata.labels.as_ref().cloned().unwrap_or_default();
            if labels_match_selector(&labels, prefill_selector) {
                Some(WorkerMode::Prefill)
            } else if labels_match_selector(&labels, decode_selector) {
                Some(WorkerMode::Decode)
            } else {
                None // 两个选择器都不匹配,过滤掉此分片
            }
        }
    }
}// 匹配 K8s 标签集与逗号分隔的相等选择器(支持 `=` 和 `==`)
fn labels_match_selector(labels: &BTreeMap<String, String>, selector: &str) -> bool {
    for term in selector.split(',') {
        let term = term.trim();
        if term.is_empty() { continue; }
        let (key, expected) = if let Some((k, v)) = term.split_once("==") {
            (k.trim(), v.trim())
        } else if let Some((k, v)) = term.split_once('=') {
            (k.trim(), v.trim())
        } else {
            return false;
        };
        match labels.get(key) {
            Some(v) if v == expected => {}
            _ => return false,
        }
    }
    true
}
experimental/sgl-router/src/policies/active_load.rs core-logic

RAII 守卫和 janitor 机制,提供双轴活跃负载跟踪,是缓存感知策略的负载信号源。

// files: experimental/sgl-router/src/policies/active_load.rs
// 为每个在途请求创建的唯一标识符
#[derive(Clone, Eq, Hash, PartialEq, Debug)]
pub struct RequestId(pub Uuid);impl RequestId {
    pub fn new_v4() -> Self {
        Self(Uuid::new_v4())
    }
}// 每个工作节点的计数器:预填负载和解码负载分开跟踪
#[derive(Debug, Default)]
struct WorkerCounters {
    prefill_load: AtomicUsize,
    decode_load: AtomicUsize,
}// 在途请求的元数据,janitor 通过此结构找到过期请求并触发取消
#[derive(Debug)]
struct RequestEntry {
    worker: WorkerId,
    cancel: CancellationToken, // janitor 发送取消信号
    counters: Arc<WorkerCounters>, // 直接持有 Arc,防止 worker 移除后下溢
    registered_at: Instant, // 注册的时间戳
}
experimental/sgl-router/src/config/types.rs configuration

定义完整的配置 schema、策略枚举和默认值,是路由器的配置入口。

// files: experimental/sgl-router/src/config/types.rs
// 顶层配置结构,通过 serde 反序列化,支持 YAML/TOML 格式
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    pub server: ServerConfig,
    #[serde(default)]
    pub observability: ObservabilityConfig,
    pub models: Vec<ModelConfig>,
    pub discovery: DiscoveryConfig,
    #[serde(default)]
    pub proxy: ProxyConfig,
    #[serde(default)]
    pub active_load: ActiveLoadConfig,
}// 代理超时配置,默认 300 秒,匹配 SGLang 预填 / 解码延迟预算
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ProxyConfig {
    #[serde(default = "default_proxy_request_timeout_secs")]
    pub request_timeout_secs: u64,
}
fn default_proxy_request_timeout_secs() -> u64 { 300 }// 活跃负载跟踪超时,默认 600 秒,高于 proxy 超时,确保 proxy 超时优先触发
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ActiveLoadConfig {
    #[serde(default = "default_stale_request_timeout_secs")]
    pub stale_request_timeout_secs: u64,
}
fn default_stale_request_timeout_secs() -> u64 { 600 }// 路由策略枚举,使用 `#[serde(rename_all = "snake_case")]` 拒绝未知值
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PolicyKind {
    #[default]
    RoundRobin,
    Random,
    PowerOfTwo,
    CacheAwareZmq, // 需要模型加载 tokenizer,依赖 cache_aware 配置
}

评论区精华

没有提炼出高价值讨论线程

当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。

风险与影响

  • 新组件风险:整个 experimental/sgl-router/ 为全新代码,未经长期生产验证,可能存在未发现的 bug 或性能问题。
  • 版本兼容性:与 SGLang 的 ZMQ KV 事件发布格式紧耦合,若 SGLang 改变事件序列化格式(msgspec.msgpack)或端点结构,路由器需要同步更新。
  • 测试覆盖不足:虽包含大量单元测试和集成测试,但 K8s+PD 端到端测试在真实集群中未运行(仅 Rust 集成测试覆盖关键接缝)。真实 GPU 端到端仅验证了单路由器/单模型场景,多路由收敛和故障转移场景的测试覆盖有限。
  • 性能风险:哈希树的读写锁在写密集场景下可能成为瓶颈;缓存感知策略在请求量极大时可能增加决策延迟。
  • 安全风险:未提及身份认证或请求验证(仅 body 大小限制为 1 MiB),恶意请求可能导致资源泄漏或服务拒绝。
  • 用户影响:实验性功能,默认不启用,不影响现有 SGLang 用户。若将来合并并推广,可作为 model-gateway 的替代选项。
  • 系统影响:引入 Rust 编译依赖(Cargo 工作区),但仅限 experimental/sgl-router/ 目录,不影响主 Python 构建流程。CI 新增多个 tier 的工作流。
  • 团队影响:需要 Rust 相关的维护能力;文档和部署脚本需要同步更新。
新组件 实验性代码 需真实集群验证 性能基准待评估 K8s 端到端覆盖不全

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论