执行摘要
Rust 前端批量合并自动中止请求,减少 IPC 往返
当大量流同时丢弃(如高并发下客户端断开),原实现每个请求单独发送Abort消息,导致多次IPC往返。本PR通过按引擎批量聚合Abort请求,减少往返次数。
本PR展示了如何用recv_many实现批量消费,并保持原有非活跃过滤和日志行为,是异步消息批量化处理的典范。值得有类似需求的工程师阅读。
无实质讨论。BugenZhao审核后直接批准(LGTM. Thanks!)。
当大量流同时丢弃(如高并发下客户端断开),原实现每个请求单独发送Abort消息,导致多次IPC往返。本PR通过按引擎批量聚合Abort请求,减少往返次数。
本PR展示了如何用recv_many实现批量消费,并保持原有非活跃过滤和日志行为,是异步消息批量化处理的典范。值得有类似需求的工程师阅读。
无实质讨论。BugenZhao审核后直接批准(LGTM. Thanks!)。
rust/src/engine-core-client/src/client/imp.rs的run_abort_loop函数中,将while let Some(...)逐个接收改为使用tokio::sync::mpsc::UnboundedReceiver::recv_many一次性接收最多1024个请求到本地batch向量。batch,对每个请求通过inner.take_auto_abort_target判断是否仍活跃;活跃的请求按engine_id分组到BTreeMap<EngineId, Vec<String>>中。inner.do_abort_requests,传入所有该引擎的request_ids,代替原来的单次调用。use std::slice导入。tests/client.rs中新增dropping_multiple_live_streams_aborts_all_in_a_burst,模拟三个流同时drop,验证engine端只收到一条Abort消息且包含全部三个ID。| 文件 | 模块 | 状态 | 重要度 |
|---|---|---|---|
rust/src/engine-core-client/src/client/imp.rs |
引擎客户端 | modified | 7.33 |
rust/src/engine-core-client/src/tests/client.rs |
测试 | modified | 6.37 |
rust/src/engine-core-client/src/client/imp.rs
core-logic
实现了批量自动中止的核心逻辑
/// Background loop that listens for request IDs to abort and sends abort
/// messages to the engine. This is used to implement the auto-abort behavior
/// when a request stream is dropped without being properly terminated.
pub(crate) async fn run_abort_loop(
inner: Arc<ClientInner>,
mut abort_rx: mpsc::UnboundedReceiver<AbortRequest>,
) {
// Coalesce bursts of auto-aborts into a single Abort message per engine.
// A dropped-stream storm (e.g. many clients disconnecting at once under
// high concurrency) would otherwise issue one engine round-trip per
// request. `recv_many` returns as soon as at least one item is ready, so a
// lone abort is still forwarded promptly.
// 中文注释:批量收集自动中止请求,按引擎合并,减少 IPC 往返
const MAX_DRAIN: usize = 1024;
let mut batch: Vec<AbortRequest> = Vec::new();
while abort_rx.recv_many(&mut batch, MAX_DRAIN).await > 0 {
// 按引擎 ID 分组,每个引擎累积一个 request ID 列表
let mut by_engine: BTreeMap<EngineId, Vec<String>> = BTreeMap::new();
for AbortRequest { request_id, cause } in batch.drain(..) {
let Some(engine_id) = inner.take_auto_abort_target(&request_id) else {
debug!(request_id, "skip auto-abort for inactive request");
continue;
};
match cause {
AbortCause::DroppedStream => {
info!(request_id, "auto-aborting request due to dropped stream")
}
AbortCause::StopStringMatched => {
debug!(
request_id,
"auto-aborting request due to stop string matched"
)
}
}
by_engine.entry(engine_id).or_default().push(request_id);
}
// 每个引擎发送一次批量 Abort 消息
for (engine_id, request_ids) in by_engine {
if let Err(error) = inner.do_abort_requests(&engine_id, &request_ids).await {
warn!(
?engine_id,
?request_ids,
error = %error.as_report(),
"failed to auto-abort request streams"
);
}
}
}
}
rust/src/engine-core-client/src/tests/client.rs
test-coverage
新增集成测试验证批量自动中止
#[tokio::test]
async fn dropping_multiple_live_streams_aborts_all_in_a_burst() {
init_tracing();
let ipc = IpcNamespace::new().unwrap();
let handshake_address = ipc.handshake_endpoint();
let engine_id = b"engine-burst".to_vec();
let request_ids = ["req-1", "req-2", "req-3"];
// 启动 mock engine,先接收三个 Add 请求,然后发送三个输出,
// 最后验证只收到一条 Abort 消息且包含三个 ID
let (shutdown_tx, engine_task) = spawn_mock_engine_task(
handshake_address.clone(),
engine_id.clone(),
|dealer, push| {
Box::pin(async move {
// 接收三个添加请求
for _ in 0..3 {
let add = recv_engine_message(dealer).await;
assert_eq!(add[0].as_ref(), &[0x00]);
}
send_outputs(
push,
EngineCoreOutputs {
outputs: vec![
request_output("req-1", vec![99], None),
request_output("req-2", vec![99], None),
request_output("req-3", vec![99], None),
],
..Default::default()
},
)
.await;
// 期望只收到一条批量 Abort 消息
let abort =
timeout(Duration::from_secs(1), recv_engine_message(dealer)).await.unwrap();
assert_eq!(abort[0].as_ref(), &[0x01]);
let ids: Vec<String> = rmp_serde::from_slice(&abort[1]).unwrap();
assert_eq!(
ids,
vec![
"req-1".to_string(),
"req-2".to_string(),
"req-3".to_string()
]
);
// 确认后续没有额外 Abort 消息
assert!(
timeout(Duration::from_millis(100), recv_engine_message(dealer)).await.is_err()
);
})
},
);
let client = connect_client_with_ipc(
handshake_test_config(
handshake_address,
1,
"test-model",
Duration::from_secs(2),
0,
None,
),
&ipc,
)
.await;
// 先打开所有请求流,让 engine 收到所有 Add
let mut streams = Vec::new();
for id in request_ids {
streams.push(client.call(sample_request_with_id(id)).await.unwrap());
}
// 消费第一个 token
for stream in streams.iter_mut() {
let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
assert_eq!(first.new_token_ids, vec![99]);
}
// 同时丢弃所有流,触发批量自动中止
drop(streams);
let _ = shutdown_tx.send(());
engine_task.await.unwrap();
client.shutdown().await.unwrap();
}
当前评论区没有形成足够清晰的争议点或结论,后续有更多讨论时会体现在这里。
变更集中在自动中止循环内,影响面小。主要风险:recv_many的MAX_DRAIN设为1024,若单引擎待处理请求超过1024,会分多批发送,但功能不变,仅延迟略增。do_abort_requests失败时日志记录所有request_ids,但行为与原单次失败一致。无安全、性能退化风险。
直接影响Rust前端的自动中止功能,减少引擎通信次数。对于大量流同时丢弃的场景(如负载均衡器重连),IPC负载显著降低。对单个流丢弃无影响,延迟略有降低。对系统其他部分无影响。
当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。
参与讨论