提交 af31077d 编写于 作者: X xixiaoyao

release 0.3

上级 0a317161
import downloader
from mtl_controller import Controller
# from mtl_controller import Controller
import controller
import optimizer
import lr_sched
import backbone
import reader
import head
from trainer import Trainer
del interface
del task_instance
del default_settings
del utils
del mtl_controller
\ No newline at end of file
from ernie import ERNIE
from bert import BERT
......@@ -14,76 +14,8 @@
# limitations under the License.
"""v1.1"""
class reader(object):
    """Interface of data manager (dataset reader).

    A concrete reader is constructed from a plain config dict, describes the
    objects it yields via `outputs_attr`, and produces samples through
    `iterator`.
    """

    def __init__(self, config):
        # Concrete readers receive all hyperparameters as a plain dict.
        assert isinstance(config, dict)

    # @property
    # def inputs_attr(self):
    #     """Describe the reader's input objects: the name, shape and dtype of
    #     each. A scalar object (str, int, float, ...) uses an empty shape [],
    #     and a dimension of variable length is written as -1.
    #     Return:
    #         dict describing each input object. e.g.
    #         for text classification, the input text plus its label id:
    #         {"text": ([], 'str'),
    #         "label": ([], 'int')}
    #         for sequence labeling, the token sequence and its tag sequence:
    #         {"tokens", ([-1], 'str'),
    #         "tags", ([-1], 'str')}
    #         for machine reading comprehension, context, question, answer and
    #         the answer span boundaries:
    #         {"paragraph", ([], 'str'),
    #         "question", ([], 'str'),
    #         "start_position", ([], 'int')
    #         """
    #     raise NotImplementedError()

    @property
    def outputs_attr(self):
        """Describe the objects yielded by the reader: the name, shape and
        dtype of each. A scalar object (str, int, float, ...) uses an empty
        shape [], and a dimension of variable length is written as -1.
        Note: under mini-batch gradient descent, regular outputs should carry
        a batch_size dimension (usually -1).
        Return:
            dict describing each yielded object. e.g. for text classification
            and matching tasks the yielded dict may contain the following
            objects (downstream backbone and task access them on demand):
            {"token_ids": ([-1, max_len], 'int64'),
            "input_ids": ([-1, max_len], 'int64'),
            "segment_ids": ([-1, max_len], 'int64'),
            "input_mask": ([-1, max_len], 'float32'),
            "label": ([-1], 'int')}
        """
        raise NotImplementedError()

    # def parse_line(self):
    #     """The framework describes each sample internally as a dict whose keys
    #     are the inputs_attr names and whose values match the attr description.
    #     This method parses one text line into such a dict. The default
    #     implementation reads json-formatted dataset files (one json sample per
    #     line); override it to adapt other formats such as csv or tfrecord.
    #     """
    #     raise NotImplementedError()
    #
    # def tokenize(self, line):
    #     """Built-in tokenizers (e.g. word piece) can be selected through the
    #     tokenizer hyperparameter; override this method to plug in a custom
    #     tokenizer when none of the built-ins fit.
    #     Args:
    #         - line: a unicode string.
    #     Return:
    #         a list of tokens
    #     """
    #     raise NotImplementedError()

    def iterator(self):
        """Dataset traversal interface. Note: when the end of the dataset is
        reached, the cursor must be reset automatically so a new pass starts
        from the head of the dataset.
        Yield:
            (dict) elements that meet the requirements in output_templete
        """
        raise NotImplementedError()

    @property
    def num_examples(self):
        """Number of samples in the dataset, i.e. the number of samples
        `iterator` generates per epoch. Note: with strategies that change the
        sample count (e.g. sliding windows), return the actual runtime count.
        """
        raise NotImplementedError()
class backbone(object):
class BaseBackbone(object):
"""interface of backbone model."""
def __init__(self, config, phase):
......
......@@ -23,12 +23,44 @@ from paddle import fluid
from paddle.fluid import layers
from paddlepalm.backbone.utils.transformer import pre_process_layer, encoder
from paddlepalm.interface import backbone
class Model(backbone):
def __init__(self, config, phase):
from paddlepalm.backbone.base_backbone import BaseBackbone
class BERT(BaseBackbone):
def __init__(hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
max_position_embeddings, type_vocab_size, hidden_act, hidden_dropout_prob, \
attention_probs_dropout_prob, initializer_range, phase='train'):
config = {}
config['hidden_size'] = hidden_size
config['num_hidden_layers'] = num_hidden_layers
config['num_attention_heads'] = num_attention_heads
config['vocab_size'] = vocab_size
config['max_position_embeddings'] = max_position_embeddings
config['type_vocab_size'] = sent_type_vocab_size
config['hidden_act'] = hidden_act
config['hidden_dropout_prob'] = hidden_dropout_prob
config['attention_probs_dropout_prob'] = attention_probs_dropout_prob
config['initializer_range'] = initializer_range
self.from_config(config, phase=phase)
@classmethod
def from_config(self, config, phase='train'):
assert 'hidden_size' in config, "{} is required to initialize ERNIE".format('')
assert 'num_hidden_layers' in config, "{} is required to initialize ERNIE".format('num_hidden_layers')
assert 'num_attention_heads' in config, "{} is required to initialize ERNIE".format('num_attention_heads')
assert 'vocab_size' in config, "{} is required to initialize ERNIE".format('vocab_size')
assert 'max_position_embeddings' in config, "{} is required to initialize ERNIE".format('max_position_embeddings')
assert 'sent_type_vocab_size' in config or 'type_vocab_size' in config, \
"{} is required to initialize ERNIE".format('type_vocab_size')
assert 'hidden_act' in config, "{} is required to initialize ERNIE".format('hidden_act')
assert 'hidden_dropout_prob' in config, "{} is required to initialize ERNIE".format('hidden_dropout_prob')
assert 'attention_probs_dropout_prob' in config, \
"{} is required to initialize ERNIE".format('attention_probs_dropout_prob')
assert 'initializer_range' in config, "{} is required to initialize ERNIE".format('initializer_range')
# self._is_training = phase == 'train' # backbone一般不用关心运行阶段,因为outputs在任何阶段基本不会变
self._emb_size = config["hidden_size"]
......@@ -153,3 +185,9 @@ class Model(backbone):
pass
class Model(BERT):
"""BERT wrapper for ConfigController"""
def __init__(self, config, phase):
BERT.from_config(config, phase=phase)
......@@ -24,32 +24,30 @@ from paddle import fluid
from paddle.fluid import layers
from paddlepalm.backbone.utils.transformer import pre_process_layer, encoder
from paddlepalm.interface import backbone
from paddlepalm.backbone.base_backbone import BaseBackbone
class Model(backbone):
def __init__(self,
config,
phase):
class ERNIE(BaseBackbone):
def __init__(self, hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
max_position_embeddings, sent_type_vocab_size, task_type_vocab_size, \
hidden_act, hidden_dropout_prob, attention_probs_dropout_prob, initializer_range, phase='train'):
# self._is_training = phase == 'train' # backbone一般不用关心运行阶段,因为outputs在任何阶段基本不会变
self._emb_size = config['hidden_size']
self._n_layer = config['num_hidden_layers']
self._n_head = config['num_attention_heads']
self._voc_size = config['vocab_size']
self._max_position_seq_len = config['max_position_embeddings']
if config['sent_type_vocab_size']:
self._sent_types = config['sent_type_vocab_size']
else:
self._sent_types = config['type_vocab_size']
self._emb_size = hidden_size
self._n_layer = num_hidden_layers
self._n_head = num_attention_heads
self._voc_size = vocab_size
self._max_position_seq_len = max_position_embeddings
self._sent_types = sent_type_vocab_size
self._task_types = config['task_type_vocab_size']
self._task_types = task_type_vocab_size
self._hidden_act = config['hidden_act']
self._prepostprocess_dropout = config['hidden_dropout_prob']
self._attention_dropout = config['attention_probs_dropout_prob']
self._hidden_act = hidden_act
self._prepostprocess_dropout = hidden_dropout_prob
self._attention_dropout = attention_probs_dropout_prob
self._word_emb_name = "word_embedding"
self._pos_emb_name = "pos_embedding"
......@@ -58,7 +56,40 @@ class Model(backbone):
self._emb_dtype = "float32"
self._param_initializer = fluid.initializer.TruncatedNormal(
scale=config['initializer_range'])
scale=initializer_range)
@classmethod
def from_config(cls, config, phase='train'):
    """Alternate constructor: build an ERNIE backbone from a config dict.

    Validates that every required hyperparameter is present, then forwards
    the values positionally to __init__. 'sent_type_vocab_size' falls back
    to 'type_vocab_size' when absent.

    Args:
        config: dict of ERNIE hyperparameters (see asserts below for the
            required keys).
        phase: run phase string, e.g. 'train' (default).

    Returns:
        An instance of `cls` configured from `config`.
    """
    assert 'hidden_size' in config, "{} is required to initialize ERNIE".format('hidden_size')
    assert 'num_hidden_layers' in config, "{} is required to initialize ERNIE".format('num_hidden_layers')
    assert 'num_attention_heads' in config, "{} is required to initialize ERNIE".format('num_attention_heads')
    assert 'vocab_size' in config, "{} is required to initialize ERNIE".format('vocab_size')
    assert 'max_position_embeddings' in config, "{} is required to initialize ERNIE".format('max_position_embeddings')
    assert 'sent_type_vocab_size' in config or 'type_vocab_size' in config, "{} is required to initialize ERNIE".format('sent_type_vocab_size')
    assert 'task_type_vocab_size' in config, "{} is required to initialize ERNIE".format('task_type_vocab_size')
    assert 'hidden_act' in config, "{} is required to initialize ERNIE".format('hidden_act')
    assert 'hidden_dropout_prob' in config, "{} is required to initialize ERNIE".format('hidden_dropout_prob')
    assert 'attention_probs_dropout_prob' in config, "{} is required to initialize ERNIE".format('attention_probs_dropout_prob')
    assert 'initializer_range' in config, "{} is required to initialize ERNIE".format('initializer_range')

    hidden_size = config['hidden_size']
    num_hidden_layers = config['num_hidden_layers']
    num_attention_heads = config['num_attention_heads']
    vocab_size = config['vocab_size']
    max_position_embeddings = config['max_position_embeddings']
    # Prefer the ERNIE-style key; fall back to the BERT-style key.
    if 'sent_type_vocab_size' in config:
        sent_type_vocab_size = config['sent_type_vocab_size']
    else:
        sent_type_vocab_size = config['type_vocab_size']
    task_type_vocab_size = config['task_type_vocab_size']
    hidden_act = config['hidden_act']
    hidden_dropout_prob = config['hidden_dropout_prob']
    attention_probs_dropout_prob = config['attention_probs_dropout_prob']
    initializer_range = config['initializer_range']

    return cls(hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
        max_position_embeddings, sent_type_vocab_size, task_type_vocab_size, \
        hidden_act, hidden_dropout_prob, attention_probs_dropout_prob, initializer_range, phase=phase)
@property
def inputs_attr(self):
......@@ -173,3 +204,12 @@ class Model(backbone):
def postprocess(self, rt_outputs):
pass
class Model(ERNIE):
def __init__(self, config, phase):
ERNIE.from_config(config, phase=phase)
from conf_controller import ConfigController
from controller import Controller
......@@ -35,6 +35,9 @@ from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn
from paddlepalm.default_settings import *
from paddlepalm.task_instance import TaskInstance, check_instances
import Queue
from threading import Thread
DEBUG=False
VERBOSE=0
......@@ -182,7 +185,7 @@ def _fit_attr(conf, fit_attr, strict=False):
return conf
class ConfController(object):
class ConfigController(object):
def __init__(self, config, task_dir='.', for_train=True):
"""
......@@ -234,7 +237,7 @@ class ConfController(object):
bb_conf = _merge_conf(mtl_conf, bb_conf)
else:
bb_conf = mtl_conf
print_dict(bb_conf, title='backbone configuration'.format(instname))
print_dict(bb_conf, title = 'backbone configuration'.format(instname))
bb_name = mtl_conf['backbone']
bb_mod = importlib.import_module(BACKBONE_DIR + '.' + bb_name)
......@@ -338,6 +341,7 @@ class ConfController(object):
main_conf = main_inst.config
if not os.path.exists(main_conf['save_path']):
os.makedirs(main_conf['save_path'])
os.makedirs(os.path.join(main_conf['save_path'], 'ckpt'))
# prepare backbone
train_backbone = Backbone(bb_conf, phase='train')
......@@ -398,11 +402,14 @@ class ConfController(object):
prefixes.append(inst.name)
mrs.append(inst.mix_ratio)
joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE)
joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict')
self._joint_iterator_fn = joint_iterator_fn
input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)]
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)]
net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
# net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
net_inputs = create_net_inputs(input_attrs, async=False)
self._net_inputs = net_inputs
# build backbone and task layers
train_prog = fluid.default_main_program()
......@@ -453,7 +460,7 @@ class ConfController(object):
# compute loss
task_id_var = net_inputs['__task_id']
task_id_vec = layers.one_hot(task_id_var, num_instances)
task_id_vec = fluid.one_hot(task_id_var, num_instances)
losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
loss = layers.reduce_sum(task_id_vec * losses)
......@@ -522,15 +529,15 @@ class ConfController(object):
inst.reader['pred'] = pred_reader
return pred_prog
def load_pretrain(self, pretrain_model_path=None):
def load_pretrain(self, pretrain_path=None):
# load pretrain model (or ckpt)
if pretrain_model_path is None:
assert 'pretrain_model_path' in self.main_conf, "pretrain_model_path NOT set."
pretrain_model_path = self.main_conf['pretrain_model_path']
if pretrain_path is None:
assert 'pretrain_path' in self.main_conf, "pretrain_path NOT set."
pretrain_path = self.main_conf['pretrain_path']
init_pretraining_params(
self.exe,
pretrain_model_path,
pretrain_path,
main_program=fluid.default_startup_program())
......@@ -567,6 +574,18 @@ class ConfController(object):
return False
return True
def pack_multicard_feed(iterator, net_inputs, dev_count):
    """Collect one feed dict per device from `iterator`.

    Pulls `dev_count` (content, flag) pairs from `iterator`, remaps each
    content dict from reader keys to the corresponding feed-variable names
    found in `net_inputs`, and returns the per-device feed list together
    with a 0/1 validity mask derived from the flags.
    """
    feeds, valid_mask = [], []
    for _ in range(dev_count):
        content, is_valid = next(iterator)
        feeds.append({var.name: content[key] for key, var in net_inputs.items()})
        valid_mask.append(int(bool(is_valid)))
    return feeds, valid_mask
# do training
fetch_names, fetch_list = zip(*fetches.items())
......@@ -575,8 +594,50 @@ class ConfController(object):
epoch = 0
time_begin = time.time()
backbone_buffer = []
def multi_dev_reader(reader, dev_count):
    """Regroup batches from `reader` into per-device streams.

    A background thread groups consecutive batches into chunks of
    `dev_count`. The trailing partial chunk is padded by repeating its last
    batch so every device receives a feed; padded batches are yielded with
    flag False so callers can drop their outputs.

    Yields:
        (batch, flag): flag is True for real batches, False for padding.
    """
    def worker(reader, dev_count, queue):
        # Producer: group batches and hand them to the consumer.
        dev_batches = []
        for data in reader():
            dev_batches.append(data)
            if len(dev_batches) == dev_count:
                queue.put((dev_batches, 0))
                dev_batches = []
        # For the prediction of the remained batches, pad more batches to
        # the number of devices; the padded samples are removed from
        # prediction outputs via the False flag below.
        if dev_batches:
            num_pad = dev_count - len(dev_batches)
            dev_batches.extend([dev_batches[-1]] * num_pad)
            queue.put((dev_batches, num_pad))
        queue.put(None)  # sentinel: end of data

    queue = Queue.Queue(dev_count * 2)
    p = Thread(
        target=worker, args=(reader, dev_count, queue))
    p.daemon = True
    p.start()
    while True:
        ret = queue.get()
        if ret is None:
            # BUG FIX: the sentinel must be acknowledged as well, otherwise
            # queue.join() below waits forever for its task_done().
            queue.task_done()
            break
        batches, num_pad = ret
        queue.task_done()
        # BUG FIX: the *last* num_pad entries are the padded copies, so they
        # (not the first ones) must carry flag False. The original decremented
        # num_pad after computing `flag = num_pad == 0`, which marked real
        # batches as padding and vice versa.
        num_valid = dev_count - num_pad
        for i, batch in enumerate(batches):
            yield batch, i < num_valid
    queue.join()
joint_iterator = multi_dev_reader(self._joint_iterator_fn, self.dev_count)
while not train_finish():
rt_outputs = self.exe.run(train_program, fetch_list=fetch_list)
feed, mask = pack_multicard_feed(joint_iterator, self._net_inputs, self.dev_count)
rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
......@@ -591,8 +652,9 @@ class ConfController(object):
global_step += 1
cur_task.cur_train_step += 1
if cur_task.save_infermodel_every_n_steps > 0 and cur_task.cur_train_step % cur_task.save_infermodel_every_n_steps == 0:
cur_task.save(suffix='.step'+str(cur_task.cur_train_step))
cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch
if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0:
cur_task.save(suffix='.step'+str(cur_task_global_step))
if global_step % main_conf.get('print_every_n_steps', 5) == 0:
loss = rt_outputs[cur_task.name+'/loss']
......@@ -610,10 +672,16 @@ class ConfController(object):
print(cur_task.name+': train finished!')
cur_task.save()
if 'save_every_n_steps' in main_conf and global_step % main_conf['save_every_n_steps'] == 0:
save_path = os.path.join(main_conf['save_path'],
if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0:
save_path = os.path.join(main_conf['save_path'], 'ckpt',
"step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program)
print('checkpoint has been saved at '+save_path)
save_path = os.path.join(main_conf['save_path'], 'ckpt',
"step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program)
print('checkpoint has been saved at '+save_path)
print("ALL tasks train finished, exiting...")
......@@ -673,6 +741,7 @@ if __name__ == '__main__':
__all__ = ["Controller"]
......@@ -415,9 +415,6 @@ class Controller(object):
return loss, max_train_steps
def build_backward(self, optimizer, use_ema=False, ema_decay=0.9999):
# build optimizer
optimizer.optimize(fluid.default_main_program())
......
from cls import Classify
# from match import Match
# from mrc import MRC
# from mlm import MaskLM
......@@ -14,13 +14,15 @@
# limitations under the License.
class task(object):
class BaseHead(object):
def __init__(self, config, phase, backbone_config):
"""
config: dict类型。描述了 任务实例(task instance)+多任务配置文件 中定义超参数
phase: str类型。运行阶段,目前支持train和predict
"""
self._stop_gradient = {}
self._prog = None
@property
def inputs_attrs(self):
......@@ -43,6 +45,17 @@ class task(object):
def epoch_inputs_attrs(self):
return {}
# def stop_gradient(source, inputs):
# # if self._inputs is None:
# # raise Exception('You need to build this head first before stop gradient.')
# self._inputs = inputs
# for name, var in self._inputs[source].items():
# # cur_block = self._prog.current_block()
# var = fluid.layers.assign(var)
# var.stop_gradient = True
# self._inputs[name] = var
# return self._inputs
def build(self, inputs, scope_name=""):
"""建立task_layer的计算图。将符合inputs_attrs描述的来自各个对象集的静态图Variables映射成符合outputs_attr描述的静态图Variable输出。
Args:
......@@ -53,6 +66,7 @@ class task(object):
"""
raise NotImplementedError()
def postprocess(self, rt_outputs):
"""每个训练或推理step后针对当前batch的task_layer的runtime计算结果进行相关后处理。注意,rt_outputs除了包含build方法,还自动包含了loss的计算结果。"""
pass
......
......@@ -15,51 +15,47 @@
import paddle.fluid as fluid
from paddle.fluid import layers
from paddlepalm.base_task import base_task
from paddlepalm.head.base_head import BaseHead
import numpy as np
import os
def classify(num_classes, input_dim, dropout_prob, pred_output_dir=None, param_initializer_range=0.02, phase='train'):
config = {
'num_classes': num_classes,
'hidden_size': input_dim,
'dropout_prob': dropout_prob,
'pred_output_dir': pred_output_dir,
'initializer_range': param_initializer_range
}
return Task(config, phase, config)
# def classify(num_classes, input_dim, dropout_prob, pred_output_dir=None, param_initializer_range=0.02, phase='train'):
#
# config = {
# 'num_classes': num_classes,
# 'hidden_size': input_dim,
# 'dropout_prob': dropout_prob,
# 'pred_output_dir': pred_output_dir,
# 'initializer_range': param_initializer_range
# }
#
# return Task(config, phase, config)
class Task(base_task):
class Classify(BaseHead):
'''
classification
'''
def __init__(self, config, phase, backbone_config=None):
# def __init__(self, config, phase, backbone_config=None):
def __init__(self, num_classes, input_dim, dropout_prob=0.0, \
param_initializer_range=0.02, phase='train'):
self._is_training = phase == 'train'
self._hidden_size = backbone_config['hidden_size']
self.num_classes = config['num_classes']
self._hidden_size = input_dim
if 'initializer_range' in config:
self._param_initializer = config['initializer_range']
else:
self.num_classes = num_classes
self._dropout_prob = dropout_prob if phase == 'train' else 0.0
self._param_initializer = fluid.initializer.TruncatedNormal(
scale=backbone_config.get('initializer_range', 0.02))
if 'dropout_prob' in config:
self._dropout_prob = config['dropout_prob']
else:
self._dropout_prob = backbone_config.get('hidden_dropout_prob', 0.0)
self._pred_output_path = config.get('pred_output_dir', None)
scale=param_initializer_range)
self._preds = []
@property
def inputs_attrs(self):
if self._is_training:
reader = {"label_ids": [[-1], 'int64']}
else:
reader = {}
bb = {"sentence_embedding": [[-1, self._hidden_size], 'float32']}
if self._is_training:
reader["label_ids"] = [[-1], 'int64']
return {'reader': reader, 'backbone': bb}
@property
......@@ -96,11 +92,12 @@ class Task(base_task):
else:
return {"logits":logits}
def postprocess(self, rt_outputs):
def batch_postprocess(self, rt_outputs):
if not self._is_training:
logits = rt_outputs['logits']
preds = np.argmax(logits, -1)
self._preds.extend(preds.tolist())
return preds
def epoch_postprocess(self, post_inputs):
# there is no post_inputs needed and not declared in epoch_inputs_attrs, hence no elements exist in post_inputs
......
from slanted_triangular_schedualer import TriangularSchedualer
from warmup_schedualer import WarmupSchedualer
# scheduled_lr = fluid.layers.learning_rate_scheduler\
# .noam_decay(1/(warmup_steps *(config['learning_rate'] ** 2)),
# warmup_steps)
class BaseSchedualer():
    """Base class for learning-rate schedulers.

    Subclasses implement `build`, which creates the scheduled learning-rate
    variable inside the program registered through `_set_prog`.
    """

    def __init__(self):
        # Program the schedule graph will be built into; assigned later by
        # the training controller via _set_prog.
        self._prog = None

    def _set_prog(self, prog):
        """Register the program that `build` should attach the schedule to."""
        self._prog = prog

    def build(self, learning_rate):
        """Create and return the scheduled learning-rate variable."""
        raise NotImplementedError()
from paddlepalm.lr_sched.schedualer import BaseSchedualer
from paddle import fluid
class TriangularSchedualer(BaseSchedualer):
    """ Applies linear warmup of learning rate from 0 to learning_rate until warmup_steps, and then decay to 0 linearly until num_train_steps."""

    def __init__(self, warmup_steps, num_train_steps):
        # Requires 0 < warmup_steps < num_train_steps (the warmup phase must
        # fit strictly inside the training schedule).
        BaseSchedualer.__init__(self)
        assert num_train_steps > warmup_steps > 0
        self.warmup_steps = warmup_steps
        self.num_train_steps = num_train_steps

    def build(self, learning_rate):
        """Build the triangular schedule graph inside the registered program
        (self._prog, set via _set_prog) and return the scheduled LR variable.
        """
        with self._prog._lr_schedule_guard():
            # Persistable global var holding the current scheduled LR value.
            lr = fluid.layers.tensor.create_global_var(
                shape=[1],
                value=0.0,
                dtype='float32',
                persistable=True,
                name="scheduled_learning_rate")

            global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()

            with fluid.layers.control_flow.Switch() as switch:
                with switch.case(global_step < self.warmup_steps):
                    # Warmup: ramp linearly from 0 to learning_rate.
                    warmup_lr = learning_rate * (global_step / self.warmup_steps)
                    fluid.layers.tensor.assign(warmup_lr, lr)
                with switch.default():
                    # After warmup: decay linearly (power=1.0) to 0 at
                    # num_train_steps.
                    decayed_lr = fluid.layers.learning_rate_scheduler.polynomial_decay(
                        learning_rate=learning_rate,
                        decay_steps=self.num_train_steps,
                        end_learning_rate=0.0,
                        power=1.0,
                        cycle=False)
                    fluid.layers.tensor.assign(decayed_lr, lr)

            return lr
from paddlepalm.lr_sched.schedualer import BaseSchedualer
# BUG FIX: this was declared with `def` instead of `class`, so calling
# WarmupSchedualer(...) executed a docstring-only function and returned None
# instead of constructing a scheduler object.
class WarmupSchedualer(BaseSchedualer):
    """Linearly warm the learning rate up from 0 to `learning_rate` over
    `warmup_steps` steps, then hold it constant.

    (The original docstring claimed a linear decay to 0 afterwards; that was
    copy-pasted from TriangularSchedualer — the default branch below assigns
    the constant `learning_rate` after warmup.)
    """

    def __init__(self, warmup_steps):
        # BUG FIX: was `schedualer.__init__(self)` — `schedualer` is an
        # undefined name (NameError). Initialize the base class explicitly,
        # matching TriangularSchedualer in this package.
        BaseSchedualer.__init__(self)
        self.warmup_steps = warmup_steps

    def build(self, learning_rate):
        """Build the warmup schedule graph inside the registered program and
        return the scheduled learning-rate variable.

        NOTE(review): this relies on `fluid` being importable at module scope
        (`from paddle import fluid`) — confirm the import exists in this file.
        """
        with self._prog._lr_schedule_guard():
            lr = fluid.layers.tensor.create_global_var(
                shape=[1],
                value=0.0,
                dtype='float32',
                persistable=True,
                name="scheduled_learning_rate")

            global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()

            with fluid.layers.control_flow.Switch() as switch:
                with switch.case(global_step < self.warmup_steps):
                    # Warmup: ramp linearly from 0 to learning_rate.
                    warmup_lr = learning_rate * (global_step / self.warmup_steps)
                    fluid.layers.tensor.assign(warmup_lr, lr)
                with switch.default():
                    # After warmup: keep the base learning rate constant.
                    fluid.layers.tensor.assign(learning_rate, lr)

            return lr
# -*- coding: UTF-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import importlib
import multiprocessing
from paddle import fluid
from paddle.fluid import layers
import yaml
import json
import logging
import time
import numpy as np
from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint
from paddlepalm.utils.config_helper import PDConfig
from paddlepalm.utils.print_helper import print_dict
from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs
from paddlepalm.default_settings import *
from task_instance import TaskInstance, check_instances
import Queue
from threading import Thread
DEBUG=False
VERBOSE=0
def _get_basename(f):
    """Return the path with its (last) extension stripped."""
    stem, _ = os.path.splitext(f)
    return stem


def _get_suffix(f):
    """Return the (last) extension of the path, including the leading dot."""
    _, ext = os.path.splitext(f)
    return ext
def _parse_yaml(f, asdict=True, support_cmd_line=False):
    """Parse a yaml config file.

    Args:
        f: path of the yaml file; must exist.
        asdict: when True return a plain dict; the non-dict form is only
            implemented for the command-line path.
        support_cmd_line: when True, wrap the file in a PDConfig so values
            can be fused with / overridden by command-line arguments.

    Returns:
        dict (asdict=True) or a PDConfig args object (asdict=False with
        support_cmd_line=True).

    Raises:
        NotImplementedError: asdict=False without command-line support.
    """
    assert os.path.exists(f), "file {} not found.".format(f)
    if support_cmd_line:
        args = PDConfig(yaml_file=f, fuse_args=True)
        args.build()
        return args.asdict() if asdict else args
    else:
        if asdict:
            with open(f, "r") as fin:
                # SafeLoader: never instantiate arbitrary python objects.
                yaml_config = yaml.load(fin, Loader=yaml.SafeLoader)
            return yaml_config
        else:
            raise NotImplementedError()
def _parse_json(f, asdict=True, support_cmd_line=False):
    """Parse a json config file.

    Args:
        f: path of the json file; must exist.
        asdict: when True return a plain dict; the non-dict form is only
            implemented for the command-line path.
        support_cmd_line: when True, wrap the file in a PDConfig so values
            can be fused with command-line arguments.

    Returns:
        dict (asdict=True) or a PDConfig args object (asdict=False with
        support_cmd_line=True).

    Raises:
        NotImplementedError: asdict=False without command-line support.
    """
    assert os.path.exists(f), "file {} not found.".format(f)
    if support_cmd_line:
        args = PDConfig(json_file=f, fuse_args=support_cmd_line)
        args.build()
        return args.asdict() if asdict else args
    if not asdict:
        raise NotImplementedError()
    with open(f, "r") as fin:
        return json.load(fin)
def _parse_list(string, astype=str):
    """Split a comma-separated string into a list, casting each element.

    Commas are the only separator; surrounding whitespace is discarded. A
    string without commas yields a single-element list containing the whole
    (uncast-split) string.
    """
    assert isinstance(string, str), "{} is not a string.".format(string)
    if ',' not in string:
        return [astype(string)]
    return [astype(tok) for tok in string.replace(',', ' ').split()]
def _try_float(s):
    """Return float(s) when s parses as a float, otherwise s unchanged."""
    # BUG FIX: the original bare `except:` also swallowed KeyboardInterrupt
    # and SystemExit; only conversion failures should be caught.
    try:
        return float(s)
    except (TypeError, ValueError):
        return s


def _check_conf(conf, checklist=None):
    """Normalize a config dict and optionally validate required entries.

    String values that parse as floats are converted to float; everything
    else is kept as-is. The input dict is not mutated.

    Args:
        conf: configuration dict.
        checklist: optional iterable of (key, type) pairs; each key must be
            present in the normalized dict with a value of the given type.

    Returns:
        A new dict with numeric-looking string values converted to float.
    """
    assert isinstance(conf, dict), "{} is not a dict.".format(conf)
    ret = {}
    for k, v in conf.items():
        if isinstance(v, str):
            v = _try_float(v)
        ret[k] = v
    if checklist is not None:
        for k, t in checklist:
            assert k in ret, "required argument {} is NOT exist in config file.".format(k)
            # BUG FIX: `isintance` was a typo that raised NameError whenever
            # a checklist was supplied.
            assert isinstance(ret[k], t), "value type of argument {} should be {}".format(k, t)
    return ret
# TODO: 增加None机制,允许hidden size、batch size和seqlen设置为None
def _check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
    """Verify that every attribute required by `in_attr` is produced by
    `out_attr` with a consistent shape/dtype description.

    A missing name always raises AssertionError. A shape/dtype mismatch
    raises ValueError when strict, otherwise only logs a warning.
    """
    for name, attr in in_attr.items():
        assert name in out_attr, in_name+': '+name+' not found in '+out_name
        if attr == out_attr[name]:
            continue
        if strict:
            raise ValueError(name+': shape or dtype not consistent!')
        logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name]))
def _merge_conf(conf1, conf2, conf1_first=True, strict=False):
    """Merge two config dicts without mutating either.

    The dict whose entries take precedence is `conf1` when conf1_first else
    `conf2`; its entries are folded into a copy of the other dict. A key
    present in both with differing values raises Warning. With strict=True,
    keys missing from the base dict are skipped instead of added.
    """
    assert isinstance(conf1, dict), "{} is not a dict.".format(conf1)
    assert isinstance(conf2, dict), "{} is not a dict.".format(conf2)
    merged = (conf2 if conf1_first else conf1).copy()
    overriding = conf1 if conf1_first else conf2

    for key, val in overriding.items():
        if key not in merged:
            if not strict:
                merged[key] = val
        elif merged[key] != val:
            # NOTE: raises (not warns) on conflicting values, matching the
            # original behavior.
            raise Warning("value of argument {} has been updated to {}.".format(key, val))
    return merged
def _encode_inputs(inputs, scope_name, sep='/', cand_set=None):
    """Prefix input names with a task scope.

    Without cand_set, every key k is renamed to "<scope_name><sep>k". With
    cand_set, both the bare and the scoped name are emitted, each only when
    it is a member of cand_set.
    """
    outputs = {}
    for key, val in inputs.items():
        scoped = scope_name + sep + key
        if cand_set is None:
            outputs[scoped] = val
        else:
            if key in cand_set:
                outputs[key] = val
            if scoped in cand_set:
                outputs[scoped] = val
    return outputs
def _decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
    """Strip a task scope prefix from input names.

    Keys starting with "<scope_name>/" are returned with the prefix removed.
    When keep_unk_keys is True, keys containing no separator at all (e.g.
    backbone-level vars shared across tasks) are passed through unchanged;
    keys scoped to other tasks are always dropped.
    """
    prefix = scope_name + '/'
    outputs = {}
    for name, value in inputs.items():
        # vars for the backbone are also available to tasks
        if keep_unk_keys and sep not in name:
            outputs[name] = value
        # vars belonging to this task instance
        if name.startswith(prefix):
            outputs[name[len(prefix):]] = value
    return outputs
def _init_env(use_gpu):
    """Create a fluid Executor and report the parallel device count.

    Args:
        use_gpu: when True, run on CUDA device 0 and count all visible
            GPUs; otherwise run on CPU, with the device count taken from
            the CPU_NUM environment variable (falling back to the host's
            core count).

    Returns:
        (fluid.Executor, dev_count) tuple.
    """
    if use_gpu:
        place = fluid.CUDAPlace(0)
        dev_count = fluid.core.get_cuda_device_count()
    else:
        place = fluid.CPUPlace()
        dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
    return fluid.Executor(place), dev_count
def _fit_attr(conf, fit_attr, strict=False):
    """Cast selected config values in place using per-key converters.

    Args:
        conf: config dict to update (mutated and returned).
        fit_attr: mapping from key to a callable applied to conf[key].
        strict: when True a missing key raises Exception; otherwise the key
            is silently skipped.

    Returns:
        The same dict with converted values.
    """
    for key, caster in fit_attr.items():
        if key not in conf:
            if strict:
                raise Exception('Argument {} is required to create a controller.'.format(key))
            continue
        conf[key] = caster(conf[key])
    return conf
class Controller(object):
def __init__(self, config, task_dir='.', for_train=True):
"""
Args:
config: (str|dict) 字符串类型时,给出yaml格式的config配置文件路径;
"""
self._for_train = for_train
assert isinstance(config, str) or isinstance(config, dict), "a config dict or config file path is required to create a Controller."
if isinstance(config, str):
mtl_conf = _parse_yaml(config, support_cmd_line=True)
else:
mtl_conf = config
mtl_conf = _check_conf(mtl_conf)
mtl_conf = _fit_attr(mtl_conf, REQUIRED_ARGS, strict=True)
mtl_conf = _fit_attr(mtl_conf, OPTIONAL_ARGS, strict=False)
exe, dev_count = _init_env(use_gpu=mtl_conf.get('use_gpu', True))
self.exe = exe
self.dev_count = dev_count
print_dict(mtl_conf, title='global configuration')
# parse task instances and target tags
instnames = _parse_list(mtl_conf['task_instance'])
assert len(instnames) == len(set(instnames)), "repeated task_instance is NOT supported."
num_instances = len(instnames)
self.num_instances = num_instances
instname_to_conf = {}
instname_to_id = {}
for id, instname in enumerate(instnames):
instpath = os.path.join(task_dir, instname+'.yaml')
conf = _parse_yaml(instpath, support_cmd_line=False)
# conf = _check_conf(conf, TASK_INSTANCE_REQUIRED_ARGS)
conf = _check_conf(conf)
temp_conf = _merge_conf(mtl_conf, conf, strict=True)
print_dict(temp_conf, title='{} configuration'.format(instname))
conf = _merge_conf(mtl_conf, conf)
instname_to_conf[instname] = conf
instname_to_id[instname] = id
# prepare backbone
if 'backbone_config_path' in mtl_conf:
bb_conf = _parse_json(mtl_conf['backbone_config_path'])
bb_conf = _merge_conf(mtl_conf, bb_conf)
else:
bb_conf = mtl_conf
print_dict(bb_conf, title = 'backbone configuration'.format(instname))
bb_name = mtl_conf['backbone']
bb_mod = importlib.import_module(BACKBONE_DIR + '.' + bb_name)
Backbone = getattr(bb_mod, 'Model')
# create task instances
instances = []
for name in instnames:
instances.append(TaskInstance(name, instname_to_id[name], instname_to_conf[name]))
check_instances(instances)
# parse target_tag
if 'target_tag' in mtl_conf:
target_tag = str(mtl_conf['target_tag'])
tags = _parse_list(target_tag, astype=int)
assert len(tags) == len(instnames), "number of target_tag is NOT consistent with that in task_instance."
for tag, inst in zip(tags, instances):
inst.is_target = tag
else:
tags = [i.is_target for i in instances]
num_targets = sum(tags)
num_auxes = num_instances - num_targets
# parse mix ratios
if 'mix_ratio' in mtl_conf:
mix_ratio = str(mtl_conf['mix_ratio'])
mrs = _parse_list(mix_ratio, astype=float)
assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances."
else:
mrs = [1.0] * num_instances
for mr, inst in zip(mrs, instances):
inst.mix_ratio = mr
# parse task layer reuse tags
instname_to_reusehost = {i:i for i in instnames}
if 'task_reuse_tag' in mtl_conf:
tags = _parse_list(mtl_conf['task_reuse_tag'], astype=int)
assert len(tags) == num_targets, 'number of reuse_tags is NOT consistent with number of instances.'
else:
tags = []
mapper = {}
for inst in instances:
history = set()
history.add(inst.name)
cur_inst = inst
while True:
if cur_inst.task_reuse_scope in history:
mapper[inst.name] = len(tags)
break
elif cur_inst.task_reuse_scope in mapper:
mapper[inst.name] = mapper[cur_inst.task_reuse_scope]
break
else:
cur_inst = name_to_instance[cur_inst.task_reuse_scope]
history.add(cur_inst.name)
tags.append(mapper[inst.name])
for i in range(1, num_instances):
for j in range(i):
if tags[i] == tags[j]:
assert instances[i].Paradigm == \
instances[j].Paradigm, \
"paradigm of reuse tasks should be consistent"
instances[i].task_reuse_scope = instances[j].name
break
self.instances = instances
self.mrs = mrs
self.Backbone = Backbone
self.bb_conf = bb_conf
self.bb_name = bb_name
self.has_init_train = False
self.has_init_pred = False
if self._for_train:
print("initialing for training...")
self._init_train()
self.has_init_train = True
def _init_train(self):
    """Build the multi-task training graph.

    Creates train/pred backbones, per-instance readers and task layers,
    checks I/O compatibility between them, merges all reader outputs into
    one joint feed layout, builds the joint loss (selected per step by
    ``__task_id``), wires up the optimizer, and finally runs the startup
    program to randomly initialize all parameters.
    """
    instances = self.instances
    Backbone = self.Backbone
    bb_conf = self.bb_conf
    bb_name = self.bb_name
    dev_count = self.dev_count
    num_instances = len(instances)
    mrs = self.mrs

    # set first_target/main task instance: the first instance flagged as a
    # target task becomes the "main" task that drives epoch/step accounting
    main_inst = None
    for inst in instances:
        if inst.is_target:
            main_inst = inst
            inst.is_first_target = True
            break
    main_conf = main_inst.config
    if not os.path.exists(main_conf['save_path']):
        os.makedirs(main_conf['save_path'])
        os.makedirs(os.path.join(main_conf['save_path'], 'ckpt'))

    # prepare backbone (separate instances for the train and pred phases)
    train_backbone = Backbone(bb_conf, phase='train')
    pred_backbone = Backbone(bb_conf, phase='pred')

    # create reader, task
    # then check i/o across reader, backbone and task_layer
    task_attrs = []
    pred_task_attrs = []
    for inst in instances:
        train_reader = inst.Reader(inst.config, phase='train')
        inst.reader['train'] = train_reader
        train_parad = inst.Paradigm(inst.config, phase='train', backbone_config=bb_conf)
        inst.task_layer['train'] = train_parad
        # prefix the reader-facing attrs with the instance name to avoid clashes
        task_attr_from_reader = _encode_inputs(train_parad.inputs_attrs['reader'], inst.name)
        task_attrs.append(task_attr_from_reader)

        _check_io(train_backbone.inputs_attr, train_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
        _check_io(train_parad.inputs_attrs['reader'], train_reader.outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
        _check_io(train_parad.inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')

        # only target tasks get a prediction pipeline
        if inst.is_target:
            if 'pred_file' not in inst.config:
                inst.config['pred_file'] = ''
            pred_reader = inst.Reader(inst.config, phase='pred')
            pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf)
            inst.task_layer['pred'] = pred_parad
            task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name)
            pred_task_attrs.append(task_attr_from_reader)
            _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
            _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
            _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')

    # merge reader input attrs from backbone and task_instances
    joint_input_names, joint_shape_and_dtypes, name_to_position = merge_input_attrs(train_backbone.inputs_attr, task_attrs)
    pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
    # shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]

    if DEBUG:
        print('----- for debug -----')
        print('joint input names:')
        print(joint_input_names)
        print('joint input shape and dtypes:')
        print(joint_shape_and_dtypes)

    # load data
    for inst in instances:
        print(inst.name+": preparing data...", end='')
        inst.reader['train'].load_data()
        print('ok!')

    # merge dataset iterators and create net input vars
    # NOTE(review): this local `mrs` shadows the `mrs = self.mrs` read above;
    # both should hold the same mix ratios — confirm intent.
    iterators = []
    prefixes = []
    mrs = []
    for inst in instances:
        iterators.append(inst.reader['train'].iterator())
        prefixes.append(inst.name)
        mrs.append(inst.mix_ratio)

    # joint iterator samples batches across tasks according to mix ratios
    joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict')
    self._joint_iterator_fn = joint_iterator_fn

    input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)]
    pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)]
    # net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
    # NOTE(review): `async` is a reserved keyword since Python 3.7 — this call
    # only parses on older interpreters; verify the supported Python version.
    net_inputs = create_net_inputs(input_attrs, async=False)
    self._net_inputs = net_inputs

    # build backbone and task layers
    train_prog = fluid.default_main_program()
    train_init_prog = fluid.default_startup_program()
    bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
    assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())

    # the prediction graph lives in its own Program pair
    pred_prog = fluid.Program()
    pred_init_prog = fluid.Program()
    with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
        pred_net_inputs = create_net_inputs(pred_input_attrs)
        pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')

    fluid.framework.switch_main_program(train_prog)
    fluid.framework.switch_startup_program(train_init_prog)

    task_output_vars = {}
    for inst in instances:
        task_inputs = {'backbone': bb_output_vars}
        task_inputs_from_reader = _decode_inputs(net_inputs, inst.name)
        task_inputs['reader'] = task_inputs_from_reader

        # tasks sharing a reuse scope share parameters via unique_name guard
        scope = inst.task_reuse_scope + '/'
        with fluid.unique_name.guard(scope):
            output_vars = inst.build_task_layer(task_inputs, phase='train', scope=scope)
            output_vars = {inst.name+'/'+key: val for key, val in output_vars.items()}
            old = len(task_output_vars) # for debug
            task_output_vars.update(output_vars)
            assert len(task_output_vars) - old == len(output_vars) # for debug

        # prepare predict vars for saving inference model
        if inst.is_target:
            with fluid.program_guard(pred_prog, pred_init_prog):
                cur_inputs = _decode_inputs(pred_net_inputs, inst.name)
                inst.pred_input = cur_inputs
                pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
                scope = inst.task_reuse_scope + '/'
                with fluid.unique_name.guard(scope):
                    inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope)

    bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
    task_fetches = {k: v.name for k,v in task_output_vars.items()}
    fetches = task_fetches
    fetches['__task_id'] = net_inputs['__task_id'].name

    # compute loss: one-hot task selector picks the active task's loss
    task_id_var = net_inputs['__task_id']
    task_id_vec = fluid.one_hot(task_id_var, num_instances)
    losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
    loss = layers.reduce_sum(task_id_vec * losses)

    # step accounting is derived from the MAIN task's example count;
    # NOTE(review): per-instance max_train_steps also uses the main task's
    # num_examples (scaled by mix_ratio) — confirm that is intended.
    main_reader = main_inst.reader['train']
    num_examples = main_reader.num_examples
    for inst in instances:
        max_train_steps = int(main_conf['num_epochs']* inst.mix_ratio * (num_examples // main_conf['batch_size'] // dev_count))
        if inst.is_target:
            print('{}: expected train steps {}.'.format(inst.name, max_train_steps))
        inst.steps_pur_epoch = inst.reader['train'].num_examples // main_conf['batch_size'] // dev_count
        inst.expected_train_steps = max_train_steps

    global_max_train_steps = int(main_conf['num_epochs'] * sum(mrs) * (num_examples // main_conf['batch_size'] // dev_count))
    print('Estimated overall train steps {}.'.format(global_max_train_steps))

    if 'warmup_proportion' in main_conf and main_conf['warmup_proportion'] > 0:
        warmup_steps = int(global_max_train_steps * main_conf['warmup_proportion'])
        print('Warmup steps: '+str(warmup_steps))
    else:
        warmup_steps = 0

    # build optimizer: dynamically import the configured optimizer module
    if 'optimizer' in main_conf:
        optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer'])
        optimize = getattr(optim_mod, OPTIMIZE_METHOD)
        optimize(loss, main_conf, max_train_steps, warmup_steps, fluid.default_main_program())

        loss.persistable = True
        if main_conf.get('use_ema', False):
            assert 'ema_decay' in main_conf, "ema_decay should be set when use_ema is enabled."
            ema = fluid.optimizer.ExponentialMovingAverage(main_conf['ema_decay'])
            ema.update()

    # prepare for train
    self.train_backbone = train_backbone
    self.train_program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)
    self.saver_program = fluid.default_main_program()

    self.main_inst = main_inst
    self.fetches = fetches
    self.has_init_train = True
    self.has_init_pred = True

    self.exe.run(fluid.default_startup_program())
    print("\nRandomly initialize parameters...\n")
def _init_pred(self, instance, infer_model_path):
    """Prepare one task instance for prediction.

    Ensures the prediction output directory exists, rebuilds a pred-phase
    backbone and task head for the instance, loads the saved inference
    program from ``infer_model_path``, and lazily creates the pred reader.

    Returns:
        The loaded inference Program for this instance.
    """
    inst = instance

    # default the prediction output dir to <save_path>/<task name>
    if 'pred_output_path' not in inst.config:
        inst.config['pred_output_path'] = os.path.join(inst.config.get('save_path', '.'), inst.name)
    output_dir = inst.config['pred_output_path']
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # fresh pred-phase backbone and task head for this instance
    pred_backbone = self.Backbone(self.bb_conf, phase='pred')
    pred_head = inst.Paradigm(inst.config, phase='pred', backbone_config=self.bb_conf)
    inst.task_layer['pred'] = pred_head

    # recompute the joint input layout for the pred graph (no task-id /
    # batch-size / seq-len placeholders in prediction)
    pred_joint_input_names, pred_joint_shape_and_dtypes, name_to_position = merge_input_attrs(
        pred_backbone.inputs_attr, inst.task_layer['pred'].inputs_attrs['reader'],
        insert_taskid=False, insert_batchsize=False, insert_seqlen=False,
        insert_batchsize_x_seqlen=False)

    pred_prog = inst.load(infer_model_path)

    # create the pred reader only once
    if inst.reader['pred'] is None:
        inst.reader['pred'] = inst.Reader(inst.config, phase='pred')
    return pred_prog
def load_pretrain(self, pretrain_path=None):
    """Load pretrained model (or checkpoint) parameters into the startup program.

    Args:
        pretrain_path: directory of the pretrained parameters; when None,
            falls back to ``pretrain_path`` from the main config (which must
            then be present).
    """
    target = pretrain_path
    if target is None:
        # fall back to the path configured for the main task
        assert 'pretrain_path' in self.main_conf, "pretrain_path NOT set."
        target = self.main_conf['pretrain_path']
    init_pretraining_params(
        self.exe,
        target,
        main_program=fluid.default_startup_program())
def train(self):
    """Run the multi-task training loop until every target task finishes.

    Pulls joint batches from the mixed-task iterator, feeds one batch per
    device, dispatches fetched outputs to the task that produced them,
    prints periodic progress, and saves inference models / checkpoints
    according to the main config.
    """
    if not self.has_init_train:
        self._init_train()
        self.has_init_train = True

    instances = self.instances
    num_instances = self.num_instances
    main_inst = self.main_inst
    main_conf = main_inst.config

    backbone = self.train_backbone
    train_program = self.train_program
    saver_program = self.saver_program
    fetches = self.fetches

    # NOTE(review): `finish` is built here but never read afterwards —
    # termination relies on inst.train_finish via train_finish() below.
    finish = []
    for inst in instances:
        if inst.is_target:
            if inst.expected_train_steps > 0:
                finish.append(False)
            else:
                finish.append(True)
                print(inst.name+': train finished!')
                inst.save()

    def train_finish():
        # all target tasks must report train_finish for the loop to stop
        for inst in instances:
            if inst.is_target:
                if not inst.train_finish:
                    return False
        return True

    def pack_multicard_feed(iterator, net_inputs, dev_count):
        # draw one batch per device; mask marks real (1) vs padded (0) batches
        ret = []
        mask = []
        for i in range(dev_count):
            temp = {}
            content, flag = next(iterator)
            for q, var in net_inputs.items():
                temp[var.name] = content[q]
            ret.append(temp)
            mask.append(1 if flag else 0)
        return ret, mask

    # do training
    fetch_names, fetch_list = zip(*fetches.items())

    main_step = 0 # only count for main task
    global_step = 0 # count for all tasks
    epoch = 0
    time_begin = time.time()
    # NOTE(review): backbone_buffer grows for the whole run and is never
    # drained here — verify postprocess results are consumed elsewhere.
    backbone_buffer = []

    def multi_dev_reader(reader, dev_count):
        # background thread groups batches into device-sized chunks
        def worker(reader, dev_count, queue):
            dev_batches = []
            for index, data in enumerate(reader()):
                if len(dev_batches) < dev_count:
                    dev_batches.append(data)
                if len(dev_batches) == dev_count:
                    queue.put((dev_batches, 0))
                    dev_batches = []
            # For the prediction of the remained batches, pad more batches to
            # the number of devices and the padded samples would be removed in
            # prediction outputs.
            if len(dev_batches) > 0:
                num_pad = dev_count - len(dev_batches)
                for i in range(len(dev_batches), dev_count):
                    dev_batches.append(dev_batches[-1])
                queue.put((dev_batches, num_pad))
            queue.put(None)  # sentinel: reader exhausted

        # bounded queue throttles the producer thread
        # NOTE(review): `Queue.Queue` is the Python-2 spelling — presumably
        # aliased at the top of this file; confirm for Python 3.
        queue = Queue.Queue(dev_count*2)
        p = Thread(
            target=worker, args=(reader, dev_count, queue))
        p.daemon = True
        p.start()
        while True:
            ret = queue.get()
            if ret is not None:
                batches, num_pad = ret
                queue.task_done()
                for batch in batches:
                    # flag is False for padded (duplicated) batches
                    flag = num_pad == 0
                    if num_pad > 0:
                        num_pad -= 1
                    yield batch, flag
            else:
                break
        queue.join()

    joint_iterator = multi_dev_reader(self._joint_iterator_fn, self.dev_count)

    while not train_finish():
        # NOTE(review): `mask` (real-vs-padded flags) is unused here.
        feed, mask = pack_multicard_feed(joint_iterator, self._net_inputs, self.dev_count)
        rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list)
        rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}

        # identify which task produced this step's batch
        rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
        rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
        cur_task = instances[rt_task_id]

        # fetches without '/' belong to the backbone; 'taskname/...' to tasks
        backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k}
        backbone_buffer.append(backbone.postprocess(backbone_rt_outputs))

        task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')}
        instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)

        global_step += 1
        cur_task.cur_train_step += 1

        # periodic inference-model snapshot for target tasks
        cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch
        if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0:
            cur_task.save(suffix='.step'+str(cur_task_global_step))

        if global_step % main_conf.get('print_every_n_steps', 5) == 0:
            loss = rt_outputs[cur_task.name+'/loss']
            loss = np.mean(np.squeeze(loss)).tolist()

            time_end = time.time()
            time_cost = time_end - time_begin

            print("Global step: {}. Task: {}, step {}/{} (epoch {}), loss: {:.3f}, speed: {:.2f} steps/s".format(
                global_step, cur_task.name, cur_task.cur_train_step, cur_task.steps_pur_epoch, cur_task.cur_train_epoch,
                loss, main_conf.get('print_every_n_steps', 5) / time_cost))
            time_begin = time.time()

        # save the final inference model exactly when the task hits its quota
        if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
            print(cur_task.name+': train finished!')
            cur_task.save()

        if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0:
            save_path = os.path.join(main_conf['save_path'], 'ckpt',
                                     "step_" + str(global_step))
            fluid.io.save_persistables(self.exe, save_path, saver_program)
            print('checkpoint has been saved at '+save_path)

    # final checkpoint after all target tasks finish
    save_path = os.path.join(main_conf['save_path'], 'ckpt',
                             "step_" + str(global_step))
    fluid.io.save_persistables(self.exe, save_path, saver_program)
    print('checkpoint has been saved at '+save_path)

    print("ALL tasks train finished, exiting...")
def pred(self, task_instance, inference_model_dir=None):
    """Run prediction for one task instance using a saved inference model.

    Args:
        task_instance: name of the task instance to predict with.
        inference_model_dir: directory of the saved inference model; when
            None it is derived from ``save_path`` in the mtl config.

    Raises:
        Exception: if this controller was built with for_train=True.
        ValueError: if ``task_instance`` does not name a known instance.
    """
    if self._for_train:
        raise Exception('This controller is a trainer. Please build a new controller with for_train=False for predicting.')

    assert isinstance(task_instance, str)
    if isinstance(inference_model_dir, str):
        assert os.path.exists(inference_model_dir), inference_model_dir+" not found."
    # if not self.has_init_pred and inference_model_dir is None:
    #     raise ValueError('infer_model_path is required for prediction.')
    if inference_model_dir is None:
        assert 'save_path' in self.mtl_conf, "one of the `inference_model_dir` and 'save_path' should be set to load inference model."
        inference_model_dir = os.path.join(self.mtl_conf['save_path'], task_instance, 'infer_model')

    # resolve the instance object by name
    instance = None
    for inst in self.instances:
        if inst.name == task_instance:
            instance = inst
            break

    if instance is None:
        raise ValueError(task_instance + ' is not a valid task_instance.')

    pred_prog = self._init_pred(instance, inference_model_dir)

    inst = instance
    print(inst.name+": loading data...")
    inst.reader['pred'].load_data()
    fetch_names, fetch_vars = inst.pred_fetch_list

    print('predicting...')
    # map reader field names -> feed var names
    # NOTE(review): this iterates inst.pred_input as (key, value) pairs;
    # _init_train assigns pred_input a dict (which would iterate keys only) —
    # presumably a property on the instance converts it; verify.
    mapper = {k:v for k,v in inst.pred_input}
    buf = []
    for feed in inst.reader['pred'].iterator():
        feed = _encode_inputs(feed, inst.name, cand_set=mapper)
        feed = {mapper[k]: v for k,v in feed.items()}
        rt_outputs = self.exe.run(pred_prog, feed, fetch_vars)
        rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
        inst.postprocess(rt_outputs, phase='pred')

    # epoch-level postprocess (e.g. writing aggregated prediction files)
    if inst.task_layer['pred'].epoch_inputs_attrs:
        reader_outputs = inst.reader['pred'].get_epoch_outputs()
    else:
        reader_outputs = None

    inst.epoch_postprocess({'reader':reader_outputs}, phase='pred')
# Command-line entry point: build a Controller from the given config file
# and, when `do_train` is enabled, launch multi-task training.
if __name__ == '__main__':
    assert len(sys.argv) == 2, "Usage: python mtl_controller.py <mtl_conf_path>"
    conf_path = sys.argv[1]
    # remove the consumed argument so downstream argv parsing is unaffected
    del sys.argv[1]
    controller = Controller(conf_path)
    if controller.main_conf['do_train']:
        controller.train()

# public API of this module
__all__ = ["Controller"]
......@@ -20,102 +20,36 @@ from __future__ import print_function
import numpy as np
import paddle.fluid as fluid
from paddlepalm.optimizer.base_optimizer import BaseOptimizer
class schedualer(object):
class Adam(BaseOptimizer):
def __init__(self):
pass
def __init__(self, loss_var, lr, lr_schedualer=None):
def lr(self):
pass
BaseOptimizer.__init__(self, loss_var, lr, lr_schedualer=None)
def ConstantLearning():
def __init__(self, lr):
self._loss = loss_var
self._lr = lr
self._lr_schedualer = lr_schedualer
def lr(self):
return self._lr
def LinearWarmupLearning():
def linear_warmup_decay(learning_rate, warmup_steps, num_train_steps):
""" Applies linear warmup of learning rate from 0 and decay to 0."""
with fluid.default_main_program()._lr_schedule_guard():
lr = fluid.layers.tensor.create_global_var(
shape=[1],
value=0.0,
dtype='float32',
persistable=True,
name="scheduled_learning_rate")
global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()
with fluid.layers.control_flow.Switch() as switch:
with switch.case(global_step < warmup_steps):
warmup_lr = learning_rate * (global_step / warmup_steps)
fluid.layers.tensor.assign(warmup_lr, lr)
with switch.default():
decayed_lr = fluid.layers.learning_rate_scheduler.polynomial_decay(
learning_rate=learning_rate,
decay_steps=num_train_steps,
end_learning_rate=0.0,
power=1.0,
cycle=False)
fluid.layers.tensor.assign(decayed_lr, lr)
def build(self, grad_clip=None):
return lr
if self._lr_schedualer is not None:
self._lr = self._lr_schedualer.build(self._lr)
optimizer = fluid.optimizer.Adam(learning_rate=self._lr)
def optimize(loss, config, max_train_steps=None, warmup_steps=0, train_program=None):
if warmup_steps > 0:
decay_strategy = config.get('lr_scheduler', 'linear_warmup_decay')
if decay_strategy == 'noam_decay':
scheduled_lr = fluid.layers.learning_rate_scheduler\
.noam_decay(1/(warmup_steps *(config['learning_rate'] ** 2)),
warmup_steps)
elif decay_strategy == 'linear_warmup_decay':
scheduled_lr = linear_warmup_decay(config['learning_rate'], warmup_steps,
max_train_steps)
else:
raise ValueError("Unkown lr_scheduler, should be "
"'noam_decay' or 'linear_warmup_decay'")
optimizer = fluid.optimizer.Adam(learning_rate=scheduled_lr)
else:
optimizer = fluid.optimizer.Adam(learning_rate=config['learning_rate'])
scheduled_lr = config['learning_rate']
clip_norm_thres = 1.0
if grad_clip is not None:
clip_norm_thres = grad_clip
# When using mixed precision training, scale the gradient clip threshold
# by loss_scaling
fluid.clip.set_gradient_clip(
clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=clip_norm_thres))
def exclude_from_weight_decay(name):
if name.find("layer_norm") > -1:
return True
bias_suffix = ["_bias", "_b", ".b_0"]
for suffix in bias_suffix:
if name.endswith(suffix):
return True
return False
param_list = dict()
for param in train_program.global_block().all_parameters():
param_list[param.name] = param * 1.0
param_list[param.name].stop_gradient = True
_, param_grads = optimizer.minimize(loss)
_, param_grads = optimizer.minimize(self._loss)
return param_grads
def get_cur_learning_rate(self):
return self._lr
if config.get('weight_decay', 0) > 0:
for param, grad in param_grads:
if exclude_from_weight_decay(param.name):
continue
with param.block.program._optimized_guard(
[param, grad]), fluid.framework.name_scope("weight_decay"):
updated_param = param - param_list[
param.name] * config['weight_decay'] * scheduled_lr
fluid.layers.assign(output=param, input=updated_param)
class BaseOptimizer():
    """Skeleton base class for optimizer wrappers.

    Concrete optimizers subclass this and implement ``build`` (construct the
    optimization ops, optionally with gradient clipping) and
    ``get_cur_learning_rate``. The training Program is attached after
    construction via ``_set_prog``.
    """

    def __init__(self, loss_var, lr, lr_schedualer=None):
        # the Program is bound later through _set_prog()
        self._prog = None
        self._lr_schedualer = lr_schedualer

    def build(self, grad_clip=None):
        """Construct the optimization ops; overridden by subclasses."""
        pass

    def _set_prog(self, prog):
        """Bind `prog` to this optimizer and forward it to the LR scheduler."""
        self._prog = prog
        schedualer = self._lr_schedualer
        if schedualer is not None:
            schedualer._set_prog(prog)

    def get_cur_learning_rate(self):
        """Return the current learning-rate variable; overridden by subclasses."""
        pass
......@@ -14,15 +14,21 @@
# limitations under the License.
"""v1.1"""
from copy import copy
class reader(object):
class BaseReader(object):
"""interface of data manager."""
def __init__(self, config, phase='train'):
assert isinstance(config, dict)
self._config = config
def __init__(self, phase='train'):
# assert isinstance(config, dict)
# self._config = config
self._phase = phase
self._register = set()
self._registered_backbone = None
def copy(self, phase=self._phase):
@classmethod
def create_register(self):
return set()
def clone(self, phase='train'):
if phase == self._phase:
return copy(self)
else:
......@@ -30,7 +36,25 @@ class reader(object):
ret._phase = phase
return ret
def require_attr(self, attr_name):
    # Record `attr_name` as an output field this reader is required to yield.
    self._register.add(attr_name)
def register_with(self, backbone):
    """Register every input attribute required by `backbone` with this reader.

    Each key of the backbone's ``inputs_attr`` mapping is recorded via
    ``require_attr`` so the reader knows which fields it must yield; the
    backbone itself is remembered for later retrieval.

    Args:
        backbone: object exposing an ``inputs_attr`` mapping of input names.
    """
    # fix: removed a leftover debug `print(backbone)` that spammed stdout
    for attr in backbone.inputs_attr:
        self.require_attr(attr)
    self._registered_backbone = backbone
def get_registered_backbone(self):
    # Return the backbone passed to register_with (None if none registered yet).
    return self._registered_backbone
def _get_registed_attrs(self, attrs):
    """Filter `attrs` down to the attributes registered on this reader.

    Raises:
        NotImplementedError: if a registered attribute is absent from `attrs`.
    """
    ret = {}
    for i in self._register:
        if i not in attrs:
            raise NotImplementedError('output attr {} is not found in this reader.'.format(i))
        ret[i] = attrs[i]
    return ret
# @property
# def inputs_attr(self):
......
......@@ -13,108 +13,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddlepalm.interface import reader
from paddlepalm.reader.utils.reader4ernie import ClassifyReader
from paddlepalm.reader.base_reader import BaseReader
from paddlepalm.reader.utils.reader4ernie import ClassifyReader as CLSReader
def classify(data_path, vocab_path, batch_size, max_len, \
pred_batch_size=None, file_format='csv', tokenizer='wordpiece', \
lang='en', shuffle_train=True, seed=None, do_lower_case=False, \
seed=None, phase='train'):
assert lang.lower() in ['en', 'cn', 'english', 'chinese'], "supported language: en (English), cn (Chinese)."
assert phase in ['train', 'pred'], "supported phase: train, pred."
config = {
'train_file': data_path,
'pred_file': data_path,
'batch_size': batch_size,
'pred_batch_size': pred_batch_size,
'max_len': max_len,
'file_format': file_format,
'tokenizer': tokenizer,
'for_cn': lang.lower() == 'cn' or lang.lower() == 'chinese',
'shuffle_train': shuffle_train,
'do_lower_case': do_lower_case,
'seed': seed
}
if pred_batch_size is None:
del config['pred_batch_size']
class ClassifyReader(BaseReader):
return Reader(config, phase=phase)
def __init__(self, vocab_path, max_len, tokenizer='wordpiece', \
lang='en', seed=None, do_lower_case=False, phase='train'):
class Reader(reader):
BaseReader.__init__(self, phase)
def __init__(self, config, phase='train', print_prefix=''):
"""
Args:
phase: train, eval, pred
"""
assert lang.lower() in ['en', 'cn', 'english', 'chinese'], "supported language: en (English), cn (Chinese)."
assert phase in ['train', 'pred'], "supported phase: train, pred."
self._is_training = phase == 'train'
for_cn = lang.lower() == 'cn' or lang.lower() == 'chinese'
reader = ClassifyReader(config['vocab_path'],
max_seq_len=config['max_len'],
do_lower_case=config.get('do_lower_case', False),
for_cn=config.get('for_cn', False),
random_seed=config.get('seed', None))
self._reader = reader
self._register.add('token_ids')
if phase == 'train':
self._register.add('label_ids')
self._batch_size = config['batch_size']
self._max_seq_len = config['max_len']
self._is_training = phase == 'train'
self._input_file = config['data_path']
if phase == 'train':
self._num_epochs = None # 防止iteartor终止
self._shuffle = config.get('shuffle_train', True)
# self._shuffle_buffer = config.get('shuffle_buffer', 5000)
elif phase == 'eval':
self._num_epochs = 1
self._shuffle = False
self._batch_size = config.get('pred_batch_size', self._batch_size)
elif phase == 'pred':
self._num_epochs = 1
self._shuffle = False
self._batch_size = config.get('pred_batch_size', self._batch_size)
cls_reader = CLSReader(vocab_path,
max_seq_len=max_len,
do_lower_case=do_lower_case,
for_cn=for_cn,
random_seed=seed)
self._reader = cls_reader
self._phase = phase
# self._batch_size =
self._print_first_n = config.get('print_first_n', 0)
# self._print_first_n = config.get('print_first_n', 0)
@property
def outputs_attr(self):
if self._is_training:
return {"token_ids": [[-1, -1], 'int64'],
attrs = {"token_ids": [[-1, -1], 'int64'],
"position_ids": [[-1, -1], 'int64'],
"segment_ids": [[-1, -1], 'int64'],
"input_mask": [[-1, -1, 1], 'float32'],
"label_ids": [[-1], 'int64'],
"task_ids": [[-1, -1], 'int64']
}
else:
return {"token_ids": [[-1, -1], 'int64'],
"position_ids": [[-1, -1], 'int64'],
"segment_ids": [[-1, -1], 'int64'],
"task_ids": [[-1, -1], 'int64'],
"input_mask": [[-1, -1, 1], 'float32']
}
return self._get_registed_attrs(attrs)
def load_data(self):
self._data_generator = self._reader.data_generator(self._input_file, self._batch_size, self._num_epochs, shuffle=self._shuffle, phase=self._phase)
def _load_data(self, input_file, batch_size, num_epochs=None, \
file_format='csv', shuffle_train=True):
self._data_generator = self._reader.data_generator(input_file, batch_size, \
num_epochs, shuffle=shuffle_train if self._phase == 'train' else False, \
phase=self._phase)
def iterator(self):
def _iterator(self):
def list_to_dict(x):
names = ['token_ids', 'segment_ids', 'position_ids', 'task_ids', 'input_mask',
'label_ids', 'unique_ids']
outputs = {n: i for n,i in zip(names, x)}
del outputs['unique_ids']
if not self._is_training:
del outputs['label_ids']
return outputs
for batch in self._data_generator():
yield list_to_dict(batch)
outputs = {n: i for n,i in zip(names, batch)}
ret = []
# TODO: move runtime shape check here
for attr in self.outputs_attr.keys():
ret[attr] = outputs[attr]
yield ret
def get_epoch_outputs(self):
return {'examples': self._reader.get_examples(self._phase),
......@@ -124,3 +85,4 @@ class Reader(reader):
def num_examples(self):
return self._reader.get_num_examples(phase=self._phase)
......@@ -13,42 +13,53 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddlepalm.interface import reader as base_reader
from paddlepalm.interface import task_paradigm as base_paradigm
from __future__ import print_function
import os
import json
from paddle import fluid
import importlib
from paddlepalm.default_settings import *
import paddlepalm.utils.basic_helper as helper
from paddlepalm.utils import reader_helper
# from paddlepalm.default_settings import *
DEBUG=False
def Trainer(object):
def __init__(self, name, reader, task, mix_ratio=1.0, \
save_predict_model=True, save_path=None, save_steps=-1)\
reuse_with=None, silent=False):
class Trainer(object):
def __init__(self, name, reader, task_head, \
save_predict_model=False, pred_head=None, save_path=None, save_steps=-1, \
mix_ratio=1.0, reuse_head_with=None, \
silent=False):
self._name = name
self._verbose = not silent
self._reader = reader
self._pred_reader = None
self._task_head = task_head
self._pred_head = pred_head
if save_predict_model:
self._save_predict_model = True
assert save_path is not None, "save_path is required when save_predict_model is set."
assert save_steps == -1 or save_steps > 0, "save_steps should be -1 (only save the last step of this task) or larger than 0"
assert pred_reader is not None and pred_task is not None, ""
self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
assert pred_head is not None, "pred_head is required to save predict model."
self._pred_reader = reader.clone(phase='pred')
if save_path is not None and not os.path.exists(save_path):
os.makedirs(save_path)
else:
self._save_infermodel_path = infermodel_save_path
assert save_path is None, "You should set save_predict_model as True, or the save_path is invalid."
assert save_steps == -1 or save_steps == 0, "You should set save_predict_model as True, or the save_steps is invalid."
assert pred_head is None, "You should set save_predict_model as True, or the pred_head is invalid."
self._save_infermodel_every_n_steps = save_infermodel_every_n_steps
self._save_steps = save_steps
self._is_target = as_target
self._first_target = False
self._task_reuse_scope = name if task_layer_reuse is None else task_layer_reuse
self._task_reuse_scope = name if reuse_head_with is None else reuse_head_with
self._feeded_var_names = None
self._target_vars = None
self._num_examples = 0
# training process management
self._mix_ratio = mix_ratio
self._expected_train_steps = None
......@@ -59,10 +70,10 @@ def Trainer(object):
self._train_finish = False
# 存放不同运行阶段(train,eval,pred)的数据集reader,key为phase,value为Reader实例
self._reader = {'train': reader, 'eval': None, 'pred': pred_reader}
self._input_layer = None
# self._reader = {'train': reader, 'eval': None, 'pred': self._pred_reader}
# self._input_layer = None
self._inputname_to_varname = {}
self._task_layer = {'train': tasklayer, 'eval': None, 'pred': pred_tasklayer}
# self._task_layer = {'train': task_head, 'eval': None, 'pred': pred_head}
self._pred_input_name_list = []
self._pred_input_varname_list = []
self._pred_fetch_name_list = []
......@@ -76,10 +87,183 @@ def Trainer(object):
'fetch_list': 'self._pred_fetch_name_list'}
self._lock = False
def _build_task_layer(self, net_inputs, phase, scope=""):
output_vars = self._task_layer[phase].build(net_inputs, scope_name=scope)
self._build_forward = False
def build_forward(self, backbone, pred_backbone=None):
# assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode"
self._build_forward = True
if self._save_predict_model:
assert pred_backbone is not None, ""
# create reader, task
# then check i/o across reader, backbone and task_layer
task_attrs = []
pred_task_attrs = []
task_attr_from_reader = helper.encode_inputs(self._task_head.inputs_attrs['reader'], self.name)
# task_attr_from_reader = self._task_head.inputs_attrs['reader']
# _check_io(backbone.inputs_attr, inst._reader['train'].outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
# _check_io(inst.taskblock['train'].inputs_attrs['reader'], inst._reader['train'].outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
# _check_io(inst._taskblock['train'].inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
if self._save_predict_model:
pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name)
# pred_task_attr_from_reader = self._pred_head.inputs_attrs['reader']
# _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
# merge reader input attrs from backbone and task_instances
input_names, shape_and_dtypes, name_to_position = reader_helper.merge_input_attrs(backbone.inputs_attr, task_attr_from_reader)
pred_input_names, pred_shape_and_dtypes, _ = reader_helper.merge_input_attrs(backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
# shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
self._shape_and_dtypes = shape_and_dtypes
self._name_to_position = name_to_position
if DEBUG:
print('----- for debug -----')
print('joint input names:')
print(joint_input_names)
print('joint input shape and dtypes:')
print(joint_shape_and_dtypes)
input_attrs = [[i, j, k] for i, (j,k) in zip(input_names, shape_and_dtypes)]
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_input_names, pred_shape_and_dtypes)]
train_prog = fluid.Program()
train_init_prog = fluid.Program()
self._prog = train_prog
self._train_prog = train_prog
self._train_init_prog = train_init_prog
with fluid.program_guard(train_prog, train_init_prog):
net_inputs = reader_helper.create_net_inputs(input_attrs, async=False)
# build backbone and task layers
# bb_output_vars = self._backbone.build(net_inputs, scope_name='__paddlepalm_')
bb_output_vars = backbone.build(net_inputs)
assert sorted(bb_output_vars.keys()) == sorted(backbone.outputs_attr.keys())
pred_prog = fluid.Program()
pred_init_prog = fluid.Program()
with fluid.program_guard(pred_prog, pred_init_prog):
pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
# pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
# fluid.framework.switch_main_program(train_prog)
# fluid.framework.switch_startup_program(train_init_prog)
task_output_vars = {}
task_inputs = {'backbone': bb_output_vars}
task_inputs_from_reader = helper.decode_inputs(net_inputs, self.name)
task_inputs['reader'] = task_inputs_from_reader
scope = self.name+'.'
with fluid.program_guard(train_prog, train_init_prog):
with fluid.unique_name.guard(scope):
output_vars = self._build_head(task_inputs, phase='train', scope=scope)
output_vars = {self.name+'.'+key: val for key, val in output_vars.items()}
old = len(task_output_vars) # for debug
task_output_vars.update(output_vars)
assert len(task_output_vars) - old == len(output_vars) # for debug
# prepare predict vars for saving inference model
if self._save_predict_model:
with fluid.program_guard(pred_prog, pred_init_prog):
cur_inputs = helper.decode_inputs(pred_net_inputs, self.name)
# self.pred_input = cur_inputs
self._pred_input_name_list, self._pred_input_varname_list = \
zip(*[[k, v.name] for k,v in cur_inputs.items()])
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = self.name + '.'
with fluid.unique_name.guard(scope):
self._build_head(pred_task_inputs, phase='pred', scope=scope)
bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
task_fetches = {k: v.name for k,v in task_output_vars.items()}
# fetches = task_fetches
# fetches['__task_id'] = net_inputs['__task_id'].name
# compute loss
# task_id_var = net_inputs['__task_id']
# task_id_vec = layers.one_hot(task_id_var, num_instances)
# losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
# loss = layers.reduce_sum(task_id_vec * losses)
with fluid.program_guard(train_prog, train_init_prog):
loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
return loss_var
def build_backward(self, optimizer, weight_decay=None, use_ema=False, ema_decay=0.9999):
    """Build the backward (optimization) pass on the training program.

    Args:
        optimizer: project optimizer wrapper; must expose _set_prog, build,
            and get_cur_learning_rate.
        weight_decay: decoupled weight-decay coefficient; skipped for
            layer-norm and bias parameters. None disables it.
        use_ema: when True, attach an ExponentialMovingAverage update.
        ema_decay: decay rate for the EMA.
    """
    # bind the optimizer to this trainer's training program
    optimizer._set_prog(self._train_prog)
    with fluid.program_guard(self._train_prog, self._train_init_prog):
        param_grads = optimizer.build()
        if weight_decay is not None:
            param_list = dict()
            # NOTE(review): original iterated self._prog, which this method
            # never defines; self._train_prog (bound above) is the program the
            # parameters live in.
            for param in self._train_prog.global_block().all_parameters():
                # snapshot pre-update values so decay uses the old weights
                param_list[param.name] = param * 1.0
                param_list[param.name].stop_gradient = True

            def exclude_from_weight_decay(name):
                # layer-norm scales/offsets and biases are conventionally
                # exempt from weight decay
                if name.find("layer_norm") > -1:
                    return True
                bias_suffix = ["_bias", "_b", ".b_0"]
                for suffix in bias_suffix:
                    if name.endswith(suffix):
                        return True
                return False

            for param, grad in param_grads:
                if exclude_from_weight_decay(param.name):
                    continue
                with param.block.program._optimized_guard(
                    [param, grad]), fluid.framework.name_scope("weight_decay"):
                    # decoupled weight decay: w <- w - lr * wd * w_old
                    updated_param = param - param_list[
                        param.name] * weight_decay * optimizer.get_cur_learning_rate()
                    fluid.layers.assign(output=param, input=updated_param)

        if use_ema:
            ema = fluid.optimizer.ExponentialMovingAverage(ema_decay)
            ema.update()
def load_data(self, input_file, file_format, batch_size, num_epochs=None, shuffle_train=True):
    """Load a dataset into this trainer's reader and return an iterator fn.

    Args:
        input_file: path of the dataset file.
        file_format: format tag understood by the reader (e.g. tsv).
        batch_size: examples per batch.
        num_epochs: epochs to iterate; None lets the reader decide.
        shuffle_train: whether to shuffle training data.

    Returns:
        A callable iterator function producing net-input feed data.
    """
    print("preparing data...", end='')
    self._reader._load_data(input_file=input_file, batch_size=batch_size,
                            num_epochs=num_epochs, file_format=file_format,
                            shuffle_train=shuffle_train)
    self._num_examples = self._reader.num_examples
    print('ok!')

    # Wrap the raw reader iterator: the helper performs runtime checks and
    # adapts each yielded sample to the expected shapes/dtypes/positions.
    raw_iter = self._reader._iterator()
    task_prefix = self.name
    return reader_helper.create_iterator_fn(
        raw_iter, task_prefix, self._shape_and_dtypes, self._name_to_position)
def random_init_params(self):
    # NOTE(review): helper.build_executor is called with no arguments here,
    # while the module-level build_executor defined later in this file
    # requires an on_gpu flag — confirm which helper module is bound and
    # that the signature matches.
    helper.build_executor()
def _build_head(self, net_inputs, phase, scope=""):
if phase == 'train':
output_vars = self._task_head.build(net_inputs, scope_name=scope)
if phase == 'pred':
output_vars = self._pred_head.build(net_inputs, scope_name=scope)
if output_vars is not None:
self._pred_fetch_name_list, self._pred_fetch_var_list = zip(*output_vars.items())
else:
......@@ -126,62 +310,22 @@ def Trainer(object):
return self._name
@property
def _Reader(self):
    # NOTE(review): this property returns the very name it defines, so any
    # access recurses until RecursionError; compare TaskInstance.Reader,
    # which backs a public property with an underscored attribute. Confirm
    # the intended backing field.
    return self._Reader
@property
def _Paradigm(self):
    # NOTE(review): self-referential property — accessing it recurses
    # infinitely; the backing attribute needs a different name.
    return self._Paradigm
@property
def _reader(self):
    # NOTE(review): self-referential property — accessing it recurses
    # infinitely; the backing attribute needs a different name.
    return self._reader
@property
def _pred_input(self):
    # Pairs of (input name, feed variable name) for prediction.
    # NOTE(review): returns a zip object; on Python 3 it is exhausted after
    # one iteration — confirm callers expect that.
    return zip(*[self._pred_input_name_list, self._pred_input_varname_list])

@_pred_input.setter
def _pred_input(self, val):
    # val: dict mapping input name -> fluid Variable; split into the two
    # parallel name/varname lists used for saving the inference model.
    assert isinstance(val, dict)
    self._pred_input_name_list, self._pred_input_varname_list = \
        zip(*[[k, v.name] for k,v in val.items()])
@property
def _pred_fetch_list(self):
    # [fetch names, fetch variables] used when running prediction.
    return [self._pred_fetch_name_list, self._pred_fetch_var_list]
@property
def _task_layer(self):
    # NOTE(review): self-referential property — accessing it recurses
    # infinitely; the backing attribute needs a different name.
    return self._task_layer
def num_examples(self):
    # Number of examples counted by load_data. Unlike the sibling accessors
    # this is a plain method, not a @property — call it as num_examples().
    return self._num_examples
@property
def _is_first_target(self):
    # NOTE(review): getter returns the name it defines — infinite recursion
    # on access.
    return self._is_first_target

# @property
# def _pred_input(self):
#     return zip(*[self._pred_input_name_list, self._pred_input_varname_list])

@_is_first_target.setter
def _is_first_target(self, value):
    # NOTE(review): the assignment below re-enters this setter — infinite
    # recursion; the backing attribute needs a different name.
    self._is_first_target = bool(value)
    if self._is_first_target:
        assert self._is_target, "ERROR: only target task could be set as main task."
    if self._verbose and self._is_first_target:
        print("{}: set as main task".format(self._name))
# @_pred_input.setter
# def _pred_input(self, val):
# assert isinstance(val, dict)
# self._pred_input_name_list, self._pred_input_varname_list = \
# zip(*[[k, v.name] for k,v in val.items()])
@property
def _is_target(self):
    # NOTE(review): property shadows its own backing name — accessing it
    # recurses infinitely instead of reading the stored flag.
    if self._is_target is not None:
        return self._is_target
    else:
        raise ValueError("{}: is_target is None".format(self._name))

@_is_target.setter
def _is_target(self, value):
    # NOTE(review): self-assignment re-enters this setter — infinite
    # recursion; the backing attribute needs a different name.
    self._is_target = bool(value)
    if self._verbose:
        if self._is_target:
            print('{}: set as target task.'.format(self._name))
        else:
            print('{}: set as aux task.'.format(self._name))
# @property
# def _pred_fetch_list(self):
# return [self._pred_fetch_name_list, self._pred_fetch_var_list]
@property
def mix_ratio(self):
......@@ -209,7 +353,7 @@ def Trainer(object):
return self._expected_train_steps
@expected_train_steps.setter
def expected_train_steps(self, value):
    # Store the target step count and derive the matching epoch count.
    # (A stale duplicate "def _expected_train_steps" header from the rename
    # made this setter a syntax error; it is removed here.)
    self._expected_train_steps = value
    self._expected_train_epochs = value / float(self._steps_pur_epoch)
......@@ -221,29 +365,25 @@ def Trainer(object):
def cur_train_epoch(self):
return self._cur_train_epoch
@cur_train_epoch.setter
def cur_train_epoch(self, value):
    # NOTE(review): the setter function was named _cur_train_epoch, which
    # binds the property under the wrong class attribute and leaves
    # cur_train_epoch read-only; renamed to match the property.
    self._cur_train_epoch = value
@property
def cur_train_step(self):
    # Current step index within the current epoch (1-based after rollover).
    return self._cur_train_step

@cur_train_step.setter
def cur_train_step(self, value):
    # NOTE(review): setter was named _cur_train_step, binding the property
    # under the wrong attribute; renamed to match the property.
    self._cur_train_step = value
    # Roll over into the next epoch once past steps-per-epoch.
    if self._cur_train_step > self._steps_pur_epoch:
        self._cur_train_epoch += 1
        self._cur_train_step = 1
    # Target tasks flag completion once the global step budget is reached.
    if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
        self._train_finish = True
# @cur_train_step.setter
# def _cur_train_step(self, value):
# self._cur_train_step = value
# if self._cur_train_step > self._steps_pur_epoch:
# self._cur_train_epoch += 1
# self._cur_train_step = 1
# if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
# self._train_finish = True
@property
def steps_pur_epoch(self):
    # Steps per epoch ("pur" is the original spelling; kept for API compat).
    return self._steps_pur_epoch

@steps_pur_epoch.setter
def steps_pur_epoch(self, value):
    # (A stale duplicate "def _steps_pur_epoch" header from the rename made
    # this setter a syntax error; it is removed here.)
    self._steps_pur_epoch = value
@property
......@@ -259,302 +399,3 @@ def Trainer(object):
def _set_lock(self):
    # Mark this trainer as locked; one-way — no unlock counterpart is
    # defined in the visible code.
    self._lock = True
# @property
# def task_reuse_scope(self):
# if self._task_reuse_scope is not None:
# return self._task_reuse_scope
# else:
# raise ValueError("{}: task_reuse_scope is None".format(self._name))
# @task_reuse_scope.setter
# def task_reuse_scope(self, scope_name):
# self._task_reuse_scope = str(scope_name)
# if self._verbose:
# print('{}: task_reuse_scope is set to {}'.format(self._name, self._task_reuse_scope))
def check_req_args(conf, name):
    """Validate that *conf* carries the keys required to build a TaskInstance.

    Raises AssertionError (with a task-prefixed message) when 'reader' or
    'paradigm' is missing, or when neither 'train_file' nor 'pred_file'
    is provided.
    """
    for key in ('reader', 'paradigm'):
        assert key in conf, name + ': ' + key + ' is required to build TaskInstance.'
    has_data = 'train_file' in conf or 'pred_file' in conf
    assert has_data, name + ': at least train_file or pred_file should be provided to build TaskInstance.'
class TaskInstance(object):
    """One task in a multi-task setup: its reader, paradigm, training-progress
    bookkeeping, and inference-model save/load support.

    Instances are configured from a dict (see check_req_args for required
    keys) and expose training state (mix ratio, step/epoch counters, finish
    flag) through properties.
    """

    def __init__(self, name, id, config, verbose=True):
        # id: kept for API compatibility; not stored or used in the visible
        # code (and shadows the builtin).
        self._name = name
        self._config = config
        self._verbose = verbose

        check_req_args(config, name)

        # Dynamically import the Reader class named by config['reader'].
        reader_name = config['reader']
        reader_mod = importlib.import_module(READER_DIR + '.' + reader_name)
        Reader = getattr(reader_mod, 'Reader')

        # Dynamically import the task paradigm (class 'TaskType').
        parad_name = config['paradigm']
        parad_mod = importlib.import_module(PARADIGM_DIR + '.' + parad_name)
        Paradigm = getattr(parad_mod, 'TaskType')

        self._Reader = Reader
        self._Paradigm = Paradigm

        self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
        self._save_ckpt_path = os.path.join(self._config['save_path'], 'ckpt')
        # -1 disables periodic inference-model saving.
        self._save_infermodel_every_n_steps = config.get('save_infermodel_every_n_steps', -1)

        # following flags can be fetch from instance config file
        self._is_target = config.get('is_target', True)
        self._first_target = config.get('is_first_target', False)
        self._task_reuse_scope = config.get('task_reuse_scope', name)

        self._feeded_var_names = None
        self._target_vars = None

        # training process management
        self._mix_ratio = None
        self._expected_train_steps = None
        self._expected_train_epochs = None
        self._steps_pur_epoch = None
        self._cur_train_epoch = 0
        self._cur_train_step = 0
        self._train_finish = False

        # Per-phase (train/eval/pred) dataset readers; key is the phase name,
        # value is a Reader instance (filled in later).
        self._reader = {'train': None, 'eval': None, 'pred': None}
        self._input_layer = None
        self._inputname_to_varname = {}
        self._task_layer = {'train': None, 'eval': None, 'pred': None}
        self._pred_input_name_list = []
        self._pred_input_varname_list = []
        self._pred_fetch_name_list = []
        self._pred_fetch_var_list = []

        # CPU executor used only for saving/loading the inference model.
        self._exe = fluid.Executor(fluid.CPUPlace())

        # Maps saved-config keys to attribute-access expression strings;
        # save()/load() eval these via exec (see HACK notes there).
        self._save_protocol = {
            'input_names': 'self._pred_input_name_list',
            'input_varnames': 'self._pred_input_varname_list',
            'fetch_list': 'self._pred_fetch_name_list'}

    def build_task_layer(self, net_inputs, phase, scope=""):
        """Build the task layer for *phase*; for 'pred', also record the
        fetch names/vars used when saving the inference model."""
        output_vars = self._task_layer[phase].build(net_inputs, scope_name=scope)
        if phase == 'pred':
            if output_vars is not None:
                self._pred_fetch_name_list, self._pred_fetch_var_list = zip(*output_vars.items())
            else:
                self._pred_fetch_name_list = []
                self._pred_fetch_var_list = []
        return output_vars

    def postprocess(self, rt_outputs, phase):
        """Delegate per-batch postprocessing to the task layer of *phase*."""
        return self._task_layer[phase].postprocess(rt_outputs)

    def epoch_postprocess(self, epoch_inputs, phase):
        """Delegate end-of-epoch postprocessing to the task layer of *phase*."""
        return self._task_layer[phase].epoch_postprocess(epoch_inputs)

    def save(self, suffix=''):
        """Save the inference model plus a '__conf__' json recording feed and
        fetch names, at self._save_infermodel_path + suffix."""
        dirpath = self._save_infermodel_path + suffix
        self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list]

        # fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, export_for_deployment = True)
        prog = fluid.default_main_program().clone()
        fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, prog)

        conf = {}
        for k, strv in self._save_protocol.items():
            d = None
            v = locals()
            # HACK: _save_protocol values are attribute-access expressions as
            # strings; exec evaluates each into 'd' through the locals dict.
            exec('d={}'.format(strv), globals(), v)
            conf[k] = v['d']
        with open(os.path.join(dirpath, '__conf__'), 'w') as writer:
            writer.write(json.dumps(conf, indent=1))
        print(self._name + ': inference model saved at ' + dirpath)

    def load(self, infer_model_path=None):
        """Load a previously saved inference model (and its '__conf__');
        returns the prediction Program."""
        if infer_model_path is None:
            infer_model_path = self._save_infermodel_path
        for k,v in json.load(open(os.path.join(infer_model_path, '__conf__'))).items():
            strv = self._save_protocol[k]
            # HACK: assigns back through the stored expression string.
            # NOTE(review): exec on a local without explicit namespaces may
            # not rebind attributes as intended on Python 3 — confirm.
            exec('{}=v'.format(strv))
        pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
            fluid.io.load_inference_model(infer_model_path, self._exe)
        print(self._name+': inference model loaded from ' + infer_model_path)
        return pred_prog

    @property
    def name(self):
        return self._name

    @property
    def Reader(self):
        # Reader class (not an instance) resolved from config['reader'].
        return self._Reader

    # @Reader.setter
    # def Reader(self, cls):
    #     assert base_reader.__name__ == cls.__bases__[-1].__name__, \
    #             "expect: {}, receive: {}.".format(base_reader.__name__, \
    #             cls.__bases__[-1].__name__)
    #     self._Reader = cls

    @property
    def Paradigm(self):
        # Paradigm (TaskType) class resolved from config['paradigm'].
        return self._Paradigm

    # @Paradigm.setter
    # def Paradigm(self, cls):
    #     assert base_paradigm.__name__ == cls.__bases__[-1].__name__, \
    #             "expect: {}, receive: {}.".format(base_paradigm.__name__, \
    #             cls.__bases__[-1].__name__)
    #     self._Paradigm = cls

    @property
    def config(self):
        return self._config

    @property
    def reader(self):
        # Per-phase reader dict created in __init__.
        return self._reader

    @property
    def pred_input(self):
        # NOTE(review): returns a zip object; exhausted after one iteration
        # on Python 3 — confirm callers expect that.
        return zip(*[self._pred_input_name_list, self._pred_input_varname_list])

    @pred_input.setter
    def pred_input(self, val):
        # val: dict mapping input name -> fluid Variable.
        assert isinstance(val, dict)
        self._pred_input_name_list, self._pred_input_varname_list = \
            zip(*[[k, v.name] for k,v in val.items()])

    @property
    def pred_fetch_list(self):
        return [self._pred_fetch_name_list, self._pred_fetch_var_list]

    @property
    def task_layer(self):
        return self._task_layer

    @property
    def is_first_target(self):
        return self._is_first_target

    @is_first_target.setter
    def is_first_target(self, value):
        # Only a target task may be promoted to the main task.
        self._is_first_target = bool(value)
        if self._is_first_target:
            assert self._is_target, "ERROR: only target task could be set as main task."
        if self._verbose and self._is_first_target:
            print("{}: set as main task".format(self._name))

    @property
    def is_target(self):
        if self._is_target is not None:
            return self._is_target
        else:
            raise ValueError("{}: is_target is None".format(self._name))

    @is_target.setter
    def is_target(self, value):
        self._is_target = bool(value)
        if self._verbose:
            if self._is_target:
                print('{}: set as target task.'.format(self._name))
            else:
                print('{}: set as aux task.'.format(self._name))

    @property
    def mix_ratio(self):
        # Sampling weight of this task during multi-task training.
        if self._mix_ratio is not None:
            return self._mix_ratio
        else:
            raise ValueError("{}: mix_ratio is None".format(self._name))

    @mix_ratio.setter
    def mix_ratio(self, value):
        self._mix_ratio = float(value)
        if self._verbose:
            print('{}: mix_ratio is set to {}'.format(self._name, self._mix_ratio))

    @property
    def save_infermodel_every_n_steps(self):
        return self._save_infermodel_every_n_steps

    @property
    def expected_train_steps(self):
        return self._expected_train_steps

    @expected_train_steps.setter
    def expected_train_steps(self, value):
        # Also derive the matching epoch count from steps-per-epoch.
        self._expected_train_steps = value
        self._expected_train_epochs = value / float(self._steps_pur_epoch)

    @property
    def expected_train_epochs(self):
        return self._expected_train_epochs

    @property
    def cur_train_epoch(self):
        return self._cur_train_epoch

    @cur_train_epoch.setter
    def cur_train_epoch(self, value):
        self._cur_train_epoch = value

    @property
    def cur_train_step(self):
        return self._cur_train_step

    @cur_train_step.setter
    def cur_train_step(self, value):
        self._cur_train_step = value
        # Roll over into the next epoch once past steps-per-epoch.
        if self._cur_train_step > self._steps_pur_epoch:
            self._cur_train_epoch += 1
            self._cur_train_step = 1
        # Target tasks flag completion once the global step budget is reached.
        if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
            self._train_finish = True

    @property
    def steps_pur_epoch(self):
        # "pur" is the original spelling; kept for API compatibility.
        return self._steps_pur_epoch

    @steps_pur_epoch.setter
    def steps_pur_epoch(self, value):
        self._steps_pur_epoch = value

    @property
    def train_finish(self):
        return self._train_finish

    @property
    def task_reuse_scope(self):
        # Scope name under which this task's parameters may be shared.
        if self._task_reuse_scope is not None:
            return self._task_reuse_scope
        else:
            raise ValueError("{}: task_reuse_scope is None".format(self._name))

    @task_reuse_scope.setter
    def task_reuse_scope(self, scope_name):
        self._task_reuse_scope = str(scope_name)
        if self._verbose:
            print('{}: task_reuse_scope is set to {}'.format(self._name, self._task_reuse_scope))
def check_instances(insts):
    """Validate a collection of task instances (ids, first_target flags,
    reuse scopes). Currently a no-op stub."""
    pass

def _check_ids():
    # TODO: verify instance ids are unique/consistent.
    pass

def _check_targets():
    # TODO: verify exactly one first-target among target tasks.
    pass

def _check_reuse_scopes():
    # TODO: verify task_reuse_scope assignments are consistent.
    pass
import basic_helper
import config_helper
# coding=utf-8
import os
import json
import yaml
from config_helper import PDConfig
def get_basename(f):
    """Return path *f* with its final extension removed."""
    root, _ext = os.path.splitext(f)
    return root
def get_suffix(f):
    """Return the final extension of *f* (including the dot), or '' if none."""
    _root, ext = os.path.splitext(f)
    return ext
def parse_yaml(f, asdict=True, support_cmd_line=False):
    """Parse a YAML config file.

    Args:
        f: path to the YAML file; must exist.
        asdict: return a plain dict; False is only supported together with
            support_cmd_line (otherwise NotImplementedError).
        support_cmd_line: wrap parsing in PDConfig so command-line flags can
            override file values.

    Returns:
        dict of config values, or a built PDConfig when asdict is False.
    """
    assert os.path.exists(f), "file {} not found.".format(f)
    if support_cmd_line:
        args = PDConfig(yaml_file=f, fuse_args=True)
        args.build()
        return args.asdict() if asdict else args
    if not asdict:
        raise NotImplementedError()
    with open(f, "r") as fin:
        return yaml.load(fin, Loader=yaml.SafeLoader)
def parse_json(f, asdict=True, support_cmd_line=False):
    """Parse a JSON config file.

    Args:
        f: path to the JSON file; must exist.
        asdict: return a plain dict; False is only supported together with
            support_cmd_line (otherwise NotImplementedError).
        support_cmd_line: wrap parsing in PDConfig so command-line flags can
            override file values.

    Returns:
        dict of config values, or a built PDConfig when asdict is False.
    """
    assert os.path.exists(f), "file {} not found.".format(f)
    if support_cmd_line:
        args = PDConfig(json_file=f, fuse_args=support_cmd_line)
        args.build()
        return args.asdict() if asdict else args
    if not asdict:
        raise NotImplementedError()
    with open(f, "r") as fin:
        return json.load(fin)
def parse_list(string, astype=str):
    """Split a comma-separated string into a list, casting tokens with *astype*.

    Surrounding whitespace around tokens is discarded. A string with no comma
    is returned as a single-element list, uncast whitespace included.
    """
    assert isinstance(string, str), "{} is not a string.".format(string)
    if ',' not in string:
        return [astype(string)]
    # commas become spaces, then whitespace-split drops empties and padding
    return [astype(tok) for tok in string.replace(',', ' ').split()]
def try_float(s):
    """Return float(s) when conversion succeeds, otherwise *s* unchanged.

    The original used a bare ``except`` (which would also swallow e.g.
    KeyboardInterrupt) and converted twice; this narrows to the exceptions
    float() actually raises and converts once. Behavior for all convertible
    and non-convertible inputs is unchanged.
    """
    try:
        return float(s)
    except (TypeError, ValueError):
        return s
# TODO: 增加None机制,允许hidden size、batch size和seqlen设置为None
def check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
    """Check that every attribute in *in_attr* exists in *out_attr* with a
    matching (shape, dtype) description.

    Raises:
        AssertionError: when a name from in_attr is missing in out_attr.
        ValueError: on a shape/dtype mismatch when strict is True; otherwise
            a warning is logged instead.
    """
    for attr_name, attr in in_attr.items():
        assert attr_name in out_attr, in_name + ': ' + attr_name + ' not found in ' + out_name
        if attr == out_attr[attr_name]:
            continue
        if strict:
            raise ValueError(attr_name + ': shape or dtype not consistent!')
        logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(attr_name, in_name, attr, out_name, out_attr[attr_name]))
def encode_inputs(inputs, scope_name, sep='/', cand_set=None):
    """Prefix input names with a task scope.

    Without *cand_set*, every key k becomes scope_name+sep+k. With a
    cand_set, a key is emitted unscoped and/or scoped only when the
    corresponding form is present in cand_set.
    """
    outputs = {}
    for key, value in inputs.items():
        scoped_key = scope_name + sep + key
        if cand_set is None:
            outputs[scoped_key] = value
            continue
        if key in cand_set:
            outputs[key] = value
        if scoped_key in cand_set:
            outputs[scoped_key] = value
    return outputs
def decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
    """Strip this instance's scope prefix from input names.

    Keys starting with scope_name+sep are returned with the prefix removed.
    When keep_unk_keys is True, keys containing no separator at all (backbone
    vars, shared by every task) are also passed through unchanged.

    Fix: the prefix check previously hardcoded '/' and ignored the *sep*
    parameter; it now honors sep (default unchanged, so existing callers
    behave identically).
    """
    outputs = {}
    prefix = scope_name + sep
    for name, value in inputs.items():
        # vars for the backbone are also available to tasks
        if keep_unk_keys and sep not in name:
            outputs[name] = value
        # vars scoped to this instance
        if name.startswith(prefix):
            outputs[name[len(prefix):]] = value
    return outputs
def build_executor(on_gpu):
    """Create a fluid Executor on GPU 0 when *on_gpu* is truthy, else on CPU."""
    place = fluid.CUDAPlace(0) if on_gpu else fluid.CPUPlace()
    return fluid.Executor(place)
def fit_attr(conf, fit_attr, strict=False):
    """Cast entries of *conf* in place using the callables in *fit_attr*.

    Args:
        conf: dict of raw config values; mutated and also returned.
        fit_attr: dict mapping key -> cast callable (e.g. int, float).
        strict: when True, a key present in fit_attr but absent from conf
            raises; otherwise it is skipped.
    """
    for key, caster in fit_attr.items():
        if key not in conf:
            if strict:
                raise Exception('Argument {} is required to create a controller.'.format(key))
            continue
        conf[key] = caster(conf[key])
    return conf
......@@ -88,7 +88,7 @@ def create_iterator_fn(iterator, iterator_prefix, shape_and_dtypes, outname_to_p
outputs = next(iterator) # dict type
prefix = iterator_prefixe
for outname, val in outputs.items():
task_outname = prefix + '/' + outname
task_outname = prefix + '.' + outname
if outname in outname_to_pos:
idx = outname_to_pos[outname]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册