提交 c877f4b5 编写于 作者: X xixiaoyao

fix trainer

上级 085a13d2
此差异已折叠。
...@@ -6,11 +6,12 @@ if __name__ == '__main__': ...@@ -6,11 +6,12 @@ if __name__ == '__main__':
max_seqlen = 512 max_seqlen = 512
batch_size = 4 batch_size = 4
num_epochs = 20 num_epochs = 2
lr = 1e-3 lr = 1e-3
vocab_path = './pretrain/ernie/vocab.txt' vocab_path = './pretrain/ernie/vocab.txt'
train_file = './data/cls4mrqa/train.tsv' train_file = './data/cls4mrqa/train.tsv'
predict_file = './data/cls4mrqa/dev.tsv'
config = json.load(open('./pretrain/ernie/ernie_config.json')) config = json.load(open('./pretrain/ernie/ernie_config.json'))
# ernie = palm.backbone.ERNIE(...) # ernie = palm.backbone.ERNIE(...)
...@@ -23,15 +24,25 @@ if __name__ == '__main__': ...@@ -23,15 +24,25 @@ if __name__ == '__main__':
# 创建该分类任务的reader,由诸多参数控制数据集读入格式、文件数量、预处理规则等 # 创建该分类任务的reader,由诸多参数控制数据集读入格式、文件数量、预处理规则等
cls_reader = palm.reader.ClassifyReader(vocab_path, max_seqlen) cls_reader = palm.reader.ClassifyReader(vocab_path, max_seqlen)
predict_cls_reader = palm.reader.ClassifyReader(vocab_path, max_seqlen, phase='predict')
print(cls_reader.outputs_attr) print(cls_reader.outputs_attr)
print(predict_cls_reader.outputs_attr)
# 不同的backbone会对任务reader有不同的特征要求,例如对于分类任务,基本的输入feature为token_ids和label_ids,但是对于BERT,还要求从输入中额外提取position、segment、input_mask等特征,因此经过register后,reader会自动补充backbone所要求的字段 # 不同的backbone会对任务reader有不同的特征要求,例如对于分类任务,基本的输入feature为token_ids和label_ids,但是对于BERT,还要求从输入中额外提取position、segment、input_mask等特征,因此经过register后,reader会自动补充backbone所要求的字段
cls_reader.register_with(ernie) cls_reader.register_with(ernie)
print(cls_reader.outputs_attr) print(cls_reader.outputs_attr)
print(predict_cls_reader.outputs_attr)
print("preparing data...")
print(cls_reader.num_examples)
cls_reader.load_data(train_file, batch_size, num_epochs=num_epochs)
print(cls_reader.num_examples)
print('done!')
# 创建任务头(task head),如分类、匹配、机器阅读理解等。每个任务头有跟该任务相关的必选/可选参数。注意,任务头与reader是解耦合的,只要任务头依赖的数据集侧的字段能被reader提供,那么就是合法的 # 创建任务头(task head),如分类、匹配、机器阅读理解等。每个任务头有跟该任务相关的必选/可选参数。注意,任务头与reader是解耦合的,只要任务头依赖的数据集侧的字段能被reader提供,那么就是合法的
cls_head = palm.head.Classify(4, 1024, 0.1) cls_head = palm.head.Classify(4, 1024, 0.1)
# 根据reader和任务头来创建一个训练器trainer,trainer代表了一个训练任务,内部维护着训练进程、和任务的关键信息,并完成合法性校验,该任务的模型保存、载入等相关规则控制 # 根据reader和任务头来创建一个训练器trainer,trainer代表了一个训练任务,内部维护着训练进程、和任务的关键信息,并完成合法性校验,该任务的模型保存、载入等相关规则控制
trainer = palm.Trainer('senti_cls', cls_reader, cls_head) trainer = palm.Trainer('senti_cls')
# match4mrqa.reuse_head_with(mrc4mrqa) # match4mrqa.reuse_head_with(mrc4mrqa)
...@@ -39,19 +50,16 @@ if __name__ == '__main__': ...@@ -39,19 +50,16 @@ if __name__ == '__main__':
# output_vars = ernie.build(data_vars) # output_vars = ernie.build(data_vars)
# cls_head.build({'backbone': output_vars, 'reader': data_vars}) # cls_head.build({'backbone': output_vars, 'reader': data_vars})
loss_var = trainer.build_forward(ernie) loss_var = trainer.build_forward(ernie, cls_head)
# controller.build_forward() # controller.build_forward()
# Error! a head/backbone can be only build once! Try NOT to call build_forward method for any Trainer! # Error! a head/backbone can be only build once! Try NOT to call build_forward method for any Trainer!
print(trainer.num_examples) # n_steps = cls_reader.num_examples * num_epochs // batch_size
iterator_fn = trainer.load_data(train_file, 'csv', num_epochs=num_epochs, batch_size=batch_size) # warmup_steps = int(0.1 * n_steps)
print(trainer.num_examples) # print(warmup_steps)
# sched = palm.lr_sched.TriangularSchedualer(warmup_steps, n_steps)
n_steps = trainer.num_examples * num_epochs // batch_size sched = None
warmup_steps = int(0.1 * n_steps)
print(warmup_steps)
sched = palm.lr_sched.TriangularSchedualer(warmup_steps, n_steps)
adam = palm.optimizer.Adam(loss_var, lr, sched) adam = palm.optimizer.Adam(loss_var, lr, sched)
...@@ -60,17 +68,22 @@ if __name__ == '__main__': ...@@ -60,17 +68,22 @@ if __name__ == '__main__':
trainer.random_init_params() trainer.random_init_params()
trainer.load_pretrain('pretrain/ernie/params') trainer.load_pretrain('pretrain/ernie/params')
# print(trainer.train_one_step(next(iterator_fn())))
# trainer.train_one_epoch()
# for save predict model.
pred_ernie = palm.backbone.ERNIE.from_config(config, phase='pred')
cls_pred_head = palm.head.Classify(4, 1024, phase='pred')
trainer.build_predict_head(cls_pred_head, pred_ernie)
# trainer.train(iterator_fn, print_steps=1, save_steps=5, save_path='outputs', save_type='ckpt,predict') # trainer.train(iterator_fn, print_steps=1, save_steps=5, save_path='outputs', save_type='ckpt,predict')
trainer.train(iterator_fn, print_steps=1) trainer.fit_reader(cls_reader)
trainer.train(print_steps=1)
# trainer.save() # trainer.save()
print('prepare to predict...')
pred_ernie = palm.backbone.ERNIE.from_config(config, phase='pred')
cls_pred_head = palm.head.Classify(4, 1024, phase='pred')
trainer.build_predict_forward(pred_ernie, cls_pred_head)
predict_cls_reader.load_data(predict_file, 8)
print(predict_cls_reader.num_examples)
predict_cls_reader.register_with(pred_ernie)
trainer.fit_reader(predict_cls_reader, phase='predict')
print('predicting..')
trainer.predict(print_steps=20)
......
...@@ -58,52 +58,3 @@ class BaseBackbone(object): ...@@ -58,52 +58,3 @@ class BaseBackbone(object):
""" """
raise NotImplementedError() raise NotImplementedError()
class task_paradigm(object):
def __init__(self, config, phase, backbone_config):
"""
config: dict类型。描述了 任务实例(task instance)+多任务配置文件 中定义超参数
phase: str类型。运行阶段,目前支持train和predict
"""
@property
def inputs_attrs(self):
"""描述task_layer需要从reader, backbone等输入对象集合所读取到的输入对象的属性,第一级key为对象集和的名字,如backbone,reader等(后续会支持更灵活的输入),第二级key为对象集和中各对象的属性,包括对象的名字,shape和dtype。当某个对象为标量数据类型(如str, int, float等)时,shape设置为空列表[],当某个对象的某个维度长度可变时,shape中的相应维度设置为-1。
Return:
dict类型。对各个对象集及其输入对象的属性描述。"""
raise NotImplementedError()
@property
def outputs_attr(self):
"""描述task输出对象的属性,包括对象的名字,shape和dtype。输出对象会被加入到fetch_list中,从而在每个训练/推理step时得到runtime的计算结果,该计算结果会被传入postprocess方法中供用户处理。
当某个对象为标量数据类型(如str, int, float等)时,shape设置为空列表[],当某个对象的某个维度长度可变时,shape中的相应维度设置为-1。
Return:
dict类型。对各个输入对象的属性描述。注意,训练阶段必须包含名为loss的输出对象。
"""
raise NotImplementedError()
@property
def epoch_inputs_attrs(self):
return {}
def build(self, inputs, scope_name=""):
"""建立task_layer的计算图。将符合inputs_attrs描述的来自各个对象集的静态图Variables映射成符合outputs_attr描述的静态图Variable输出。
Args:
inputs: dict类型。字典中包含inputs_attrs中的对象名到计算图Variable的映射,inputs中至少会包含inputs_attr中定义的对象
Return:
需要输出的计算图变量,输出对象会被加入到fetch_list中,从而在每个训练/推理step时得到runtime的计算结果,该计算结果会被传入postprocess方法中供用户处理。
"""
raise NotImplementedError()
def postprocess(self, rt_outputs):
"""每个训练或推理step后针对当前batch的task_layer的runtime计算结果进行相关后处理。注意,rt_outputs除了包含build方法,还自动包含了loss的计算结果。"""
pass
def epoch_postprocess(self, post_inputs):
pass
...@@ -114,8 +114,6 @@ class ERNIE(BaseBackbone): ...@@ -114,8 +114,6 @@ class ERNIE(BaseBackbone):
input_mask = inputs['input_mask'] input_mask = inputs['input_mask']
task_ids = inputs['task_ids'] task_ids = inputs['task_ids']
fluid.layers.Print(src_ids)
# padding id in vocabulary must be set to 0 # padding id in vocabulary must be set to 0
emb_out = fluid.embedding( emb_out = fluid.embedding(
input=src_ids, input=src_ids,
......
...@@ -13,16 +13,20 @@ ...@@ -13,16 +13,20 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import json
class BaseHead(object): class BaseHead(object):
def __init__(self, config, phase, backbone_config): def __init__(self, phase='train'):
""" """
config: dict类型。描述了 任务实例(task instance)+多任务配置文件 中定义超参数 config: dict类型。描述了 任务实例(task instance)+多任务配置文件 中定义超参数
phase: str类型。运行阶段,目前支持train和predict phase: str类型。运行阶段,目前支持train和predict
""" """
self._stop_gradient = {} self._stop_gradient = {}
self._phase = phase
self._prog = None self._prog = None
self._results_buffer = []
@property @property
def inputs_attrs(self): def inputs_attrs(self):
...@@ -67,10 +71,31 @@ class BaseHead(object): ...@@ -67,10 +71,31 @@ class BaseHead(object):
raise NotImplementedError() raise NotImplementedError()
def postprocess(self, rt_outputs): def batch_postprocess(self, rt_outputs):
"""每个训练或推理step后针对当前batch的task_layer的runtime计算结果进行相关后处理。注意,rt_outputs除了包含build方法,还自动包含了loss的计算结果。""" """每个训练或推理step后针对当前batch的task_layer的runtime计算结果进行相关后处理。注意,rt_outputs除了包含build方法,还自动包含了loss的计算结果。"""
pass if isinstance(rt_outputs, dict):
keys = rt_outputs.keys()
vals = [rt_outputs[k] for k in keys]
lens = [len(v) for v in vals]
if len(set(lens)) == 1:
results = [dict(zip(*[keys, i])) for i in zip(*vals)]
self._results_buffer.extend(results)
return results
else:
print('WARNING: irregular output results. visualize failed.')
self._results_buffer.append(rt_outputs)
return None
def epoch_postprocess(self, post_inputs, output_dir=None):
if output_dir is not None:
for i in self._results_buffer:
print(i)
else:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(os.path.join(output_dir, self._phase), 'w') as writer:
for i in self._results_buffer:
writer.write(json.dumps(i)+'\n')
def epoch_postprocess(self, post_inputs):
pass
...@@ -87,14 +87,16 @@ class Classify(BaseHead): ...@@ -87,14 +87,16 @@ class Classify(BaseHead):
self._preds.extend(preds.tolist()) self._preds.extend(preds.tolist())
return preds return preds
def epoch_postprocess(self, post_inputs): def epoch_postprocess(self, post_inputs, output_dir=None):
# there is no post_inputs needed and not declared in epoch_inputs_attrs, hence no elements exist in post_inputs # there is no post_inputs needed and not declared in epoch_inputs_attrs, hence no elements exist in post_inputs
if not self._is_training: if not self._is_training:
if self._pred_output_path is None: if output_dir is None:
raise ValueError('argument pred_output_path not found in config. Please add it into config dict/file.')
with open(os.path.join(self._pred_output_path, 'predictions.json'), 'w') as writer:
for p in self._preds: for p in self._preds:
writer.write(str(p)+'\n') print(p)
print('Predictions saved at '+os.path.join(self._pred_output_path, 'predictions.json')) else:
with open(os.path.join(self._pred_output_path, 'predictions.json'), 'w') as writer:
for p in self._preds:
writer.write(str(p)+'\n')
print('Predictions saved at '+os.path.join(self._pred_output_path, 'predictions.json'))
...@@ -31,7 +31,7 @@ from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint ...@@ -31,7 +31,7 @@ from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint
from paddlepalm.utils.config_helper import PDConfig from paddlepalm.utils.config_helper import PDConfig
from paddlepalm.utils.print_helper import print_dict from paddlepalm.utils.print_helper import print_dict
from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs
from paddlepalm.distribute import data_feeder from paddlepalm.distribute import data_feeder, decode_fake
from default_settings import * from default_settings import *
from task_instance import TaskInstance, check_instances from task_instance import TaskInstance, check_instances
...@@ -186,13 +186,20 @@ def _fit_attr(conf, fit_attr, strict=False): ...@@ -186,13 +186,20 @@ def _fit_attr(conf, fit_attr, strict=False):
def create_feed_batch_process_fn(net_inputs): def create_feed_batch_process_fn(net_inputs):
def feed_batch_process_fn(data):
def feed_batch_process_fn(data, id=-1):
# temps = {}
# for i in range(len(net_inputs)):
temp = {} temp = {}
for q, var in net_inputs.items(): inputs = net_inputs[id] if id != -1 else net_inputs
for q, var in inputs.items():
if isinstance(var, str) or isinstance(var, unicode): if isinstance(var, str) or isinstance(var, unicode):
temp[var] = data[q] temp[var] = data[q]
else: else:
temp[var.name] = data[q] temp[var.name] = data[q]
# temps[i] = temp
return temp return temp
return feed_batch_process_fn return feed_batch_process_fn
...@@ -221,6 +228,7 @@ class Controller(object): ...@@ -221,6 +228,7 @@ class Controller(object):
exe, dev_count = _init_env(use_gpu=mtl_conf.get('use_gpu', True)) exe, dev_count = _init_env(use_gpu=mtl_conf.get('use_gpu', True))
self.exe = exe self.exe = exe
self.dev_count = dev_count self.dev_count = dev_count
self.batch_size = mtl_conf.get('batch_size')
print_dict(mtl_conf, title='global configuration') print_dict(mtl_conf, title='global configuration')
...@@ -343,6 +351,7 @@ class Controller(object): ...@@ -343,6 +351,7 @@ class Controller(object):
dev_count = self.dev_count dev_count = self.dev_count
num_instances = len(instances) num_instances = len(instances)
mrs = self.mrs mrs = self.mrs
branch = fluid.data(name="branch",shape=[1],dtype='int64')
# set first_target/main task instance # set first_target/main task instance
main_inst = None main_inst = None
...@@ -362,35 +371,51 @@ class Controller(object): ...@@ -362,35 +371,51 @@ class Controller(object):
# create reader, task # create reader, task
# then check i/o across reader, backbone and task_layer # then check i/o across reader, backbone and task_layer
task_attrs = []
# check_fns = {}
task_attrs = {}
pred_task_attrs = [] pred_task_attrs = []
for inst in instances: joint_input_names = {}
train_reader = inst.Reader(inst.config, phase='train') joint_shape_and_dtypes = {}
inst.reader['train'] = train_reader name_to_position = {}
train_parad = inst.Paradigm(inst.config, phase='train', backbone_config=bb_conf) for i in range(num_instances):
inst.task_layer['train'] = train_parad # def check_tasks():
task_attr_from_reader = _encode_inputs(train_parad.inputs_attrs['reader'], inst.name) # i = s
task_attrs.append(task_attr_from_reader) # def checkeach():
train_reader = instances[i].Reader(instances[i].config, phase='train')
instances[i].reader['train'] = train_reader
train_parad = instances[i].Paradigm(instances[i].config, phase='train', backbone_config=bb_conf)
instances[i].task_layer['train'] = train_parad
task_attr_from_reader = _encode_inputs(train_parad.inputs_attrs['reader'], instances[i].name)
task_attrs[i] = task_attr_from_reader
_check_io(train_backbone.inputs_attr, train_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train') _check_io(train_backbone.inputs_attr, train_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
_check_io(train_parad.inputs_attrs['reader'], train_reader.outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train') _check_io(train_parad.inputs_attrs['reader'], train_reader.outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
_check_io(train_parad.inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone') _check_io(train_parad.inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
# merge reader input attrs from backbone and task_instances
if inst.is_target: # pred_joint_input_names = []
if 'pred_file' not in inst.config: # pred_joint_shape_and_dtypes = []
inst.config['pred_file'] = '' if instances[i].is_target:
pred_reader = inst.Reader(inst.config, phase='pred') if 'pred_file' not in instances[i].config:
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf) instances[i].config['pred_file'] = ''
inst.task_layer['pred'] = pred_parad pred_reader = instances[i].Reader(instances[i].config, phase='pred')
task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name) pred_parad = instances[i].Paradigm(instances[i].config, phase='pred', backbone_config=bb_conf)
instances[i].task_layer['pred'] = pred_parad
task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], instances[i].name)
pred_task_attrs.append(task_attr_from_reader) pred_task_attrs.append(task_attr_from_reader)
_check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred') _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred') _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone') _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
# pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
# merge reader input attrs from backbone and task_instances # return joint_input_names[i], joint_shape_and_dtypes[i], name_to_position[i], pred_joint_input_names, pred_joint_shape_and_dtypes
joint_input_names, joint_shape_and_dtypes, name_to_position = merge_input_attrs(train_backbone.inputs_attr, task_attrs) # return checkeach
# check_fns[i] = check_tasks()
joint_input_names[i], joint_shape_and_dtypes[i], name_to_position[i] = merge_input_attrs(train_backbone.inputs_attr, task_attrs[i])
pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False) pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
# shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN] # shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
if DEBUG: if DEBUG:
...@@ -400,16 +425,18 @@ class Controller(object): ...@@ -400,16 +425,18 @@ class Controller(object):
print('joint input shape and dtypes:') print('joint input shape and dtypes:')
print(joint_shape_and_dtypes) print(joint_shape_and_dtypes)
# load data # load data
for inst in instances: data_fns={}
print(inst.name+": preparing data...", end='') for i in range(num_instances):
inst.reader['train'].load_data() print(instances[i].name+": preparing data...", end='')
instances[i].reader['train'].load_data()
print('ok!') print('ok!')
# merge dataset iterators and create net input vars # merge dataset iterators and create net input vars
iterators = [] iterators = []
prefixes = [] prefixes = []
mrs = [] mrs = []
for inst in instances: for inst in instances:
iterators.append(inst.reader['train'].iterator()) iterators.append(inst.reader['train'].iterator())
prefixes.append(inst.name) prefixes.append(inst.name)
...@@ -418,65 +445,65 @@ class Controller(object): ...@@ -418,65 +445,65 @@ class Controller(object):
joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict') joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE, return_type='dict')
self._joint_iterator_fn = joint_iterator_fn self._joint_iterator_fn = joint_iterator_fn
input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)] input_attrs = {}
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)] net_inputs = {}
# net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3) bb_output_vars = {}
net_inputs = create_net_inputs(input_attrs, async=False) bb_output_fns = {}
self._net_inputs = net_inputs
# build backbone and task layers # prepare predict vars for saving inference model
train_prog = fluid.default_main_program() pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)]
train_init_prog = fluid.default_startup_program()
bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())
pred_prog = fluid.Program() pred_prog = fluid.Program()
pred_init_prog = fluid.Program() pred_init_prog = fluid.Program()
self._pred_prog = pred_prog
with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog): with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
pred_net_inputs = create_net_inputs(pred_input_attrs) pred_net_inputs = create_net_inputs(pred_input_attrs)
pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_') pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
fluid.framework.switch_main_program(train_prog)
fluid.framework.switch_startup_program(train_init_prog)
task_inputs = {}
task_output_vars = {} task_output_vars = {}
for inst in instances: task_fns = {}
task_inputs = {'backbone': bb_output_vars}
task_inputs_from_reader = _decode_inputs(net_inputs, inst.name) def get_loss(i):
task_inputs['reader'] = task_inputs_from_reader input_attrs[i] = [[m, j, k] for m, (j,k) in zip(joint_input_names[i], joint_shape_and_dtypes[i])]
net_inputs[i] = create_net_inputs(input_attrs[i], async=False)
scope = inst.task_reuse_scope + '/' # net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
bb_output_vars[i] = train_backbone.build(net_inputs[i], scope_name='__paddlepalm_')
assert sorted(bb_output_vars[i].keys()) == sorted(train_backbone.outputs_attr.keys())
# build backbone and task layers
task_inputs[i] = {'backbone': bb_output_vars[i]}
task_inputs_from_reader = _decode_inputs(net_inputs[i], instances[i].name)
task_inputs[i]['reader'] = task_inputs_from_reader
scope = instances[i].task_reuse_scope + '/'
with fluid.unique_name.guard(scope): with fluid.unique_name.guard(scope):
output_vars = inst.build_task_layer(task_inputs, phase='train', scope=scope) output_vars = instances[i].build_task_layer(task_inputs[i], phase='train', scope=scope)
output_vars = {inst.name+'/'+key: val for key, val in output_vars.items()} output_vars = {instances[i].name+'/'+key: val for key, val in output_vars.items()}
old = len(task_output_vars) # for debug loss_var = output_vars[instances[i].name+'/loss']
task_output_vars.update(output_vars) task_output_vars[i] = output_vars
assert len(task_output_vars) - old == len(output_vars) # for debug
# prepare predict vars for saving inference model
if inst.is_target:
if instances[i].is_target:
with fluid.program_guard(pred_prog, pred_init_prog): with fluid.program_guard(pred_prog, pred_init_prog):
cur_inputs = _decode_inputs(pred_net_inputs, inst.name) cur_inputs = _decode_inputs(pred_net_inputs, instances[i].name)
inst.pred_input = cur_inputs instances[i].pred_input = cur_inputs
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs} pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = inst.task_reuse_scope + '/' scope = instances[i].task_reuse_scope + '/'
with fluid.unique_name.guard(scope): with fluid.unique_name.guard(scope):
inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope) instances[i].build_task_layer(pred_task_inputs, phase='pred', scope=scope)
return loss_var
bb_fetches = {k: v.name for k,v in bb_output_vars.items()} for i in range(num_instances):
task_fetches = {k: v.name for k,v in task_output_vars.items()} def task_loss():
fetches = task_fetches task_id = i
fetches['__task_id'] = net_inputs['__task_id'].name return lambda: get_loss(task_id)
task_fns[i] = task_loss()
# compute loss
task_id_var = net_inputs['__task_id'] loss = layers.switch_case(
task_id_vec = fluid.one_hot(task_id_var, num_instances) branch_index=branch,
losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0) branch_fns=task_fns
loss = layers.reduce_sum(task_id_vec * losses) )
self._switched_loss = loss.name
main_reader = main_inst.reader['train'] main_reader = main_inst.reader['train']
num_examples = main_reader.num_examples num_examples = main_reader.num_examples
...@@ -514,9 +541,9 @@ class Controller(object): ...@@ -514,9 +541,9 @@ class Controller(object):
self.saver_program = fluid.default_main_program() self.saver_program = fluid.default_main_program()
self.main_inst = main_inst self.main_inst = main_inst
self.fetches = fetches
self.has_init_train = True self.has_init_train = True
self.has_init_pred = True self.has_init_pred = True
self._net_inputs = net_inputs
self.exe.run(fluid.default_startup_program()) self.exe.run(fluid.default_startup_program())
print("\nRandomly initialize parameters...\n") print("\nRandomly initialize parameters...\n")
...@@ -569,8 +596,6 @@ class Controller(object): ...@@ -569,8 +596,6 @@ class Controller(object):
backbone = self.train_backbone backbone = self.train_backbone
train_program = self.train_program train_program = self.train_program
saver_program = self.saver_program saver_program = self.saver_program
fetches = self.fetches
finish = [] finish = []
for inst in instances: for inst in instances:
if inst.is_target: if inst.is_target:
...@@ -588,46 +613,45 @@ class Controller(object): ...@@ -588,46 +613,45 @@ class Controller(object):
return False return False
return True return True
# do training
fetch_names, fetch_list = zip(*fetches.items())
# do training
fetch_names = {}
fetch_list = []
main_step = 0 # only count for main task main_step = 0 # only count for main task
global_step = 0 # count for all tasks global_step = 0 # count for all tasks
epoch = 0 epoch = 0
time_begin = time.time() time_begin = time.time()
backbone_buffer = [] backbone_buffer = []
feed_batch_process_fn = create_feed_batch_process_fn(self._net_inputs) feed_batch_process_fn = create_feed_batch_process_fn(self._net_inputs)
distribute_feeder = data_feeder(self._joint_iterator_fn, feed_batch_process_fn) distribute_feeder = data_feeder(self._joint_iterator_fn, feed_batch_process_fn)
# palm.distribute.reader(self._joint_iterator_fn, self._net_inputs, prefetch_steps=2)
while not train_finish(): while not train_finish():
feed, mask = next(distribute_feeder) feed, mask, id = next(distribute_feeder)
for i in range(self.dev_count):
feed[i].update({'branch':np.array([id],dtype='int64')})
fetch_list.append(self._switched_loss)
rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list) rt_outputs = self.exe.run(train_program, feed=feed, fetch_list=fetch_list)
while mask.pop() == False: rt_loss = rt_outputs.pop()
rt_outputs.pop()
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)} rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist() cur_task = instances[id]
rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
cur_task = instances[rt_task_id]
backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k} # backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k}
backbone_buffer.append(backbone.postprocess(backbone_rt_outputs)) # backbone_buffer.append(backbone.postprocess(backbone_rt_outputs))
task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')} # task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')}
instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs) # instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)
global_step += 1 global_step += 1
cur_task.cur_train_step += 1 cur_task.cur_train_step += 1
cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch cur_task_global_step = cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch
if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0: if cur_task.is_target and cur_task.save_infermodel_every_n_steps > 0 and cur_task_global_step % cur_task.save_infermodel_every_n_steps == 0:
cur_task.save(suffix='.step'+str(cur_task_global_step)) cur_task.save(suffix='.step'+str(cur_task_global_step), prog=self._pred_prog)
if global_step % main_conf.get('print_every_n_steps', 5) == 0: if global_step % main_conf.get('print_every_n_steps', 5) == 0:
loss = rt_outputs[cur_task.name+'/loss'] loss = rt_loss
loss = np.mean(np.squeeze(loss)).tolist() loss = np.mean(np.squeeze(loss)).tolist()
time_end = time.time() time_end = time.time()
...@@ -640,7 +664,7 @@ class Controller(object): ...@@ -640,7 +664,7 @@ class Controller(object):
if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps: if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
print(cur_task.name+': train finished!') print(cur_task.name+': train finished!')
cur_task.save() cur_task.save(prog=self._pred_prog)
if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0: if 'save_ckpt_every_n_steps' in main_conf and global_step % main_conf['save_ckpt_every_n_steps'] == 0:
save_path = os.path.join(main_conf['save_path'], 'ckpt', save_path = os.path.join(main_conf['save_path'], 'ckpt',
...@@ -686,37 +710,26 @@ class Controller(object): ...@@ -686,37 +710,26 @@ class Controller(object):
print('predicting...') print('predicting...')
feed_batch_process_fn = create_feed_batch_process_fn(inst.pred_input) feed_batch_process_fn = create_feed_batch_process_fn(inst.pred_input)
distribute_feeder = data_feeder(inst.reader['pred'].iterator, feed_batch_process_fn, prefetch_steps=1) distribute_feeder = data_feeder(inst.reader['pred'].iterator, feed_batch_process_fn, prefetch_steps=1, phase='pred')
buf = [] buf = []
for feed, mask in distribute_feeder: for feed, mask, id in distribute_feeder:
print('before run')
rt_outputs = self.exe.run(pred_prog, feed, fetch_vars) rt_outputs = self.exe.run(pred_prog, feed, fetch_vars)
print('after run')
splited_rt_outputs = [] num_fakes = decode_fake(len(rt_outputs[0]), mask, self.batch_size)
for item in rt_outputs: for _ in range(num_fakes):
splited_rt_outputs.append(np.split(item, len(mask))) for item in rt_outputs:
# assert len(rt_outputs) == len(mask), [len(rt_outputs), len(mask)]
print(mask)
while mask.pop() == False:
print(mask)
for item in splited_rt_outputs:
item.pop() item.pop()
rt_outputs = []
print('cancat')
for item in splited_rt_outputs:
rt_outputs.append(np.concatenate(item))
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)} rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
inst.postprocess(rt_outputs, phase='pred') inst.postprocess(rt_outputs, phase='pred')
print('leave feeder')
if inst.task_layer['pred'].epoch_inputs_attrs: if inst.task_layer['pred'].epoch_inputs_attrs:
reader_outputs = inst.reader['pred'].get_epoch_outputs() reader_outputs = inst.reader['pred'].get_epoch_outputs()
else: else:
reader_outputs = None reader_outputs = None
print('epoch postprocess')
inst.epoch_postprocess({'reader':reader_outputs}, phase='pred') inst.epoch_postprocess({'reader':reader_outputs}, phase='pred')
...@@ -731,6 +744,3 @@ if __name__ == '__main__': ...@@ -731,6 +744,3 @@ if __name__ == '__main__':
__all__ = ["Controller"] __all__ = ["Controller"]
from paddlepalm.distribute import gpu_dev_count, cpu_dev_count
from paddlepalm import Trainer
dev_count = 1 if gpu_dev_count <= 1 else gpu_dev_count
VERBOSE=False
class MultiHeadTrainer(Trainer):
    """Joint trainer that multiplexes several single-task Trainers over one
    shared backbone, selecting the per-task loss branch at runtime via a
    ``switch_case`` on an int64 "branch" variable.
    """

    def __init__(self, trainers, reuse_flags=None):
        """
        Args:
            trainers: list of Trainer instances, one per task head.
            reuse_flags: optional list, same length as `trainers`, marking
                which task layers share parameters. Currently only validated,
                not yet consumed.
        """
        # Fix: the original asserted against an undefined name `mix_ratios`
        # (NameError on every construction); each trainer already carries its
        # own .mix_ratio, so no global list is needed here.
        if reuse_flags is not None:
            assert len(reuse_flags) == len(trainers)
        self._trainers = trainers

    def build_forward(self, backbone, head_dict):
        """Build the joint train graph.

        Args:
            backbone: shared backbone instance.
            head_dict: dict mapping each trainer's name to its task head.

        Returns:
            the loss variable selected by the runtime branch index.
        """
        num_heads = len(self._trainers)
        assert len(head_dict) == num_heads
        # Fix: was `for t in trainers:` — `trainers` is undefined here.
        for t in self._trainers:
            assert t.name in head_dict, \
                "no task head provided for trainer '{}'".format(t.name)

        train_prog = fluid.Program()
        train_init_prog = fluid.Program()

        def get_loss(i):
            # Builds the i-th task's sub-graph inside the shared programs.
            head = head_dict[self._trainers[i].name]
            return self._trainers[i].build_forward(
                backbone, head, train_prog, train_init_prog)

        # Bind the loop index as a default argument so every branch closure
        # captures its own task id (avoids the late-binding closure pitfall).
        task_fns = {i: (lambda task_id=i: get_loss(task_id))
                    for i in range(num_heads)}

        # Runtime scalar that selects which task branch to execute this step.
        head_id_var = fluid.data(name="branch", shape=[1], dtype='int64')
        loss_var = layers.switch_case(
            branch_index=head_id_var,
            branch_fns=task_fns
        )
        self._head_id_var = head_id_var
        return loss_var

    def fit_readers(self, reader_dict, mix_ratio, phase='train'):
        """Fit one reader per trainer and build the joint data feeder.

        Args:
            reader_dict: dict mapping each trainer's name to its reader.
            mix_ratio: kept for interface compatibility; per-task ratios are
                taken from each trainer's .mix_ratio instead.
            phase: 'train' or 'predict'; selects which feeder slot to fill.
                (Fix: `phase` was referenced but never defined.)
        """
        num_heads = len(self._trainers)
        # Fix: was `assert len(head_dict) == num_heads` — `head_dict` is
        # undefined in this method; the readers are what must match.
        assert len(reader_dict) == num_heads

        name_to_position = []
        joint_shape_and_dtypes = []
        iterators = []
        prefixes = []
        mrs = []
        net_inputs = []
        # Fix: was `for t in trainers:` — `trainers` is undefined here.
        for t in self._trainers:
            assert t.name in reader_dict
            t.fit_reader(reader_dict[t.name])
            net_inputs.append(t._net_inputs)
            prefixes.append(t.name)
            mrs.append(t.mix_ratio)
            iterators.append(t._raw_iterator_fn())
            name_to_position.append(t._name_to_position)
            joint_shape_and_dtypes.append(t._shape_and_dtypes)

        # Merge the per-task iterators into one, sampled by mix ratio.
        iterator_fn = create_joint_iterator_fn(
            iterators, prefixes, joint_shape_and_dtypes, mrs,
            name_to_position, dev_count=dev_count, verbose=VERBOSE,
            return_type='dict')
        feed_batch_process_fn = reader_helper.create_multihead_feed_batch_process_fn(net_inputs)
        if gpu_dev_count > 1:
            distribute_feeder_fn = data_feeder(iterator_fn, feed_batch_process_fn)
        else:
            distribute_feeder_fn = iterator_fn

        if phase == 'train':
            self._train_reader = distribute_feeder_fn()
            self._feed_batch_process_fn = feed_batch_process_fn
        elif phase == 'predict':
            self._predict_reader = distribute_feeder_fn()
            self._pred_feed_batch_process_fn = feed_batch_process_fn

    def train(self):
        # TODO: joint training loop not implemented yet.
        pass

    def train_one_step(self):
        # TODO: single joint step not implemented yet.
        pass
...@@ -37,8 +37,6 @@ class Adam(BaseOptimizer): ...@@ -37,8 +37,6 @@ class Adam(BaseOptimizer):
if self._lr_schedualer is not None: if self._lr_schedualer is not None:
self._lr = self._lr_schedualer.build(self._lr) self._lr = self._lr_schedualer.build(self._lr)
fluid.layers.Print(self._lr)
optimizer = fluid.optimizer.Adam(learning_rate=self._lr) optimizer = fluid.optimizer.Adam(learning_rate=self._lr)
if grad_clip is not None: if grad_clip is not None:
......
...@@ -21,6 +21,8 @@ class BaseReader(object): ...@@ -21,6 +21,8 @@ class BaseReader(object):
# assert isinstance(config, dict) # assert isinstance(config, dict)
# self._config = config # self._config = config
self._phase = phase self._phase = phase
self._batch_size = None
self._num_epochs = 1
self._register = set() self._register = set()
self._registered_backbone = None self._registered_backbone = None
...@@ -117,4 +119,8 @@ class BaseReader(object): ...@@ -117,4 +119,8 @@ class BaseReader(object):
"""数据集中的样本数量,即每个epoch中iterator所生成的样本数。注意,使用滑动窗口等可能导致数据集样本数发生变化的策略时,该接口应返回runtime阶段的实际样本数。""" """数据集中的样本数量,即每个epoch中iterator所生成的样本数。注意,使用滑动窗口等可能导致数据集样本数发生变化的策略时,该接口应返回runtime阶段的实际样本数。"""
raise NotImplementedError() raise NotImplementedError()
@property
def num_epochs(self):
    """Number of epochs the produced iterator covers; set via load_data.

    Abstract: concrete readers (e.g. ClassifyReader) must implement this.
    """
    raise NotImplementedError()
...@@ -32,7 +32,7 @@ class ClassifyReader(BaseReader): ...@@ -32,7 +32,7 @@ class ClassifyReader(BaseReader):
BaseReader.__init__(self, phase) BaseReader.__init__(self, phase)
assert lang.lower() in ['en', 'cn', 'english', 'chinese'], "supported language: en (English), cn (Chinese)." assert lang.lower() in ['en', 'cn', 'english', 'chinese'], "supported language: en (English), cn (Chinese)."
assert phase in ['train', 'pred'], "supported phase: train, pred." assert phase in ['train', 'predict'], "supported phase: train, predict."
for_cn = lang.lower() == 'cn' or lang.lower() == 'chinese' for_cn = lang.lower() == 'cn' or lang.lower() == 'chinese'
...@@ -66,10 +66,13 @@ class ClassifyReader(BaseReader): ...@@ -66,10 +66,13 @@ class ClassifyReader(BaseReader):
return self._get_registed_attrs(attrs) return self._get_registed_attrs(attrs)
def _load_data(self, input_file, batch_size, num_epochs=None, \ def load_data(self, input_file, batch_size, num_epochs=None, \
file_format='csv', shuffle_train=True): file_format='csv', shuffle_train=True):
self._data_generator = self._reader.data_generator(input_file, batch_size, \ self._batch_size = batch_size
num_epochs, shuffle=shuffle_train if self._phase == 'train' else False, \ self._num_epochs = num_epochs
self._data_generator = self._reader.data_generator( \
input_file, batch_size, num_epochs if phase == 'train' else 1, \
shuffle=shuffle_train if self._phase == 'train' else False, \
phase=self._phase) phase=self._phase)
def _iterator(self): def _iterator(self):
...@@ -92,4 +95,8 @@ class ClassifyReader(BaseReader): ...@@ -92,4 +95,8 @@ class ClassifyReader(BaseReader):
def num_examples(self): def num_examples(self):
return self._reader.get_num_examples(phase=self._phase) return self._reader.get_num_examples(phase=self._phase)
@property
def num_epochs(self):
    # Value recorded by load_data(); BaseReader initializes it to 1, so this
    # is 1 until load_data is called with an explicit num_epochs.
    return self._num_epochs
...@@ -219,7 +219,7 @@ class BaseReader(object): ...@@ -219,7 +219,7 @@ class BaseReader(object):
qid=qid) qid=qid)
return record return record
def _prepare_batch_data(self, examples, batch_size, phase=None): def _prepare_batch_data(self, examples, batch_size, phase='train'):
"""generate batch records""" """generate batch records"""
batch_records, max_len = [], 0 batch_records, max_len = [], 0
if len(examples) < batch_size: if len(examples) < batch_size:
...@@ -243,13 +243,11 @@ class BaseReader(object): ...@@ -243,13 +243,11 @@ class BaseReader(object):
if phase == 'pred' and batch_records: if phase == 'pred' and batch_records:
yield self._pad_batch_records(batch_records) yield self._pad_batch_records(batch_records)
def get_num_examples(self, input_file=None, phase=None): def get_num_examples(self, input_file=None, phase='train'):
if self.examples is not None: if input_file is None:
if phase is None: return len(self.examples.get(phase, []))
phase = 'all'
return len(self.examples[phase])
else: else:
assert input_file is not None, "Argument input_file should be given or the data_generator should be created when this func is called." # assert input_file is not None, "Argument input_file should be given or the data_generator should be created when this func is called."
examples = self._read_tsv(input_file) examples = self._read_tsv(input_file)
return len(examples) return len(examples)
......
...@@ -29,15 +29,13 @@ DEBUG=False ...@@ -29,15 +29,13 @@ DEBUG=False
class Trainer(object): class Trainer(object):
def __init__(self, name, reader, task_head, \ def __init__(self, name, mix_ratio=1.0, reuse_head_with=None, \
mix_ratio=1.0, reuse_head_with=None, \
silent=False): silent=False):
self._name = name self._name = name
self._verbose = not silent self._verbose = not silent
self._reader = reader
self._pred_reader = None self._pred_reader = None
self._task_head = task_head self._task_head = None
self._pred_head = None self._pred_head = None
self._train_init = False self._train_init = False
...@@ -66,15 +64,12 @@ class Trainer(object): ...@@ -66,15 +64,12 @@ class Trainer(object):
self._expected_train_steps = None self._expected_train_steps = None
self._expected_train_epochs = None self._expected_train_epochs = None
self._steps_pur_epoch = None self._steps_pur_epoch = None
self._pred_steps_pur_epoch = None
self._cur_train_epoch = 0 self._cur_train_epoch = 0
self._cur_train_step = 0 self._cur_train_step = 0
self._train_finish = False self._train_finish = False
# 存放不同运行阶段(train,eval,pred)的数据集reader,key为phase,value为Reader实例
# self._reader = {'train': reader, 'eval': None, 'pred': self._pred_reader}
# self._input_layer = None
self._inputname_to_varname = {} self._inputname_to_varname = {}
# self._task_layer = {'train': task_head, 'eval': None, 'pred': pred_head}
self._pred_input_name_list = [] self._pred_input_name_list = []
self._pred_input_varname_list = [] self._pred_input_varname_list = []
self._pred_fetch_name_list = [] self._pred_fetch_name_list = []
...@@ -92,7 +87,7 @@ class Trainer(object): ...@@ -92,7 +87,7 @@ class Trainer(object):
self._lock = False self._lock = False
self._build_forward = False self._build_forward = False
def build_predict_head(self, pred_head, pred_backbone, pred_prog=None, pred_init_prog=None): def build_predict_forward(self, pred_backbone, pred_head, pred_prog=None, pred_init_prog=None):
self._pred_head = pred_head self._pred_head = pred_head
# self._pred_reader = self._reader.clone(phase='pred') # self._pred_reader = self._reader.clone(phase='pred')
pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name) pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name)
...@@ -101,8 +96,10 @@ class Trainer(object): ...@@ -101,8 +96,10 @@ class Trainer(object):
# _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred') # _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred') # _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone') # _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
pred_input_names, pred_shape_and_dtypes, _ = reader_helper.merge_input_attrs(pred_backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False) pred_input_names, pred_shape_and_dtypes, pred_name_to_position = reader_helper.merge_input_attrs(pred_backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_input_names, pred_shape_and_dtypes)] pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_input_names, pred_shape_and_dtypes)]
self._pred_shape_and_dtypes = pred_shape_and_dtypes
self._pred_name_to_position = pred_name_to_position
if pred_prog is None: if pred_prog is None:
pred_prog = fluid.Program() pred_prog = fluid.Program()
...@@ -114,6 +111,7 @@ class Trainer(object): ...@@ -114,6 +111,7 @@ class Trainer(object):
pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs) pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
# pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_') # pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
pred_bb_output_vars = pred_backbone.build(pred_net_inputs) pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
self._pred_net_inputs = pred_net_inputs
# prepare predict vars for saving inference model # prepare predict vars for saving inference model
with fluid.program_guard(pred_prog, pred_init_prog): with fluid.program_guard(pred_prog, pred_init_prog):
...@@ -128,16 +126,16 @@ class Trainer(object): ...@@ -128,16 +126,16 @@ class Trainer(object):
output_vars = self._build_head(pred_task_inputs, phase='pred', scope=scope) output_vars = self._build_head(pred_task_inputs, phase='pred', scope=scope)
if output_vars is not None: if output_vars is not None:
self._pred_fetch_name_list, self._pred_fetch_var_list = zip(*output_vars.items()) self._pred_fetch_name_list, self._pred_fetch_list = zip(*output_vars.items())
else: else:
self._pred_fetch_name_list = [] self._pred_fetch_name_list = []
self._pred_fetch_var_list = [] self._pred_fetch_var_list = []
self._distribute_pred_prog = fluid.CompiledProgram(self._pred_prog).with_data_parallel()
return output_vars return output_vars
def build_forward(self, backbone, pred_backbone=None, train_prog=None, train_init_prog=None, pred_prog=None, pred_init_prog=None): def build_forward(self, backbone, task_head, train_prog=None, train_init_prog=None, pred_prog=None, pred_init_prog=None):
self._task_head = task_head
# assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode" # assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode"
self._build_forward = True self._build_forward = True
...@@ -220,9 +218,9 @@ class Trainer(object): ...@@ -220,9 +218,9 @@ class Trainer(object):
with fluid.program_guard(train_prog, train_init_prog): with fluid.program_guard(train_prog, train_init_prog):
loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss']) loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
for _id, block in enumerate(self._train_prog.blocks): # for _id, block in enumerate(self._train_prog.blocks):
for var in block.vars: # for var in block.vars:
print("[debug] : %d, %s" % (_id, var)) # print("[debug] : %d, %s" % (_id, var))
self._loss_var = loss_var self._loss_var = loss_var
return loss_var return loss_var
...@@ -272,43 +270,69 @@ class Trainer(object): ...@@ -272,43 +270,69 @@ class Trainer(object):
# print(self._train_prog) # print(self._train_prog)
def load_data(self, input_file, file_format, batch_size, num_epochs=None, shuffle_train=True): def fit_reader(self, reader, phase='train'):
# load data # load data
print("preparing data...", end='') assert self._train_init_prog is not None or self._pred_init_prog is not None, "You need to build_forward or build_predict_head first to prepare input features."
self._reader._load_data(input_file=input_file, batch_size=batch_size, \
num_epochs=num_epochs, file_format=file_format, \
shuffle_train=shuffle_train)
self._num_examples = self._reader.num_examples
# 这里不确定是否要向上取整,需确认 # 这里不确定是否要向上取整,需确认
# tail = self._num_examples % batch_size > 0 # tail = self._num_examples % batch_size > 0
# self._steps_pur_epoch = self._num_examples // batch_size + 1 if tail else 0 # self._steps_pur_epoch = self._num_examples // batch_size + 1 if tail else 0
self._steps_pur_epoch = self._num_examples // batch_size batch_size = reader._batch_size
self._num_epochs = reader.num_epochs
if phase == 'train':
self._steps_pur_epoch = reader.num_examples // batch_size
shape_and_dtypes = self._shape_and_dtypes
name_to_position = self._name_to_position
net_inputs = self._net_inputs
self._train_batch_size = batch_size
self._num_examples = reader.num_examples
elif phase == 'predict':
tail = self._num_examples % batch_size > 0
self._pred_steps_pur_epoch = reader.num_examples // batch_size + 1 if tail else 0
shape_and_dtypes = self._pred_shape_and_dtypes
name_to_position = self._pred_name_to_position
net_inputs = self._pred_net_inputs
self._predict_batch_size = batch_size
self._pred_num_examples = reader.num_examples
else:
raise NotImplementedError()
print('ok!') print('ok!')
# merge dataset iterators and create net input vars # merge dataset iterators and create net input vars
iterator = self._reader._iterator() iterator = reader._iterator()
prefix = self.name prefix = self.name
# 对yield出的数据进行runtime检查和适配 # 对yield出的数据进行runtime检查和适配
iterator_fn = reader_helper.create_iterator_fn(iterator, prefix, self._shape_and_dtypes, self._name_to_position, return_type='dict') iterator_fn = reader_helper.create_iterator_fn(iterator, prefix, shape_and_dtypes, name_to_position, return_type='dict')
feed_batch_process_fn = reader_helper.create_feed_batch_process_fn(self._net_inputs) self._raw_iterator_fn = iterator_fn
self._feed_batch_process_fn = feed_batch_process_fn feed_batch_process_fn = reader_helper.create_feed_batch_process_fn(net_inputs)
if gpu_dev_count > 1: if gpu_dev_count > 1:
distribute_feeder_fn = data_feeder(iterator_fn, feed_batch_process_fn) distribute_feeder_fn = data_feeder(iterator_fn, feed_batch_process_fn)
else: else:
distribute_feeder_fn = iterator_fn distribute_feeder_fn = iterator_fn
return distribute_feeder_fn()
if phase == 'train':
self._train_reader = distribute_feeder_fn()
self._feed_batch_process_fn = feed_batch_process_fn
elif phase == 'predict':
self._predict_reader = distribute_feeder_fn()
self._pred_feed_batch_process_fn = feed_batch_process_fn
# return distribute_feeder_fn()
def _init_exe_prog(self, for_train=True): def _init_exe_prog(self, for_train=True):
assert self._train_init_prog is not None, "train graph not foung! You should build_forward first before you random init parameters." if not self._train_init and not self._predict_init:
self._train_init = True on_gpu = gpu_dev_count > 0
self._distribute_train_prog = fluid.CompiledProgram(self._train_prog).with_data_parallel(loss_name=self._loss_var.name) self._exe = helper.build_executor(on_gpu)
on_gpu = gpu_dev_count > 0
self._exe = helper.build_executor(on_gpu) if for_train:
if not for_train: assert self._train_prog is not None, "train graph not foung! You should build_forward first before you random init parameters."
raise NotImplementedError() self._train_init = True
else:
assert self._pred_prog is not None, "predict graph not foung! You should build_predict_head first before you random init parameters."
self._predict_init = True
def random_init_params(self): def random_init_params(self):
if not self._train_init: if not self._train_init:
self._init_exe_prog() self._init_exe_prog()
...@@ -319,9 +343,9 @@ class Trainer(object): ...@@ -319,9 +343,9 @@ class Trainer(object):
# load pretrain model (or ckpt) # load pretrain model (or ckpt)
# assert self._exe is not None, "You need to random_init_params before load checkpoints." # assert self._exe is not None, "You need to random_init_params before load checkpoints."
if phase == 'train' and not self._train_init: if phase == 'train' and not self._train_init:
self._init_exe_prog() self._init_exe_prog(for_train=True)
if phase == 'predict' and not self._predict_init: if phase == 'predict' and not self._predict_init:
pass self._init_exe_prog(for_train=False)
if phase == 'train': if phase == 'train':
assert self._train_init_prog is not None, "train graph not found! You should build_forward first before load checkpoint." assert self._train_init_prog is not None, "train graph not found! You should build_forward first before load checkpoint."
...@@ -344,23 +368,23 @@ class Trainer(object): ...@@ -344,23 +368,23 @@ class Trainer(object):
def load_predict_model(self, model_path): def load_predict_model(self, model_path):
raise NotImplementedError() raise NotImplementedError()
def load_pretrain(self, model_path): def load_pretrain(self, model_path, convert=False):
# load pretrain model (or ckpt) # load pretrain model (or ckpt)
assert self._exe is not None, "You need to random_init_params before load pretrain models." assert self._exe is not None, "You need to random_init_params before load pretrain models."
saver.init_pretraining_params( saver.init_pretraining_params(
self._exe, self._exe,
model_path, model_path,
convert=convert,
main_program=self._train_init_prog) main_program=self._train_init_prog)
def set_predict_head(self): def train(self, save_path=None, save_steps=None, save_type='ckpt', print_steps=5):
pass
def train(self, iterator, save_path=None, save_steps=None, save_type='ckpt', print_steps=5):
""" """
Argument: Argument:
save_type: ckpt, predict, pretrain save_type: ckpt, predict, pretrain
""" """
iterator = self._train_reader
self._distribute_train_prog = fluid.CompiledProgram(self._train_prog).with_data_parallel(loss_name=self._loss_var.name)
save_type = save_type.split(',') save_type = save_type.split(',')
if 'predict' in save_type: if 'predict' in save_type:
...@@ -412,15 +436,13 @@ class Trainer(object): ...@@ -412,15 +436,13 @@ class Trainer(object):
# rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)} # rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}
task_rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')} task_rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')}
self._task_head.postprocess(task_rt_outputs) self._task_head.batch_postprocess(task_rt_outputs)
# rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)} # rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}
task_rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')} task_rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')}
self._task_head.postprocess(task_rt_outputs) self._task_head.batch_postprocess(task_rt_outputs)
self._cur_train_step += 1
self._cur_train_epoch = (self._cur_train_step-1) // self._steps_pur_epoch
# if self._save_predict_model and self._cur_train_step % save_steps == 0: # if self._save_predict_model and self._cur_train_step % save_steps == 0:
# self.save(save_path, suffix='.step'+str(self._cur_train_steps)) # self.save(save_path, suffix='.step'+str(self._cur_train_steps))
...@@ -448,6 +470,8 @@ class Trainer(object): ...@@ -448,6 +470,8 @@ class Trainer(object):
fluid.io.save_persistables(self._exe, os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)), self._train_prog) fluid.io.save_persistables(self._exe, os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)), self._train_prog)
print('checkpoint has been saved at '+os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step))) print('checkpoint has been saved at '+os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)))
if self._num_epochs is None and self._cur_train_step == self._steps_pur_epoch:
break
# save_path = os.path.join(main_conf['save_path'], 'ckpt', # save_path = os.path.join(main_conf['save_path'], 'ckpt',
# "step_" + str(global_step)) # "step_" + str(global_step))
# fluid.io.save_persistables(self.exe, save_path, saver_program) # fluid.io.save_persistables(self.exe, save_path, saver_program)
...@@ -455,32 +479,83 @@ class Trainer(object): ...@@ -455,32 +479,83 @@ class Trainer(object):
# print("ALL tasks train finished, exiting...") # print("ALL tasks train finished, exiting...")
def get_one_batch(self, phase='train'):
    """Pull the next batch from the reader fitted for *phase*.

    Args:
        phase: 'train' or 'predict'.

    Raises:
        NotImplementedError: for any other phase.
    """
    reader_attr_by_phase = {
        'train': '_train_reader',
        'predict': '_predict_reader',
    }
    if phase not in reader_attr_by_phase:
        raise NotImplementedError()
    return next(getattr(self, reader_attr_by_phase[phase]))
def predict(self, output_dir=None, print_steps=1000):
    """Run one full inference pass over the fitted predict reader.

    Args:
        output_dir: directory the predict head writes its epoch results to;
            created if missing. None keeps results in memory only.
        print_steps: log throughput every this many batches (<= 0 disables).

    Returns:
        Whatever the predict head's epoch_postprocess returns.
    """
    iterator = self._predict_reader
    # Compile the predict program for (possibly multi-device) execution.
    self._distribute_pred_prog = fluid.CompiledProgram(self._pred_prog).with_data_parallel()

    if output_dir is not None and not os.path.exists(output_dir):
        os.makedirs(output_dir)

    time_begin = time.time()
    cur_predict_step = 0
    for feed in iterator:
        rt_outputs = self.predict_one_batch(feed)
        # rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')}
        # print(rt_outputs)
        self._pred_head.batch_postprocess(rt_outputs)

        cur_predict_step += 1

        if print_steps > 0 and cur_predict_step % print_steps == 0:
            time_end = time.time()
            time_cost = time_end - time_begin

            print("batch {}/{}, speed: {:.2f} steps/s".format(
                  cur_predict_step, self._pred_steps_pur_epoch,
                  print_steps / time_cost))
            time_begin = time.time()

    # NOTE(review): batches come from self._predict_reader but epoch outputs
    # are read from self._pred_reader — confirm both refer to the same fitted
    # reader (self._pred_reader is initialized to None in __init__).
    if self._pred_head.epoch_inputs_attrs:
        reader_outputs = self._pred_reader.get_epoch_outputs()
    else:
        reader_outputs = None

    results = self._pred_head.epoch_postprocess({'reader':reader_outputs}, output_dir=output_dir)
    return results
def train_one_step(self, batch): def train_one_step(self, batch):
if gpu_dev_count > 1: if gpu_dev_count > 1:
feed, mask = batch feed, mask = batch
rt_outputs = self.exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._fetch_list) rt_outputs = self.exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._fetch_list)
while mask.pop() == False: num_fakes = decode_fake(len(rt_outputs[0]), mask, self._batch_size)
rt_outputs.pop() for _ in range(num_fakes):
for item in rt_outputs:
item.pop()
else: else:
feed = self._feed_batch_process_fn(batch) feed = self._feed_batch_process_fn(batch)
rt_outputs = self._exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._fetch_list) rt_outputs = self._exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._fetch_list)
rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)} rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}
self._cur_train_step += 1
self._cur_train_epoch = (self._cur_train_step-1) // self._steps_pur_epoch
return rt_outputs return rt_outputs
def predict_one_batch(self, batch): def predict_one_batch(self, batch):
if gpu_dev_count > 1: if gpu_dev_count > 1:
feed, mask = batch feed, mask = batch
rt_outputs = self.exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._fetch_list) rt_outputs = self.exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list)
while mask.pop() == False: num_fakes = decode_fake(len(rt_outputs[0]), mask, self._batch_size)
rt_outputs.pop() for _ in range(num_fakes):
for item in rt_outputs:
item.pop()
else: else:
feed = self._feed_batch_process_fn(batch) feed = self._pred_feed_batch_process_fn(batch)
rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._fetch_list) rt_outputs = self._exe.run(self._distribute_pred_prog, feed=feed, fetch_list=self._pred_fetch_list)
rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)} rt_outputs = {k:v for k,v in zip(self._pred_fetch_name_list, rt_outputs)}
return rt_outputs
def _build_head(self, net_inputs, phase, scope=""): def _build_head(self, net_inputs, phase, scope=""):
if phase == 'train': if phase == 'train':
...@@ -488,12 +563,6 @@ class Trainer(object): ...@@ -488,12 +563,6 @@ class Trainer(object):
if phase == 'pred': if phase == 'pred':
output_vars = self._pred_head.build(net_inputs, scope_name=scope) output_vars = self._pred_head.build(net_inputs, scope_name=scope)
return output_vars return output_vars
def _postprocess(self, rt_outputs, phase):
return self._task_layer[phase].postprocess(rt_outputs)
def _epoch_postprocess(self, epoch_inputs, phase):
return self._task_layer[phase].epoch_postprocess(epoch_inputs)
def save(self, save_path, suffix=None): def save(self, save_path, suffix=None):
# dirpath = save_path.rstrip('/').rstrip('\\') + suffix # dirpath = save_path.rstrip('/').rstrip('\\') + suffix
...@@ -536,20 +605,6 @@ class Trainer(object): ...@@ -536,20 +605,6 @@ class Trainer(object):
def num_examples(self): def num_examples(self):
return self._num_examples return self._num_examples
# @property
# def _pred_input(self):
# return zip(*[self._pred_input_name_list, self._pred_input_varname_list])
# @_pred_input.setter
# def _pred_input(self, val):
# assert isinstance(val, dict)
# self._pred_input_name_list, self._pred_input_varname_list = \
# zip(*[[k, v.name] for k,v in val.items()])
# @property
# def _pred_fetch_list(self):
# return [self._pred_fetch_name_list, self._pred_fetch_var_list]
@property @property
def mix_ratio(self): def mix_ratio(self):
if self._mix_ratio is not None: if self._mix_ratio is not None:
...@@ -563,57 +618,6 @@ class Trainer(object): ...@@ -563,57 +618,6 @@ class Trainer(object):
if self._verbose: if self._verbose:
print('{}: mix_ratio is set to {}'.format(self._name, self._mix_ratio)) print('{}: mix_ratio is set to {}'.format(self._name, self._mix_ratio))
@property
def save_infermodel_every_n_steps(self):
return self._save_infermodel_every_n_steps
@save_infermodel_every_n_steps.setter
def save_infermodel_every_n_steps(self, val):
self._save_infermodel_every_n_steps = val
@property
def expected_train_steps(self):
return self._expected_train_steps
@expected_train_steps.setter
def expected_train_steps(self, value):
self._expected_train_steps = value
self._expected_train_epochs = value / float(self._steps_pur_epoch)
@property
def expected_train_epochs(self):
return self._expected_train_epochs
@property
def cur_train_epoch(self):
return self._cur_train_epoch
@property
def cur_train_step(self):
return self._cur_train_step
# @cur_train_step.setter
# def _cur_train_step(self, value):
# self._cur_train_step = value
# if self._cur_train_step > self._steps_pur_epoch:
# self._cur_train_epoch += 1
# self._cur_train_step = 1
# if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
# self._train_finish = True
@steps_pur_epoch.setter
def steps_pur_epoch(self, value):
self._steps_pur_epoch = value
@property
def train_finish(self):
return self._train_finish
def tasklayer_reuse_with(self, task):
assert isinstance(task, Task)
if self._lock:
raise Exception('you can only set tasklayer reuses BEFORE Controller created.')
self._task_reuse_scope = task.name
def _set_lock(self): def _set_lock(self):
self._lock = True self._lock = True
...@@ -35,6 +35,27 @@ def create_feed_batch_process_fn(net_inputs): ...@@ -35,6 +35,27 @@ def create_feed_batch_process_fn(net_inputs):
return feed_batch_process_fn return feed_batch_process_fn
def create_multihead_feed_batch_process_fn(net_inputs):
    """Build a function that maps a raw data batch (dict keyed by reader
    field name) onto an executor feed dict keyed by variable name.

    Args:
        net_inputs: a dict {field_name: var_or_name}, or — for the
            multi-head case — a sequence of such dicts indexed by task id.

    Returns:
        feed_batch_process_fn(data, id=-1): with id == -1 `net_inputs` is
        used directly; otherwise `net_inputs[id]` selects the task's inputs.
    """
    def feed_batch_process_fn(data, id=-1):
        feed = {}
        inputs = net_inputs[id] if id != -1 else net_inputs
        for field, var in inputs.items():
            # `var` is either a plain variable-name string or a framework
            # Variable carrying a .name attribute. Fix: the original tested
            # `isinstance(var, unicode)`, which raises NameError on Python 3
            # whenever `var` is not a str; getattr works on both py2 and py3.
            feed[getattr(var, 'name', var)] = data[field]
        return feed

    return feed_batch_process_fn
def _check_and_adapt_shape_dtype(rt_val, attr, message=""): def _check_and_adapt_shape_dtype(rt_val, attr, message=""):
if not isinstance(rt_val, np.ndarray): if not isinstance(rt_val, np.ndarray):
rt_val = np.array(rt_val) rt_val = np.array(rt_val)
......
...@@ -47,7 +47,9 @@ def init_checkpoint(exe, init_checkpoint_path, main_program, skip_list = []): ...@@ -47,7 +47,9 @@ def init_checkpoint(exe, init_checkpoint_path, main_program, skip_list = []):
def init_pretraining_params(exe, def init_pretraining_params(exe,
pretraining_params_path, pretraining_params_path,
convert, convert,
main_program): main_program,
strict=False):
assert os.path.exists(pretraining_params_path assert os.path.exists(pretraining_params_path
), "[%s] cann't be found." % pretraining_params_path ), "[%s] cann't be found." % pretraining_params_path
...@@ -69,7 +71,10 @@ def init_pretraining_params(exe, ...@@ -69,7 +71,10 @@ def init_pretraining_params(exe,
if not isinstance(var, fluid.framework.Parameter): if not isinstance(var, fluid.framework.Parameter):
return False return False
if not os.path.exists(os.path.join(pretraining_params_path, var.name)): if not os.path.exists(os.path.join(pretraining_params_path, var.name)):
print('Warning: {} not found in {}.'.format(var.name, log_path)) if strict:
raise Exception('Error: {} not found in {}.'.format(var.name, log_path))
else:
print('Warning: {} not found in {}.'.format(var.name, log_path))
return os.path.exists(os.path.join(pretraining_params_path, var.name)) return os.path.exists(os.path.join(pretraining_params_path, var.name))
fluid.io.load_vars( fluid.io.load_vars(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册