Prhub

#43779 [Rust Frontend] Support streaming `generate` endpoint

原始 PR 作者 Xunzhuo 合并时间 2026-06-02 03:30 文件变更 5 提交数 7 评论 11 代码增减 +563 / -15

执行摘要

Rust 前端 generate 端点支持 SSE 流式

之前 Rust 前端的 generate 端点拒绝 stream=true 参数,无法满足需要逐 token 输出的业务场景。为了实现与 Python vLLM 后端的行为一致,并为用户提供更即时的反馈体验,需要添加流式支持。

建议仔细阅读 generate.rs 中的 generate_chunk_stream 和 generate_sse_stream 实现,特别是 asynk_strim_attr 宏的使用、错误用道的设计以及与 completions 端点的相似性。这些实现在后续其他 Rust 路由中可复用。

讨论亮点

Review 中主要讨论了四个技术点:

  • prompt_tokens 适时捕获:Copilot 指出如果 prompt_info 不在第一个 chunk 出现,先前的 started 标记会导致 future 不再尝试获取。BugenZhao 建议使用 Option 简化,最终通过 'Capture late raw generate prompt info' 提交修复。
  • 空 token_ids chunk 跳过 finish_reason:Copilot 发现当 chunk 仅有 finish_reason 而 token_ids 为空时会丢失 finish_reason。已通过 'Emit raw generate finish chunks' 提交处理。
  • FinishReason::Error 应转 SSE 错误:Codex 建议将引擎错误转为 SSE 错误事件,而非正常 chunk。BugenZhao 引用 completions 路由的实现,Xunzhuo 添加了错误匹配并返回 bail_server_error,BugenZhao 随后确认已覆盖。
  • 连续 usage 门控:Codex 指出连续 usage 统计不应在未设置 include_usage 时启用。Xunzhuo 在 convert.rs 中实现了门控逻辑。

实现拆解

  1. 类型扩展(types.rs):新增 GenerateResponseStreamChoice 和 GenerateStreamResponse 结构体,映射 Python 侧流式响应;为 GenerateRequest 添加 stream_options 字段。
  2. 请求转换(convert.rs):在 PreparedRequest 中增加 stream、include_usage、include_continuous_usage 字段,并在 prepare_generate_request 中从请求解析,同时实现连续 usage 门控(continuous_usage 仅当 include_usage 为 true 时启用)。
  3. 验证调整(validate.rs):移除 validate_request_compat 中对 stream=true 的拒绝,改为接受流式请求;添加 stream_options 必须配合 stream=true 的一致性检查。
  4. 核心流式逻辑(generate.rs):在 generate 函数中根据 stream 标志分流:流式则调用 generate_chunk_stream 将引擎输出逐 chunk 转换为内部 GenerateStreamResponse,再由 generate_sse_stream 转换为 SSE 事件,最终使用 axum Sse 响应返回;非流式保留原有 collect 逻辑。
  5. 集成测试(tests.rs):新增多个测试覆盖正常流式、usage 包含与连续、空 finish chunk、引擎错误响应等场景。
文件 模块 状态 重要度
rust/src/server/src/routes/tests.rs 测试 modified 8.49
rust/src/server/src/routes/inference/generate.rs 路由 modified 8.81
rust/src/server/src/routes/inference/generate/validate.rs 验证 modified 7.31
rust/src/server/src/routes/inference/generate/convert.rs 转换 modified 6.61
rust/src/server/src/routes/inference/generate/types.rs 类型 modified 5.6

关键符号

generate generate_chunk_stream generate_sse_stream validate_request_compat prepare_generate_request

关键源码片段

rust/src/server/src/routes/inference/generate/validate.rs core-logic

调整请求验证逻辑,移除 stream=true 的拒绝,添加 stream_options 与 stream 不一致的验证。

以下展示了 validate_request_compat 函数的完整实现及其核心变更点:

// 验证函数:在生成前校验请求的合法性。
// 主要变化:删除对 stream=true 的硬拒绝,改为接受流式请求;
// 同时如果设置了 stream_options 但没有同时设置 stream=true,则返回错误。
pub(super) fn validate_request_compat(
    request: &GenerateRequest,
    served_model_names: &[String],
) -> Result<(), ApiError> {
    // 1. 模型名校验
    if let Some(model) = request.model.as_ref()
        && !served_model_names.iter().any(|n| n == model)
    {
        return Err(ApiError::model_not_found(model.clone()));
    }    // 2. stream_options 只有在 stream=true 时才允许
    if request.stream_options.is_some() && !request.stream {
        bail_invalid_request!(
            param = "stream_options",
            "stream_options are only supported when stream=true."
        );
    }    // 3. token_ids 不能为空
    if request.token_ids.is_empty() {
        bail_invalid_request!(
            param = "token_ids",
            "token_ids must contain at least one token ID."
        );
    }    // 4. max_tokens 必须大于 0
    if request.sampling_params.max_tokens == Some(0) {
        bail_invalid_request!(
            param = "sampling_params",
            "max_tokens must be greater than 0."
        );
    }    // 5. prompt_logprobs 合法性(非负或 -1)
    if let Some(prompt_logprobs) = request.sampling_params.prompt_logprobs
        && prompt_logprobs < 0
        && prompt_logprobs != -1
    {
        bail_invalid_request!(
            param = "sampling_params",
            "`prompt_logprobs` must be a non-negative value or -1."
        );
    }    Ok(())
}

这一函数是请求进入路由后的第一道关卡,正确实现可确保不合法的流式请求尽早被拒绝。

评论区精华

prompt_tokens 在 prompt_info 延迟出现时的正确性 正确性

Copilot 指出如果 prompt_info 在第一个 chunk 中缺失,则 prompt_tokens 保持 0 且 started 标记设置后不再更新,导致 usage 统计不准确。BugenZhao 建议将 prompt_tokens 改为 Option 以简化逻辑,无需维护 started 标记。

结论:通过提交 'Capture late raw generate prompt info' 修复,将 prompt_tokens 设置为 Option,并在每个 chunk 中尝试更新,直到获取到 prompt_info 为止。 · 已解决

空 token_ids chunk 跳过导致 finish_reason 丢失 正确性

Copilot 注意到当输出 chunk 的 token_ids 为空(如仅包含 finish_reason)时,会被跳过而不 emit,导致客户端无法收到 finish_reason。

结论:已通过 'Emit raw generate finish chunks' 提交修复,在 token_ids 为空但 finish_reason 存在时仍然 emit chunk。 · 已解决

FinishReason::Error 响应应转为 SSE 错误事件 设计

Codex 建议将引擎返回的 FinishReason::Error 转换为 SSE 错误事件,而非作为正常 chunk 返回。BugenZhao 引用 completions 路由的实现作为参考。Xunzhuo 表示已处理。

结论:通过添加对 FinishReason::Error 的匹配,在遇到错误时使用 bail_server_error 返回 SSE 错误事件。BugenZhao 最终确认已覆盖。 · 已解决

连续 usage 统计应依赖 include_usage 开关 正确性

Codex 指出当只设置 continuous_usage_stats 而未设置 include_usage 时,不应启用连续 usage,否则与 Python vLLM 的语义不符。

结论:已在 prepare_generate_request 中实现门控:include_continuous_usage 仅当 include_usage 为 true 时才会被设置为 true。 · 已解决

风险与影响

  • 流式错误处理一致性风险(generate.rs):如果出现罕见的引擎内部错误,但未被正确转为 SSE 错误事件,可能导致客户端误解。当前已通过 FinishReason::Error 分支处理,但仍需关注其他异常路径。
  • 连续 usage 统计性能开销(generate.rs):当启用 included_continuous_usage 时,每个 chunk 都携带 usage 对象,可能增加响应大小和序列化开销。预计可接受,但需要大规模验证。
  • 测试覆盖边界不足(tests.rs):当前测试主要覆盖 happy path,对引擎中途失败、重连等异常场景覆盖不足,可能留下回归风险。

用户:可以使用 stream=true 参数获取逐 token 流式生成,配合 stream_options 控制 usage 统计,体验与 Python vLLM 保持一致。
系统:增加了一种响应模式,但流式处理对异步运行时和 SSE 框架的稳定性要求更高,需确保错误处理完善。
团队:扩展了 Rust 前端的功能集,为后续其他端点添加流式支持提供了参考实现。

流式错误处理一致风险 连续 usage 性能开销 测试覆盖边界不足

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

参与讨论