From 0a3171612bd3bd1861c44198e457ae8d6fc206d7 Mon Sep 17 00:00:00 2001
From: xixiaoyao
Date: Wed, 25 Dec 2019 21:53:33 +0800
Subject: [PATCH] release 0.3

---
 demo/demo3/.run.py.swp       | Bin 12288 -> 0 bytes
 demo/demo3/paddlepalm        |   1 +
 demo/demo3/pretrain          |   1 +
 demo/demo3/run.py            |  87 +++-
 paddlepalm/__init__.py       |  12 +-
 paddlepalm/_downloader.py    | 171 ++++++++
 paddlepalm/backbone/bert.py  |  12 +-
 paddlepalm/backbone/ernie.py |  16 +-
 paddlepalm/downloader.py     |  46 +--
 paddlepalm/mtl_controller.py | 747 +++++++++++++++++++++++++++++++++++
 10 files changed, 1010 insertions(+), 83 deletions(-)
 delete mode 100644 demo/demo3/.run.py.swp
 create mode 120000 demo/demo3/paddlepalm
 create mode 120000 demo/demo3/pretrain
 create mode 100644 paddlepalm/_downloader.py
 create mode 100755 paddlepalm/mtl_controller.py

diff --git a/demo/demo3/.run.py.swp b/demo/demo3/.run.py.swp
deleted file mode 100644
index 2370d0471d868b0af30f7542375a1231fa978d03..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

[12288-byte base85 payload of the deleted vim swap file omitted]

[...]

diff --git a/paddlepalm/_downloader.py b/paddlepalm/_downloader.py
new file mode 100644
--- /dev/null
+++ b/paddlepalm/_downloader.py
[...]
+        if percent > 1:
+            percent = 1
+        if not silent:
+            print('\r>> Downloading... {:.1%}'.format(percent), end = "")
+
+    # copy to local
+    def _chunk_read(response, url, chunk_size = 16 * 1024, report_hook = None):
+        total_size = response.info().getheader('Content-Length').strip()
+        total_size = int(total_size)
+        bytes_so_far = 0
+        with open("%s" % filename, "wb") as f:
+            while 1:
+                chunk = response.read(chunk_size)
+                f.write(chunk)
+                f.flush()
+                bytes_so_far += len(chunk)
+                if not chunk:
+                    break
+                if report_hook:
+                    report_hook(bytes_so_far, total_size)
+        return bytes_so_far
+
+    response = urlopen(data_url)
+    _chunk_read(response, data_url, report_hook=_chunk_report)
+
+    if not silent:
+        print(' done!')
+
+    if item == 'pretrain':
+        if not silent:
+            print ('Extracting {}...'.format(data_name), end=" ")
+        if os.path.exists(filename):
+            tar = tarfile.open(filename, 'r')
+            tar.extractall(path = data_dir)
+            tar.close()
+            os.remove(filename)
+        if scope.startswith('bert'):
+            source_path = data_dir + '/' + data_name.split('.')[0]
+            fileList = os.listdir(source_path)
+            for file in fileList:
+                filePath = os.path.join(source_path, file)
+                shutil.move(filePath, data_dir)
+            os.removedirs(source_path)
+        if not silent:
+            print ('done!')
+        if not silent:
+            print ('Converting params...', end=" ")
+        _convert(data_dir, silent)
+        if not silent:
+            print ('done!')
+
+
+def _convert(path, silent=False):
+    if os.path.isfile(path + '/params/__palminfo__'):
+        if not silent:
+            print ('already converted.')
+    else:
+        if os.path.exists(path + '/params/'):
+            os.rename(path + '/params/', path + '/params1/')
+            os.mkdir(path + '/params/')
+            tar_model = tarfile.open(path + '/params/' + '__palmmodel__', 'w')
+            tar_info = open(path + '/params/' + '__palminfo__', 'w')
+            for root, dirs, files in os.walk(path + '/params1/'):
+                for file in files:
+                    src_file = os.path.join(root, file)
+                    tar_model.add(src_file, '__paddlepalm_' + file)
+                    tar_info.write('__paddlepalm_' + file)
+                    os.remove(src_file)
+            tar_model.close()
+            tar_info.close()
+            os.removedirs(path + '/params1/')
+
+def download(item, scope='all', path='.'):
+    item = item.lower()
+    scope = scope.lower()
+    assert item in _items, '{} is not found. Support list: {}'.format(item, list(_items.keys()))
+
+    if _items[item]['utils'] is not None:
+        _download(item, 'utils', path, silent=True)
+
+    if scope != 'all':
+        assert scope in _items[item], '{} is not found. Support scopes: {}'.format(scope, list(_items[item].keys()))
+        _download(item, scope, path)
+    else:
+        for s in _items[item].keys():
+            _download(item, s, path)
+
+
+def _ls(item, scope, l = 10):
+    if scope != 'all':
+        assert scope in _items[item], '{} is not found. Support scopes: {}'.format(scope, list(_items[item].keys()))
+        print ('{}'.format(scope))
+    else:
+        for s in _items[item].keys():
+            if s == 'utils':
+                continue
+            print (' => '+s)
+
+def ls(item='all', scope='all'):
+
+    if scope == 'utils':
+        return
+    if item != 'all':
+        assert item in _items, '{} is not found. Support scopes: {}'.format(item, list(_items.keys()))
+        print ('Available {} items:'.format(item))
+        _ls(item, scope)
+    else:
+        l = max(map(len, _items.keys()))
+        for i in _items.keys():
+            print ('Available {} items: '.format(i))
+            _ls(i, scope, l)
+
+
diff --git a/paddlepalm/backbone/bert.py b/paddlepalm/backbone/bert.py
index 74f772c..d3592a5 100644
--- a/paddlepalm/backbone/bert.py
+++ b/paddlepalm/backbone/bert.py
@@ -52,9 +52,9 @@ class Model(backbone):
 
     @property
     def inputs_attr(self):
-        return {"token_ids": [[-1, -1, 1], 'int64'],
-                "position_ids": [[-1, -1, 1], 'int64'],
-                "segment_ids": [[-1, -1, 1], 'int64'],
+        return {"token_ids": [[-1, -1], 'int64'],
+                "position_ids": [[-1, -1], 'int64'],
+                "segment_ids": [[-1, -1], 'int64'],
                 "input_mask": [[-1, -1, 1], 'float32']}
 
     @property
@@ -73,7 +73,7 @@ class Model(backbone):
         self._emb_dtype = 'float32'
 
         # padding id in vocabulary must be set to 0
-        emb_out = fluid.layers.embedding(
+        emb_out = fluid.embedding(
             input=src_ids,
             size=[self._voc_size, self._emb_size],
             dtype=self._emb_dtype,
@@ -84,14 +84,14 @@ class Model(backbone):
         # fluid.global_scope().find_var('backbone-word_embedding').get_tensor()
         embedding_table = fluid.default_main_program().global_block().var(scope_name+self._word_emb_name)
 
-        position_emb_out = fluid.layers.embedding(
+        position_emb_out = fluid.embedding(
             input=pos_ids,
             size=[self._max_position_seq_len, self._emb_size],
             dtype=self._emb_dtype,
             param_attr=fluid.ParamAttr(
                 name=scope_name+self._pos_emb_name, initializer=self._param_initializer))
 
-        sent_emb_out = fluid.layers.embedding(
+        sent_emb_out = fluid.embedding(
             sent_ids,
             size=[self._sent_types, self._emb_size],
             dtype=self._emb_dtype,
diff --git a/paddlepalm/backbone/ernie.py b/paddlepalm/backbone/ernie.py
index 1e47153..ded1963 100644
--- a/paddlepalm/backbone/ernie.py
+++ b/paddlepalm/backbone/ernie.py
@@ -62,11 +62,11 @@ class Model(backbone):
 
     @property
     def inputs_attr(self):
-        return {"token_ids": [[-1, -1, 1], 'int64'],
-                "position_ids": [[-1, -1, 1], 'int64'],
-                "segment_ids": [[-1, -1, 1], 'int64'],
+        return {"token_ids": [[-1, -1], 'int64'],
+                "position_ids": [[-1, -1], 'int64'],
+                "segment_ids": [[-1, -1], 'int64'],
                 "input_mask": [[-1, -1, 1], 'float32'],
-                "task_ids": [[-1,-1, 1], 'int64']}
+                "task_ids": [[-1,-1], 'int64']}
 
     @property
     def outputs_attr(self):
@@ -85,7 +85,7 @@ class Model(backbone):
         task_ids = inputs['task_ids']
 
         # padding id in vocabulary must be set to 0
-        emb_out = fluid.layers.embedding(
+        emb_out = fluid.embedding(
             input=src_ids,
             size=[self._voc_size, self._emb_size],
             dtype=self._emb_dtype,
@@ -96,14 +96,14 @@ class Model(backbone):
         # fluid.global_scope().find_var('backbone-word_embedding').get_tensor()
         embedding_table = fluid.default_main_program().global_block().var(scope_name+self._word_emb_name)
 
-        position_emb_out = fluid.layers.embedding(
+        position_emb_out = fluid.embedding(
             input=pos_ids,
             size=[self._max_position_seq_len, self._emb_size],
             dtype=self._emb_dtype,
             param_attr=fluid.ParamAttr(
                 name=scope_name+self._pos_emb_name, initializer=self._param_initializer))
 
-        sent_emb_out = fluid.layers.embedding(
+        sent_emb_out = fluid.embedding(
             sent_ids,
             size=[self._sent_types, self._emb_size],
             dtype=self._emb_dtype,
@@ -113,7 +113,7 @@ class Model(backbone):
         emb_out = emb_out + position_emb_out
         emb_out = emb_out + sent_emb_out
 
-        task_emb_out = fluid.layers.embedding(
+        task_emb_out = fluid.embedding(
             task_ids,
             size=[self._task_types, self._emb_size],
             dtype=self._emb_dtype,
diff --git a/paddlepalm/downloader.py b/paddlepalm/downloader.py
index 159c3fb..640439c 100644
--- a/paddlepalm/downloader.py
+++ b/paddlepalm/downloader.py
@@ -1,46 +1,2 @@
-
-from __future__ import print_function
-import os
-
-items = {
-    'pretrain': {'ernie-en-uncased-large': 'http://xxxxx',
-                 'xxx': 'xxx',
-                 'utils': None}
-    'reader': {'cls': 'xxx',
-               'xxx': 'xxx',
-               'utils': 'xxx'}
-    'backbone': {xxx}
-    'tasktype': {xxx}
-}
-
-
-def download(item, scope='all', path='.'):
-    item = item.lower()
-    scope = scope.lower()
-    assert item in items, '{} is not found. Support list: {}'.format(item, list(items.keys()))
-
-    if not os.path.exists(path, item):
-        os.makedirs(os.path.join(path, item))
-
-    def _download(item, scope, silent=False):
-        if not silent:
-            print('downloading {}: {} from {}...'.format(item, scope, items[item][scope]), end='')
-        urllib.downloadxxx(items[item][scope])
-        if not silent:
-            print('done!')
-
-    if items['utils'] is not None:
-        _download(item, 'utils', silent=True)
-
-    if scope != 'all':
-        assert scope in items[item], '{} is not found. Support scopes: {}'.format(item, list(items[item].keys()))
-        _download(item, scope)
-    else:
-        for s in items[item].keys():
-            _download(item, s)
-
-
-def ls(item=None, scope='all'):
-    pass
-
+from _downloader import *
diff --git a/paddlepalm/mtl_controller.py b/paddlepalm/mtl_controller.py
new file mode 100755
index 0000000..db9c9c8
--- /dev/null
+++ b/paddlepalm/mtl_controller.py
@@ -0,0 +1,747 @@
+# -*- coding: UTF-8 -*-
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
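+
+# Module overview: the Controller below reads one global yaml config plus
+# one yaml per task instance, builds a shared backbone with a task layer
+# per instance on top of it, mixes the task data streams according to
+# mix_ratio, and drives multi-task training (train) as well as single-task
+# inference from a saved inference model (pred).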
+
+from __future__ import print_function
+
+import os
+import sys
+import importlib
+import multiprocessing
+from paddle import fluid
+from paddle.fluid import layers
+import yaml
+import json
+import logging
+import time
+import numpy as np
+
+from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint
+from paddlepalm.utils.config_helper import PDConfig
+from paddlepalm.utils.print_helper import print_dict
+from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs
+
+from paddlepalm.default_settings import *
+from task_instance import TaskInstance, check_instances
+
+import Queue
+from threading import Thread
+
+DEBUG=False
+VERBOSE=0
+
+def _get_basename(f):
+    return os.path.splitext(f)[0]
+
+
+def _get_suffix(f):
+    return os.path.splitext(f)[-1]
+
+
+def _parse_yaml(f, asdict=True, support_cmd_line=False):
+    assert os.path.exists(f), "file {} not found.".format(f)
+    if support_cmd_line:
+        args = PDConfig(yaml_file=f, fuse_args=True)
+        args.build()
+        return args.asdict() if asdict else args
+    else:
+        if asdict:
+            with open(f, "r") as fin:
+                yaml_config = yaml.load(fin, Loader=yaml.SafeLoader)
+            return yaml_config
+        else:
+            raise NotImplementedError()
+
+
+def _parse_json(f, asdict=True, support_cmd_line=False):
+    assert os.path.exists(f), "file {} not found.".format(f)
+    if support_cmd_line:
+        args = PDConfig(json_file=f, fuse_args=support_cmd_line)
+        args.build()
+        return args.asdict() if asdict else args
+    else:
+        if asdict:
+            with open(f, "r") as fin:
+                config = json.load(fin)
+            return config
+        else:
+            raise NotImplementedError()
+
+
+def _parse_list(string, astype=str):
+    assert isinstance(string, str), "{} is not a string.".format(string)
+    if ',' not in string:
+        return [astype(string)]
+    string = string.replace(',', ' ')
+    return [astype(i) for i in string.split()]
+
+
+def _try_float(s):
+    try:
+        float(s)
+        return(float(s))
+    except:
+        return s
+
+
+def _check_conf(conf, checklist=None):
+    assert isinstance(conf, dict), "{} is not a dict.".format(conf)
+    ret = {}
+    for k,v in conf.items():
+        if isinstance(v, str):
+            v = _try_float(v)
+        ret[k] = v
+    if checklist is not None:
+        for k, t in checklist:
+            assert k in ret, "required argument {} does NOT exist in config file.".format(k)
+            assert isinstance(ret[k], t), "value type of argument {} should be {}".format(k, t)
+    return ret
+
+
+# TODO: add a None mechanism that allows hidden size, batch size and seqlen to be set to None
+def _check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
+    for name, attr in in_attr.items():
+        assert name in out_attr, in_name+': '+name+' not found in '+out_name
+        if attr != out_attr[name]:
+            if strict:
+                raise ValueError(name+': shape or dtype not consistent!')
+            else:
+                logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name]))
+
+
+def _merge_conf(conf1, conf2, conf1_first=True, strict=False):
+    assert isinstance(conf1, dict), "{} is not a dict.".format(conf1)
+    assert isinstance(conf2, dict), "{} is not a dict.".format(conf2)
+    base_conf = conf2 if conf1_first else conf1
+    base_conf = base_conf.copy()
+    new_conf = conf1 if conf1_first else conf2
+
+    for k, v in new_conf.items():
+        if k in base_conf:
+            if base_conf[k] != v:
+                raise Warning("value of argument {} has been updated to {}.".format(k, v))
+        else:
+            if strict:
+                continue
+
+        base_conf[k] = v
+    return base_conf
+
+
+def _encode_inputs(inputs, scope_name, sep='/', cand_set=None):
+    outputs = {}
+    for k, v in inputs.items():
+        if cand_set is not None:
+            if k in cand_set:
+                outputs[k] = v
+            if scope_name+sep+k in cand_set:
+                outputs[scope_name+sep+k] = v
+        else:
+            outputs[scope_name+sep+k] = v
+    return outputs
+
+
+def _decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
+    outputs = {}
+    for name, value in inputs.items():
+        # var for backbone are also available to tasks
+        if keep_unk_keys and sep not in name:
+            outputs[name] = value
+        # var for this inst
+        if name.startswith(scope_name+'/'):
+            outputs[name[len(scope_name+'/'):]] = value
+    return outputs
+
+
+def _init_env(use_gpu):
+    if use_gpu:
+        place = fluid.CUDAPlace(0)
+        dev_count = fluid.core.get_cuda_device_count()
+    else:
+        place = fluid.CPUPlace()
+        dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
+    return fluid.Executor(place), dev_count
+
+
+def _fit_attr(conf, fit_attr, strict=False):
+    for i, attr in fit_attr.items():
+        if i not in conf:
+            if strict:
+                raise Exception('Argument {} is required to create a controller.'.format(i))
+            else:
+                continue
+        conf[i] = attr(conf[i])
+    return conf
+
+
+class Controller(object):
+
+    def __init__(self, config, task_dir='.', for_train=True):
+        """
+        Args:
+            config: (str|dict) when a str, the path of a yaml-format config file.
+        """
+
+        self._for_train = for_train
+        assert isinstance(config, str) or isinstance(config, dict), "a config dict or config file path is required to create a Controller."
+
+        if isinstance(config, str):
+            mtl_conf = _parse_yaml(config, support_cmd_line=True)
+        else:
+            mtl_conf = config
+
+        mtl_conf = _check_conf(mtl_conf)
+        mtl_conf = _fit_attr(mtl_conf, REQUIRED_ARGS, strict=True)
+        mtl_conf = _fit_attr(mtl_conf, OPTIONAL_ARGS, strict=False)
+
+        exe, dev_count = _init_env(use_gpu=mtl_conf.get('use_gpu', True))
+        self.exe = exe
+        self.dev_count = dev_count
+        self.mtl_conf = mtl_conf  # referenced later by pred()
+
+        print_dict(mtl_conf, title='global configuration')
+
+        # parse task instances and target tags
+        instnames = _parse_list(mtl_conf['task_instance'])
+        assert len(instnames) == len(set(instnames)), "repeated task_instance is NOT supported."
+        num_instances = len(instnames)
+        self.num_instances = num_instances
+
+        instname_to_conf = {}
+        instname_to_id = {}
+        for id, instname in enumerate(instnames):
+            instpath = os.path.join(task_dir, instname+'.yaml')
+            conf = _parse_yaml(instpath, support_cmd_line=False)
+            # conf = _check_conf(conf, TASK_INSTANCE_REQUIRED_ARGS)
+            conf = _check_conf(conf)
+            temp_conf = _merge_conf(mtl_conf, conf, strict=True)
+            print_dict(temp_conf, title='{} configuration'.format(instname))
+            conf = _merge_conf(mtl_conf, conf)
+
+            instname_to_conf[instname] = conf
+            instname_to_id[instname] = id
+
+        # prepare backbone
+        if 'backbone_config_path' in mtl_conf:
+            bb_conf = _parse_json(mtl_conf['backbone_config_path'])
+            bb_conf = _merge_conf(mtl_conf, bb_conf)
+        else:
+            bb_conf = mtl_conf
+        print_dict(bb_conf, title='backbone configuration')
+
+        bb_name = mtl_conf['backbone']
+        bb_mod = importlib.import_module(BACKBONE_DIR + '.' + bb_name)
+        Backbone = getattr(bb_mod, 'Model')
+
+        # create task instances
+        instances = []
+        for name in instnames:
+            instances.append(TaskInstance(name, instname_to_id[name], instname_to_conf[name]))
+
+        check_instances(instances)
+
+        # parse target_tag
+        if 'target_tag' in mtl_conf:
+            target_tag = str(mtl_conf['target_tag'])
+            tags = _parse_list(target_tag, astype=int)
+            assert len(tags) == len(instnames), "number of target_tag is NOT consistent with that in task_instance."
+            for tag, inst in zip(tags, instances):
+                inst.is_target = tag
+        else:
+            tags = [i.is_target for i in instances]
+        num_targets = sum(tags)
+        num_auxes = num_instances - num_targets
+
+        # parse mix ratios
+        if 'mix_ratio' in mtl_conf:
+            mix_ratio = str(mtl_conf['mix_ratio'])
+            mrs = _parse_list(mix_ratio, astype=float)
+            assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances."
+        else:
+            mrs = [1.0] * num_instances
+
+        for mr, inst in zip(mrs, instances):
+            inst.mix_ratio = mr
+
+        # parse task layer reuse tags
+        instname_to_reusehost = {i:i for i in instnames}
+        if 'task_reuse_tag' in mtl_conf:
+            tags = _parse_list(mtl_conf['task_reuse_tag'], astype=int)
+            assert len(tags) == num_targets, 'number of reuse_tags is NOT consistent with number of instances.'
+        else:
+            tags = []
+            mapper = {}
+            name_to_instance = {i.name: i for i in instances}
+            for inst in instances:
+                history = set()
+                history.add(inst.name)
+                cur_inst = inst
+                while True:
+                    if cur_inst.task_reuse_scope in history:
+                        mapper[inst.name] = len(tags)
+                        break
+                    elif cur_inst.task_reuse_scope in mapper:
+                        mapper[inst.name] = mapper[cur_inst.task_reuse_scope]
+                        break
+                    else:
+                        cur_inst = name_to_instance[cur_inst.task_reuse_scope]
+                        history.add(cur_inst.name)
+
+                tags.append(mapper[inst.name])
+
+        for i in range(1, num_instances):
+            for j in range(i):
+                if tags[i] == tags[j]:
+                    assert instances[i].Paradigm == \
+                        instances[j].Paradigm, \
+                        "paradigm of reuse tasks should be consistent"
+                    instances[i].task_reuse_scope = instances[j].name
+                    break
+
+        self.instances = instances
+        self.mrs = mrs
+        self.Backbone = Backbone
+        self.bb_conf = bb_conf
+        self.bb_name = bb_name
+
+        self.has_init_train = False
+        self.has_init_pred = False
+
+        if self._for_train:
+            print("initializing for training...")
+            self._init_train()
+            self.has_init_train = True
+
+    def _init_train(self):
+
+        instances = self.instances
+        Backbone = self.Backbone
+        bb_conf = self.bb_conf
+        bb_name = self.bb_name
+        dev_count = self.dev_count
+        num_instances = len(instances)
+        mrs = self.mrs
+
+        # set first_target/main task instance
+        main_inst = None
+        for inst in instances:
+            if inst.is_target:
+                main_inst = inst
+                inst.is_first_target = True
+                break
+        main_conf = main_inst.config
+        if not os.path.exists(main_conf['save_path']):
+            os.makedirs(main_conf['save_path'])
+            os.makedirs(os.path.join(main_conf['save_path'], 'ckpt'))
+
+        # prepare backbone
+        train_backbone = Backbone(bb_conf, phase='train')
+        pred_backbone = Backbone(bb_conf, phase='pred')
+
+        # create reader and task layers,
+        # then check i/o across reader, backbone and task_layer
+        task_attrs = []
+        pred_task_attrs = []
+        for inst in instances:
+            train_reader = inst.Reader(inst.config, phase='train')
+            inst.reader['train'] = train_reader
+            train_parad = inst.Paradigm(inst.config, phase='train', backbone_config=bb_conf)
+            inst.task_layer['train'] = train_parad
+            task_attr_from_reader = _encode_inputs(train_parad.inputs_attrs['reader'], inst.name)
+            task_attrs.append(task_attr_from_reader)
+
+            _check_io(train_backbone.inputs_attr, train_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
+            _check_io(train_parad.inputs_attrs['reader'], train_reader.outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
+            _check_io(train_parad.inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
+
+            if inst.is_target:
+                if 'pred_file' not in inst.config:
+                    inst.config['pred_file'] = ''
+                pred_reader = inst.Reader(inst.config, phase='pred')
+                pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf)
+                inst.task_layer['pred'] = pred_parad
+                task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name)
+                pred_task_attrs.append(task_attr_from_reader)
+                _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
+                _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
+                _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
+
+        # merge reader input attrs from backbone and task_instances
+        joint_input_names, joint_shape_and_dtypes, name_to_position = merge_input_attrs(train_backbone.inputs_attr, task_attrs)
+        pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
+        # shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
+
+        if DEBUG:
+            print('----- for debug -----')
+            print('joint input names:')
+            print(joint_input_names)
+            print('joint input shape and dtypes:')
+            print(joint_shape_and_dtypes)
+
+        # load data
+        for inst in instances:
+            print(inst.name+": preparing data...", end='')
+            inst.reader['train'].load_data()
+            print('ok!')
+
+        # merge dataset iterators and create net input vars
+        iterators = []
+        prefixes = []
+        mrs = []
+        for inst in instances:
+            iterators.append(inst.reader['train'].iterator())
+            prefixes.append(inst.name)
+            mrs.append(inst.mix_ratio)
+
+        joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict')
+        self._joint_iterator_fn = joint_iterator_fn
+
+        input_attrs = [[i, j, k] for i, (j, k) in zip(joint_input_names, joint_shape_and_dtypes)]
+        pred_input_attrs = [[i, j, k] for i, (j, k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)]
+        # net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
+        net_inputs = create_net_inputs(input_attrs, async=False)
+        self._net_inputs = net_inputs
+
+        # build backbone and task layers
+        train_prog = fluid.default_main_program()
+        train_init_prog = fluid.default_startup_program()
+        bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
+        assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())
+
+        pred_prog = fluid.Program()
+        pred_init_prog = fluid.Program()
+
+        with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
+            pred_net_inputs = create_net_inputs(pred_input_attrs)
+            pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
+
+        fluid.framework.switch_main_program(train_prog)
+        fluid.framework.switch_startup_program(train_init_prog)
+
+        task_output_vars = {}
+        for inst in instances:
+            task_inputs = {'backbone': bb_output_vars}
+            task_inputs_from_reader = _decode_inputs(net_inputs, inst.name)
+            task_inputs['reader'] = task_inputs_from_reader
+
+            scope = inst.task_reuse_scope + '/'
+            with fluid.unique_name.guard(scope):
+                output_vars = inst.build_task_layer(task_inputs, phase='train', scope=scope)
+                output_vars = {inst.name+'/'+key: val for key, val in output_vars.items()}
+                old = len(task_output_vars) # for debug
+                task_output_vars.update(output_vars)
+                assert len(task_output_vars) - old == len(output_vars) # for debug
+
+            # prepare predict vars for saving inference model
+            if inst.is_target:
+                with fluid.program_guard(pred_prog, pred_init_prog):
+                    cur_inputs = _decode_inputs(pred_net_inputs, inst.name)
+                    inst.pred_input = cur_inputs
+                    pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
+                    scope = inst.task_reuse_scope + '/'
+                    with fluid.unique_name.guard(scope):
+                        inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope)
+
+        bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
+        task_fetches = {k: v.name for k,v in task_output_vars.items()}
+        fetches = task_fetches
+        fetches['__task_id'] = net_inputs['__task_id'].name
+
+        # compute loss
+        task_id_var = net_inputs['__task_id']
+        task_id_vec = fluid.one_hot(task_id_var, num_instances)
+        losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
+        loss = layers.reduce_sum(task_id_vec * losses)
+
+        main_reader = main_inst.reader['train']
+
+        num_examples = main_reader.num_examples
+        for inst in instances:
+            max_train_steps = int(main_conf['num_epochs'] * inst.mix_ratio * (num_examples // main_conf['batch_size'] // dev_count))
+            if inst.is_target:
+                print('{}: expected train steps {}.'.format(inst.name, max_train_steps))
+            inst.steps_pur_epoch = inst.reader['train'].num_examples // main_conf['batch_size'] // dev_count
+            inst.expected_train_steps = max_train_steps
+
+        global_max_train_steps = int(main_conf['num_epochs'] * sum(mrs) * (num_examples // main_conf['batch_size'] // dev_count))
+        print('Estimated overall train steps {}.'.format(global_max_train_steps))
+
+        if 'warmup_proportion' in main_conf and main_conf['warmup_proportion'] > 0:
+            warmup_steps = int(global_max_train_steps * main_conf['warmup_proportion'])
+            print('Warmup steps: '+str(warmup_steps))
+        else:
+            warmup_steps = 0
+
+        # build optimizer
+        if 'optimizer' in main_conf:
+            optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer'])
+            optimize = getattr(optim_mod, OPTIMIZE_METHOD)
+            optimize(loss, main_conf, max_train_steps, warmup_steps, fluid.default_main_program())
+
+            loss.persistable = True
+            if main_conf.get('use_ema', False):
+                assert 'ema_decay' in main_conf, "ema_decay should be set when use_ema is enabled."
+                ema = fluid.optimizer.ExponentialMovingAverage(main_conf['ema_decay'])
+                ema.update()
+
+        # prepare for train
+        self.train_backbone = train_backbone
+        self.train_program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)
+        self.saver_program = fluid.default_main_program()
+
+        self.main_inst = main_inst
+        self.main_conf = main_conf  # referenced by load_pretrain()
+        self.fetches = fetches
+        self.has_init_train = True
+        self.has_init_pred = True
+
+        self.exe.run(fluid.default_startup_program())
+        print("\nRandomly initialized parameters...\n")
+
+    def _init_pred(self, instance, infer_model_path):
+        inst = instance
+        if 'pred_output_path' not in inst.config:
+            inst.config['pred_output_path'] = os.path.join(inst.config.get('save_path', '.'), inst.name)
+
+        if not os.path.exists(inst.config['pred_output_path']):
+            os.makedirs(inst.config['pred_output_path'])
+
+        pred_backbone = self.Backbone(self.bb_conf, phase='pred')
+        pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=self.bb_conf)
+        inst.task_layer['pred'] = pred_parad
+        pred_joint_input_names, pred_joint_shape_and_dtypes, name_to_position = merge_input_attrs(
+            pred_backbone.inputs_attr, inst.task_layer['pred'].inputs_attrs['reader'],
+            insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
+
+        pred_prog = inst.load(infer_model_path)
+        if inst.reader['pred'] is None:
+            pred_reader = inst.Reader(inst.config, phase='pred')
+            inst.reader['pred'] = pred_reader
+        return pred_prog
+
+    def load_pretrain(self, pretrain_path=None):
+        # load pretrain model (or ckpt)
+        if pretrain_path is None:
+            assert 'pretrain_path' in self.main_conf, "pretrain_path NOT set."
+            pretrain_path = self.main_conf['pretrain_path']
+
+        init_pretraining_params(
+            self.exe,
+            pretrain_path,
+            main_program=fluid.default_startup_program())
+
+    def train(self):
+
+        if not self.has_init_train:
+            self._init_train()
+            self.has_init_train = True
+
+        instances = self.instances
+        num_instances = self.num_instances
+        main_inst = self.main_inst
+        main_conf = main_inst.config
+
+        backbone = self.train_backbone
+        train_program = self.train_program
+        saver_program = self.saver_program
+        fetches = self.fetches
+
+        finish = []
+        for inst in instances:
+            if inst.is_target:
+                if inst.expected_train_steps > 0:
+                    finish.append(False)
+                else:
+                    finish.append(True)
+                    print(inst.name+': train finished!')
+                    inst.save()
+
+        def train_finish():
+            for inst in instances:
+                if inst.is_target:
+                    if not inst.train_finish:
+                        return False
+            return True
+
+        def pack_multicard_feed(iterator, net_inputs, dev_count):
+            ret = []
+            mask = []
+            for i in range(dev_count):
+                temp = {}
+                content, flag = next(iterator)
+                for q, var in net_inputs.items():
+                    temp[var.name] = content[q]
+                ret.append(temp)
+                mask.append(1 if flag else 0)
+            return ret, mask
+
+        # do training
+        fetch_names, fetch_list = zip(*fetches.items())
+
+        main_step = 0 # only count for main task
+        global_step = 0 # count for all tasks
+        epoch = 0
+        time_begin = time.time()
+        backbone_buffer = []
+
+        def multi_dev_reader(reader, dev_count):
+            def worker(reader, dev_count, queue):
+                dev_batches = []
+                for index, data in enumerate(reader()):
+                    if len(dev_batches) < dev_count:
+                        dev_batches.append(data)
+                    if len(dev_batches) == dev_count:
+                        queue.put((dev_batches, 0))
+                        dev_batches = []
+                # For the remaining batches, pad up to the number of devices;
+                # the padded samples are dropped from the prediction outputs.
+                if len(dev_batches) > 0:
+                    num_pad = dev_count - len(dev_batches)
+                    for i in range(len(dev_batches), dev_count):
+                        dev_batches.append(dev_batches[-1])
+                    queue.put((dev_batches, num_pad))
+                queue.put(None)
+
+            queue = Queue.Queue(dev_count*2)
+            p = Thread(
+                target=worker, args=(reader, dev_count, queue))
+            p.daemon = True
+            p.start()
+            while True:
+                ret = queue.get()
+                if ret is not None:
+                    batches, num_pad = ret
+                    queue.task_done()
+                    for batch in batches:
+                        flag = num_pad == 0
+                        if num_pad > 0:
+                            num_pad -= 1
+                        yield batch, flag
+                else:
+                    break
+            queue.join()
+
+        joint_iterator = multi_dev_reader(self._joint_iterator_fn, self.dev_count)
+
+        while not train_finish():
+            feed, mask = pack_multicard_feed(joint_iterator, self._net_inputs, self.dev_count)
+            rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list)
+            rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
+            rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
+            rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
+            cur_task = instances[rt_task_id]
+
+            backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k}
+            backbone_buffer.append(backbone.postprocess(backbone_rt_outputs))
+
+            task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')}
+            instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)
+
+            global_step += 1
+            cur_task.cur_train_step += 1
+
+            cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch
+            if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0:
+                cur_task.save(suffix='.step'+str(cur_task_global_step))
+
+            if global_step % main_conf.get('print_every_n_steps', 5) == 0:
+                loss = rt_outputs[cur_task.name+'/loss']
+                loss = np.mean(np.squeeze(loss)).tolist()
+
+                time_end = time.time()
+                time_cost = time_end - time_begin
+
+                print("Global step: {}. Task: {}, step {}/{} (epoch {}), loss: {:.3f}, speed: {:.2f} steps/s".format(
+                    global_step, cur_task.name, cur_task.cur_train_step, cur_task.steps_pur_epoch, cur_task.cur_train_epoch,
+                    loss, main_conf.get('print_every_n_steps', 5) / time_cost))
+                time_begin = time.time()
+
+            if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
+                print(cur_task.name+': train finished!')
+                cur_task.save()
+
+            if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0:
+                save_path = os.path.join(main_conf['save_path'], 'ckpt',
+                                         "step_" + str(global_step))
+                fluid.io.save_persistables(self.exe, save_path, saver_program)
+                print('checkpoint has been saved at '+save_path)
+
+        save_path = os.path.join(main_conf['save_path'], 'ckpt',
+                                 "step_" + str(global_step))
+        fluid.io.save_persistables(self.exe, save_path, saver_program)
+        print('checkpoint has been saved at '+save_path)
+
+        print("ALL tasks train finished, exiting...")
+
+    def pred(self, task_instance, inference_model_dir=None):
+        if self._for_train:
+            raise Exception('This controller is a trainer. Please build a new controller with for_train=False for predicting.')
+
+        assert isinstance(task_instance, str)
+        if isinstance(inference_model_dir, str):
+            assert os.path.exists(inference_model_dir), inference_model_dir+" not found."
+        # if not self.has_init_pred and inference_model_dir is None:
+        #     raise ValueError('infer_model_path is required for prediction.')
+        if inference_model_dir is None:
+            assert 'save_path' in self.mtl_conf, "one of the `inference_model_dir` and 'save_path' should be set to load inference model."
+            inference_model_dir = os.path.join(self.mtl_conf['save_path'], task_instance, 'infer_model')
+
+        instance = None
+        for inst in self.instances:
+            if inst.name == task_instance:
+                instance = inst
+                break
+
+        if instance is None:
+            raise ValueError(task_instance + ' is not a valid task_instance.')
+
+        pred_prog = self._init_pred(instance, inference_model_dir)
+
+        inst = instance
+        print(inst.name+": loading data...")
+        inst.reader['pred'].load_data()
+        fetch_names, fetch_vars = inst.pred_fetch_list
+
+        print('predicting...')
+        mapper = {k:v for k,v in inst.pred_input}
+        buf = []
+        for feed in inst.reader['pred'].iterator():
+            feed = _encode_inputs(feed, inst.name, cand_set=mapper)
+            feed = {mapper[k]: v for k,v in feed.items()}
+
+            rt_outputs = self.exe.run(pred_prog, feed, fetch_vars)
+            rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
+            inst.postprocess(rt_outputs, phase='pred')
+        if inst.task_layer['pred'].epoch_inputs_attrs:
+            reader_outputs = inst.reader['pred'].get_epoch_outputs()
+        else:
+            reader_outputs = None
+        inst.epoch_postprocess({'reader': reader_outputs}, phase='pred')
+
+
+if __name__ == '__main__':
+    assert len(sys.argv) == 2, "Usage: python mtl_controller.py <mtl_conf_path>"
+    conf_path = sys.argv[1]
+    del sys.argv[1]
+    controller = Controller(conf_path)
+    if controller.main_conf['do_train']:
+        controller.train()
+
+
+__all__ = ["Controller"]
-- 
GitLab
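
A minimal usage sketch of the pieces this patch adds, in the spirit of the
updated demo/demo3/run.py (whose hunk is not shown above). The yaml file
names, the pretrain scope 'bert-en-uncased-large', the task-dir layout, the
task-instance name 'mrqa', and the re-export of Controller from
paddlepalm/__init__.py are illustrative assumptions, not part of the patch:

    import paddlepalm as palm            # assumes __init__.py re-exports Controller
    from paddlepalm import downloader

    # list available pretrain items, then fetch one; _download() lays files
    # out under <path>/<item>/<scope>/ and converts params to __palmmodel__
    downloader.ls('pretrain')
    downloader.download('pretrain', 'bert-en-uncased-large', path='.')  # assumed scope name

    # training: one global yaml plus one yaml per task instance in task_dir
    controller = palm.Controller('mtl_config.yaml', task_dir='tasks')
    controller.load_pretrain('pretrain/bert-en-uncased-large/params')
    controller.train()

    # prediction requires a fresh controller built with for_train=False
    controller = palm.Controller('mtl_config.yaml', task_dir='tasks', for_train=False)
    controller.pred('mrqa', inference_model_dir='output/mrqa/infer_model')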