Prhub

#5951 [5/n][trainer] feat: flowgrpo trainer

verl-project/verl · 作者 zhtmike · 合并时间 2026-04-17 12:11

分析状态 已生成
文件变更 35提交数 15 · 评论 17
代码增减 +2171 / -147
trainer diffusion rollout experimental algo

执行摘要

新增基于 Ray 的 FlowGRPO 扩散模型训练器,支持图像生成强化学习。

PR作者在body中说明,这是“让flowgrpo训练器可运行的最后一环”,旨在为扩散模型(特别是图像生成任务如Qwen-Image)提供强化学习训练能力。它延续了#5297的工作,并与@AndyZhou952合作完成,目标是支持基于扩散模型的RL训练。

该PR值得精读,特别是ray_diffusion_trainer.py中的训练循环设计和diffusion_algos.py中的优势计算实现。关注点包括:(1) 扩散模型如何适配VERL的DataProto和训练框架;(2) 针对时间步的掩码和指标计算与语言模型处理的差异;(3) 审阅中关于优势计算标准差的未决争议,这反映了算法实现与原始论文的权衡。

讨论亮点
  1. 优势计算逻辑的正确性争议gemini-code-assist[bot]指出compute_flow_grpo_outcome_advantage中存在两个关键问题:(a) 对于单样本组,优势应设为0,但原实现将组均值设为0导致优势等于样本分数,作者zhtmike已修复;(b) 标准偏差计算应在样本级别而非扩展的时间步张量上进行,否则会因重复而低估标准差,导致优势膨胀,作者回应“遵循flowgrpo实现”,未作修改。
  2. 配置设计权衡:审阅者SamitHuang建议在配置中明确extra_configs可接受的参数列表以提高易用性,作者zhtmike承认这是临时设计,将在后续PR中重构配置以避免臃肿,并计划提供更清晰的结构。
  3. 代码细节修正SamitHuang指出数据预处理脚本中local_save_dir未使用os.path.expanduser展开用户目录,可能导致路径错误,作者在后续提交中采纳建议并更新。
  4. FSDP包装器方法转发疑问gemini-code-assist[bot]认为verl/workers/engine/fsdp/diffusers_impl.py中的disable_adapter上下文管理器直接调用self.module.disable_adapters()可能因self.module是FSDP包装器而失败,作者回应“FSDP自动处理包装属性”,认为现有实现可行。

实现拆解

  1. 新增训练器核心模块:在verl/trainer/diffusion/下新增ray_diffusion_trainer.py,定义了RayFlowGRPOTrainer类,负责扩散模型的分布式训练循环、数据加载、日志记录和验证生成。关键函数包括compute_response_mask(为扩散潜变量生成全1掩码)和compute_advantage(调用扩散专用优势估计器)。
  2. 新增训练入口和配置:新增verl/trainer/main_flowgrpo.py作为Hydra入口点,包含run_flowgrpo函数和TaskRunner类,负责Ray集群初始化和资源池管理。同时更新了扩散训练和rollout的配置文件(如diffusion_rollout.yaml),引入了extra_configs字段以容纳算法或模型特定参数。
  3. 实现扩散专用算法和指标:在verl/trainer/diffusion/diffusion_algos.py中新增DiffusionAdvantageEstimator.FLOW_GRPO枚举和compute_flow_grpo_outcome_advantage函数,实现了针对扩散模型(每个去噪时间步均为有效优化步)的GRPO优势计算逻辑。新增diffusion_metric_utils.py,包含compute_data_metrics_diffusion等函数,用于计算扩散训练特有的指标(如基于时间步的奖励、优势统计)。
  4. 提供数据预处理和示例脚本:在examples/flowgrpo_trainer/下新增OCR数据集预处理脚本qwenimage_ocr.py,包含extract_solutionmake_map_fn等函数,将原始文本数据转换为包含系统提示、负提示和奖励模型信息的训练格式。同时提供了端到端运行脚本和LoRA训练示例。
  5. 配套测试和现有模块适配:新增了CPU单元测试test_diffusion_core_algos_on_cpu.py和端到端测试脚本run_flowgrpo_trainer_diffusers.sh。对现有模块进行了小范围适配,包括:修改verl/utils/dataset/rl_dataset.py中的_build_messages以支持负提示和多模态数据;更新verl/workers/engine_workers.py的导入关系以支持新训练器;调整examples/flowgrpo_trainer/vllm_omni/pipeline_qwenimage.py中的_coalesce_not_none逻辑。
文件 模块 状态 重要度
verl/trainer/diffusion/ray_diffusion_trainer.py 扩散训练器 added 9.08
verl/trainer/main_flowgrpo.py 训练入口 added 8.89
verl/trainer/diffusion/diffusion_algos.py 扩散算法 renamed 8.1
examples/flowgrpo_trainer/data_process/qwenimage_ocr.py 示例脚本 added 8.08
tests/trainer/diffusion/test_diffusion_core_algos_on_cpu.py 单元测试 added 6.85
verl/trainer/diffusion/ray_diffusion_trainer.py core-logic

新增的 FlowGRPO 训练器核心实现,负责分布式训练循环、数据加载、优势计算和日志记录。

def compute_response_mask(data: DataProto):
    """计算扩散潜变量的有效步掩码。    对于扩散模型,每个去噪时间步都是一个有效的优化步,
    因此返回的掩码是全1的,覆盖所有时间步。
    """
    all_latents = data.batch["all_latents"]
    b, t = all_latents.shape[:2] # 获取批次大小和时间步数
    return torch.ones((b, t), dtype=torch.int32, device=all_latents.device) # 生成全1掩码
​
​
def compute_advantage(
    data: DataProto,
    adv_estimator: str,
    norm_adv_by_std_in_grpo: bool = True,
    global_std: bool = True,
    config: Optional[DiffusionAlgoConfig] = None,
) -> DataProto:
    """为扩散策略优化计算优势估计。    此函数使用注册的优势估计器(如Flow-GRPO)为扩散模型计算优势估计。
    优势估计用于指导跨去噪时间步的策略优化。
    """
    if "response_mask" not in data.batch.keys():
        data.batch["response_mask"] = compute_response_mask(data) # 确保响应掩码存在
​
    adv_kwargs = {
        "sample_level_rewards": data.batch["sample_level_rewards"],
        "response_mask": data.batch["response_mask"],
        "config": config,
    }
    if "uid" in data.non_tensor_batch:
        adv_kwargs["index"] = data.non_tensor_batch["uid"] # 添加分组索引
    # 调用注册的优势估计器函数进行计算
    adv_fn = get_adv_estimator_fn(adv_estimator)
    advantages, returns = adv_fn(**adv_kwargs)
    data.batch["advantages"] = advantages
    data.batch["returns"] = returns
    return data
verl/trainer/diffusion/diffusion_algos.py core-logic

扩散专用优势估计算法实现,包含 Flow-GRPO 的核心逻辑。

@register_adv_est(DiffusionAdvantageEstimator.FLOW_GRPO)
def compute_flow_grpo_outcome_advantage(
    sample_level_rewards: torch.Tensor,
    response_mask: torch.Tensor,
    index: np.ndarray,
    epsilon: float = 1e-4,
    norm_adv_by_std_in_grpo: bool = True,
    global_std: bool = True,
    config: Optional[DictConfig] = None,
) -> tuple[torch.Tensor, torch.Tensor]:
    """为GRPO计算优势,仅处理结果奖励(每个响应只有一个标量奖励)。    注意:如果norm_adv_by_std_in_grpo为True,则优势按标准差缩放,如原始GRPO;
    如果为False,则不缩放,如Dr.GRPO。
    """
    scores = sample_level_rewards
    if scores.ndim == 1:
        scores = scores.unsqueeze(-1)
    scores = scores.expand_as(response_mask).clone() # 将奖励扩展到所有时间步
​
    id2score = defaultdict(list)
    id2mean = {}
    id2std = {}
​
    with torch.no_grad():
        if global_std:
            batch_std = torch.std(scores) # 计算全局标准差(审阅指出此处可能低估)
        else:
            batch_std = None
​
        bsz = scores.shape[0]
        for i in range(bsz):
            id2score[index[i]].append(scores[i]) # 按索引分组存储分数
        for idx in id2score:
            if len(id2score[idx]) == 1:
                id2mean[idx] = id2score[idx][0].mean() # 单样本组:均值设为自身分数(已修复)
                if global_std:
                    id2std[idx] = batch_std
                else:
                    id2std[idx] = torch.tensor(1.0)
            elif len(id2score[idx]) > 1:
                scores_tensor = torch.stack(id2score[idx])
                id2mean[idx] = torch.mean(scores_tensor)
                if global_std:
                    id2std[idx] = batch_std
                else:
                    id2std[idx] = torch.std(scores_tensor) # 组内计算标准差
            else:
                raise ValueError(f"no score in prompt index: {idx}")
        for i in range(bsz):
            if norm_adv_by_std_in_grpo:
                scores[i] = (scores[i] - id2mean[index[i]]) / (id2std[index[i]] + epsilon)
            else:
                scores[i] = scores[i] - id2mean[index[i]] # 计算优势
​
    return scores, scores # 返回优势和回报(此处相同)

关键符号

compute_response_mask compute_advantage RayFlowGRPOTrainer run_flowgrpo compute_flow_grpo_outcome_advantage compute_data_metrics_diffusion extract_solution _build_messages

评论区精华

GRPO 优势计算中单样本组和标准差计算问题 正确性

gemini-code-assist[bot] 指出两个关键错误:单样本组优势应设为 0 但原实现导致非零;标准差计算应在样本级别而非扩展的时间步张量上,否则会低估标准差。

结论:作者修复了单样本组问题,但拒绝了修正标准差计算的建议,表示遵循原始 FlowGRPO 实现。 · partially_resolved

配置设计易用性和 extra_configs 字段 设计

SamitHuang 建议明确 extra_configs 可接受的参数列表以提升易用性,并指出配置可能泄露内部抽象。

结论:作者承认 extra_configs 是临时设计,将在后续 PR 中重构配置以避免臃肿,并计划提供更清晰的结构。 · deferred

数据预处理脚本路径展开问题 正确性

SamitHuang 指出 qwenimage_ocr.py 中 local_save_dir 未使用 os.path.expanduser,可能导致路径错误。

结论:作者在后续提交中采纳建议,更新了代码以正确展开用户目录路径。 · 已解决

风险与影响

  1. 算法正确性风险diffusion_algos.py中的优势计算未按审阅建议修正标准差计算方式(在扩展的时间步张量上计算),可能导致优势值被高估,影响训练稳定性和策略优化效果。
  2. 兼容性风险:新增的RayFlowGRPOTrainer强制要求使用新引擎路径(trainer.use_legacy_worker_impl=disable),与旧有工作流不兼容,可能影响现有用户的迁移。
  3. 配置复杂性风险:临时引入的extra_configs字段缺乏明确文档和验证,用户可能错误配置或难以理解可用参数,增加使用门槛和调试成本。
  4. 依赖风险:示例脚本依赖特定版本的vllmvllm-omni(0.18),若环境不匹配可能导致运行时错误。
  1. 对系统的影响:为VERL框架新增了扩散模型强化学习训练能力,扩展了其应用场景至图像生成等领域。新训练器与现有PPO训练器并行,不影响原有语言模型训练流程。
  2. 对用户的影响:研究人员和工程师现在可以使用FlowGRPO算法对扩散模型进行RL训练,并通过提供的OCR示例快速上手。但需要适应新的配置结构和依赖要求。
  3. 对团队的影响:引入了新的训练器模块和算法,需要团队成员熟悉扩散模型特有的数据处理、优势计算和指标统计逻辑。配置的临时性设计暗示后续将有重构,需关注演进。
算法正确性争议 配置临时设计 兼容性约束

关联 Issue

未识别关联 Issue

当前没有检测到明确关联的 Issue 链接,后续同步到相关引用后会出现在这里。

完整报告

执行摘要

  • 一句话:新增基于Ray的FlowGRPO扩散模型训练器,支持图像生成强化学习。
  • 推荐动作:该PR值得精读,特别是ray_diffusion_trainer.py中的训练循环设计和diffusion_algos.py中的优势计算实现。关注点包括:(1) 扩散模型如何适配VERL的DataProto和训练框架;(2) 针对时间步的掩码和指标计算与语言模型处理的差异;(3) 审阅中关于优势计算标准差的未决争议,这反映了算法实现与原始论文的权衡。

功能与动机

PR作者在body中说明,这是“让flowgrpo训练器可运行的最后一环”,旨在为扩散模型(特别是图像生成任务如Qwen-Image)提供强化学习训练能力。它延续了#5297的工作,并与@AndyZhou952合作完成,目标是支持基于扩散模型的RL训练。

实现拆解

  1. 新增训练器核心模块:在verl/trainer/diffusion/下新增ray_diffusion_trainer.py,定义了RayFlowGRPOTrainer类,负责扩散模型的分布式训练循环、数据加载、日志记录和验证生成。关键函数包括compute_response_mask(为扩散潜变量生成全1掩码)和compute_advantage(调用扩散专用优势估计器)。
  2. 新增训练入口和配置:新增verl/trainer/main_flowgrpo.py作为Hydra入口点,包含run_flowgrpo函数和TaskRunner类,负责Ray集群初始化和资源池管理。同时更新了扩散训练和rollout的配置文件(如diffusion_rollout.yaml),引入了extra_configs字段以容纳算法或模型特定参数。
  3. 实现扩散专用算法和指标:在verl/trainer/diffusion/diffusion_algos.py中新增DiffusionAdvantageEstimator.FLOW_GRPO枚举和compute_flow_grpo_outcome_advantage函数,实现了针对扩散模型(每个去噪时间步均为有效优化步)的GRPO优势计算逻辑。新增diffusion_metric_utils.py,包含compute_data_metrics_diffusion等函数,用于计算扩散训练特有的指标(如基于时间步的奖励、优势统计)。
  4. 提供数据预处理和示例脚本:在examples/flowgrpo_trainer/下新增OCR数据集预处理脚本qwenimage_ocr.py,包含extract_solutionmake_map_fn等函数,将原始文本数据转换为包含系统提示、负提示和奖励模型信息的训练格式。同时提供了端到端运行脚本和LoRA训练示例。
  5. 配套测试和现有模块适配:新增了CPU单元测试test_diffusion_core_algos_on_cpu.py和端到端测试脚本run_flowgrpo_trainer_diffusers.sh。对现有模块进行了小范围适配,包括:修改verl/utils/dataset/rl_dataset.py中的_build_messages以支持负提示和多模态数据;更新verl/workers/engine_workers.py的导入关系以支持新训练器;调整examples/flowgrpo_trainer/vllm_omni/pipeline_qwenimage.py中的_coalesce_not_none逻辑。

关键文件:

  • verl/trainer/diffusion/ray_diffusion_trainer.py(模块 扩散训练器;类别 source;类型 core-logic;符号 compute_response_mask, compute_advantage, RayFlowGRPOTrainer, init): 新增的FlowGRPO训练器核心实现,负责分布式训练循环、数据加载、优势计算和日志记录。
  • verl/trainer/main_flowgrpo.py(模块 训练入口;类别 source;类型 entrypoint;符号 main, run_flowgrpo, TaskRunner, init): FlowGRPO训练的Hydra入口点,负责Ray初始化和任务运行器管理。
  • verl/trainer/diffusion/diffusion_algos.py(模块 扩散算法;类别 source;类型 core-logic;符号 DiffusionAdvantageEstimator, compute_flow_grpo_outcome_advantage): 扩散专用优势估计算法实现,包含Flow-GRPO的核心逻辑。
  • examples/flowgrpo_trainer/data_process/qwenimage_ocr.py(模块 示例脚本;类别 source;类型 data-contract;符号 extract_solution, make_map_fn, process_fn): OCR数据集预处理示例,展示了如何为扩散RL训练准备多模态数据。
  • tests/trainer/diffusion/test_diffusion_core_algos_on_cpu.py(模块 单元测试;类别 test;类型 test-coverage;符号 test_flow_grpo_advantage_return, test_compute_policy_loss_flow_grpo): 扩散核心算法的单元测试,验证Flow-GRPO优势计算和政策损失的正确性。

关键符号:compute_response_mask, compute_advantage, RayFlowGRPOTrainer, run_flowgrpo, compute_flow_grpo_outcome_advantage, compute_data_metrics_diffusion, extract_solution, _build_messages

关键源码片段

verl/trainer/diffusion/ray_diffusion_trainer.py

新增的FlowGRPO训练器核心实现,负责分布式训练循环、数据加载、优势计算和日志记录。

def compute_response_mask(data: DataProto):
    """计算扩散潜变量的有效步掩码。    对于扩散模型,每个去噪时间步都是一个有效的优化步,
    因此返回的掩码是全1的,覆盖所有时间步。
    """
    all_latents = data.batch["all_latents"]
    b, t = all_latents.shape[:2] # 获取批次大小和时间步数
    return torch.ones((b, t), dtype=torch.int32, device=all_latents.device) # 生成全1掩码
​
​
def compute_advantage(
    data: DataProto,
    adv_estimator: str,
    norm_adv_by_std_in_grpo: bool = True,
    global_std: bool = True,
    config: Optional[DiffusionAlgoConfig] = None,
) -> DataProto:
    """为扩散策略优化计算优势估计。    此函数使用注册的优势估计器(如Flow-GRPO)为扩散模型计算优势估计。
    优势估计用于指导跨去噪时间步的策略优化。
    """
    if "response_mask" not in data.batch.keys():
        data.batch["response_mask"] = compute_response_mask(data) # 确保响应掩码存在
​
    adv_kwargs = {
        "sample_level_rewards": data.batch["sample_level_rewards"],
        "response_mask": data.batch["response_mask"],
        "config": config,
    }
    if "uid" in data.non_tensor_batch:
        adv_kwargs["index"] = data.non_tensor_batch["uid"] # 添加分组索引
    # 调用注册的优势估计器函数进行计算
    adv_fn = get_adv_estimator_fn(adv_estimator)
    advantages, returns = adv_fn(**adv_kwargs)
    data.batch["advantages"] = advantages
    data.batch["returns"] = returns
    return data

verl/trainer/diffusion/diffusion_algos.py

扩散专用优势估计算法实现,包含Flow-GRPO的核心逻辑。

@register_adv_est(DiffusionAdvantageEstimator.FLOW_GRPO)
def compute_flow_grpo_outcome_advantage(
    sample_level_rewards: torch.Tensor,
    response_mask: torch.Tensor,
    index: np.ndarray,
    epsilon: float = 1e-4,
    norm_adv_by_std_in_grpo: bool = True,
    global_std: bool = True,
    config: Optional[DictConfig] = None,
) -> tuple[torch.Tensor, torch.Tensor]:
    """为GRPO计算优势,仅处理结果奖励(每个响应只有一个标量奖励)。    注意:如果norm_adv_by_std_in_grpo为True,则优势按标准差缩放,如原始GRPO;
    如果为False,则不缩放,如Dr.GRPO。
    """
    scores = sample_level_rewards
    if scores.ndim == 1:
        scores = scores.unsqueeze(-1)
    scores = scores.expand_as(response_mask).clone() # 将奖励扩展到所有时间步
​
    id2score = defaultdict(list)
    id2mean = {}
    id2std = {}
​
    with torch.no_grad():
        if global_std:
            batch_std = torch.std(scores) # 计算全局标准差(审阅指出此处可能低估)
        else:
            batch_std = None
​
        bsz = scores.shape[0]
        for i in range(bsz):
            id2score[index[i]].append(scores[i]) # 按索引分组存储分数
        for idx in id2score:
            if len(id2score[idx]) == 1:
                id2mean[idx] = id2score[idx][0].mean() # 单样本组:均值设为自身分数(已修复)
                if global_std:
                    id2std[idx] = batch_std
                else:
                    id2std[idx] = torch.tensor(1.0)
            elif len(id2score[idx]) > 1:
                scores_tensor = torch.stack(id2score[idx])
                id2mean[idx] = torch.mean(scores_tensor)
                if global_std:
                    id2std[idx] = batch_std
                else:
                    id2std[idx] = torch.std(scores_tensor) # 组内计算标准差
            else:
                raise ValueError(f"no score in prompt index: {idx}")
        for i in range(bsz):
            if norm_adv_by_std_in_grpo:
                scores[i] = (scores[i] - id2mean[index[i]]) / (id2std[index[i]] + epsilon)
            else:
                scores[i] = scores[i] - id2mean[index[i]] # 计算优势
​
    return scores, scores # 返回优势和回报(此处相同)

评论区精华

  1. 优势计算逻辑的正确性争议gemini-code-assist[bot]指出compute_flow_grpo_outcome_advantage中存在两个关键问题:(a) 对于单样本组,优势应设为0,但原实现将组均值设为0导致优势等于样本分数,作者zhtmike已修复;(b) 标准偏差计算应在样本级别而非扩展的时间步张量上进行,否则会因重复而低估标准差,导致优势膨胀,作者回应“遵循flowgrpo实现”,未作修改。
  2. 配置设计权衡:审阅者SamitHuang建议在配置中明确extra_configs可接受的参数列表以提高易用性,作者zhtmike承认这是临时设计,将在后续PR中重构配置以避免臃肿,并计划提供更清晰的结构。
  3. 代码细节修正SamitHuang指出数据预处理脚本中local_save_dir未使用os.path.expanduser展开用户目录,可能导致路径错误,作者在后续提交中采纳建议并更新。
  4. FSDP包装器方法转发疑问gemini-code-assist[bot]认为verl/workers/engine/fsdp/diffusers_impl.py中的disable_adapter上下文管理器直接调用self.module.disable_adapters()可能因self.module是FSDP包装器而失败,作者回应“FSDP自动处理包装属性”,认为现有实现可行。
  • GRPO优势计算中单样本组和标准差计算问题 (correctness): 作者修复了单样本组问题,但拒绝了修正标准差计算的建议,表示遵循原始FlowGRPO实现。
  • 配置设计易用性和extra_configs字段 (design): 作者承认extra_configs是临时设计,将在后续PR中重构配置以避免臃肿,并计划提供更清晰的结构。
  • 数据预处理脚本路径展开问题 (correctness): 作者在后续提交中采纳建议,更新了代码以正确展开用户目录路径。

风险与影响

  • 风险:1. 算法正确性风险diffusion_algos.py中的优势计算未按审阅建议修正标准差计算方式(在扩展的时间步张量上计算),可能导致优势值被高估,影响训练稳定性和策略优化效果。
    2. 兼容性风险:新增的RayFlowGRPOTrainer强制要求使用新引擎路径(trainer.use_legacy_worker_impl=disable),与旧有工作流不兼容,可能影响现有用户的迁移。
    3. 配置复杂性风险:临时引入的extra_configs字段缺乏明确文档和验证,用户可能错误配置或难以理解可用参数,增加使用门槛和调试成本。
    4. 依赖风险:示例脚本依赖特定版本的vllmvllm-omni(0.18),若环境不匹配可能导致运行时错误。
  • 影响:1. 对系统的影响:为VERL框架新增了扩散模型强化学习训练能力,扩展了其应用场景至图像生成等领域。新训练器与现有PPO训练器并行,不影响原有语言模型训练流程。
    2. 对用户的影响:研究人员和工程师现在可以使用FlowGRPO算法对扩散模型进行RL训练,并通过提供的OCR示例快速上手。但需要适应新的配置结构和依赖要求。
    3. 对团队的影响:引入了新的训练器模块和算法,需要团队成员熟悉扩散模型特有的数据处理、优势计算和指标统计逻辑。配置的临时性设计暗示后续将有重构,需关注演进。
  • 风险标记:算法正确性争议, 配置临时设计, 兼容性约束

关联脉络

  • PR #5297 [trainer] feat: flowgrpo trainer (part 1): PR body中提及本PR是继#5297之后的“最后一环”,两者共同构成FlowGRPO训练器的完整实现,可能存在功能拆分或依赖关系。
  • PR #5997 [trainer,algo] feat: Support On-Policy Distillation in main_ppo_sync: 同为训练器模块的新增功能,涉及算法集成和资源池管理,可对比学习VERL中训练器的扩展模式。
  • PR #5978 [tool, rollout, cfg] feat: per-sample tool environment routing for ToolAgentLoop: 都涉及rollout配置和实验性功能扩展,展示了VERL在多样化任务(工具使用、图像生成)上的RL适配思路。

参与讨论