“94686c5751427168dbf34073fb2834bcad7d9092”上不存在“paddle/git@gitcode.net:s920243400/PaddleDetection.git”
提交 f9e37ecd 编写于 作者: X xixiaoyao

release 0.3-api

上级 062d27a7
task_instance: "mrqa, mlm4mrqa, match4mrqa"
ask_instance: "mrqa, mlm4mrqa, match4mrqa"
target_tag: 1, 0, 0
mix_ratio: 1.0, 0.5, 0.5
......
import paddlepalm as palm
if __name__ == '__main__':
controller = palm.Controller('config.yaml', task_dir='tasks')
match_reader = palm.reader.match(train_file, file_format='csv', tokenizer='wordpiece', lang='en')
mrc_reader = palm.reader.mrc(train_file, phase='train')
mlm_reader = palm.reader.mlm(train_file, phase='train')
palm.reader.
match = palm.tasktype.cls(num_classes=4)
mrc = palm.tasktype.match(learning_strategy='pairwise')
mlm = palm.tasktype.mlm()
mlm.print()
bb_flags = palm.load_json('./pretrain/ernie/ernie_config.json')
bb = palm.backbone.ernie(bb_flags['xx'], xxx)
bb.print()
match4mrqa = palm.Task('match4mrqa', match_reader, match_tt)
mrc4mrqa = palm.Task('match4mrqa', match_reader, match_tt)
# match4mrqa.reuse_with(mrc4mrqa)
controller = palm.Controller([mrqa, match4mrqa, mlm4mrqa])
loss = controller.build_forward(bb, mask_task=[])
n_steps = controller.estimate_train_steps(basetask=mrqa, num_epochs=2, batch_size=8, dev_count=4)
adam = palm.optimizer.Adam(loss)
sched = palm.schedualer.LinearWarmup(learning_rate, max_train_steps=n_steps, warmup_steps=0.1*n_steps)
controller.build_backward(optimizer=adam, schedualer=sched, weight_decay=0.001, use_ema=True, ema_decay=0.999)
controller.random_init_params()
controller.load_pretrain('../../pretrain_model/ernie/params')
controller.train()
controller = palm.Controller(config='config.yaml', task_dir='tasks', for_train=False)
controller.pred('mrqa', inference_model_dir='output_model/secondrun/mrqa/infer_model')
# controller = palm.Controller(config='config.yaml', task_dir='tasks', for_train=False)
# controller.pred('mrqa', inference_model_dir='output_model/secondrun/mrqa/infer_model')
import sys
from paddlepalm.mtl_controller import Controller
from paddlepalm.task_instance import Task
sys.path.append('paddlepalm')
......@@ -182,7 +182,7 @@ def _fit_attr(conf, fit_attr, strict=False):
return conf
class Controller(object):
class ConfController(object):
def __init__(self, config, task_dir='.', for_train=True):
"""
......
# -*- coding: UTF-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import importlib
import multiprocessing
from paddle import fluid
from paddle.fluid import layers
import yaml
import json
import logging
import time
import numpy as np
from paddlepalm.utils.saver import init_pretraining_params, init_checkpoint
from paddlepalm.utils.config_helper import PDConfig
from paddlepalm.utils.print_helper import print_dict
from paddlepalm.utils.reader_helper import create_net_inputs, create_iterator_fn, create_joint_iterator_fn, merge_input_attrs
from paddlepalm.default_settings import *
from paddlepalm.task_instance import TaskInstance, check_instances
DEBUG=False
VERBOSE=0
def _get_basename(f):
return os.path.splitext(f)[0]
def _get_suffix(f):
return os.path.splitext(f)[-1]
def _parse_yaml(f, asdict=True, support_cmd_line=False):
assert os.path.exists(f), "file {} not found.".format(f)
if support_cmd_line:
args = PDConfig(yaml_file=f, fuse_args=True)
args.build()
return args.asdict() if asdict else args
else:
if asdict:
with open(f, "r") as fin:
yaml_config = yaml.load(fin, Loader=yaml.SafeLoader)
return yaml_config
else:
raise NotImplementedError()
def _parse_json(f, asdict=True, support_cmd_line=False):
assert os.path.exists(f), "file {} not found.".format(f)
if support_cmd_line:
args = PDConfig(json_file=f, fuse_args=support_cmd_line)
args.build()
return args.asdict() if asdict else args
else:
if asdict:
with open(f, "r") as fin:
config = json.load(fin)
return config
else:
raise NotImplementedError()
def _parse_list(string, astype=str):
assert isinstance(string, str), "{} is not a string.".format(string)
if ',' not in string:
return [astype(string)]
string = string.replace(',', ' ')
return [astype(i) for i in string.split()]
def _try_float(s):
try:
float(s)
return(float(s))
except:
return s
def _check_conf(conf, checklist=None):
assert isinstance(conf, dict), "{} is not a dict.".format(conf)
ret = {}
for k,v in conf.items():
if isinstance(v, str):
v = _try_float(v)
ret[k] = v
if checklist is not None:
for k, t in checklist:
assert k in ret, "required argument {} is NOT exist in config file.".format(k)
assert isintance(ret[k], t), "value type of argument {} should be {}".format(k, t)
return ret
# TODO: 增加None机制,允许hidden size、batch size和seqlen设置为None
def _check_io(in_attr, out_attr, strict=False, in_name="left", out_name="right"):
for name, attr in in_attr.items():
assert name in out_attr, in_name+': '+name+' not found in '+out_name
if attr != out_attr[name]:
if strict:
raise ValueError(name+': shape or dtype not consistent!')
else:
logging.warning('{}: shape or dtype not consistent!\n{}:\n{}\n{}:\n{}'.format(name, in_name, attr, out_name, out_attr[name]))
def _merge_conf(conf1, conf2, conf1_first=True, strict=False):
assert isinstance(conf1, dict), "{} is not a dict.".format(conf1)
assert isinstance(conf2, dict), "{} is not a dict.".format(conf2)
base_conf = conf2 if conf1_first else conf1
base_conf = base_conf.copy()
new_conf = conf1 if conf1_first else conf2
for k, v in new_conf.items():
if k in base_conf:
if base_conf[k] != v:
raise Warning("value of argument {} has been updated to {}.".format(k, v))
else:
if strict:
continue
base_conf[k] = v
return base_conf
def _encode_inputs(inputs, scope_name, sep='/', cand_set=None):
outputs = {}
for k, v in inputs.items():
if cand_set is not None:
if k in cand_set:
outputs[k] = v
if scope_name+sep+k in cand_set:
outputs[scope_name+sep+k] = v
else:
outputs[scope_name+sep+k] = v
return outputs
def _decode_inputs(inputs, scope_name, sep='/', keep_unk_keys=True):
outputs = {}
for name, value in inputs.items():
# var for backbone are also available to tasks
if keep_unk_keys and sep not in name:
outputs[name] = value
# var for this inst
if name.startswith(scope_name+'/'):
outputs[name[len(scope_name+'/'):]] = value
return outputs
def _init_env(use_gpu):
if use_gpu:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
else:
place = fluid.CPUPlace()
dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
return fluid.Executor(place), dev_count
def _fit_attr(conf, fit_attr, strict=False):
for i, attr in fit_attr.items():
if i not in conf:
if strict:
raise Exception('Argument {} is required to create a controller.'.format(i))
else:
continue
conf[i] = attr(conf[i])
return conf
class Controller(object):
def __init__(self, tasks, mix_ratios=None, task_reuse_tag=None, use_gpu=True):
"""
Args:
"""
exe, dev_count = _init_env(use_gpu=use_gpu)
self.exe = exe
self.dev_count = dev_count
# parse task instances and target tags
for id in len(tasks):
tasks[id]._set_id(id)
# parse mix ratios
if mix_ratios is not None:
if isinstance(mix_ratios, str):
mix_ratios = _parse_list(mix_ratios, astype=float)
else:
assert isinstance(mix_ratios, list)
assert len(mix_ratios) == len(tasks), "number of mix_ratios is NOT consistent with num_instances."
for mr, t in zip(mix_ratios, tasks):
t.mix_ratio = mr
# parse task layer reuse tags
instname_to_reusehost = {i:i for i in instnames}
if task_reuse_tag is not None:
if isinstance(task_reuse_tag, str):
tags = _parse_list(task_reuse_tag, astype=int)
else:
assert isinstance(task_reuse_tag, list)
assert len(task_reuse_tag) == len(tasks), "number of task_reuse_tag is NOT consistent with num_tasks."
tags = task_reuse_tag
else:
tags = []
mapper = {}
for inst in tasks:
history = set()
history.add(inst.name)
cur_inst = inst
while True:
if cur_inst.task_reuse_scope in history:
mapper[inst.name] = len(tags)
break
elif cur_inst.task_reuse_scope in mapper:
mapper[inst.name] = mapper[cur_inst.task_reuse_scope]
break
else:
cur_inst = name_to_instance[cur_inst.task_reuse_scope]
history.add(cur_inst.name)
tags.append(mapper[inst.name])
for i in range(1, len(tasks)):
for j in range(i):
if tags[i] == tags[j]:
# assert tasks[i].tasktype == \
# instances[j].tasktype, \
# "paradigm of reuse tasks should be consistent"
tasks[i]._task_reuse_scope = task[j].name
break
# self.instances = instances
# self.mrs = mrs
# self.Backbone = Backbone
# self.bb_conf = bb_conf
# self.bb_name = bb_name
# self.has_init_train = False
# self.has_init_pred = False
# if self._for_train:
# print("initialing for training...")
# self._init_train()
# self.has_init_train = True
#
def build_forward(self, backbone, mask_task=[]):
task_instances = self._tasks
Backbone = self.Backbone
bb_conf = self.bb_conf
bb_name = self.bb_name
dev_count = self.dev_count
num_instances = len(instances)
mrs = self.mrs
# set first_target/main task instance
main_inst = None
for inst in task_instances:
if inst.is_target:
main_inst = inst
inst._as_main = True
break
if save_path is not None and not os.path.exists(save_path):
os.makedirs(save_path)
# create reader, task
# then check i/o across reader, backbone and task_layer
task_attrs = []
pred_task_attrs = []
for inst in task_instances:
task_attr_from_reader = _encode_inputs(inst._taskblock['train'].inputs_attrs['reader'], inst.name)
task_attrs.append(task_attr_from_reader)
_check_io(backbone.inputs_attr, inst._reader['train'].outputs_attr, in_name=bb_name+'_backbone', out_name='reader.train')
_check_io(inst.taskblock['train'].inputs_attrs['reader'], inst._reader['train'].outputs_attr, in_name='task_paradigm.train.reader', out_name='reader.train')
_check_io(inst._taskblock['train'].inputs_attrs['backbone'], train_backbone.outputs_attr, in_name='task_paradigm.train.backbone', out_name=bb_name+'_backbone')
if inst.is_target:
if 'pred_file' not in inst.config:
inst.config['pred_file'] = ''
pred_reader = inst.Reader(inst.config, phase='pred')
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf)
inst.task_layer['pred'] = pred_parad
task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name)
pred_task_attrs.append(task_attr_from_reader)
_check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
# merge reader input attrs from backbone and task_instances
joint_input_names, joint_shape_and_dtypes, name_to_position = merge_input_attrs(train_backbone.inputs_attr, task_attrs)
pred_joint_input_names, pred_joint_shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, pred_task_attrs, insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
# shapes: [task_id, shapes_of_backbone, shapes_of_inst1, ..., shapes_of_instN]
if DEBUG:
print('----- for debug -----')
print('joint input names:')
print(joint_input_names)
print('joint input shape and dtypes:')
print(joint_shape_and_dtypes)
# load data
for inst in instances:
print(inst.name+": preparing data...", end='')
inst.reader['train'].load_data()
print('ok!')
# merge dataset iterators and create net input vars
iterators = []
prefixes = []
mrs = []
for inst in instances:
iterators.append(inst.reader['train'].iterator())
prefixes.append(inst.name)
mrs.append(inst.mix_ratio)
joint_iterator_fn = create_joint_iterator_fn(iterators, prefixes, joint_shape_and_dtypes, mrs, name_to_position, dev_count=dev_count, verbose=VERBOSE)
input_attrs = [[i, j, k] for i, (j,k) in zip(joint_input_names, joint_shape_and_dtypes)]
pred_input_attrs = [[i, j, k] for i, (j,k) in zip(pred_joint_input_names, pred_joint_shape_and_dtypes)]
net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
# build backbone and task layers
train_prog = fluid.default_main_program()
train_init_prog = fluid.default_startup_program()
bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())
pred_prog = fluid.Program()
pred_init_prog = fluid.Program()
with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
pred_net_inputs = create_net_inputs(pred_input_attrs)
pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
fluid.framework.switch_main_program(train_prog)
fluid.framework.switch_startup_program(train_init_prog)
task_output_vars = {}
for inst in instances:
task_inputs = {'backbone': bb_output_vars}
task_inputs_from_reader = _decode_inputs(net_inputs, inst.name)
task_inputs['reader'] = task_inputs_from_reader
scope = inst.task_reuse_scope + '/'
with fluid.unique_name.guard(scope):
output_vars = inst.build_task_layer(task_inputs, phase='train', scope=scope)
output_vars = {inst.name+'/'+key: val for key, val in output_vars.items()}
old = len(task_output_vars) # for debug
task_output_vars.update(output_vars)
assert len(task_output_vars) - old == len(output_vars) # for debug
# prepare predict vars for saving inference model
if inst.is_target:
with fluid.program_guard(pred_prog, pred_init_prog):
cur_inputs = _decode_inputs(pred_net_inputs, inst.name)
inst.pred_input = cur_inputs
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = inst.task_reuse_scope + '/'
with fluid.unique_name.guard(scope):
inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope)
bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
task_fetches = {k: v.name for k,v in task_output_vars.items()}
fetches = task_fetches
fetches['__task_id'] = net_inputs['__task_id'].name
# compute loss
task_id_var = net_inputs['__task_id']
task_id_vec = layers.one_hot(task_id_var, num_instances)
losses = fluid.layers.concat([task_output_vars[inst.name+'/loss'] for inst in instances], axis=0)
loss = layers.reduce_sum(task_id_vec * losses)
def init_train(self, basetask, num_epochs, ):
main_reader = main_inst.reader['train']
num_examples = main_reader.num_examples
for inst in instances:
max_train_steps = int(main_conf['num_epochs']* inst.mix_ratio * (num_examples // main_conf['batch_size'] // dev_count))
if inst.is_target:
print('{}: expected train steps {}.'.format(inst.name, max_train_steps))
inst.steps_pur_epoch = inst.reader['train'].num_examples // main_conf['batch_size'] // dev_count
inst.expected_train_steps = max_train_steps
global_max_train_steps = int(main_conf['num_epochs'] * sum(mrs) * (num_examples // main_conf['batch_size'] // dev_count))
print('Estimated overall train steps {}.'.format(global_max_train_steps))
# if 'warmup_proportion' in main_conf and main_conf['warmup_proportion'] > 0:
# warmup_steps = int(global_max_train_steps * main_conf['warmup_proportion'])
# print('Warmup steps: '+str(warmup_steps))
# else:
# warmup_steps = 0
return loss, max_train_steps
def build_backward(self, optimizer, use_ema=False, ema_decay=0.9999):
# build optimizer
optimizer.optimize(fluid.default_main_program())
# loss.persistable = True
if use_ema:
ema = fluid.optimizer.ExponentialMovingAverage(ema_decay)
ema.update()
def random_init_params(self):
if not self._init_finish:
# prepare for train
self.train_program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)
self.saver_program = fluid.default_main_program()
self._init_finish = True
print("\nRandomly initialize parameters...\n")
self.exe.run(fluid.default_startup_program())
def load_pretrain_params(self, pretrain_model_path=None):
# load pretrain model (or ckpt)
if pretrain_model_path is None:
assert 'pretrain_model_path' in self.main_conf, "pretrain_model_path NOT set."
pretrain_model_path = self.main_conf['pretrain_model_path']
init_pretraining_params(
self.exe,
pretrain_model_path,
main_program=fluid.default_startup_program())
if not self._init_finish:
self.train_program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)
self.saver_program = fluid.default_main_program()
self._init_finish = True
def load_infermodel(self, instance, infer_model_path):
inst = instance
if 'pred_output_path' not in inst.config:
inst.config['pred_output_path'] = os.path.join(inst.config.get('save_path', '.'), inst.name)
if not os.path.exists(inst.config['pred_output_path']):
os.makedirs(inst.config['pred_output_path'])
pred_backbone = self.Backbone(self.bb_conf, phase='pred')
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=self.bb_conf)
inst.task_layer['pred'] = pred_parad
pred_joint_input_names, pred_joint_shape_and_dtypes, name_to_position = merge_input_attrs(
pred_backbone.inputs_attr, inst.task_layer['pred'].inputs_attrs['reader'],
insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
pred_prog = inst.load(infer_model_path)
if inst.reader['pred'] is None:
pred_reader = inst.Reader(inst.config, phase='pred')
inst.reader['pred'] = pred_reader
return pred_prog
def train(self, num_epochs):
if not self._init_finish:
raise Exception('params has not been initialized! Please init params with random_init_params or load_pretrain_params.')
instances = self.instances
num_instances = self.num_instances
main_inst = self.main_inst
main_conf = main_inst.config
backbone = self.train_backbone
train_program = self.train_program
saver_program = self.saver_program
fetches = self.fetches
finish = []
for inst in instances:
if inst.is_target:
if inst.expected_train_steps > 0:
finish.append(False)
else:
finish.append(True)
print(inst.name+': train finished!')
inst.save()
def train_finish():
for inst in instances:
if inst.is_target:
if not inst.train_finish:
return False
return True
# do training
fetch_names, fetch_list = zip(*fetches.items())
main_step = 0 # only count for main task
global_step = 0 # count for all tasks
epoch = 0
time_begin = time.time()
backbone_buffer = []
while not train_finish():
rt_outputs = self.exe.run(train_program, fetch_list=fetch_list)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
cur_task = instances[rt_task_id]
backbone_rt_outputs = {k:v for k,v in rt_outputs.items() if '/' not in k}
backbone_buffer.append(backbone.postprocess(backbone_rt_outputs))
task_rt_outputs = {k[len(cur_task.name+'/'):]: v for k,v in rt_outputs.items() if k.startswith(cur_task.name+'/')}
instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)
global_step += 1
cur_task.cur_train_step += 1
if cur_task.save_infermodel_every_n_steps > 0 and cur_task.cur_train_step % cur_task.save_infermodel_every_n_steps == 0:
cur_task.save(suffix='.step'+str(cur_task.cur_train_step))
if global_step % main_conf.get('print_every_n_steps', 5) == 0:
loss = rt_outputs[cur_task.name+'/loss']
loss = np.mean(np.squeeze(loss)).tolist()
time_end = time.time()
time_cost = time_end - time_begin
print("Global step: {}. Task: {}, step {}/{} (epoch {}), loss: {:.3f}, speed: {:.2f} steps/s".format(
global_step, cur_task.name, cur_task.cur_train_step, cur_task.steps_pur_epoch, cur_task.cur_train_epoch,
loss, main_conf.get('print_every_n_steps', 5) / time_cost))
time_begin = time.time()
if cur_task.train_finish and cur_task.cur_train_step + cur_task.cur_train_epoch * cur_task.steps_pur_epoch == cur_task.expected_train_steps:
print(cur_task.name+': train finished!')
cur_task.save()
if 'save_every_n_steps' in main_conf and global_step % main_conf['save_every_n_steps'] == 0:
save_path = os.path.join(main_conf['save_path'],
"step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program)
print("ALL tasks train finished, exiting...")
def pred(self, task_instance, inference_model_dir=None):
if self._for_train:
raise Exception('This controller is a trainer. Please build a new controller with for_train=False for predicting.')
assert isinstance(task_instance, str)
if isinstance(inference_model_dir, str):
assert os.path.exists(inference_model_dir), inference_model_dir+" not found."
# if not self.has_init_pred and inference_model_dir is None:
# raise ValueError('infer_model_path is required for prediction.')
if inference_model_dir is None:
assert 'save_path' in self.mtl_conf, "one of the `inference_model_dir` and 'save_path' should be set to load inference model."
inference_model_dir = os.path.join(self.mtl_conf['save_path'], task_instance, 'infer_model')
instance = None
for inst in self.instances:
if inst.name == task_instance:
instance = inst
break
if instance is None:
raise ValueError(task_instance + ' is not a valid task_instance.')
pred_prog = self._init_pred(instance, inference_model_dir)
inst = instance
print(inst.name+": loading data...")
inst.reader['pred'].load_data()
fetch_names, fetch_vars = inst.pred_fetch_list
print('predicting...')
mapper = {k:v for k,v in inst.pred_input}
buf = []
for feed in inst.reader['pred'].iterator():
feed = _encode_inputs(feed, inst.name, cand_set=mapper)
feed = {mapper[k]: v for k,v in feed.items()}
rt_outputs = self.exe.run(pred_prog, feed, fetch_vars)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
inst.postprocess(rt_outputs, phase='pred')
if inst.task_layer['pred'].epoch_inputs_attrs:
reader_outputs = inst.reader['pred'].get_epoch_outputs()
else:
reader_outputs = None
inst.epoch_postprocess({'reader':reader_outputs}, phase='pred')
if __name__ == '__main__':
assert len(sys.argv) == 2, "Usage: python mtl_controller.py <mtl_conf_path>"
conf_path = sys.argv[1]
del sys.argv[1]
controller = Controller(conf_path)
if controller.main_conf['do_train']:
controller.train()
......@@ -21,6 +21,24 @@ from __future__ import print_function
import numpy as np
import paddle.fluid as fluid
class schedualer(object):
def __init__(self):
pass
def lr(self):
pass
def ConstantLearning():
def __init__(self, lr):
self._lr = lr
def lr(self):
return self._lr
def LinearWarmupLearning():
def linear_warmup_decay(learning_rate, warmup_steps, num_train_steps):
""" Applies linear warmup of learning rate from 0 and decay to 0."""
with fluid.default_main_program()._lr_schedule_guard():
......
......@@ -22,6 +22,253 @@ import importlib
from paddlepalm.default_settings import *
def Task(object):
def __init__(self, name, reader, taskblock, mix_ratio=1.0, \
pred_reader=None, pred_taskblock=None,
infermodel_save_path=None, save_infermodel_every_n_steps=-1, \
as_target_task=True, task_layer_reuse=None, silent=False):
self._name = name
self._verbose = not silent
if infermodel_save_path is None:
self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
else:
self._save_infermodel_path = infermodel_save_path
self._save_infermodel_every_n_steps = save_infermodel_every_n_steps
self._is_target = as_target
self._first_target = False
self._task_reuse_scope = name if task_layer_reuse is None else task_layer_reuse
self._feeded_var_names = None
self._target_vars = None
# training process management
self._mix_ratio = mix_ratio
self._expected_train_steps = None
self._expected_train_epochs = None
self._steps_pur_epoch = None
self._cur_train_epoch = 0
self._cur_train_step = 0
self._train_finish = False
# 存放不同运行阶段(train,eval,pred)的数据集reader,key为phase,value为Reader实例
self._reader = {'train': reader, 'eval': None, 'pred': pred_reader}
self._input_layer = None
self._inputname_to_varname = {}
self._task_layer = {'train': tasklayer, 'eval': None, 'pred': pred_tasklayer}
self._pred_input_name_list = []
self._pred_input_varname_list = []
self._pred_fetch_name_list = []
self._pred_fetch_var_list = []
self._exe = fluid.Executor(fluid.CPUPlace())
self._save_protocol = {
'input_names': 'self._pred_input_name_list',
'input_varnames': 'self._pred_input_varname_list',
'fetch_list': 'self._pred_fetch_name_list'}
self._lock = False
def _build_task_layer(self, net_inputs, phase, scope=""):
output_vars = self._task_layer[phase].build(net_inputs, scope_name=scope)
if phase == 'pred':
if output_vars is not None:
self._pred_fetch_name_list, self._pred_fetch_var_list = zip(*output_vars.items())
else:
self._pred_fetch_name_list = []
self._pred_fetch_var_list = []
return output_vars
def _postprocess(self, rt_outputs, phase):
return self._task_layer[phase].postprocess(rt_outputs)
def _epoch_postprocess(self, epoch_inputs, phase):
return self._task_layer[phase].epoch_postprocess(epoch_inputs)
def save(self, suffix=''):
dirpath = self._save_infermodel_path + suffix
self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list]
prog = fluid.default_main_program().clone()
fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, prog)
conf = {}
for k, strv in self._save_protocol.items():
d = None
v = locals()
exec('d={}'.format(strv), globals(), v)
conf[k] = v['d']
with open(os.path.join(dirpath, '__conf__'), 'w') as writer:
writer.write(json.dumps(conf, indent=1))
print(self._name + ': inference model saved at ' + dirpath)
def _load(self, infer_model_path=None):
if infer_model_path is None:
infer_model_path = self._save_infermodel_path
for k,v in json.load(open(os.path.join(infer_model_path, '__conf__'))).items():
strv = self._save_protocol[k]
exec('{}=v'.format(strv))
pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
fluid.io.load_inference_model(infer_model_path, self._exe)
print(self._name+': inference model loaded from ' + infer_model_path)
return pred_prog
@property
def name(self):
return self._name
@property
def _Reader(self):
return self._Reader
@property
def _Paradigm(self):
return self._Paradigm
@property
def _reader(self):
return self._reader
@property
def _pred_input(self):
return zip(*[self._pred_input_name_list, self._pred_input_varname_list])
@_pred_input.setter
def _pred_input(self, val):
assert isinstance(val, dict)
self._pred_input_name_list, self._pred_input_varname_list = \
zip(*[[k, v.name] for k,v in val.items()])
@property
def _pred_fetch_list(self):
return [self._pred_fetch_name_list, self._pred_fetch_var_list]
@property
def _task_layer(self):
return self._task_layer
@property
def _is_first_target(self):
return self._is_first_target
@_is_first_target.setter
def _is_first_target(self, value):
self._is_first_target = bool(value)
if self._is_first_target:
assert self._is_target, "ERROR: only target task could be set as main task."
if self._verbose and self._is_first_target:
print("{}: set as main task".format(self._name))
@property
def _is_target(self):
if self._is_target is not None:
return self._is_target
else:
raise ValueError("{}: is_target is None".format(self._name))
@_is_target.setter
def _is_target(self, value):
self._is_target = bool(value)
if self._verbose:
if self._is_target:
print('{}: set as target task.'.format(self._name))
else:
print('{}: set as aux task.'.format(self._name))
@property
def mix_ratio(self):
if self._mix_ratio is not None:
return self._mix_ratio
else:
raise ValueError("{}: mix_ratio is None".format(self._name))
@mix_ratio.setter
def mix_ratio(self, value):
self._mix_ratio = float(value)
if self._verbose:
print('{}: mix_ratio is set to {}'.format(self._name, self._mix_ratio))
@property
def save_infermodel_every_n_steps(self):
return self._save_infermodel_every_n_steps
@save_infermodel_every_n_steps.setter
def save_infermodel_every_n_steps(self, val):
self._save_infermodel_every_n_steps = val
@property
def expected_train_steps(self):
return self._expected_train_steps
@expected_train_steps.setter
def _expected_train_steps(self, value):
self._expected_train_steps = value
self._expected_train_epochs = value / float(self._steps_pur_epoch)
@property
def expected_train_epochs(self):
return self._expected_train_epochs
@property
def cur_train_epoch(self):
return self._cur_train_epoch
@cur_train_epoch.setter
def _cur_train_epoch(self, value):
self._cur_train_epoch = value
@property
def cur_train_step(self):
return self._cur_train_step
@cur_train_step.setter
def _cur_train_step(self, value):
self._cur_train_step = value
if self._cur_train_step > self._steps_pur_epoch:
self._cur_train_epoch += 1
self._cur_train_step = 1
if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
self._train_finish = True
@property
def steps_pur_epoch(self):
return self._steps_pur_epoch
@steps_pur_epoch.setter
def _steps_pur_epoch(self, value):
self._steps_pur_epoch = value
@property
def train_finish(self):
return self._train_finish
def tasklayer_reuse_with(self, task):
assert isinstance(task, Task)
if self._lock:
raise Exception('you can only set tasklayer reuses BEFORE Controller created.')
self._task_reuse_scope = task.name
def _set_lock(self):
self._lock = True
# @property
# def task_reuse_scope(self):
# if self._task_reuse_scope is not None:
# return self._task_reuse_scope
# else:
# raise ValueError("{}: task_reuse_scope is None".format(self._name))
# @task_reuse_scope.setter
# def task_reuse_scope(self, scope_name):
# self._task_reuse_scope = str(scope_name)
# if self._verbose:
# print('{}: task_reuse_scope is set to {}'.format(self._name, self._task_reuse_scope))
def check_req_args(conf, name):
assert 'reader' in conf, name+': reader is required to build TaskInstance.'
assert 'paradigm' in conf, name+': paradigm is required to build TaskInstance.'
......@@ -44,7 +291,7 @@ class TaskInstance(object):
parad_name = config['paradigm']
parad_mod = importlib.import_module(PARADIGM_DIR + '.' + parad_name)
Paradigm = getattr(parad_mod, 'TaskParadigm')
Paradigm = getattr(parad_mod, 'TaskType')
self._Reader = Reader
self._Paradigm = Paradigm
......
......@@ -16,6 +16,12 @@
from paddlepalm.interface import reader
from paddlepalm.reader.utils.reader4ernie import ClassifyReader
def match(vocab_path, max_seq_len, do_lower_case=True, phase, dev_count=1):
config={
xxx}
return Reader(config())
class Reader(reader):
def __init__(self, config, phase='train', dev_count=1, print_prefix=''):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册