Prhub

#7426 [Engine] Allow parallel dp starting

PaddlePaddle/FastDeploy · 作者 RichardWooSJTU · 合并时间 2026-04-16 18:43

分析状态 已生成
文件变更 1提交数 1 · 评论 3
代码增减 +7 / -1
Engine Optimization infra

执行摘要

将数据并行进程启动从串行改为并行,优化启动性能。

根据AI Code Review的补充说明,原始代码中DP进程启动是串行的,每个进程启动后需要等待launched_expert_service_signal置位后才能启动下一个。这种方式在大规模DP部署时会导致较长的启动时间。PR作者通过并行启动来优化这一过程。

此PR值得快速浏览,了解如何通过并行化优化启动流程。重点关注launch_components方法的修改,并思考是否需要在团队代码规范中补充超时机制。

讨论亮点

AI Code Review指出等待循环缺少超时机制,如果DP进程启动失败或崩溃,代码会无限等待。建议添加超时检查和进程状态检测,并参考了代码中其他类似的等待逻辑(如第172-176行和第757-762行)。但此建议未被采纳,PR最终按原方案合并。

实现拆解

  1. 修改启动循环逻辑:在fastdeploy/engine/engine.pylaunch_components方法中,将DP进程的启动从串行改为并行。具体做法是先启动所有进程,然后统一等待所有进程的初始化信号。
  2. 缩短轮询间隔:将等待循环中的sleep时间从1秒减少到0.1秒,以加快对进程启动状态的响应。
  3. 核心变更位置:修改集中在launch_components方法内,涉及for循环和while等待循环的调整。
  4. 测试与配置配套:根据上下文,此PR没有添加单元测试或修改配置文件,仅为核心逻辑优化。
文件 模块 状态 重要度
fastdeploy/engine/engine.py 引擎启动 modified 4.57
fastdeploy/engine/engine.py core-logic

这是 PR 的唯一变更文件,修改了引擎启动的核心逻辑,直接影响数据并行进程的启动方式。

def launch_components(self):
    # ... 其他启动逻辑 ...
​
    # 启动第一个DP进程
    self.dp_processed[-1].start()
​
    # 并行启动剩余的DP进程
    for i in range(
        1,
        self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
    ):
        # 等待每个进程的初始化信号
        while self.launched_expert_service_signal.value[i] == 0:
            time.sleep(0.1) # 将轮询间隔从1秒缩短到0.1秒,加快响应
​
    # ... 后续检查逻辑 ...

关键符号

launch_components

评论区精华

等待循环缺少超时机制 设计

AI Code Review 建议添加超时检查和进程状态检测,以避免无限等待风险。

结论:建议未被采纳,PR 按原方案合并。 · 已解决

风险与影响

  1. 无限等待风险:修改后的等待循环仍缺少超时机制,如果某个DP进程启动失败,可能导致引擎卡死。
  2. 进程状态监控缺失:未添加对进程退出状态的检查,如果进程意外崩溃,无法及时报错。
  3. 并发启动压力:并行启动可能增加系统资源瞬时压力,在资源受限环境下可能引发问题。
  4. 轮询间隔缩短的副作用:将sleep时间从1秒减到0.1秒可能增加CPU使用率,但影响较小。
  1. 性能提升:对于大规模数据并行部署,启动时间将显著缩短,提升部署效率。
  2. 用户体验:终端用户感知到的服务启动延迟降低,尤其在大规模集群中。
  3. 系统影响:仅影响引擎启动阶段,不影响运行时推理性能。
  4. 团队影响:代码变更简单,易于理解和维护,但需注意潜在的死锁风险。
无限等待风险 缺少超时机制

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

执行摘要

  • 一句话:将数据并行进程启动从串行改为并行,优化启动性能。
  • 推荐动作:此PR值得快速浏览,了解如何通过并行化优化启动流程。重点关注launch_components方法的修改,并思考是否需要在团队代码规范中补充超时机制。

功能与动机

根据AI Code Review的补充说明,原始代码中DP进程启动是串行的,每个进程启动后需要等待launched_expert_service_signal置位后才能启动下一个。这种方式在大规模DP部署时会导致较长的启动时间。PR作者通过并行启动来优化这一过程。

实现拆解

  1. 修改启动循环逻辑:在fastdeploy/engine/engine.pylaunch_components方法中,将DP进程的启动从串行改为并行。具体做法是先启动所有进程,然后统一等待所有进程的初始化信号。
  2. 缩短轮询间隔:将等待循环中的sleep时间从1秒减少到0.1秒,以加快对进程启动状态的响应。
  3. 核心变更位置:修改集中在launch_components方法内,涉及for循环和while等待循环的调整。
  4. 测试与配置配套:根据上下文,此PR没有添加单元测试或修改配置文件,仅为核心逻辑优化。

关键文件:

  • fastdeploy/engine/engine.py(模块 引擎启动;类别 source;类型 core-logic;符号 launch_components): 这是PR的唯一变更文件,修改了引擎启动的核心逻辑,直接影响数据并行进程的启动方式。

关键符号:launch_components

关键源码片段

fastdeploy/engine/engine.py

这是PR的唯一变更文件,修改了引擎启动的核心逻辑,直接影响数据并行进程的启动方式。

def launch_components(self):
    # ... 其他启动逻辑 ...
​
    # 启动第一个DP进程
    self.dp_processed[-1].start()
​
    # 并行启动剩余的DP进程
    for i in range(
        1,
        self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
    ):
        # 等待每个进程的初始化信号
        while self.launched_expert_service_signal.value[i] == 0:
            time.sleep(0.1) # 将轮询间隔从1秒缩短到0.1秒,加快响应
​
    # ... 后续检查逻辑 ...

评论区精华

AI Code Review指出等待循环缺少超时机制,如果DP进程启动失败或崩溃,代码会无限等待。建议添加超时检查和进程状态检测,并参考了代码中其他类似的等待逻辑(如第172-176行和第757-762行)。但此建议未被采纳,PR最终按原方案合并。

  • 等待循环缺少超时机制 (design): 建议未被采纳,PR按原方案合并。

风险与影响

  • 风险:1. 无限等待风险:修改后的等待循环仍缺少超时机制,如果某个DP进程启动失败,可能导致引擎卡死。
    2. 进程状态监控缺失:未添加对进程退出状态的检查,如果进程意外崩溃,无法及时报错。
    3. 并发启动压力:并行启动可能增加系统资源瞬时压力,在资源受限环境下可能引发问题。
    4. 轮询间隔缩短的副作用:将sleep时间从1秒减到0.1秒可能增加CPU使用率,但影响较小。
  • 影响:1. 性能提升:对于大规模数据并行部署,启动时间将显著缩短,提升部署效率。
    2. 用户体验:终端用户感知到的服务启动延迟降低,尤其在大规模集群中。
    3. 系统影响:仅影响引擎启动阶段,不影响运行时推理性能。
    4. 团队影响:代码变更简单,易于理解和维护,但需注意潜在的死锁风险。
  • 风险标记:无限等待风险, 缺少超时机制

关联脉络

  • PR #7412 [PD Disaggregation] Enable PD deployment without Router: 同样涉及引擎部署配置和启动逻辑的修改,属于同一模块的近期变更。
  • PR #7407 [BugFix][Scheduler]Fix FD_DISABLE_CHUNKED_PREFILL max_num_batched_tokens limit: 涉及引擎配置和参数调整,与本PR的引擎启动优化有间接关联。

参与讨论