未验证 提交 d0f02442 编写于 作者: X Xiaoyao Xi 提交者: GitHub

Merge pull request #18 from xixiaoyao/master

refine comments
......@@ -267,7 +267,6 @@ class Controller(object):
mrs = _parse_list(mix_ratio, astype=float)
assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances."
else:
# TODO: 增加joint training模式,让num_epochs平等的作用于每个instance
mrs = [1.0] * num_instances
for mr, inst in zip(mrs, instances):
......@@ -282,16 +281,13 @@ class Controller(object):
tags = []
mapper = {}
for inst in instances:
# 有环则tag_id + 1,否则被mapper shutdown
history = set()
history.add(inst.name)
cur_inst = inst
while True:
# 发现有环
if cur_inst.task_reuse_scope in history:
mapper[inst.name] = len(tags)
break
# 发现在mapper中
elif cur_inst.task_reuse_scope in mapper:
mapper[inst.name] = mapper[cur_inst.task_reuse_scope]
break
......@@ -300,12 +296,10 @@ class Controller(object):
history.add(cur_inst.name)
tags.append(mapper[inst.name])
# 注意,上面这段需要做单元测试
for i in range(1, num_instances):
for j in range(i):
if tags[i] == tags[j]:
# check paradigm of reused tasks
assert instances[i].Paradigm == \
instances[j].Paradigm, \
"paradigm of reuse tasks should be consistent"
......@@ -386,12 +380,9 @@ class Controller(object):
inst.config['pred_file'] = ''
pred_reader = inst.Reader(inst.config, phase='pred')
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf)
# inst.reader['pred'] = pred_reader # 这里创建的reader是个假reader,只是为了读取output_attr而已,所以不做保存
inst.task_layer['pred'] = pred_parad
# 框架有巨坑,先这样写吧
task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name)
pred_task_attrs.append(task_attr_from_reader)
# task_attr = pred_parad.inputs_attrs['reader']
_check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
......@@ -429,35 +420,21 @@ class Controller(object):
net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
# build backbone and task layers
# 不指定scope名字会挂,框架有坑
train_prog = fluid.default_main_program()
train_init_prog = fluid.default_startup_program()
# 别用unique_name.guard了,没用的,无法作用到param_attr里的name上
# with fluid.unique_name.guard("backbone-"):
bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())
# for block in train_init_prog.blocks:
# for var in block.vars:
# print(var)
# 会挂
# 这里是否有必要新建一个program?是的,被坑死了
pred_prog = fluid.Program()
pred_init_prog = fluid.Program()
with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
# with fluid.unique_name.guard():
pred_net_inputs = create_net_inputs(pred_input_attrs)
# 别用unique_name.guard了,没用的,无法作用到param_attr里的name上
# with fluid.unique_name.guard("backbone-"):
pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
fluid.framework.switch_main_program(train_prog)
fluid.framework.switch_startup_program(train_init_prog)
# pred_backbone = train_backbone
# pred_bb_output_vars = bb_output_vars
task_output_vars = {}
for inst in instances:
task_inputs = {'backbone': bb_output_vars}
......@@ -472,29 +449,20 @@ class Controller(object):
task_output_vars.update(output_vars)
assert len(task_output_vars) - old == len(output_vars) # for debug
# # prepare predict vars for saving inference model
# prepare predict vars for saving inference model
if inst.is_target:
# task_attr = inst.task_layer['pred'].inputs_attrs['reader']
# _input_names, _shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, task_attr, insert_taskid=False)
# pred_input_attrs = [[i, j, k] for i, (j,k) in zip(_input_names, _shape_and_dtypes)]
with fluid.program_guard(pred_prog, pred_init_prog):
# pred_net_inputs = create_net_inputs(pred_input_attrs)
# 这里同时建立了pred阶段的backbone计算图,不知道是否会造成额外的显存开销(paddle不会计算运行路径)
cur_inputs = _decode_inputs(pred_net_inputs, inst.name)
inst.pred_input = cur_inputs
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = inst.task_reuse_scope + '/'
# 注意,这里不加上fluid.unique_name.guard会挂
with fluid.unique_name.guard(scope):
inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope)
bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
task_fetches = {k: v.name for k,v in task_output_vars.items()}
# fetches = bb_fetches.copy() # 注意!框架在多卡时无法fetch变长维度的tensor,这里加入bb的out后会挂
# fetches.update(task_fetches)
fetches = task_fetches
fetches['__task_id'] = net_inputs['__task_id'].name
......@@ -522,10 +490,8 @@ class Controller(object):
print('Warmup steps: '+str(warmup_steps))
else:
warmup_steps = 0
# steps_pur_epoch = num_examples // main_conf['batch_size'] // dev_count
# build optimizer
# 其实也完全可以支持每个任务用它自己的optimizer
if 'optimizer' in main_conf:
optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer'])
optimize = getattr(optim_mod, OPTIMIZE_METHOD)
......@@ -546,8 +512,6 @@ class Controller(object):
self.fetches = fetches
self.has_init_train = True
self.has_init_pred = True
# self.max_train_steps = max_train_steps
# self.steps_pur_epoch = steps_pur_epoch
self.exe.run(fluid.default_startup_program())
print("\nRandomly initialize parameters...\n")
......@@ -563,7 +527,6 @@ class Controller(object):
insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
pred_prog = inst.load(infer_model_path)
# pred_prog = fluid.CompiledProgram(pred_prog).with_data_parallel()
if inst.reader['pred'] is None:
pred_reader = inst.Reader(inst.config, phase='pred')
inst.reader['pred'] = pred_reader
......@@ -582,7 +545,6 @@ class Controller(object):
def train(self):
# TODO: 备份各种配置文件,以便用户断点重新训练以及支持将来的预测
if not self.has_init_train:
self._init_train()
......@@ -598,9 +560,6 @@ class Controller(object):
saver_program = self.saver_program
fetches = self.fetches
# max_train_steps = self.max_train_steps
# steps_pur_epoch = self.steps_pur_epoch
finish = []
for inst in instances:
if inst.is_target:
......@@ -614,12 +573,6 @@ class Controller(object):
return True
# do training
# loss_fetches = {inst.name+'/loss': inst.task_layer['train'].loss for inst in instances}
# old = len(fetches) # for debug
# fetches.update(loss_fetches)
# assert len(fetches) == old + len(loss_fetches) # for debug and avoid user-caused bug
# assert 'task_id' not in fetches # for debug and avoid user-caused bug
# fetches['task_id'] = task_id_var
fetch_names, fetch_list = zip(*fetches.items())
main_step = 0 # only count for main task
......@@ -631,8 +584,6 @@ class Controller(object):
rt_outputs = self.exe.run(train_program, fetch_list=fetch_list)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
# 注意注释掉这一行之后,训练日志实际是错误的
# assert (not isinstance(rt_task_id, list)) or len(set(rt_task_id)) == 1, rt_task_id
rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
cur_task = instances[rt_task_id]
......@@ -643,7 +594,6 @@ class Controller(object):
instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)
global_step += 1
# if cur_task.is_target:
cur_task.cur_train_step += 1
if global_step % main_conf.get('print_every_n_steps', 5) == 0:
......@@ -667,9 +617,6 @@ class Controller(object):
"step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program)
# save_path = os.path.join(main_conf['save_path'],
# "step_" + str(global_step) + "_final")
# fluid.io.save_persistables(self.exe, save_path, saver_program)
print("ALL tasks train finished, exiting...")
def pred(self, task_instance, inference_model_dir=None):
......@@ -697,7 +644,6 @@ class Controller(object):
inst.reader['pred'].load_data()
fetch_names, fetch_vars = inst.pred_fetch_list
# iterator = create_iterator_fn(inst.reader['pred'].iterator, inst.name, pred_joint_shape_and_dtypes, name_to_position)
mapper = {k:v for k,v in inst.pred_input}
buf = []
for feed in inst.reader['pred'].iterator():
......
......@@ -26,7 +26,7 @@ class TaskInstance(object):
self._config = config
self._verbose = verbose
self._save_infermodel_path = os.path.join(self._config['save_path'], 'infer_model')
self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
self._save_ckpt_path = os.path.join(self._config['save_path'], 'ckpt')
# following flags can be fetch from instance config file
......@@ -87,13 +87,7 @@ class TaskInstance(object):
dirpath = self._save_infermodel_path + suffix
self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list]
# del self._pred_input_varname_list[0]
# del self._pred_input_varname_list[0]
# del self._pred_input_varname_list[0]
# print(self._pred_input_varname_list)
fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, export_for_deployment = True)
# fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, params_filename='__params__')
conf = {}
for k, strv in self._save_protocol.items():
......@@ -111,8 +105,6 @@ class TaskInstance(object):
exec('{}=v'.format(strv))
pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
fluid.io.load_inference_model(infer_model_path, self._exe)
# pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
# fluid.io.load_inference_model(infer_model_path, self._exe, params_filename='__params__')
print(self._name+': inference model loaded from ' + infer_model_path)
return pred_prog
......@@ -159,7 +151,6 @@ class TaskInstance(object):
assert isinstance(val, dict)
self._pred_input_name_list, self._pred_input_varname_list = \
zip(*[[k, v.name] for k,v in val.items()])
# print(self._pred_input_name_list)
@property
def pred_fetch_list(self):
......@@ -243,7 +234,6 @@ class TaskInstance(object):
self._cur_train_step = 1
if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
self._train_finish = True
# fluid.io.save_inference_model(self._save_infermodel_path, )
@property
def steps_pur_epoch(self):
......
......@@ -54,7 +54,6 @@ class TaskParadigm(task_paradigm):
mask_pos = inputs["reader"]["mask_pos"]
if self._is_training:
mask_label = inputs["reader"]["mask_label"]
# 多任务学习时才需要引入这个,防止其他run其他任务时导致seqlen过小,gather超范围
max_position = inputs["reader"]["batchsize_x_seqlen"] - 1
mask_pos = fluid.layers.elementwise_min(mask_pos, max_position)
mask_pos.stop_gradient = True
......@@ -90,14 +89,6 @@ class TaskParadigm(task_paradigm):
name=scope_name+"mask_lm_out_fc.b_0",
initializer=fluid.initializer.Constant(value=0.0))
# print fluid.default_main_program().global_block()
# fc_out = fluid.layers.matmul(
# x=mask_trans_feat,
# y=fluid.default_main_program().global_block().var(
# _word_emb_name),
# transpose_y=True)
fc_out = fluid.layers.matmul(
x=mask_trans_feat,
y=word_emb,
......
......@@ -51,7 +51,6 @@ def _zero_batch(attrs):
def _zero_batch_x(attrs, batch_size):
pos_attrs = []
for shape, dtype in attrs:
# pos_shape = [size if size and size > 0 else 5 for size in shape]
pos_shape = [size for size in shape]
if pos_shape[0] == -1:
pos_shape[0] = batch_size
......@@ -72,7 +71,6 @@ def create_net_inputs(input_attrs, async=False, iterator_fn=None, dev_count=1, n
if async:
assert iterator_fn is not None, "iterator_fn is needed for building async input layer."
# reader = fluid.io.PyReader(inputs, capacity=dev_count*n_prefetch, iterable=False)
reader = fluid.io.PyReader(inputs, capacity=dev_count, iterable=False)
reader.decorate_batch_generator(iterator_fn)
reader.start()
......@@ -117,8 +115,6 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype
if not keep_one_task:
dev_count = 1
# build fake batch
# 注意这种方法会导致一个问题,用户将某任务的mix ratio设置成0后,并不能避免从该任务上读数据,若用户将数据集删掉则会导致崩溃;不过相比之前的zero batch方法,这种方法不必作出只能有一个size=-1的维度且第0维的-1必须是batch size的假设
results = _zero_batch(joint_shape_and_dtypes)
outbuf = {}
for id in task_ids:
......@@ -151,12 +147,9 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype
print('----- debug joint iterator -----')
print('sampled task id: '+str(id))
task_id_tensor = np.array([[id]]).astype("int64")
# results[0] = task_id_tensor
for i in range(dev_count):
# 这两个应该是等价的
# results[0] = task_id_tensor
results[outname_to_pos['__task_id']] = task_id_tensor
assert outname_to_pos['__task_id'] == 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册