import copy
from pathlib import Path

from tqdm import tqdm
from beartype import beartype
from beartype.typing import Tuple, Optional

import torch
from torch import nn
import torch.nn.functional as F

import pytorch_lightning as pl
from pytorch_lightning.utilities import rank_zero_info
import deepspeed
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
from deepspeed.runtime.fp16.onebit.zoadam import ZeroOneAdam
from pytorch_lightning.strategies import DeepSpeedStrategy

from einops import rearrange, repeat, reduce, pack, unpack
from einops.layers.torch import Rearrange, Reduce

from src.rlhf.utils import masked_mean, gumbel_sample
from src.model import RWKV

# helper functions

def exists(val):
    return val is not None

# loss function
# pairwise preference loss: push the reward of the preferred response above
# the reward of the alternative (rejected) response

def loss_function(prefer_reward, alter_reward):
    return -torch.mean(torch.log(torch.sigmoid(prefer_reward - alter_reward)))

# Reward Model - RWKV with a scalar head

@beartype
class RewardModel(pl.LightningModule):
    def __init__(self, args, rwkv: RWKV):
        super().__init__()

        # initialize the reward model from the pretrained RWKV model
        self.rwkv = rwkv
        self.args = args

        # dimension of the output token embeddings
        dim = self.args.n_embd

        # marker embeddings used to distinguish prompt tokens from response tokens
        # in the input; trained as model parameters and initialized to zeros
        self.prompt_embed = nn.Parameter(torch.zeros(dim))
        self.response_embed = nn.Parameter(torch.zeros(dim))
        self.padding_embed = nn.Parameter(torch.zeros(dim), requires_grad=False)

        # scalar head that maps the last token embedding to a reward score
        self.pred_reward = nn.Linear(dim, 1, bias=False)

    def load(self, path):
        path = Path(path)
        assert path.exists()
        self.load_state_dict(torch.load(str(path)))

    def configure_optimizers(self):
        args = self.args
        if args.layerwise_lr > 0:
            lr_1x = set()
            lr_2x = set()
            lr_3x = set()
            for n, p in self.named_parameters():
                if "time_mix" in n:
                    if args.my_pile_stage == 2:
                        lr_2x.add(n)
                    else:
                        lr_1x.add(n)
                elif "time_decay" in n:
                    if args.my_pile_stage == 2:
                        lr_3x.add(n)
                    else:
                        lr_2x.add(n)
                elif "time_first" in n:
                    lr_3x.add(n)
                else:
                    lr_1x.add(n)
            lr_1x = sorted(list(lr_1x))
            lr_2x = sorted(list(lr_2x))
            lr_3x = sorted(list(lr_3x))
            # print('1x', lr_1x)
            # print('2x', lr_2x)
            # print('3x', lr_3x)
            param_dict = {n: p for n, p in self.named_parameters()}
            if args.my_pile_stage == 2:
                optim_groups = [
                    {"params": [param_dict[n] for n in lr_1x], "weight_decay": 0.0, "my_lr_scale": 1.0},
                    {"params": [param_dict[n] for n in lr_2x], "weight_decay": 0.0, "my_lr_scale": 5.0},  # test: 2e-3 / args.lr_init
                    {"params": [param_dict[n] for n in lr_3x], "weight_decay": 0.0, "my_lr_scale": 5.0},  # test: 3e-3 / args.lr_init
                ]
            else:
                optim_groups = [
                    {"params": [param_dict[n] for n in lr_1x], "weight_decay": 0.0, "my_lr_scale": 1.0},
                    {"params": [param_dict[n] for n in lr_2x], "weight_decay": 0.0, "my_lr_scale": 2.0},
                    {"params": [param_dict[n] for n in lr_3x], "weight_decay": 0.0, "my_lr_scale": 3.0},
                ]
        else:
            optim_groups = [
                {"params": [p for n, p in self.named_parameters()], "weight_decay": 0.0},
            ]

        if self.deepspeed_offload:
            return DeepSpeedCPUAdam(optim_groups, lr=self.args.lr_init, betas=self.args.betas, eps=self.args.adam_eps, bias_correction=True, adamw_mode=False, weight_decay=0, amsgrad=False)
        return FusedAdam(optim_groups, lr=self.args.lr_init, betas=self.args.betas, eps=self.args.adam_eps, bias_correction=True, adam_w_mode=False, weight_decay=0, amsgrad=False)
        # return ZeroOneAdam(optim_groups, lr=self.args.lr_init, betas=self.args.betas, eps=self.args.adam_eps, bias_correction=True, weight_decay=0, amsgrad=False, cuda_aware=False)

    @property
    def deepspeed_offload(self) -> bool:
        strategy = self.trainer.strategy
        if isinstance(strategy, DeepSpeedStrategy):
            cfg = strategy.config["zero_optimization"]
            return bool(cfg.get("offload_optimizer") or cfg.get("offload_param"))
        return False

    def forward(
        self,
        x,
        mask=None,
        prompt_mask=None,
        prompt_lengths=None
    ):
        # prompt_mask and prompt_lengths are mutually exclusive: pass at most one
        assert not (exists(prompt_mask) and exists(prompt_lengths))

        # derive prompt mask from prompt lengths
        if exists(prompt_lengths):
            batch, seq_len = x.shape
            arange = torch.arange(seq_len, device=x.device)
            prompt_mask = repeat(arange, 'n -> b n', b=batch) < rearrange(prompt_lengths, 'b -> b 1')

        # the reward model should have an understanding of which section is the
        # prompt and which is the response: prompt_mask is used to index into the
        # stacked [prompt_embed, response_embed, padding_embed], picking the
        # marker embedding that corresponds to each token's segment
        extra_embed = None
        if exists(prompt_mask):
            prompt_response_mask_embed = torch.stack([
                self.prompt_embed,
                self.response_embed,
                self.padding_embed
            ]).to(prompt_mask.device)
            extra_embed = prompt_response_mask_embed[prompt_mask]

        # take the embedding of the last token
        last_token_embeds = self.rwkv(
            x,
            extra_embed=extra_embed,
            rm_train=True
        )[:, -1, :]

        # compute the scalar reward
        reward = self.pred_reward(last_token_embeds)
        reward = reward.squeeze(-1)

        return reward

    def train_forward(self, x_p, x_a, m_p, m_a):
        # the forward pass runs the model twice, so the gradients of one of the
        # passes must be frozen; otherwise the gradient would be computed twice,
        # which fails under DeepSpeed with the error:
        # "Gradient computed twice for this partition."
        with torch.enable_grad():
            prefer_reward = self.forward(x_p, prompt_mask=m_p)
        with torch.no_grad():
            alter_reward = self.forward(x_a, prompt_mask=m_a)

        return prefer_reward, alter_reward

    def training_step(self, batch, batch_idx):
        x_p, x_a, m_p, m_a = batch

        prefer_reward, alter_reward = self.train_forward(
            x_p, x_a, m_p, m_a)

        loss = loss_function(prefer_reward, alter_reward)

        return loss

    def training_step_end(self, batch_parts):
        # gather the per-device losses so rank zero can keep the full picture
        all_losses = self.all_gather(batch_parts)
        if self.trainer.is_global_zero:
            self.trainer.my_loss_all = all_losses
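

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the training pipeline).
# It exercises loss_function on made-up reward tensors to show the pairwise
# preference objective: -mean(log(sigmoid(prefer - alter))) shrinks as the
# preferred reward moves above the alternative reward. In a real run the two
# reward tensors would come from RewardModel.train_forward on tokenized
# prompt/response pairs; the values below are dummy data for the demo.
if __name__ == "__main__":
    prefer_reward = torch.tensor([2.0, 1.5, 0.3])   # rewards for preferred responses (dummy)
    alter_reward = torch.tensor([0.5, 1.0, 0.8])    # rewards for rejected responses (dummy)

    loss = loss_function(prefer_reward, alter_reward)
    print(f"pairwise reward loss: {loss.item():.4f}")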