Prhub

#19524 feature: upstream cancel

原始 PR 作者 dougyster 合并时间 2026-05-18 16:20 文件变更 13 提交数 11 评论 8 代码增减 +4209 / -212

执行摘要

客户端断连后自动取消上游 stream 请求

PR body 指出:"When a client disconnects mid-stream, the router now detects the closed connection via tx.closed() in a tokio::select! loop and drops the upstream HTTP connection immediately. This signals the engine to abort generation, freeing GPU compute that would be wasted otherwise." 此前存在的浪费在于,即使客户端断开,上游仍会继续生成 token,浪费大量 GPU 算力。

该 PR 值得所有 sgl-model-gateway 的维护者和使用者精读。特别是 BreakerTrackedStream 的设计(利用 Drop 在流结束时确定断路器状态)以及断路器记录时机的调整,是一个优雅地分离 HTTP 响应状态和实际流传输状态的模式。review 中关于性能优化和避免双重计数的讨论也值得学习。建议部署前做 Canary 测试,验证断路器行为符合预期。

讨论亮点
  • 性能关键路径改革:alexnails 指出每个 SSE 块通过 format! 拼接

会额外分配 String,建议优化。PR 新增 sse_block_to_bytes 函数,利用 BytesMut 预分配并直接追加,避免中间 String 分配。

  • 额外 task 争议:alexnails 认为纯透传 streaming 可以避免额外 spawn 和 unbounded_channel,直接返回 res.bytes_stream() 即可减少调度开销。PR 保留现有架构,但后续可通过统一 helper 减少重复代码。
  • 断路器双重计数:alexnails 在 pd_router.rs 和 openai/router.rs 都询问 "double accounting"。PR 通过调整条件:仅非 streaming 记录初始成功,streaming 由包装器在 Drop 时记录;同时修复了请求发送失败时忘记记录失败的问题。该修复在 commit d538879 中体现。
  • 测试确定性建议:alexnails 建议使上游取消测试确定性,但测试目前依赖系统事件行为,可能偶发。未完全解决,但覆盖率达标。

实现拆解

  1. 新增 BreakerTrackedStream 结构体(sgl-model-gateway/src/routers/streaming_utils.rs),作为 Stream 适配器,内部维护 Terminal 状态枚举(Active/Completed/Errored)。在 poll_next 中传递正常数据,遇到错误或流结束时更新状态。在 Drop 中根据终端状态调用 worker.record_success() 或 worker.record_failure(),并在丢弃时不做任何记录(代表客户端断连,worker 无辜)。还提供 mark_completed 和 mark_errored 方法供路由器提前标记。
  2. 修改四个路由器的 streaming 转发路径(http/router.rs、pd_router.rs、openai/router.rs、openai/responses/streaming.rs),将原本直接 stream.next().await 改为 tokio::select! 良性竞争 stream.next() 与 tx.closed()。当客户端断开导致 tx.closed() 就绪时,立即跳出循环并丢弃上游 body,reqwest 底层连接因此关闭,引擎收到断开信号中止生成。注意在转发失败时(发送请求阶段),也要记录断路器失败,因为此时尚无 BreakerTrackedStream 包装。
  3. 调整断路器(circuit breaker)的记录时机。在 http/router.rs 和 pd_router.rs 中,对于 streaming 请求,不再在获得初始 200 响应时立即调用 record_success(),而是改由 BreakerTrackedStream 在流真正结束(或出错)时决定记录成功或失败。非 streaming 请求仍然在状态码阶段记录。同时修复了 prefill/decode 断路器的归属问题,避免 decode 的传输故障被错误地归因到 prefill。
  4. 添加测试基础设施和测试用例。扩展 mock_worker.rs,支持配置慢速流(set_slow_stream_chunks)、错误注入(set_stream_error_after_chunks)和可配置的失败状态码(set_fail_status_code)。新增 upstream_cancel_test.rs 集成测试文件,覆盖客户端断开取消、正常完成、非 streaming 正确性、错误恢复等场景。
  5. 添加微基准测试 streaming_utils_bench.rs,使用 Criterion 测量 BreakerTrackedStream 的每 chunk 开销,与裸 stream 迭代对比,确保包装器无显著性能退化。A/B 测试结果(由 @Kangyan-Zhou 提供)显示无 measurable 回归。
  6. 更新 Cargo.toml 添加 dev 依赖,并在 .pre-commit-config.yaml 中添加 rustfmt-sgl-model-gateway 钩子,确保 Rust 代码格式一致。
文件 模块 状态 重要度
sgl-model-gateway/src/routers/streaming_utils.rs 流处理 added 9.36
sgl-model-gateway/src/routers/http/pd_router.rs PD 路由 modified 7.73
sgl-model-gateway/src/routers/http/router.rs HTTP 路由 modified 7.3
sgl-model-gateway/benches/streaming_utils_bench.rs 性能基准 added 8.95
sgl-model-gateway/tests/common/mock_worker.rs 测试模拟 modified 7.73
sgl-model-gateway/tests/reliability/upstream_cancel_test.rs 集成测试 added 6.44

关键符号

BreakerTrackedStream::new BreakerTrackedStream::poll_next BreakerTrackedStream::drop BreakerTrackedStream::mark_completed BreakerTrackedStream::mark_errored sse_block_to_bytes fail_status_code set_slow_stream_chunks set_stream_error_after_chunks bench_tracked_clean bench_tracked_mark_completed_drop

关键源码片段

sgl-model-gateway/src/routers/streaming_utils.rs core-logic

核心新增文件,定义 BreakerTrackedStream 结构,封装 upstream stream 的终止状态,并在 Drop 时安全地决定断路器状态。是所有路由器客户端断开取消的基础。

/// 三态状态机,表示流终止的方式。
enum Terminal {
    Active, // 仍在飞行或丢弃前未终止
    Completed, // 流正常结束或调用方标记完成
    Errored, // 流产生错误
}/// 包装一个 `Stream<Item = Result<Bytes, E>>`,使得断路器只在 Drop 时更新。
pub struct BreakerTrackedStream<E = reqwest::Error> {
    inner: Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send + 'static>>,
    worker: Arc<dyn Worker>,
    log_url: String,
    terminal: Terminal,
}impl<E> BreakerTrackedStream<E> {
    pub fn new<S>(inner: S, worker: Arc<dyn Worker>, log_url: String) -> Self {
        Self {
            inner: Box::pin(inner),
            worker,
            log_url,
            terminal: Terminal::Active,
        }
    }    /// 标记流为正常完成。用于路由器通过带内哨兵(如 `data: [DONE]`)提前发现流结束。
    pub fn mark_completed(&mut self) {
        if self.terminal == Terminal::Active {
            self.terminal = Terminal::Completed;
        }
    }    /// 预标记流为错误。用于包装已包含失败结果的内容(如非 2xx 响应体)。
    pub fn mark_errored(&mut self) {
        self.terminal = Terminal::Errored;
    }
}impl<E: Display> Stream for BreakerTrackedStream<E> {
    type Item = Result<Bytes, E>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.inner.as_mut().poll_next(cx) {
            Poll::Ready(Some(Ok(b))) => Poll::Ready(Some(Ok(b))),
            Poll::Ready(Some(Err(e))) => {
                tracing::error!("Upstream stream error from worker {}: {}", self.log_url, e);
                self.terminal = Terminal::Errored;
                Poll::Ready(Some(Err(e)))
            }
            Poll::Ready(None) => {
                if self.terminal == Terminal::Active {
                    self.terminal = Terminal::Completed;
                }
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}impl<E> Drop for BreakerTrackedStream<E> {
    fn drop(&mut self) {
        match self.terminal {
            Terminal::Completed => self.worker.record_success(),
            Terminal::Errored => self.worker.record_failure(),
            Terminal::Active => { /* client disconnected, do nothing */ }
        }
    }
}

评论区精华

SSE 块分配优化 性能

alexnails 指出每个 SSE 块通过 format! 拼接 \n\n 会额外分配 String,建议优化。

结论:PR 新增 sse_block_to_bytes 函数,使用 BytesMut 预分配并直接追加,避免中间 String 分配,从而减少每 token 的开销。 · 已解决

断路器双重计数问题 正确性

alexnails 在 pd_router.rs 和 openai/router.rs 都询问 'double accounting',担心断路器记录逻辑导致成功 / 失败被重复计数或错误归属。

结论:PR 通过调整记录条件:仅非 streaming 请求在初始状态码时记录成功,streaming 请求改由 BreakerTrackedStream 在 Drop 时根据实际流终止状态记录,从而避免重复计数和掩盖 200-then-broken 场景。该修复在 commit d538879 中体现。 · 已解决

测试确定性 测试

alexnails 建议使上游取消测试确定性,避免随机失败。

结论:未完全解决,测试中仍存在部分系统时间依赖,但总体覆盖率达标。 · 待处理

额外任务与通道开销 性能

alexnails 建议纯透传流可以直接返回 res.bytes_stream(),避免额外 spawn 和 unbounded channel,减少调度开销。

结论:PR 保留现有 select! 架构,未移除额外 task。但未来的统一 helper 可以解决此问题。 · not resolved

风险与影响

  • 回归风险:更改了断路器记录逻辑,可能影响非 streaming 请求的错误计数,但经过测试覆盖和 A/B 验证无回归。
  • 性能风险:BreakerTrackedStream 每 chunk 增加一次状态检查和 Drop 逻辑,但 bench 显示开销极低(~2.6% 在 generate_request 中)。tokio::select! 在热路径中可能轻微增加延迟,但 A/B 测试无显著差异。
  • 并发安全:BreakerTrackedStream 内部状态仅在 poll_next 和 Drop 中修改,Drop 与 poll_next 可能发生竞态?但设计上 Drop 是在拥有该流的任务中唯一执行,不存在并发问题。mark_completed 和 mark_errored 在 &mut self 上调用,需外部同步。目前使用场景中调用发生在同一任务中,安全。
  • 兼容性:仅影响 streaming 请求的处理逻辑,非 streaming 完全不变。API 契约不变(HTTP 响应状态码、头部等不变)。向前兼容。
  • 用户视角:客户端断连后,GPU 资源立即可用,减少浪费,提高集群利用率。对于使用 gateway 的流式推理场景有直接收益。
  • 系统视角:Gateway 能够及时向引擎传递断开信号(通过连接关闭),引擎(sglang server)能更快终止生成,释放显存和计算资源。
  • 团队维护:引入新组件 BreakerTrackedStream 和 Terminal 状态机,增加了一定复杂度,但模块隔离良好,且有详尽文档和单元测试覆盖,维护成本可控。
  • 影响范围:所有经过 sgl-model-gateway 的 streaming 请求(/v1/chat/completions stream、PD 路由等)。非 streaming 和其他后端不受影响。
streaming 路径变更 断路器记录逻辑调整 性能热路径 并发安全考虑

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论