diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 63057ac281489f7c3206f7806a44aaffa9cc7f0b..170955ce853b8c61e8b756f4d7bd713f9865257a 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -874,14 +874,7 @@ paddle.fluid.optimizer.ExponentialMovingAverage.apply (ArgSpec(args=['self', 'ex paddle.fluid.optimizer.ExponentialMovingAverage.restore (ArgSpec(args=['self', 'executor'], varargs=None, keywords=None, defaults=None), ('document', '8c8a1791608b02a1ede53d6dd3a4fcec')) paddle.fluid.optimizer.ExponentialMovingAverage.update (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', 'ea10f08af6d7aac3b7974aa976e4085f')) paddle.fluid.optimizer.PipelineOptimizer.__init__ (ArgSpec(args=['self', 'optimizer', 'cut_list', 'place_list', 'concurrency_list', 'queue_size', 'sync_steps', 'start_cpu_core_id'], varargs=None, keywords=None, defaults=(None, None, None, 30, 1, 0)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.optimizer.PipelineOptimizer.create_vars (ArgSpec(args=['self', 'block', 'main_program'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.optimizer.PipelineOptimizer.extract_section_ops (ArgSpec(args=['self', 'ops', 'cut_point_name'], varargs=None, keywords=None, defaults=None), ('document', '4a29be77da04b5c30dd7202f44c79b70')) -paddle.fluid.optimizer.PipelineOptimizer.extract_section_opt_ops (ArgSpec(args=['self', 'ops', 'cut_point_name'], varargs=None, keywords=None, defaults=None), ('document', '99e0f641222c1ce4dd0d7194c3b2c653')) -paddle.fluid.optimizer.PipelineOptimizer.find_input_output (ArgSpec(args=['self', 'ops', 'name', 'is_forward'], varargs=None, keywords=None, defaults=(True,)), ('document', '92d77fb262766b352746f09cca81db93')) -paddle.fluid.optimizer.PipelineOptimizer.find_persistable_vars (ArgSpec(args=['self', 'ops', 'whole_parameters'], varargs=None, keywords=None, defaults=None), ('document', '877b7cc290f0647455e5e4409e825923')) -paddle.fluid.optimizer.PipelineOptimizer.find_section_opt (ArgSpec(args=['self', 'ops', 'params'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.optimizer.PipelineOptimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.optimizer.PipelineOptimizer.split_program (ArgSpec(args=['self', 'main_program', 'cut_list'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.backward.append_backward (ArgSpec(args=['loss', 'parameter_list', 'no_grad_set', 'callbacks'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '08a5dd9f6f376ff3d55e0b1d92115cbd')) paddle.fluid.backward.gradients (ArgSpec(args=['targets', 'inputs', 'target_gradients', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None)), ('document', 'e2097e1e0ed84ae44951437bfe269a1b')) paddle.fluid.regularizer.L1DecayRegularizer.__init__ (ArgSpec(args=['self', 'regularization_coeff'], varargs=None, keywords=None, defaults=(0.0,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 49adba58e63f92fce02d44ff7333b3619c34c209..3ce98bb0ba9b59b80af86ffa02b9f7e2d90caf3e 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -2650,57 +2650,67 @@ class ExponentialMovingAverage(object): class PipelineOptimizer(object): """ Pipeline Optimizer - Train with pipeline mode. The program will be splited by cut_list. - If the len of cut_list is k, then the whole program (including - backward part) will be splited to 2*k-1 sections. So the length of place_list - and concurrency_list must be also 2*k-1. - Note: Though the asynchronous mode is applied in pipeline training to speed up, + + Train with pipeline mode. The program will be splited by cut_list. + + If the len of cut_list is k, then the whole program (including \ + backward part) will be splited to 2*k-1 sections. + + So the length of place_list and concurrency_list must be also 2*k-1. + + Note: Though the asynchronous mode is applied in pipeline training to speed up, \ the final performance depends on the training progress of each pipeline heavily. - And we will try the synchronous mode in the future + + And we will try the synchronous mode in the future. + Args: - optimizer (Optimizer): The based optimizer, such as SGD - cut_list (list of Variable list): The cut variable of the main_program - place_list (list of Place): The place where the section will run on - concurrency_list (list of int): The concurrency degree + optimizer (Optimizer): The based optimizer, such as SGD. + cut_list (list of Variable list): The cut variable of the main_program. + place_list (list of Place): The place where the section will run on. + concurrency_list (list of int): The concurrency degree. queue_size (int): Each section will consume scopes from its in-scope queue and produce scopes to out-scope queue. And this parameter - specify the scope queue size. [Optional. Default: 30] - sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1] - start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0] + specify the scope queue size. [Optional. Default: 30]. + sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]. + start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0]. + Examples: .. code-block:: python - x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) - y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0) - emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False) - emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False) - concat = layers.concat([emb_x, emb_y], axis=1) - fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False) - loss = layers.reduce_mean(fc) - optimizer = fluid.optimizer.SGD(learning_rate=0.5) - optimizer = fluid.optimizer.PipelineOptimizer(optimizer, - cut_list=[[emb_x, emb_y], [loss]], - place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()], - concurrency_list=[1, 1, 4], - queue_size=2, - sync_steps=1, - ) - optimizer.minimize(loss) - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"] - dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") - dataset.set_use_var([x,y]) - dataset.set_batch_size(batch_size) - dataset.set_filelist(filelist) - exe.train_from_dataset( - fluid.default_main_program(), - dataset, - thread=2, - debug=False, - fetch_list=[], - fetch_info=[], - print_period=1) + + import paddle.fluid.layers as layers + + x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) + y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0) + emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False) + emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False) + concat = layers.concat([emb_x, emb_y], axis=1) + fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False) + loss = layers.reduce_mean(fc) + optimizer = fluid.optimizer.SGD(learning_rate=0.5) + optimizer = fluid.optimizer.PipelineOptimizer(optimizer, + cut_list=[[emb_x, emb_y], [loss]], + place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()], + concurrency_list=[1, 1, 4], + queue_size=2, + sync_steps=1, + ) + optimizer.minimize(loss) + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"] + dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") + dataset.set_use_var([x,y]) + dataset.set_batch_size(batch_size) + dataset.set_filelist(filelist) + exe.train_from_dataset( + fluid.default_main_program(), + dataset, + thread=2, + debug=False, + fetch_list=[], + fetch_info=[], + print_period=1) """ def __init__(self, @@ -2720,7 +2730,7 @@ class PipelineOptimizer(object): self._sync_steps = sync_steps self._start_cpu_core_id = start_cpu_core_id - def create_vars(self, block, main_program): + def _create_vars(self, block, main_program): used_var_set = set() for op_idx in range(block.desc.op_size()): op_desc = block.desc.op(op_idx) @@ -2732,7 +2742,7 @@ class PipelineOptimizer(object): source_var = main_program.block(0).var(str(var)) block._clone_variable(source_var, False) - def extract_section_opt_ops(self, ops, cut_point_name): + def _extract_section_opt_ops(self, ops, cut_point_name): """ Extract opt ops in the given section """ @@ -2748,7 +2758,7 @@ class PipelineOptimizer(object): op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]] return op_path - def find_input_output(self, ops, name, is_forward=True): + def _find_input_output(self, ops, name, is_forward=True): """ Find the inputs or outputs of a section """ @@ -2763,7 +2773,7 @@ class PipelineOptimizer(object): all_set.update(op.desc.input_arg_names()) return all_set - part_set - def find_persistable_vars(self, ops, whole_parameters): + def _find_persistable_vars(self, ops, whole_parameters): """ find the persistable input vars in current section """ @@ -2791,7 +2801,7 @@ class PipelineOptimizer(object): return True return False - def extract_section_ops(self, ops, cut_point_name): + def _extract_section_ops(self, ops, cut_point_name): """ Extract ops in the given section """ @@ -2811,11 +2821,11 @@ class PipelineOptimizer(object): op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]] return op_path - def find_section_opt(self, ops, params): - res = self.extract_section_opt_ops(ops, params) + def _find_section_opt(self, ops, params): + res = self._extract_section_opt_ops(ops, params) return res - def split_program(self, main_program, cut_list): + def _split_program(self, main_program, cut_list): programs = [] block = main_program.block(0) whole_parameters = [e.name for e in block.all_parameters()] @@ -2836,24 +2846,24 @@ class PipelineOptimizer(object): "input_set": set(), "output_set": set() } - cur_ops = self.extract_section_ops(ops, cut_vars) + cur_ops = self._extract_section_ops(ops, cut_vars) if i == 0: for op in ops: if self._is_lr_role_op(op): cur_ops.append(op) #prevent inplace in/out program["input_set"].update( - self.find_input_output( + self._find_input_output( cur_ops, [], is_forward=True)) for e in cur_ops: ops.remove(e) if i < cut_len: sec_params.append( - self.find_persistable_vars(cur_ops, whole_parameters)) + self._find_persistable_vars(cur_ops, whole_parameters)) if i >= cut_len - 1: - opt_ops = self.find_section_opt(ops, - sec_params[2 * cut_len - 2 - i]) + opt_ops = self._find_section_opt( + ops, sec_params[2 * cut_len - 2 - i]) for e in opt_ops: ops.remove(e) @@ -2864,11 +2874,11 @@ class PipelineOptimizer(object): ap_op = program["program"].block(0).desc.append_op() ap_op.copy_from(op_desc) program["input_set"].update( - self.find_input_output( + self._find_input_output( cur_ops, cut_vars, is_forward=True)) program["input_set"].update(sec_params[min(i, 2 * cut_len - 2 - i)]) program["output_set"].update( - self.find_input_output( + self._find_input_output( cur_ops, cut_vars, is_forward=False)) programs.append(program) program = { @@ -2883,7 +2893,7 @@ class PipelineOptimizer(object): program["input_set"].update( [cut_var.name + "@GRAD" for cut_var in cut_list[0]]) program["input_set"].update( - self.find_input_output( + self._find_input_output( ops, [], is_forward=True)) program["input_set"].update(sec_params[0]) programs.append(program) @@ -2904,9 +2914,9 @@ class PipelineOptimizer(object): self._optimizer.minimize(loss, startup_program, parameter_list, no_grad_set) program = loss.block.program - program_list = self.split_program(program, self._cut_list) + program_list = self._split_program(program, self._cut_list) for p in program_list: - self.create_vars(p["program"].block(0), program) + self._create_vars(p["program"].block(0), program) whole_parameters = [e.name for e in program.block(0).all_parameters()] param_need_sync = [] for i, section_p in enumerate(program_list):