From 86b0a72576b26ab74be0a189baa4f3cb15bc1c38 Mon Sep 17 00:00:00 2001
From: chengduo
Date: Fri, 13 Jul 2018 15:39:50 +0800
Subject: [PATCH] Refine multi thread cpu parallel exe (#11406)

* refine multi-thread CPU Parallel exe
* refine multi thread CPU Parallel exe
* Refine CPU version for ParallelExecutor
* add share_parameter_between_cards_
* Fix ParallelExecutor bug
* Fix unit test
* Fix parameter opt balance
* Fix with opti (param->grad)
* Add grad to op var
* Remove shard_param_between_cards
---
 .../details/multi_devices_graph_builder.cc    | 38 ++++++++++-----
 paddle/fluid/framework/parallel_executor.cc   | 23 +++++++--
 python/paddle/fluid/clip.py                   | 14 ++++--
 python/paddle/fluid/framework.py              | 11 +++--
 python/paddle/fluid/optimizer.py              | 32 ++++++++-----
 python/paddle/fluid/regularizer.py            | 11 ++---
 .../unittests/parallel_executor_test_base.py  | 10 +++-
 .../unittests/test_parallel_executor_mnist.py | 48 +++++++++----------
 .../test_parallel_executor_seresnext.py       | 17 +++----
 9 files changed, 121 insertions(+), 83 deletions(-)

diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc
index b82c2ef40..6f5d4471a 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc
+++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc
@@ -276,13 +276,22 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
     }
   }

-  // Insert BCast Ops
-  for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
-    auto &to_bcast_set = bcast_var_name_set[dev_id];
-    for (auto &bcast_name : to_bcast_set) {
-      CreateBroadcastOp(&result, bcast_name, dev_id);
+  bool use_gpu = false;
+#ifdef PADDLE_WITH_CUDA
+  use_gpu = nccl_ctxs_ != nullptr;
+#endif
+
+  if (use_gpu ||
+      strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) {
+    // Insert BCast Ops
+    for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
+      auto &to_bcast_set = bcast_var_name_set[dev_id];
+      for (auto &bcast_name : to_bcast_set) {
+        CreateBroadcastOp(&result, bcast_name, dev_id);
+      }
     }
   }
+
   /*
   Dependency graph has been constructed. However, there are still data
   hazards need to be handled.
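
With this hunk, broadcast ops are emitted only on GPU or under the kAllReduce strategy; in CPU Reduce mode each parameter stays on the single place that owns it and is never broadcast. A toy Python rendering of the gating rule (function and tuple layout are invented for illustration, not part of the patch):

    def insert_broadcast_ops(bcast_var_name_set, use_gpu, reduce_strategy):
        """Mirror of the gating above: broadcasts are only inserted when
        running on GPU or when the AllReduce strategy is selected."""
        ops = []
        if use_gpu or reduce_strategy == "AllReduce":
            for dev_id, to_bcast_set in enumerate(bcast_var_name_set):
                for bcast_name in to_bcast_set:
                    ops.append(("broadcast", bcast_name, dev_id))
        return ops
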
@@ -412,14 +421,19 @@ int MultiDevSSAGraphBuilder::GetOpDeviceID(const OpDesc &op) const {
   if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kReduce) {
     return -1;
   }
-
-  for (auto &varname : op.InputArgumentNames()) {
-    int dev_id = GetVarDeviceID(varname);
-    if (dev_id != -1) {
-      return dev_id;
-    }
+  int op_role = boost::get<int>(
+      op.GetAttr(framework::OpProtoAndCheckerMaker::OpRoleAttrName()));
+  if (op_role != static_cast<int>(framework::OpRole::kOptimize)) {
+    return -1;
   }
-  return -1;
+  auto param_grad = boost::get<std::vector<std::string>>(
+      op.GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName()));
+
+  PADDLE_ENFORCE_EQ(param_grad.size(), 2U);
+  int dev_id = GetVarDeviceID(param_grad[1]);
+  PADDLE_ENFORCE_NE(dev_id, -1, "dev_id should not be -1.[%s, %s]", op.Type(),
+                    param_grad[0]);
+  return dev_id;
 }

 int MultiDevSSAGraphBuilder::GetVarDeviceID(const std::string &varname) const {
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 3a9027713..58be61362 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -45,6 +45,7 @@ class ParallelExecutorPrivate {
 #endif
   bool own_local_scope_;
   bool use_cuda_;
+  bool use_all_reduce_;
 };

 std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
@@ -62,6 +63,14 @@ ParallelExecutor::ParallelExecutor(
     : member_(new ParallelExecutorPrivate(places)) {
   member_->global_scope_ = scope;
   member_->use_cuda_ = exec_strategy.use_cuda_;
+  member_->use_all_reduce_ =
+      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;
+
+  if (!member_->use_all_reduce_) {
+    PADDLE_ENFORCE(places.size() > 1,
+                   "If you set build_strategy.reduce with 'Reduce', "
+                   "the number of places must be greater than 1.");
+  }

   // Step 1. Bcast the params to devs.
   // Create local scopes
@@ -117,7 +126,7 @@ ParallelExecutor::ParallelExecutor(
 #ifdef PADDLE_WITH_CUDA
     builder_factory.SetNCCLContextMap(member_->nccl_ctxs_.get());
 #else
-    PADDLE_THROW("Not compiled with CUDA");
+    PADDLE_THROW("Not compiled with CUDA.");
 #endif
   }

@@ -133,7 +142,7 @@ ParallelExecutor::ParallelExecutor(

 void ParallelExecutor::BCastParamsToDevs(
     const std::unordered_set<std::string> &vars) const {
-  // the the initializing bcast, all vars would be bcast from device(0),
+  // the initializing bcast, all vars would be bcast from device(0),
   // otherwise
   // bcast from the specified device.
   bool initializing = builder_.get() == nullptr ? true : false;
@@ -209,9 +218,13 @@ void ParallelExecutor::BCastParamsToDevs(
       auto local_scope = member_->local_scopes_[i];
       auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
-      t->Resize(dims);
-      t->mutable_data(cpu, main_tensor.type());
-      paddle::framework::TensorCopy(main_tensor, cpu, t);
+      if (member_->use_all_reduce_ || member_->use_cuda_) {
+        t->Resize(dims);
+        t->mutable_data(cpu, main_tensor.type());
+        paddle::framework::TensorCopy(main_tensor, cpu, t);
+      } else {
+        t->ShareDataWith(main_tensor);
+      }
     }
   }
 }
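
Two consequences of the executor changes are worth noting: Reduce mode now refuses to run with a single place, and in CPU Reduce mode each local scope shares the parameter tensor via ShareDataWith instead of holding a per-place copy. A minimal construction sketch, assuming the fluid API of this era (the tiny network exists only to give the executor a loss):

    import os
    import paddle.fluid as fluid

    os.environ['CPU_NUM'] = '4'  # must be > 1 when Reduce is selected

    x = fluid.layers.data(name='x', shape=[8], dtype='float32')
    loss = fluid.layers.mean(fluid.layers.fc(x, size=1))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)
    fluid.Executor(fluid.CPUPlace()).run(fluid.default_startup_program())

    build_strategy = fluid.BuildStrategy()
    build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
    pe = fluid.ParallelExecutor(
        use_cuda=False, loss_name=loss.name, build_strategy=build_strategy)
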
diff --git a/python/paddle/fluid/clip.py b/python/paddle/fluid/clip.py
index 2a8e3d410..d9acfef58 100644
--- a/python/paddle/fluid/clip.py
+++ b/python/paddle/fluid/clip.py
@@ -324,10 +324,12 @@ def set_gradient_clip(clip, param_list=None, program=None):
         param.gradient_clip_attr = copy.deepcopy(clip)


-def append_gradient_clip_ops(param_grad):
+def append_gradient_clip_ops(param_grads):
     context = dict()
-    for p, g in param_grad:
-        with p.block.program.optimized_guard(p):
+    for p, g in param_grads:
+        if g is None:
+            continue
+        with p.block.program.optimized_guard([p, g]):
             clip_attr = getattr(p, 'gradient_clip_attr', NullGradientClipAttr())
             if clip_attr is None:
                 clip_attr = NullGradientClipAttr()
@@ -339,8 +341,10 @@ def append_gradient_clip_ops(param_grad):
             clip_attr._process_context(context=context, param=p, grad=g)

     res = []
-    for p, g in param_grad:
-        with p.block.program.optimized_guard(p):
+    for p, g in param_grads:
+        if g is None:
+            continue
+        with p.block.program.optimized_guard([p, g]):
             res.append(clip_attr._create_operators(param=p, grad=g))

     return res
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index ea3117e02..d89cb246a 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -1319,7 +1319,7 @@ class Program(object):
         self._op_role_var = [var_name]

     @contextlib.contextmanager
-    def optimized_guard(self, var):
+    def optimized_guard(self, param_and_grads):
         """
         A with guard to set :code:`Optimization` :code:`OpRole` and
         :code:`OpRoleVar` automatically.
@@ -1327,17 +1327,20 @@ class Program(object):
         Notes: This is a very low level API. Users should not use it directly.

         Args:
-            var(Variable|str): The variable (name) to be optimized.
+            param_and_grads(list): The variables (names) to be optimized.

         Examples:

             >>> p, g = backward(...)
-            >>> with program.optimized_guard(p):
+            >>> with program.optimized_guard([p,g]):
             >>>     p = p - 0.001 * g
         """
         OpRole = core.op_proto_and_checker_maker.OpRole
         self._current_role = OpRole.Optimize
-        self._op_role_var = [var.name if isinstance(var, Variable) else var]
+        self._op_role_var = [
+            var.name if isinstance(var, Variable) else var
+            for var in param_and_grads
+        ]
         yield
         self._op_role_var = []
         self._current_role = OpRole.Forward
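
The guard now tags every op appended inside it with the full (parameter, gradient) pair. A small sketch of the new calling convention; the variable names are invented, and plain name strings are enough because of the isinstance check above:

    import paddle.fluid as fluid

    prog = fluid.Program()
    # Ops appended inside the guard get OpRole.Optimize and
    # op_role_var = ['fc_0.w_0', 'fc_0.w_0@GRAD'], which the C++ side's
    # GetOpDeviceID reads to co-locate the optimize op with its gradient.
    with prog.optimized_guard(['fc_0.w_0', 'fc_0.w_0@GRAD']):
        pass
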
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index 214f47afa..94e78d155 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -123,7 +123,7 @@ class Optimizer(object):
         """
         pass

-    def _finish_update(self, block, parameters):
+    def _finish_update(self, block, parameters_and_grads):
         """Finish any custom updates needed
            before completing an optimization step

@@ -226,18 +226,18 @@ class Optimizer(object):

         optimize_ops = []
         for param_and_grad in parameters_and_grads:
+            if param_and_grad[1] is None:
+                continue
             with param_and_grad[0].block.program.optimized_guard(
-                    param_and_grad[0]):
-                if param_and_grad[0].trainable is True and param_and_grad[
-                        1] is not None:
+                    param_and_grad):
+                if param_and_grad[0].trainable is True:
                     optimize_op = self._append_optimize_op(loss.block,
                                                            param_and_grad)
                     optimize_ops.append(optimize_op)

         # Get custom finish ops for subclasses
         # FIXME: Need to fix this once we figure out how to handle dependencies
-        self._finish_update(loss.block,
-                            [p[0] for p in parameters_and_grads])
+        self._finish_update(loss.block, parameters_and_grads)

         end = len(global_block.ops)
         return global_block.slice_ops(start, end)
@@ -564,13 +564,15 @@ class AdamOptimizer(Optimizer):

         return adam_op

-    def _finish_update(self, block, parameters):
+    def _finish_update(self, block, param_and_grads):
         """Update Beta1 and Beta2 Power accumulators
         """
         assert isinstance(block, framework.Block)
         main_block = block.program.global_block()
-        for param in parameters:
-            with param.block.program.optimized_guard(param):
+        for param, grad in param_and_grads:
+            if grad is None:
+                continue
+            with param.block.program.optimized_guard([param, grad]):
                 beta1_pow_acc = self._get_accumulator(self._beta1_pow_acc_str,
                                                       param)
                 beta2_pow_acc = self._get_accumulator(self._beta2_pow_acc_str,
@@ -691,13 +693,15 @@ class AdamaxOptimizer(Optimizer):

         return adamax_op

-    def _finish_update(self, block, parameters):
+    def _finish_update(self, block, parameters_and_grads):
         """Update Beta1 Power accumulator
         """
         assert isinstance(block, framework.Block)
         main_block = block.program.global_block()
-        for param in parameters:
-            with param.block.program.optimized_guard(param):
+        for param, grad in parameters_and_grads:
+            if grad is None:
+                continue
+            with param.block.program.optimized_guard([param, grad]):
                 beta1_pow_acc = self._get_accumulator(self._beta1_pow_acc_str,
                                                       param)
                 main_block.append_op(
@@ -1158,7 +1162,9 @@ class ModelAverage(Optimizer):
                 self.params_grads.append((param, grad))

         for param, grad in self.params_grads:
-            with param.block.program.optimized_guard(param):
+            if grad is None:
+                continue
+            with param.block.program.optimized_guard([param, grad]):
                 self._append_average_accumulate_op(param)

         self.apply_program = Program()
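
Optimizer subclasses now receive (param, grad) pairs in _finish_update and are expected to skip parameters whose gradient is None. A hypothetical subclass following the same pattern as AdamOptimizer above (MyOptimizer and its bookkeeping are invented for illustration):

    import paddle.fluid as fluid
    from paddle.fluid import framework

    class MyOptimizer(fluid.optimizer.Optimizer):
        def _finish_update(self, block, parameters_and_grads):
            assert isinstance(block, framework.Block)
            for param, grad in parameters_and_grads:
                if grad is None:
                    continue  # no gradient flowed to this parameter
                with param.block.program.optimized_guard([param, grad]):
                    pass  # append per-parameter bookkeeping ops here
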
diff --git a/python/paddle/fluid/regularizer.py b/python/paddle/fluid/regularizer.py
index 53f35f5cc..080c18542 100644
--- a/python/paddle/fluid/regularizer.py
+++ b/python/paddle/fluid/regularizer.py
@@ -41,12 +41,11 @@ def append_regularization_ops(parameters_and_grads, regularization=None):
     """
     params_and_grads = []
     for param, grad in parameters_and_grads:
-        with param.block.program.optimized_guard(param):
-            # If no gradient then we don't need to do anything
-            if grad is None:
-                params_and_grads.append((param, grad))
-                continue
-
+        # If no gradient then we don't need to do anything
+        if grad is None:
+            params_and_grads.append((param, grad))
+            continue
+        with param.block.program.optimized_guard([param, grad]):
             regularization_term = None
             if param.regularizer is not None:
                 # Add variable for regularization term in grad block
diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index cddf00765..f5c93319d 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -35,7 +35,7 @@ class TestParallelExecutorBase(unittest.TestCase):
                                  feed_dict=None,
                                  seed=None,
                                  use_parallel_executor=True,
-                                 balance_parameter_opt_between_cards=False):
+                                 use_reduce=False):
         def run_executor(exe, feed, fetch_list, program=None):
             if isinstance(exe, fluid.ParallelExecutor):
                 res = exe.run(fetch_list=fetch_list, feed=feed)
@@ -50,14 +50,19 @@ class TestParallelExecutorBase(unittest.TestCase):
         main = fluid.Program()
         startup = fluid.Program()
         startup.random_seed = 1  # Fix random seed
+        main.random_seed = 1
         with fluid.program_guard(main, startup):
             if seed is not None:
                 startup.random_seed = seed
+                main.random_seed = seed
+
             loss = method(use_feed=feed_dict is not None)
             adam = fluid.optimizer.Adam()
             adam.minimize(loss)
+
             if memory_opt:
                 fluid.memory_optimize(main)
+
         place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
         startup_exe = fluid.Executor(place)
         startup_exe.run(startup)
@@ -65,7 +70,8 @@ class TestParallelExecutorBase(unittest.TestCase):
         exec_strategy.allow_op_delay = allow_op_delay

         build_strategy = fluid.BuildStrategy()
-        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce if balance_parameter_opt_between_cards else fluid.BuildStrategy.ReduceStrategy.AllReduce
+        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
+            if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce

         if use_parallel_executor:
             exe = fluid.ParallelExecutor(
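
Downstream tests now toggle the strategy through the boolean use_reduce flag instead of balance_parameter_opt_between_cards. A hypothetical test written against the base class (simple_fc_net stands in for any model builder, as in the MNIST tests below):

    class TestSimpleFCReduce(TestParallelExecutorBase):
        def test_simple_fc_reduce_on_cpu(self):
            # Same convergence check, but with parameters sharded across
            # places (Reduce) instead of replicated (AllReduce).
            self.check_network_convergence(
                simple_fc_net, use_cuda=False, use_reduce=True)
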
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index a801d99aa..f098dc7a3 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -101,9 +101,7 @@ class TestMNIST(TestParallelExecutorBase):
             fluid.recordio_writer.convert_reader_to_recordio_file(
                 MNIST_RECORDIO_FILE, reader, feeder)

-    def check_simple_fc_convergence(self,
-                                    balance_parameter_opt_between_cards,
-                                    use_cuda=True):
+    def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
         self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
         self.check_network_convergence(
             simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
@@ -115,20 +113,19 @@ class TestMNIST(TestParallelExecutorBase):
             feed_dict={"image": img,
                        "label": label},
             use_cuda=use_cuda,
-            balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
-        )
+            use_reduce=use_reduce)

     def test_simple_fc(self):
-        self.check_simple_fc_convergence(False, use_cuda=True)
-        self.check_simple_fc_convergence(False, use_cuda=False)
+        # use_cuda
+        self.check_simple_fc_convergence(True)
+        self.check_simple_fc_convergence(False)

     def test_simple_fc_with_new_strategy(self):
-        self.check_simple_fc_convergence(True, use_cuda=True)
-        self.check_simple_fc_convergence(True, use_cuda=False)
+        # use_cuda, use_reduce
+        self.check_simple_fc_convergence(True, True)
+        self.check_simple_fc_convergence(False, True)

-    def check_simple_fc_parallel_accuracy(self,
-                                          balance_parameter_opt_between_cards,
-                                          use_cuda=True):
+    def check_simple_fc_parallel_accuracy(self, use_cuda, use_reduce=False):
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
         single_first_loss, single_last_loss = self.check_network_convergence(
@@ -145,8 +142,7 @@ class TestMNIST(TestParallelExecutorBase):
                        "label": label},
             use_cuda=use_cuda,
             use_parallel_executor=True,
-            balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
-        )
+            use_reduce=use_reduce)

         for p_f in parallel_first_loss:
             self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
@@ -154,15 +150,15 @@ class TestMNIST(TestParallelExecutorBase):
             self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)

     def test_simple_fc_parallel_accuracy(self):
-        self.check_simple_fc_parallel_accuracy(False, use_cuda=True)
-        self.check_simple_fc_parallel_accuracy(False, use_cuda=False)
+        self.check_simple_fc_parallel_accuracy(True)
+        self.check_simple_fc_parallel_accuracy(False)

     def test_simple_fc_parallel_accuracy_with_new_strategy(self):
-        self.check_simple_fc_parallel_accuracy(True, use_cuda=True)
-        self.check_simple_fc_parallel_accuracy(True, use_cuda=False)
+        # use_cuda, use_reduce
+        self.check_simple_fc_parallel_accuracy(True, True)
+        self.check_simple_fc_parallel_accuracy(False, True)

-    def check_batchnorm_fc_convergence(
-            self, balance_parameter_opt_between_cards, use_cuda):
+    def check_batchnorm_fc_convergence(self, use_cuda, use_reduce=False):
         self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda)
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
@@ -171,16 +167,16 @@ class TestMNIST(TestParallelExecutorBase):
             feed_dict={"image": img,
                        "label": label},
             use_cuda=use_cuda,
-            balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
-        )
+            use_reduce=use_reduce)

     def test_batchnorm_fc(self):
-        self.check_batchnorm_fc_convergence(False, use_cuda=True)
-        self.check_batchnorm_fc_convergence(False, use_cuda=False)
+        self.check_batchnorm_fc_convergence(True)
+        self.check_batchnorm_fc_convergence(False)

     def test_batchnorm_fc_with_new_strategy(self):
-        self.check_batchnorm_fc_convergence(True, use_cuda=True)
-        self.check_batchnorm_fc_convergence(True, use_cuda=False)
+        # use_cuda, use_reduce
+        self.check_batchnorm_fc_convergence(True, True)
+        self.check_batchnorm_fc_convergence(False, True)


 if __name__ == '__main__':
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
index 066299e6c..57ae36dbd 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
@@ -131,10 +131,7 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):


 class TestResnet(TestParallelExecutorBase):
-    def check_resnet_convergence(self,
-                                 balance_parameter_opt_between_cards,
-                                 use_cuda=True,
-                                 iter=20):
+    def check_resnet_convergence(self, use_cuda, use_reduce=False, iter=20):
         os.environ['CPU_NUM'] = str(4)

         import functools
@@ -145,16 +142,16 @@ class TestResnet(TestParallelExecutorBase):
             iter=iter,
             batch_size=batch_size,
             use_cuda=use_cuda,
-            balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
-        )
+            use_reduce=use_reduce)

     def test_resnet(self):
-        self.check_resnet_convergence(False, use_cuda=True)
-        self.check_resnet_convergence(False, use_cuda=False, iter=5)
+        self.check_resnet_convergence(True)
+        self.check_resnet_convergence(False, iter=5)

     def test_resnet_with_new_strategy(self):
-        self.check_resnet_convergence(True, use_cuda=True)
-        self.check_resnet_convergence(True, use_cuda=False, iter=5)
+        # use_cuda, use_reduce
+        self.check_resnet_convergence(True, True)
+        self.check_resnet_convergence(False, True, iter=5)


 if __name__ == '__main__':
--
GitLab
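
Putting the pieces together, a minimal end-to-end sketch of running one step under either strategy on CPU, assuming the fluid 0.14-era API used throughout this patch (the toy network, sizes, and feed data are invented for illustration):

    import os
    import numpy as np
    import paddle.fluid as fluid

    def one_step_loss(use_reduce):
        main, startup = fluid.Program(), fluid.Program()
        main.random_seed = startup.random_seed = 1
        with fluid.program_guard(main, startup):
            img = fluid.layers.data(name='image', shape=[784], dtype='float32')
            label = fluid.layers.data(name='label', shape=[1], dtype='int64')
            pred = fluid.layers.fc(img, size=10, act='softmax')
            loss = fluid.layers.mean(fluid.layers.cross_entropy(pred, label))
            fluid.optimizer.Adam().minimize(loss)

        os.environ['CPU_NUM'] = '4'  # Reduce mode needs more than one place
        fluid.Executor(fluid.CPUPlace()).run(startup)

        build_strategy = fluid.BuildStrategy()
        build_strategy.reduce_strategy = (
            fluid.BuildStrategy.ReduceStrategy.Reduce
            if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce)
        pe = fluid.ParallelExecutor(use_cuda=False, loss_name=loss.name,
                                    main_program=main,
                                    build_strategy=build_strategy)

        np.random.seed(1)
        feed = {'image': np.random.rand(32, 784).astype('float32'),
                'label': np.ones((32, 1), dtype='int64')}
        return pe.run(fetch_list=[loss.name], feed=feed)[0]

    # As in the tests above, first-step losses from the two strategies
    # should match closely.
    print(one_step_loss(use_reduce=True), one_step_loss(use_reduce=False))
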