Commit d44b6381 authored by xixiaoyao

release api

Parent 82741fb6
/home/zhangyiming/yiming/v02-exe/pretrain
\ No newline at end of file
../../pretrain/
\ No newline at end of file
......@@ -5,7 +5,7 @@ import json
if __name__ == '__main__':
max_seqlen = 512
batch_size = 3
batch_size = 4
num_epochs = 2
lr = 1e-3
vocab_path = './pretrain/ernie/vocab.txt'
......@@ -13,9 +13,9 @@ if __name__ == '__main__':
train_file = './data/cls4mrqa/train.tsv'
config = json.load(open('./pretrain/ernie/ernie_config.json'))
print(config)
# ernie = palm.backbone.ERNIE(...)
ernie = palm.backbone.ERNIE.from_config(config)
pred_ernie = palm.backbone.ERNIE.from_config(config, phase='pred')
# pred_ernie = palm.backbone.ERNIE.from_config(config, phase='pred')
# cls_reader2 = palm.reader.cls(train_file_topic, vocab_path, batch_size, max_seqlen)
# cls_reader3 = palm.reader.cls(train_file_subj, vocab_path, batch_size, max_seqlen)
......@@ -23,18 +23,17 @@ if __name__ == '__main__':
# subj_trainer = palm.Trainer('subj_cls', cls_reader3, cls)
# Create the reader for this classification task; its many arguments control the dataset format, the number of input files, preprocessing rules, etc.
cls_reader = palm.reader.ClassifyReader(vocab_path, batch_size, max_seqlen)
cls_reader = palm.reader.ClassifyReader(vocab_path, max_seqlen)
print(cls_reader.outputs_attr)
# Different backbones place different feature requirements on a task reader. For a classification task the basic input features are token_ids and label_ids, but BERT additionally requires features such as position, segment and input_mask to be extracted from the input. After register_with is called, the reader automatically appends the fields required by the backbone.
cls_reader.register_with(ernie)
print(cls_reader.outputs_attr)
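# A minimal sketch of the registration effect (field names assumed, not asserted by this commit):
# for field in ('position_ids', 'segment_ids', 'input_mask'):
#     assert field in cls_reader.outputs_attr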
# Create the task head, e.g. classification, matching or machine reading comprehension. Each head has required/optional arguments specific to its task. Note that heads are decoupled from readers: as long as the dataset-side fields a head depends on can be provided by the reader, the pairing is legal.
cls_head = palm.head.Classify(4, 1024, 0.1)
cls_pred_head = palm.head.Classify(4, 1024, 0.1, phase='pred')
# cls_pred_head = palm.head.Classify(4, 1024, 0.1, phase='pred')
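# In Classify(4, 1024, 0.1) the positional arguments are num_classes, input_dim
# and dropout_prob, per the Classify.__init__ signature later in this commit.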
# Create a trainer from the reader and the task head. A trainer represents one training task: it maintains the training progress and the task's key information, performs validity checks, and controls rules such as model saving and loading for the task.
trainer = palm.Trainer('senti_cls', cls_reader, cls_head, save_predict_model=True, \
pred_head=cls_pred_head, save_path='./output')
trainer = palm.Trainer('senti_cls', cls_reader, cls_head)
# match4mrqa.reuse_head_with(mrc4mrqa)
......@@ -42,13 +41,13 @@ if __name__ == '__main__':
# output_vars = ernie.build(data_vars)
# cls_head.build({'backbone': output_vars, 'reader': data_vars})
loss_var = trainer.build_forward(ernie, pred_ernie)
loss_var = trainer.build_forward(ernie)
# controller.build_forward()
# Error! A head/backbone can only be built once! Do NOT call build_forward more than once on any Trainer!
print(trainer.num_examples)
trainer.load_data(train_file, 'csv', num_epochs=2, batch_size=32)
iterator_fn = trainer.load_data(train_file, 'csv', num_epochs=num_epochs, batch_size=batch_size)
print(trainer.num_examples)
n_steps = trainer.num_examples * num_epochs // batch_size
......@@ -58,16 +57,15 @@ if __name__ == '__main__':
adam = palm.optimizer.Adam(loss_var, lr, sched)
trainer.build_backward(optimizer=adam, weight_decay=0.001, \
use_ema=True, ema_decay=0.999)
trainer.build_backward(optimizer=adam, weight_decay=0.001)
trainer.random_init_params()
trainer.load_pretrain('pretrain/ernie/params')
# trainer.train_one_step()
# print(trainer.train_one_step(next(iterator_fn())))
# trainer.train_one_epoch()
trainer.train()
trainer.save()
trainer.train(iterator_fn, print_steps=1, save_steps=5, save_path='outputs/ckpt')
# trainer.save()
......@@ -79,17 +77,17 @@ if __name__ == '__main__':
# controller = palm.Controller([mrqa, match4mrqa, mlm4mrqa])
loss = controller.build_forward(bb, mask_task=[])
# loss = controller.build_forward(bb, mask_task=[])
n_steps = controller.estimate_train_steps(basetask=mrqa, num_epochs=2, batch_size=8, dev_count=4)
adam = palm.optimizer.Adam(loss)
sched = palm.schedualer.LinearWarmup(learning_rate, max_train_steps=n_steps, warmup_steps=0.1*n_steps)
controller.build_backward(optimizer=adam, schedualer=sched, weight_decay=0.001, use_ema=True, ema_decay=0.999)
# n_steps = controller.estimate_train_steps(basetask=mrqa, num_epochs=2, batch_size=8, dev_count=4)
# adam = palm.optimizer.Adam(loss)
# sched = palm.schedualer.LinearWarmup(learning_rate, max_train_steps=n_steps, warmup_steps=0.1*n_steps)
#
# controller.build_backward(optimizer=adam, schedualer=sched, weight_decay=0.001, use_ema=True, ema_decay=0.999)
controller.random_init_params()
controller.load_pretrain('../../pretrain_model/ernie/params')
controller.train()
# controller.random_init_params()
# controller.load_pretrain('../../pretrain_model/ernie/params')
# controller.train()
......
export CUDA_VISIBLE_DEVICES=0
export CUDA_VISIBLE_DEVICES=3
python run.py
......@@ -27,7 +27,6 @@ from paddlepalm.backbone.utils.transformer import pre_process_layer, encoder
from paddlepalm.backbone.base_backbone import BaseBackbone
class ERNIE(BaseBackbone):
def __init__(self, hidden_size, num_hidden_layers, num_attention_heads, vocab_size, \
......
......@@ -19,23 +19,11 @@ from paddlepalm.head.base_head import BaseHead
import numpy as np
import os
# def classify(num_classes, input_dim, dropout_prob, pred_output_dir=None, param_initializer_range=0.02, phase='train'):
#
# config = {
# 'num_classes': num_classes,
# 'hidden_size': input_dim,
# 'dropout_prob': dropout_prob,
# 'pred_output_dir': pred_output_dir,
# 'initializer_range': param_initializer_range
# }
#
# return Task(config, phase, config)
class Classify(BaseHead):
'''
"""
classification
'''
"""
# def __init__(self, config, phase, backbone_config=None):
def __init__(self, num_classes, input_dim, dropout_prob=0.0, \
param_initializer_range=0.02, phase='train'):
......
......@@ -21,6 +21,13 @@ class ClassifyReader(BaseReader):
def __init__(self, vocab_path, max_len, tokenizer='wordpiece', \
lang='en', seed=None, do_lower_case=False, phase='train'):
"""xxxxxx.
Argument:
- vocab_path: xxxx
-
"""
BaseReader.__init__(self, phase)
......@@ -71,7 +78,7 @@ class ClassifyReader(BaseReader):
'label_ids', 'unique_ids']
for batch in self._data_generator():
outputs = {n: i for n,i in zip(names, batch)}
ret = []
ret = {}
# TODO: move runtime shape check here
for attr in self.outputs_attr.keys():
ret[attr] = outputs[attr]
......
......@@ -54,14 +54,14 @@ class BaseReader(object):
vocab_path,
label_map_config=None,
max_seq_len=512,
do_lower_case=True,
do_lower_case=False,
in_tokens=False,
is_inference=False,
random_seed=None,
tokenizer="FullTokenizer",
is_classify=True,
is_regression=False,
for_cn=True,
for_cn=False,
task_id=0):
self.max_seq_len = max_seq_len
self.tokenizer = tokenization.FullTokenizer(
......@@ -301,6 +301,70 @@ class BaseReader(object):
return f
class ClassifyReader(BaseReader):
def _read_tsv(self, input_file, quotechar=None):
"""Reads a tab separated value file."""
with open(input_file, 'r', encoding='utf8') as f:
reader = csv_reader(f)
headers = next(reader)
text_indices = [
index for index, h in enumerate(headers) if h != "label"
]
Example = namedtuple('Example', headers)
examples = []
for line in reader:
for index, text in enumerate(line):
if index in text_indices:
if self.for_cn:
line[index] = text.replace(' ', '')
else:
line[index] = text
example = Example(*line)
examples.append(example)
return examples
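# Illustrative input for _read_tsv (assumed layout): a header row names the columns,
# one of which is "label", e.g.
#   label<TAB>text_a
#   1<TAB>the movie was great
# Every non-"label" column is treated as text; with for_cn=True, spaces inside the
# text columns are stripped.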
def _pad_batch_records(self, batch_records):
batch_token_ids = [record.token_ids for record in batch_records]
batch_text_type_ids = [record.text_type_ids for record in batch_records]
batch_position_ids = [record.position_ids for record in batch_records]
if not self.is_inference:
batch_labels = [record.label_id for record in batch_records]
if self.is_classify:
batch_labels = np.array(batch_labels).astype("int64").reshape(
[-1])
elif self.is_regression:
batch_labels = np.array(batch_labels).astype("float32").reshape(
[-1])
if batch_records[0].qid:
batch_qids = [record.qid for record in batch_records]
batch_qids = np.array(batch_qids).astype("int64").reshape(
[-1])
else:
batch_qids = np.array([]).astype("int64").reshape([-1])
# padding
padded_token_ids, input_mask = pad_batch_data(
batch_token_ids, pad_idx=self.pad_id, return_input_mask=True)
padded_text_type_ids = pad_batch_data(
batch_text_type_ids, pad_idx=self.pad_id)
padded_position_ids = pad_batch_data(
batch_position_ids, pad_idx=self.pad_id)
padded_task_ids = np.ones_like(
padded_token_ids, dtype="int64") * self.task_id
return_list = [
padded_token_ids, padded_text_type_ids, padded_position_ids,
padded_task_ids, input_mask
]
if not self.is_inference:
return_list += [batch_labels, batch_qids]
return return_list
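# A minimal sketch of the padding contract assumed above: with pad_idx=0,
#   pad_batch_data([[1, 2, 3], [4, 5]], pad_idx=0, return_input_mask=True)
# pads token ids to [[1, 2, 3], [4, 5, 0]] and returns input_mask [[1, 1, 1], [1, 1, 0]].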
class MaskLMReader(BaseReader):
def _convert_example_to_record(self, example, max_seq_length, tokenizer):
......@@ -447,70 +511,6 @@ class MaskLMReader(BaseReader):
return wrapper
class ClassifyReader(BaseReader):
def _read_tsv(self, input_file, quotechar=None):
"""Reads a tab separated value file."""
with open(input_file, 'r', encoding='utf8') as f:
reader = csv_reader(f)
headers = next(reader)
text_indices = [
index for index, h in enumerate(headers) if h != "label"
]
Example = namedtuple('Example', headers)
examples = []
for line in reader:
for index, text in enumerate(line):
if index in text_indices:
if self.for_cn:
line[index] = text.replace(' ', '')
else:
line[index] = text
example = Example(*line)
examples.append(example)
return examples
def _pad_batch_records(self, batch_records):
batch_token_ids = [record.token_ids for record in batch_records]
batch_text_type_ids = [record.text_type_ids for record in batch_records]
batch_position_ids = [record.position_ids for record in batch_records]
if not self.is_inference:
batch_labels = [record.label_id for record in batch_records]
if self.is_classify:
batch_labels = np.array(batch_labels).astype("int64").reshape(
[-1])
elif self.is_regression:
batch_labels = np.array(batch_labels).astype("float32").reshape(
[-1])
if batch_records[0].qid:
batch_qids = [record.qid for record in batch_records]
batch_qids = np.array(batch_qids).astype("int64").reshape(
[-1])
else:
batch_qids = np.array([]).astype("int64").reshape([-1])
# padding
padded_token_ids, input_mask = pad_batch_data(
batch_token_ids, pad_idx=self.pad_id, return_input_mask=True)
padded_text_type_ids = pad_batch_data(
batch_text_type_ids, pad_idx=self.pad_id)
padded_position_ids = pad_batch_data(
batch_position_ids, pad_idx=self.pad_id)
padded_task_ids = np.ones_like(
padded_token_ids, dtype="int64") * self.task_id
return_list = [
padded_token_ids, padded_text_type_ids, padded_position_ids,
padded_task_ids, input_mask
]
if not self.is_inference:
return_list += [batch_labels, batch_qids]
return return_list
class SequenceLabelReader(BaseReader):
def _pad_batch_records(self, batch_records):
batch_token_ids = [record.token_ids for record in batch_records]
......
# -*- coding: UTF-8 -*-
# -*- coding: utf-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -17,6 +17,8 @@ from __future__ import print_function
import os
import json
from paddle import fluid
import time
import numpy as np
import paddlepalm.utils.basic_helper as helper
from paddlepalm.utils import reader_helper, saver
from paddlepalm.distribute import gpu_dev_count, data_feeder
......@@ -28,7 +30,6 @@ DEBUG=False
class Trainer(object):
def __init__(self, name, reader, task_head, \
save_predict_model=False, pred_head=None, save_path=None, save_steps=-1, \
mix_ratio=1.0, reuse_head_with=None, \
silent=False):
......@@ -39,20 +40,16 @@ class Trainer(object):
self._task_head = task_head
self._pred_head = pred_head
if save_predict_model:
self._save_predict_model = True
assert save_path is not None, "save_path is required when save_predict_model is set."
assert save_steps == -1 or save_steps > 0, "save_steps should be -1 (only save the last step of this task) or larger than 0"
assert pred_head is not None, "pred_head is required to save predict model."
self._pred_reader = reader.clone(phase='pred')
if save_path is not None and not os.path.exists(save_path):
os.makedirs(save_path)
else:
assert save_path is None, "You should set save_predict_model as True, or the save_path is invalid."
assert save_steps == -1 or save_steps == 0, "You should set save_predict_model as True, or the save_steps is invalid."
assert pred_head is None, "You should set save_predict_model as True, or the pred_head is invalid."
# if save_predict_model:
# self._save_predict_model = True
# assert pred_head is not None, "pred_head is required to save predict model."
# self._pred_reader = reader.clone(phase='pred')
# else:
# assert pred_head is None, "You should set save_predict_model as True, or the pred_head is invalid."
# self._save_predict_model = False
# self._pred_reader = None
self._save_steps = save_steps
# self._save_steps = save_steps
self._task_reuse_scope = name if reuse_head_with is None else reuse_head_with
......@@ -80,6 +77,8 @@ class Trainer(object):
self._pred_fetch_name_list = []
self._pred_fetch_var_list = []
# exe is built when random_init_params is called.
# self._exe = helper.build_executor(gpu_dev_count>0)
self._exe = None
self._save_protocol = {
......@@ -90,13 +89,44 @@ class Trainer(object):
self._lock = False
self._build_forward = False
def build_predict_head(self, pred_backbone, pred_prog=None, pred_init_prog=None):
pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name)
# pred_task_attr_from_reader = self._pred_head.inputs_attrs['reader']
def build_forward(self, backbone, pred_backbone=None):
# _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
pred_input_names, pred_shape_and_dtypes, _ = reader_helper.merge_input_attrs(backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_input_names, pred_shape_and_dtypes)]
if pred_prog is None:
pred_prog = fluid.Program()
if pred_init_prog is None:
pred_init_prog = fluid.Program()
with fluid.program_guard(pred_prog, pred_init_prog):
pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
# pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
# prepare predict vars for saving inference model
with fluid.program_guard(pred_prog, pred_init_prog):
cur_inputs = helper.decode_inputs(pred_net_inputs, self.name)
# self.pred_input = cur_inputs
self._pred_input_name_list, self._pred_input_varname_list = \
zip(*[[k, v.name] for k,v in cur_inputs.items()])
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = self.name + '.'
with fluid.unique_name.guard(scope):
self._build_head(pred_task_inputs, phase='pred', scope=scope)
def build_forward(self, backbone, pred_backbone=None, train_prog=None, train_init_prog=None, pred_prog=None, pred_init_prog=None):
# assert self._backbone is not None, "backbone is required for Trainer to build net forward to run with single task mode"
self._build_forward = True
if self._save_predict_model:
assert pred_backbone is not None, "pred_backbone is required when save_predict_model is set."
# create reader, task
# then check i/o across reader, backbone and task_layer
......@@ -110,17 +140,9 @@ class Trainer(object):
# _check_io(inst.taskblock['train'].inputs_attrs['reader'], inst._reader['train'].outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
# _check_io(inst._taskblock['train'].inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
if self._save_predict_model:
pred_task_attr_from_reader = helper.encode_inputs(self._pred_head.inputs_attrs['reader'], self.name)
# pred_task_attr_from_reader = self._pred_head.inputs_attrs['reader']
# _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
# _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
# merge reader input attrs from backbone and task_instances
input_names, shape_and_dtypes, name_to_position = reader_helper.merge_input_attrs(backbone.inputs_attr, task_attr_from_reader)
pred_input_names, pred_shape_and_dtypes, _ = reader_helper.merge_input_attrs(backbone.inputs_attr, pred_task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
input_names, shape_and_dtypes, name_to_position = reader_helper.merge_input_attrs(backbone.inputs_attr, task_attr_from_reader, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
# shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
self._shape_and_dtypes = shape_and_dtypes
self._name_to_position = name_to_position
......@@ -134,10 +156,11 @@ class Trainer(object):
input_attrs = [[i, j, k] for i, (j,k) in zip(input_names, shape_and_dtypes)]
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_input_names, pred_shape_and_dtypes)]
train_prog = fluid.Program()
train_init_prog = fluid.Program()
if train_prog is None:
train_prog = fluid.Program()
if train_init_prog is None:
train_init_prog = fluid.Program()
self._prog = train_prog
self._train_prog = train_prog
self._train_init_prog = train_init_prog
......@@ -150,12 +173,6 @@ class Trainer(object):
bb_output_vars = backbone.build(net_inputs)
assert sorted(bb_output_vars.keys()) == sorted(backbone.outputs_attr.keys())
pred_prog = fluid.Program()
pred_init_prog = fluid.Program()
with fluid.program_guard(pred_prog, pred_init_prog):
pred_net_inputs = reader_helper.create_net_inputs(pred_input_attrs)
# pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
pred_bb_output_vars = pred_backbone.build(pred_net_inputs)
# fluid.framework.switch_main_program(train_prog)
# fluid.framework.switch_startup_program(train_init_prog)
......@@ -174,23 +191,10 @@ class Trainer(object):
task_output_vars.update(output_vars)
assert len(task_output_vars) - old == len(output_vars) # for debug
# prepare predict vars for saving inference model
if self._save_predict_model:
with fluid.program_guard(pred_prog, pred_init_prog):
cur_inputs = helper.decode_inputs(pred_net_inputs, self.name)
# self.pred_input = cur_inputs
self._pred_input_name_list, self._pred_input_varname_list = \
zip(*[[k, v.name] for k,v in cur_inputs.items()])
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = self.name + '.'
with fluid.unique_name.guard(scope):
self._build_head(pred_task_inputs, phase='pred', scope=scope)
bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
task_fetches = {k: v.name for k,v in task_output_vars.items()}
self._fetches = task_fetches
self._fetch_names, self._fetch_list = zip(*self._fetches.items())
# fetches = task_fetches
# fetches['__task_id'] = net_inputs['__task_id'].name
......@@ -201,6 +205,8 @@ class Trainer(object):
# loss = layers.reduce_sum(task_id_vec * losses)
with fluid.program_guard(train_prog, train_init_prog):
loss_var = fluid.layers.reduce_sum(task_output_vars[self.name+'.loss'])
self._distribute_train_prog = fluid.CompiledProgram(self._train_prog).with_data_parallel(loss_name=loss_var.name)
return loss_var
def build_backward(self, optimizer, weight_decay=None, use_ema=False, ema_decay=0.9999):
......@@ -248,6 +254,10 @@ class Trainer(object):
num_epochs=num_epochs, file_format=file_format, \
shuffle_train=shuffle_train)
self._num_examples = self._reader.num_examples
# Unsure whether this should round up; needs confirmation.
# tail = self._num_examples % batch_size > 0
# self._steps_pur_epoch = self._num_examples // batch_size + 1 if tail else 0
self._steps_pur_epoch = self._num_examples // batch_size
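# For reference, a ceil variant would be (self._num_examples + batch_size - 1) // batch_size.
# Note the commented expression above parses as `(n // bs + 1) if tail else 0`, which yields
# 0 when there is no tail, so it should not be revived as-is.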
print('ok!')
# merge dataset iterators and create net input vars
......@@ -255,12 +265,20 @@ class Trainer(object):
prefix = self.name
# run runtime checks and adaptation on the yielded data
iterator_fn = reader_helper.create_iterator_fn(iterator, prefix, self._shape_and_dtypes, self._name_to_position)
return iterator_fn
iterator_fn = reader_helper.create_iterator_fn(iterator, prefix, self._shape_and_dtypes, self._name_to_position, return_type='dict')
feed_batch_process_fn = reader_helper.create_feed_batch_process_fn(self._net_inputs)
self._feed_batch_process_fn = feed_batch_process_fn
if gpu_dev_count > 1:
distribute_feeder_fn = data_feeder(iterator_fn, feed_batch_process_fn)
else:
distribute_feeder_fn = iterator_fn
return distribute_feeder_fn()
def random_init_params(self):
on_gpu = gpu_dev_count > 0
self._exe = helper.build_executor(on_gpu)
print('random init params...')
self._exe.run(self._train_init_prog)
def load_pretrain(self, model_path):
# load pretrain model (or ckpt)
......@@ -271,16 +289,112 @@ class Trainer(object):
model_path,
main_program=self._train_init_prog)
def train(self, iterator, print_steps=5):
feed_batch_process_fn = reader_helper.create_feed_batch_process_fn(self._net_inputs)
distribute_feeder = data_feeder(iterator, feed_batch_process_fn)
def set_predict_head(self):
pass
fetch_names, fetch_list = zip(*self._fetches.items())
def train(self, iterator, save_path=None, save_steps=None, save_type='ckpt', print_steps=5):
for feed, mask in distribute_feeder:
rt_outputs = self.exe.run(self._train_prog, feed=feed, fetch_list=fetch_list)
save_type = save_type.split(',')
if 'predict' in save_type:
assert self._pred_head is not None, "Predict head not found! You should call set_predict_head first if you want to save predict model."
assert save_path is not None and save_steps is not None, 'save_path and save_steps are required to save model.'
save_predict = True
if not os.path.exists(save_path):
os.makedirs(save_path)
else:
save_predict = False
if 'ckpt' in save_type:
if save_path is not None and save_steps is not None:
save_ckpt = True
if not os.path.exists(save_path):
os.makedirs(save_path)
else:
"WARNING: save_path or save_steps is not set, model will not be saved during training."
save_ckpt = False
else:
save_ckpt = False
# if save_path is not None or save_steps is not None:
# assert self._save_predict_model, "If you want to save model, you need set save_predict_model=True when this trainer is built."
# if self._save_predict_model:
# if save_path is None and save_steps is None:
# print('Warning: model will not be saved for this run. If you want to save model, set save_path and save_steps.')
# else:
# assert save_path is not None, "argument save_path is required to save models."
# assert save_steps == -1 or save_steps > 0, "argument save_steps should be -1 (only save the last step of this task) or larger than 0"
# if save_path is not None and not os.path.exists(save_path):
# os.makedirs(save_path)
# else:
# assert save_path is None, "You should set save_predict_model as True, or the argument save_path is invalid."
# assert save_steps is None, "You should set save_predict_model as True, or the argument save_steps is invalid."
time_begin = time.time()
for feed in iterator:
rt_outputs = self.train_one_step(feed)
# if gpu_dev_count > 1:
# feed, mask = feed
# rt_outputs = self.exe.run(self._train_prog, feed=feed, fetch_list=self._fetch_list)
# print(rt_outputs)
# print(len(rt_outputs))
# if gpu_dev_count > 1:
# while mask.pop() == False:
# rt_outputs.pop()
# rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}
task_rt_outputs = {k[len(self.name+'.'):]: v for k,v in rt_outputs.items() if k.startswith(self.name+'.')}
self._task_head.postprocess(task_rt_outputs)
self._cur_train_step += 1
self._cur_train_epoch = (self._cur_train_step-1) // self._steps_pur_epoch
# if self._save_predict_model and self._cur_train_step % save_steps == 0:
# self.save(save_path, suffix='.step'+str(self._cur_train_steps))
if print_steps > 0 and self._cur_train_step % print_steps == 0:
loss = rt_outputs[self.name+'.loss']
loss = np.mean(np.squeeze(loss)).tolist()
time_end = time.time()
time_cost = time_end - time_begin
print("step {}/{} (epoch {}), loss: {:.3f}, speed: {:.2f} steps/s".format(
(self._cur_train_step-1) % self._steps_pur_epoch + 1, self._steps_pur_epoch, self._cur_train_epoch,
loss, print_steps / time_cost))
time_begin = time.time()
# if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
# print(cur_task.name+': train finished!')
# cur_task.save()
if (save_predict or save_ckpt) and self._cur_train_step % save_steps == 0:
if save_predict:
self.save(save_path, suffix='pred.step'+str(self._cur_train_step))
if save_ckpt:
fluid.io.save_persistables(self._exe, os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)), self._train_prog)
print('checkpoint has been saved at '+os.path.join(save_path, 'ckpt.step'+str(self._cur_train_step)))
# save_path = os.path.join(main_conf['save_path'], 'ckpt',
# "step_" + str(global_step))
# fluid.io.save_persistables(self.exe, save_path, saver_program)
# print('checkpoint has been saved at '+save_path)
# print("ALL tasks train finished, exiting...")
def train_one_step(self, batch):
if gpu_dev_count > 1:
feed, mask = batch
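# the mask marks which fetched results come from real batches; results produced
# by padding duplicates on idle devices are dropped below (assumed data_feeder semantics)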
rt_outputs = self._exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._fetch_list)
while not mask.pop():
rt_outputs.pop()
else:
feed = self._feed_batch_process_fn(batch)
rt_outputs = self._exe.run(self._distribute_train_prog, feed=feed, fetch_list=self._fetch_list)
rt_outputs = {k:v for k,v in zip(self._fetch_names, rt_outputs)}
return rt_outputs
def _build_head(self, net_inputs, phase, scope=""):
if phase == 'train':
......@@ -300,8 +414,12 @@ class Trainer(object):
def _epoch_postprocess(self, epoch_inputs, phase):
return self._task_layer[phase].epoch_postprocess(epoch_inputs)
def save(self, suffix=''):
dirpath = self._save_infermodel_path + suffix
def save(self, save_path, suffix=None):
# dirpath = save_path.rstrip('/').rstrip('\\') + suffix
if suffix is not None:
dirpath = os.path.join(save_path, suffix)
else:
dirpath = save_path
self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list]
prog = fluid.default_main_program().clone()
......@@ -315,7 +433,7 @@ class Trainer(object):
conf[k] = v['d']
with open(os.path.join(dirpath, '__conf__'), 'w') as writer:
writer.write(json.dumps(conf, indent=1))
print(self._name + ': inference model saved at ' + dirpath)
print(self._name + ': predict model saved at ' + dirpath)
def _load(self, infer_model_path=None):
if infer_model_path is None:
......
......@@ -70,7 +70,7 @@ def check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name]))
def encode_inputs(inputs, scope_name, sep='/', cand_set=None):
def encode_inputs(inputs, scope_name, sep='.', cand_set=None):
outputs = {}
for k, v in inputs.items():
if cand_set is not None:
......@@ -83,15 +83,15 @@ def encode_inputs(inputs, scope_name, sep='/', cand_set=None):
return outputs
def decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
def decode_inputs(inputs, scope_name, sep='.', keep_unk_keys=True):
outputs = {}
for name, value in inputs.items():
# var for backbone are also available to tasks
if keep_unk_keys and sep not in name:
outputs[name] = value
# var for this inst
if name.startswith(scope_name+'/'):
outputs[name[len(scope_name+'/'):]] = value
if name.startswith(scope_name+'.'):
outputs[name[len(scope_name+'.'):]] = value
return outputs
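# Illustrative round trip with the '.' separator (variable names assumed):
#   encode_inputs({'label_ids': v}, 'senti_cls')            -> {'senti_cls.label_ids': v}
#   decode_inputs({'senti_cls.label_ids': v}, 'senti_cls')  -> {'label_ids': v}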
......
......@@ -91,31 +91,40 @@ def create_net_inputs(input_attrs, async=False, iterator_fn=None, dev_count=1, n
return ret
def create_iterator_fn(iterator, iterator_prefix, shape_and_dtypes, outname_to_pos, verbose=0):
def create_iterator_fn(iterator, iterator_prefix, shape_and_dtypes, outname_to_pos, verbose=0, return_type='list'):
def iterator():
pos_to_outname = {j:i for i,j in outname_to_pos.items()}
def iterator_fn():
v = verbose
while True:
results = _zero_batch(shape_and_dtypes)
# results = _zero_batch(shape_and_dtypes)
results = [None] * len(outname_to_pos)
outputs = next(iterator) # dict type
prefix = iterator_prefixe
prefix = iterator_prefix
for outname, val in outputs.items():
task_outname = prefix + '.' + outname
if outname in outname_to_pos:
idx = outname_to_pos[outname]
val = _check_and_adapt_shape_dtype(val, joint_shape_and_dtypes[idx])
val = _check_and_adapt_shape_dtype(val, shape_and_dtypes[idx])
results[idx] = val
if task_outname in outname_to_pos:
idx = outname_to_pos[task_outname]
val = _check_and_adapt_shape_dtype(val, joint_shape_and_dtypes[idx])
val = _check_and_adapt_shape_dtype(val, shape_and_dtypes[idx])
results[idx] = val
if return_type == 'list':
yield results
elif return_type == 'dict':
temp = {}
for pos, i in enumerate(results):
temp[pos_to_outname[pos]] = i
yield results
yield temp
return iterator
return iterator_fn
def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtypes, mrs, outname_to_pos, dev_count=1, keep_one_task=True, verbose=0):
......@@ -135,7 +144,7 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype
outbuf[id] = outputs
prefix = iterator_prefixes[id]
for outname, val in outputs.items():
task_outname = prefix + '/' + outname
task_outname = prefix + '.' + outname
if outname in outname_to_pos:
idx = outname_to_pos[outname]
......@@ -192,7 +201,7 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype
for outname, val in outputs.items():
if v > 0:
print('reader generate: '+outname)
task_outname = prefix + '/' + outname
task_outname = prefix + '.' + outname
if outname in outname_to_pos:
idx = outname_to_pos[outname]
......