执行摘要
本 PR 为 Rust 前端新增一个轻量级 mock engine 二进制工具,用于在无 GPU、无真实模型的情况下进行前端性能基准测试。它模拟了 engine-core 协议,能够隔离前端自身的调度、流处理等开销,并提供与真实模型运行对比的参考基线。同时将内存分配器从 jemalloc 切换为 mimalloc,提升跨平台兼容性。
功能与动机
正如 PR 描述所言:"This PR adds a lightweight Rust mock engine for the external-engine path. It gives us a GPU-free way to benchmark the frontend and external-engine transport with model execution removed, so the remaining cost is request scheduling, engine-core messaging, detokenization/streaming, and HTTP/SSE response delivery." 之前前端性能测试必须依赖完整模型部署,迭代周期长且受 GPU 资源限制。mock engine 提供了一种廉价、快速的替代方案,使得开发者可以迅速定位瓶颈究竟在模型前向还是前端自身。
实现拆解
- 核心引擎逻辑(
rust/src/mock-engine/src/engine.rs):定义 ActiveRequest 结构体管理单请求的模拟解码状态;通过 request_seed 从 CLI 种子、引擎编号和请求 ID 派生出确定性随机种子;utility_response 针对前端发出的各种控制类请求(如 get_supported_tasks, is_sleeping 等)返回伪造但格式正确的响应;empty_finish_outputs 在请求非法时快速终止。
- IO 与协议层(
rust/src/mock-engine/src/io.rs & rust/src/engine-core-client/src/mock_engine.rs):基于 ZMQ Dealer/Push socket 实现全双工通信。decode_request 将收到的消息按帧类型(Add/Abort/Utility/StartDpWave)解码为 EngineInput 枚举;connect_to_frontend 执行完整 handshake 包括发送 HELLO、等待 INIT、回复 READY;run_io_loop 作为永续的 select 循环在 dealer 输入流和 engine 输出通道之间转发数据。
- 入口与生命周期(
rust/src/mock-engine/src/main.rs, lib.rs):main 初始化 tracing、解析 CLI 参数、创建 Tokio 多线程运行时并调用 run。run 为每个引擎索引启动一个 run_engine 任务(包括 IO 协程和引擎协程),通过 CancellationToken 监听 Ctrl-C 信号实现优雅关闭。整体采用 actor 式设计:IO 协程只管收发,引擎协程只管状态推进,两者通过 mpsc 通道解耦。
- 测试与工具重构(
rust/src/mock-engine/src/tests.rs, test_utils.rs):新增 5 个基于 #[tokio::test] 的集成测试,覆盖 TCP 和 IPC 两种传输模式、单/多引擎身份注册、输出 chunk 为 1 时的逐 token 流、abort 请求、以及混合任务排队。同时将 test_utils.rs 中之前分散、冗长的 mock 辅助函数集中精简,削减 141 行代码,统一使用 MockEngineDataSockets 等新类型。
- 协议扩展与分配器切换(
protocol/mod.rs & 根 Cargo.toml):protocol/mod.rs 为 EngineCoreRequest 添加 default_top_p 和 default_repetition_penalty 默认值字段,确保 mock 引擎构造的请求协议完整。根 Cargo.toml 将 Rust 前端的全局分配器从 jemalloc 改为 mimalloc,解决 jemalloc 在不同 page size 系统上的兼容性问题,且基准测试表明吞吐量持平。
utility_response — 模拟 engine utility 请求(engine.rs)
/// 为 utility request 构造最小响应。
fn utility_response(
engine_index: u32,
request: EngineCoreUtilityRequest,
) -> Result<EngineCoreOutputs> {
let result = match request.method_name.as_str() {
"get_supported_tasks" => utility_envelope(vec!["generate"]),
"is_sleeping" => utility_envelope(false),
"reset_prefix_cache" => utility_envelope(true),
"reset_mm_cache"
| "reset_encoder_cache"
| "profile"
| "sleep"
| "wake_up"
| "execute_dummy_batch" => utility_envelope(()),
_ => utility_envelope(Value::Nil),
}?;
Ok(EngineCoreOutputs {
engine_index,
utility_output: Some(UtilityOutput {
call_id: request.call_id,
failure_message: None,
result: Some(result),
}),
timestamp: now_secs(),
..Default::default()
})
}
该函数简洁地展示了 mock engine 如何在不真正执行任何模型逻辑的前提下,返回前端所需的各种控制响应。它通过匹配
method_name 来分发到不同的空操作或假值,确保前端可以流畅推进。
decode_request — ZMQ 消息解码(io.rs)
/// 从 ZMQ 消息解码为 EngineInput。
fn decode_request(message: ZmqMessage) -> Result<EngineInput> {
let frames = message.into_vec();
if frames.is_empty() {
bail!("empty engine request message");
}
if frames.len() != 2 {
bail!("invalid frame count for engine request: {}", frames.len());
}
let request_type_frame = frames[0].as_ref();
let Some(request_type) = EngineCoreRequestType::from_frame(request_type_frame) else {
bail!("unknown engine request type: {:?}", request_type_frame);
};
let input = match request_type {
EngineCoreRequestType::Add => {
let request: Box<EngineCoreRequest> = decode_msgpack(frames[1].as_ref())?;
EngineInput::Request(request)
}
EngineCoreRequestType::Abort => {
let request_ids: Vec<String> = decode_msgpack(frames[1].as_ref())?;
EngineInput::Abort(request_ids)
}
EngineCoreRequestType::Utility => {
let request: EngineCoreUtilityRequest = decode_msgpack(frames[1].as_ref())?;
EngineInput::Utility(request)
}
EngineCoreRequestType::StartDpWave => EngineInput::StartDpWave,
};
Ok(input)
}
该函数展示了 mock engine 如何从 ZMQ 帧中解析出前端发送的请求:第一帧是类型标识,第二帧是 msgpack 编码的负载。它支持四种请求类型,是 IO 层的核心解析入口。
评论区精华
只读审查 gemini-code-assist[bot] 给出了三条建设性反馈:
io.rs 中通过 client_index 直接索引 push_sockets 可能 panic(若索引越界)。
engine.rs 的 step 中使用 BTreeMap 遍历活跃请求可能成为性能瓶颈。
run_engine_loop 中 yield_now() 忙循环应替换为定时等待,避免 CPU 空转。
维护者 njhill 批准评论道:"Thanks @BugenZhao, very nice!" 作者没有公开回应,但 PR 被合并,推测这些点已在后续迭代中处理或判定对当前场景影响有限。
风险与影响
- 直接索引 panic:
io.rs 的 send_engine_outputs_to_client 直接索引 push_sockets,若 client_index 越界会 panic。目前仅在 mock 内部使用,索引由引擎自身控制,但仍是不安全模式。
- 忙循环 CPU 消耗:引擎循环在无活跃请求时使用
yield_now() 忙等,可能拉高 CPU 占用并干扰基准测试的准确性。
- 与实际模型行为偏差:mock engine 忽略采样参数,仅产生随机 token,无法模拟采样抖动、stop token 等真实行为,基准结论需谨慎外推。
- mimalloc 分配器切换:基准数据显示吞吐量持平,但 jemalloc 到 mimalloc 的改变可能引起不同工作负载下的内存碎片或延迟波动。
影响范围集中在 Rust 前端开发团队,作为开发测试工具,不涉及生产部署。
关联脉络
本 PR 是 PR stack 的一部分,依赖于 #43405(parent PR),属于同一系列对 Rust 前端 external-engine 通信架构的改进。此外,近期 PR #43464、#38831 等也涉及 engine 协议和模型加载,但无直接依赖。mock engine 的引入使得后续性能优化工作(如 #43464 的缓冲区重用)可以更快速地验证收益。
参与讨论