Prhub

#44591 [Rust Frontend] Batch auto-abort requests by engine

Original PR · Author: HueCodes · Merged: 2026-06-05 17:59 · Files changed: 2 · Commits: 2 · Comments: 3 · Lines: +118 / -25

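Executive summary

The Rust frontend batches auto-abort requests to reduce IPC round trips

When many streams are dropped at once (for example, clients disconnecting under high concurrency), the original implementation sent a separate Abort message for each request, costing one IPC round trip per request. This PR aggregates Abort requests per engine so that fewer round trips are needed.

The PR shows how to consume a channel in batches with recv_many while keeping the existing inactive-request filtering and logging behavior. It is a useful reference for engineers who need to batch asynchronous messages in a similar way.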
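The batching idea can be illustrated without tokio. The following is a minimal sketch using only the standard library's blocking std::sync::mpsc channel; the helper name drain_batch is hypothetical and is not part of the PR. It mimics the semantics the PR relies on: wait until at least one item is ready, then take whatever else is already queued, up to a limit.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical std-only analogue of a "receive many" call.
/// Blocks until at least one item is available, then appends up to `max`
/// already-queued items to `buf`. Returns the number of items appended;
/// 0 means `max` was 0 or the channel is closed and empty.
fn drain_batch<T>(rx: &Receiver<T>, buf: &mut Vec<T>, max: usize) -> usize {
    if max == 0 {
        return 0;
    }
    // Wait for the first item so that a lone message is forwarded promptly.
    let first = match rx.recv() {
        Ok(item) => item,
        Err(_) => return 0, // every sender is gone and nothing is queued
    };
    buf.push(first);
    let mut received = 1;
    // Take whatever else is already queued, without blocking again.
    while received < max {
        match rx.try_recv() {
            Ok(item) => {
                buf.push(item);
                received += 1;
            }
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    received
}
```

A caller would loop on drain_batch until it returns 0, processing and clearing buf after each call, which mirrors the shape of the while loop in run_abort_loop.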

Discussion highlights

No substantive discussion. BugenZhao reviewed the change and approved it directly ("LGTM. Thanks!").

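Implementation breakdown

  1. Replace the one-at-a-time receive loop with batch collection: in the run_abort_loop function of rust/src/engine-core-client/src/client/imp.rs, the while let Some(...) loop that received requests one by one now calls tokio::sync::mpsc::UnboundedReceiver::recv_many to pull up to 1024 requests at a time into a local batch vector.
  2. Group by engine and filter out inactive requests: iterate over batch and, for each request, call inner.take_auto_abort_target to check whether it is still active; active requests are grouped by engine_id into a BTreeMap<EngineId, Vec<String>>.
  3. Send once per engine: iterate over the grouped map and call inner.do_abort_requests once per group, passing all request_ids for that engine, instead of one call per request.
  4. Clean up dependencies: remove the use std::slice import, which is no longer needed.
  5. Regression test: add dropping_multiple_live_streams_aborts_all_in_a_burst to tests/client.rs. It drops three streams at once and verifies that the engine receives a single Abort message containing all three IDs.

File                                               Module          Status     Importance
rust/src/engine-core-client/src/client/imp.rs      Engine client   modified   7.33
rust/src/engine-core-client/src/tests/client.rs    Tests           modified   6.37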
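Steps 2 and 3 of the breakdown can be sketched in isolation as follows. This is an illustrative, std-only sketch, not the PR's code: ActiveRequests, take_target, group_by_engine, and the EngineId alias are all hypothetical stand-ins for the client's real bookkeeping, whose definitions are not shown in this summary.

```rust
use std::collections::BTreeMap;

/// Assumption: engine IDs are modeled as raw bytes for this sketch.
type EngineId = Vec<u8>;

/// Hypothetical stand-in for the client's bookkeeping: maps each
/// still-active request ID to the engine that owns it.
struct ActiveRequests {
    targets: BTreeMap<String, EngineId>,
}

impl ActiveRequests {
    /// Removes and returns the owning engine for `request_id`,
    /// or `None` if the request is no longer active.
    fn take_target(&mut self, request_id: &str) -> Option<EngineId> {
        self.targets.remove(request_id)
    }
}

/// Groups the still-active request IDs in `batch` by engine,
/// silently skipping requests that are no longer active.
fn group_by_engine(
    active: &mut ActiveRequests,
    batch: Vec<String>,
) -> BTreeMap<EngineId, Vec<String>> {
    let mut by_engine: BTreeMap<EngineId, Vec<String>> = BTreeMap::new();
    for request_id in batch {
        let Some(engine_id) = active.take_target(&request_id) else {
            continue; // inactive request: nothing to abort
        };
        by_engine.entry(engine_id).or_default().push(request_id);
    }
    by_engine
}
```

Each entry of the returned map then corresponds to one abort message. Within an engine, request IDs keep the order in which they appeared in the batch, and the BTreeMap makes the iteration order over engines deterministic.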

Key symbols

run_abort_loop, dropping_multiple_live_streams_aborts_all_in_a_burst

Key source snippets

rust/src/engine-core-client/src/client/imp.rs core-logic

Implements the core logic for batched auto-abort

/// Background loop that listens for request IDs to abort and sends abort
/// messages to the engine. This is used to implement the auto-abort behavior
/// when a request stream is dropped without being properly terminated.
pub(crate) async fn run_abort_loop(
    inner: Arc<ClientInner>,
    mut abort_rx: mpsc::UnboundedReceiver<AbortRequest>,
) {
    // Coalesce bursts of auto-aborts into a single Abort message per engine.
    // A dropped-stream storm (e.g. many clients disconnecting at once under
    // high concurrency) would otherwise issue one engine round-trip per
    // request. `recv_many` returns as soon as at least one item is ready, so a
    // lone abort is still forwarded promptly.
    const MAX_DRAIN: usize = 1024;
    let mut batch: Vec<AbortRequest> = Vec::new();

    while abort_rx.recv_many(&mut batch, MAX_DRAIN).await > 0 {
        // Group by engine ID, accumulating one list of request IDs per engine.
        let mut by_engine: BTreeMap<EngineId, Vec<String>> = BTreeMap::new();

        for AbortRequest { request_id, cause } in batch.drain(..) {
            let Some(engine_id) = inner.take_auto_abort_target(&request_id) else {
                debug!(request_id, "skip auto-abort for inactive request");
                continue;
            };

            match cause {
                AbortCause::DroppedStream => {
                    info!(request_id, "auto-aborting request due to dropped stream")
                }
                AbortCause::StopStringMatched => {
                    debug!(
                        request_id,
                        "auto-aborting request due to stop string matched"
                    )
                }
            }

            by_engine.entry(engine_id).or_default().push(request_id);
        }

        // Send one batched Abort message per engine.
        for (engine_id, request_ids) in by_engine {
            if let Err(error) = inner.do_abort_requests(&engine_id, &request_ids).await {
                warn!(
                    ?engine_id,
                    ?request_ids,
                    error = %error.as_report(),
                    "failed to auto-abort request streams"
                );
            }
        }
    }
}

rust/src/engine-core-client/src/tests/client.rs test-coverage

Adds an integration test that verifies batched auto-abort

#[tokio::test]
async fn dropping_multiple_live_streams_aborts_all_in_a_burst() {
    init_tracing();
    let ipc = IpcNamespace::new().unwrap();
    let handshake_address = ipc.handshake_endpoint();
    let engine_id = b"engine-burst".to_vec();
    let request_ids = ["req-1", "req-2", "req-3"];

    // Start the mock engine: receive three Add requests, send three outputs,
    // then verify that exactly one Abort message arrives with all three IDs.
    let (shutdown_tx, engine_task) = spawn_mock_engine_task(
        handshake_address.clone(),
        engine_id.clone(),
        |dealer, push| {
            Box::pin(async move {
                // Receive the three Add requests.
                for _ in 0..3 {
                    let add = recv_engine_message(dealer).await;
                    assert_eq!(add[0].as_ref(), &[0x00]);
                }
                send_outputs(
                    push,
                    EngineCoreOutputs {
                        outputs: vec![
                            request_output("req-1", vec![99], None),
                            request_output("req-2", vec![99], None),
                            request_output("req-3", vec![99], None),
                        ],
                        ..Default::default()
                    },
                )
                .await;

                // Expect exactly one batched Abort message.
                let abort =
                    timeout(Duration::from_secs(1), recv_engine_message(dealer)).await.unwrap();
                assert_eq!(abort[0].as_ref(), &[0x01]);
                let ids: Vec<String> = rmp_serde::from_slice(&abort[1]).unwrap();
                assert_eq!(
                    ids,
                    vec![
                        "req-1".to_string(),
                        "req-2".to_string(),
                        "req-3".to_string()
                    ]
                );
                // Confirm that no further Abort message follows.
                assert!(
                    timeout(Duration::from_millis(100), recv_engine_message(dealer)).await.is_err()
                );
            })
        },
    );

    let client = connect_client_with_ipc(
        handshake_test_config(
            handshake_address,
            1,
            "test-model",
            Duration::from_secs(2),
            0,
            None,
        ),
        &ipc,
    )
    .await;

    // Open all request streams first so the engine receives every Add.
    let mut streams = Vec::new();
    for id in request_ids {
        streams.push(client.call(sample_request_with_id(id)).await.unwrap());
    }
    // Consume the first token from each stream.
    for stream in streams.iter_mut() {
        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
        assert_eq!(first.new_token_ids, vec![99]);
    }
    // Drop all streams at once to trigger the batched auto-abort.
    drop(streams);

    let _ = shutdown_tx.send(());
    engine_task.await.unwrap();
    client.shutdown().await.unwrap();
}

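Comment highlights

No high-value discussion threads were extracted

The comments have not yet produced a clear point of contention or conclusion; further discussion will be reflected here.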

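Risks and impact

The change is confined to the auto-abort loop, so its blast radius is small. The main caveat: recv_many is called with MAX_DRAIN set to 1024, so if more than 1024 abort requests are queued at once (the cap applies to the channel as a whole, not to each engine), they are sent in several batches. Behavior is unchanged and only latency rises slightly. When do_abort_requests fails, the warning logs every request ID in the batch, but failure handling otherwise matches the original single-request path. No security or performance-regression risk was identified.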
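To make the MAX_DRAIN caveat concrete, here is a small arithmetic sketch. The function drain_sizes is hypothetical and assumes, for simplicity, that every pending abort is already queued before the loop starts draining; in practice requests arrive over time, so real batch sizes will vary.

```rust
/// Hypothetical helper: the batch sizes that successive drains would yield
/// if `queued` aborts were all pending up front and each drain takes at
/// most `max_drain` of them.
fn drain_sizes(mut queued: usize, max_drain: usize) -> Vec<usize> {
    assert!(max_drain > 0, "max_drain must be positive");
    let mut sizes = Vec::new();
    while queued > 0 {
        let taken = queued.min(max_drain);
        sizes.push(taken);
        queued -= taken;
    }
    sizes
}
```

Under that assumption, drain_sizes(2500, 1024) yields batches of 1024, 1024, and 452, so 2500 pending aborts take three loop iterations rather than one.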

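The change directly affects the Rust frontend's auto-abort path by reducing the number of messages sent to the engine. When many streams are dropped at once (for example, when a load balancer reconnects), IPC load drops noticeably. A single dropped stream is handled as before: recv_many returns as soon as one item is ready, so a lone abort is still forwarded promptly. No other part of the system is affected.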
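As a rough way to reason about the reduction in IPC traffic, the sketch below compares message counts under the two strategies. It is an illustrative model only: both function names are hypothetical, engines are represented by plain u32 values, and each inner Vec lists the owning engine of every active request in one drained batch.

```rust
use std::collections::BTreeSet;

/// Messages sent when every active request gets its own Abort message.
fn messages_per_request(batches: &[Vec<u32>]) -> usize {
    batches.iter().map(|batch| batch.len()).sum()
}

/// Messages sent when each drained batch produces one Abort message
/// per distinct engine that appears in it.
fn messages_per_engine(batches: &[Vec<u32>]) -> usize {
    batches
        .iter()
        .map(|batch| batch.iter().collect::<BTreeSet<_>>().len())
        .sum()
}
```

The saving depends on how many aborts land in the same batch: a batch holding one request still produces one message either way, while a burst of requests for one engine collapses into a single message.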

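IPC optimization · Low-risk change

Related issues

No related issue identified

No explicitly linked issue has been detected so far; one will appear here once the relevant references are synced.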
