diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..d6f9c547d8ab0a875311cc7d549a8db366e9b256 --- /dev/null +++ b/paddle/fluid/framework/details/build_strategy.h @@ -0,0 +1,36 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace paddle { +namespace framework { +namespace details { + +struct BuildStrategy { + enum class ReduceStrategy { kAllReduce = 0, kReduce = 1 }; + + enum class GradientScaleStrategy { + kCoeffNumDevice = 0, + kOne = 1, + kCustomized = 2, + }; + + ReduceStrategy reduce_{ReduceStrategy::kReduce}; + GradientScaleStrategy gradient_scale_{GradientScaleStrategy::kCoeffNumDevice}; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 4755559f8d0c5b5fdeb6b56a28fff8a32ea7f82f..45bad58145a1144dfabdd3e15b38d972d57b105e 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -37,31 +37,26 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( const std::string &loss_var_name, const std::unordered_set ¶ms, const std::vector &local_scopes, - platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards) + platform::NCCLContextMap *nccl_ctxs, const BuildStrategy &strategy) : loss_var_name_(loss_var_name), places_(places), local_scopes_(local_scopes), nccl_ctxs_(nccl_ctxs), - balance_parameter_opt_between_cards_( - balance_parameter_opt_between_cards) { + strategy_(strategy) { #else MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( const std::vector &places, const std::string &loss_var_name, const std::unordered_set ¶ms, - const std::vector &local_scopes, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards) + const std::vector &local_scopes, const BuildStrategy &strategy) : loss_var_name_(loss_var_name), places_(places), local_scopes_(local_scopes), - balance_parameter_opt_between_cards_( - balance_parameter_opt_between_cards) { + strategy_(strategy) { #endif for (auto &p : params) { grad_names_.insert(GradVarName(p)); } - use_default_grad_scale_ = use_default_grad_scale; } void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result, @@ -146,7 +141,8 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( CreateComputationalOps(&result, *op, 1); } else if (IsScaleLossOp(*op)) { // user can customize loss@grad if not use_default_grad_scale_ - if (use_default_grad_scale_) { + if (strategy_.gradient_scale_ != + BuildStrategy::GradientScaleStrategy::kCustomized) { CreateScaleLossGradOp(&result); } is_forwarding = false; @@ -165,19 +161,22 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( // broadcast, and each gradient is only broadcast once. for (auto &og : op->OutputArgumentNames()) { if (IsParameterGradientOnce(og, &og_has_been_broadcast)) { - if (balance_parameter_opt_between_cards_) { - CreateReduceOp(&result, og, cur_device_id); - var_name_on_devices[cur_device_id].emplace(og); - bcast_var_name_set[cur_device_id].emplace( - og.substr(0, og.size() - strlen(kGradVarSuffix))); - cur_device_id = (cur_device_id + 1) % places_.size(); - } else { - if (IsSparseGradient(var_types, og)) { - CreateReduceOp(&result, og, 0); - CreateBroadcastOp(&result, og, 0); - } else { - InsertNCCLAllReduceOp(&result, og); - } + switch (strategy_.reduce_) { + case BuildStrategy::ReduceStrategy::kReduce: + CreateReduceOp(&result, og, cur_device_id); + var_name_on_devices[cur_device_id].emplace(og); + bcast_var_name_set[cur_device_id].emplace( + og.substr(0, og.size() - strlen(kGradVarSuffix))); + cur_device_id = (cur_device_id + 1) % places_.size(); + break; + case BuildStrategy::ReduceStrategy::kAllReduce: + if (IsSparseGradient(var_types, og)) { + CreateReduceOp(&result, og, 0); + CreateBroadcastOp(&result, og, 0); + } else { + InsertNCCLAllReduceOp(&result, og); + } + break; } } } @@ -303,7 +302,7 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce( int MultiDevSSAGraphBuilder::GetOpDeviceID( const std::vector> &var_name_on_devices, const OpDesc &op) const { - if (!balance_parameter_opt_between_cards_) { + if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kReduce) { return -1; } diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 3a3e9e3b8538f52962e6a5ccd1a177e58d6c2f6b..4f708521884247fc013f0ae336ab683c3fe7ef2f 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -17,6 +17,7 @@ #include #include +#include "paddle/fluid/framework/details/build_strategy.h" #include "paddle/fluid/framework/details/ssa_graph_builder.h" namespace paddle { @@ -36,15 +37,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::unordered_set ¶ms, const std::vector &local_scopes, platform::NCCLContextMap *nccl_ctxs, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards); + const BuildStrategy &strategy); #else MultiDevSSAGraphBuilder(const std::vector &places, const std::string &loss_var_name, const std::unordered_set ¶ms, const std::vector &local_scopes, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards); + const BuildStrategy &strategy); #endif std::unique_ptr Build(const ProgramDesc &program) const override; @@ -62,8 +61,6 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { #ifdef PADDLE_WITH_CUDA platform::NCCLContextMap *nccl_ctxs_; #endif - bool balance_parameter_opt_between_cards_; - bool use_default_grad_scale_; bool IsScaleLossOp(const OpDesc &op) const; @@ -105,6 +102,9 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { bool IsSparseGradient( const std::unordered_map &var_types, const std::string &og) const; + + private: + BuildStrategy strategy_; }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index cdfd0a8c07fc7790d277d4f37c65b576459b8e49..392b13d3dc75bf8cf3e593f7bcd36ee006b5cbb9 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -57,8 +57,7 @@ ParallelExecutor::ParallelExecutor( const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, - bool use_default_grad_scale, bool balance_parameter_opt_between_cards, - const ExecutionStrategy &exec_strategy) + const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; @@ -93,12 +92,11 @@ ParallelExecutor::ParallelExecutor( #ifdef PADDLE_WITH_CUDA details::MultiDevSSAGraphBuilder builder( member_->places_, loss_var_name, params, member_->local_scopes_, - member_->nccl_ctxs_.get(), use_default_grad_scale, - balance_parameter_opt_between_cards); + member_->nccl_ctxs_.get(), build_strategy); #else - details::MultiDevSSAGraphBuilder builder( - member_->places_, loss_var_name, params, member_->local_scopes_, - use_default_grad_scale, balance_parameter_opt_between_cards); + details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name, + params, member_->local_scopes_, + build_strategy); #endif auto graph = builder.Build(main_program); diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index ab50509124751aa461c6dd4f099abc727fc07d98..121e74293c5874402a4184d758f408c747ee0fc5 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once +#include #include #include #include @@ -29,6 +30,7 @@ namespace framework { class ParallelExecutorPrivate; +using details::BuildStrategy; using details::ExecutionStrategy; class ParallelExecutor { @@ -41,9 +43,8 @@ class ParallelExecutor { const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards, - const ExecutionStrategy &exec_strategy); + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy); ~ParallelExecutor(); diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 0013597fd516d15c7d502370eec77e1a6a5dca88..ec1682a44e2d9ecdf5ff7e6969f84f79254f86a7 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -50,7 +50,7 @@ class NCCLGroupGuard { } inline ~NCCLGroupGuard() { - PADDLE_ENFORCE(dynload::ncclGroupEnd()); + CHECK_EQ(dynload::ncclGroupEnd(), ncclSuccess); NCCLMutex().unlock(); } }; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index c456bc1a71deb9cbe7da784cc5b18772a6ea0fa3..ee2c5b904499ae3bd94ffe96da7100cdf53865ca 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -494,6 +494,7 @@ All parameter, weight, gradient are variables in Paddle. m.def("disable_profiler", platform::DisableProfiler); m.def("reset_profiler", platform::ResetProfiler); + // -- python binds for parallel executor. py::class_ pe(m, "ParallelExecutor"); py::class_(pe, "ExecutionStrategy") .def(py::init()) @@ -515,12 +516,38 @@ All parameter, weight, gradient are variables in Paddle. [](ExecutionStrategy &self, bool allow_op_delay) { self.allow_op_delay_ = allow_op_delay; }); + py::class_ build_strategy(pe, "BuildStrategy"); + + py::enum_(build_strategy, "ReduceStrategy") + .value("Reduce", BuildStrategy::ReduceStrategy::kReduce) + .value("AllReduce", BuildStrategy::ReduceStrategy::kAllReduce); + py::enum_(build_strategy, + "GradientScaleStrategy") + .value("CoeffNumDevice", + BuildStrategy::GradientScaleStrategy::kCoeffNumDevice) + .value("One", BuildStrategy::GradientScaleStrategy::kOne) + .value("Customized", BuildStrategy::GradientScaleStrategy::kCustomized); + + build_strategy.def(py::init()) + .def_property( + "reduce_strategy", + [](const BuildStrategy &self) { return self.reduce_; }, + [](BuildStrategy &self, BuildStrategy::ReduceStrategy strategy) { + self.reduce_ = strategy; + }) + .def_property( + "gradient_scale_strategy", + [](const BuildStrategy &self) { return self.gradient_scale_; }, + [](BuildStrategy &self, + BuildStrategy::GradientScaleStrategy strategy) { + self.gradient_scale_ = strategy; + }); pe.def(py::init &, const std::unordered_set &, const std::unordered_set &, const ProgramDesc &, - const std::string &, Scope *, std::vector &, bool, - bool, const ExecutionStrategy &>()) + const std::string &, Scope *, std::vector &, + const ExecutionStrategy &, const BuildStrategy &>()) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index ef7a5864759d22b2dd852c8bc12b9a1710c93bad..67aa5ec9979dbe3fcdb037e38ad94329d294cdcc 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -52,12 +52,14 @@ import clip import profiler import unique_name import recordio_writer -from parallel_executor import ParallelExecutor, ExecutionStrategy +import parallel_executor +from parallel_executor import * Tensor = LoDTensor __all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + \ - trainer.__all__ + inferencer.__all__ + transpiler.__all__ + [ + trainer.__all__ + inferencer.__all__ + transpiler.__all__ + \ + parallel_executor.__all__ + [ 'io', 'initializer', 'layers', @@ -79,8 +81,6 @@ __all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + \ 'profiler', 'unique_name', 'recordio_writer', - 'ParallelExecutor', - 'ExecutionStrategy', ] diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 69ea9ee335efe3c612002208f831f5704a91fd51..deab761f72a3ff77ea978d8d81f2105b4463b4e0 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -19,9 +19,10 @@ import executor import warnings import sys -__all__ = ['ParallelExecutor', 'ExecutionStrategy'] +__all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy +BuildStrategy = core.ParallelExecutor.BuildStrategy class ParallelExecutor(object): @@ -30,9 +31,8 @@ class ParallelExecutor(object): loss_name=None, main_program=None, share_vars_from=None, - use_default_grad_scale=True, - balance_parameter_opt_between_cards=False, exec_strategy=None, + build_strategy=None, **kwargs): """ ParallelExecutor can run program in parallel. @@ -81,7 +81,16 @@ class ParallelExecutor(object): "Setting {0} by constructor is deprecated. Use " \ "strategy=ExecutionStrategy(); strategy.{0}=xxx; " \ "pe=ParallelExecutor(exec_strategy=strategy) " \ - "instead.\n " + "instead.\n ".format(key) + elif key in dir(BuildStrategy): + err_msg += \ + "Setting {0} by constructor is deprecated. Use " \ + "strategy=BuildStrategy(); See help(" \ + "paddle.fluid.ParallelExecutor.BuildStrategy) \n".format( + key) + else: + err_msg += "Setting {0} by constructor is deprecated. Use strategy.\n".format( + key) raise ValueError(err_msg) self._places = [] @@ -116,6 +125,9 @@ class ParallelExecutor(object): exec_strategy.num_threads = min( len(self._places) * 2, multiprocessing.cpu_count()) + if build_strategy is None: + build_strategy = BuildStrategy() + main = main_program main = main if main else framework.default_main_program() scope = executor.global_scope() @@ -139,9 +151,8 @@ class ParallelExecutor(object): p.name for p in main.global_block().iter_parameters() if not p.stop_gradient ]), - set(self.persistable_vars), main.desc, loss_name - if loss_name else '', scope, local_scopes, use_default_grad_scale, - balance_parameter_opt_between_cards, exec_strategy) + set(self.persistable_vars), main.desc, loss_name if loss_name else + '', scope, local_scopes, exec_strategy, build_strategy) self.scope = scope diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 4173ad1925dc2a33cedd683177994b253929bf4e..cd95ee47fdee14d4865e814393bd44f03baa40bb 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -234,12 +234,16 @@ class TestParallelExecutorBase(unittest.TestCase): startup_exe.run(startup) exec_strategy = fluid.ExecutionStrategy() exec_strategy.allow_op_delay = allow_op_delay + + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce if balance_parameter_opt_between_cards else fluid.BuildStrategy.ReduceStrategy.AllReduce + if use_parallel_executor: exe = fluid.ParallelExecutor( True, loss_name=loss.name, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards, - exec_strategy=exec_strategy) + exec_strategy=exec_strategy, + build_strategy=build_strategy) else: exe = fluid.Executor(place=place) @@ -548,7 +552,7 @@ class TestTransformer(TestParallelExecutorBase): class ParallelExecutorTestingDuringTraining(unittest.TestCase): - def check_network_convergence(self, balance_parameter_opt_between_cards): + def check_network_convergence(self, build_strategy=None): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -571,15 +575,13 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): use_cuda=True, loss_name=loss.name, main_program=main, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) test_exe = fluid.ParallelExecutor( use_cuda=True, main_program=test_program, share_vars_from=train_exe, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) for i in xrange(5): test_loss, = test_exe.run([loss.name], feed=feed_dict) @@ -594,10 +596,14 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): str(test_loss)) def test_parallel_testing(self): - self.check_network_convergence(False) + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence(build_strategy) def test_parallel_testing_with_new_strategy(self): - self.check_network_convergence(True) + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce + self.check_network_convergence(build_strategy) import paddle.dataset.conll05 as conll05 @@ -617,7 +623,7 @@ embedding_name = 'emb' def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, - is_sparse, balance_parameter_opt_between_cards, **ignored): + is_sparse, **ignored): # 8 features predicate_embedding = fluid.layers.embedding( input=predicate, @@ -686,9 +692,7 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, class TestCRFModel(unittest.TestCase): - def check_network_convergence(self, - is_sparse, - balance_parameter_opt_between_cards=False): + def check_network_convergence(self, is_sparse, build_strategy=None): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -739,8 +743,7 @@ class TestCRFModel(unittest.TestCase): pe = fluid.ParallelExecutor( use_cuda=True, loss_name=avg_cost.name, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) feeder = fluid.DataFeeder( feed_list=[ @@ -756,19 +759,29 @@ class TestCRFModel(unittest.TestCase): pe.run(feed=feeder.feed(cur_batch), fetch_list=[avg_cost.name]))[0] - def test_update_sparse_parameter(self): - self.check_network_convergence(is_sparse=True) + def test_update_sparse_parameter_all_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy) - def test_update_dense_parameter(self): - self.check_network_convergence(is_sparse=False) + def test_update_dense_parameter_all_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy) - def test_update_sparse_parameter_with_new_strategy(self): + def test_update_sparse_parameter_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, balance_parameter_opt_between_cards=True) + is_sparse=False, build_strategy=build_strategy) - def test_update_dense_parameter_with_new_strategy(self): + def test_update_dense_parameter_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, balance_parameter_opt_between_cards=True) + is_sparse=False, build_strategy=build_strategy) # test fetch all the variables of global_block