diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index c164996f5eae14767c9a232655a9365d7f7a9c9c..5871d63f2ee2f204593a2608bc69b9d2a5030d20 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -204,7 +204,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
       AppendPass("all_reduce_deps_pass");
     }
 
-    if (strategy_.enable_backward_optimizer_op_deps_) {
+    if (strategy_.num_trainers_ > 1 && !strategy_.async_mode_ &&
+        !strategy_.is_distribution_ &&
+        strategy_.enable_backward_optimizer_op_deps_) {
       VLOG(1) << "Add backward_op_deps_pass";
       AppendPass("backward_optimizer_op_deps_pass");
     }
@@ -351,6 +353,12 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
     } else if (pass->Type() == "mkldnn_placement_pass") {
       pass->Set("mkldnn_enabled_op_types",
                 new std::unordered_set<std::string>(mkldnn_enabled_op_types_));
+    } else if (pass->Type() == "backward_optimizer_op_deps_pass") {
+      if (!use_cuda) {
+        VLOG(1) << "backward_optimizer_op_deps_pass is only supported on "
+                   "GPU, skipped.";
+        continue;
+      }
     }
     VLOG(3) << "Start Apply Pass " << pass->Type();
     graph = pass->Apply(graph);
diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h
index 09e7ca5f21df1c2b8ab3a1c319918cd8085cd1eb..14fb1783e9ba41779f096a28d600897a87460570 100644
--- a/paddle/fluid/framework/details/build_strategy.h
+++ b/paddle/fluid/framework/details/build_strategy.h
@@ -72,7 +72,7 @@ struct BuildStrategy {
   // Add dependency between backward ops and optimization ops, make sure that
   // all the backward ops are finished before running the optimization ops.
   // It might make the training speed of data parallelism faster.
-  bool enable_backward_optimizer_op_deps_{false};
+  bool enable_backward_optimizer_op_deps_{true};
   // TODO(dev-paddle): enable_sequential_execution depends on
   // kStaleProgramOpDescs, it is not appropriate, because kStaleProgramOpDescs
   // will be removed in the near future.
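Note on the BuildStrategy change above: enable_backward_optimizer_op_deps_ now defaults to true, but the pass is only appended for synchronous, non-distribution runs with more than one trainer, and it is skipped at apply time on CPU-only executors. Below is a minimal, hedged Python sketch of a two-trainer GPU setup in which the pass would take effect; the Python-side property names (num_trainers, trainer_id) are assumed from the existing fluid API rather than introduced by this patch, and the snippet is not a complete multi-node launcher.

    import paddle.fluid as fluid

    # Tiny illustrative model; any network with a loss variable would do.
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    pred = fluid.layers.fc(input=x, size=1)
    avg_cost = fluid.layers.mean(
        fluid.layers.square_error_cost(input=pred, label=y))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(avg_cost)

    build_strategy = fluid.BuildStrategy()
    build_strategy.num_trainers = 2  # > 1, so the gating condition above holds
    build_strategy.trainer_id = 0
    # enable_backward_optimizer_op_deps_ is now on by default on the C++ side,
    # so nothing else needs to be toggled for GPU runs.

    compiled = fluid.CompiledProgram(
        fluid.default_main_program()).with_data_parallel(
            loss_name=avg_cost.name, build_strategy=build_strategy)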
diff --git a/paddle/fluid/framework/details/nccl_op_handle.h b/paddle/fluid/framework/details/nccl_op_handle.h
index 2f425372234898860521570da8884497c995e9e2..56dacccafaace8b094e35e4a6a85bbd78c82e10a 100644
--- a/paddle/fluid/framework/details/nccl_op_handle.h
+++ b/paddle/fluid/framework/details/nccl_op_handle.h
@@ -59,7 +59,8 @@ class NCCLOpHandleBase : public OpHandleBase {
     VLOG(10) << "SetRunEnv "
              << " run_order:" << run_order
-             << ", use_hierarchical_allreduce:" << use_hierarchical_allreduce;
+             << ", use_hierarchical_allreduce:" << use_hierarchical_allreduce
+             << ", nccl_ctx_:" << nccl_ctxs_;
 
     if (nccl_ctxs_ == nullptr) {
       return;
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 26e6fb6301a8fe3708411bac658eb7a99cd43759..e45b59259324a0eaa34347966e93ee7d09a15c13 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -113,9 +113,12 @@ class ParallelExecutorPrivate {
       auto nccl_id_var = scope->FindVar(var_name);
       if (nccl_id_var) {
         nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
+        VLOG(10) << "find nccl_id_var:" << var_name << ", nccl_id:" << nccl_id;
       } else {
         nccl_id = new ncclUniqueId();
         PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id));
+        VLOG(10) << "can't find nccl_id_var:" << var_name
+                 << ", nccl_id:" << nccl_id;
       }
 
       flat_nccl_ids.push_back(nccl_id);
@@ -170,8 +173,7 @@ class ParallelExecutorPrivate {
     }
   }
 
-  void InitOrGetNCCLCommunicator(framework::Scope *scope,
-                                 const BuildStrategy &bst) {
+  void InitOrGetNCCLCommunicator(framework::Scope *scope, BuildStrategy *bst) {
     const std::string var_name = "NCCLCommunicator";
     auto var = scope->FindVar(var_name);
     if (var != nullptr) {
@@ -183,9 +185,24 @@ class ParallelExecutorPrivate {
       return;
     }
 
+    if (bst->use_hierarchical_allreduce_) {
+      PADDLE_ENFORCE(bst->num_trainers_ > 1, "num_trainers:%llu < 1",
+                     bst->num_trainers_);
+      PADDLE_ENFORCE(bst->hierarchical_allreduce_inter_nranks_ > 1,
+                     "inter_nranks:%d < 1",
+                     bst->hierarchical_allreduce_inter_nranks_);
+      PADDLE_ENFORCE(
+          (bst->num_trainers_ % bst->hierarchical_allreduce_inter_nranks_ == 0),
+          "num_trainers:%llu mod inter_nranks:%d != 0", bst->num_trainers_,
+          bst->hierarchical_allreduce_inter_nranks_);
+
+      bst->hierarchical_allreduce_exter_nranks_ =
+          bst->num_trainers_ / bst->hierarchical_allreduce_inter_nranks_;
+    }
+
     VLOG(1) << "not find " << var_name << " in scope, so recreate it!";
     nccl_ctxs_ = scope->Var(var_name)->GetMutable<platform::NCCLCommunicator>();
-    InitNCCLCtxs(scope, bst);
+    InitNCCLCtxs(scope, *bst);
   }
 #endif
@@ -383,7 +400,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
 
   if (member_->use_cuda_ && member_->nranks_ > 1) {
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-    member_->InitOrGetNCCLCommunicator(scope, member_->build_strategy_);
+    member_->InitOrGetNCCLCommunicator(scope, &member_->build_strategy_);
 
     // Initialize device context's nccl comm, will be used by normal
     // Operators like sync_batch_norm, and collective ops.
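With this change the hierarchical-allreduce outer ring size is no longer passed in from Python; InitOrGetNCCLCommunicator validates the trainer counts and derives it as num_trainers / inter_nranks. A small self-contained Python sketch of that arithmetic follows, as an illustration of the checks, not the C++ implementation:

    def derive_exter_nranks(num_trainers, inter_nranks):
        # Mirrors the PADDLE_ENFORCE checks added above.
        assert num_trainers > 1, "num_trainers:%d < 1" % num_trainers
        assert inter_nranks > 1, "inter_nranks:%d < 1" % inter_nranks
        assert num_trainers % inter_nranks == 0, \
            "num_trainers:%d mod inter_nranks:%d != 0" % (num_trainers,
                                                          inter_nranks)
        return num_trainers // inter_nranks

    # e.g. 2 nodes x 4 GPUs = 8 trainers, 4 inter ranks per node -> 2 outer ranks
    print(derive_exter_nranks(8, 4))  # 2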
diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h
index d79ff6e2b98a3fb3722198b67785b41a83fcb7cd..4680b070bb996c3a8ef76187b5634309d1dbb9b0 100644
--- a/paddle/fluid/platform/nccl_helper.h
+++ b/paddle/fluid/platform/nccl_helper.h
@@ -286,10 +286,14 @@ class NCCLCommunicator {
   bool NeedExterAllReduce() const { return h_exter_ctxs_.size() > 0; }
 
   NCCLContextMap *GetHierarchicalInterCtx(size_t run_order) const {
+    PADDLE_ENFORCE(h_inter_ctxs_.size() > 0,
+                   "must init hierarchical ctxs first!");
     return h_inter_ctxs_[run_order % h_inter_ctxs_.size()].get();
   }
 
   NCCLContextMap *GetHierarchicalExterCtx(size_t run_order) const {
+    PADDLE_ENFORCE(h_exter_ctxs_.size() > 0,
+                   "must init hierarchical ctxs first!");
     return h_exter_ctxs_[run_order % h_exter_ctxs_.size()].get();
   }
 
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 4dbafc08b93d7acd7b30b04006499b7e244116c0..f0dcf91921a009b833bbf59d1864c4f265fe70d1 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1408,27 +1408,20 @@ All parameter, weight, gradient are variables in Paddle.
                     [](BuildStrategy &self, int nccl_comm_num) {
                       self.nccl_comm_num_ = nccl_comm_num;
                     })
-      .def_property("use_hierarchical_allreduce_",
+      .def_property("use_hierarchical_allreduce",
                     [](const BuildStrategy &self) {
                       return self.use_hierarchical_allreduce_;
                     },
                     [](BuildStrategy &self, bool use) {
                       self.use_hierarchical_allreduce_ = use;
                     })
-      .def_property("hierarchical_allreduce_inter_nranks_",
+      .def_property("hierarchical_allreduce_inter_nranks",
                     [](const BuildStrategy &self) {
                       return self.hierarchical_allreduce_inter_nranks_;
                     },
                     [](BuildStrategy &self, int nranks) {
                       self.hierarchical_allreduce_inter_nranks_ = nranks;
                     })
-      .def_property("hierarchical_allreduce_exter_nranks_",
-                    [](const BuildStrategy &self) {
-                      return self.hierarchical_allreduce_exter_nranks_;
-                    },
-                    [](BuildStrategy &self, int nranks) {
-                      self.hierarchical_allreduce_exter_nranks_ = nranks;
-                    })
       .def_property(
           "fuse_elewise_add_act_ops",
diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index 06369ea6b701ec2edac781f56dd76a20cff6e6e4..91b126aaaf4c0cff1bf52f014f7015e3ff3e7011 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -154,7 +154,7 @@ def start_procs(args):
         for i in range(selected_gpus_num):
             if trainers_endpoints != "":
                 trainers_endpoints += ","
-            trainers_endpoints += "%s:617%d" % (ip, i)
+            trainers_endpoints += "%s:%d" % (ip, args.started_port + i)
 
     nranks = num_nodes * selected_gpus_num
diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index a13114577bdf910c85accad6f27929b0c0393107..e540dbfe21798b163d792b049b88643c223d6a53 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -288,9 +288,8 @@ class CompiledProgram(object):
 
         if self._program:
             self._build_strategy.nccl_comm_num = self._program._nccl_comm_num
-            self._build_strategy.use_hierarchical_allreduce_ = self._program._use_hierarchical_allreduce
-            self._build_strategy.hierarchical_allreduce_inter_nranks_ = self._program._hierarchical_allreduce_inter_nranks
-            self._build_strategy.hierarchical_allreduce_exter_nranks_ = self._program._hierarchical_allreduce_exter_nranks
+            self._build_strategy.use_hierarchical_allreduce = self._program._use_hierarchical_allreduce
+            self._build_strategy.hierarchical_allreduce_inter_nranks = self._program._hierarchical_allreduce_inter_nranks
 
         if self._build_strategy.sync_batch_norm:
             self._build_strategy.enable_sequential_execution = True
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 49d75eface80581feede62b07940cfc76c2153ae..22d30cbcf574ebacf06a3fc3d725f39a3c8777ee 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -2844,7 +2844,6 @@ class Program(object):
         self._nccl_comm_num = 1
         self._use_hierarchical_allreduce = False
         self._hierarchical_allreduce_inter_nranks = 0
-        self._hierarchical_allreduce_exter_nranks = 0
 
         # @deprecated(the python memory optimize transpiler is deprecated)
         # whether the program is optimized by memory_optimize_transpiler
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index a1f59d368d69d2e9eebe0cd6e7abfdeebe2f83ec..71044f055243fed18bdffeab81ffadbb30d5b97b 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -129,6 +129,9 @@ class TestDistRunnerBase(object):
             config = fluid.DistributeTranspilerConfig()
             config.mode = "nccl2"
             config.nccl_comm_num = args.nccl_comm_num
+            if args.use_hallreduce:
+                config.use_hierarchical_allreduce = True
+                config.hierarchical_allreduce_inter_nranks = args.hallreduce_inter_nranks
             my_print(
                 type(self).__name__,
                 "begin to run transpile on trainer with nccl2 mode")
@@ -198,15 +201,6 @@ class TestDistRunnerBase(object):
             exec_strategy=exec_strategy)
         my_print(type(self).__name__, "program compiled with data parallel")
 
-        if args.use_cuda and args.update_method == "nccl2":
-            # it just for test share_vars_from feature.
-            test_exe = fluid.ParallelExecutor(
-                use_cuda=True,
-                loss_name=avg_cost.name,
-                build_strategy=build_stra,
-                main_program=test_program,
-                share_vars_from=binary._executor)
-
         feed_var_list = [
             var for var in trainer_prog.global_block().vars.values()
             if var.is_data
@@ -327,8 +321,10 @@ def runtime_main(test_class):
     parser.add_argument('--trainer_id', type=int, required=False, default=0)
     parser.add_argument('--trainers', type=int, required=False, default=1)
     parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
+    parser.add_argument('--enable_backward_deps', action='store_true')
+    parser.add_argument('--use_hallreduce', action='store_true')
     parser.add_argument(
-        '--enable_backward_deps', type=bool, required=False, default=1)
+        '--hallreduce_inter_nranks', type=int, required=False, default=2)
     parser.add_argument(
         '--current_endpoint', type=str, required=False, default="")
     parser.add_argument('--sync_mode', action='store_true')
@@ -407,9 +403,10 @@ class TestDistBase(unittest.TestCase):
         self._use_dgc = False
         self._dygraph = False
         self._nccl_comm_num = 1
+        self._enable_backward_deps = False
+        self._use_hallreduce = False
         self._setup_config()
         self._after_setup_config()
-        self._enable_backward_deps = False
 
     def _find_free_port(self):
         def __free_port():
@@ -597,118 +594,97 @@ class TestDistBase(unittest.TestCase):
             ps0.terminate()
             ps1.terminate()
 
-        # print server log
-        '''
-        with open("/tmp/ps0_err.log", "rb") as fn:
-            sys.stderr.write("ps0 stderr: %s\n" % fn.read())
-        with open("/tmp/ps1_err.log", "rb") as fn:
-            sys.stderr.write("ps1 stderr: %s\n" % fn.read())
-        '''
-
-        # print log
-        '''
-        with open("/tmp/tr0_err.log", "rb") as fn:
-            sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
-        with open("/tmp/tr1_err.log", "rb") as fn:
-            sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())
-        '''
-
         return pickle.loads(tr0_out), pickle.loads(tr1_out)
 
-    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
-                           check_error_log):
-        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
-        worker_endpoints = self._ps_endpoints.split(",")
-        w0_ep, w1_ep = worker_endpoints
-        if nccl2_reduce_layer:
-            update_method = "nccl2_reduce_layer"
-        else:
-            update_method = "nccl2"
-
-        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
-        tr0_cmd = tr_cmd % \
-                  (self._python_interp, model, self._ps_endpoints,
-                   0, w0_ep, update_method, self._lr)
-        tr1_cmd = tr_cmd % \
+    def _get_nccl2_trainer_cmd(self, model, ep, update_method, trainer_id,
+                               trainer_num):
+        env = {}
+        tr_cmd = "%s -u %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
+        tr_cmd = tr_cmd % \
                   (self._python_interp, model, self._ps_endpoints,
-                   1, w1_ep, update_method, self._lr)
+                   trainer_id, ep, update_method, self._lr)
 
         if self._mem_opt:
-            tr0_cmd += " --mem_opt"
-            tr1_cmd += " --mem_opt"
+            tr_cmd += " --mem_opt"
         if self._use_reduce:
-            tr0_cmd += " --use_reduce"
-            tr1_cmd += " --use_reduce"
+            tr_cmd += " --use_reduce"
         if self._use_reader_alloc:
-            tr0_cmd += " --use_reader_alloc"
-            tr1_cmd += " --use_reader_alloc"
+            tr_cmd += " --use_reader_alloc"
        if self.__use_cuda:
-            tr0_cmd += " --use_cuda"
-            tr1_cmd += " --use_cuda"
-            env0 = {
-                "CUDA_VISIBLE_DEVICES": "0",
-                # for test nccl2 layer
-                "PADDLE_TRAINERS_NUM": "2",
-                "PADDLE_TRAINER_ID": "0"
-            }
-            env1 = {
-                "CUDA_VISIBLE_DEVICES": "1",
-                "PADDLE_TRAINERS_NUM": "2",
-                "PADDLE_TRAINER_ID": "1"
-            }
+            tr_cmd += " --use_cuda"
+            env.update({
+                "CUDA_VISIBLE_DEVICES": "{}".format(trainer_id),
+                "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
+                "PADDLE_TRAINER_ID": "{}".format(trainer_id)
+            })
         else:
-            env0 = {'CPU_NUM': '1'}
-            env1 = {'CPU_NUM': '1'}
+            env.update({'CPU_NUM': '1'})
 
         if self._use_dgc:
-            tr0_cmd += " --use_dgc"
-            tr1_cmd += " --use_dgc"
+            tr_cmd += " --use_dgc"
+
+        if self._mp_mode:
+            env = {"FLAGS_selected_gpus": "{}".format(trainer_id)}
 
         if self._nccl_comm_num > 1:
-            tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
-            tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
+            tr_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
 
-        if self._mp_mode:
-            env0 = {"FLAGS_selected_gpus": "0"}
-            env1 = {"FLAGS_selected_gpus": "1"}
+        if self._use_hallreduce:
+            tr_cmd += " --use_hallreduce --hallreduce_inter_nranks 2"
 
         if self._enable_backward_deps:
-            tr0_cmd += " --enable_backward_deps 1"
-            tr1_cmd += " --enable_backward_deps 1"
+            tr_cmd += " --enable_backward_deps"
 
-        env0.update(envs)
-        env1.update(envs)
+        return tr_cmd, env
 
-        print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
-        print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
-        tr0_pipe = open("/tmp/tr0_err.log", "wb")
-        tr1_pipe = open("/tmp/tr1_err.log", "wb")
+    def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
+                           check_error_log):
+        if self._use_hallreduce:
+            self._ps_endpoints = ""
+            for i in range(0, 4):
+                self._ps_endpoints += "127.0.0.1:%s," % (self._find_free_port())
+            self._ps_endpoints = self._ps_endpoints[:-1]
 
-        my_print(type(self).__name__, "going to start process 0 with nccl2")
-        tr0_proc = subprocess.Popen(
-            tr0_cmd.strip().split(" "),
-            stdout=subprocess.PIPE,
-            stderr=tr0_pipe,
-            env=env0)
-        my_print(type(self).__name__, "going to start process 1 with nccl2")
-        tr1_proc = subprocess.Popen(
-            tr1_cmd.strip().split(" "),
-            stdout=subprocess.PIPE,
-            stderr=tr1_pipe,
-            env=env1)
+        # NOTE: we reuse ps_endpoints as nccl2 worker endpoints
+        worker_endpoints = self._ps_endpoints.split(",")
self._ps_endpoints.split(",") + if nccl2_reduce_layer: + update_method = "nccl2_reduce_layer" + else: + update_method = "nccl2" - tr0_out, tr0_err = tr0_proc.communicate() - tr1_out, tr1_err = tr1_proc.communicate() + trainer_num = len(worker_endpoints) - # close trainer file - tr0_pipe.close() - tr1_pipe.close() + procs = [] + pipes = [] + for i in range(0, trainer_num): + tr_cmd, tr_env = self._get_nccl2_trainer_cmd( + model, worker_endpoints[i], update_method, i, trainer_num) + tr_env.update(envs) + print("use_hallreduce:{} tr_cmd:{}, env: {}".format( + self._use_hallreduce, tr_cmd, tr_env)) - # print log - sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err) - sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err) + tr_pipe = open("/tmp/tr{}_err.log".format(i), "wb") - return pickle.loads(tr0_out), pickle.loads(tr1_out) + my_print( + type(self).__name__, + "going to start process {} with nccl2".format(i)) + tr_proc = subprocess.Popen( + tr_cmd.strip().split(" "), + stdout=subprocess.PIPE, + stderr=tr_pipe, + env=tr_env) + + procs.append(tr_proc) + pipes.append(tr_pipe) + + outs = [] + for i in range(0, trainer_num): + tr_out, tr_err = procs[i].communicate() + outs.append(tr_out) + pipes[i].close() + sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err)) + + return pickle.loads(outs[0]), pickle.loads(outs[1]) def check_with_place(self, model_file, @@ -724,13 +700,14 @@ class TestDistBase(unittest.TestCase): "FLAGS_rpc_deadline": "30000", # 5sec to fail fast "FLAGS_cudnn_deterministic": "1", "http_proxy": "", - "NCCL_P2P_DISABLE": "1" + "NCCL_P2P_DISABLE": "1", + "NCCL_SHM_DISABLE": "1" } required_envs.update(need_envs) if check_error_log: - required_envs["GLOG_v"] = "3" + required_envs["GLOG_v"] = "10" required_envs["GLOG_logtostderr"] = "1" local_losses\ diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py b/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py index 65df03b402ac6caa30866471797783dcb94f7f04..8718dce5ee53c2234eba41b635ffc8609cc962fe 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist_nccl.py @@ -72,5 +72,19 @@ class TestDistMnistNCCL2BackWardDeps(TestDistBase): self.check_with_place("dist_mnist.py", delta=1e-5) +class TestDistMnistNCCL2HAllreduce(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + self._use_reader_alloc = False + self._nccl2_mode = True + self._use_hallreduce = True + + def test_dist_train(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place("dist_mnist.py", delta=1e-5) + + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index ece7ade6932300ac6340feb00edb03663354518f..722531abe4be1a252847d3242161e4ae10b2d640 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -172,8 +172,6 @@ class DistributeTranspilerConfig(object): use_hierarchical_allreduce = False #Nccl ranks in a node when use hierarchical allreduce, it's setted to gpu cards' number in most cases. hierarchical_allreduce_inter_nranks = 0 - #Nccl ranks bewteen nodes when use hierarchical allreduce, it's setted to nodes number. 
-    hierarchical_allreduce_exter_nranks = 0
 
     # if mode is collective
     # supported modes: sgd, local_sgd
@@ -428,10 +426,23 @@ class DistributeTranspiler(object):
             self.origin_program._trainers_endpoints = trainers.split(",")
             self.origin_program._nccl_comm_num = self.config.nccl_comm_num
             self.origin_program._use_hierarchical_allreduce = self.config.use_hierarchical_allreduce
-            self.origin_program._hierarchical_allreduce_inter_nranks = \
-                int(self.config.hierarchical_allreduce_inter_nranks)
-            self.origin_program._hierarchical_allreduce_exter_nranks = \
-                int(self.config.hierarchical_allreduce_exter_nranks)
+            # check use_hierarchical_allreduce options
+            if self.config.use_hierarchical_allreduce:
+                trainers_num = len(self.origin_program._trainers_endpoints)
+                # selected automaticly
+                if self.config.hierarchical_allreduce_inter_nranks <= 1:
+                    self.config.hierarchical_allreduce_inter_nranks = fluid.core.get_cuda_device_count(
+                    )
+
+                assert trainers_num > self.config.hierarchical_allreduce_inter_nranks, \
+                    "trainers_num:{} < hierarchical_allreduce_inter_nranks:{}".format(trainers_num, self.config.hierarchical_allreduce_inter_nranks)
+
+                assert trainers_num % self.config.hierarchical_allreduce_inter_nranks == 0, \
+                    "trainers_num:{} mod hierarchical_allreduce_inter_nranks:{} != 0".format(trainers_num, self.config.hierarchical_allreduce_inter_nranks)
+
+                self.origin_program._hierarchical_allreduce_inter_nranks = \
+                    int(self.config.hierarchical_allreduce_inter_nranks)
+
             self._transpile_nccl2(
                 trainer_id,
                 trainers,
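With hierarchical_allreduce_exter_nranks removed from DistributeTranspilerConfig, enabling the feature from user code comes down to one flag plus an optional inter-node rank count. Below is a rough usage sketch based on the updated test and transpiler code above; the endpoint list, trainer_id, and programs are illustrative, and hierarchical_allreduce_inter_nranks is left at 0 so the transpiler selects the local GPU count automatically.

    import paddle.fluid as fluid

    config = fluid.DistributeTranspilerConfig()
    config.mode = "nccl2"
    config.nccl_comm_num = 1
    config.use_hierarchical_allreduce = True
    # config.hierarchical_allreduce_inter_nranks = 8  # optional explicit value

    # 2 nodes x 2 GPUs -> 4 trainer endpoints; the transpiler asserts that the
    # trainer count is larger than, and divisible by, the inter-node nranks.
    trainer_endpoints = ("192.168.0.1:6170,192.168.0.1:6171,"
                         "192.168.0.2:6170,192.168.0.2:6171")

    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id=0,
        trainers=trainer_endpoints,
        current_endpoint="192.168.0.1:6170",
        startup_program=fluid.default_startup_program())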