diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc
index 0b4a518076390a8e84e800cae54305d15e35f4cb..37d69c4b56c1992c371e75d2b44587e51e30c305 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc
+++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc
@@ -37,25 +37,20 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
     const std::string &loss_var_name,
     const std::unordered_set<std::string> &params,
     const std::vector<Scope *> &local_scopes,
-    platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale,
-    bool use_nccl_allreduce)
+    platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale)
     : loss_var_name_(loss_var_name),
       places_(places),
       local_scopes_(local_scopes),
-      nccl_ctxs_(nccl_ctxs),
-      use_nccl_allreduce_(use_nccl_allreduce) {
+      nccl_ctxs_(nccl_ctxs) {
 #else
-
 MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
     const std::vector<platform::Place> &places,
     const std::string &loss_var_name,
     const std::unordered_set<std::string> &params,
-    const std::vector<Scope *> &local_scopes, bool use_default_grad_scale,
-    bool use_nccl_allreduce)
+    const std::vector<Scope *> &local_scopes, bool use_default_grad_scale)
     : loss_var_name_(loss_var_name),
       places_(places),
-      local_scopes_(local_scopes),
-      use_nccl_allreduce_(use_nccl_allreduce) {
+      local_scopes_(local_scopes) {
 #endif
   for (auto &p : params) {
     grad_names_.insert(GradVarName(p));
@@ -121,8 +116,8 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
       std::unordered_map<std::string, std::vector<std::unique_ptr<VarHandle>>>>(
       places_.size());
 
-  size_t cur_device_id = 0;
-
+  // size_t cur_device_id = 0;
+  size_t update_sparse_gp_device_id = 0;
   std::vector<std::unordered_set<std::string>> var_name_on_devices;
   std::vector<std::unordered_set<std::string>> bcast_var_name_set;
 
@@ -162,14 +157,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
           // broadcast, and each gradient is only broadcast once.
           for (auto &og : op->OutputArgumentNames()) {
             if (IsParameterGradientOnce(og, &og_has_been_broadcast)) {
-              if (use_nccl_allreduce_) {
-                InsertNCCLAllReduceOp(&result, og);
-              } else {
-                CreateReduceOp(&result, cur_device_id, og);
-                var_name_on_devices[cur_device_id].emplace(og);
-                bcast_var_name_set[cur_device_id].emplace(
+              if (IsSparseGradient(og)) {
+                CreateReduceOp(&result, update_sparse_gp_device_id, og);
+                var_name_on_devices[update_sparse_gp_device_id].emplace(og);
+                bcast_var_name_set[update_sparse_gp_device_id].emplace(
                     og.substr(0, og.size() - strlen(kGradVarSuffix)));
-                cur_device_id = (cur_device_id + 1) % places_.size();
+              } else {
+                InsertNCCLAllReduceOp(&result, og);
               }
             }
           }
@@ -205,13 +199,15 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
   return std::unique_ptr<SSAGraph>(graph);
 }
 
+bool MultiDevSSAGraphBuilder::IsSparseGradient(const std::string &og) const {
+  auto og_var = local_scopes_[0]->FindVar(og);
+  PADDLE_ENFORCE_NOT_NULL(og_var);
+  return og_var->IsType<SelectedRows>();
+}
+
 int MultiDevSSAGraphBuilder::GetOpDeviceID(
     const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
     const OpDesc &op) const {
-  if (use_nccl_allreduce_) {
-    return -1;
-  }
-
   int var_dev_id = -1;
   for (auto &var_name : op.InputArgumentNames()) {
     if (var_dev_id != -1) break;
diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h
index 8243494301ac4faae0cd27c2bf02bacc477a398a..cf40ea5278676db2cda56d06685fd45f00392cc0 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_builder.h
+++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h
@@ -36,13 +36,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
                           const std::unordered_set<std::string> &params,
                           const std::vector<Scope *> &local_scopes,
                           platform::NCCLContextMap *nccl_ctxs,
-                          bool use_default_grad_scale, bool use_nccl_allreduce);
+                          bool use_default_grad_scale);
 #else
   MultiDevSSAGraphBuilder(const std::vector<platform::Place> &places,
                           const std::string &loss_var_name,
                           const std::unordered_set<std::string> &params,
                           const std::vector<Scope *> &local_scopes,
-                          bool use_default_grad_scale, bool use_nccl_allreduce);
+                          bool use_default_grad_scale);
 #endif
   std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
@@ -60,7 +60,6 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
 #ifdef PADDLE_WITH_CUDA
   platform::NCCLContextMap *nccl_ctxs_;
 #endif
-  bool use_nccl_allreduce_;
   bool use_default_grad_scale_;
 
   bool IsScaleLossOp(const OpDesc &op) const;
@@ -99,6 +98,8 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
    * nullptr if not found.
    */
   OpDesc *GetSendOpDesc(const ProgramDesc &program) const;
+
+  bool IsSparseGradient(const std::string &og) const;
 };
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index f4593618213729aed8d2a6d193e06d0a43683050..9eea8d1c1861b8a7f6e49621b27c9871b0c1a590 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -58,7 +58,7 @@ ParallelExecutor::ParallelExecutor(
     const std::unordered_set<std::string> &bcast_vars,
     const ProgramDesc &main_program, const std::string &loss_var_name,
     Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay,
-    bool use_default_grad_scale, bool use_nccl_allreduce)
+    bool use_default_grad_scale)
     : member_(new ParallelExecutorPrivate(places)) {
   member_->global_scope_ = scope;
 
@@ -93,11 +93,11 @@ ParallelExecutor::ParallelExecutor(
 #ifdef PADDLE_WITH_CUDA
   details::MultiDevSSAGraphBuilder builder(
       member_->places_, loss_var_name, params, member_->local_scopes_,
-      member_->nccl_ctxs_.get(), use_default_grad_scale, use_nccl_allreduce);
+      member_->nccl_ctxs_.get(), use_default_grad_scale);
 #else
-  details::MultiDevSSAGraphBuilder builder(
-      member_->places_, loss_var_name, params, member_->local_scopes_,
-      use_default_grad_scale, use_nccl_allreduce);
+  details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name,
+                                           params, member_->local_scopes_,
+                                           use_default_grad_scale);
 #endif
 
   auto graph = builder.Build(main_program);
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index b2e8ddd05522d6f41c40ca3126f9c0a0968ecfed..ecd107d81f8f5bf5d8b899d0c07797114a7ab767 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -40,8 +40,7 @@ class ParallelExecutor {
                    const ProgramDesc& main_program,
                    const std::string& loss_var_name, Scope* scope,
                    const std::vector<Scope*>& local_scopes,
-                   bool allow_op_delay, bool use_default_grad_scale,
-                   bool use_nccl_allreduce);
+                   bool allow_op_delay, bool use_default_grad_scale);
 
   ~ParallelExecutor();
 
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 4b4de6f20801c74fdf8c8c228b450a51f18a500a..c925686f8382da1758fb7cdc048253290ef69513 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -502,12 +502,11 @@ All parameter, weight, gradient are variables in Paddle.
              const std::unordered_set<std::string> &bcast_vars,
              const ProgramDesc &main_program, const std::string &loss_var_name,
              Scope *scope, std::vector<Scope *> &local_scopes,
-             bool allow_op_delay, bool use_default_grad_scale,
-             bool use_nccl_allreduce) {
+             bool allow_op_delay, bool use_default_grad_scale) {
               new (&self) ParallelExecutor(
                   num_threads, use_event, places, params, bcast_vars,
                   main_program, loss_var_name, scope, local_scopes,
-                  allow_op_delay, use_default_grad_scale, use_nccl_allreduce);
+                  allow_op_delay, use_default_grad_scale);
             })
       .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
       // NOTE: even we return a vec<Scope*>* to Python use reference policy.
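Editorial note on the C++ changes above: the global use_nccl_allreduce switch is gone, and MultiDevSSAGraphBuilder::Build now decides per gradient how to aggregate it, using the new IsSparseGradient helper (a SelectedRows type check against the first local scope). Sparse gradients are reduced onto a single device and the updated parameter is broadcast back; dense gradients keep using NCCL allReduce. The sketch below mirrors that routing in plain Python for illustration only; grad_types, is_sparse_gradient, and the plan tuples are hypothetical stand-ins, not Paddle APIs.

```python
# Plain-Python sketch of the per-gradient routing introduced above.
# All names here are hypothetical stand-ins for the C++ code in the diff.

# Pretend variable types: "selected_rows" mimics a sparse SelectedRows
# gradient (e.g. from an is_sparse embedding), "lod_tensor" a dense one.
grad_types = {
    "emb.w@GRAD": "selected_rows",
    "fc.w@GRAD": "lod_tensor",
    "fc.b@GRAD": "lod_tensor",
}


def is_sparse_gradient(og):
    # Mirrors MultiDevSSAGraphBuilder::IsSparseGradient: true iff the
    # gradient variable holds SelectedRows instead of a dense LoDTensor.
    return grad_types[og] == "selected_rows"


update_sparse_gp_device_id = 0  # sparse gradients all land on one device

plan = []
for og in sorted(grad_types):
    if is_sparse_gradient(og):
        # Reduce the sparse gradient onto one device, update the parameter
        # there, then broadcast the updated parameter to the other devices.
        plan.append(("reduce_and_broadcast", og, update_sparse_gp_device_id))
    else:
        # Dense gradients keep using NCCL allReduce across all devices.
        plan.append(("nccl_all_reduce", og, None))

for step in plan:
    print(step)
```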
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index 46c18c6893353a23f2c3614ae8934dcd1ea15d27..6b80b007e9080922241ee6c66e0577a18b357980 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -30,8 +30,7 @@ class ParallelExecutor(object):
                  num_threads=None,
                  allow_op_delay=False,
                  share_vars_from=None,
-                 use_default_grad_scale=True,
-                 use_nccl_allreduce=True):
+                 use_default_grad_scale=True):
         """
         ParallelExecutor can run program in parallel.
 
@@ -47,14 +46,6 @@ class ParallelExecutor(object):
                 improve performance in some cases, default False.
             share_vars_from(ParallelExecutor, default None): If provied,
                 it will share variables from the specified ParallelExecutor.
-            use_nccl_allreduce(bool, default True): Whether to use nccl_allreduce
-                or not, if set True, the communication between different
-                devices by nccl allReduce, which doesn't support updating sparse
-                parameter, if set False, the communication between different
-                devices by reduce_op and broadcast_op, which will distribute all
-                the parameter gradients evenly to different device and updates
-                the parameters, and finally broadcast to other device, this method
-                support updating sparse parameter. Default True.
             use_default_grad_scale(bool, default True): If set True, a default
                 scale value equal to `1./device_count` would be multiplied to
                 gradients of each device and scaled gradients would be
@@ -138,8 +129,7 @@ class ParallelExecutor(object):
             scope,
             local_scopes,
             allow_op_delay,
-            use_default_grad_scale,
-            use_nccl_allreduce)
+            use_default_grad_scale)
 
         self.scope = scope
 
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 8dc14b88b8d79220a2acbfdd9076a38bbb013764..9056f5e66fceb42397c9a923d802320dd772725b 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -205,8 +205,7 @@ class TestParallelExecutorBase(unittest.TestCase):
                                   allow_op_delay=False,
                                   feed_dict=None,
                                   seed=None,
-                                  use_parallel_executor=True,
-                                  use_nccl_allreduce=True):
+                                  use_parallel_executor=True):
         def run_executor(exe, feed, fetch_list, program=None):
             if isinstance(exe, fluid.ParallelExecutor):
                 res = exe.run(fetch_list=fetch_list, feed=feed)
@@ -235,10 +234,7 @@ class TestParallelExecutorBase(unittest.TestCase):
 
         if use_parallel_executor:
             exe = fluid.ParallelExecutor(
-                True,
-                loss_name=loss.name,
-                allow_op_delay=allow_op_delay,
-                use_nccl_allreduce=use_nccl_allreduce)
+                True, loss_name=loss.name, allow_op_delay=allow_op_delay)
         else:
             exe = fluid.Executor(place=place)
 
@@ -284,25 +280,20 @@ class TestMNIST(TestParallelExecutorBase):
         fluid.recordio_writer.convert_reader_to_recordio_file(
             './mnist.recordio', reader, feeder)
 
-    def check_simple_fc_convergence(self, use_nccl_allreduce=True):
+    def check_simple_fc_convergence(self):
         self.check_network_convergence(simple_fc_net)
         self.check_network_convergence(simple_fc_net, allow_op_delay=True)
 
         img = numpy.zeros(shape=[32, 784], dtype='float32')
         label = numpy.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
-            simple_fc_net,
-            feed_dict={"image": img,
-                       "label": label},
-            use_nccl_allreduce=use_nccl_allreduce)
+            simple_fc_net, feed_dict={"image": img,
+                                      "label": label})
 
-    def test_simple_fc_with_nccl_allreduce(self):
-        self.check_simple_fc_convergence(True)
+    def test_simple_fc(self):
+        self.check_simple_fc_convergence()
 
-    def test_simple_fc_with_reduce_op(self):
-        self.check_simple_fc_convergence(False)
-
-    def check_simple_fc_parallel_accuracy(self, use_nccl_allreduce=True):
+    def check_simple_fc_parallel_accuracy(self):
         img = numpy.zeros(shape=[32, 784], dtype='float32')
         label = numpy.ones(shape=[32, 1], dtype='int64')
         single_first_loss, single_last_loss = self.check_network_convergence(
@@ -316,35 +307,26 @@ class TestMNIST(TestParallelExecutorBase):
             seed=1000,
             feed_dict={"image": img,
                        "label": label},
-            use_parallel_executor=True,
-            use_nccl_allreduce=use_nccl_allreduce)
+            use_parallel_executor=True)
 
         for p_f in parallel_first_loss:
             self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
         for p_l in parallel_last_loss:
             self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
 
-    def test_simple_fc_parallel_accuracy_with_nccl_allreduce(self):
-        self.check_simple_fc_parallel_accuracy(True)
-
-    def test_simple_fc_parallel_accuracy_with_reduce_op(self):
-        self.check_simple_fc_parallel_accuracy(False)
+    def test_simple_fc_parallel_accuracy(self):
+        self.check_simple_fc_parallel_accuracy()
 
-    def check_batchnorm_fc_convergence(self, use_nccl_allreduce):
+    def check_batchnorm_fc_convergence(self):
         self.check_network_convergence(fc_with_batchnorm)
         img = numpy.zeros(shape=[32, 784], dtype='float32')
         label = numpy.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
-            fc_with_batchnorm,
-            feed_dict={"image": img,
-                       "label": label},
-            use_nccl_allreduce=use_nccl_allreduce)
-
-    def test_batchnorm_fc_with_nccl_allreduce(self):
-        self.check_batchnorm_fc_convergence(True)
+            fc_with_batchnorm, feed_dict={"image": img,
+                                          "label": label})
 
-    def test_batchnorm_fc_with_reduce_op(self):
-        self.check_batchnorm_fc_convergence(False)
+    def test_batchnorm_fc(self):
+        self.check_batchnorm_fc_convergence()
 
 
 class TestResnet(TestParallelExecutorBase):
@@ -366,21 +348,17 @@ class TestResnet(TestParallelExecutorBase):
         #     fluid.recordio_writer.convert_reader_to_recordio_file(
         #         "./flowers.recordio", reader, feeder, compressor=fluid.core.RecordIOWriter.Compressor.NoCompress)
 
-    def check_resnet_convergence(self, use_nccl_allreduce):
+    def check_resnet_convergence(self):
         import functools
         batch_size = 2
         self.check_network_convergence(
             functools.partial(
                 SE_ResNeXt50Small, batch_size=batch_size),
             iter=20,
-            batch_size=batch_size,
-            use_nccl_allreduce=use_nccl_allreduce)
+            batch_size=batch_size)
 
-    def test_resnet_with_nccl_allreduce(self):
-        self.check_resnet_convergence(True)
-
-    def test_resnet_with_reduce_op(self):
-        self.check_resnet_convergence(False)
+    def test_resnet(self):
+        self.check_resnet_convergence()
 
 
 class ModelHyperParams(object):
@@ -544,7 +522,7 @@ class TestTransformer(TestParallelExecutorBase):
 
 
 class ParallelExecutorTestingDuringTraining(unittest.TestCase):
-    def check_network_convergence(self, use_nccl_allreduce):
+    def check_network_convergence(self):
         main = fluid.Program()
         startup = fluid.Program()
         with fluid.program_guard(main, startup):
@@ -565,16 +543,12 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
             feed_dict = {'image': image, 'label': label}
 
             train_exe = fluid.ParallelExecutor(
-                use_cuda=True,
-                loss_name=loss.name,
-                main_program=main,
-                use_nccl_allreduce=use_nccl_allreduce)
+                use_cuda=True, loss_name=loss.name, main_program=main)
 
             test_exe = fluid.ParallelExecutor(
                 use_cuda=True,
                 main_program=test_program,
-                share_vars_from=train_exe,
-                use_nccl_allreduce=use_nccl_allreduce)
+                share_vars_from=train_exe)
 
             for i in xrange(5):
                 test_loss, = test_exe.run([loss.name], feed=feed_dict)
@@ -588,11 +562,8 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
                 "Train loss: " + str(train_loss) + "\n Test loss:" +
                 str(test_loss))
 
-    def test_parallel_testing_with_nccl_allreduce(self):
-        self.check_network_convergence(use_nccl_allreduce=True)
-
-    def test_parallel_testing_with_reduce_op(self):
-        self.check_network_convergence(use_nccl_allreduce=False)
+    def test_parallel(self):
+        self.check_network_convergence()
 
 
 import paddle.dataset.conll05 as conll05
@@ -612,7 +583,7 @@ embedding_name = 'emb'
 
 
 def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
-            is_sparse, use_nccl_allreduce, **ignored):
+            is_sparse, **ignored):
     # 8 features
     predicate_embedding = fluid.layers.embedding(
         input=predicate,
@@ -681,7 +652,7 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
 
 
 class TestCRFModel(unittest.TestCase):
-    def check_network_convergence(self, is_sparse, use_nccl_allreduce):
+    def check_network_convergence(self, is_sparse):
         main = fluid.Program()
         startup = fluid.Program()
         with fluid.program_guard(main, startup):
@@ -729,10 +700,7 @@ class TestCRFModel(unittest.TestCase):
             exe = fluid.Executor(place)
             exe.run(startup)
 
-            pe = fluid.ParallelExecutor(
-                use_cuda=True,
-                loss_name=avg_cost.name,
-                use_nccl_allreduce=use_nccl_allreduce)
+            pe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name)
 
             feeder = fluid.DataFeeder(
                 feed_list=[
@@ -749,11 +717,7 @@ class TestCRFModel(unittest.TestCase):
                                  fetch_list=[avg_cost.name]))[0]
 
     def test_update_sparse_parameter(self):
-        self.check_network_convergence(is_sparse=True, use_nccl_allreduce=False)
-
-    def test_update_dense_parameter_with_nccl_allreduce(self):
-        self.check_network_convergence(is_sparse=False, use_nccl_allreduce=True)
+        self.check_network_convergence(is_sparse=True)
 
-    def test_update_dense_parameter_with_reduce_op(self):
-        self.check_network_convergence(
-            is_sparse=False, use_nccl_allreduce=False)
+    def test_update_dense_parameter(self):
+        self.check_network_convergence(is_sparse=False)
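Editorial note on the Python changes above: with the flag removed, user code simply stops passing use_nccl_allreduce; models whose embeddings produce sparse gradients now train under ParallelExecutor with no extra configuration, as the CRF test exercises. Below is a minimal usage sketch assuming the fluid API of this era and at least one CUDA device; the layer sizes and random data are made up for illustration.

```python
import numpy
import paddle.fluid as fluid

# Tiny network with a sparse-gradient embedding (is_sparse=True).
word = fluid.layers.data(name='word', shape=[1], dtype='int64')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
emb = fluid.layers.embedding(input=word, size=[10000, 32], is_sparse=True)
prediction = fluid.layers.fc(input=emb, size=10, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=label)
loss = fluid.layers.mean(cost)
fluid.optimizer.SGD(learning_rate=0.001).minimize(loss)

place = fluid.CUDAPlace(0)
fluid.Executor(place).run(fluid.default_startup_program())

# No use_nccl_allreduce argument any more: the graph builder inspects each
# gradient and picks reduce+broadcast for SelectedRows, allReduce otherwise.
pe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)

word_np = numpy.random.randint(0, 10000, size=(32, 1)).astype('int64')
label_np = numpy.random.randint(0, 10, size=(32, 1)).astype('int64')
loss_val, = pe.run([loss.name], feed={'word': word_np, 'label': label_np})
print(numpy.array(loss_val))
```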