提交 8d35b43c 编写于 作者: X xixiaoyao

refine comments

上级 cde7404e
...@@ -267,7 +267,6 @@ class Controller(object): ...@@ -267,7 +267,6 @@ class Controller(object):
mrs = _parse_list(mix_ratio, astype=float) mrs = _parse_list(mix_ratio, astype=float)
assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances." assert len(mrs) == num_instances, "number of mix_ratios is NOT consistent with num_instances."
else: else:
# TODO: 增加joint training模式,让num_epochs平等的作用于每个instance
mrs = [1.0] * num_instances mrs = [1.0] * num_instances
for mr, inst in zip(mrs, instances): for mr, inst in zip(mrs, instances):
...@@ -282,16 +281,13 @@ class Controller(object): ...@@ -282,16 +281,13 @@ class Controller(object):
tags = [] tags = []
mapper = {} mapper = {}
for inst in instances: for inst in instances:
# 有环则tag_id + 1,否则被mapper shutdown
history = set() history = set()
history.add(inst.name) history.add(inst.name)
cur_inst = inst cur_inst = inst
while True: while True:
# 发现有环
if cur_inst.task_reuse_scope in history: if cur_inst.task_reuse_scope in history:
mapper[inst.name] = len(tags) mapper[inst.name] = len(tags)
break break
# 发现在mapper中
elif cur_inst.task_reuse_scope in mapper: elif cur_inst.task_reuse_scope in mapper:
mapper[inst.name] = mapper[cur_inst.task_reuse_scope] mapper[inst.name] = mapper[cur_inst.task_reuse_scope]
break break
...@@ -300,12 +296,10 @@ class Controller(object): ...@@ -300,12 +296,10 @@ class Controller(object):
history.add(cur_inst.name) history.add(cur_inst.name)
tags.append(mapper[inst.name]) tags.append(mapper[inst.name])
# 注意,上面这段需要做单元测试
for i in range(1, num_instances): for i in range(1, num_instances):
for j in range(i): for j in range(i):
if tags[i] == tags[j]: if tags[i] == tags[j]:
# check paradigm of reused tasks
assert instances[i].Paradigm == \ assert instances[i].Paradigm == \
instances[j].Paradigm, \ instances[j].Paradigm, \
"paradigm of reuse tasks should be consistent" "paradigm of reuse tasks should be consistent"
...@@ -386,12 +380,9 @@ class Controller(object): ...@@ -386,12 +380,9 @@ class Controller(object):
inst.config['pred_file'] = '' inst.config['pred_file'] = ''
pred_reader = inst.Reader(inst.config, phase='pred') pred_reader = inst.Reader(inst.config, phase='pred')
pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf) pred_parad = inst.Paradigm(inst.config, phase='pred', backbone_config=bb_conf)
# inst.reader['pred'] = pred_reader # 这里创建的reader是个假reader,只是为了读取output_attr而已,所以不做保存
inst.task_layer['pred'] = pred_parad inst.task_layer['pred'] = pred_parad
# 框架有巨坑,先这样写吧
task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name) task_attr_from_reader = _encode_inputs(pred_parad.inputs_attrs['reader'], inst.name)
pred_task_attrs.append(task_attr_from_reader) pred_task_attrs.append(task_attr_from_reader)
# task_attr = pred_parad.inputs_attrs['reader']
_check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred') _check_io(pred_backbone.inputs_attr, pred_reader.outputs_attr, in_name=bb_name+'_backbone', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred') _check_io(pred_parad.inputs_attrs['reader'], pred_reader.outputs_attr, in_name='task_paradigm.pred.reader', out_name='reader.pred')
_check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone') _check_io(pred_parad.inputs_attrs['backbone'], pred_backbone.outputs_attr, in_name='task_paradigm.pred.backbone', out_name=bb_name+'_backbone')
...@@ -429,35 +420,21 @@ class Controller(object): ...@@ -429,35 +420,21 @@ class Controller(object):
net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3) net_inputs = create_net_inputs(input_attrs, async=True, iterator_fn=joint_iterator_fn, dev_count=dev_count, n_prefetch=3)
# build backbone and task layers # build backbone and task layers
# 不指定scope名字会挂,框架有坑
train_prog = fluid.default_main_program() train_prog = fluid.default_main_program()
train_init_prog = fluid.default_startup_program() train_init_prog = fluid.default_startup_program()
# 别用unique_name.guard了,没用的,无法作用到param_attr里的name上
# with fluid.unique_name.guard("backbone-"):
bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_') bb_output_vars = train_backbone.build(net_inputs, scope_name='__paddlepalm_')
assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys()) assert sorted(bb_output_vars.keys()) == sorted(train_backbone.outputs_attr.keys())
# for block in train_init_prog.blocks:
# for var in block.vars:
# print(var)
# 会挂
# 这里是否有必要新建一个program?是的,被坑死了
pred_prog = fluid.Program() pred_prog = fluid.Program()
pred_init_prog = fluid.Program() pred_init_prog = fluid.Program()
with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog): with fluid.program_guard(main_program = pred_prog, startup_program = pred_init_prog):
# with fluid.unique_name.guard():
pred_net_inputs = create_net_inputs(pred_input_attrs) pred_net_inputs = create_net_inputs(pred_input_attrs)
# 别用unique_name.guard了,没用的,无法作用到param_attr里的name上
# with fluid.unique_name.guard("backbone-"):
pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_') pred_bb_output_vars = pred_backbone.build(pred_net_inputs, scope_name='__paddlepalm_')
fluid.framework.switch_main_program(train_prog) fluid.framework.switch_main_program(train_prog)
fluid.framework.switch_startup_program(train_init_prog) fluid.framework.switch_startup_program(train_init_prog)
# pred_backbone = train_backbone
# pred_bb_output_vars = bb_output_vars
task_output_vars = {} task_output_vars = {}
for inst in instances: for inst in instances:
task_inputs = {'backbone': bb_output_vars} task_inputs = {'backbone': bb_output_vars}
...@@ -472,29 +449,20 @@ class Controller(object): ...@@ -472,29 +449,20 @@ class Controller(object):
task_output_vars.update(output_vars) task_output_vars.update(output_vars)
assert len(task_output_vars) - old == len(output_vars) # for debug assert len(task_output_vars) - old == len(output_vars) # for debug
# # prepare predict vars for saving inference model # prepare predict vars for saving inference model
if inst.is_target: if inst.is_target:
# task_attr = inst.task_layer['pred'].inputs_attrs['reader']
# _input_names, _shape_and_dtypes, _ = merge_input_attrs(pred_backbone.inputs_attr, task_attr, insert_taskid=False)
# pred_input_attrs = [[i, j, k] for i, (j,k) in zip(_input_names, _shape_and_dtypes)]
with fluid.program_guard(pred_prog, pred_init_prog): with fluid.program_guard(pred_prog, pred_init_prog):
# pred_net_inputs = create_net_inputs(pred_input_attrs)
# 这里同时建立了pred阶段的backbone计算图,不知道是否会造成额外的显存开销(paddle不会计算运行路径)
cur_inputs = _decode_inputs(pred_net_inputs, inst.name) cur_inputs = _decode_inputs(pred_net_inputs, inst.name)
inst.pred_input = cur_inputs inst.pred_input = cur_inputs
pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs} pred_task_inputs = {'backbone': pred_bb_output_vars, 'reader': cur_inputs}
scope = inst.task_reuse_scope + '/' scope = inst.task_reuse_scope + '/'
# 注意,这里不加上fluid.unique_name.guard会挂
with fluid.unique_name.guard(scope): with fluid.unique_name.guard(scope):
inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope) inst.build_task_layer(pred_task_inputs, phase='pred', scope=scope)
bb_fetches = {k: v.name for k,v in bb_output_vars.items()} bb_fetches = {k: v.name for k,v in bb_output_vars.items()}
task_fetches = {k: v.name for k,v in task_output_vars.items()} task_fetches = {k: v.name for k,v in task_output_vars.items()}
# fetches = bb_fetches.copy() # 注意!框架在多卡时无法fetch变长维度的tensor,这里加入bb的out后会挂
# fetches.update(task_fetches)
fetches = task_fetches fetches = task_fetches
fetches['__task_id'] = net_inputs['__task_id'].name fetches['__task_id'] = net_inputs['__task_id'].name
...@@ -522,10 +490,8 @@ class Controller(object): ...@@ -522,10 +490,8 @@ class Controller(object):
print('Warmup steps: '+str(warmup_steps)) print('Warmup steps: '+str(warmup_steps))
else: else:
warmup_steps = 0 warmup_steps = 0
# steps_pur_epoch = num_examples // main_conf['batch_size'] // dev_count
# build optimizer # build optimizer
# 其实也完全可以支持每个任务用它自己的optimizer
if 'optimizer' in main_conf: if 'optimizer' in main_conf:
optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer']) optim_mod = importlib.import_module(OPTIMIZER_DIR + '.' + main_conf['optimizer'])
optimize = getattr(optim_mod, OPTIMIZE_METHOD) optimize = getattr(optim_mod, OPTIMIZE_METHOD)
...@@ -546,8 +512,6 @@ class Controller(object): ...@@ -546,8 +512,6 @@ class Controller(object):
self.fetches = fetches self.fetches = fetches
self.has_init_train = True self.has_init_train = True
self.has_init_pred = True self.has_init_pred = True
# self.max_train_steps = max_train_steps
# self.steps_pur_epoch = steps_pur_epoch
self.exe.run(fluid.default_startup_program()) self.exe.run(fluid.default_startup_program())
print("\nRandomly initialize parameters...\n") print("\nRandomly initialize parameters...\n")
...@@ -563,7 +527,6 @@ class Controller(object): ...@@ -563,7 +527,6 @@ class Controller(object):
insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False) insert_taskid=False, insert_batchsize=False, insert_seqlen=False, insert_batchsize_x_seqlen=False)
pred_prog = inst.load(infer_model_path) pred_prog = inst.load(infer_model_path)
# pred_prog = fluid.CompiledProgram(pred_prog).with_data_parallel()
if inst.reader['pred'] is None: if inst.reader['pred'] is None:
pred_reader = inst.Reader(inst.config, phase='pred') pred_reader = inst.Reader(inst.config, phase='pred')
inst.reader['pred'] = pred_reader inst.reader['pred'] = pred_reader
...@@ -582,7 +545,6 @@ class Controller(object): ...@@ -582,7 +545,6 @@ class Controller(object):
def train(self): def train(self):
# TODO: 备份各种配置文件,以便用户断点重新训练以及支持将来的预测
if not self.has_init_train: if not self.has_init_train:
self._init_train() self._init_train()
...@@ -598,9 +560,6 @@ class Controller(object): ...@@ -598,9 +560,6 @@ class Controller(object):
saver_program = self.saver_program saver_program = self.saver_program
fetches = self.fetches fetches = self.fetches
# max_train_steps = self.max_train_steps
# steps_pur_epoch = self.steps_pur_epoch
finish = [] finish = []
for inst in instances: for inst in instances:
if inst.is_target: if inst.is_target:
...@@ -614,12 +573,6 @@ class Controller(object): ...@@ -614,12 +573,6 @@ class Controller(object):
return True return True
# do training # do training
# loss_fetches = {inst.name+'/loss': inst.task_layer['train'].loss for inst in instances}
# old = len(fetches) # for debug
# fetches.update(loss_fetches)
# assert len(fetches) == old + len(loss_fetches) # for debug and avoid user-caused bug
# assert 'task_id' not in fetches # for debug and avoid user-caused bug
# fetches['task_id'] = task_id_var
fetch_names, fetch_list = zip(*fetches.items()) fetch_names, fetch_list = zip(*fetches.items())
main_step = 0 # only count for main task main_step = 0 # only count for main task
...@@ -631,8 +584,6 @@ class Controller(object): ...@@ -631,8 +584,6 @@ class Controller(object):
rt_outputs = self.exe.run(train_program, fetch_list=fetch_list) rt_outputs = self.exe.run(train_program, fetch_list=fetch_list)
rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)} rt_outputs = {k:v for k,v in zip(fetch_names, rt_outputs)}
rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist() rt_task_id = np.squeeze(rt_outputs['__task_id']).tolist()
# 注意注释掉这一行之后,训练日志实际是错误的
# assert (not isinstance(rt_task_id, list)) or len(set(rt_task_id)) == 1, rt_task_id
rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id rt_task_id = rt_task_id[0] if isinstance(rt_task_id, list) else rt_task_id
cur_task = instances[rt_task_id] cur_task = instances[rt_task_id]
...@@ -643,7 +594,6 @@ class Controller(object): ...@@ -643,7 +594,6 @@ class Controller(object):
instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs) instances[rt_task_id].task_layer['train'].postprocess(task_rt_outputs)
global_step += 1 global_step += 1
# if cur_task.is_target:
cur_task.cur_train_step += 1 cur_task.cur_train_step += 1
if global_step % main_conf.get('print_every_n_steps', 5) == 0: if global_step % main_conf.get('print_every_n_steps', 5) == 0:
...@@ -667,9 +617,6 @@ class Controller(object): ...@@ -667,9 +617,6 @@ class Controller(object):
"step_" + str(global_step)) "step_" + str(global_step))
fluid.io.save_persistables(self.exe, save_path, saver_program) fluid.io.save_persistables(self.exe, save_path, saver_program)
# save_path = os.path.join(main_conf['save_path'],
# "step_" + str(global_step) + "_final")
# fluid.io.save_persistables(self.exe, save_path, saver_program)
print("ALL tasks train finished, exiting...") print("ALL tasks train finished, exiting...")
def pred(self, task_instance, inference_model_dir=None): def pred(self, task_instance, inference_model_dir=None):
...@@ -697,7 +644,6 @@ class Controller(object): ...@@ -697,7 +644,6 @@ class Controller(object):
inst.reader['pred'].load_data() inst.reader['pred'].load_data()
fetch_names, fetch_vars = inst.pred_fetch_list fetch_names, fetch_vars = inst.pred_fetch_list
# iterator = create_iterator_fn(inst.reader['pred'].iterator, inst.name, pred_joint_shape_and_dtypes, name_to_position)
mapper = {k:v for k,v in inst.pred_input} mapper = {k:v for k,v in inst.pred_input}
buf = [] buf = []
for feed in inst.reader['pred'].iterator(): for feed in inst.reader['pred'].iterator():
......
...@@ -26,7 +26,7 @@ class TaskInstance(object): ...@@ -26,7 +26,7 @@ class TaskInstance(object):
self._config = config self._config = config
self._verbose = verbose self._verbose = verbose
self._save_infermodel_path = os.path.join(self._config['save_path'], 'infer_model') self._save_infermodel_path = os.path.join(self._config['save_path'], self._name, 'infer_model')
self._save_ckpt_path = os.path.join(self._config['save_path'], 'ckpt') self._save_ckpt_path = os.path.join(self._config['save_path'], 'ckpt')
# following flags can be fetch from instance config file # following flags can be fetch from instance config file
...@@ -87,13 +87,7 @@ class TaskInstance(object): ...@@ -87,13 +87,7 @@ class TaskInstance(object):
dirpath = self._save_infermodel_path + suffix dirpath = self._save_infermodel_path + suffix
self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list] self._pred_input_varname_list = [str(i) for i in self._pred_input_varname_list]
# del self._pred_input_varname_list[0]
# del self._pred_input_varname_list[0]
# del self._pred_input_varname_list[0]
# print(self._pred_input_varname_list)
fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, export_for_deployment = True) fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, export_for_deployment = True)
# fluid.io.save_inference_model(dirpath, self._pred_input_varname_list, self._pred_fetch_var_list, self._exe, params_filename='__params__')
conf = {} conf = {}
for k, strv in self._save_protocol.items(): for k, strv in self._save_protocol.items():
...@@ -111,8 +105,6 @@ class TaskInstance(object): ...@@ -111,8 +105,6 @@ class TaskInstance(object):
exec('{}=v'.format(strv)) exec('{}=v'.format(strv))
pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \ pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
fluid.io.load_inference_model(infer_model_path, self._exe) fluid.io.load_inference_model(infer_model_path, self._exe)
# pred_prog, self._pred_input_varname_list, self._pred_fetch_var_list = \
# fluid.io.load_inference_model(infer_model_path, self._exe, params_filename='__params__')
print(self._name+': inference model loaded from ' + infer_model_path) print(self._name+': inference model loaded from ' + infer_model_path)
return pred_prog return pred_prog
...@@ -159,7 +151,6 @@ class TaskInstance(object): ...@@ -159,7 +151,6 @@ class TaskInstance(object):
assert isinstance(val, dict) assert isinstance(val, dict)
self._pred_input_name_list, self._pred_input_varname_list = \ self._pred_input_name_list, self._pred_input_varname_list = \
zip(*[[k, v.name] for k,v in val.items()]) zip(*[[k, v.name] for k,v in val.items()])
# print(self._pred_input_name_list)
@property @property
def pred_fetch_list(self): def pred_fetch_list(self):
...@@ -243,7 +234,6 @@ class TaskInstance(object): ...@@ -243,7 +234,6 @@ class TaskInstance(object):
self._cur_train_step = 1 self._cur_train_step = 1
if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps: if self._is_target and self._cur_train_step + self._cur_train_epoch * self._steps_pur_epoch >= self._expected_train_steps:
self._train_finish = True self._train_finish = True
# fluid.io.save_inference_model(self._save_infermodel_path, )
@property @property
def steps_pur_epoch(self): def steps_pur_epoch(self):
......
...@@ -54,7 +54,6 @@ class TaskParadigm(task_paradigm): ...@@ -54,7 +54,6 @@ class TaskParadigm(task_paradigm):
mask_pos = inputs["reader"]["mask_pos"] mask_pos = inputs["reader"]["mask_pos"]
if self._is_training: if self._is_training:
mask_label = inputs["reader"]["mask_label"] mask_label = inputs["reader"]["mask_label"]
# 多任务学习时才需要引入这个,防止其他run其他任务时导致seqlen过小,gather超范围
max_position = inputs["reader"]["batchsize_x_seqlen"] - 1 max_position = inputs["reader"]["batchsize_x_seqlen"] - 1
mask_pos = fluid.layers.elementwise_min(mask_pos, max_position) mask_pos = fluid.layers.elementwise_min(mask_pos, max_position)
mask_pos.stop_gradient = True mask_pos.stop_gradient = True
...@@ -90,14 +89,6 @@ class TaskParadigm(task_paradigm): ...@@ -90,14 +89,6 @@ class TaskParadigm(task_paradigm):
name=scope_name+"mask_lm_out_fc.b_0", name=scope_name+"mask_lm_out_fc.b_0",
initializer=fluid.initializer.Constant(value=0.0)) initializer=fluid.initializer.Constant(value=0.0))
# print fluid.default_main_program().global_block()
# fc_out = fluid.layers.matmul(
# x=mask_trans_feat,
# y=fluid.default_main_program().global_block().var(
# _word_emb_name),
# transpose_y=True)
fc_out = fluid.layers.matmul( fc_out = fluid.layers.matmul(
x=mask_trans_feat, x=mask_trans_feat,
y=word_emb, y=word_emb,
......
...@@ -51,7 +51,6 @@ def _zero_batch(attrs): ...@@ -51,7 +51,6 @@ def _zero_batch(attrs):
def _zero_batch_x(attrs, batch_size): def _zero_batch_x(attrs, batch_size):
pos_attrs = [] pos_attrs = []
for shape, dtype in attrs: for shape, dtype in attrs:
# pos_shape = [size if size and size > 0 else 5 for size in shape]
pos_shape = [size for size in shape] pos_shape = [size for size in shape]
if pos_shape[0] == -1: if pos_shape[0] == -1:
pos_shape[0] = batch_size pos_shape[0] = batch_size
...@@ -72,7 +71,6 @@ def create_net_inputs(input_attrs, async=False, iterator_fn=None, dev_count=1, n ...@@ -72,7 +71,6 @@ def create_net_inputs(input_attrs, async=False, iterator_fn=None, dev_count=1, n
if async: if async:
assert iterator_fn is not None, "iterator_fn is needed for building async input layer." assert iterator_fn is not None, "iterator_fn is needed for building async input layer."
# reader = fluid.io.PyReader(inputs, capacity=dev_count*n_prefetch, iterable=False)
reader = fluid.io.PyReader(inputs, capacity=dev_count, iterable=False) reader = fluid.io.PyReader(inputs, capacity=dev_count, iterable=False)
reader.decorate_batch_generator(iterator_fn) reader.decorate_batch_generator(iterator_fn)
reader.start() reader.start()
...@@ -117,8 +115,6 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype ...@@ -117,8 +115,6 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype
if not keep_one_task: if not keep_one_task:
dev_count = 1 dev_count = 1
# build fake batch
# 注意这种方法会导致一个问题,用户将某任务的mix ratio设置成0后,并不能避免从该任务上读数据,若用户将数据集删掉则会导致崩溃;不过相比之前的zero batch方法,这种方法不必作出只能有一个size=-1的维度且第0维的-1必须是batch size的假设
results = _zero_batch(joint_shape_and_dtypes) results = _zero_batch(joint_shape_and_dtypes)
outbuf = {} outbuf = {}
for id in task_ids: for id in task_ids:
...@@ -151,12 +147,9 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype ...@@ -151,12 +147,9 @@ def create_joint_iterator_fn(iterators, iterator_prefixes, joint_shape_and_dtype
print('----- debug joint iterator -----') print('----- debug joint iterator -----')
print('sampled task id: '+str(id)) print('sampled task id: '+str(id))
task_id_tensor = np.array([[id]]).astype("int64") task_id_tensor = np.array([[id]]).astype("int64")
# results[0] = task_id_tensor
for i in range(dev_count): for i in range(dev_count):
# 这两个应该是等价的
# results[0] = task_id_tensor
results[outname_to_pos['__task_id']] = task_id_tensor results[outname_to_pos['__task_id']] = task_id_tensor
assert outname_to_pos['__task_id'] == 0 assert outname_to_pos['__task_id'] == 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册