From 01a745b7c020072e835185c13315d7bebd46e818 Mon Sep 17 00:00:00 2001
From: xixiaoyao
Date: Mon, 11 May 2020 21:21:25 +0800
Subject: [PATCH] add multihead inference doc

---
 .gitignore                                    |   2 +
 README.md                                     |   4 +
 ...{evaluate-intent.py => evaluate_intent.py} |   0
 .../{evaluate-slot.py => evaluate_slot.py}    |   0
 examples/multi-task/joint_predict.py          |  90 ++++++++++
 .../{predict-intent.py => predict_intent.py}  |   0
 .../{predict-slot.py => predict_slot.py}      |   0
 paddlepalm/head/ner.py                        |   4 +-
 paddlepalm/multihead_trainer.py               | 164 +++++++++++++-----
 paddlepalm/trainer.py                         | 141 ++------------
 paddlepalm/utils/reader_helper.py             |  33 ++++
 setup.cfg                                     |   5 +-
 setup.py                                      |   4 +-
 13 files changed, 273 insertions(+), 174 deletions(-)
 rename examples/multi-task/{evaluate-intent.py => evaluate_intent.py} (100%)
 rename examples/multi-task/{evaluate-slot.py => evaluate_slot.py} (100%)
 create mode 100644 examples/multi-task/joint_predict.py
 rename examples/multi-task/{predict-intent.py => predict_intent.py} (100%)
 rename examples/multi-task/{predict-slot.py => predict_slot.py} (100%)

diff --git a/.gitignore b/.gitignore
index 04d23e2..3bf2d47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 *.pyc
 paddlepalm.egg-info
+data
 __pycache__
+*egg-info
 pretrain_model
 pretrain
 output*
diff --git a/README.md b/README.md
index 40f5320..e9cb69d 100644
--- a/README.md
+++ b/README.md
@@ -225,6 +225,10 @@ To save models/checkpoints and logs during training, just call `trainer.set_save
 #### Evaluation/Inference
 To do prediction/evaluation after a training stage, just create three new reader, backbone and head instances with `phase='predict'` (repeat steps 1~4 above). Then run prediction with the `predict` method of the trainer (no need to create another trainer). For more implementation details, see [this](https://github.com/PaddlePaddle/PALM/tree/master/examples/predict).
 
+If you want to do evaluation during the training process, use `trainer.train_one_step()` instead of `trainer.train()`. `trainer.train_one_step(batch)` trains on just one batch, so you can insert evaluation code at any point of the training process. The argument `batch` can be fetched from `trainer.get_one_batch`.
+
+PaddlePALM also supports multi-head inference; see `examples/multi-task/joint_predict.py` for reference.
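+
+For example, a rough sketch of evaluation during training (here `num_steps`, `eval_every` and `evaluate` are placeholders for your own step budget and evaluation routine):
+
+```python
+for step in range(num_steps):
+    batch = trainer.get_one_batch()
+    trainer.train_one_step(batch)
+    if step % eval_every == 0:
+        evaluate(trainer)  # any evaluation logic can run here
+```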
+
 #### Play with Multiple GPUs
 If there are multiple GPUs in your environment, you can control the number and index of these GPUs through the environment variable [CUDA_VISIBLE_DEVICES](https://devblogs.nvidia.com/cuda-pro-tip-control-gpu-visibility-cuda_visible_devices/).
 For example, if there are 4 GPUs in your environment, indexed with 0,1,2,3, you can run on GPU2 only with the following commands
diff --git a/examples/multi-task/evaluate-intent.py b/examples/multi-task/evaluate_intent.py
similarity index 100%
rename from examples/multi-task/evaluate-intent.py
rename to examples/multi-task/evaluate_intent.py
diff --git a/examples/multi-task/evaluate-slot.py b/examples/multi-task/evaluate_slot.py
similarity index 100%
rename from examples/multi-task/evaluate-slot.py
rename to examples/multi-task/evaluate_slot.py
diff --git a/examples/multi-task/joint_predict.py b/examples/multi-task/joint_predict.py
new file mode 100644
index 0000000..8cef53f
--- /dev/null
+++ b/examples/multi-task/joint_predict.py
@@ -0,0 +1,90 @@
+# coding=utf-8
+import paddlepalm as palm
+import json
+import numpy as np
+
+
+if __name__ == '__main__':
+
+    # configs
+    max_seqlen = 128
+    batch_size = 128
+    num_epochs = 20
+    print_steps = 5
+    lr = 2e-5
+    num_classes = 130
+    weight_decay = 0.01
+    num_classes_intent = 26
+    dropout_prob = 0.1
+    random_seed = 0
+    label_map = './data/atis/atis_slot/label_map.json'
+    vocab_path = './pretrain/ERNIE-v2-en-base/vocab.txt'
+
+    train_slot = './data/atis/atis_slot/train.tsv'
+    train_intent = './data/atis/atis_intent/train.tsv'
+
+    config = json.load(open('./pretrain/ERNIE-v2-en-base/ernie_config.json'))
+    input_dim = config['hidden_size']
+
+    # ----------------------- for prediction -----------------------
+
+    # step 1-1: create readers
+    slot_reader = palm.reader.SequenceLabelReader(vocab_path, max_seqlen, label_map, seed=random_seed, phase='predict')
+    intent_reader = palm.reader.ClassifyReader(vocab_path, max_seqlen, seed=random_seed, phase='predict')
+
+    # step 1-2: load the data to predict on (here the train files are reused)
+    slot_reader.load_data(train_slot, file_format='tsv', num_epochs=None, batch_size=batch_size)
+    intent_reader.load_data(train_intent, batch_size=batch_size, num_epochs=None)
+
+    # step 2: create a backbone of the model to extract text features
+    ernie = palm.backbone.ERNIE.from_config(config, phase='predict')
+
+    # step 3: register readers with the ernie backbone
+    slot_reader.register_with(ernie)
+    intent_reader.register_with(ernie)
+
+    # step 4: create task output heads
+    slot_head = palm.head.SequenceLabel(num_classes, input_dim, dropout_prob, phase='predict')
+    intent_head = palm.head.Classify(num_classes_intent, input_dim, dropout_prob, phase='predict')
+
+    # step 5-1: create the task trainers and the MultiHeadTrainer
+    trainer_slot = palm.Trainer("slot", mix_ratio=1.0)
+    trainer_intent = palm.Trainer("intent", mix_ratio=1.0)
+    trainer = palm.MultiHeadTrainer([trainer_slot, trainer_intent])
+
+    # step 5-2: build the predict forward graphs with backbone and task heads
+    trainer_intent.build_predict_forward(ernie, intent_head)
+    trainer_slot.build_predict_forward(ernie, slot_head)
+    trainer.build_predict_forward()
+
+    # load checkpoint
+    trainer.load_ckpt('outputs/ckpt.step300')
+
+    # merge inference readers
+    joint_iterator = trainer.merge_inference_readers([slot_reader, intent_reader])
+
+    # for test
+    # batch = next(joint_iterator('slot'))
+    # results = trainer.predict_one_batch('slot', batch)
+    # batch = next(joint_iterator('intent'))
+    # results = trainer.predict_one_batch('intent', batch)
+
+    # predict slot filling
+    print('processing slot filling examples...')
+    print('num examples: '+str(slot_reader.num_examples))
+    cnt = 0
+    for batch in joint_iterator('slot'):
+        cnt += len(trainer.predict_one_batch('slot', batch)['logits'])
+        if cnt % 1000 <= 128:
+            print(str(cnt)+'th example processed.')
+    print(str(cnt)+'th example processed.')
+
+    # predict intent recognition
+    print('processing intent recognition examples...')
+    print('num examples: '+str(intent_reader.num_examples))
+    cnt = 0
+    for batch in joint_iterator('intent'):
+        cnt += len(trainer.predict_one_batch('intent', batch)['logits'])
+        if cnt % 1000 <= 128:
+            print(str(cnt)+'th example processed.')
+    print(str(cnt)+'th example processed.')
+
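+    # Illustrative sketch (hypothetical post-processing, not required above):
+    # the returned 'logits' are raw head outputs, so e.g. intent ids can be
+    # recovered with an argmax over the class axis:
+    #   batch = next(joint_iterator('intent'))
+    #   logits = trainer.predict_one_batch('intent', batch)['logits']
+    #   intent_ids = np.argmax(logits, axis=-1)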
diff --git a/examples/multi-task/predict-intent.py b/examples/multi-task/predict_intent.py
similarity index 100%
rename from examples/multi-task/predict-intent.py
rename to examples/multi-task/predict_intent.py
diff --git a/examples/multi-task/predict-slot.py b/examples/multi-task/predict_slot.py
similarity index 100%
rename from examples/multi-task/predict-slot.py
rename to examples/multi-task/predict_slot.py
diff --git a/paddlepalm/head/ner.py b/paddlepalm/head/ner.py
index 9b6c67f..bf4242c 100644
--- a/paddlepalm/head/ner.py
+++ b/paddlepalm/head/ner.py
@@ -60,7 +60,7 @@ class SequenceLabel(Head):
         if self._is_training:
             return {'loss': [[1], 'float32']}
         else:
-            return {'emission': [[-1, self.num_classes], 'float32']}
+            return {'logits': [[-1, -1, self.num_classes], 'float32']}
 
     def build(self, inputs, scope_name=''):
         token_emb = inputs['backbone']['encoder_outputs']
@@ -107,7 +107,7 @@
             return {"loss": avg_cost}
 
         else:
-            return {"emission": emission}
+            return {"logits": emission}
 
     def batch_postprocess(self, rt_outputs):
         if not self._is_training:
diff --git a/paddlepalm/multihead_trainer.py b/paddlepalm/multihead_trainer.py
index 7f6ceb7..f923d5c 100644
--- a/paddlepalm/multihead_trainer.py
+++ b/paddlepalm/multihead_trainer.py
@@ -24,8 +24,6 @@ class MultiHeadTrainer(Trainer):
             trainers: a list of Trainer objects.
         """
-        # if reuse_flags is not None:
-        #     assert len(reuse_flags) == len(trainers)
         Trainer.__init__(self, '')
         self._trainers = trainers
@@ -56,7 +54,6 @@
         for t in self._trainers:
             t._set_multitask()
 
-    # def build_forward(self, backbone, heads):
     def build_forward(self):
         """
         Build forward computation graph for training, which is usually built from input layer to loss node.
@@ -95,19 +92,120 @@ class MultiHeadTrainer(Trainer):
         self._task_id_var = task_id_var
         self._loss_var = loss_var
         self._fetch_list = [loss_var.name]
-        # for b in train_prog.blocks:
-        #     for var in b.vars:
-        #         pass
-        #         if 'task_id' in var:
-        #             print(var)
-        #             exit()
-        #         print(var)
         if not self._multi_task:
             self._init_exe_prog(for_train=True)
 
         return loss_var
+
+    def build_predict_forward(self):
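+        """
+        Build the joint forward graph for prediction over all task heads.
+        Every trainer must share one predict backbone; at runtime each batch
+        is routed to its task head through a `switch_case` over the fed
+        `__task_id` tensor.
+        """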
+        head_dict = {}
+        backbone = self._trainers[0]._pred_backbone
+        for i in self._trainers:
+            assert i._pred_head is not None and i._pred_backbone is not None, "You should build_predict_forward for task {} first.".format(i._name)
+            assert i._pred_backbone == backbone, "The predict backbone must be the same for all tasks."
+            head_dict[i._name] = i._pred_head
+
+        pred_prog = fluid.Program()
+        pred_init_prog = fluid.Program()
+        self._pred_prog = pred_prog
+        self._pred_init_prog = pred_init_prog
-    def fit_readers(self, reader_dict):
-        raise NotImplementedError()
+        def build_head(i):
+            head = head_dict[self._trainers[i].name]
+            self._trainers[i]._lock_prog = True
+            pred_vars = self._trainers[i].build_predict_forward(backbone, head)
+            self._trainers[i]._lock_prog = False
+
+        task_fns = {i: lambda i=i: build_head(i) for i in range(len(self._trainers))}
+
+        with fluid.program_guard(pred_prog, pred_init_prog):
+            task_id_var = fluid.data(name="__task_id", shape=[1], dtype='int64')
+
+            layers.switch_case(
+                branch_index=task_id_var,
+                branch_fns=task_fns
+            )
+        if not self._multi_task:
+            self._init_exe_prog(for_train=False)
+
+    def merge_inference_readers(self, readers):
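+        """
+        Merge the predict readers of all tasks into one joint iterator factory.
+        `readers` is either a list (ordered like the trainers) or a dict that
+        maps task name to reader; every reader must be created with
+        phase='predict' and loaded with num_epochs=None. Returns a callable
+        that takes a task name and yields feedable batches for that task.
+        """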
+        for r in readers:
+            assert r._phase == 'predict'
+
+        if isinstance(readers, list):
+            reader_dict = {k.name: v for k, v in zip(self._trainers, readers)}
+        elif isinstance(readers, dict):
+            reader_dict = readers
+        else:
+            raise ValueError()
+
+        num_heads = len(self._trainers)
+        assert len(reader_dict) == num_heads, "the number of readers is not consistent with the number of trainers."
+
+        trainer_dict = {t.name: t for t in self._trainers}
+        task_name2id = {t.name: idx for idx, t in enumerate(self._trainers)}
+        self._task_name2id = task_name2id
+
+        self._finish_steps = {}
+        self._finish = {}
+        input_names = []
+        name_to_pos = []
+        joint_shape_and_dtypes = []
+        iterators = []
+        prefixes = []
+        mrs = []
+        net_inputs = []
+        global_steps = 0
+        for t in self._trainers:
+            assert t.name in reader_dict
+            assert reader_dict[t.name].num_epochs is None, "{}: num_epochs is not None. \
+                To run with multi-head mode, num_epochs of each Trainer should be set as None.".format(t.name)
+            self._finish_steps[t.name] = 9999999999
+            self._finish[t.name] = True
+
+            t.fit_reader(reader_dict[t.name], phase='predict')
+            net_inputs.append(t._pred_net_inputs)
+            prefixes.append(t.name)
+            iterators.append(t._raw_iterator_fn())
+            input_names.append(t._pred_input_names)
+            name_to_pos.append(t._pred_name_to_position)
+            joint_shape_and_dtypes.append(t._pred_shape_and_dtypes)
+
+        iterator_fn = reader_helper.create_multihead_inference_fn(iterators, prefixes, joint_shape_and_dtypes, \
+            input_names, name_to_pos, task_name2id, dev_count=dev_count)
+        feed_batch_process_fn = reader_helper.create_feed_batch_process_fn(net_inputs)
+
+        if gpu_dev_count > 1:
+            raise NotImplementedError('currently only single-GPU mode is supported when running in multi-task mode.')
+        else:
+            distribute_feeder_fn = iterator_fn
+
+        self._predict_iterator_fn = distribute_feeder_fn
+        self._pred_feed_batch_process_fn = feed_batch_process_fn
+        return distribute_feeder_fn
 
     def fit_readers_with_mixratio(self, readers, sampling_reference, num_epochs, phase='train'):
         """
@@ -247,18 +345,6 @@
             if finish:
                 break
 
-            # if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
-            #     print(cur_task.name+': train finished!')
-            #     cur_task.save()
-
-            # if (save_predict or save_ckpt) and self._cur_train_step % save_steps == 0:
-            #     if save_predict:
-            #         self.save(save_path, suffix='pred.step'+str(self._cur_train_step))
-            #     if save_ckpt:
-            #         fluid.io.save_persistables(self._exe, os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)), self._train_prog)
-            #         print('checkpoint has been saved at '+os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)))
-
     def train_one_step(self, batch):
 
         if dev_count > 1:
@@ -268,33 +354,29 @@
         assert isinstance(batch, dict)
         task_id = batch['__task_id'][0]
 
-        # rt_outputs = self._trainers[task_id].train_one_step(batch, self._exe, self._distribute_train_prog, self._fetch_list)
         rt_outputs = self._trainers[task_id].train_one_step(batch)
         self._cur_train_step += 1
         self._check_save()
         return rt_outputs, task_id
 
-        # if dev_count > 1:
-        #     # feed, mask, task_id = batch
-        #     for f in feed:
-        #         f['branch'] = np.array([task_id], dtype='int64')
-        #     rt_outputs = self.exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._trainers[task_id]._fetch_list)
-        #     num_fakes = decode_fake(len(rt_outputs[0]), mask, self._trainers[task_id]._batch_size)
-        #     for _ in range(num_fakes):
-        #         for item in rt_outputs:
-        #             item.pop()
-        # else:
-        #     feed, task_id = batch
-        #     feed['branch'] = np.array([task_id], dtype='int64')
-        #     rt_outputs = self._exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._trainers[task_id]._fetch_list)
-
-    def predict_one_batch(self, batch):
-        raise NotImplementedError()
+    def predict_one_batch(self, task_name, batch):
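+        """
+        Run inference on one batch of the given task. `batch` should come from
+        the iterator returned by `merge_inference_readers`, e.g.
+        `next(joint_iterator(task_name))`; the return value is the fetch dict
+        of that task head (e.g. {'logits': ...}).
+        """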
+        if dev_count > 1:
+            raise NotImplementedError()
+
+        t = self._trainers[self._task_name2id[task_name]]
+        t._set_dist_pred(t._pred_prog)
+        rt_outputs = t.predict_one_batch(batch)
+        return rt_outputs
 
     def predict(self, output_dir=None, print_steps=1000):
         raise NotImplementedError()
 
     @property
     def overall_train_steps(self):
         return self._overall_train_steps
+
diff --git a/paddlepalm/trainer.py b/paddlepalm/trainer.py
index 8556019..e899696 100644
--- a/paddlepalm/trainer.py
+++ b/paddlepalm/trainer.py
@@ -60,21 +60,11 @@ class Trainer(object):
 
         self._check_save = lambda: False
 
-        # if save_predict_model:
-        #     self._save_predict_model = True
-        #     assert pred_head is not None, "pred_head is required to save predict model."
-        #     self._pred_reader = reader.clone(phase='predict')
-        # else:
-        #     assert pred_head is None, "You should set save_predict_model as True, or the pred_head is invalid."
-        #     self._save_predict_model = False
-        #     self._pred_reader = None
-
-        # self._save_steps = save_steps
-
         self._task_reuse_scope = name if reuse_head_with is None else reuse_head_with
 
         self._feeded_var_names = None
         self._target_vars = None
+        self._predict_vars = None
 
         self._num_examples = 0
@@ -98,8 +88,7 @@
         self._pred_fetch_name_list = []
         self._pred_fetch_var_list = []
 
-        # exe is built when random_init_params is called.
-        # self._exe = helper.build_executor(gpu_dev_count>0)
+        # exe is built when random_init_params is called.
         self._exe = None
 
         self._save_protocol = {
@@ -124,11 +113,9 @@
         """
 
-        # assert not self._multi_task, "you cannot build_forward in trainer when a train is wrapper by MultiHeadTrainer."
         self._task_head = task_head
         self._backbone = backbone
 
-        # assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode"
         self._build_forward = True
 
         # create reader, task
@@ -137,12 +124,6 @@
         pred_task_attrs = []
         task_attr_from_reader = helper.encode_inputs(self._task_head.inputs_attrs['reader'], self.name)
-        # task_attr_from_reader = self._task_head.inputs_attrs['reader']
-
-        # _check_io(backbone.inputs_attr, inst._reader['train'].outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
-        # _check_io(inst.taskblock['train'].inputs_attrs['reader'], inst._reader['train'].outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
-        # _check_io(inst._taskblock['train'].inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
-
         # merge reader input attrs from backbone and task_instances
         input_names, shape_and_dtypes, name_to_position = reader_helper.merge_input_attrs(backbone.inputs_attr, task_attr_from_reader, insert_taskid=False)
@@ -177,11 +158,6 @@
         self._net_inputs = net_inputs
 
         assert sorted(bb_output_vars.keys()) == sorted(backbone.outputs_attr.keys())
-        # self._bb_output_vars.keys
-
-        # fluid.framework.switch_main_program(train_prog)
-        # fluid.framework.switch_startup_program(train_init_prog)
-
         task_output_vars = {}
         task_inputs = {'backbone': bb_output_vars}
         task_inputs_from_reader = helper.decode_inputs(net_inputs, self.name)
@@ -205,23 +181,12 @@
         task_fetches = {k: v.name for k, v in task_output_vars.items()}
         self._fetches = task_fetches
         self._fetch_names, self._fetch_list = zip(*self._fetches.items())
-        # fetches = task_fetches
-        # fetches['__task_id'] = net_inputs['__task_id'].name
-
-        # compute loss
-        # task_id_var = net_inputs['__task_id']
-        # task_id_vec = layers.one_hot(task_id_var, num_instances)
-        # losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
-        # loss = layers.reduce_sum(task_id_vec * losses)
 
         if not self._lock_prog:
             with fluid.program_guard(train_prog, train_init_prog):
                 loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
         else:
             loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
-        # for _id, block in enumerate(self._train_prog.blocks):
-        #     for var in block.vars:
-        #         print("[debug] : %d, %s" % (_id, var))
         self._loss_var = loss_var
 
         if not self._multi_task:
@@ -242,19 +207,13 @@
         """
         self._pred_head = pred_head
         self._pred_backbone = pred_backbone
-        # self._pred_reader = self._reader.clone(phase='pred')
         pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name)
-        # pred_task_attr_from_reader = self._pred_head.inputs_attrs['reader']
-        # _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
-
-        # _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
-        # _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
-        # _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
 
         pred_input_names, pred_shape_and_dtypes, pred_name_to_position = reader_helper.merge_input_attrs(pred_backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False)
         pred_input_attrs = [[i, j, k] for i, (j, k) in zip(pred_input_names, pred_shape_and_dtypes)]
         self._pred_shape_and_dtypes = pred_shape_and_dtypes
         self._pred_name_to_position = pred_name_to_position
+        self._pred_input_names = pred_input_names
 
         pred_prog = fluid.Program()
         self._pred_prog = pred_prog
@@ -262,14 +221,12 @@
         self._pred_init_prog = pred_init_prog
         with fluid.program_guard(pred_prog, pred_init_prog):
             pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
-            # pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
             pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
         self._pred_net_inputs = pred_net_inputs
 
         # prepare predict vars for saving inference model
         with fluid.program_guard(pred_prog, pred_init_prog):
             cur_inputs = helper.decode_inputs(pred_net_inputs, self.name)
-            # self.pred_input = cur_inputs
             self._pred_input_name_list, self._pred_input_varname_list = \
                 zip(*[[k, v.name] for k, v in cur_inputs.items()])
@@ -284,9 +241,11 @@
             self._pred_fetch_name_list = []
             self._pred_fetch_var_list = []
 
-        if not self._multi_task:
-            self._init_exe_prog(for_train=False)
-            self._exe.run(self._pred_init_prog)
+        self._init_exe_prog(for_train=False)
+        self._exe.run(self._pred_init_prog)
+
+        self._predict_vars = output_vars
 
         return output_vars
@@ -301,7 +260,6 @@
         - ema_decay: optional, default is None. Only works with use_ema == True. Control decay rate of EMA strategy.
         """
-        # assert not self._multi_task, "you cannot build_backward in trainer when a train is wrapper by MultiHeadTrainer."
         # build optimizer
 
         assert self._loss_var is not None and self._train_init_prog is not None, "train graph not found! You should build_forward first."
         optimizer._set_prog(self._train_prog, self._train_init_prog)
@@ -334,18 +292,10 @@
                         param.name] * weight_decay * optimizer.get_cur_learning_rate()
                     fluid.layers.assign(output=param, input=updated_param)
 
-
-        # loss.persistable = True
         if use_ema:
             ema = fluid.optimizer.ExponentialMovingAverage(ema_decay)
             ema.update()
 
-        # for bid, block in enumerate(self._train_prog.blocks):
-        #     print('block id: '+str(bid))
-        #     for var in block.vars:
-        #         print("%d : %s" % (bid, var))
-
-        # print(self._train_prog)
         self._exe.run(self._train_init_prog)
 
     def set_as_aux(self):
@@ -361,8 +311,6 @@
             phase: running phase. Currently support: train, predict.
         """
 
-        # assert not self._multi_task, "you cannot fit_reader in trainer when a train is wrapper by MultiHeadTrainer."
-        # load data
         self._check_phase(phase)
 
         if phase=='train':
@@ -370,10 +318,6 @@
         else:
             assert self._pred_shape_and_dtypes is not None, "You need to build_forward or build_predict_head first to prepare input features."
 
-        # 这里不确定是否要向上取整,需确认
-        # tail = self._num_examples % batch_size > 0
-        # self._steps_pur_epoch = self._num_examples // batch_size + 1 if tail else 0
-
         batch_size = reader._batch_size
 
         self._num_epochs = reader.num_epochs
@@ -392,8 +336,6 @@
             reader_helper.check_io(self._task_head.inputs_attrs['backbone'], self._backbone.outputs_attr, in_name='task_head(backbone, train)', out_name='backbone')
         elif phase == 'predict':
             self._predict_reader = reader
-            # tail = self._num_examples % batch_size > 0
-            # self._pred_steps_pur_epoch = reader.num_examples // batch_size + 1 if tail else 0
             self._pred_steps_pur_epoch = reader.num_examples // batch_size
             shape_and_dtypes = self._pred_shape_and_dtypes
             name_to_position = self._pred_name_to_position
@@ -412,7 +354,6 @@
         iterator = reader._iterator()
         prefix = self.name
 
-        # merge dataset iterators and create net input vars
         iterator = reader._iterator()
         prefix = self.name
 
@@ -442,19 +383,8 @@
         Args:
             model_path: the path of saved checkpoint/parameters.
         """
-        # load pretrain model (or ckpt)
-        # assert self._exe is not None, "You need to random_init_params before load checkpoints."
-        # if phase == 'train' and not self._train_init:
-        #     self._init_exe_prog(for_train=True)
-        #     self._exe.run(self._train_init_prog)
-        # if phase == 'predict' and not self._predict_init:
-        #     self._init_exe_prog(for_train=False)
-        #     self._exe.run(self._pred_init_prog)
-
         assert self._train_init_prog is not None or self._pred_init_prog is not None, "model graph not built. You should at least build_forward or build_predict_forward to load its checkpoint."
 
-        # if phase == 'train':
-        #     assert self._train_init_prog is not None, "train graph not found! You should build_forward first before load checkpoint."
         if self._train_init_prog is not None:
             saver.init_pretraining_params(
                 self._exe,
                 model_path,
                 convert=False,
                 main_program=self._train_init_prog,
                 strict=True)
-        # elif phase == 'predict':
         elif self._pred_init_prog is not None:
-            # assert self._pred_init_prog is not None, "predict graph not found! You should build_predict_head first before load checkpoint."
            saver.init_pretraining_params(
                self._exe,
                model_path,
@@ -489,7 +417,6 @@
                 model_path,
                 convert=convert,
                 main_program=self._pred_prog)
-        # raise NotImplementedError()
 
     def load_pretrain(self, model_path, convert=False):
         """
@@ -498,8 +425,6 @@
         Args:
             model_path: the path of saved pretrained parameters.
         """
-        # load pretrain model (or ckpt)
-        # assert self._exe is not None, "You need to random_init_params before load pretrain models."
         assert self._train_init_prog is not None, "training graph not found. You should at least build_forward to load its pretrained parameters."
 
         saver.init_pretraining_params(
@@ -569,33 +494,9 @@
         iterator = self._train_iterator
         self._distribute_train_prog = fluid.CompiledProgram(self._train_prog).with_data_parallel(loss_name=self._loss_var.name)
 
-        # if save_path is not None or save_steps is not None:
-        #     assert self._save_predict_model, "If you want to save model, you need set save_predict_model=True when this trainer is built."
-        # if self._save_predict_model:
-        #     if save_path is None and save_steps is None:
-        #         print('Warning: model will not be saved for this run. If you want to save model, set save_path and save_steps.')
-        #     else:
-        #         assert save_path is not None, "argument save_path is required to save models."
-        #         assert save_steps == -1 or save_steps > 0, "argument save_steps should be -1 (only save the last step of this task) or larger than 0"
-        #         if save_path is not None and not os.path.exists(save_path):
-        #             os.makedirs(save_path)
-        # else:
-        #     assert save_path is None, "You should set save_predict_model as True, or the argument save_path is invalid."
-        #     assert save_steps is None, "You should set save_predict_model as True, or the argument save_steps is invalid."
-
         time_begin = time.time()
         for feed in iterator:
             rt_outputs = self.train_one_step(feed)
-            # if gpu_dev_count > 1:
-            #     feed, mask = feed
-            #     rt_outputs = self._exe.run(self._train_prog, feed=feed, fetch_list=self._fetch_list)
-            # print(rt_outputs)
-            # print(len(rt_outputs))
-            # if gpu_dev_count > 1:
-            #     while mask.pop() == False:
-            #         rt_outputs.pop()
-
-            # rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}
 
             task_rt_outputs = {k[len(self.name+'.'):]: v for k, v in rt_outputs.items() if k.startswith(self.name+'.')}
             self._task_head.batch_postprocess(task_rt_outputs)
@@ -613,18 +514,9 @@
                     loss, print_steps / time_cost))
                 sys.stdout.flush()
                 time_begin = time.time()
-            # self._check_save()
-            # if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
-            #     print(cur_task.name+': train finished!')
-            #     cur_task.save()
 
             if self._num_epochs is None and not self._multi_task and self._cur_train_step == self._steps_pur_epoch:
                 break
-        # save_path = os.path.join(main_conf['save_path'], 'ckpt',
-        #                          "step_" + str(global_step))
-        # fluid.io.save_persistables(self.exe, save_path, saver_program)
-
-        # print("ALL tasks train finished, exiting...")
 
     def predict(self, output_dir=None, print_steps=1000):
         """
@@ -645,7 +537,6 @@
         cur_predict_step = 0
         for feed in iterator:
             rt_outputs = self.predict_one_batch(feed)
-            # rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')}
             self._pred_head.batch_postprocess(rt_outputs)
             cur_predict_step += 1
@@ -674,6 +565,9 @@
     def _set_multitask(self):
         self._multi_task = True
 
+    def _set_nomultitask(self):
+        self._multi_task = False
+
     def _set_task_id(self, task_id):
         self._task_id = task_id
@@ -715,14 +609,13 @@
     def _set_dist_train(self, prog):
         self._distribute_train_prog = prog
 
+    def _set_dist_pred(self, prog):
+        self._distribute_pred_prog = prog
+
     def _set_fetch_list(self, fetch_list):
         self._fetch_list = fetch_list
 
-    # def train_one_step(self, batch, executor=None, distribute_train_prog=None, fetch_list=None):
     def train_one_step(self, batch):
-        # exe = self._exe if executor is None else executor
-        # distribute_train_prog = self._distribute_train_prog if distribute_train_prog is None else distribute_train_prog
-        # fetch_list = self._fetch_list if fetch_list is None else fetch_list
 
         exe = self._exe
         distribute_train_prog = self._distribute_train_prog
@@ -748,19 +641,17 @@
     def predict_one_batch(self, batch):
         if gpu_dev_count > 1:
             feed, mask = batch
-            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list)
+            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list, use_prune=True)
             num_fakes = decode_fake(len(rt_outputs[0]), mask, self._predict_batch_size)
             if num_fakes:
                 rt_outputs = [i[:-num_fakes] for i in rt_outputs]
         else:
             feed = self._pred_feed_batch_process_fn(batch)
-            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list)
+            rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list, use_prune=True)
 
         rt_outputs = {k: v for k, v in zip(self._pred_fetch_name_list, rt_outputs)}
         return rt_outputs
 
-
-
     @property
     def name(self):
         return self._name
diff --git a/paddlepalm/utils/reader_helper.py b/paddlepalm/utils/reader_helper.py
index 4dc5e19..a53a3f0 100644
--- a/paddlepalm/utils/reader_helper.py
+++ b/paddlepalm/utils/reader_helper.py
@@ -166,6 +166,39 @@ def create_iterator_fn(iterator, iterator_prefix, shape_and_dtypes, outname_to_p
     return iterator_fn
 
 
+def create_multihead_inference_fn(iterators, iterator_prefixes, joint_shape_and_dtypes, names, outname_to_pos, task_name2id, dev_count=1):
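+    """
+    Create a per-task inference iterator factory. Calling the returned
+    factory with a task name yields feed dicts for that task, each tagged
+    with a '__task_id' tensor so the joint predict program can route the
+    batch to the matching task head.
+    """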
+
+    def iterator(task_name):
+        while True:
+            task_id = task_name2id[task_name]
+            task_id_tensor = np.array([task_id]).astype("int64")
+
+            for i in range(dev_count):
+
+                outputs = next(iterators[task_id])  # dict type
+
+                prefix = iterator_prefixes[task_id]
+                results = {}
+                results['__task_id'] = task_id_tensor
+                for outname, val in outputs.items():
+                    task_outname = prefix + '.' + outname
+
+                    if outname in names[task_id]:
+                        idx = outname_to_pos[task_id][outname]
+                        val = _check_and_adapt_shape_dtype(val, joint_shape_and_dtypes[task_id][idx], message=outname+': ')
+                        results[outname] = val
+
+                    if task_outname in names[task_id]:
+                        idx = outname_to_pos[task_id][task_outname]
+                        val = _check_and_adapt_shape_dtype(val, joint_shape_and_dtypes[task_id][idx], message=task_outname+': ')
+                        results[task_outname] = val
+
+                yield results
+
+    return iterator
+
+
 def create_multihead_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtypes, mrs, names, outname_to_pos, dev_count=1, keep_one_task=True):
     task_ids = range(len(iterators))
     weights = [mr / float(sum(mrs)) for mr in mrs]
diff --git a/setup.cfg b/setup.cfg
index d25c843..e9a55de 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -5,7 +5,7 @@
 name = paddlepalm
 author = zhangyiming
 author_email = zhangyiming04@baidu.com
-version = 2.0.2
+version = 2.1.0
 
 description = PaddlePALM
 long_description = file: README.md
@@ -35,9 +35,6 @@
 keywords =
 
 packages = find:
 
-#install_requires =
-#    paddlepaddle-gpu >= 1.5.2
-
 include_package_data = True
 zip_safe = False
diff --git a/setup.py b/setup.py
index d923bc6..96e32b3 100644
--- a/setup.py
+++ b/setup.py
@@ -25,7 +25,7 @@ with open("README.md", "r") as fh:
     long_description = fh.read()
 setuptools.setup(
     name="paddlepalm",
-    version="2.0.2",
+    version="2.1.0",
     author="PaddlePaddle",
     author_email="zhangyiming04@baidu.com",
     description="a flexible, general and easy-to-use NLP large-scale pretraining and multi-task learning framework.",
@@ -68,7 +68,7 @@
         'Programming Language :: Python :: 3.7',
     ],
     install_requires = [
-        'paddlepaddle-gpu>=1.7.0'
+        'paddlepaddle-gpu>=1.8.0'
     ]
 )
-- 
GitLab