Commit 01a745b7 authored by xixiaoyao

add multihead inference doc

Parent 1775f28f
*.pyc
paddlepalm.egg-info
+data
__pycache__
+*egg-info
pretrain_model
pretrain
output*
......
@@ -225,6 +225,10 @@ To save models/checkpoints and logs during training, just call `trainer.set_save
#### Evaluation/Inference

To do prediction/evaluation after a training stage, just create another reader, backbone and head instance with `phase='predict'` (repeat steps 1~4 above), then run prediction with the `predict` method of the trainer (no need to create another trainer). For more implementation details, see [this example](https://github.com/PaddlePaddle/PALM/tree/master/examples/predict).

If you want to do evaluation during the training process, use `trainer.train_one_step()` instead of `trainer.train()`. `trainer.train_one_step(batch)` trains on just one step, so you can insert evaluation code at any point of the training process. The argument `batch` can be fetched from `trainer.get_one_batch`.

PaddlePALM also supports multi-head inference; see `examples/multi-task/joint_predict.py` for reference.
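As an illustration, a minimal sketch of such an interleaved loop (assuming a fully built `trainer`; `evaluate`, `max_steps` and `eval_interval` below are user-defined placeholders, not PALM APIs):

```python
# Minimal sketch: train step by step and evaluate periodically.
# `trainer` is assumed to be fully built (steps 1~4 plus build_forward,
# build_backward and fit_reader done); `evaluate`, `max_steps` and
# `eval_interval` are user-defined placeholders.
for step in range(1, max_steps + 1):
    batch = trainer.get_one_batch()             # fetch a single training batch
    rt_outputs = trainer.train_one_step(batch)  # train on exactly one step
    if step % eval_interval == 0:
        evaluate(trainer)                       # insert any evaluation code here
```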
#### Play with Multiple GPUs

If there are multiple GPUs in your environment, you can control the number and indices of the GPUs used through the environment variable [CUDA_VISIBLE_DEVICES](https://devblogs.nvidia.com/cuda-pro-tip-control-gpu-visibility-cuda_visible_devices/). For example, if there are 4 GPUs in your environment, indexed 0, 1, 2, 3, you can run on GPU2 only with the following commands
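Equivalently, a minimal sketch of applying the same restriction from inside a Python script (note that the variable must be set before paddle is first imported):

```python
import os

# Make only GPU index 2 visible to PaddlePaddle. This must happen
# before paddle/fluid is imported for the first time.
os.environ['CUDA_VISIBLE_DEVICES'] = '2'

import paddlepalm as palm  # imported after the variable is set
```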
......
# coding=utf-8
import paddlepalm as palm
import json
import numpy as np

if __name__ == '__main__':

    # configs
    max_seqlen = 128
    batch_size = 128
    num_epochs = 20
    print_steps = 5
    lr = 2e-5
    num_classes = 130
    weight_decay = 0.01
    num_classes_intent = 26
    dropout_prob = 0.1
    random_seed = 0
    label_map = './data/atis/atis_slot/label_map.json'
    vocab_path = './pretrain/ERNIE-v2-en-base/vocab.txt'
    train_slot = './data/atis/atis_slot/train.tsv'
    train_intent = './data/atis/atis_intent/train.tsv'
    config = json.load(open('./pretrain/ERNIE-v2-en-base/ernie_config.json'))
    input_dim = config['hidden_size']

    # ----------------------- for prediction -----------------------

    # step 1-1: create readers
    slot_reader = palm.reader.SequenceLabelReader(vocab_path, max_seqlen, label_map, seed=random_seed, phase='predict')
    intent_reader = palm.reader.ClassifyReader(vocab_path, max_seqlen, seed=random_seed, phase='predict')

    # step 1-2: load the data to predict on (the train sets are reused here)
    slot_reader.load_data(train_slot, file_format='tsv', num_epochs=None, batch_size=batch_size)
    intent_reader.load_data(train_intent, batch_size=batch_size, num_epochs=None)

    # step 2: create a backbone of the model to extract text features
    ernie = palm.backbone.ERNIE.from_config(config, phase='predict')

    # step 3: register readers with the ernie backbone
    slot_reader.register_with(ernie)
    intent_reader.register_with(ernie)

    # step 4: create task output heads
    slot_head = palm.head.SequenceLabel(num_classes, input_dim, dropout_prob, phase='predict')
    intent_head = palm.head.Classify(num_classes_intent, input_dim, dropout_prob, phase='predict')

    # step 5-1: create task trainers and the MultiHeadTrainer
    trainer_slot = palm.Trainer("slot", mix_ratio=1.0)
    trainer_intent = palm.Trainer("intent", mix_ratio=1.0)
    trainer = palm.MultiHeadTrainer([trainer_slot, trainer_intent])

    # step 5-2: build the forward graph with backbone and task heads
    vars = trainer_intent.build_predict_forward(ernie, intent_head)
    vars = trainer_slot.build_predict_forward(ernie, slot_head)
    trainer.build_predict_forward()

    # load checkpoint
    trainer.load_ckpt('outputs/ckpt.step300')

    # merge inference readers
    joint_iterator = trainer.merge_inference_readers([slot_reader, intent_reader])

    # for test
    # batch = next(joint_iterator('slot'))
    # results = trainer.predict_one_batch('slot', batch)
    # batch = next(joint_iterator('intent'))
    # results = trainer.predict_one_batch('intent', batch)

    # predict slot filling
    print('processing slot filling examples...')
    print('num examples: '+str(slot_reader.num_examples))
    cnt = 0
    for batch in joint_iterator('slot'):
        cnt += len(trainer.predict_one_batch('slot', batch)['logits'])
        if cnt % 1000 <= 128:
            print(str(cnt)+'th example processed.')
    print(str(cnt)+'th example processed.')

    # predict intent recognition
    print('processing intent recognition examples...')
    print('num examples: '+str(intent_reader.num_examples))
    cnt = 0
    for batch in joint_iterator('intent'):
        cnt += len(trainer.predict_one_batch('intent', batch)['logits'])
        if cnt % 1000 <= 128:
            print(str(cnt)+'th example processed.')
    print(str(cnt)+'th example processed.')
@@ -60,7 +60,7 @@ class SequenceLabel(Head):
        if self._is_training:
            return {'loss': [[1], 'float32']}
        else:
-            return {'emission': [[-1, self.num_classes], 'float32']}
+            return {'logits': [[-1, -1, self.num_classes], 'float32']}

    def build(self, inputs, scope_name=''):
        token_emb = inputs['backbone']['encoder_outputs']
@@ -107,7 +107,7 @@ class SequenceLabel(Head):
            return {"loss": avg_cost}
        else:
-            return {"emission": emission}
+            return {"logits": emission}

    def batch_postprocess(self, rt_outputs):
        if not self._is_training:
......
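With the change above, the predict-phase output of `SequenceLabel` is renamed from `emission` to `logits` and keeps the per-token axis, i.e. shape `[batch_size, max_seqlen, num_classes]` instead of a flattened `[-1, num_classes]` matrix. A minimal sketch of decoding it (assuming `rt_outputs` is the dict returned by `predict_one_batch`, as in the example script above):

```python
import numpy as np

# rt_outputs is assumed to come from trainer.predict_one_batch(...);
# after this commit, 'logits' has shape [batch_size, max_seqlen, num_classes].
label_ids = np.argmax(rt_outputs['logits'], axis=-1)  # -> [batch_size, max_seqlen]
```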
@@ -24,8 +24,6 @@ class MultiHeadTrainer(Trainer):
            trainers: a list of Trainer objects.
        """
-        # if reuse_flags is not None:
-        #     assert len(reuse_flags) == len(trainers)
        Trainer.__init__(self, '')
        self._trainers = trainers
@@ -56,7 +54,6 @@ class MultiHeadTrainer(Trainer):
        for t in self._trainers:
            t._set_multitask()

-    # def build_forward(self, backbone, heads):
    def build_forward(self):
        """
        Build the forward computation graph for training, which is usually built from the input layer to the loss node.
@@ -95,19 +92,120 @@ class MultiHeadTrainer(Trainer):
        self._task_id_var = task_id_var
        self._loss_var = loss_var
        self._fetch_list = [loss_var.name]
-        # for b in train_prog.blocks:
-        #     for var in b.vars:
-        #         pass
-        #         if 'task_id' in var:
-        #             print(var)
-        #             exit()
-        #         print(var)
        if not self._multi_task:
            self._init_exe_prog(for_train=True)
        return loss_var
-    def fit_readers(self, reader_dict):
-        raise NotImplementedError()
+    def build_predict_forward(self):
+        head_dict = {}
+        backbone = self._trainers[0]._pred_backbone
+        for i in self._trainers:
+            assert i._pred_head is not None and i._pred_backbone is not None, "You should build_predict_forward for the {} task".format(i._name)
+            assert i._pred_backbone == backbone, "The backbone for each task must be the same"
+            head_dict[i._name] = i._pred_head
+
+        pred_prog = fluid.Program()
+        pred_init_prog = fluid.Program()
+        self._pred_prog = pred_prog
+        self._pred_init_prog = pred_init_prog
+
+        def get_loss(i):
+            head = head_dict[self._trainers[i].name]
+            self._trainers[i]._lock_prog = True
+            pred_vars = self._trainers[i].build_predict_forward(backbone, head)
+            self._trainers[i]._lock_prog = False
+            # return loss_var
+
+        task_fns = {i: lambda i=i: get_loss(i) for i in range(len(self._trainers))}
+
+        with fluid.program_guard(pred_prog, pred_init_prog):
+            task_id_var = fluid.data(name="__task_id", shape=[1], dtype='int64')
+            loss_var = layers.switch_case(
+                branch_index=task_id_var,
+                branch_fns=task_fns
+            )
+        # self._task_id_var = task_id_var
+        # self._loss_var = loss_var
+        # self._fetch_list = [loss_var.name]
+        if not self._multi_task:
+            self._init_exe_prog(for_train=False)
+
+        # return self.build_forward()
+        # """
+        # Build computation graph for evaluation and prediction.
+        #
+        # Arguments:
+        #     - pred_backbone: a Backbone object with phase == 'predict'. For evaluating model during training, the predict backbone should keep the same with train backbone.
+        #     - pred_head: a Head object with phase == 'predict'. For evaluating model during training, the predict head should keep the same with train head.
+        #
+        # Return:
+        #     - output_vars: dict type. Each value is a computational graph variable(node) argumented by pred_head outputs_attr.
+        # """
+        # for i in self._trainers:
+        #     assert i._predict_vars is not None, "{} need to build_predict_forward before "
+        #
+        # return output_vars

+    def merge_inference_readers(self, readers):
+        for r in readers:
+            assert r._phase == 'predict'
+        if isinstance(readers, list):
+            reader_dict = {k.name: v for k, v in zip(self._trainers, readers)}
+        elif isinstance(readers, dict):
+            reader_dict = readers
+        else:
+            raise ValueError()
+
+        num_heads = len(self._trainers)
+        assert len(reader_dict) == num_heads, "received number of readers is not consistent with trainers."
+
+        trainer_dict = {t.name: t for t in self._trainers}
+        task_name2id = {t.name: idx for idx, t in enumerate(self._trainers)}
+        self._task_name2id = task_name2id
+
+        self._finish_steps = {}
+        self._finish = {}
+        input_names = []
+        name_to_pos = []
+        joint_shape_and_dtypes = []
+        iterators = []
+        prefixes = []
+        mrs = []
+        net_inputs = []
+        global_steps = 0
+        for t in self._trainers:
+            assert t.name in reader_dict
+            assert reader_dict[t.name].num_epochs is None, "{}: num_epochs is not None. \
+                To run with multi-head mode, num_epochs of each Trainer should be set as None.".format(t.name)
+            # print(num_epochs, t.mix_ratio, base_steps_pur_epoch)
+            self._finish_steps[t.name] = 9999999999
+            self._finish[t.name] = True
+
+            # t._set_task_id(self._task_id_var)
+            t.fit_reader(reader_dict[t.name], phase='predict')
+            net_inputs.append(t._pred_net_inputs)
+            prefixes.append(t.name)
+            iterators.append(t._raw_iterator_fn())
+            input_names.append(t._pred_input_names)
+            name_to_pos.append(t._pred_name_to_position)
+            joint_shape_and_dtypes.append(t._pred_shape_and_dtypes)
+
+        iterator_fn = reader_helper.create_multihead_inference_fn(iterators, prefixes, joint_shape_and_dtypes, \
+            input_names, name_to_pos, task_name2id, dev_count=dev_count)
+        feed_batch_process_fn = reader_helper.create_feed_batch_process_fn(net_inputs)
+
+        if gpu_dev_count > 1:
+            raise NotImplementedError('currently only single-gpu mode has been supported running with multi-task mode.')
+            # distribute_feeder_fn = data_feeder(iterator_fn, feed_batch_process_fn, phase=phase, is_multi=True, with_arg=True)
+        else:
+            distribute_feeder_fn = iterator_fn
+        self._predict_iterator_fn = distribute_feeder_fn
+        self._pred_feed_batch_process_fn = feed_batch_process_fn
+        return distribute_feeder_fn
    def fit_readers_with_mixratio(self, readers, sampling_reference, num_epochs, phase='train'):
        """
@@ -247,18 +345,6 @@ class MultiHeadTrainer(Trainer):
            if finish:
                break

-        # if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
-        #     print(cur_task.name+': train finished!')
-        #     cur_task.save()
-
-        # if (save_predict or save_ckpt) and self._cur_train_step % save_steps == 0:
-        #     if save_predict:
-        #         self.save(save_path, suffix='pred.step'+str(self._cur_train_step))
-        #     if save_ckpt:
-        #         fluid.io.save_persistables(self._exe, os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)), self._train_prog)
-        #         print('checkpoint has been saved at '+os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)))

    def train_one_step(self, batch):
        if dev_count > 1:
@@ -268,33 +354,29 @@ class MultiHeadTrainer(Trainer):
            assert isinstance(batch, dict)
            task_id = batch['__task_id'][0]

-        # rt_outputs = self._trainers[task_id].train_one_step(batch, self._exe, self._distribute_train_prog, self._fetch_list)
        rt_outputs = self._trainers[task_id].train_one_step(batch)
        self._cur_train_step += 1
        self._check_save()
        return rt_outputs, task_id

-        # if dev_count > 1:
-        #     # feed, mask, task_id = batch
-        #     for f in feed:
-        #         f['branch'] = np.array([task_id], dtype='int64')
-        #     rt_outputs = self.exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._trainers[task_id]._fetch_list)
-        #     num_fakes = decode_fake(len(rt_outputs[0]), mask, self._trainers[task_id]._batch_size)
-        #     for _ in range(num_fakes):
-        #         for item in rt_outputs:
-        #             item.pop()
-        # else:
-        #     feed, task_id = batch
-        #     feed['branch'] = np.array([task_id], dtype='int64')
-        #     rt_outputs = self._exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._trainers[task_id]._fetch_list)

-    def predict_one_batch(self, batch):
-        raise NotImplementedError()
+    def predict_one_batch(self, task_name, batch):
+        if dev_count > 1:
+            raise NotImplementedError()
+        # batch = next(self._predict_iterator_fn(task_name))
+        t = self._trainers[self._task_name2id[task_name]]
+        # t._set_exe(self._exe)
+        t._set_dist_pred(self._trainers[self._task_name2id[task_name]]._pred_prog)
+        rt_outputs = t.predict_one_batch(batch)
+        return rt_outputs

    def predict(self, output_dir=None, print_steps=1000):
        raise NotImplementedError()
+        # iterator = self._predict_iterator
+        # self._distribute_pred_prog = fluid.CompiledProgram(self._pred_prog).with_data_parallel()
    @property
    def overall_train_steps(self):
        return self._overall_train_steps
@@ -60,21 +60,11 @@ class Trainer(object):
        self._check_save = lambda: False

-        # if save_predict_model:
-        #     self._save_predict_model = True
-        #     assert pred_head is not None, "pred_head is required to save predict model."
-        #     self._pred_reader = reader.clone(phase='predict')
-        # else:
-        #     assert pred_head is None, "You should set save_predict_model as True, or the pred_head is invalid."
-        #     self._save_predict_model = False
-        #     self._pred_reader = None
-        # self._save_steps = save_steps

        self._task_reuse_scope = name if reuse_head_with is None else reuse_head_with
        self._feeded_var_names = None
        self._target_vars = None
+        self._predict_vars = None

        self._num_examples = 0
@@ -98,8 +88,7 @@ class Trainer(object):
        self._pred_fetch_name_list = []
        self._pred_fetch_var_list = []

-        # exe is built when random_init_params is called.
-        # self._exe = helper.build_executor(gpu_dev_count>0)
+        # exe is built when random_init_params is called.
        self._exe = None

        self._save_protocol = {
@@ -124,11 +113,9 @@ class Trainer(object):
        """

-        # assert not self._multi_task, "you cannot build_forward in trainer when a train is wrapper by MultiHeadTrainer."
        self._task_head = task_head
        self._backbone = backbone

-        # assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode"
        self._build_forward = True

        # create reader, task
@@ -137,12 +124,6 @@ class Trainer(object):
        pred_task_attrs = []
        task_attr_from_reader = helper.encode_inputs(self._task_head.inputs_attrs['reader'], self.name)
-        # task_attr_from_reader = self._task_head.inputs_attrs['reader']
-
-        # _check_io(backbone.inputs_attr, inst._reader['train'].outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
-        # _check_io(inst.taskblock['train'].inputs_attrs['reader'], inst._reader['train'].outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
-        # _check_io(inst._taskblock['train'].inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')

        # merge reader input attrs from backbone and task_instances
        input_names, shape_and_dtypes, name_to_position = reader_helper.merge_input_attrs(backbone.inputs_attr, task_attr_from_reader, insert_taskid=False)
@@ -177,11 +158,6 @@ class Trainer(object):
        self._net_inputs = net_inputs
        assert sorted(bb_output_vars.keys()) == sorted(backbone.outputs_attr.keys())
-        # self._bb_output_vars.keys
-
-        # fluid.framework.switch_main_program(train_prog)
-        # fluid.framework.switch_startup_program(train_init_prog)

        task_output_vars = {}
        task_inputs = {'backbone': bb_output_vars}
        task_inputs_from_reader = helper.decode_inputs(net_inputs, self.name)
@@ -205,23 +181,12 @@ class Trainer(object):
        task_fetches = {k: v.name for k, v in task_output_vars.items()}
        self._fetches = task_fetches
        self._fetch_names, self._fetch_list = zip(*self._fetches.items())
-        # fetches = task_fetches
-        # fetches['__task_id'] = net_inputs['__task_id'].name
-
-        # compute loss
-        # task_id_var = net_inputs['__task_id']
-        # task_id_vec = layers.one_hot(task_id_var, num_instances)
-        # losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
-        # loss = layers.reduce_sum(task_id_vec * losses)

        if not self._lock_prog:
            with fluid.program_guard(train_prog, train_init_prog):
                loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
        else:
            loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
-        # for _id, block in enumerate(self._train_prog.blocks):
-        #     for var in block.vars:
-        #         print("[debug] : %d, %s" % (_id, var))

        self._loss_var = loss_var

        if not self._multi_task:
@@ -242,19 +207,13 @@ class Trainer(object):
        """
        self._pred_head = pred_head
        self._pred_backbone = pred_backbone
-        # self._pred_reader = self._reader.clone(phase='pred')
        pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name)
-        # pred_task_attr_from_reader = self._pred_head.inputs_attrs['reader']
-
-        # _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
-        # _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
-        # _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
-        # _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')

        pred_input_names, pred_shape_and_dtypes, pred_name_to_position = reader_helper.merge_input_attrs(pred_backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False)
        pred_input_attrs = [[i, j, k] for i, (j, k) in zip(pred_input_names, pred_shape_and_dtypes)]
        self._pred_shape_and_dtypes = pred_shape_and_dtypes
        self._pred_name_to_position = pred_name_to_position
+        self._pred_input_names = pred_input_names

        pred_prog = fluid.Program()
        self._pred_prog = pred_prog
@@ -262,14 +221,12 @@ class Trainer(object):
        self._pred_init_prog = pred_init_prog
        with fluid.program_guard(pred_prog, pred_init_prog):
            pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
-            # pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
            pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
        self._pred_net_inputs = pred_net_inputs

        # prepare predict vars for saving inference model
        with fluid.program_guard(pred_prog, pred_init_prog):
            cur_inputs = helper.decode_inputs(pred_net_inputs, self.name)
-            # self.pred_input = cur_inputs
            self._pred_input_name_list, self._pred_input_varname_list = \
                zip(*[[k, v.name] for k, v in cur_inputs.items()])
@@ -284,10 +241,12 @@ class Trainer(object):
        self._pred_fetch_name_list = []
        self._pred_fetch_var_list = []

-        if not self._multi_task:
-            self._init_exe_prog(for_train=False)
-            self._exe.run(self._pred_init_prog)
+        # if not self._multi_task:
+        self._init_exe_prog(for_train=False)
+        self._exe.run(self._pred_init_prog)
+
+        self._predict_vars = output_vars

        return output_vars
    def build_backward(self, optimizer, weight_decay=None, use_ema=False, ema_decay=None):
@@ -301,7 +260,6 @@
            - ema_decay: optional, default is None. Only works with use_ema == True. Controls the decay rate of the EMA strategy.
        """
-        # assert not self._multi_task, "you cannot build_backward in trainer when a train is wrapper by MultiHeadTrainer."
        # build optimizer
        assert self._loss_var is not None and self._train_init_prog is not None, "train graph not found! You should build_forward first."
        optimizer._set_prog(self._train_prog, self._train_init_prog)
@@ -334,18 +292,10 @@
                    param.name] * weight_decay * optimizer.get_cur_learning_rate()
                fluid.layers.assign(output=param, input=updated_param)

-        # loss.persistable = True
        if use_ema:
            ema = fluid.optimizer.ExponentialMovingAverage(ema_decay)
            ema.update()

-        # for bid, block in enumerate(self._train_prog.blocks):
-        #     print('block id: '+str(bid))
-        #     for var in block.vars:
-        #         print("%d : %s" % (bid, var))
-        # print(self._train_prog)

        self._exe.run(self._train_init_prog)

    def set_as_aux(self):
@@ -361,8 +311,6 @@
            phase: running phase. Currently supported: train, predict.
        """
-        # assert not self._multi_task, "you cannot fit_reader in trainer when a train is wrapper by MultiHeadTrainer."
-        # load data
        self._check_phase(phase)

        if phase == 'train':
@@ -370,10 +318,6 @@
        else:
            assert self._pred_shape_and_dtypes is not None, "You need to build_forward or build_predict_head first to prepare input features."

-        # not sure whether this should round up; to be confirmed
-        # tail = self._num_examples % batch_size > 0
-        # self._steps_pur_epoch = self._num_examples // batch_size + 1 if tail else 0
        batch_size = reader._batch_size
        self._num_epochs = reader.num_epochs
@@ -392,8 +336,6 @@
            reader_helper.check_io(self._task_head.inputs_attrs['backbone'], self._backbone.outputs_attr, in_name='task_head(backbone, train)', out_name='backbone')
        elif phase == 'predict':
            self._predict_reader = reader
-            # tail = self._num_examples % batch_size > 0
-            # self._pred_steps_pur_epoch = reader.num_examples // batch_size + 1 if tail else 0
            self._pred_steps_pur_epoch = reader.num_examples // batch_size
            shape_and_dtypes = self._pred_shape_and_dtypes
            name_to_position = self._pred_name_to_position
@@ -412,7 +354,6 @@
        iterator = reader._iterator()
        prefix = self.name

-        # merge dataset iterators and create net input vars
        iterator = reader._iterator()
        prefix = self.name
@@ -442,19 +383,8 @@
        Args:
            model_path: the path of saved checkpoint/parameters.
        """
-        # load pretrain model (or ckpt)
-        # assert self._exe is not None, "You need to random_init_params before load checkpoints."
-
-        # if phase == 'train' and not self._train_init:
-        #     self._init_exe_prog(for_train=True)
-        #     self._exe.run(self._train_init_prog)
-        # if phase == 'predict' and not self._predict_init:
-        #     self._init_exe_prog(for_train=False)
-        #     self._exe.run(self._pred_init_prog)

        assert self._train_init_prog is not None or self._pred_init_prog is not None, "model graph not built. You should at least build_forward or build_predict_forward to load its checkpoint."
-        # if phase == 'train':
-        #     assert self._train_init_prog is not None, "train graph not found! You should build_forward first before load checkpoint."
        if self._train_init_prog is not None:
            saver.init_pretraining_params(
                self._exe,
@@ -462,9 +392,7 @@
                convert=False,
                main_program=self._train_init_prog,
                strict=True)
-        # elif phase == 'predict':
        elif self._pred_init_prog is not None:
-            # assert self._pred_init_prog is not None, "predict graph not found! You should build_predict_head first before load checkpoint."
            saver.init_pretraining_params(
                self._exe,
                model_path,
@@ -489,7 +417,6 @@
                model_path,
                convert=convert,
                main_program=self._pred_prog)
-        # raise NotImplementedError()

    def load_pretrain(self, model_path, convert=False):
        """
@@ -498,8 +425,6 @@
        Args:
            model_path: the path of saved pretrained parameters.
        """
-        # load pretrain model (or ckpt)
-        # assert self._exe is not None, "You need to random_init_params before load pretrain models."
        assert self._train_init_prog is not None, "training graph not found. You should at least build_forward to load its pretrained parameters."

        saver.init_pretraining_params(
@@ -569,33 +494,9 @@
        iterator = self._train_iterator
        self._distribute_train_prog = fluid.CompiledProgram(self._train_prog).with_data_parallel(loss_name=self._loss_var.name)

-        # if save_path is not None or save_steps is not None:
-        #     assert self._save_predict_model, "If you want to save model, you need set save_predict_model=True when this trainer is built."
-        # if self._save_predict_model:
-        #     if save_path is None and save_steps is None:
-        #         print('Warning: model will not be saved for this run. If you want to save model, set save_path and save_steps.')
-        #     else:
-        #         assert save_path is not None, "argument save_path is required to save models."
-        #         assert save_steps == -1 or save_steps > 0, "argument save_steps should be -1 (only save the last step of this task) or larger than 0"
-        #         if save_path is not None and not os.path.exists(save_path):
-        #             os.makedirs(save_path)
-        # else:
-        #     assert save_path is None, "You should set save_predict_model as True, or the argument save_path is invalid."
-        #     assert save_steps is None, "You should set save_predict_model as True, or the argument save_steps is invalid."

        time_begin = time.time()
        for feed in iterator:
            rt_outputs = self.train_one_step(feed)
-            # if gpu_dev_count > 1:
-            #     feed, mask = feed
-            # rt_outputs = self._exe.run(self._train_prog, feed=feed, fetch_list=self._fetch_list)
-            # print(rt_outputs)
-            # print(len(rt_outputs))
-            # if gpu_dev_count > 1:
-            #     while mask.pop() == False:
-            #         rt_outputs.pop()
-            # rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}

            task_rt_outputs = {k[len(self.name+'.'):]: v for k, v in rt_outputs.items() if k.startswith(self.name+'.')}
            self._task_head.batch_postprocess(task_rt_outputs)
@@ -613,18 +514,9 @@
                        loss, print_steps / time_cost))
                sys.stdout.flush()
                time_begin = time.time()

-            # self._check_save()
-            # if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
-            #     print(cur_task.name+': train finished!')
-            #     cur_task.save()

            if self._num_epochs is None and not self._multi_task and self._cur_train_step == self._steps_pur_epoch:
                break

-        # save_path = os.path.join(main_conf['save_path'], 'ckpt',
-        #                           "step_" + str(global_step))
-        # fluid.io.save_persistables(self.exe, save_path, saver_program)
-        # print("ALL tasks train finished, exiting...")

    def predict(self, output_dir=None, print_steps=1000):
        """
@@ -645,7 +537,6 @@
        cur_predict_step = 0
        for feed in iterator:
            rt_outputs = self.predict_one_batch(feed)
-            # rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')}
            self._pred_head.batch_postprocess(rt_outputs)
            cur_predict_step += 1
@@ -674,6 +565,9 @@
    def _set_multitask(self):
        self._multi_task = True

+    def _set_nomultitask(self):
+        self._multi_task = False
+
    def _set_task_id(self, task_id):
        self._task_id = task_id
@@ -715,14 +609,13 @@
    def _set_dist_train(self, prog):
        self._distribute_train_prog = prog

+    def _set_dist_pred(self, prog):
+        self._distribute_pred_prog = prog
+
    def _set_fetch_list(self, fetch_list):
        self._fetch_list = fetch_list

-    # def train_one_step(self, batch, executor=None, distribute_train_prog=None, fetch_list=None):
    def train_one_step(self, batch):
-        # exe = self._exe if executor is None else executor
-        # distribute_train_prog = self._distribute_train_prog if distribute_train_prog is None else distribute_train_prog
-        # fetch_list = self._fetch_list if fetch_list is None else fetch_list
        exe = self._exe
        distribute_train_prog = self._distribute_train_prog
@@ -748,19 +641,17 @@
    def predict_one_batch(self, batch):
        if gpu_dev_count > 1:
            feed, mask = batch
-            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list)
+            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list, use_prune=True)
            num_fakes = decode_fake(len(rt_outputs[0]), mask, self._predict_batch_size)
            if num_fakes:
                rt_outputs = [i[:-num_fakes] for i in rt_outputs]
        else:
            feed = self._pred_feed_batch_process_fn(batch)
-            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list)
+            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list, use_prune=True)

        rt_outputs = {k: v for k, v in zip(self._pred_fetch_name_list, rt_outputs)}
        return rt_outputs

    @property
    def name(self):
        return self._name
......
@@ -166,6 +166,39 @@ def create_iterator_fn(iterator, iterator_prefix, shape_and_dtypes, outname_to_p
    return iterator_fn
+def create_multihead_inference_fn(iterators, iterator_prefixes, joint_shape_and_dtypes, names, outname_to_pos, task_name2id, dev_count=1):
+    def iterator(task_name):
+        while True:
+            id = task_name2id[task_name]
+            # id = np.random.choice(task_ids, p=weights)
+            task_id_tensor = np.array([id]).astype("int64")
+
+            for i in range(dev_count):
+                outputs = next(iterators[id])  # dict type
+                prefix = iterator_prefixes[id]
+                results = {}
+                results['__task_id'] = task_id_tensor
+                for outname, val in outputs.items():
+                    task_outname = prefix + '.' + outname
+
+                    if outname in names[id]:
+                        idx = outname_to_pos[id][outname]
+                        val = _check_and_adapt_shape_dtype(val, joint_shape_and_dtypes[id][idx], message=outname+': ')
+                        results[outname] = val
+
+                    if task_outname in names[id]:
+                        idx = outname_to_pos[id][task_outname]
+                        val = _check_and_adapt_shape_dtype(val, joint_shape_and_dtypes[id][idx], message=task_outname+': ')
+                        results[task_outname] = val
+
+                yield results
+    return iterator
def create_multihead_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtypes, mrs, names, outname_to_pos, dev_count=1, keep_one_task=True):
    task_ids = range(len(iterators))
    weights = [mr / float(sum(mrs)) for mr in mrs]
......
@@ -5,7 +5,7 @@ name = paddlepalm
author = zhangyiming
author_email = zhangyiming04@baidu.com
-version = 2.0.2
+version = 2.1.0
description = PaddlePALM
long_description = file: README.md
@@ -35,9 +35,6 @@ keywords =
packages = find:

-#install_requires =
-#    paddlepaddle-gpu >= 1.5.2

include_package_data = True
zip_safe = False
......
@@ -25,7 +25,7 @@ with open("README.md", "r") as fh:
    long_description = fh.read()

setuptools.setup(
    name="paddlepalm",
-    version="2.0.2",
+    version="2.1.0",
    author="PaddlePaddle",
    author_email="zhangyiming04@baidu.com",
    description="a flexible, general and easy-to-use NLP large-scale pretraining and multi-task learning framework.",
@@ -68,7 +68,7 @@ setuptools.setup(
        'Programming Language :: Python :: 3.7',
    ],
    install_requires = [
-        'paddlepaddle-gpu>=1.7.0'
+        'paddlepaddle-gpu>=1.8.0'
    ]
)
......