执行摘要
- 一句话:新增基于Ray的FlowGRPO扩散模型训练器,支持图像生成强化学习。
- 推荐动作:该PR值得精读,特别是
ray_diffusion_trainer.py中的训练循环设计和diffusion_algos.py中的优势计算实现。关注点包括:(1) 扩散模型如何适配VERL的DataProto和训练框架;(2) 针对时间步的掩码和指标计算与语言模型处理的差异;(3) 审阅中关于优势计算标准差的未决争议,这反映了算法实现与原始论文的权衡。
功能与动机
PR作者在body中说明,这是“让flowgrpo训练器可运行的最后一环”,旨在为扩散模型(特别是图像生成任务如Qwen-Image)提供强化学习训练能力。它延续了#5297的工作,并与@AndyZhou952合作完成,目标是支持基于扩散模型的RL训练。
实现拆解
- 新增训练器核心模块:在
verl/trainer/diffusion/下新增ray_diffusion_trainer.py,定义了RayFlowGRPOTrainer类,负责扩散模型的分布式训练循环、数据加载、日志记录和验证生成。关键函数包括compute_response_mask(为扩散潜变量生成全1掩码)和compute_advantage(调用扩散专用优势估计器)。
- 新增训练入口和配置:新增
verl/trainer/main_flowgrpo.py作为Hydra入口点,包含run_flowgrpo函数和TaskRunner类,负责Ray集群初始化和资源池管理。同时更新了扩散训练和rollout的配置文件(如diffusion_rollout.yaml),引入了extra_configs字段以容纳算法或模型特定参数。
- 实现扩散专用算法和指标:在
verl/trainer/diffusion/diffusion_algos.py中新增DiffusionAdvantageEstimator.FLOW_GRPO枚举和compute_flow_grpo_outcome_advantage函数,实现了针对扩散模型(每个去噪时间步均为有效优化步)的GRPO优势计算逻辑。新增diffusion_metric_utils.py,包含compute_data_metrics_diffusion等函数,用于计算扩散训练特有的指标(如基于时间步的奖励、优势统计)。
- 提供数据预处理和示例脚本:在
examples/flowgrpo_trainer/下新增OCR数据集预处理脚本qwenimage_ocr.py,包含extract_solution、make_map_fn等函数,将原始文本数据转换为包含系统提示、负提示和奖励模型信息的训练格式。同时提供了端到端运行脚本和LoRA训练示例。
- 配套测试和现有模块适配:新增了CPU单元测试
test_diffusion_core_algos_on_cpu.py和端到端测试脚本run_flowgrpo_trainer_diffusers.sh。对现有模块进行了小范围适配,包括:修改verl/utils/dataset/rl_dataset.py中的_build_messages以支持负提示和多模态数据;更新verl/workers/engine_workers.py的导入关系以支持新训练器;调整examples/flowgrpo_trainer/vllm_omni/pipeline_qwenimage.py中的_coalesce_not_none逻辑。
关键文件:
verl/trainer/diffusion/ray_diffusion_trainer.py(模块 扩散训练器;类别 source;类型 core-logic;符号 compute_response_mask, compute_advantage, RayFlowGRPOTrainer, init): 新增的FlowGRPO训练器核心实现,负责分布式训练循环、数据加载、优势计算和日志记录。
verl/trainer/main_flowgrpo.py(模块 训练入口;类别 source;类型 entrypoint;符号 main, run_flowgrpo, TaskRunner, init): FlowGRPO训练的Hydra入口点,负责Ray初始化和任务运行器管理。
verl/trainer/diffusion/diffusion_algos.py(模块 扩散算法;类别 source;类型 core-logic;符号 DiffusionAdvantageEstimator, compute_flow_grpo_outcome_advantage): 扩散专用优势估计算法实现,包含Flow-GRPO的核心逻辑。
examples/flowgrpo_trainer/data_process/qwenimage_ocr.py(模块 示例脚本;类别 source;类型 data-contract;符号 extract_solution, make_map_fn, process_fn): OCR数据集预处理示例,展示了如何为扩散RL训练准备多模态数据。
tests/trainer/diffusion/test_diffusion_core_algos_on_cpu.py(模块 单元测试;类别 test;类型 test-coverage;符号 test_flow_grpo_advantage_return, test_compute_policy_loss_flow_grpo): 扩散核心算法的单元测试,验证Flow-GRPO优势计算和政策损失的正确性。
关键符号:compute_response_mask, compute_advantage, RayFlowGRPOTrainer, run_flowgrpo, compute_flow_grpo_outcome_advantage, compute_data_metrics_diffusion, extract_solution, _build_messages
关键源码片段
verl/trainer/diffusion/ray_diffusion_trainer.py
新增的FlowGRPO训练器核心实现,负责分布式训练循环、数据加载、优势计算和日志记录。
def compute_response_mask(data: DataProto):
"""计算扩散潜变量的有效步掩码。
对于扩散模型,每个去噪时间步都是一个有效的优化步,
因此返回的掩码是全1的,覆盖所有时间步。
"""
all_latents = data.batch["all_latents"]
b, t = all_latents.shape[:2] # 获取批次大小和时间步数
return torch.ones((b, t), dtype=torch.int32, device=all_latents.device) # 生成全1掩码
def compute_advantage(
data: DataProto,
adv_estimator: str,
norm_adv_by_std_in_grpo: bool = True,
global_std: bool = True,
config: Optional[DiffusionAlgoConfig] = None,
) -> DataProto:
"""为扩散策略优化计算优势估计。
此函数使用注册的优势估计器(如Flow-GRPO)为扩散模型计算优势估计。
优势估计用于指导跨去噪时间步的策略优化。
"""
if "response_mask" not in data.batch.keys():
data.batch["response_mask"] = compute_response_mask(data) # 确保响应掩码存在
adv_kwargs = {
"sample_level_rewards": data.batch["sample_level_rewards"],
"response_mask": data.batch["response_mask"],
"config": config,
}
if "uid" in data.non_tensor_batch:
adv_kwargs["index"] = data.non_tensor_batch["uid"] # 添加分组索引
# 调用注册的优势估计器函数进行计算
adv_fn = get_adv_estimator_fn(adv_estimator)
advantages, returns = adv_fn(**adv_kwargs)
data.batch["advantages"] = advantages
data.batch["returns"] = returns
return data
verl/trainer/diffusion/diffusion_algos.py
扩散专用优势估计算法实现,包含Flow-GRPO的核心逻辑。
@register_adv_est(DiffusionAdvantageEstimator.FLOW_GRPO)
def compute_flow_grpo_outcome_advantage(
sample_level_rewards: torch.Tensor,
response_mask: torch.Tensor,
index: np.ndarray,
epsilon: float = 1e-4,
norm_adv_by_std_in_grpo: bool = True,
global_std: bool = True,
config: Optional[DictConfig] = None,
) -> tuple[torch.Tensor, torch.Tensor]:
"""为GRPO计算优势,仅处理结果奖励(每个响应只有一个标量奖励)。
注意:如果norm_adv_by_std_in_grpo为True,则优势按标准差缩放,如原始GRPO;
如果为False,则不缩放,如Dr.GRPO。
"""
scores = sample_level_rewards
if scores.ndim == 1:
scores = scores.unsqueeze(-1)
scores = scores.expand_as(response_mask).clone() # 将奖励扩展到所有时间步
id2score = defaultdict(list)
id2mean = {}
id2std = {}
with torch.no_grad():
if global_std:
batch_std = torch.std(scores) # 计算全局标准差(审阅指出此处可能低估)
else:
batch_std = None
bsz = scores.shape[0]
for i in range(bsz):
id2score[index[i]].append(scores[i]) # 按索引分组存储分数
for idx in id2score:
if len(id2score[idx]) == 1:
id2mean[idx] = id2score[idx][0].mean() # 单样本组:均值设为自身分数(已修复)
if global_std:
id2std[idx] = batch_std
else:
id2std[idx] = torch.tensor(1.0)
elif len(id2score[idx]) > 1:
scores_tensor = torch.stack(id2score[idx])
id2mean[idx] = torch.mean(scores_tensor)
if global_std:
id2std[idx] = batch_std
else:
id2std[idx] = torch.std(scores_tensor) # 组内计算标准差
else:
raise ValueError(f"no score in prompt index: {idx}")
for i in range(bsz):
if norm_adv_by_std_in_grpo:
scores[i] = (scores[i] - id2mean[index[i]]) / (id2std[index[i]] + epsilon)
else:
scores[i] = scores[i] - id2mean[index[i]] # 计算优势
return scores, scores # 返回优势和回报(此处相同)
评论区精华
- 优势计算逻辑的正确性争议:
gemini-code-assist[bot]指出compute_flow_grpo_outcome_advantage中存在两个关键问题:(a) 对于单样本组,优势应设为0,但原实现将组均值设为0导致优势等于样本分数,作者zhtmike已修复;(b) 标准偏差计算应在样本级别而非扩展的时间步张量上进行,否则会因重复而低估标准差,导致优势膨胀,作者回应“遵循flowgrpo实现”,未作修改。
- 配置设计权衡:审阅者
SamitHuang建议在配置中明确extra_configs可接受的参数列表以提高易用性,作者zhtmike承认这是临时设计,将在后续PR中重构配置以避免臃肿,并计划提供更清晰的结构。
- 代码细节修正:
SamitHuang指出数据预处理脚本中local_save_dir未使用os.path.expanduser展开用户目录,可能导致路径错误,作者在后续提交中采纳建议并更新。
- FSDP包装器方法转发疑问:
gemini-code-assist[bot]认为verl/workers/engine/fsdp/diffusers_impl.py中的disable_adapter上下文管理器直接调用self.module.disable_adapters()可能因self.module是FSDP包装器而失败,作者回应“FSDP自动处理包装属性”,认为现有实现可行。
- GRPO优势计算中单样本组和标准差计算问题 (correctness): 作者修复了单样本组问题,但拒绝了修正标准差计算的建议,表示遵循原始FlowGRPO实现。
- 配置设计易用性和extra_configs字段 (design): 作者承认extra_configs是临时设计,将在后续PR中重构配置以避免臃肿,并计划提供更清晰的结构。
- 数据预处理脚本路径展开问题 (correctness): 作者在后续提交中采纳建议,更新了代码以正确展开用户目录路径。
风险与影响
- 风险:1. 算法正确性风险:
diffusion_algos.py中的优势计算未按审阅建议修正标准差计算方式(在扩展的时间步张量上计算),可能导致优势值被高估,影响训练稳定性和策略优化效果。
2. 兼容性风险:新增的RayFlowGRPOTrainer强制要求使用新引擎路径(trainer.use_legacy_worker_impl=disable),与旧有工作流不兼容,可能影响现有用户的迁移。
3. 配置复杂性风险:临时引入的extra_configs字段缺乏明确文档和验证,用户可能错误配置或难以理解可用参数,增加使用门槛和调试成本。
4. 依赖风险:示例脚本依赖特定版本的vllm和vllm-omni(0.18),若环境不匹配可能导致运行时错误。
- 影响:1. 对系统的影响:为VERL框架新增了扩散模型强化学习训练能力,扩展了其应用场景至图像生成等领域。新训练器与现有PPO训练器并行,不影响原有语言模型训练流程。
2. 对用户的影响:研究人员和工程师现在可以使用FlowGRPO算法对扩散模型进行RL训练,并通过提供的OCR示例快速上手。但需要适应新的配置结构和依赖要求。
3. 对团队的影响:引入了新的训练器模块和算法,需要团队成员熟悉扩散模型特有的数据处理、优势计算和指标统计逻辑。配置的临时性设计暗示后续将有重构,需关注演进。
- 风险标记:算法正确性争议, 配置临时设计, 兼容性约束
关联脉络
- PR #5297 [trainer] feat: flowgrpo trainer (part 1): PR body中提及本PR是继#5297之后的“最后一环”,两者共同构成FlowGRPO训练器的完整实现,可能存在功能拆分或依赖关系。
- PR #5997 [trainer,algo] feat: Support On-Policy Distillation in
main_ppo_sync: 同为训练器模块的新增功能,涉及算法集成和资源池管理,可对比学习VERL中训练器的扩展模式。
- PR #5978 [tool, rollout, cfg] feat: per-sample tool environment routing for ToolAgentLoop: 都涉及rollout配置和实验性功能扩展,展示了VERL在多样化任务(工具使用、图像生成)上的RL适配思路。
参与讨论