diff --git a/paddlepalm/__init__.py b/paddlepalm/__init__.py index 5522f0faf96ce48efa034e522b4d793b740bcb23..402eb17787b0cdbc5f7079cdaff81d93585d5b34 100644 --- a/paddlepalm/__init__.py +++ b/paddlepalm/__init__.py @@ -1,9 +1,16 @@ import downloader -from mtl_controller import Controller +# from mtl_controller import Controller +import controller +import optimizer +import lr_sched +import backbone +import reader +import head +from trainer import Trainer + del interface del task_instance del default_settings del utils -del mtl_controller \ No newline at end of file diff --git a/paddlepalm/backbone/__init__.py b/paddlepalm/backbone/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..9f98d7a6659548e322b5218c54bf53f73562b99d 100644 --- a/paddlepalm/backbone/__init__.py +++ b/paddlepalm/backbone/__init__.py @@ -0,0 +1,4 @@ + +from ernie import ERNIE +from bert import BERT + diff --git a/paddlepalm/basebackbone.py b/paddlepalm/backbone/base_backbone.py similarity index 60% rename from paddlepalm/basebackbone.py rename to paddlepalm/backbone/base_backbone.py index b8c3f78b716944528018a596aca0a73325f8b8d7..aab1ddea30fb496e5161a05547bb5fed6b034078 100644 --- a/paddlepalm/basebackbone.py +++ b/paddlepalm/backbone/base_backbone.py @@ -14,76 +14,8 @@ # limitations under the License. """v1.1""" -class reader(object): - """interface of data manager.""" - def __init__(self, config): - assert isinstance(config, dict) - - # @property - # def inputs_attr(self): - # """描述reader输入对象的属性,包含各个对象的名字、shape以及数据类型。当某个对象为标量数据类型(如str, int, float等)时,shape设置为空列表[],当某个对象的某个维度长度可变时,shape中的相应维度设置为-1. - # Return: - # dict类型。对各个输入对象的属性描述。例如, - # 对于文本分类任务,可能需要包含输入文本和所属标签的id - # {"text": ([], 'str'), - # "label": ([], 'int')} - # 对于标注任务,可能需要输入词序列和对应的标签 - # {"tokens", ([-1], 'str'), - # "tags", ([-1], 'str')} - # 对于机器阅读理解任务,可能需要包含上下文、问题、回答、答案区域的起止位置等 - # {"paragraph", ([], 'str'), - # "question", ([], 'str'), - # "start_position", ([], 'int') - # """ - # raise NotImplementedError() - - @property - def outputs_attr(self): - """描述reader输出对象(被yield出的对象)的属性,包含各个对象的名字、shape以及数据类型。当某个对象为标量数据类型(如str, int, float等)时,shape设置为空列表[],当某个对象的某个维度长度可变时,shape中的相应维度设置为-1。 - 注意:当使用mini-batch梯度下降学习策略时,,应为常规的输入对象设置batch_size维度(一般为-1) - Return: - dict类型。对各个输入对象的属性描述。例如, - 对于文本分类和匹配任务,yield的输出内容可能包含如下的对象(下游backbone和task可按需访问其中的对象) - {"token_ids": ([-1, max_len], 'int64'), - "input_ids": ([-1, max_len], 'int64'), - "segment_ids": ([-1, max_len], 'int64'), - "input_mask": ([-1, max_len], 'float32'), - "label": ([-1], 'int')} - """ - raise NotImplementedError() - - # def parse_line(self): - # """框架内部使用字典描述每个样本,字典的key为inputs_attr,value为每个input对应的符合attr描述的值。 - # 该函数负责将文本行解析成符合inputs_attr描述的字典类型的样本。默认的parse_line方法会读取json格式的数据集文件,数据集的每一行为json格式描述的样本。 - # 用户可通过对该方法的继承改写来适配不同格式的数据集,例如csv格式甚至tfrecord文件。 - # """ - # raise NotImplementedError() - # - # def tokenize(self, line): - # """框架中内置了word piece tokenizer等分词器,用户可通过修改tokenizer超参数来制定使用的分词器,若内置的分词器均无法满足需求,用户可通过对该方法的继承改写来自定义分词器。 - # Args: - # - line: a unicode string. 
-    #     Return:
-    #         a list of tokens
-    #     """
-    #     raise NotImplementedError()
-
-    def iterator(self): 
-        """数据集遍历接口,注意,当数据集遍历到尾部时该接口应自动完成指针重置,即重新从数据集头部开始新的遍历。
-        Yield:
-            (dict) elements that meet the requirements in output_templete
-        """
-        raise NotImplementedError()
-
-    @property
-    def num_examples(self):
-        """数据集中的样本数量,即每个epoch中iterator所生成的样本数。注意,使用滑动窗口等可能导致数据集样本数发生变化的策略时,该接口应返回runtime阶段的实际样本数。"""
-        raise NotImplementedError()
-
-
-
-class backbone(object):
+class BaseBackbone(object):
     """interface of backbone model."""

     def __init__(self, config, phase):
diff --git a/paddlepalm/backbone/bert.py b/paddlepalm/backbone/bert.py
index d3592a5526447694e8a14d01dee2b9987740b2ed..5c5ece0291b6e2470bdd500c4050040613f47dbe 100644
--- a/paddlepalm/backbone/bert.py
+++ b/paddlepalm/backbone/bert.py
@@ -23,12 +23,44 @@ from paddle import fluid
 from paddle.fluid import layers

 from paddlepalm.backbone.utils.transformer import pre_process_layer, encoder
-from paddlepalm.interface import backbone
+from paddlepalm.backbone.base_backbone import BaseBackbone

-
-class Model(backbone):
-
-    def __init__(self, config, phase):
+
+class BERT(BaseBackbone):
+
+    @classmethod
+    def from_config(cls, config, phase='train'):
+        assert 'hidden_size' in config, "{} is required to initialize BERT".format('hidden_size')
+        assert 'num_hidden_layers' in config, "{} is required to initialize BERT".format('num_hidden_layers')
+        assert 'num_attention_heads' in config, "{} is required to initialize BERT".format('num_attention_heads')
+        assert 'vocab_size' in config, "{} is required to initialize BERT".format('vocab_size')
+        assert 'max_position_embeddings' in config, "{} is required to initialize BERT".format('max_position_embeddings')
+        assert 'sent_type_vocab_size' in config or 'type_vocab_size' in config, \
+            "{} is required to initialize BERT".format('type_vocab_size')
+        assert 'hidden_act' in config, "{} is required to initialize BERT".format('hidden_act')
+        assert 'hidden_dropout_prob' in config, "{} is required to initialize BERT".format('hidden_dropout_prob')
+        assert 'attention_probs_dropout_prob' in config, \
+            "{} is required to initialize BERT".format('attention_probs_dropout_prob')
+        assert 'initializer_range' in config, "{} is required to initialize BERT".format('initializer_range')
+        if 'sent_type_vocab_size' in config:
+            type_vocab_size = config['sent_type_vocab_size']
+        else:
+            type_vocab_size = config['type_vocab_size']
+        return cls(config['hidden_size'], config['num_hidden_layers'], config['num_attention_heads'], \
+                   config['vocab_size'], config['max_position_embeddings'], type_vocab_size, \
+                   config['hidden_act'], config['hidden_dropout_prob'], \
+                   config['attention_probs_dropout_prob'], config['initializer_range'], phase=phase)
+
+    def __init__(self, hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
+                 max_position_embeddings, type_vocab_size, hidden_act, hidden_dropout_prob, \
+                 attention_probs_dropout_prob, initializer_range, phase='train'):
+        config = {}
+        config['hidden_size'] = hidden_size
+        config['num_hidden_layers'] = num_hidden_layers
+        config['num_attention_heads'] = num_attention_heads
+        config['vocab_size'] = vocab_size
+        config['max_position_embeddings'] = max_position_embeddings
+        config['type_vocab_size'] = type_vocab_size
+        config['hidden_act'] = hidden_act
+        config['hidden_dropout_prob'] = hidden_dropout_prob
+        config['attention_probs_dropout_prob'] = attention_probs_dropout_prob
+        config['initializer_range'] = initializer_range

         # self._is_training = phase == 'train' # backbones generally need not care about the running phase, since the outputs barely change across phases

         self._emb_size = config["hidden_size"]
@@ -153,3 +185,9 @@ class Model(backbone):
         pass

+class Model(BERT):
+    """BERT wrapper for ConfigController"""
+    def __init__(self, config, phase):
+        # from_config returns a ready BERT instance; adopt its state so this
+        # wrapper is initialized too (a bare call would discard the result)
+        self.__dict__.update(BERT.from_config(config, phase=phase).__dict__)
+
+
diff --git a/paddlepalm/backbone/ernie.py b/paddlepalm/backbone/ernie.py
index ded196385112513d001c6db4505cdc3883592984..63e3f464792835cf5d0d2b3f8915132e0c37a2c5 100644
--- a/paddlepalm/backbone/ernie.py
+++ b/paddlepalm/backbone/ernie.py
@@ -24,32 +24,30 @@ from paddle import fluid
 from paddle.fluid import layers

 from paddlepalm.backbone.utils.transformer import pre_process_layer, encoder
-from paddlepalm.interface import backbone
+from paddlepalm.backbone.base_backbone import BaseBackbone


-class Model(backbone):
-    def __init__(self,
-                 config,
-                 phase):
+class ERNIE(BaseBackbone):
+
+    def __init__(self, hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
+                 max_position_embeddings, sent_type_vocab_size, task_type_vocab_size, \
+                 hidden_act, hidden_dropout_prob, attention_probs_dropout_prob, initializer_range, phase='train'):

         # self._is_training = phase == 'train' # backbones generally need not care about the running phase, since the outputs barely change across phases

-        self._emb_size = config['hidden_size']
-        self._n_layer = config['num_hidden_layers']
-        self._n_head = config['num_attention_heads']
-        self._voc_size = config['vocab_size']
-        self._max_position_seq_len = config['max_position_embeddings']
-        if config['sent_type_vocab_size']:
-            self._sent_types = config['sent_type_vocab_size']
-        else:
-            self._sent_types = config['type_vocab_size']
+        self._emb_size = hidden_size
+        self._n_layer = num_hidden_layers
+        self._n_head = num_attention_heads
+        self._voc_size = vocab_size
+        self._max_position_seq_len = max_position_embeddings
+        self._sent_types = sent_type_vocab_size

-        self._task_types = config['task_type_vocab_size']
+        self._task_types = task_type_vocab_size

-        self._hidden_act = config['hidden_act']
-        self._prepostprocess_dropout = config['hidden_dropout_prob']
-        self._attention_dropout = config['attention_probs_dropout_prob']
+        self._hidden_act = hidden_act
+        self._prepostprocess_dropout = hidden_dropout_prob
+        self._attention_dropout = attention_probs_dropout_prob

         self._word_emb_name = "word_embedding"
         self._pos_emb_name = "pos_embedding"
@@ -58,7 +56,40 @@ class Model(backbone):
         self._emb_dtype = "float32"

         self._param_initializer = fluid.initializer.TruncatedNormal(
-            scale=config['initializer_range'])
+            scale=initializer_range)
+
+    @classmethod
+    def from_config(cls, config, phase='train'):
+        assert 'hidden_size' in config, "{} is required to initialize ERNIE".format('hidden_size')
+        assert 'num_hidden_layers' in config, "{} is required to initialize ERNIE".format('num_hidden_layers')
+        assert 'num_attention_heads' in config, "{} is required to initialize ERNIE".format('num_attention_heads')
+        assert 'vocab_size' in config, "{} is required to initialize ERNIE".format('vocab_size')
+        assert 'max_position_embeddings' in config, "{} is required to initialize ERNIE".format('max_position_embeddings')
+        assert 'sent_type_vocab_size' in config or 'type_vocab_size' in config, "{} is required to initialize ERNIE".format('sent_type_vocab_size')
+        assert 'task_type_vocab_size' in config, "{} is required to initialize ERNIE".format('task_type_vocab_size')
+        assert 'hidden_act' in config, "{} is required to initialize ERNIE".format('hidden_act')
+        assert 'hidden_dropout_prob' in config, "{} is required to initialize ERNIE".format('hidden_dropout_prob')
+        assert 'attention_probs_dropout_prob' in config, "{} is required to initialize ERNIE".format('attention_probs_dropout_prob')
+        assert 'initializer_range' in config, "{} is required to initialize ERNIE".format('initializer_range')
+
+        hidden_size = config['hidden_size']
+        num_hidden_layers = config['num_hidden_layers']
+        num_attention_heads = config['num_attention_heads']
+        vocab_size = config['vocab_size']
+        max_position_embeddings = config['max_position_embeddings']
+        if 'sent_type_vocab_size' in config:
+            sent_type_vocab_size = config['sent_type_vocab_size']
+        else:
+            sent_type_vocab_size = config['type_vocab_size']
+        task_type_vocab_size = config['task_type_vocab_size']
+        hidden_act = config['hidden_act']
+        hidden_dropout_prob = config['hidden_dropout_prob']
+        attention_probs_dropout_prob = config['attention_probs_dropout_prob']
+        initializer_range = config['initializer_range']
+
+        return cls(hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
+                   max_position_embeddings, sent_type_vocab_size, task_type_vocab_size, \
+                   hidden_act, hidden_dropout_prob, attention_probs_dropout_prob, initializer_range, phase=phase)

     @property
     def inputs_attr(self):
@@ -173,3 +204,12 @@ class Model(backbone):

     def postprocess(self, rt_outputs):
         pass
+
+
+
+class Model(ERNIE):
+
+    def __init__(self, config, phase):
+        # from_config returns a ready ERNIE instance; adopt its state so this
+        # wrapper is initialized too (a bare call would discard the result)
+        self.__dict__.update(ERNIE.from_config(config, phase=phase).__dict__)
+
+
diff --git a/paddlepalm/controller/__init__.py b/paddlepalm/controller/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..9e43b331db94fc358903fb64b73e04b1dfcd3dff
--- /dev/null
+++ b/paddlepalm/controller/__init__.py
@@ -0,0 +1,3 @@
+
+from conf_controller import ConfigController
+from controller import Controller
diff --git a/paddlepalm/controller/conf_controller.py b/paddlepalm/controller/conf_controller.py
index 01ee5cec31626688b4e11637718b29a48a2f8ee0..33f227d81a73c202c02c3c982356d1822d309212 100755
--- a/paddlepalm/controller/conf_controller.py
+++ b/paddlepalm/controller/conf_controller.py
@@ -35,6 +35,9 @@ from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn
 from paddlepalm.default_settings import *
 from paddlepalm.task_instance import TaskInstance, check_instances

+import Queue
+from threading import Thread
+
 DEBUG=False
 VERBOSE=0

@@ -182,7 +185,7 @@ def _fit_attr(conf, fit_attr, strict=False):
     return conf


-class ConfController(object):
+class ConfigController(object):

     def __init__(self, config, task_dir='.', for_train=True):
         """
@@ -234,7 +237,7 @@ class ConfController(object):
             bb_conf = _merge_conf(mtl_conf, bb_conf)
         else:
             bb_conf = mtl_conf
-        print_dict(bb_conf, title='backbone configuration'.format(instname))
+        print_dict(bb_conf, title='backbone configuration')

         bb_name = mtl_conf['backbone']
         bb_mod = importlib.import_module(BACKBONE_DIR + '.'
+ bb_name) @@ -338,6 +341,7 @@ class ConfController(object): main_conf = main_inst.config if not os.path.exists(main_conf['save_path']): os.makedirs(main_conf['save_path']) + os.makedirs(os.path.join(main_conf['save_path'], 'ckpt')) # prepare backbone train_backbone = Backbone(bb_conf, phase='train') @@ -398,11 +402,14 @@ class ConfController(object): prefixes.append(inst.name) mrs.append(inst.mix_ratio) - joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE) + joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict') + self._joint_iterator_fn = joint_iterator_fn input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)] pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)] - net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3) + # net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3) + net_inputs = create_net_inputs(input_attrs, async=False) + self._net_inputs = net_inputs # build backbone and task layers train_prog = fluid.default_main_program() @@ -453,7 +460,7 @@ class ConfController(object): # compute loss task_id_var = net_inputs['__task_id'] - task_id_vec = layers.one_hot(task_id_var, num_instances) + task_id_vec = fluid.one_hot(task_id_var, num_instances) losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0) loss = layers.reduce_sum(task_id_vec * losses) @@ -522,15 +529,15 @@ class ConfController(object): inst.reader['pred'] = pred_reader return pred_prog - def load_pretrain(self, pretrain_model_path=None): + def load_pretrain(self, pretrain_path=None): # load pretrain model (or ckpt) - if pretrain_model_path is None: - assert 'pretrain_model_path' in self.main_conf, "pretrain_model_path NOT set." - pretrain_model_path = self.main_conf['pretrain_model_path'] + if pretrain_path is None: + assert 'pretrain_path' in self.main_conf, "pretrain_path NOT set." + pretrain_path = self.main_conf['pretrain_path'] init_pretraining_params( self.exe, - pretrain_model_path, + pretrain_path, main_program=fluid.default_startup_program()) @@ -567,6 +574,18 @@ class ConfController(object): return False return True + def pack_multicard_feed(iterator, net_inputs, dev_count): + ret = [] + mask = [] + for i in range(dev_count): + temp = {} + content, flag = next(iterator) + for q, var in net_inputs.items(): + temp[var.name] = content[q] + ret.append(temp) + mask.append(1 if flag else 0) + return ret, mask + # do training fetch_names, fetch_list = zip(*fetches.items()) @@ -575,8 +594,50 @@ class ConfController(object): epoch = 0 time_begin = time.time() backbone_buffer = [] + + def multi_dev_reader(reader, dev_count): + def worker(reader, dev_count, queue): + dev_batches = [] + for index, data in enumerate(reader()): + if len(dev_batches) < dev_count: + dev_batches.append(data) + if len(dev_batches) == dev_count: + queue.put((dev_batches, 0)) + dev_batches = [] + # For the prediction of the remained batches, pad more batches to + # the number of devices and the padded samples would be removed in + # prediction outputs. 
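+                # Each queue item is a pair (dev_batches, num_pad): a list of
+                # dev_count batches plus the count of padded copies appended at
+                # its tail; a trailing `None` is the end-of-data sentinel.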
+                if len(dev_batches) > 0:
+                    num_pad = dev_count - len(dev_batches)
+                    for i in range(len(dev_batches), dev_count):
+                        dev_batches.append(dev_batches[-1])
+                    queue.put((dev_batches, num_pad))
+                queue.put(None)
+
+            queue = Queue.Queue(dev_count*2)
+            p = Thread(
+                target=worker, args=(reader, dev_count, queue))
+            p.daemon = True
+            p.start()
+            while True:
+                ret = queue.get()
+                if ret is not None:
+                    batches, num_pad = ret
+                    queue.task_done()
+                    for index, batch in enumerate(batches):
+                        # padded copies sit at the tail of `batches`, so only the
+                        # first dev_count - num_pad entries are flagged as real data
+                        flag = index < dev_count - num_pad
+                        yield batch, flag
+                else:
+                    break
+            queue.join()
+
+        joint_iterator = multi_dev_reader(self._joint_iterator_fn, self.dev_count)
+
         while not train_finish():
-            rt_outputs = self.exe.run(train_program, fetch_list=fetch_list)
+            feed, mask = pack_multicard_feed(joint_iterator, self._net_inputs, self.dev_count)
+            rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list)
             rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
             rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
             rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
@@ -591,8 +652,9 @@ class ConfController(object):
             global_step += 1
             cur_task.cur_train_step += 1

-            if cur_task.save_infermodel_every_n_steps > 0 and cur_task.cur_train_step % cur_task.save_infermodel_every_n_steps == 0:
-                cur_task.save(suffix='.step'+str(cur_task.cur_train_step))
+            cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch
+            if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0:
+                cur_task.save(suffix='.step'+str(cur_task_global_step))

             if global_step % main_conf.get('print_every_n_steps', 5) == 0:
                 loss = rt_outputs[cur_task.name+'/loss']
@@ -610,10 +672,16 @@ class ConfController(object):
                     print(cur_task.name+': train finished!')
                     cur_task.save()

-            if 'save_every_n_steps' in main_conf and global_step % main_conf['save_every_n_steps'] == 0:
-                save_path = os.path.join(main_conf['save_path'],
+            if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0:
+                save_path = os.path.join(main_conf['save_path'], 'ckpt',
                                          "step_" + str(global_step))
                 fluid.io.save_persistables(self.exe, save_path, saver_program)
+                print('checkpoint has been saved at '+save_path)
+
+        save_path = os.path.join(main_conf['save_path'], 'ckpt',
+                                 "step_" + str(global_step))
+        fluid.io.save_persistables(self.exe, save_path, saver_program)
+        print('checkpoint has been saved at '+save_path)

         print("ALL tasks train finished, exiting...")
@@ -673,6 +741,7 @@ if __name__ == '__main__':

+__all__ = ["ConfigController"]
diff --git a/paddlepalm/controller/controller.py b/paddlepalm/controller/controller.py
index 2e98dddb71ad652e1bc1e0f100fcc2a064eb0a62..2fb73ca96326be9a7bb0ff09afb1c58472f815ea 100755
--- a/paddlepalm/controller/controller.py
+++ b/paddlepalm/controller/controller.py
@@ -415,9 +415,6 @@ class Controller(object):

         return loss, max_train_steps

-
-
-
     def build_backward(self, optimizer, use_ema=False, ema_decay=0.9999):
         # build optimizer
         optimizer.optimize(fluid.default_main_program())
diff --git a/paddlepalm/head/__init__.py b/paddlepalm/head/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..97e3abd5ac55044342675236fea32ed5434059a9
--- /dev/null
+++ b/paddlepalm/head/__init__.py
@@ -0,0 +1,5 @@
+
+from cls import Classify
+# from match import Match
+# from mrc import MRC
+# from mlm import MaskLM
diff --git a/paddlepalm/base_task.py b/paddlepalm/head/base_head.py
similarity index 85%
rename from paddlepalm/base_task.py
rename to paddlepalm/head/base_head.py
index 51b2025f0dfec38e7ea7ba6e06858e382042ce62..09cce602e05746b4e75f43d1863d222b762fc496 100644
--- a/paddlepalm/base_task.py
+++ b/paddlepalm/head/base_head.py
@@ -14,13 +14,15 @@
 # limitations under the License.

-class task(object):
+class BaseHead(object):

     def __init__(self, config, phase, backbone_config):
         """
         config: dict. The hyperparameters defined in the task instance plus the multi-task config file.
         phase: str. The running phase; currently train and predict are supported.
         """
+        self._stop_gradient = {}
+        self._prog = None

     @property
     def inputs_attrs(self):
@@ -43,6 +45,17 @@ class task(object):
     def epoch_inputs_attrs(self):
         return {}

+    # def stop_gradient(source, inputs):
+    #     # if self._inputs is None:
+    #     #     raise Exception('You need to build this head first before stop gradient.')
+    #     self._inputs = inputs
+    #     for name, var in self._inputs[source].items():
+    #         # cur_block = self._prog.current_block()
+    #         var = fluid.layers.assign(var)
+    #         var.stop_gradient = True
+    #         self._inputs[name] = var
+    #     return self._inputs
+
     def build(self, inputs, scope_name=""):
         """Build the computation graph of the task layer: map the static-graph Variables from each input source (described by inputs_attrs) onto output Variables matching outputs_attr.
         Args:
             ...
         """
         raise NotImplementedError()

+
     def postprocess(self, rt_outputs):
         """Post-process the task layer's runtime results of the current batch after each train or predict step. Note that besides the outputs of build, rt_outputs automatically contains the computed loss as well."""
diff --git a/paddlepalm/task/classify.py b/paddlepalm/head/cls.py
similarity index 70%
rename from paddlepalm/task/classify.py
rename to paddlepalm/head/cls.py
index 87255260232543bdd96e325975d8c7cb89d286c9..f1b64d5a59b2a9f51882f5a6c8ecce311edcfb6d 100644
--- a/paddlepalm/task/classify.py
+++ b/paddlepalm/head/cls.py
@@ -15,51 +15,47 @@

 import paddle.fluid as fluid
 from paddle.fluid import layers
-from paddlepalm.base_task import base_task
+from paddlepalm.head.base_head import BaseHead
 import numpy as np
 import os

-def classify(num_classes, input_dim, dropout_prob, pred_output_dir=None, param_initializer_range=0.02, phase='train'):
+# def classify(num_classes, input_dim, dropout_prob, pred_output_dir=None, param_initializer_range=0.02, phase='train'):
+#
+#     config = {
+#         'num_classes': num_classes,
+#         'hidden_size': input_dim,
+#         'dropout_prob': dropout_prob,
+#         'pred_output_dir': pred_output_dir,
+#         'initializer_range': param_initializer_range
+#     }
+#
+#     return Task(config, phase, config)

-    config = {
-        'num_classes': num_classes,
-        'hidden_size': input_dim,
-        'dropout_prob': dropout_prob,
-        'pred_output_dir': pred_output_dir,
-        'initializer_range': param_initializer_range
-    }

-    return Task(config, phase, config)
-
-
-class Task(base_task):
+class Classify(BaseHead):
     '''
     classification
     '''
-    def __init__(self, config, phase, backbone_config=None):
+    # def __init__(self, config, phase, backbone_config=None):
+    def __init__(self, num_classes, input_dim, dropout_prob=0.0, \
+                 param_initializer_range=0.02, phase='train'):
+
         self._is_training = phase == 'train'
-        self._hidden_size = backbone_config['hidden_size']

-        self.num_classes = config['num_classes']
+        self._hidden_size = input_dim
+
+        self.num_classes = num_classes

-        if 'initializer_range' in config:
-            self._param_initializer = config['initializer_range']
-        else:
-            self._param_initializer = fluid.initializer.TruncatedNormal(
-                scale=backbone_config.get('initializer_range', 0.02))
-        if 'dropout_prob' in config:
-            self._dropout_prob = config['dropout_prob']
-        else:
-            self._dropout_prob = backbone_config.get('hidden_dropout_prob', 
0.0) - self._pred_output_path = config.get('pred_output_dir', None) + self._dropout_prob = dropout_prob if phase == 'train' else 0.0 + self._param_initializer = fluid.initializer.TruncatedNormal( + scale=param_initializer_range) self._preds = [] @property def inputs_attrs(self): - if self._is_training: - reader = {"label_ids": [[-1], 'int64']} - else: - reader = {} + reader = {} bb = {"sentence_embedding": [[-1, self._hidden_size], 'float32']} + if self._is_training: + reader["label_ids"] = [[-1], 'int64'] return {'reader': reader, 'backbone': bb} @property @@ -96,11 +92,12 @@ class Task(base_task): else: return {"logits":logits} - def postprocess(self, rt_outputs): + def batch_postprocess(self, rt_outputs): if not self._is_training: logits = rt_outputs['logits'] preds = np.argmax(logits, -1) self._preds.extend(preds.tolist()) + return preds def epoch_postprocess(self, post_inputs): # there is no post_inputs needed and not declared in epoch_inputs_attrs, hence no elements exist in post_inputs diff --git a/paddlepalm/task/match.py b/paddlepalm/head/match.py similarity index 100% rename from paddlepalm/task/match.py rename to paddlepalm/head/match.py diff --git a/paddlepalm/task/mlm.py b/paddlepalm/head/mlm.py similarity index 100% rename from paddlepalm/task/mlm.py rename to paddlepalm/head/mlm.py diff --git a/paddlepalm/task/mrc.py b/paddlepalm/head/mrc.py similarity index 100% rename from paddlepalm/task/mrc.py rename to paddlepalm/head/mrc.py diff --git a/paddlepalm/lr_sched/__init__.py b/paddlepalm/lr_sched/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ea6406e6cb74f5f18e1bc6f90769f8ea7fd298f4 --- /dev/null +++ b/paddlepalm/lr_sched/__init__.py @@ -0,0 +1,3 @@ + +from slanted_triangular_schedualer import TriangularSchedualer +from warmup_schedualer import WarmupSchedualer diff --git a/paddlepalm/lr_sched/noam_decay_schedualer.py b/paddlepalm/lr_sched/noam_decay_schedualer.py new file mode 100644 index 0000000000000000000000000000000000000000..65ef8f1e56c188380615db79fc6a14018d175e2c --- /dev/null +++ b/paddlepalm/lr_sched/noam_decay_schedualer.py @@ -0,0 +1,4 @@ + +# scheduled_lr = fluid.layers.learning_rate_scheduler\ +# .noam_decay(1/(warmup_steps *(config['learning_rate'] ** 2)), +# warmup_steps) diff --git a/paddlepalm/lr_sched/schedualer.py b/paddlepalm/lr_sched/schedualer.py new file mode 100644 index 0000000000000000000000000000000000000000..5c3c7ac501b1a0ac88bebcb77bab851651381097 --- /dev/null +++ b/paddlepalm/lr_sched/schedualer.py @@ -0,0 +1,12 @@ + +class BaseSchedualer(): + + def __init__(self): + self._prog = None + + def _set_prog(self, prog): + self._prog = prog + + def build(self, learning_rate): + raise NotImplementedError() + diff --git a/paddlepalm/lr_sched/slanted_triangular_schedualer.py b/paddlepalm/lr_sched/slanted_triangular_schedualer.py new file mode 100644 index 0000000000000000000000000000000000000000..8a3f3a7b676a4bd4aea58bb849240e340806a0f5 --- /dev/null +++ b/paddlepalm/lr_sched/slanted_triangular_schedualer.py @@ -0,0 +1,42 @@ + +from paddlepalm.lr_sched.schedualer import BaseSchedualer +from paddle import fluid + +class TriangularSchedualer(BaseSchedualer): + + """ Applies linear warmup of learning rate from 0 to learning_rate until warmup_steps, and then decay to 0 linearly until num_train_steps.""" + + def __init__(self, warmup_steps, num_train_steps): + BaseSchedualer.__init__(self) + assert num_train_steps > warmup_steps > 0 + self.warmup_steps = warmup_steps + self.num_train_steps = num_train_steps + + + def 
build(self, learning_rate):
+        with self._prog._lr_schedule_guard():
+            lr = fluid.layers.tensor.create_global_var(
+                shape=[1],
+                value=0.0,
+                dtype='float32',
+                persistable=True,
+                name="scheduled_learning_rate")
+
+            global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()
+
+            with fluid.layers.control_flow.Switch() as switch:
+                with switch.case(global_step < self.warmup_steps):
+                    warmup_lr = learning_rate * (global_step / self.warmup_steps)
+                    fluid.layers.tensor.assign(warmup_lr, lr)
+                with switch.default():
+                    decayed_lr = fluid.layers.learning_rate_scheduler.polynomial_decay(
+                        learning_rate=learning_rate,
+                        decay_steps=self.num_train_steps,
+                        end_learning_rate=0.0,
+                        power=1.0,
+                        cycle=False)
+                    fluid.layers.tensor.assign(decayed_lr, lr)
+
+        return lr
+
+
diff --git a/paddlepalm/lr_sched/warmup_schedualer.py b/paddlepalm/lr_sched/warmup_schedualer.py
new file mode 100644
index 0000000000000000000000000000000000000000..63157ead376b46585df9df249d577cfaedb82187
--- /dev/null
+++ b/paddlepalm/lr_sched/warmup_schedualer.py
@@ -0,0 +1,31 @@
+
+from paddlepalm.lr_sched.schedualer import BaseSchedualer
+from paddle import fluid
+
+class WarmupSchedualer(BaseSchedualer):
+    """ Applies linear warmup of learning rate from 0 to learning_rate until warmup_steps, then keeps the learning rate constant."""
+
+    def __init__(self, warmup_steps):
+        BaseSchedualer.__init__(self)
+        self.warmup_steps = warmup_steps
+
+    def build(self, learning_rate):
+
+        with self._prog._lr_schedule_guard():
+            lr = fluid.layers.tensor.create_global_var(
+                shape=[1],
+                value=0.0,
+                dtype='float32',
+                persistable=True,
+                name="scheduled_learning_rate")
+
+            global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()
+
+            with fluid.layers.control_flow.Switch() as switch:
+                with switch.case(global_step < self.warmup_steps):
+                    warmup_lr = learning_rate * (global_step / self.warmup_steps)
+                    fluid.layers.tensor.assign(warmup_lr, lr)
+                with switch.default():
+                    fluid.layers.tensor.assign(learning_rate, lr)
+
+        return lr
+
diff --git a/paddlepalm/mtl_controller.py b/paddlepalm/mtl_controller.py
deleted file mode 100755
index db9c9c8924248c6bff38d20121c0c499563f3e9a..0000000000000000000000000000000000000000
--- a/paddlepalm/mtl_controller.py
+++ /dev/null
@@ -1,747 +0,0 @@
-# -*- coding: UTF-8 -*-
-# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -from __future__ import print_function - -import os -import sys -import importlib -import multiprocessing -from paddle import fluid -from paddle.fluid import layers -import yaml -import json -import logging -import time -import numpy as np - -from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint -from paddlepalm.utils.config_helper import PDConfig -from paddlepalm.utils.print_helper import print_dict -from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs - -from paddlepalm.default_settings import * -from task_instance import TaskInstance, check_instances - -import Queue -from threading import Thread - -DEBUG=False -VERBOSE=0 - -def _get_basename(f): - return os.path.splitext(f)[0] - - -def _get_suffix(f): - return os.path.splitext(f)[-1] - - -def _parse_yaml(f, asdict=True, support_cmd_line=False): - assert os.path.exists(f), "file {} not found.".format(f) - if support_cmd_line: - args = PDConfig(yaml_file=f, fuse_args=True) - args.build() - return args.asdict() if asdict else args - else: - if asdict: - with open(f, "r") as fin: - yaml_config = yaml.load(fin, Loader=yaml.SafeLoader) - return yaml_config - else: - raise NotImplementedError() - - -def _parse_json(f, asdict=True, support_cmd_line=False): - assert os.path.exists(f), "file {} not found.".format(f) - if support_cmd_line: - args = PDConfig(json_file=f, fuse_args=support_cmd_line) - args.build() - return args.asdict() if asdict else args - else: - if asdict: - with open(f, "r") as fin: - config = json.load(fin) - return config - else: - raise NotImplementedError() - - -def _parse_list(string, astype=str): - assert isinstance(string, str), "{} is not a string.".format(string) - if ',' not in string: - return [astype(string)] - string = string.replace(',', ' ') - return [astype(i) for i in string.split()] - - -def _try_float(s): - try: - float(s) - return(float(s)) - except: - return s - - -def _check_conf(conf, checklist=None): - assert isinstance(conf, dict), "{} is not a dict.".format(conf) - ret = {} - for k,v in conf.items(): - if isinstance(v, str): - v = _try_float(v) - ret[k] = v - if checklist is not None: - for k, t in checklist: - assert k in ret, "required argument {} is NOT exist in config file.".format(k) - assert isintance(ret[k], t), "value type of argument {} should be {}".format(k, t) - return ret - - -# TODO: 增加None机制,允许hidden size、batch size和seqlen设置为None -def _check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"): - for name, attr in in_attr.items(): - assert name in out_attr, in_name+': '+name+' not found in '+out_name - if attr != out_attr[name]: - if strict: - raise ValueError(name+': shape or dtype not consistent!') - else: - logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name])) - - -def _merge_conf(conf1, conf2, conf1_first=True, strict=False): - assert isinstance(conf1, dict), "{} is not a dict.".format(conf1) - assert isinstance(conf2, dict), "{} is not a dict.".format(conf2) - base_conf = conf2 if conf1_first else conf1 - base_conf = base_conf.copy() - new_conf = conf1 if conf1_first else conf2 - - for k, v in new_conf.items(): - if k in base_conf: - if base_conf[k] != v: - raise Warning("value of argument {} has been updated to {}.".format(k, v)) - else: - if strict: - continue - - base_conf[k] = v - return base_conf - - -def _encode_inputs(inputs, scope_name, sep='/', cand_set=None): - outputs = {} - for k, v in 
inputs.items(): - if cand_set is not None: - if k in cand_set: - outputs[k] = v - if scope_name+sep+k in cand_set: - outputs[scope_name+sep+k] = v - else: - outputs[scope_name+sep+k] = v - return outputs - - -def _decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True): - outputs = {} - for name, value in inputs.items(): - # var for backbone are also available to tasks - if keep_unk_keys and sep not in name: - outputs[name] = value - # var for this inst - if name.startswith(scope_name+'/'): - outputs[name[len(scope_name+'/'):]] = value - return outputs - - -def _init_env(use_gpu): - if use_gpu: - place = fluid.CUDAPlace(0) - dev_count = fluid.core.get_cuda_device_count() - else: - place = fluid.CPUPlace() - dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count())) - return fluid.Executor(place), dev_count - - -def _fit_attr(conf, fit_attr, strict=False): - for i, attr in fit_attr.items(): - if i not in conf: - if strict: - raise Exception('Argument {} is required to create a controller.'.format(i)) - else: - continue - conf[i] = attr(conf[i]) - return conf - - -class Controller(object): - - def __init__(self, config, task_dir='.', for_train=True): - """ - Args: - config: (str|dict) 字符串类型时,给出yaml格式的config配置文件路径; - """ - - self._for_train = for_train - assert isinstance(config, str) or isinstance(config, dict), "a config dict or config file path is required to create a Controller." - - if isinstance(config, str): - mtl_conf = _parse_yaml(config, support_cmd_line=True) - else: - mtl_conf = config - - mtl_conf = _check_conf(mtl_conf) - mtl_conf = _fit_attr(mtl_conf, REQUIRED_ARGS, strict=True) - mtl_conf = _fit_attr(mtl_conf, OPTIONAL_ARGS, strict=False) - - exe, dev_count = _init_env(use_gpu=mtl_conf.get('use_gpu', True)) - self.exe = exe - self.dev_count = dev_count - - print_dict(mtl_conf, title='global configuration') - - # parse task instances and target tags - instnames = _parse_list(mtl_conf['task_instance']) - assert len(instnames) == len(set(instnames)), "repeated task_instance is NOT supported." - num_instances = len(instnames) - self.num_instances = num_instances - - instname_to_conf = {} - instname_to_id = {} - for id, instname in enumerate(instnames): - instpath = os.path.join(task_dir, instname+'.yaml') - conf = _parse_yaml(instpath, support_cmd_line=False) - # conf = _check_conf(conf, TASK_INSTANCE_REQUIRED_ARGS) - conf = _check_conf(conf) - temp_conf = _merge_conf(mtl_conf, conf, strict=True) - print_dict(temp_conf, title='{} configuration'.format(instname)) - conf = _merge_conf(mtl_conf, conf) - - instname_to_conf[instname] = conf - instname_to_id[instname] = id - - # prepare backbone - if 'backbone_config_path' in mtl_conf: - bb_conf = _parse_json(mtl_conf['backbone_config_path']) - bb_conf = _merge_conf(mtl_conf, bb_conf) - else: - bb_conf = mtl_conf - print_dict(bb_conf, title = 'backbone configuration'.format(instname)) - - bb_name = mtl_conf['backbone'] - bb_mod = importlib.import_module(BACKBONE_DIR + '.' + bb_name) - Backbone = getattr(bb_mod, 'Model') - - # create task instances - instances = [] - for name in instnames: - instances.append(TaskInstance(name, instname_to_id[name], instname_to_conf[name])) - - check_instances(instances) - - # parse target_tag - if 'target_tag' in mtl_conf: - target_tag = str(mtl_conf['target_tag']) - tags = _parse_list(target_tag, astype=int) - assert len(tags) == len(instnames), "number of target_tag is NOT consistent with that in task_instance." 
- for tag, inst in zip(tags, instances): - inst.is_target = tag - else: - tags = [i.is_target for i in instances] - num_targets = sum(tags) - num_auxes = num_instances - num_targets - - # parse mix ratios - if 'mix_ratio' in mtl_conf: - mix_ratio = str(mtl_conf['mix_ratio']) - mrs = _parse_list(mix_ratio, astype=float) - assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances." - else: - mrs = [1.0] * num_instances - - for mr, inst in zip(mrs, instances): - inst.mix_ratio = mr - - # parse task layer reuse tags - instname_to_reusehost = {i:i for i in instnames} - if 'task_reuse_tag' in mtl_conf: - tags = _parse_list(mtl_conf['task_reuse_tag'], astype=int) - assert len(tags) == num_targets, 'number of reuse_tags is NOT consistent with number of instances.' - else: - tags = [] - mapper = {} - for inst in instances: - history = set() - history.add(inst.name) - cur_inst = inst - while True: - if cur_inst.task_reuse_scope in history: - mapper[inst.name] = len(tags) - break - elif cur_inst.task_reuse_scope in mapper: - mapper[inst.name] = mapper[cur_inst.task_reuse_scope] - break - else: - cur_inst = name_to_instance[cur_inst.task_reuse_scope] - history.add(cur_inst.name) - - tags.append(mapper[inst.name]) - - for i in range(1, num_instances): - for j in range(i): - if tags[i] == tags[j]: - assert instances[i].Paradigm == \ - instances[j].Paradigm, \ - "paradigm of reuse tasks should be consistent" - instances[i].task_reuse_scope = instances[j].name - break - - self.instances = instances - self.mrs = mrs - self.Backbone = Backbone - self.bb_conf = bb_conf - self.bb_name = bb_name - - self.has_init_train = False - self.has_init_pred = False - - if self._for_train: - print("initialing for training...") - self._init_train() - self.has_init_train = True - - def _init_train(self): - - instances = self.instances - Backbone = self.Backbone - bb_conf = self.bb_conf - bb_name = self.bb_name - dev_count = self.dev_count - num_instances = len(instances) - mrs = self.mrs - - # set first_target/main task instance - main_inst = None - for inst in instances: - if inst.is_target: - main_inst = inst - inst.is_first_target = True - break - main_conf = main_inst.config - if not os.path.exists(main_conf['save_path']): - os.makedirs(main_conf['save_path']) - os.makedirs(os.path.join(main_conf['save_path'], 'ckpt')) - - # prepare backbone - train_backbone = Backbone(bb_conf, phase='train') - pred_backbone = Backbone(bb_conf, phase='pred') - - # create reader, task - # then check i/o across reader, backbone and task_layer - task_attrs = [] - pred_task_attrs = [] - for inst in instances: - train_reader = inst.Reader(inst.config, phase='train') - inst.reader['train'] = train_reader - train_parad = inst.Paradigm(inst.config, phase='train', backbone_config=bb_conf) - inst.task_layer['train'] = train_parad - task_attr_from_reader = _encode_inputs(train_parad.inputs_attrs['reader'], inst.name) - task_attrs.append(task_attr_from_reader) - - _check_io(train_backbone.inputs_attr, train_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train') - _check_io(train_parad.inputs_attrs['reader'], train_reader.outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train') - _check_io(train_parad.inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone') - - if inst.is_target: - if 'pred_file' not in inst.config: - inst.config['pred_file'] = '' - pred_reader = inst.Reader(inst.config, phase='pred') - 
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf) - inst.task_layer['pred'] = pred_parad - task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name) - pred_task_attrs.append(task_attr_from_reader) - _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred') - _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred') - _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone') - - # merge reader input attrs from backbone and task_instances - joint_input_names, joint_shape_and_dtypes, name_to_position = merge_input_attrs(train_backbone.inputs_attr, task_attrs) - pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False) - # shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN] - - if DEBUG: - print('----- for debug -----') - print('joint input names:') - print(joint_input_names) - print('joint input shape and dtypes:') - print(joint_shape_and_dtypes) - - # load data - for inst in instances: - print(inst.name+": preparing data...", end='') - inst.reader['train'].load_data() - print('ok!') - - # merge dataset iterators and create net input vars - iterators = [] - prefixes = [] - mrs = [] - for inst in instances: - iterators.append(inst.reader['train'].iterator()) - prefixes.append(inst.name) - mrs.append(inst.mix_ratio) - - joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict') - self._joint_iterator_fn = joint_iterator_fn - - input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)] - pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)] - # net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3) - net_inputs = create_net_inputs(input_attrs, async=False) - self._net_inputs = net_inputs - - # build backbone and task layers - train_prog = fluid.default_main_program() - train_init_prog = fluid.default_startup_program() - bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_') - assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys()) - - pred_prog = fluid.Program() - pred_init_prog = fluid.Program() - - with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog): - pred_net_inputs = create_net_inputs(pred_input_attrs) - pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_') - - fluid.framework.switch_main_program(train_prog) - fluid.framework.switch_startup_program(train_init_prog) - - task_output_vars = {} - for inst in instances: - task_inputs = {'backbone': bb_output_vars} - task_inputs_from_reader = _decode_inputs(net_inputs, inst.name) - task_inputs['reader'] = task_inputs_from_reader - - scope = inst.task_reuse_scope + '/' - with fluid.unique_name.guard(scope): - output_vars = inst.build_task_layer(task_inputs, phase='train', scope=scope) - output_vars = {inst.name+'/'+key: val for key, val in output_vars.items()} - old = len(task_output_vars) # for debug - 
task_output_vars.update(output_vars) - assert len(task_output_vars) - old == len(output_vars) # for debug - - # prepare predict vars for saving inference model - if inst.is_target: - - with fluid.program_guard(pred_prog, pred_init_prog): - cur_inputs = _decode_inputs(pred_net_inputs, inst.name) - inst.pred_input = cur_inputs - pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs} - scope = inst.task_reuse_scope + '/' - with fluid.unique_name.guard(scope): - inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope) - - - bb_fetches = {k: v.name for k,v in bb_output_vars.items()} - task_fetches = {k: v.name for k,v in task_output_vars.items()} - fetches = task_fetches - fetches['__task_id'] = net_inputs['__task_id'].name - - # compute loss - task_id_var = net_inputs['__task_id'] - task_id_vec = fluid.one_hot(task_id_var, num_instances) - losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0) - loss = layers.reduce_sum(task_id_vec * losses) - - main_reader = main_inst.reader['train'] - - num_examples = main_reader.num_examples - for inst in instances: - max_train_steps = int(main_conf['num_epochs']* inst.mix_ratio * (num_examples // main_conf['batch_size'] // dev_count)) - if inst.is_target: - print('{}: expected train steps {}.'.format(inst.name, max_train_steps)) - inst.steps_pur_epoch = inst.reader['train'].num_examples // main_conf['batch_size'] // dev_count - inst.expected_train_steps = max_train_steps - - global_max_train_steps = int(main_conf['num_epochs'] * sum(mrs) * (num_examples // main_conf['batch_size'] // dev_count)) - print('Estimated overall train steps {}.'.format(global_max_train_steps)) - - if 'warmup_proportion' in main_conf and main_conf['warmup_proportion'] > 0: - warmup_steps = int(global_max_train_steps * main_conf['warmup_proportion']) - print('Warmup steps: '+str(warmup_steps)) - else: - warmup_steps = 0 - - # build optimizer - if 'optimizer' in main_conf: - optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer']) - optimize = getattr(optim_mod, OPTIMIZE_METHOD) - optimize(loss, main_conf, max_train_steps, warmup_steps, fluid.default_main_program()) - - loss.persistable = True - if main_conf.get('use_ema', False): - assert 'ema_decay' in main_conf, "ema_decay should be set when use_ema is enabled." 
- ema = fluid.optimizer.ExponentialMovingAverage(main_conf['ema_decay']) - ema.update() - - # prepare for train - self.train_backbone = train_backbone - self.train_program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name) - self.saver_program = fluid.default_main_program() - - self.main_inst = main_inst - self.fetches = fetches - self.has_init_train = True - self.has_init_pred = True - - self.exe.run(fluid.default_startup_program()) - print("\nRandomly initialize parameters...\n") - - def _init_pred(self, instance, infer_model_path): - inst = instance - if 'pred_output_path' not in inst.config: - inst.config['pred_output_path'] = os.path.join(inst.config.get('save_path', '.'), inst.name) - - if not os.path.exists(inst.config['pred_output_path']): - os.makedirs(inst.config['pred_output_path']) - - pred_backbone = self.Backbone(self.bb_conf, phase='pred') - pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=self.bb_conf) - inst.task_layer['pred'] = pred_parad - pred_joint_input_names, pred_joint_shape_and_dtypes, name_to_position = merge_input_attrs( - pred_backbone.inputs_attr, inst.task_layer['pred'].inputs_attrs['reader'], - insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False) - - pred_prog = inst.load(infer_model_path) - if inst.reader['pred'] is None: - pred_reader = inst.Reader(inst.config, phase='pred') - inst.reader['pred'] = pred_reader - return pred_prog - - def load_pretrain(self, pretrain_path=None): - # load pretrain model (or ckpt) - if pretrain_path is None: - assert 'pretrain_path' in self.main_conf, "pretrain_path NOT set." - pretrain_path = self.main_conf['pretrain_path'] - - init_pretraining_params( - self.exe, - pretrain_path, - main_program=fluid.default_startup_program()) - - - def train(self): - - if not self.has_init_train: - self._init_train() - self.has_init_train = True - - instances = self.instances - num_instances = self.num_instances - main_inst = self.main_inst - main_conf = main_inst.config - - backbone = self.train_backbone - train_program = self.train_program - saver_program = self.saver_program - fetches = self.fetches - - finish = [] - for inst in instances: - if inst.is_target: - if inst.expected_train_steps > 0: - finish.append(False) - else: - finish.append(True) - print(inst.name+': train finished!') - inst.save() - - def train_finish(): - for inst in instances: - if inst.is_target: - if not inst.train_finish: - return False - return True - - def pack_multicard_feed(iterator, net_inputs, dev_count): - ret = [] - mask = [] - for i in range(dev_count): - temp = {} - content, flag = next(iterator) - for q, var in net_inputs.items(): - temp[var.name] = content[q] - ret.append(temp) - mask.append(1 if flag else 0) - return ret, mask - - # do training - fetch_names, fetch_list = zip(*fetches.items()) - - main_step = 0 # only count for main task - global_step = 0 # count for all tasks - epoch = 0 - time_begin = time.time() - backbone_buffer = [] - - def multi_dev_reader(reader, dev_count): - def worker(reader, dev_count, queue): - dev_batches = [] - for index, data in enumerate(reader()): - if len(dev_batches) < dev_count: - dev_batches.append(data) - if len(dev_batches) == dev_count: - queue.put((dev_batches, 0)) - dev_batches = [] - # For the prediction of the remained batches, pad more batches to - # the number of devices and the padded samples would be removed in - # prediction outputs. 
- if len(dev_batches) > 0: - num_pad = dev_count - len(dev_batches) - for i in range(len(dev_batches), dev_count): - dev_batches.append(dev_batches[-1]) - queue.put((dev_batches, num_pad)) - queue.put(None) - - queue = Queue.Queue(dev_count*2) - p = Thread( - target=worker, args=(reader, dev_count, queue)) - p.daemon = True - p.start() - while True: - ret = queue.get() - if ret is not None: - batches, num_pad = ret - queue.task_done() - for batch in batches: - flag = num_pad == 0 - if num_pad > 0: - num_pad -= 1 - yield batch, flag - else: - break - queue.join() - - joint_iterator = multi_dev_reader(self._joint_iterator_fn, self.dev_count) - - while not train_finish(): - feed, mask = pack_multicard_feed(joint_iterator, self._net_inputs, self.dev_count) - rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list) - rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)} - rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist() - rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id - cur_task = instances[rt_task_id] - - backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k} - backbone_buffer.append(backbone.postprocess(backbone_rt_outputs)) - - task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')} - instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs) - - global_step += 1 - cur_task.cur_train_step += 1 - - cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch - if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0: - cur_task.save(suffix='.step'+str(cur_task_global_step)) - - if global_step % main_conf.get('print_every_n_steps', 5) == 0: - loss = rt_outputs[cur_task.name+'/loss'] - loss = np.mean(np.squeeze(loss)).tolist() - - time_end = time.time() - time_cost = time_end - time_begin - - print("Global step: {}. Task: {}, step {}/{} (epoch {}), loss: {:.3f}, speed: {:.2f} steps/s".format( - global_step, cur_task.name, cur_task.cur_train_step, cur_task.steps_pur_epoch, cur_task.cur_train_epoch, - loss, main_conf.get('print_every_n_steps', 5) / time_cost)) - time_begin = time.time() - - if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps: - print(cur_task.name+': train finished!') - cur_task.save() - - if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0: - save_path = os.path.join(main_conf['save_path'], 'ckpt', - "step_" + str(global_step)) - fluid.io.save_persistables(self.exe, save_path, saver_program) - print('checkpoint has been saved at '+save_path) - - save_path = os.path.join(main_conf['save_path'], 'ckpt', - "step_" + str(global_step)) - fluid.io.save_persistables(self.exe, save_path, saver_program) - print('checkpoint has been saved at '+save_path) - - print("ALL tasks train finished, exiting...") - - def pred(self, task_instance, inference_model_dir=None): - if self._for_train: - raise Exception('This controller is a trainer. Please build a new controller with for_train=False for predicting.') - - assert isinstance(task_instance, str) - if isinstance(inference_model_dir, str): - assert os.path.exists(inference_model_dir), inference_model_dir+" not found." 
- # if not self.has_init_pred and inference_model_dir is None: - # raise ValueError('infer_model_path is required for prediction.') - if inference_model_dir is None: - assert 'save_path' in self.mtl_conf, "one of the `inference_model_dir` and 'save_path' should be set to load inference model." - inference_model_dir = os.path.join(self.mtl_conf['save_path'], task_instance, 'infer_model') - - instance = None - for inst in self.instances: - if inst.name == task_instance: - instance = inst - break - - if instance is None: - raise ValueError(task_instance + ' is not a valid task_instance.') - - pred_prog = self._init_pred(instance, inference_model_dir) - - inst = instance - print(inst.name+": loading data...") - inst.reader['pred'].load_data() - fetch_names, fetch_vars = inst.pred_fetch_list - - print('predicting...') - mapper = {k:v for k,v in inst.pred_input} - buf = [] - for feed in inst.reader['pred'].iterator(): - feed = _encode_inputs(feed, inst.name, cand_set=mapper) - feed = {mapper[k]: v for k,v in feed.items()} - - rt_outputs = self.exe.run(pred_prog, feed, fetch_vars) - rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)} - inst.postprocess(rt_outputs, phase='pred') - if inst.task_layer['pred'].epoch_inputs_attrs: - reader_outputs = inst.reader['pred'].get_epoch_outputs() - else: - reader_outputs = None - inst.epoch_postprocess({'reader':reader_outputs}, phase='pred') - - -if __name__ == '__main__': - assert len(sys.argv) == 2, "Usage: python mtl_controller.py " - conf_path = sys.argv[1] - del sys.argv[1] - controller = Controller(conf_path) - if controller.main_conf['do_train']: - controller.train() - - - -__all__ = ["Controller"] - - - diff --git a/paddlepalm/optimizer/.adam.py.swp b/paddlepalm/optimizer/.adam.py.swp deleted file mode 100644 index fa7213a37347ffd5b451e2a3a364dd3ebeac523f..0000000000000000000000000000000000000000 Binary files a/paddlepalm/optimizer/.adam.py.swp and /dev/null differ diff --git a/paddlepalm/optimizer/__init__.py b/paddlepalm/optimizer/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..72430e58230e585317fb1e3397a6f8763775adf3 100644 --- a/paddlepalm/optimizer/__init__.py +++ b/paddlepalm/optimizer/__init__.py @@ -0,0 +1,2 @@ + +from adam import Adam diff --git a/paddlepalm/optimizer/adam.py b/paddlepalm/optimizer/adam.py index efa7cd8f74abbc7a8cf4a25d29bb305315c70f58..04d52314370f4c612720a880f6ffdb65def2c317 100644 --- a/paddlepalm/optimizer/adam.py +++ b/paddlepalm/optimizer/adam.py @@ -20,102 +20,36 @@ from __future__ import print_function import numpy as np import paddle.fluid as fluid +from paddlepalm.optimizer.base_optimizer import BaseOptimizer -class schedualer(object): - - def __init__(self): - pass +class Adam(BaseOptimizer): - def lr(self): - pass + def __init__(self, loss_var, lr, lr_schedualer=None): + BaseOptimizer.__init__(self, loss_var, lr, lr_schedualer=None) -def ConstantLearning(): - def __init__(self, lr): + self._loss = loss_var self._lr = lr + self._lr_schedualer = lr_schedualer + + def build(self, grad_clip=None): - def lr(self): - return self._lr - - -def LinearWarmupLearning(): -def linear_warmup_decay(learning_rate, warmup_steps, num_train_steps): - """ Applies linear warmup of learning rate from 0 and decay to 0.""" - with fluid.default_main_program()._lr_schedule_guard(): - lr = fluid.layers.tensor.create_global_var( - shape=[1], - value=0.0, - dtype='float32', - persistable=True, - name="scheduled_learning_rate") - - global_step = fluid.layers.learning_rate_scheduler._decay_step_counter() - - with 
fluid.layers.control_flow.Switch() as switch: - with switch.case(global_step < warmup_steps): - warmup_lr = learning_rate * (global_step / warmup_steps) - fluid.layers.tensor.assign(warmup_lr, lr) - with switch.default(): - decayed_lr = fluid.layers.learning_rate_scheduler.polynomial_decay( - learning_rate=learning_rate, - decay_steps=num_train_steps, - end_learning_rate=0.0, - power=1.0, - cycle=False) - fluid.layers.tensor.assign(decayed_lr, lr) - - return lr - - -def optimize(loss, config, max_train_steps=None, warmup_steps=0, train_program=None): - if warmup_steps > 0: - decay_strategy = config.get('lr_scheduler', 'linear_warmup_decay') - if decay_strategy == 'noam_decay': - scheduled_lr = fluid.layers.learning_rate_scheduler\ - .noam_decay(1/(warmup_steps *(config['learning_rate'] ** 2)), - warmup_steps) - elif decay_strategy == 'linear_warmup_decay': - scheduled_lr = linear_warmup_decay(config['learning_rate'], warmup_steps, - max_train_steps) - else: - raise ValueError("Unkown lr_scheduler, should be " - "'noam_decay' or 'linear_warmup_decay'") - optimizer = fluid.optimizer.Adam(learning_rate=scheduled_lr) - else: - optimizer = fluid.optimizer.Adam(learning_rate=config['learning_rate']) - scheduled_lr = config['learning_rate'] - - clip_norm_thres = 1.0 - # When using mixed precision training, scale the gradient clip threshold - # by loss_scaling - fluid.clip.set_gradient_clip( - clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=clip_norm_thres)) - - def exclude_from_weight_decay(name): - if name.find("layer_norm") > -1: - return True - bias_suffix = ["_bias", "_b", ".b_0"] - for suffix in bias_suffix: - if name.endswith(suffix): - return True - return False + if self._lr_schedualer is not None: + self._lr = self._lr_schedualer.build(self._lr) - param_list = dict() + optimizer = fluid.optimizer.Adam(learning_rate=self._lr) - for param in train_program.global_block().all_parameters(): - param_list[param.name] = param * 1.0 - param_list[param.name].stop_gradient = True + if grad_clip is not None: + clip_norm_thres = grad_clip + # When using mixed precision training, scale the gradient clip threshold + # by loss_scaling + fluid.clip.set_gradient_clip( + clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=clip_norm_thres)) - _, param_grads = optimizer.minimize(loss) + _, param_grads = optimizer.minimize(self._loss) + return param_grads + def get_cur_learning_rate(self): + return self._lr - if config.get('weight_decay', 0) > 0: - for param, grad in param_grads: - if exclude_from_weight_decay(param.name): - continue - with param.block.program._optimized_guard( - [param, grad]), fluid.framework.name_scope("weight_decay"): - updated_param = param - param_list[ - param.name] * config['weight_decay'] * scheduled_lr - fluid.layers.assign(output=param, input=updated_param) diff --git a/paddlepalm/optimizer/base_optimizer.py b/paddlepalm/optimizer/base_optimizer.py new file mode 100644 index 0000000000000000000000000000000000000000..0f4157948e492440d46be2273d64c10289d1409a --- /dev/null +++ b/paddlepalm/optimizer/base_optimizer.py @@ -0,0 +1,19 @@ + +class BaseOptimizer(): + + def __init__(self, loss_var, lr, lr_schedualer=None): + self._prog = None + self._lr_schedualer = lr_schedualer + + def build(self, grad_clip=None): + pass + + def _set_prog(self, prog): + self._prog = prog + if self._lr_schedualer is not None: + self._lr_schedualer._set_prog(prog) + + def get_cur_learning_rate(self): + pass + + diff --git a/paddlepalm/reader/__init__.py b/paddlepalm/reader/__init__.py index 
diff --git a/paddlepalm/reader/__init__.py b/paddlepalm/reader/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..10421172976e81883393d88a399414aa53427659 100644
--- a/paddlepalm/reader/__init__.py
+++ b/paddlepalm/reader/__init__.py
@@ -0,0 +1,3 @@
+
+from cls import ClassifyReader
+
diff --git a/paddlepalm/base_reader.py b/paddlepalm/reader/base_reader.py
similarity index 83%
rename from paddlepalm/base_reader.py
rename to paddlepalm/reader/base_reader.py
index d3e378e70063bd2be477759f90d75e1c01a1eca5..03e541a8161651466c1c74a79917c4b931fc1dba 100644
--- a/paddlepalm/base_reader.py
+++ b/paddlepalm/reader/base_reader.py
@@ -14,23 +14,47 @@
 # limitations under the License.
 """v1.1"""
 from copy import copy
 
-class reader(object):
+class BaseReader(object):
     """interface of data manager."""
 
-    def __init__(self, config, phase='train'):
-        assert isinstance(config, dict)
-        self._config = config
+    def __init__(self, phase='train'):
+        # assert isinstance(config, dict)
+        # self._config = config
         self._phase = phase
+        self._register = set()
+        self._registered_backbone = None
 
-    def copy(self, phase=self._phase):
+    @classmethod
+    def create_register(cls):
+        return set()
+
+    def clone(self, phase='train'):
         if phase == self._phase:
             return copy(self)
         else:
             ret = copy(self)
             ret._phase = phase
             return ret
+
+    def require_attr(self, attr_name):
+        self._register.add(attr_name)
+
+    def register_with(self, backbone):
+        for attr in backbone.inputs_attr:
+            self.require_attr(attr)
+        self._registered_backbone = backbone
+
+    def get_registered_backbone(self):
+        return self._registered_backbone
+
+    def _get_registed_attrs(self, attrs):
+        ret = {}
+        for i in self._register:
+            if i not in attrs:
+                raise NotImplementedError('output attr {} is not found in this reader.'.format(i))
+            ret[i] = attrs[i]
+        return ret
 
     # @property
     # def inputs_attr(self):
diff --git a/paddlepalm/reader/cls.py b/paddlepalm/reader/cls.py
index 5be7d3bbbd53ffd1b6fa93765117931f93eaae69..548eeb588d32cf994bde04e6dd902b73b7f19314 100644
--- a/paddlepalm/reader/cls.py
+++ b/paddlepalm/reader/cls.py
@@ -13,108 +13,69 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from paddlepalm.interface import reader
-from paddlepalm.reader.utils.reader4ernie import ClassifyReader
-
-def classify(data_path, vocab_path, batch_size, max_len, \
-        pred_batch_size=None, file_format='csv', tokenizer='wordpiece', \
-        lang='en', shuffle_train=True, seed=None, do_lower_case=False, \
-        seed=None, phase='train'):
-
-    assert lang.lower() in ['en', 'cn', 'english', 'chinese'], "supported language: en (English), cn (Chinese)."
-    assert phase in ['train', 'pred'], "supported phase: train, pred."
-    config = {
-        'train_file': data_path,
-        'pred_file': data_path,
-        'batch_size': batch_size,
-        'pred_batch_size': pred_batch_size,
-        'max_len': max_len,
-        'file_format': file_format,
-        'tokenizer': tokenizer,
-        'for_cn': lang.lower() == 'cn' or lang.lower() == 'chinese',
-        'shuffle_train': shuffle_train,
-        'do_lower_case': do_lower_case,
-        'seed': seed
-    }
-    if pred_batch_size is None:
-        del config['pred_batch_size']
-
-    return Reader(config, phase=phase)
-
-class Reader(reader):
+from paddlepalm.reader.base_reader import BaseReader
+from paddlepalm.reader.utils.reader4ernie import ClassifyReader as CLSReader
+
+
+class ClassifyReader(BaseReader):
 
-    def __init__(self, config, phase='train', print_prefix=''):
-        """
-        Args:
-            phase: train, eval, pred
-        """
+    def __init__(self, vocab_path, max_len, tokenizer='wordpiece', \
+                 lang='en', seed=None, do_lower_case=False, phase='train'):
 
-        self._is_training = phase == 'train'
+        BaseReader.__init__(self, phase)
 
-        reader = ClassifyReader(config['vocab_path'],
-                                max_seq_len=config['max_len'],
-                                do_lower_case=config.get('do_lower_case', False),
-                                for_cn=config.get('for_cn', False),
-                                random_seed=config.get('seed', None))
-        self._reader = reader
+        assert lang.lower() in ['en', 'cn', 'english', 'chinese'], "supported language: en (English), cn (Chinese)."
+        assert phase in ['train', 'pred'], "supported phase: train, pred."
 
-        self._batch_size = config['batch_size']
-        self._max_seq_len = config['max_len']
+        for_cn = lang.lower() == 'cn' or lang.lower() == 'chinese'
 
-        self._input_file = config['data_path']
+        self._register.add('token_ids')
         if phase == 'train':
-            self._num_epochs = None  # keeps the iterator from terminating
-            self._shuffle = config.get('shuffle_train', True)
-            # self._shuffle_buffer = config.get('shuffle_buffer', 5000)
-        elif phase == 'eval':
-            self._num_epochs = 1
-            self._shuffle = False
-            self._batch_size = config.get('pred_batch_size', self._batch_size)
-        elif phase == 'pred':
-            self._num_epochs = 1
-            self._shuffle = False
-            self._batch_size = config.get('pred_batch_size', self._batch_size)
+            self._register.add('label_ids')
+
+        self._is_training = phase == 'train'
+
+        cls_reader = CLSReader(vocab_path,
+                               max_seq_len=max_len,
+                               do_lower_case=do_lower_case,
+                               for_cn=for_cn,
+                               random_seed=seed)
+        self._reader = cls_reader
 
         self._phase = phase
         # self._batch_size =
-        self._print_first_n = config.get('print_first_n', 0)
+        # self._print_first_n = config.get('print_first_n', 0)
 
     @property
     def outputs_attr(self):
-        if self._is_training:
-            return {"token_ids": [[-1, -1], 'int64'],
-                    "position_ids": [[-1, -1], 'int64'],
-                    "segment_ids": [[-1, -1], 'int64'],
-                    "input_mask": [[-1, -1, 1], 'float32'],
-                    "label_ids": [[-1], 'int64'],
-                    "task_ids": [[-1, -1], 'int64']
-                    }
-        else:
-            return {"token_ids": [[-1, -1], 'int64'],
-                    "position_ids": [[-1, -1], 'int64'],
-                    "segment_ids": [[-1, -1], 'int64'],
-                    "task_ids": [[-1, -1], 'int64'],
-                    "input_mask": [[-1, -1, 1], 'float32']
-                    }
-
-
-    def load_data(self):
-        self._data_generator = self._reader.data_generator(self._input_file, self._batch_size, self._num_epochs, shuffle=self._shuffle, phase=self._phase)
-
-    def iterator(self):
-
-        def list_to_dict(x):
-            names = ['token_ids', 'segment_ids', 'position_ids', 'task_ids', 'input_mask',
-                     'label_ids', 'unique_ids']
-            outputs = {n: i for n,i in zip(names, x)}
-            del outputs['unique_ids']
-            if not self._is_training:
-                del outputs['label_ids']
-            return outputs
-
+        attrs = {"token_ids": [[-1, -1], 'int64'],
+                 "position_ids": [[-1, -1], 'int64'],
+                 "segment_ids": [[-1, -1], 'int64'],
"input_mask": [[-1, -1, 1], 'float32'], + "label_ids": [[-1], 'int64'], + "task_ids": [[-1, -1], 'int64'] + } + return self._get_registed_attrs(attrs) + + + def _load_data(self, input_file, batch_size, num_epochs=None, \ + file_format='csv', shuffle_train=True): + self._data_generator = self._reader.data_generator(input_file, batch_size, \ + num_epochs, shuffle=shuffle_train if self._phase == 'train' else False, \ + phase=self._phase) + + def _iterator(self): + + names = ['token_ids', 'segment_ids', 'position_ids', 'task_ids', 'input_mask', + 'label_ids', 'unique_ids'] for batch in self._data_generator(): - yield list_to_dict(batch) + outputs = {n: i for n,i in zip(names, batch)} + ret = [] + # TODO: move runtime shape check here + for attr in self.outputs_attr.keys(): + ret[attr] = outputs[attr] + yield ret def get_epoch_outputs(self): return {'examples': self._reader.get_examples(self._phase), @@ -124,3 +85,4 @@ class Reader(reader): def num_examples(self): return self._reader.get_num_examples(phase=self._phase) + diff --git a/paddlepalm/task/__init__.py b/paddlepalm/task/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/paddlepalm/trainer.py b/paddlepalm/trainer.py index ed3cc6f73e4ee8af348fdc0a5af7756e6d737473..37817c94d6aacca335b30e39c0cc3d0af6515ee2 100644 --- a/paddlepalm/trainer.py +++ b/paddlepalm/trainer.py @@ -13,42 +13,53 @@ # See the License for the specific language governing permissions and # limitations under the License. -from paddlepalm.interface import reader as base_reader -from paddlepalm.interface import task_paradigm as base_paradigm +from __future__ import print_function import os import json from paddle import fluid -import importlib -from paddlepalm.default_settings import * +import paddlepalm.utils.basic_helper as helper +from paddlepalm.utils import reader_helper +# from paddlepalm.default_settings import * +DEBUG=False -def Trainer(object): - def __init__(self, name, reader, task, mix_ratio=1.0, \ - save_predict_model=True, save_path=None, save_steps=-1)\ - reuse_with=None, silent=False): +class Trainer(object): + + def __init__(self, name, reader, task_head, \ + save_predict_model=False, pred_head=None, save_path=None, save_steps=-1, \ + mix_ratio=1.0, reuse_head_with=None, \ + silent=False): self._name = name self._verbose = not silent + self._reader = reader + self._pred_reader = None + self._task_head = task_head + self._pred_head = pred_head if save_predict_model: + self._save_predict_model = True assert save_path is not None, "save_path is required when save_predict_model is set." assert save_steps == -1 or save_steps > 0, "save_steps should be -1 (only save the last step of this task) or larger than 0" - assert pred_reader is not None and pred_task is not None, "" - - self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model') + assert pred_head is not None, "pred_head is required to save predict model." + self._pred_reader = reader.clone(phase='pred') + if save_path is not None and not os.path.exists(save_path): + os.makedirs(save_path) else: - self._save_infermodel_path = infermodel_save_path + assert save_path is None, "You should set save_predict_model as True, or the save_path is invalid." + assert save_steps == -1 or save_steps == 0, "You should set save_predict_model as True, or the save_steps is invalid." + assert pred_head is None, "You should set save_predict_model as True, or the pred_head is invalid." 
diff --git a/paddlepalm/trainer.py b/paddlepalm/trainer.py
index ed3cc6f73e4ee8af348fdc0a5af7756e6d737473..37817c94d6aacca335b30e39c0cc3d0af6515ee2 100644
--- a/paddlepalm/trainer.py
+++ b/paddlepalm/trainer.py
@@ -13,42 +13,53 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from paddlepalm.interface import reader as base_reader
-from paddlepalm.interface import task_paradigm as base_paradigm
+from __future__ import print_function
 import os
 import json
 from paddle import fluid
-import importlib
-from paddlepalm.default_settings import *
+import paddlepalm.utils.basic_helper as helper
+from paddlepalm.utils import reader_helper
+# from paddlepalm.default_settings import *
+DEBUG=False
 
-def Trainer(object):
-    def __init__(self, name, reader, task, mix_ratio=1.0, \
-                 save_predict_model=True, save_path=None, save_steps=-1)\
-                 reuse_with=None, silent=False):
+class Trainer(object):
+
+    def __init__(self, name, reader, task_head, \
+                 save_predict_model=False, pred_head=None, save_path=None, save_steps=-1, \
+                 mix_ratio=1.0, reuse_head_with=None, \
+                 silent=False):
 
         self._name = name
         self._verbose = not silent
+        self._reader = reader
+        self._pred_reader = None
+        self._task_head = task_head
+        self._pred_head = pred_head
 
         if save_predict_model:
+            self._save_predict_model = True
             assert save_path is not None, "save_path is required when save_predict_model is set."
             assert save_steps == -1 or save_steps > 0, "save_steps should be -1 (only save the last step of this task) or larger than 0"
-            assert pred_reader is not None and pred_task is not None, ""
-
-            self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
+            assert pred_head is not None, "pred_head is required to save predict model."
+            self._pred_reader = reader.clone(phase='pred')
+            if save_path is not None and not os.path.exists(save_path):
+                os.makedirs(save_path)
         else:
-            self._save_infermodel_path = infermodel_save_path
+            assert save_path is None, "save_path is only valid when save_predict_model is True."
+            assert save_steps == -1 or save_steps == 0, "save_steps is only valid when save_predict_model is True."
+            assert pred_head is None, "pred_head is only valid when save_predict_model is True."
 
-        self._save_infermodel_every_n_steps = save_infermodel_every_n_steps
+        self._save_steps = save_steps
 
-        self._is_target = as_target
-        self._first_target = False
-        self._task_reuse_scope = name if task_layer_reuse is None else task_layer_reuse
+        self._task_reuse_scope = name if reuse_head_with is None else reuse_head_with
 
         self._feeded_var_names = None
         self._target_vars = None
 
+        self._num_examples = 0
+
         # training process management
         self._mix_ratio = mix_ratio
         self._expected_train_steps = None
@@ -59,10 +70,10 @@ def Trainer(object):
         self._train_finish = False
 
         # holds the dataset reader for each run phase (train/eval/pred); key is the phase, value is a Reader instance
-        self._reader = {'train': reader, 'eval': None, 'pred': pred_reader}
-        self._input_layer = None
+        # self._reader = {'train': reader, 'eval': None, 'pred': self._pred_reader}
+        # self._input_layer = None
         self._inputname_to_varname = {}
-        self._task_layer = {'train': tasklayer, 'eval': None, 'pred': pred_tasklayer}
+        # self._task_layer = {'train': task_head, 'eval': None, 'pred': pred_head}
         self._pred_input_name_list = []
         self._pred_input_varname_list = []
         self._pred_fetch_name_list = []
@@ -76,10 +87,183 @@ def Trainer(object):
             'fetch_list': 'self._pred_fetch_name_list'}
 
         self._lock = False
+        self._build_forward = False
+
 
-    def _build_task_layer(self, net_inputs, phase, scope=""):
-        output_vars = self._task_layer[phase].build(net_inputs, scope_name=scope)
+    def build_forward(self, backbone, pred_backbone=None):
+
+        # assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode"
+        self._build_forward = True
+        if self._save_predict_model:
+            assert pred_backbone is not None, "pred_backbone is required to save predict model."
+
+        # create reader, task
+        # then check i/o across reader, backbone and task_layer
+        task_attrs = []
+        pred_task_attrs = []
+
+        task_attr_from_reader = helper.encode_inputs(self._task_head.inputs_attrs['reader'], self.name, sep='.')
+        # task_attr_from_reader = self._task_head.inputs_attrs['reader']
+
+        # _check_io(backbone.inputs_attr, inst._reader['train'].outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
+        # _check_io(inst.taskblock['train'].inputs_attrs['reader'], inst._reader['train'].outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
+        # _check_io(inst._taskblock['train'].inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
+
+        if self._save_predict_model:
+            pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name, sep='.')
+            # pred_task_attr_from_reader = self._pred_head.inputs_attrs['reader']
+
+            # _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
+            # _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
+            # _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
+
+        # merge reader input attrs from backbone and task_instances
+        input_names, shape_and_dtypes, name_to_position = reader_helper.merge_input_attrs(backbone.inputs_attr, task_attr_from_reader)
+        pred_input_names, pred_shape_and_dtypes, _ = reader_helper.merge_input_attrs(backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
+        # shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
+        self._shape_and_dtypes = shape_and_dtypes
+        self._name_to_position = name_to_position
+
+        if DEBUG:
+            print('----- for debug -----')
+            print('joint input names:')
+            print(input_names)
+            print('joint input shape and dtypes:')
+            print(shape_and_dtypes)
+
+
+        input_attrs = [[i, j, k] for i, (j,k) in zip(input_names, shape_and_dtypes)]
+        pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_input_names, pred_shape_and_dtypes)]
+
+        train_prog = fluid.Program()
+        train_init_prog = fluid.Program()
+        self._prog = train_prog
+        self._train_prog = train_prog
+        self._train_init_prog = train_init_prog
+        with fluid.program_guard(train_prog, train_init_prog):
+            net_inputs = reader_helper.create_net_inputs(input_attrs, async=False)
+
+            # build backbone and task layers
+            # bb_output_vars = self._backbone.build(net_inputs, scope_name='__paddlepalm_')
+            bb_output_vars = backbone.build(net_inputs)
+        assert sorted(bb_output_vars.keys()) == sorted(backbone.outputs_attr.keys())
+
+        pred_prog = fluid.Program()
+        pred_init_prog = fluid.Program()
+        with fluid.program_guard(pred_prog, pred_init_prog):
+            pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
+            # pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
+            pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
+
+        # fluid.framework.switch_main_program(train_prog)
+        # fluid.framework.switch_startup_program(train_init_prog)
+
+        task_output_vars = {}
+        task_inputs = {'backbone': bb_output_vars}
+        task_inputs_from_reader = helper.decode_inputs(net_inputs, self.name, sep='.')
+        task_inputs['reader'] = task_inputs_from_reader
+
+        scope = self.name+'.'
+        with fluid.program_guard(train_prog, train_init_prog):
+            with fluid.unique_name.guard(scope):
+                output_vars = self._build_head(task_inputs, phase='train', scope=scope)
+                output_vars = {self.name+'.'+key: val for key, val in output_vars.items()}
+                old = len(task_output_vars)  # for debug
+                task_output_vars.update(output_vars)
+                assert len(task_output_vars) - old == len(output_vars)  # for debug
+
+        # prepare predict vars for saving inference model
+        if self._save_predict_model:
+            with fluid.program_guard(pred_prog, pred_init_prog):
+                cur_inputs = helper.decode_inputs(pred_net_inputs, self.name, sep='.')
+                # self.pred_input = cur_inputs
+                self._pred_input_name_list, self._pred_input_varname_list = \
+                    zip(*[[k, v.name] for k,v in cur_inputs.items()])
+
+                pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
+                scope = self.name + '.'
+                with fluid.unique_name.guard(scope):
+                    self._build_head(pred_task_inputs, phase='pred', scope=scope)
+
+
+        bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
+        task_fetches = {k: v.name for k,v in task_output_vars.items()}
+        # fetches = task_fetches
+        # fetches['__task_id'] = net_inputs['__task_id'].name
+
+        # compute loss
+        # task_id_var = net_inputs['__task_id']
+        # task_id_vec = layers.one_hot(task_id_var, num_instances)
+        # losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
+        # loss = layers.reduce_sum(task_id_vec * losses)
+        with fluid.program_guard(train_prog, train_init_prog):
+            loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
+        return loss_var
+
+    def build_backward(self, optimizer, weight_decay=None, use_ema=False, ema_decay=0.9999):
+        # build optimizer
+        optimizer._set_prog(self._train_prog)
+        with fluid.program_guard(self._train_prog, self._train_init_prog):
+            param_grads = optimizer.build()
+
+            if weight_decay is not None:
+
+                param_list = dict()
+
+                for param in self._prog.global_block().all_parameters():
+                    param_list[param.name] = param * 1.0
+                    param_list[param.name].stop_gradient = True
+
+                def exclude_from_weight_decay(name):
+                    if name.find("layer_norm") > -1:
+                        return True
+                    bias_suffix = ["_bias", "_b", ".b_0"]
+                    for suffix in bias_suffix:
+                        if name.endswith(suffix):
+                            return True
+                    return False
+
+                for param, grad in param_grads:
+                    if exclude_from_weight_decay(param.name):
+                        continue
+                    with param.block.program._optimized_guard(
+                        [param, grad]), fluid.framework.name_scope("weight_decay"):
+                        updated_param = param - param_list[
+                            param.name] * weight_decay * optimizer.get_cur_learning_rate()
+                        fluid.layers.assign(output=param, input=updated_param)
+
+
+        # loss.persistable = True
+        if use_ema:
+            ema = fluid.optimizer.ExponentialMovingAverage(ema_decay)
+            ema.update()
+
+    def load_data(self, input_file, file_format, batch_size, num_epochs=None, shuffle_train=True):
+        # load data
+        print("preparing data...", end='')
+        self._reader._load_data(input_file=input_file, batch_size=batch_size, \
+                                num_epochs=num_epochs, file_format=file_format, \
+                                shuffle_train=shuffle_train)
+        self._num_examples = self._reader.num_examples
+        print('ok!')
+
+        # merge dataset iterators and create net input vars
+        iterator = self._reader._iterator()
+        prefix = self.name
+
+        # runtime check and adaptation over the yielded data
+        iterator_fn = reader_helper.create_iterator_fn(iterator, prefix, self._shape_and_dtypes, self._name_to_position)
+        return iterator_fn
+
+    def random_init_params(self):
+
+        helper.build_executor()
+
+    def _build_head(self, net_inputs, phase, scope=""):
+        if phase == 'train':
+            output_vars = self._task_head.build(net_inputs, scope_name=scope)
         if phase == 'pred':
+            output_vars = self._pred_head.build(net_inputs, scope_name=scope)
             if output_vars is not None:
                 self._pred_fetch_name_list, self._pred_fetch_var_list = zip(*output_vars.items())
             else:
@@ -126,62 +310,22 @@ def Trainer(object):
         return self._name
 
     @property
-    def _Reader(self):
-        return self._Reader
-
-    @property
-    def _Paradigm(self):
-        return self._Paradigm
-
-    @property
-    def _reader(self):
-        return self._reader
+    def num_examples(self):
+        return self._num_examples
 
-    @property
-    def _pred_input(self):
-        return zip(*[self._pred_input_name_list, self._pred_input_varname_list])
-
-    @_pred_input.setter
-    def _pred_input(self, val):
-        assert isinstance(val, dict)
-        self._pred_input_name_list, self._pred_input_varname_list = \
-            zip(*[[k, v.name] for k,v in val.items()])
-
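Putting the new pieces together (replacing the config-driven TaskInstance flow being deleted in this patch), a run is expected to look roughly like the sketch below; `ernie`, `ernie_pred`, `cls_head` and `cls_pred_head` are placeholders for backbone/head instances built elsewhere, and the paths are illustrative:

    trainer = Trainer('senti_cls', reader, cls_head,
                      save_predict_model=True, pred_head=cls_pred_head,
                      save_path='output_model/', save_steps=1000)
    loss_var = trainer.build_forward(ernie, pred_backbone=ernie_pred)
    adam = Adam(loss_var, lr=5e-5)
    trainer.build_backward(adam, weight_decay=0.01)   # decoupled decay; layer_norm/bias params excluded
    iterator_fn = trainer.load_data('train.tsv', 'csv', batch_size=32, num_epochs=3)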
-    @property
-    def _pred_fetch_list(self):
-        return [self._pred_fetch_name_list, self._pred_fetch_var_list]
-
-    @property
-    def _task_layer(self):
-        return self._task_layer
-
-    @property
-    def _is_first_target(self):
-        return self._is_first_target
-
-    @_is_first_target.setter
-    def _is_first_target(self, value):
-        self._is_first_target = bool(value)
-        if self._is_first_target:
-            assert self._is_target, "ERROR: only target task could be set as main task."
-        if self._verbose and self._is_first_target:
-            print("{}: set as main task".format(self._name))
+    # @property
+    # def _pred_input(self):
+    #     return zip(*[self._pred_input_name_list, self._pred_input_varname_list])
 
-    @property
-    def _is_target(self):
-        if self._is_target is not None:
-            return self._is_target
-        else:
-            raise ValueError("{}: is_target is None".format(self._name))
+    # @_pred_input.setter
+    # def _pred_input(self, val):
+    #     assert isinstance(val, dict)
+    #     self._pred_input_name_list, self._pred_input_varname_list = \
+    #         zip(*[[k, v.name] for k,v in val.items()])
 
-    @_is_target.setter
-    def _is_target(self, value):
-        self._is_target = bool(value)
-        if self._verbose:
-            if self._is_target:
-                print('{}: set as target task.'.format(self._name))
-            else:
-                print('{}: set as aux task.'.format(self._name))
+    # @property
+    # def _pred_fetch_list(self):
+    #     return [self._pred_fetch_name_list, self._pred_fetch_var_list]
 
     @property
     def mix_ratio(self):
@@ -209,7 +353,7 @@ def Trainer(object):
         return self._expected_train_steps
 
     @expected_train_steps.setter
-    def _expected_train_steps(self, value):
+    def expected_train_steps(self, value):
         self._expected_train_steps = value
         self._expected_train_epochs = value / float(self._steps_pur_epoch)
 
@@ -221,29 +365,25 @@ def Trainer(object):
     def cur_train_epoch(self):
         return self._cur_train_epoch
 
-    @cur_train_epoch.setter
-    def _cur_train_epoch(self, value):
-        self._cur_train_epoch = value
-
     @property
     def cur_train_step(self):
         return self._cur_train_step
 
-    @cur_train_step.setter
-    def _cur_train_step(self, value):
-        self._cur_train_step = value
-        if self._cur_train_step > self._steps_pur_epoch:
-            self._cur_train_epoch += 1
-            self._cur_train_step = 1
-        if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
-            self._train_finish = True
+    # @cur_train_step.setter
+    # def _cur_train_step(self, value):
+    #     self._cur_train_step = value
+    #     if self._cur_train_step > self._steps_pur_epoch:
+    #         self._cur_train_epoch += 1
+    #         self._cur_train_step = 1
+    #     if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
+    #         self._train_finish = True
 
     @property
     def steps_pur_epoch(self):
         return self._steps_pur_epoch
 
     @steps_pur_epoch.setter
-    def _steps_pur_epoch(self, value):
+    def steps_pur_epoch(self, value):
         self._steps_pur_epoch = value
 
     @property
@@ -259,302 +399,3 @@ def Trainer(object):
     def _set_lock(self):
         self._lock = True
 
-    # @property
-    # def task_reuse_scope(self):
-    #     if self._task_reuse_scope is not None:
-    #         return self._task_reuse_scope
-    #     else:
-    #         raise ValueError("{}: task_reuse_scope is None".format(self._name))
-
-    # @task_reuse_scope.setter
-    # def task_reuse_scope(self, scope_name):
-    #     self._task_reuse_scope = str(scope_name)
-    #     if self._verbose:
-    #         print('{}: task_reuse_scope is set to {}'.format(self._name, self._task_reuse_scope))
-
-
-def check_req_args(conf, name):
-    assert 'reader' in conf, name+': reader is required to build TaskInstance.'
-    assert 'paradigm' in conf, name+': paradigm is required to build TaskInstance.'
-    assert 'train_file' in conf or 'pred_file' in conf, name+': at least train_file or pred_file should be provided to build TaskInstance.'
-
-
-class TaskInstance(object):
-
-    def __init__(self, name, id, config, verbose=True):
-        self._name = name
-        self._config = config
-        self._verbose = verbose
-
-        check_req_args(config, name)
-
-        # parse Reader and Paradigm
-        reader_name = config['reader']
-        reader_mod = importlib.import_module(READER_DIR + '.' + reader_name)
-        Reader = getattr(reader_mod, 'Reader')
-
-        parad_name = config['paradigm']
-        parad_mod = importlib.import_module(PARADIGM_DIR + '.' + parad_name)
-        Paradigm = getattr(parad_mod, 'TaskType')
-
-        self._Reader = Reader
-        self._Paradigm = Paradigm
-
-        self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
-        self._save_ckpt_path = os.path.join(self._config['save_path'], 'ckpt')
-        self._save_infermodel_every_n_steps = config.get('save_infermodel_every_n_steps', -1)
-
-        # following flags can be fetch from instance config file
-        self._is_target = config.get('is_target', True)
-        self._first_target = config.get('is_first_target', False)
-        self._task_reuse_scope = config.get('task_reuse_scope', name)
-
-        self._feeded_var_names = None
-        self._target_vars = None
-
-        # training process management
-        self._mix_ratio = None
-        self._expected_train_steps = None
-        self._expected_train_epochs = None
-        self._steps_pur_epoch = None
-        self._cur_train_epoch = 0
-        self._cur_train_step = 0
-        self._train_finish = False
-
-        # holds the dataset reader for each run phase (train/eval/pred); key is the phase, value is a Reader instance
-        self._reader = {'train': None, 'eval': None, 'pred': None}
-        self._input_layer = None
-        self._inputname_to_varname = {}
-        self._task_layer = {'train': None, 'eval': None, 'pred': None}
-        self._pred_input_name_list = []
-        self._pred_input_varname_list = []
-        self._pred_fetch_name_list = []
-        self._pred_fetch_var_list = []
-
-        self._exe = fluid.Executor(fluid.CPUPlace())
-
-        self._save_protocol = {
-            'input_names': 'self._pred_input_name_list',
-            'input_varnames': 'self._pred_input_varname_list',
-            'fetch_list': 'self._pred_fetch_name_list'}
-
-
-    def build_task_layer(self, net_inputs, phase, scope=""):
-        output_vars = self._task_layer[phase].build(net_inputs, scope_name=scope)
-        if phase == 'pred':
-            if output_vars is not None:
-                self._pred_fetch_name_list, self._pred_fetch_var_list = zip(*output_vars.items())
-            else:
-                self._pred_fetch_name_list = []
-                self._pred_fetch_var_list = []
-        return output_vars
-
-    def postprocess(self, rt_outputs, phase):
-        return self._task_layer[phase].postprocess(rt_outputs)
-
-    def epoch_postprocess(self, epoch_inputs, phase):
-        return self._task_layer[phase].epoch_postprocess(epoch_inputs)
-
-    def save(self, suffix=''):
-        dirpath = self._save_infermodel_path + suffix
-        self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list]
-
-        # fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, export_for_deployment = True)
-        prog = fluid.default_main_program().clone()
-        fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, prog)
-
-        conf = {}
-        for k, strv in self._save_protocol.items():
-            d = None
-            v = locals()
-            exec('d={}'.format(strv), globals(), v)
-            conf[k] = v['d']
-        with open(os.path.join(dirpath, '__conf__'), 'w') as writer:
-            writer.write(json.dumps(conf, indent=1))
-        print(self._name + ': inference model saved at ' + dirpath)
-
-    def load(self, infer_model_path=None):
-        if infer_model_path is None:
-            infer_model_path = self._save_infermodel_path
-        for k,v in json.load(open(os.path.join(infer_model_path, '__conf__'))).items():
-            strv = self._save_protocol[k]
-            exec('{}=v'.format(strv))
-        pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
-            fluid.io.load_inference_model(infer_model_path, self._exe)
-        print(self._name+': inference model loaded from ' + infer_model_path)
-        return pred_prog
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def Reader(self):
-        return self._Reader
-
-    # @Reader.setter
-    # def Reader(self, cls):
-    #     assert base_reader.__name__ == cls.__bases__[-1].__name__, \
-    #         "expect: {}, receive: {}.".format(base_reader.__name__, \
-    #             cls.__bases__[-1].__name__)
-    #     self._Reader = cls
-
-    @property
-    def Paradigm(self):
-        return self._Paradigm
-
-    # @Paradigm.setter
-    # def Paradigm(self, cls):
-    #     assert base_paradigm.__name__ == cls.__bases__[-1].__name__, \
-    #         "expect: {}, receive: {}.".format(base_paradigm.__name__, \
-    #             cls.__bases__[-1].__name__)
-    #     self._Paradigm = cls
-
-    @property
-    def config(self):
-        return self._config
-
-    @property
-    def reader(self):
-        return self._reader
-
-    @property
-    def pred_input(self):
-        return zip(*[self._pred_input_name_list, self._pred_input_varname_list])
-
-    @pred_input.setter
-    def pred_input(self, val):
-        assert isinstance(val, dict)
-        self._pred_input_name_list, self._pred_input_varname_list = \
-            zip(*[[k, v.name] for k,v in val.items()])
-
-    @property
-    def pred_fetch_list(self):
-        return [self._pred_fetch_name_list, self._pred_fetch_var_list]
-
-    @property
-    def task_layer(self):
-        return self._task_layer
-
-    @property
-    def is_first_target(self):
-        return self._is_first_target
-
-    @is_first_target.setter
-    def is_first_target(self, value):
-        self._is_first_target = bool(value)
-        if self._is_first_target:
-            assert self._is_target, "ERROR: only target task could be set as main task."
-        if self._verbose and self._is_first_target:
-            print("{}: set as main task".format(self._name))
-
-    @property
-    def is_target(self):
-        if self._is_target is not None:
-            return self._is_target
-        else:
-            raise ValueError("{}: is_target is None".format(self._name))
-
-    @is_target.setter
-    def is_target(self, value):
-        self._is_target = bool(value)
-        if self._verbose:
-            if self._is_target:
-                print('{}: set as target task.'.format(self._name))
-            else:
-                print('{}: set as aux task.'.format(self._name))
-
-    @property
-    def mix_ratio(self):
-        if self._mix_ratio is not None:
-            return self._mix_ratio
-        else:
-            raise ValueError("{}: mix_ratio is None".format(self._name))
-
-    @mix_ratio.setter
-    def mix_ratio(self, value):
-        self._mix_ratio = float(value)
-        if self._verbose:
-            print('{}: mix_ratio is set to {}'.format(self._name, self._mix_ratio))
-
-    @property
-    def save_infermodel_every_n_steps(self):
-        return self._save_infermodel_every_n_steps
-
-    @property
-    def expected_train_steps(self):
-        return self._expected_train_steps
-
-    @expected_train_steps.setter
-    def expected_train_steps(self, value):
-        self._expected_train_steps = value
-        self._expected_train_epochs = value / float(self._steps_pur_epoch)
-
-    @property
-    def expected_train_epochs(self):
-        return self._expected_train_epochs
-
-    @property
-    def cur_train_epoch(self):
-        return self._cur_train_epoch
-
-    @cur_train_epoch.setter
-    def cur_train_epoch(self, value):
-        self._cur_train_epoch = value
-
-    @property
-    def cur_train_step(self):
-        return self._cur_train_step
-
-    @cur_train_step.setter
-    def cur_train_step(self, value):
-        self._cur_train_step = value
-        if self._cur_train_step > self._steps_pur_epoch:
-            self._cur_train_epoch += 1
-            self._cur_train_step = 1
-        if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
-            self._train_finish = True
-
-    @property
-    def steps_pur_epoch(self):
-        return self._steps_pur_epoch
-
-    @steps_pur_epoch.setter
-    def steps_pur_epoch(self, value):
-        self._steps_pur_epoch = value
-
-    @property
-    def train_finish(self):
-        return self._train_finish
-
-    @property
-    def task_reuse_scope(self):
-        if self._task_reuse_scope is not None:
-            return self._task_reuse_scope
-        else:
-            raise ValueError("{}: task_reuse_scope is None".format(self._name))
-
-    @task_reuse_scope.setter
-    def task_reuse_scope(self, scope_name):
-        self._task_reuse_scope = str(scope_name)
-        if self._verbose:
-            print('{}: task_reuse_scope is set to {}'.format(self._name, self._task_reuse_scope))
-
-
-
-
-def check_instances(insts):
-    """to check ids, first_target"""
-    pass
-
-def _check_ids():
-    pass
-
-def _check_targets():
-    pass
-
-def _check_reuse_scopes():
-    pass
diff --git a/paddlepalm/utils/__init__.py b/paddlepalm/utils/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..d362471c9c3312bd333d7d2b21663b5d5cc6b500 100644
--- a/paddlepalm/utils/__init__.py
+++ b/paddlepalm/utils/__init__.py
@@ -0,0 +1,4 @@
+
+import basic_helper
+import config_helper
+
diff --git a/paddlepalm/utils/basic_helper.py b/paddlepalm/utils/basic_helper.py
new file mode 100644
index 0000000000000000000000000000000000000000..a95f690694afc3047b96123369d3833ca6eb4dc0
--- /dev/null
+++ b/paddlepalm/utils/basic_helper.py
@@ -0,0 +1,118 @@
+# coding=utf-8
+import os
+import json
+import logging
+import yaml
+from paddle import fluid
+from config_helper import PDConfig
+
+def get_basename(f):
+    return os.path.splitext(f)[0]
+
+
+def get_suffix(f):
+    return os.path.splitext(f)[-1]
+
+
+def parse_yaml(f, asdict=True, support_cmd_line=False):
+    assert os.path.exists(f), "file {} not found.".format(f)
+    if support_cmd_line:
+        args = PDConfig(yaml_file=f, fuse_args=True)
+        args.build()
+        return args.asdict() if asdict else args
+    else:
+        if asdict:
+            with open(f, "r") as fin:
+                yaml_config = yaml.load(fin, Loader=yaml.SafeLoader)
+            return yaml_config
+        else:
+            raise NotImplementedError()
+
+
+def parse_json(f, asdict=True, support_cmd_line=False):
+    assert os.path.exists(f), "file {} not found.".format(f)
+    if support_cmd_line:
+        args = PDConfig(json_file=f, fuse_args=support_cmd_line)
+        args.build()
+        return args.asdict() if asdict else args
+    else:
+        if asdict:
+            with open(f, "r") as fin:
+                config = json.load(fin)
+            return config
+        else:
+            raise NotImplementedError()
+
+
+def parse_list(string, astype=str):
+    assert isinstance(string, str), "{} is not a string.".format(string)
+    if ',' not in string:
+        return [astype(string)]
+    string = string.replace(',', ' ')
+    return [astype(i) for i in string.split()]
+
+
+def try_float(s):
+    try:
+        return float(s)
+    except:
+        return s
+
+
+# TODO: add a None mechanism so that hidden size, batch size and seqlen may be set to None
+def check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
+    for name, attr in in_attr.items():
+        assert name in out_attr, in_name+': '+name+' not found in '+out_name
+        if attr != out_attr[name]:
+            if strict:
+                raise ValueError(name+': shape or dtype not consistent!')
+            else:
+                logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name]))
+
+
+def encode_inputs(inputs, scope_name, sep='/', cand_set=None):
+    outputs = {}
+    for k, v in inputs.items():
+        if cand_set is not None:
+            if k in cand_set:
+                outputs[k] = v
+            if scope_name+sep+k in cand_set:
+                outputs[scope_name+sep+k] = v
+        else:
+            outputs[scope_name+sep+k] = v
+    return outputs
+
+
+def decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
+    outputs = {}
+    for name, value in inputs.items():
+        # vars for backbone are also available to tasks
+        if keep_unk_keys and sep not in name:
+            outputs[name] = value
+        # vars for this inst
+        if name.startswith(scope_name+sep):
+            outputs[name[len(scope_name+sep):]] = value
+    return outputs
+
+
+def build_executor(on_gpu):
+    if on_gpu:
+        place = fluid.CUDAPlace(0)
+        # dev_count = fluid.core.get_cuda_device_count()
+    else:
+        place = fluid.CPUPlace()
+        # dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
+    # return fluid.Executor(place), dev_count
+    return fluid.Executor(place)
+
+
+def fit_attr(conf, fit_attr, strict=False):
+    for i, attr in fit_attr.items():
+        if i not in conf:
+            if strict:
+                raise Exception('Argument {} is required to create a controller.'.format(i))
+            else:
+                continue
+        conf[i] = attr(conf[i])
+    return conf
diff --git a/paddlepalm/utils/reader_helper.py b/paddlepalm/utils/reader_helper.py
index 63362b3ba19d43511e290dbe21cf733b283bbec5..f4c5d803f2b6fc067cc0a9283351b803a5b60ebc 100644
--- a/paddlepalm/utils/reader_helper.py
+++ b/paddlepalm/utils/reader_helper.py
@@ -88,7 +88,7 @@ def create_iterator_fn(iterator, iterator_prefix, shape_and_dtypes, outname_to_pos):
         outputs = next(iterator)  # dict type
         prefix = iterator_prefixe
         for outname, val in outputs.items():
-            task_outname = prefix + '/' + outname
+            task_outname = prefix + '.' + outname
 
             if outname in outname_to_pos:
                 idx = outname_to_pos[outname]
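Finally, a round-trip sketch of the scope-naming helpers that trainer.py relies on, using the '.' separator that the reader_helper hunk above standardizes (values are illustrative only):

    from paddlepalm.utils import basic_helper as helper

    attrs = {'label_ids': ([-1], 'int64')}
    scoped = helper.encode_inputs(attrs, 'senti_cls', sep='.')
    # -> {'senti_cls.label_ids': ([-1], 'int64')}
    restored = helper.decode_inputs(scoped, 'senti_cls', sep='.')
    # -> {'label_ids': ([-1], 'int64')}; keys without the separator pass through unchanged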