Prhub

#43429 [rust] fix: aggregate `is_sleeping` and `reset_prefix_cache` across DP engines

原始 PR 作者 willamhou 合并时间 2026-05-28 22:56 文件变更 3 提交数 4 评论 10 代码增减 +257 / -5

执行摘要

修复 DP 引擎结果仅取首个的 Bug,增加聚合检查

在数据并行(DP)场景下,不同引擎的睡眠状态或缓存重置结果可能不一致,但原有实现仅返回第一个引擎的返回值(TODO: we only return the result of the first engine here.),导致状态偏差被静默忽略。PR #199 的讨论及上游 Python 实现分析表明,所有引擎应状态一致,不一致应视为控制面 bug。该 PR 旨在显式暴露问题,提升系统可靠性。

该 PR 值得精读,特别是其系统性地将“只取第一个”的不安全模式改为显式聚合检查,并通过错误类型丰富诊断信息。对于分布式系统中一致性和错误可见性的设计有借鉴意义。建议阅读 client.rs 中的 is_sleepingreset_prefix_cache 实现,以及测试中的 mock 模式。

讨论亮点
  1. 索引安全:gemini-code-assist 指出原实现的 results[0] 在空 vector 时 panic,建议改用 .first().ok_or_else(|| ...)。贡献者采纳并修复。
  2. 测试稳定性:BugenZhao 质疑测试中的 sleep(50ms) 必要性,认为应去除非确定性同步。后续提交移除了 sleep,并通过 BTreeMap 握手保证引擎顺序确定性。
  3. 与 Python 实现对比:njhill 指出此改动与 Python 端行为不同(Python 未做一致性检查),但确认 Rust 端可更严格。
  4. DCO 签注:njhill 在 approve 时要求 commit 签名,已按要求完成。

实现拆解

  1. 修改 client.rs 中的 is_sleeping 方法:将返回 Result<bool>self.call_utility(...).await?[0] 改为收集所有引擎的 Vec<bool>,检查是否全部一致。若为空或存在不一致,返回新错误 Error::InconsistentUtilityResults
  2. 修改 client.rs 中的 reset_prefix_cache 方法:同样收集 Vec<bool>,若为空返回错误,否则返回 AND 聚合结果(.all(|ok| ok))。
  3. error.rs 中添加错误变体 InconsistentUtilityResults { method, values },提供结构化错误上下文。
  4. tests/client.rs 中新增辅助函数 spawn_mock_utility_engine 和三个新测试用例,分别验证引擎不一致时返回错误、引擎一致时返回值、以及 reset 成功/失败场景。
  5. 根据 review 反馈去掉测试中的非确定性 sleep,使用 .first() 替代索引以安全处理空结果。
  6. 所有改动集中在 rust/src/engine-core-client/ 目录,确保仅影响 Rust 前端客户端。
文件 模块 状态 重要度
rust/src/engine-core-client/src/client.rs Rust 客户端 modified 6.86
rust/src/engine-core-client/src/error.rs 错误类型 modified 4.13
rust/src/engine-core-client/src/tests/client.rs 客户端测试 modified 7.12

关键符号

is_sleeping reset_prefix_cache spawn_mock_utility_engine is_sleeping_returns_error_when_engines_disagree is_sleeping_returns_value_when_all_engines_agree reset_prefix_cache_returns_true_when_all_engines_succeed reset_prefix_cache_returns_false_when_any_engine_fails

关键源码片段

rust/src/engine-core-client/src/client.rs core-logic

核心逻辑变更,所有引擎结果聚合

/// Return whether the engine is currently sleeping at any level.
///
/// Under data parallel, all engines should agree on the sleep state: a
/// divergence signals a control-plane bug. Returns
/// `Error::InconsistentUtilityResults` if engines disagree.
pub async fn is_sleeping(&self) -> Result<bool> {
    // 调用 utility 方法并收集所有引擎的 bool 结果
    let results: Vec<bool> = self.call_utility("is_sleeping", ()).await?;
    // 安全获取第一个结果,空向量时返回 InconsistentUtilityResults
    let first = *results.first().ok_or_else(|| Error::InconsistentUtilityResults {
        method: "is_sleeping".to_string(),
        values: "[]".to_string(),
    })?;
    // 检查所有引擎是否一致
    if results.iter().all(|&v| v == first) {
        Ok(first)
    } else {
        Err(Error::InconsistentUtilityResults {
            method: "is_sleeping".to_string(),
            values: format!("{results:?}"),
        })
    }
}/// Reset the prefix cache and optionally the external connector cache.
///
/// Under data parallel, returns `true` only when every engine confirms the
/// reset (AND aggregation).
pub async fn reset_prefix_cache(
    &self,
    reset_running_requests: bool,
    reset_connector: bool,
) -> Result<bool> {
    let results: Vec<bool> = self
        .call_utility(
            "reset_prefix_cache",
            (reset_running_requests, reset_connector),
        )
        .await?;
    // 空结果返回错误
    if results.is_empty() {
        return Err(Error::InconsistentUtilityResults {
            method: "reset_prefix_cache".to_string(),
            values: "[]".to_string(),
        });
    }
    // AND 聚合:所有引擎必须成功
    Ok(results.into_iter().all(|ok| ok))
}

评论区精华

索引安全:避免 results[0] 直接索引导致 panic 正确性

gemini-code-assist 指出若 results 为空则 panic,建议使用 .first()

结论:作者在后续 commit 中改用 .first().ok_or_else(|| ...) 并返回 InconsistentUtilityResults 错误 · 已解决

测试中 sleep 导致时序非确定性 测试

BugenZhao 质疑 sleep(50ms) 的必要性,认为应使用确定性同步

结论:作者移除 sleep,并确认握手循环已保证引擎顺序确定性 · 已解决

与 Python 端行为差异 question

njhill 询问是否故意与 Python 区分(Python 只取第一个结果)

结论:作者解释这是有意改进,已在前端仓库 issue 中讨论过,得到 maintainer 认可 · 已解决

风险与影响

  1. 新错误类型:调用 is_sleeping 的代码需处理新增的 InconsistentUtilityResults 错误变体,未更新的下游可能因未匹配该变体而 panic 或忽略。Rust 前端需同步更新错误处理。
  2. 空结果保护:虽然启动握手保证 engine_count >= 1,但如果未来修改绕过该检查,first() 保护会返回错误而非 panic,但分布式系统的不变性可能被破坏。
  3. 性能影响:收集所有引擎结果而非仅取第一个,带来额外内存分配和比较,但 DP 引擎数量通常很小(2-4),可忽略。
  4. 向后兼容性:逻辑行为改变,之前能通过的静默不一致现在会报错,可能导致现有部署在升级后出现新错误。建议在 release notes 中说明。

范围:仅限于 Rust 前端客户端(rust/src/engine-core-client/),不影响 Python 核心或其他语言绑定。
用户:使用 Rust 前端及 DP 功能的用户将受益于更准确的状态检测,但需适配新错误变体。
系统:提高了 DP 模式下的结果一致性和可观测性,避免静默状态偏差。
团队:维护者需关注 Rust 前端与 Python 后端的行为差异(此处故意更严格)。

新错误类型需调用者更新错误处理 空结果保护依赖启动不变量 行为不能向后兼容可能引发新错误

关联 Issue

#199 fix: aggregate `is_sleeping` and `reset_prefix_cache` across DP engines

完整报告

参与讨论