提交 0a317161 编写于 作者: X xixiaoyao

release 0.3

上级 061b9e9a
../../paddlepalm/
\ No newline at end of file
/home/zhangyiming/yiming/v02-exe/pretrain
\ No newline at end of file
# coding=utf-8
import paddlepalm as palm
import json
if __name__ == '__main__':
max_seqlen = 512
batch_size = 32
batch_size = 3
num_epochs = 2
lr = 1e-3
vocab_path = './pretrain/ernie/vocab.txt'
train_file = './data/cls4mrqa/train.tsv'
config = json.load(open('./pretrain/ernie/ernie_config.json'))
print(config)
ernie = palm.backbone.ERNIE.from_config(config)
pred_ernie = palm.backbone.ERNIE.from_config(config, phase='pred')
# cls_reader2 = palm.reader.cls(train_file_topic, vocab_path, batch_size, max_seqlen)
# cls_reader3 = palm.reader.cls(train_file_subj, vocab_path, batch_size, max_seqlen)
# topic_trainer = palm.Trainer('topic_cls', cls_reader2, cls)
# subj_trainer = palm.Trainer('subj_cls', cls_reader3, cls)
# 创建该分类任务的reader,由诸多参数控制数据集读入格式、文件数量、预处理规则等
cls_reader = palm.reader.ClassifyReader(vocab_path, batch_size, max_seqlen)
print(cls_reader.outputs_attr)
# 不同的backbone会对任务reader有不同的特征要求,例如对于分类任务,基本的输入feature为token_ids和label_ids,但是对于BERT,还要求从输入中额外提取position、segment、input_mask等特征,因此经过register后,reader会自动补充backbone所要求的字段
cls_reader.register_with(ernie)
print(cls_reader.outputs_attr)
# 创建任务头(task head),如分类、匹配、机器阅读理解等。每个任务头有跟该任务相关的必选/可选参数。注意,任务头与reader是解耦合的,只要任务头依赖的数据集侧的字段能被reader提供,那么就是合法的
cls_head = palm.head.Classify(4, 1024, 0.1)
cls_pred_head = palm.head.Classify(4, 1024, 0.1, phase='pred')
# 根据reader和任务头来创建一个训练器trainer,trainer代表了一个训练任务,内部维护着训练进程、和任务的关键信息,并完成合法性校验,该任务的模型保存、载入等相关规则控制
trainer = palm.Trainer('senti_cls', cls_reader, cls_head, save_predict_model=True, \
pred_head=cls_pred_head, save_path='./output')
# match4mrqa.reuse_head_with(mrc4mrqa)
# data_vars = cls_reader.build()
# output_vars = ernie.build(data_vars)
# cls_head.build({'backbone': output_vars, 'reader': data_vars})
loss_var = trainer.build_forward(ernie, pred_ernie)
# controller.build_forward()
# Error! a head/backbone can be only build once! Try NOT to call build_forward method for any Trainer!
print(trainer.num_examples)
trainer.load_data(train_file, 'csv', num_epochs=2, batch_size=32)
print(trainer.num_examples)
n_steps = trainer.num_examples * num_epochs // batch_size
warmup_steps = int(0.1 * n_steps)
print(warmup_steps)
sched = palm.lr_sched.TriangularSchedualer(warmup_steps, n_steps)
adam = palm.optimizer.Adam(loss_var, lr, sched)
trainer.build_backward(optimizer=adam, weight_decay=0.001, \
use_ema=True, ema_decay=0.999)
trainer.random_init_params()
trainer.load_pretrain('../../pretrain_model/ernie/params')
# trainer.train_one_step()
# trainer.train_one_epoch()
trainer.train()
trainer.save()
match_reader = palm.reader.match(train_file, vocab, \
max_seqlen, file_format='csv', tokenizer='wordpiece', \
lang='en', shuffle_train=True)
mrc_reader = palm.reader.mrc(train_file, phase='train')
mlm_reader = palm.reader.mlm(train_file, phase='train')
palm.reader.
match = palm.tasktype.cls(num_classes=4)
mrc = palm.tasktype.match(learning_strategy='pairwise')
mlm = palm.tasktype.mlm()
mlm.print()
bb_flags = palm.load_json('./pretrain/ernie/ernie_config.json')
bb = palm.backbone.ernie(bb_flags['xx'], xxx)
bb.print()
match4mrqa = palm.Task('match4mrqa', match_reader, match_tt)
mrc4mrqa = palm.Task('match4mrqa', match_reader, match_tt)
# match4mrqa.reuse_with(mrc4mrqa)
controller = palm.Controller([mrqa, match4mrqa, mlm4mrqa])
# controller = palm.Controller([mrqa, match4mrqa, mlm4mrqa])
loss = controller.build_forward(bb, mask_task=[])
......
import downloader
from mtl_controller import Controller
import sys
from paddlepalm.mtl_controller import Controller
from paddlepalm.task_instance import Task
sys.path.append('paddlepalm')
del interface
del task_instance
del default_settings
del utils
del mtl_controller
\ No newline at end of file
# -*- coding: UTF-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import tarfile
import shutil
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
import ssl
__all__ = ["download", "ls"]
# for https
ssl._create_default_https_context = ssl._create_unverified_context
_items = {
'pretrain': {'ernie-en-uncased-large': 'https://ernie.bj.bcebos.com/ERNIE_Large_en_stable-2.0.0.tar.gz',
'bert-en-uncased-large': 'https://bert-models.bj.bcebos.com/uncased_L-24_H-1024_A-16.tar.gz',
'bert-en-uncased-base': 'https://bert-models.bj.bcebos.com/uncased_L-12_H-768_A-12.tar.gz',
'utils': None},
'reader': {'utils': None},
'backbone': {'utils': None},
'tasktype': {'utils': None},
}
def _download(item, scope, path, silent=False):
data_url = _items[item][scope]
if data_url == None:
return
if not silent:
print('Downloading {}: {} from {}...'.format(item, scope, data_url))
data_dir = path + '/' + item + '/' + scope
if not os.path.exists(data_dir):
os.makedirs(os.path.join(data_dir))
data_name = data_url.split('/')[-1]
filename = data_dir + '/' + data_name
# print process
def _chunk_report(bytes_so_far, total_size):
percent = float(bytes_so_far) / float(total_size)
if percent > 1:
percent = 1
if not silent:
print('\r>> Downloading... {:.1%}'.format(percent), end = "")
# copy to local
def _chunk_read(response, url, chunk_size = 16 * 1024, report_hook = None):
total_size = response.info().getheader('Content-Length').strip()
total_size = int(total_size)
bytes_so_far = 0
with open("%s" % filename, "wb") as f:
while 1:
chunk = response.read(chunk_size)
f.write(chunk)
f.flush()
bytes_so_far += len(chunk)
if not chunk:
break
if report_hook:
report_hook(bytes_so_far, total_size)
return bytes_so_far
response = urlopen(data_url)
_chunk_read(response, data_url, report_hook=_chunk_report)
if not silent:
print(' done!')
if item == 'pretrain':
if not silent:
print ('Extracting {}...'.format(data_name), end=" ")
if os.path.exists(filename):
tar = tarfile.open(filename, 'r')
tar.extractall(path = data_dir)
tar.close()
os.remove(filename)
if scope.startswith('bert'):
source_path = data_dir + '/' + data_name.split('.')[0]
fileList = os.listdir(source_path)
for file in fileList:
filePath = os.path.join(source_path, file)
shutil.move(filePath, data_dir)
os.removedirs(source_path)
if not silent:
print ('done!')
if not silent:
print ('Converting params...', end=" ")
_convert(data_dir, silent)
if not silent:
print ('done!')
def _convert(path, silent=False):
if os.path.isfile(path + '/params/__palminfo__'):
if not silent:
print ('already converted.')
else:
if os.path.exists(path + '/params/'):
os.rename(path + '/params/', path + '/params1/')
os.mkdir(path + '/params/')
tar_model = tarfile.open(path + '/params/' + '__palmmodel__', 'w')
tar_info = open(path + '/params/'+ '__palminfo__', 'w')
for root, dirs, files in os.walk(path + '/params1/'):
for file in files:
src_file = os.path.join(root, file)
tar_model.add(src_file, '__paddlepalm_' + file)
tar_info.write('__paddlepalm_' + file)
os.remove(src_file)
tar_model.close()
tar_info.close()
os.removedirs(path + '/params1/')
def download(item, scope='all', path='.'):
item = item.lower()
scope = scope.lower()
assert item in _items, '{} is not found. Support list: {}'.format(item, list(_items.keys()))
if _items[item]['utils'] is not None:
_download(item, 'utils', path, silent=True)
if scope != 'all':
assert scope in _items[item], '{} is not found. Support scopes: {}'.format(scope, list(_items[item].keys()))
_download(item, scope, path)
else:
for s in _items[item].keys():
_download(item, s, path)
def _ls(item, scope, l = 10):
if scope != 'all':
assert scope in _items[item], '{} is not found. Support scopes: {}'.format(scope, list(_items[item].keys()))
print ('{}'.format(scope))
else:
for s in _items[item].keys():
if s == 'utils':
continue
print (' => '+s)
def ls(item='all', scope='all'):
if scope == 'utils':
return
if item != 'all':
assert item in _items, '{} is not found. Support scopes: {}'.format(item, list(_items.keys()))
print ('Available {} items:'.format(item))
_ls(item, scope)
else:
l = max(map(len, _items.keys()))
for i in _items.keys():
print ('Available {} items: '.format(i))
_ls(i, scope, l)
......@@ -52,9 +52,9 @@ class Model(backbone):
@property
def inputs_attr(self):
return {"token_ids": [[-1, -1, 1], 'int64'],
"position_ids": [[-1, -1, 1], 'int64'],
"segment_ids": [[-1, -1, 1], 'int64'],
return {"token_ids": [[-1, -1], 'int64'],
"position_ids": [[-1, -1], 'int64'],
"segment_ids": [[-1, -1], 'int64'],
"input_mask": [[-1, -1, 1], 'float32']}
@property
......@@ -73,7 +73,7 @@ class Model(backbone):
self._emb_dtype = 'float32'
# padding id in vocabulary must be set to 0
emb_out = fluid.layers.embedding(
emb_out = fluid.embedding(
input=src_ids,
size=[self._voc_size, self._emb_size],
dtype=self._emb_dtype,
......@@ -84,14 +84,14 @@ class Model(backbone):
# fluid.global_scope().find_var('backbone-word_embedding').get_tensor()
embedding_table = fluid.default_main_program().global_block().var(scope_name+self._word_emb_name)
position_emb_out = fluid.layers.embedding(
position_emb_out = fluid.embedding(
input=pos_ids,
size=[self._max_position_seq_len, self._emb_size],
dtype=self._emb_dtype,
param_attr=fluid.ParamAttr(
name=scope_name+self._pos_emb_name, initializer=self._param_initializer))
sent_emb_out = fluid.layers.embedding(
sent_emb_out = fluid.embedding(
sent_ids,
size=[self._sent_types, self._emb_size],
dtype=self._emb_dtype,
......
......@@ -62,11 +62,11 @@ class Model(backbone):
@property
def inputs_attr(self):
return {"token_ids": [[-1, -1, 1], 'int64'],
"position_ids": [[-1, -1, 1], 'int64'],
"segment_ids": [[-1, -1, 1], 'int64'],
return {"token_ids": [[-1, -1], 'int64'],
"position_ids": [[-1, -1], 'int64'],
"segment_ids": [[-1, -1], 'int64'],
"input_mask": [[-1, -1, 1], 'float32'],
"task_ids": [[-1,-1, 1], 'int64']}
"task_ids": [[-1,-1], 'int64']}
@property
def outputs_attr(self):
......@@ -85,7 +85,7 @@ class Model(backbone):
task_ids = inputs['task_ids']
# padding id in vocabulary must be set to 0
emb_out = fluid.layers.embedding(
emb_out = fluid.embedding(
input=src_ids,
size=[self._voc_size, self._emb_size],
dtype=self._emb_dtype,
......@@ -96,14 +96,14 @@ class Model(backbone):
# fluid.global_scope().find_var('backbone-word_embedding').get_tensor()
embedding_table = fluid.default_main_program().global_block().var(scope_name+self._word_emb_name)
position_emb_out = fluid.layers.embedding(
position_emb_out = fluid.embedding(
input=pos_ids,
size=[self._max_position_seq_len, self._emb_size],
dtype=self._emb_dtype,
param_attr=fluid.ParamAttr(
name=scope_name+self._pos_emb_name, initializer=self._param_initializer))
sent_emb_out = fluid.layers.embedding(
sent_emb_out = fluid.embedding(
sent_ids,
size=[self._sent_types, self._emb_size],
dtype=self._emb_dtype,
......@@ -113,7 +113,7 @@ class Model(backbone):
emb_out = emb_out + position_emb_out
emb_out = emb_out + sent_emb_out
task_emb_out = fluid.layers.embedding(
task_emb_out = fluid.embedding(
task_ids,
size=[self._task_types, self._emb_size],
dtype=self._emb_dtype,
......
from __future__ import print_function
import os
items = {
'pretrain': {'ernie-en-uncased-large': 'http://xxxxx',
'xxx': 'xxx',
'utils': None}
'reader': {'cls': 'xxx',
'xxx': 'xxx',
'utils': 'xxx'}
'backbone': {xxx}
'tasktype': {xxx}
}
def download(item, scope='all', path='.'):
item = item.lower()
scope = scope.lower()
assert item in items, '{} is not found. Support list: {}'.format(item, list(items.keys()))
if not os.path.exists(path, item):
os.makedirs(os.path.join(path, item))
def _download(item, scope, silent=False):
if not silent:
print('downloading {}: {} from {}...'.format(item, scope, items[item][scope]), end='')
urllib.downloadxxx(items[item][scope])
if not silent:
print('done!')
if items['utils'] is not None:
_download(item, 'utils', silent=True)
if scope != 'all':
assert scope in items[item], '{} is not found. Support scopes: {}'.format(item, list(items[item].keys()))
_download(item, scope)
else:
for s in items[item].keys():
_download(item, s)
def ls(item=None, scope='all'):
pass
from _downloader import *
# -*- coding: UTF-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import importlib
import multiprocessing
from paddle import fluid
from paddle.fluid import layers
import yaml
import json
import logging
import time
import numpy as np
from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint
from paddlepalm.utils.config_helper import PDConfig
from paddlepalm.utils.print_helper import print_dict
from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs
from paddlepalm.default_settings import *
from task_instance import TaskInstance, check_instances
import Queue
from threading import Thread
DEBUG=False
VERBOSE=0
def _get_basename(f):
return os.path.splitext(f)[0]
def _get_suffix(f):
return os.path.splitext(f)[-1]
def _parse_yaml(f, asdict=True, support_cmd_line=False):
assert os.path.exists(f), "file {} not found.".format(f)
if support_cmd_line:
args = PDConfig(yaml_file=f, fuse_args=True)
args.build()
return args.asdict() if asdict else args
else:
if asdict:
with open(f, "r") as fin:
yaml_config = yaml.load(fin, Loader=yaml.SafeLoader)
return yaml_config
else:
raise NotImplementedError()
def _parse_json(f, asdict=True, support_cmd_line=False):
assert os.path.exists(f), "file {} not found.".format(f)
if support_cmd_line:
args = PDConfig(json_file=f, fuse_args=support_cmd_line)
args.build()
return args.asdict() if asdict else args
else:
if asdict:
with open(f, "r") as fin:
config = json.load(fin)
return config
else:
raise NotImplementedError()
def _parse_list(string, astype=str):
assert isinstance(string, str), "{} is not a string.".format(string)
if ',' not in string:
return [astype(string)]
string = string.replace(',', ' ')
return [astype(i) for i in string.split()]
def _try_float(s):
try:
float(s)
return(float(s))
except:
return s
def _check_conf(conf, checklist=None):
assert isinstance(conf, dict), "{} is not a dict.".format(conf)
ret = {}
for k,v in conf.items():
if isinstance(v, str):
v = _try_float(v)
ret[k] = v
if checklist is not None:
for k, t in checklist:
assert k in ret, "required argument {} is NOT exist in config file.".format(k)
assert isintance(ret[k], t), "value type of argument {} should be {}".format(k, t)
return ret
# TODO: 增加None机制,允许hidden size、batch size和seqlen设置为None
def _check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
for name, attr in in_attr.items():
assert name in out_attr, in_name+': '+name+' not found in '+out_name
if attr != out_attr[name]:
if strict:
raise ValueError(name+': shape or dtype not consistent!')
else:
logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name]))
def _merge_conf(conf1, conf2, conf1_first=True, strict=False):
assert isinstance(conf1, dict), "{} is not a dict.".format(conf1)
assert isinstance(conf2, dict), "{} is not a dict.".format(conf2)
base_conf = conf2 if conf1_first else conf1
base_conf = base_conf.copy()
new_conf = conf1 if conf1_first else conf2
for k, v in new_conf.items():
if k in base_conf:
if base_conf[k] != v:
raise Warning("value of argument {} has been updated to {}.".format(k, v))
else:
if strict:
continue
base_conf[k] = v
return base_conf
def _encode_inputs(inputs, scope_name, sep='/', cand_set=None):
outputs = {}
for k, v in inputs.items():
if cand_set is not None:
if k in cand_set:
outputs[k] = v
if scope_name+sep+k in cand_set:
outputs[scope_name+sep+k] = v
else:
outputs[scope_name+sep+k] = v
return outputs
def _decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
outputs = {}
for name, value in inputs.items():
# var for backbone are also available to tasks
if keep_unk_keys and sep not in name:
outputs[name] = value
# var for this inst
if name.startswith(scope_name+'/'):
outputs[name[len(scope_name+'/'):]] = value
return outputs
def _init_env(use_gpu):
if use_gpu:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
else:
place = fluid.CPUPlace()
dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
return fluid.Executor(place), dev_count
def _fit_attr(conf, fit_attr, strict=False):
for i, attr in fit_attr.items():
if i not in conf:
if strict:
raise Exception('Argument {} is required to create a controller.'.format(i))
else:
continue
conf[i] = attr(conf[i])
return conf
class Controller(object):
def __init__(self, config, task_dir='.', for_train=True):
"""
Args:
config: (str|dict) 字符串类型时,给出yaml格式的config配置文件路径;
"""
self._for_train = for_train
assert isinstance(config, str) or isinstance(config, dict), "a config dict or config file path is required to create a Controller."
if isinstance(config, str):
mtl_conf = _parse_yaml(config, support_cmd_line=True)
else:
mtl_conf = config
mtl_conf = _check_conf(mtl_conf)
mtl_conf = _fit_attr(mtl_conf, REQUIRED_ARGS, strict=True)
mtl_conf = _fit_attr(mtl_conf, OPTIONAL_ARGS, strict=False)
exe, dev_count = _init_env(use_gpu=mtl_conf.get('use_gpu', True))
self.exe = exe
self.dev_count = dev_count
print_dict(mtl_conf, title='global configuration')
# parse task instances and target tags
instnames = _parse_list(mtl_conf['task_instance'])
assert len(instnames) == len(set(instnames)), "repeated task_instance is NOT supported."
num_instances = len(instnames)
self.num_instances = num_instances
instname_to_conf = {}
instname_to_id = {}
for id, instname in enumerate(instnames):
instpath = os.path.join(task_dir, instname+'.yaml')
conf = _parse_yaml(instpath, support_cmd_line=False)
# conf = _check_conf(conf, TASK_INSTANCE_REQUIRED_ARGS)
conf = _check_conf(conf)
temp_conf = _merge_conf(mtl_conf, conf, strict=True)
print_dict(temp_conf, title='{} configuration'.format(instname))
conf = _merge_conf(mtl_conf, conf)
instname_to_conf[instname] = conf
instname_to_id[instname] = id
# prepare backbone
if 'backbone_config_path' in mtl_conf:
bb_conf = _parse_json(mtl_conf['backbone_config_path'])
bb_conf = _merge_conf(mtl_conf, bb_conf)
else:
bb_conf = mtl_conf
print_dict(bb_conf, title = 'backbone configuration'.format(instname))
bb_name = mtl_conf['backbone']
bb_mod = importlib.import_module(BACKBONE_DIR + '.' + bb_name)
Backbone = getattr(bb_mod, 'Model')
# create task instances
instances = []
for name in instnames:
instances.append(TaskInstance(name, instname_to_id[name], instname_to_conf[name]))
check_instances(instances)
# parse target_tag
if 'target_tag' in mtl_conf:
target_tag = str(mtl_conf['target_tag'])
tags = _parse_list(target_tag, astype=int)
assert len(tags) == len(instnames), "number of target_tag is NOT consistent with that in task_instance."
for tag, inst in zip(tags, instances):
inst.is_target = tag
else:
tags = [i.is_target for i in instances]
num_targets = sum(tags)
num_auxes = num_instances - num_targets
# parse mix ratios
if 'mix_ratio' in mtl_conf:
mix_ratio = str(mtl_conf['mix_ratio'])
mrs = _parse_list(mix_ratio, astype=float)
assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances."
else:
mrs = [1.0] * num_instances
for mr, inst in zip(mrs, instances):
inst.mix_ratio = mr
# parse task layer reuse tags
instname_to_reusehost = {i:i for i in instnames}
if 'task_reuse_tag' in mtl_conf:
tags = _parse_list(mtl_conf['task_reuse_tag'], astype=int)
assert len(tags) == num_targets, 'number of reuse_tags is NOT consistent with number of instances.'
else:
tags = []
mapper = {}
for inst in instances:
history = set()
history.add(inst.name)
cur_inst = inst
while True:
if cur_inst.task_reuse_scope in history:
mapper[inst.name] = len(tags)
break
elif cur_inst.task_reuse_scope in mapper:
mapper[inst.name] = mapper[cur_inst.task_reuse_scope]
break
else:
cur_inst = name_to_instance[cur_inst.task_reuse_scope]
history.add(cur_inst.name)
tags.append(mapper[inst.name])
for i in range(1, num_instances):
for j in range(i):
if tags[i] == tags[j]:
assert instances[i].Paradigm == \
instances[j].Paradigm, \
"paradigm of reuse tasks should be consistent"
instances[i].task_reuse_scope = instances[j].name
break
self.instances = instances
self.mrs = mrs
self.Backbone = Backbone
self.bb_conf = bb_conf
self.bb_name = bb_name
self.has_init_train = False
self.has_init_pred = False
if self._for_train:
print("initialing for training...")
self._init_train()
self.has_init_train = True
def _init_train(self):
instances = self.instances
Backbone = self.Backbone
bb_conf = self.bb_conf
bb_name = self.bb_name
dev_count = self.dev_count
num_instances = len(instances)
mrs = self.mrs
# set first_target/main task instance
main_inst = None
for inst in instances:
if inst.is_target:
main_inst = inst
inst.is_first_target = True
break
main_conf = main_inst.config
if not os.path.exists(main_conf['save_path']):
os.makedirs(main_conf['save_path'])
os.makedirs(os.path.join(main_conf['save_path'], 'ckpt'))
# prepare backbone
train_backbone = Backbone(bb_conf, phase='train')
pred_backbone = Backbone(bb_conf, phase='pred')
# create reader, task
# then check i/o across reader, backbone and task_layer
task_attrs = []
pred_task_attrs = []
for inst in instances:
train_reader = inst.Reader(inst.config, phase='train')
inst.reader['train'] = train_reader
train_parad = inst.Paradigm(inst.config, phase='train', backbone_config=bb_conf)
inst.task_layer['train'] = train_parad
task_attr_from_reader = _encode_inputs(train_parad.inputs_attrs['reader'], inst.name)
task_attrs.append(task_attr_from_reader)
_check_io(train_backbone.inputs_attr, train_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
_check_io(train_parad.inputs_attrs['reader'], train_reader.outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
_check_io(train_parad.inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
if inst.is_target:
if 'pred_file' not in inst.config:
inst.config['pred_file'] = ''
pred_reader = inst.Reader(inst.config, phase='pred')
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf)
inst.task_layer['pred'] = pred_parad
task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name)
pred_task_attrs.append(task_attr_from_reader)
_check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
# merge reader input attrs from backbone and task_instances
joint_input_names, joint_shape_and_dtypes, name_to_position = merge_input_attrs(train_backbone.inputs_attr, task_attrs)
pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
# shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
if DEBUG:
print('----- for debug -----')
print('joint input names:')
print(joint_input_names)
print('joint input shape and dtypes:')
print(joint_shape_and_dtypes)
# load data
for inst in instances:
print(inst.name+": preparing data...", end='')
inst.reader['train'].load_data()
print('ok!')
# merge dataset iterators and create net input vars
iterators = []
prefixes = []
mrs = []
for inst in instances:
iterators.append(inst.reader['train'].iterator())
prefixes.append(inst.name)
mrs.append(inst.mix_ratio)
joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict')
self._joint_iterator_fn = joint_iterator_fn
input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)]
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)]
# net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
net_inputs = create_net_inputs(input_attrs, async=False)
self._net_inputs = net_inputs
# build backbone and task layers
train_prog = fluid.default_main_program()
train_init_prog = fluid.default_startup_program()
bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())
pred_prog = fluid.Program()
pred_init_prog = fluid.Program()
with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
pred_net_inputs = create_net_inputs(pred_input_attrs)
pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
fluid.framework.switch_main_program(train_prog)
fluid.framework.switch_startup_program(train_init_prog)
task_output_vars = {}
for inst in instances:
task_inputs = {'backbone': bb_output_vars}
task_inputs_from_reader = _decode_inputs(net_inputs, inst.name)
task_inputs['reader'] = task_inputs_from_reader
scope = inst.task_reuse_scope + '/'
with fluid.unique_name.guard(scope):
output_vars = inst.build_task_layer(task_inputs, phase='train', scope=scope)
output_vars = {inst.name+'/'+key: val for key, val in output_vars.items()}
old = len(task_output_vars) # for debug
task_output_vars.update(output_vars)
assert len(task_output_vars) - old == len(output_vars) # for debug
# prepare predict vars for saving inference model
if inst.is_target:
with fluid.program_guard(pred_prog, pred_init_prog):
cur_inputs = _decode_inputs(pred_net_inputs, inst.name)
inst.pred_input = cur_inputs
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = inst.task_reuse_scope + '/'
with fluid.unique_name.guard(scope):
inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope)
bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
task_fetches = {k: v.name for k,v in task_output_vars.items()}
fetches = task_fetches
fetches['__task_id'] = net_inputs['__task_id'].name
# compute loss
task_id_var = net_inputs['__task_id']
task_id_vec = fluid.one_hot(task_id_var, num_instances)
losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
loss = layers.reduce_sum(task_id_vec * losses)
main_reader = main_inst.reader['train']
num_examples = main_reader.num_examples
for inst in instances:
max_train_steps = int(main_conf['num_epochs']* inst.mix_ratio * (num_examples // main_conf['batch_size'] // dev_count))
if inst.is_target:
print('{}: expected train steps {}.'.format(inst.name, max_train_steps))
inst.steps_pur_epoch = inst.reader['train'].num_examples // main_conf['batch_size'] // dev_count
inst.expected_train_steps = max_train_steps
global_max_train_steps = int(main_conf['num_epochs'] * sum(mrs) * (num_examples // main_conf['batch_size'] // dev_count))
print('Estimated overall train steps {}.'.format(global_max_train_steps))
if 'warmup_proportion' in main_conf and main_conf['warmup_proportion'] > 0:
warmup_steps = int(global_max_train_steps * main_conf['warmup_proportion'])
print('Warmup steps: '+str(warmup_steps))
else:
warmup_steps = 0
# build optimizer
if 'optimizer' in main_conf:
optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer'])
optimize = getattr(optim_mod, OPTIMIZE_METHOD)
optimize(loss, main_conf, max_train_steps, warmup_steps, fluid.default_main_program())
loss.persistable = True
if main_conf.get('use_ema', False):
assert 'ema_decay' in main_conf, "ema_decay should be set when use_ema is enabled."
ema = fluid.optimizer.ExponentialMovingAverage(main_conf['ema_decay'])
ema.update()
# prepare for train
self.train_backbone = train_backbone
self.train_program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)
self.saver_program = fluid.default_main_program()
self.main_inst = main_inst
self.fetches = fetches
self.has_init_train = True
self.has_init_pred = True
self.exe.run(fluid.default_startup_program())
print("\nRandomly initialize parameters...\n")
def _init_pred(self, instance, infer_model_path):
inst = instance
if 'pred_output_path' not in inst.config:
inst.config['pred_output_path'] = os.path.join(inst.config.get('save_path', '.'), inst.name)
if not os.path.exists(inst.config['pred_output_path']):
os.makedirs(inst.config['pred_output_path'])
pred_backbone = self.Backbone(self.bb_conf, phase='pred')
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=self.bb_conf)
inst.task_layer['pred'] = pred_parad
pred_joint_input_names, pred_joint_shape_and_dtypes, name_to_position = merge_input_attrs(
pred_backbone.inputs_attr, inst.task_layer['pred'].inputs_attrs['reader'],
insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
pred_prog = inst.load(infer_model_path)
if inst.reader['pred'] is None:
pred_reader = inst.Reader(inst.config, phase='pred')
inst.reader['pred'] = pred_reader
return pred_prog
def load_pretrain(self, pretrain_path=None):
# load pretrain model (or ckpt)
if pretrain_path is None:
assert 'pretrain_path' in self.main_conf, "pretrain_path NOT set."
pretrain_path = self.main_conf['pretrain_path']
init_pretraining_params(
self.exe,
pretrain_path,
main_program=fluid.default_startup_program())
def train(self):
if not self.has_init_train:
self._init_train()
self.has_init_train = True
instances = self.instances
num_instances = self.num_instances
main_inst = self.main_inst
main_conf = main_inst.config
backbone = self.train_backbone
train_program = self.train_program
saver_program = self.saver_program
fetches = self.fetches
finish = []
for inst in instances:
if inst.is_target:
if inst.expected_train_steps > 0:
finish.append(False)
else:
finish.append(True)
print(inst.name+': train finished!')
inst.save()
def train_finish():
for inst in instances:
if inst.is_target:
if not inst.train_finish:
return False
return True
def pack_multicard_feed(iterator, net_inputs, dev_count):
ret = []
mask = []
for i in range(dev_count):
temp = {}
content, flag = next(iterator)
for q, var in net_inputs.items():
temp[var.name] = content[q]
ret.append(temp)
mask.append(1 if flag else 0)
return ret, mask
# do training
fetch_names, fetch_list = zip(*fetches.items())
main_step = 0 # only count for main task
global_step = 0 # count for all tasks
epoch = 0
time_begin = time.time()
backbone_buffer = []
def multi_dev_reader(reader, dev_count):
def worker(reader, dev_count, queue):
dev_batches = []
for index, data in enumerate(reader()):
if len(dev_batches) < dev_count:
dev_batches.append(data)
if len(dev_batches) == dev_count:
queue.put((dev_batches, 0))
dev_batches = []
# For the prediction of the remained batches, pad more batches to
# the number of devices and the padded samples would be removed in
# prediction outputs.
if len(dev_batches) > 0:
num_pad = dev_count - len(dev_batches)
for i in range(len(dev_batches), dev_count):
dev_batches.append(dev_batches[-1])
queue.put((dev_batches, num_pad))
queue.put(None)
queue = Queue.Queue(dev_count*2)
p = Thread(
target=worker, args=(reader, dev_count, queue))
p.daemon = True
p.start()
while True:
ret = queue.get()
if ret is not None:
batches, num_pad = ret
queue.task_done()
for batch in batches:
flag = num_pad == 0
if num_pad > 0:
num_pad -= 1
yield batch, flag
else:
break
queue.join()
joint_iterator = multi_dev_reader(self._joint_iterator_fn, self.dev_count)
while not train_finish():
feed, mask = pack_multicard_feed(joint_iterator, self._net_inputs, self.dev_count)
rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
cur_task = instances[rt_task_id]
backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k}
backbone_buffer.append(backbone.postprocess(backbone_rt_outputs))
task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')}
instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)
global_step += 1
cur_task.cur_train_step += 1
cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch
if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0:
cur_task.save(suffix='.step'+str(cur_task_global_step))
if global_step % main_conf.get('print_every_n_steps', 5) == 0:
loss = rt_outputs[cur_task.name+'/loss']
loss = np.mean(np.squeeze(loss)).tolist()
time_end = time.time()
time_cost = time_end - time_begin
print("Global step: {}. Task: {}, step {}/{} (epoch {}), loss: {:.3f}, speed: {:.2f} steps/s".format(
global_step, cur_task.name, cur_task.cur_train_step, cur_task.steps_pur_epoch, cur_task.cur_train_epoch,
loss, main_conf.get('print_every_n_steps', 5) / time_cost))
time_begin = time.time()
if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
print(cur_task.name+': train finished!')
cur_task.save()
if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0:
save_path = os.path.join(main_conf['save_path'], 'ckpt',
"step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program)
print('checkpoint has been saved at '+save_path)
save_path = os.path.join(main_conf['save_path'], 'ckpt',
"step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program)
print('checkpoint has been saved at '+save_path)
print("ALL tasks train finished, exiting...")
def pred(self, task_instance, inference_model_dir=None):
if self._for_train:
raise Exception('This controller is a trainer. Please build a new controller with for_train=False for predicting.')
assert isinstance(task_instance, str)
if isinstance(inference_model_dir, str):
assert os.path.exists(inference_model_dir), inference_model_dir+" not found."
# if not self.has_init_pred and inference_model_dir is None:
# raise ValueError('infer_model_path is required for prediction.')
if inference_model_dir is None:
assert 'save_path' in self.mtl_conf, "one of the `inference_model_dir` and 'save_path' should be set to load inference model."
inference_model_dir = os.path.join(self.mtl_conf['save_path'], task_instance, 'infer_model')
instance = None
for inst in self.instances:
if inst.name == task_instance:
instance = inst
break
if instance is None:
raise ValueError(task_instance + ' is not a valid task_instance.')
pred_prog = self._init_pred(instance, inference_model_dir)
inst = instance
print(inst.name+": loading data...")
inst.reader['pred'].load_data()
fetch_names, fetch_vars = inst.pred_fetch_list
print('predicting...')
mapper = {k:v for k,v in inst.pred_input}
buf = []
for feed in inst.reader['pred'].iterator():
feed = _encode_inputs(feed, inst.name, cand_set=mapper)
feed = {mapper[k]: v for k,v in feed.items()}
rt_outputs = self.exe.run(pred_prog, feed, fetch_vars)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
inst.postprocess(rt_outputs, phase='pred')
if inst.task_layer['pred'].epoch_inputs_attrs:
reader_outputs = inst.reader['pred'].get_epoch_outputs()
else:
reader_outputs = None
inst.epoch_postprocess({'reader':reader_outputs}, phase='pred')
if __name__ == '__main__':
assert len(sys.argv) == 2, "Usage: python mtl_controller.py <mtl_conf_path>"
conf_path = sys.argv[1]
del sys.argv[1]
controller = Controller(conf_path)
if controller.main_conf['do_train']:
controller.train()
__all__ = ["Controller"]
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册