未验证 提交 55538c56 编写于 作者: H hutuxian 提交者: GitHub

cherry-pick: update api format (#18413) (#18421)

上级 49884564
......@@ -874,14 +874,7 @@ paddle.fluid.optimizer.ExponentialMovingAverage.apply (ArgSpec(args=['self', 'ex
paddle.fluid.optimizer.ExponentialMovingAverage.restore (ArgSpec(args=['self', 'executor'], varargs=None, keywords=None, defaults=None), ('document', '8c8a1791608b02a1ede53d6dd3a4fcec'))
paddle.fluid.optimizer.ExponentialMovingAverage.update (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', 'ea10f08af6d7aac3b7974aa976e4085f'))
paddle.fluid.optimizer.PipelineOptimizer.__init__ (ArgSpec(args=['self', 'optimizer', 'cut_list', 'place_list', 'concurrency_list', 'queue_size', 'sync_steps', 'start_cpu_core_id'], varargs=None, keywords=None, defaults=(None, None, None, 30, 1, 0)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.PipelineOptimizer.create_vars (ArgSpec(args=['self', 'block', 'main_program'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.PipelineOptimizer.extract_section_ops (ArgSpec(args=['self', 'ops', 'cut_point_name'], varargs=None, keywords=None, defaults=None), ('document', '4a29be77da04b5c30dd7202f44c79b70'))
paddle.fluid.optimizer.PipelineOptimizer.extract_section_opt_ops (ArgSpec(args=['self', 'ops', 'cut_point_name'], varargs=None, keywords=None, defaults=None), ('document', '99e0f641222c1ce4dd0d7194c3b2c653'))
paddle.fluid.optimizer.PipelineOptimizer.find_input_output (ArgSpec(args=['self', 'ops', 'name', 'is_forward'], varargs=None, keywords=None, defaults=(True,)), ('document', '92d77fb262766b352746f09cca81db93'))
paddle.fluid.optimizer.PipelineOptimizer.find_persistable_vars (ArgSpec(args=['self', 'ops', 'whole_parameters'], varargs=None, keywords=None, defaults=None), ('document', '877b7cc290f0647455e5e4409e825923'))
paddle.fluid.optimizer.PipelineOptimizer.find_section_opt (ArgSpec(args=['self', 'ops', 'params'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.PipelineOptimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.PipelineOptimizer.split_program (ArgSpec(args=['self', 'main_program', 'cut_list'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.backward.append_backward (ArgSpec(args=['loss', 'parameter_list', 'no_grad_set', 'callbacks'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '08a5dd9f6f376ff3d55e0b1d92115cbd'))
paddle.fluid.backward.gradients (ArgSpec(args=['targets', 'inputs', 'target_gradients', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None)), ('document', 'e2097e1e0ed84ae44951437bfe269a1b'))
paddle.fluid.regularizer.L1DecayRegularizer.__init__ (ArgSpec(args=['self', 'regularization_coeff'], varargs=None, keywords=None, defaults=(0.0,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
......
......@@ -2650,57 +2650,67 @@ class ExponentialMovingAverage(object):
class PipelineOptimizer(object):
"""
Pipeline Optimizer
Train with pipeline mode. The program will be splited by cut_list.
If the len of cut_list is k, then the whole program (including
backward part) will be splited to 2*k-1 sections. So the length of place_list
and concurrency_list must be also 2*k-1.
Note: Though the asynchronous mode is applied in pipeline training to speed up,
Train with pipeline mode. The program will be split by cut_list.
If the len of cut_list is k, then the whole program (including \
backward part) will be split into 2*k-1 sections.
So the length of place_list and concurrency_list must also be 2*k-1.
Note: Though the asynchronous mode is applied in pipeline training to speed up, \
the final performance depends on the training progress of each pipeline heavily.
And we will try the synchronous mode in the future
And we will try the synchronous mode in the future.
Args:
optimizer (Optimizer): The based optimizer, such as SGD
cut_list (list of Variable list): The cut variable of the main_program
place_list (list of Place): The place where the section will run on
concurrency_list (list of int): The concurrency degree
optimizer (Optimizer): The base optimizer, such as SGD.
cut_list (list of Variable list): The cut variable of the main_program.
place_list (list of Place): The place where the section will run on.
concurrency_list (list of int): The concurrency degree.
queue_size (int): Each section will consume scopes from its in-scope queue
and produce scopes to out-scope queue. And this parameter
specify the scope queue size. [Optional. Default: 30]
sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]
start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0]
specifies the scope queue size. [Optional. Default: 30].
sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1].
start_cpu_core_id (int): Specifies the first CPU core id. [Optional. Default: 0].
Examples:
.. code-block:: python
x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
concat = layers.concat([emb_x, emb_y], axis=1)
fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
loss = layers.reduce_mean(fc)
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
cut_list=[[emb_x, emb_y], [loss]],
place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
concurrency_list=[1, 1, 4],
queue_size=2,
sync_steps=1,
)
optimizer.minimize(loss)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var([x,y])
dataset.set_batch_size(batch_size)
dataset.set_filelist(filelist)
exe.train_from_dataset(
fluid.default_main_program(),
dataset,
thread=2,
debug=False,
fetch_list=[],
fetch_info=[],
print_period=1)
import paddle.fluid.layers as layers
x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
concat = layers.concat([emb_x, emb_y], axis=1)
fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
loss = layers.reduce_mean(fc)
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
cut_list=[[emb_x, emb_y], [loss]],
place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
concurrency_list=[1, 1, 4],
queue_size=2,
sync_steps=1,
)
optimizer.minimize(loss)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var([x,y])
dataset.set_batch_size(batch_size)
dataset.set_filelist(filelist)
exe.train_from_dataset(
fluid.default_main_program(),
dataset,
thread=2,
debug=False,
fetch_list=[],
fetch_info=[],
print_period=1)
"""
def __init__(self,
......@@ -2720,7 +2730,7 @@ class PipelineOptimizer(object):
self._sync_steps = sync_steps
self._start_cpu_core_id = start_cpu_core_id
def create_vars(self, block, main_program):
def _create_vars(self, block, main_program):
used_var_set = set()
for op_idx in range(block.desc.op_size()):
op_desc = block.desc.op(op_idx)
......@@ -2732,7 +2742,7 @@ class PipelineOptimizer(object):
source_var = main_program.block(0).var(str(var))
block._clone_variable(source_var, False)
def extract_section_opt_ops(self, ops, cut_point_name):
def _extract_section_opt_ops(self, ops, cut_point_name):
"""
Extract opt ops in the given section
"""
......@@ -2748,7 +2758,7 @@ class PipelineOptimizer(object):
op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]]
return op_path
def find_input_output(self, ops, name, is_forward=True):
def _find_input_output(self, ops, name, is_forward=True):
"""
Find the inputs or outputs of a section
"""
......@@ -2763,7 +2773,7 @@ class PipelineOptimizer(object):
all_set.update(op.desc.input_arg_names())
return all_set - part_set
def find_persistable_vars(self, ops, whole_parameters):
def _find_persistable_vars(self, ops, whole_parameters):
"""
find the persistable input vars in current section
"""
......@@ -2791,7 +2801,7 @@ class PipelineOptimizer(object):
return True
return False
def extract_section_ops(self, ops, cut_point_name):
def _extract_section_ops(self, ops, cut_point_name):
"""
Extract ops in the given section
"""
......@@ -2811,11 +2821,11 @@ class PipelineOptimizer(object):
op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]]
return op_path
def find_section_opt(self, ops, params):
res = self.extract_section_opt_ops(ops, params)
def _find_section_opt(self, ops, params):
res = self._extract_section_opt_ops(ops, params)
return res
def split_program(self, main_program, cut_list):
def _split_program(self, main_program, cut_list):
programs = []
block = main_program.block(0)
whole_parameters = [e.name for e in block.all_parameters()]
......@@ -2836,24 +2846,24 @@ class PipelineOptimizer(object):
"input_set": set(),
"output_set": set()
}
cur_ops = self.extract_section_ops(ops, cut_vars)
cur_ops = self._extract_section_ops(ops, cut_vars)
if i == 0:
for op in ops:
if self._is_lr_role_op(op):
cur_ops.append(op)
#prevent inplace in/out
program["input_set"].update(
self.find_input_output(
self._find_input_output(
cur_ops, [], is_forward=True))
for e in cur_ops:
ops.remove(e)
if i < cut_len:
sec_params.append(
self.find_persistable_vars(cur_ops, whole_parameters))
self._find_persistable_vars(cur_ops, whole_parameters))
if i >= cut_len - 1:
opt_ops = self.find_section_opt(ops,
sec_params[2 * cut_len - 2 - i])
opt_ops = self._find_section_opt(
ops, sec_params[2 * cut_len - 2 - i])
for e in opt_ops:
ops.remove(e)
......@@ -2864,11 +2874,11 @@ class PipelineOptimizer(object):
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
program["input_set"].update(
self.find_input_output(
self._find_input_output(
cur_ops, cut_vars, is_forward=True))
program["input_set"].update(sec_params[min(i, 2 * cut_len - 2 - i)])
program["output_set"].update(
self.find_input_output(
self._find_input_output(
cur_ops, cut_vars, is_forward=False))
programs.append(program)
program = {
......@@ -2883,7 +2893,7 @@ class PipelineOptimizer(object):
program["input_set"].update(
[cut_var.name + "@GRAD" for cut_var in cut_list[0]])
program["input_set"].update(
self.find_input_output(
self._find_input_output(
ops, [], is_forward=True))
program["input_set"].update(sec_params[0])
programs.append(program)
......@@ -2904,9 +2914,9 @@ class PipelineOptimizer(object):
self._optimizer.minimize(loss, startup_program, parameter_list,
no_grad_set)
program = loss.block.program
program_list = self.split_program(program, self._cut_list)
program_list = self._split_program(program, self._cut_list)
for p in program_list:
self.create_vars(p["program"].block(0), program)
self._create_vars(p["program"].block(0), program)
whole_parameters = [e.name for e in program.block(0).all_parameters()]
param_need_sync = []
for i, section_p in enumerate(program_list):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册