Prhub

#7430 [Bugfix][RL] fix control request timeout in async update weights pipe…

PaddlePaddle/FastDeploy · 作者 jackyYang6 · 合并时间 2026-04-17 16:45

分析状态 已生成
文件变更 1提交数 1 · 评论 2
代码增减 +4 / -4
bugfix RL APIServer

执行摘要

修复异步 RL 权重更新流程中控制请求的竞态超时问题。

根据PR body描述,在异步RL权重更新流程中,/v1/resume 偶尔会超时(600秒),同时引擎侧日志显示“req_id 'control-...' not in req_dict, caching response”。这表明控制路径存在竞态:引擎可能在响应通道注册完成前就快速处理完控制请求,导致响应先进入缓存路径,API服务器在 response_queue.get() 上等待直到超时。

该PR值得精读,因为它展示了一个典型的竞态条件修复案例:通过调整异步操作顺序来消除时序问题。关注点在于 run_control_method 中响应通道注册与请求发送的顺序调整,这是分布式系统中控制流设计的常见模式。

讨论亮点

Review中PaddlePaddle-bot指出修复逻辑正确,通过调整执行顺序有效减少了竞态条件窗口,方案清晰且针对性强。同时建议在 run_control_method 中添加 cleanup_request 调用以避免 request_map 潜在的内存积累,但这并非本次修复的阻塞问题。liyonghua0910直接批准,无进一步讨论。

实现拆解

  1. 定位问题文件:问题出现在 fastdeploy/entrypoints/engine_client.pyrun_control_method 方法中,该方法负责发送控制请求并等待响应。
  2. 调整执行顺序:将响应通道注册(通过 self.connection_manager.get_connection(request_id) 获取 dealer 和 response_queue)的代码移到发送控制请求(self.zmq_client.send_jsonsend_pyobj)之前。这样确保在发送请求前,响应通道已准备就绪,减少竞态窗口。
  3. 保持其他逻辑不变:仅调整代码顺序,不修改请求发送、响应等待或超时处理逻辑,不影响正常推理请求流。
  4. 无测试或配置配套改动:PR未添加单元测试,因为这是时序敏感的竞态修复,通过异步RL权重更新流程验证;也未修改配置文件或部署脚本。
文件 模块 状态 重要度
fastdeploy/entrypoints/engine_client.py 入口点 modified 4.91
fastdeploy/entrypoints/engine_client.py core-logic

这是唯一修改的文件,包含控制请求的核心逻辑,修复了竞态超时问题。

async def run_control_method(self, request: ControlRequest):
    api_server_logger.info(f"Received control request: {request}")
    # 先获取请求ID并注册响应通道,确保在发送请求前通道已准备
    request_id = request.request_id
    dealer, response_queue = await self.connection_manager.get_connection(request_id)
    # 如果未启用批量数据发送,则先写入空消息和请求ID到dealer
    if not envs.ZMQ_SEND_BATCH_DATA:
        dealer.write([b"", request_id.encode("utf-8")])
​
    # 将请求转换为字典并发送
    req_dict = request.to_dict()
    if envs.ZMQ_SEND_BATCH_DATA:
        req_dict["zmq_worker_pid"] = self.worker_pid
    if not self.enable_mm:
        self.zmq_client.send_json(req_dict)
    else:
        self.zmq_client.send_pyobj(req_dict)
​
    try:
        # 等待响应,默认超时600秒适用于大多数控制场景
        response = await asyncio.wait_for(response_queue.get(), timeout=600)
        # 后续处理响应...
    except asyncio.TimeoutError:
        # 超时处理逻辑...

关键符号

run_control_method

评论区精华

竞态条件修复与内存优化建议 设计

PaddlePaddle-bot 指出修复逻辑正确,通过调整执行顺序减少了竞态窗口,同时建议添加 cleanup_request 调用以避免 request_map 内存积累。

结论:修复被批准,内存优化建议作为非阻塞问题记录。 · 已解决

风险与影响

  1. 回归风险低:仅调整代码顺序,未改变控制请求的核心逻辑(如序列化、网络通信、超时处理),且不影响正常推理请求流。
  2. 时序风险缓解:修复后缩小了竞态窗口,但极端情况下(如网络延迟极高)仍可能发生竞态,不过概率大幅降低。
  3. 内存风险未解决:review中提到的 request_map 内存积累问题未在本PR处理,但这不是直接风险。
  4. 测试覆盖不足:未添加单元测试,依赖集成流程验证,可能掩盖边缘情况。
  1. 用户影响:修复后,异步RL权重更新流程中的 /v1/resume 请求超时问题将减少,提升控制路径的可靠性,对终端用户透明。
  2. 系统影响:仅影响控制请求路径(如 pause、update_weights、resume),不影响推理性能或模型输出。
  3. 团队影响:代码变更极小,易于理解和维护,但需注意review中提到的内存优化建议。
时序竞态风险 缺少单元测试

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

执行摘要

  • 一句话:修复异步RL权重更新流程中控制请求的竞态超时问题。
  • 推荐动作:该PR值得精读,因为它展示了一个典型的竞态条件修复案例:通过调整异步操作顺序来消除时序问题。关注点在于 run_control_method 中响应通道注册与请求发送的顺序调整,这是分布式系统中控制流设计的常见模式。

功能与动机

根据PR body描述,在异步RL权重更新流程中,/v1/resume 偶尔会超时(600秒),同时引擎侧日志显示“req_id 'control-...' not in req_dict, caching response”。这表明控制路径存在竞态:引擎可能在响应通道注册完成前就快速处理完控制请求,导致响应先进入缓存路径,API服务器在 response_queue.get() 上等待直到超时。

实现拆解

  1. 定位问题文件:问题出现在 fastdeploy/entrypoints/engine_client.pyrun_control_method 方法中,该方法负责发送控制请求并等待响应。
  2. 调整执行顺序:将响应通道注册(通过 self.connection_manager.get_connection(request_id) 获取 dealer 和 response_queue)的代码移到发送控制请求(self.zmq_client.send_jsonsend_pyobj)之前。这样确保在发送请求前,响应通道已准备就绪,减少竞态窗口。
  3. 保持其他逻辑不变:仅调整代码顺序,不修改请求发送、响应等待或超时处理逻辑,不影响正常推理请求流。
  4. 无测试或配置配套改动:PR未添加单元测试,因为这是时序敏感的竞态修复,通过异步RL权重更新流程验证;也未修改配置文件或部署脚本。

关键文件:

  • fastdeploy/entrypoints/engine_client.py(模块 入口点;类别 source;类型 core-logic;符号 run_control_method): 这是唯一修改的文件,包含控制请求的核心逻辑,修复了竞态超时问题。

关键符号:run_control_method

关键源码片段

fastdeploy/entrypoints/engine_client.py

这是唯一修改的文件,包含控制请求的核心逻辑,修复了竞态超时问题。

async def run_control_method(self, request: ControlRequest):
    api_server_logger.info(f"Received control request: {request}")
    # 先获取请求ID并注册响应通道,确保在发送请求前通道已准备
    request_id = request.request_id
    dealer, response_queue = await self.connection_manager.get_connection(request_id)
    # 如果未启用批量数据发送,则先写入空消息和请求ID到dealer
    if not envs.ZMQ_SEND_BATCH_DATA:
        dealer.write([b"", request_id.encode("utf-8")])
​
    # 将请求转换为字典并发送
    req_dict = request.to_dict()
    if envs.ZMQ_SEND_BATCH_DATA:
        req_dict["zmq_worker_pid"] = self.worker_pid
    if not self.enable_mm:
        self.zmq_client.send_json(req_dict)
    else:
        self.zmq_client.send_pyobj(req_dict)
​
    try:
        # 等待响应,默认超时600秒适用于大多数控制场景
        response = await asyncio.wait_for(response_queue.get(), timeout=600)
        # 后续处理响应...
    except asyncio.TimeoutError:
        # 超时处理逻辑...

评论区精华

Review中PaddlePaddle-bot指出修复逻辑正确,通过调整执行顺序有效减少了竞态条件窗口,方案清晰且针对性强。同时建议在 run_control_method 中添加 cleanup_request 调用以避免 request_map 潜在的内存积累,但这并非本次修复的阻塞问题。liyonghua0910直接批准,无进一步讨论。

  • 竞态条件修复与内存优化建议 (design): 修复被批准,内存优化建议作为非阻塞问题记录。

风险与影响

  • 风险:1. 回归风险低:仅调整代码顺序,未改变控制请求的核心逻辑(如序列化、网络通信、超时处理),且不影响正常推理请求流。
    2. 时序风险缓解:修复后缩小了竞态窗口,但极端情况下(如网络延迟极高)仍可能发生竞态,不过概率大幅降低。
    3. 内存风险未解决:review中提到的 request_map 内存积累问题未在本PR处理,但这不是直接风险。
    4. 测试覆盖不足:未添加单元测试,依赖集成流程验证,可能掩盖边缘情况。
  • 影响:1. 用户影响:修复后,异步RL权重更新流程中的 /v1/resume 请求超时问题将减少,提升控制路径的可靠性,对终端用户透明。
    2. 系统影响:仅影响控制请求路径(如 pause、update_weights、resume),不影响推理性能或模型输出。
    3. 团队影响:代码变更极小,易于理解和维护,但需注意review中提到的内存优化建议。
  • 风险标记:时序竞态风险, 缺少单元测试

关联脉络

  • PR #7190 [Feature] implement log channel separation and request log level system: 同样涉及 entrypoints 模块(如 engine_client.py),关注API服务器和控制路径的改进。
  • PR #7412 [PD Disaggregation] Enable PD deployment without Router: 涉及引擎和控制路径的配置与测试,与本PR的控制请求超时问题相关。

参与讨论