diff --git a/paddle/fluid/framework/fleet/ascend_wrapper.h b/paddle/fluid/framework/fleet/ascend_wrapper.h
index cba98699fc5e40e758346d32a74406ad9a3c8bde..27a9b47e6309101d75127d1176eccb9868521e87 100644
--- a/paddle/fluid/framework/fleet/ascend_wrapper.h
+++ b/paddle/fluid/framework/fleet/ascend_wrapper.h
@@ -50,29 +50,35 @@ class AscendInstance {
   virtual ~AscendInstance() {}
   AscendInstance() {}
 
-  std::map<std::string, std::string> GetDefaultInitOptions() {
+  std::map<std::string, std::string> _GetDefaultInitOptions() {
     std::map<std::string, std::string> init_options;
     init_options["ge.exec.deviceId"] = "0";
     init_options["ge.graphRunMode"] = "1";
     return init_options;
   }
 
-  std::map<std::string, std::string> GetDefaultInitSessionOptions() {
+  std::map<std::string, std::string> _GetDefaultInitSessionOptions() {
     std::map<std::string, std::string> init_options;
-    init_options["a"] = "b";
-    init_options["ge.trainFlag"] = "1";
+    //init_options["a"] = "b";
+    //init_options["ge.trainFlag"] = "1";
     return init_options;
   }
 
-  ge::Status InitGEForUT() { return ge::GEInitialize(GetDefaultInitOptions()); }
+  ge::Status InitGEForUT() { return ge::GEInitialize(_GetDefaultInitOptions()); }
 
   void InitGlobalResouces() {
-    LOG(INFO) << "Begin InitGlobalResouces";
-    session_.reset(new ge::Session(GetDefaultInitSessionOptions()));
+    LOG(INFO) << "Begin ascend InitGlobalResouces";
+    session_.reset(new ge::Session(_GetDefaultInitSessionOptions()));
     if (session_ == nullptr) {
       LOG(FATAL) << "new session error:" << session_;
     }
-    LOG(INFO) << "End InitGlobalResouces";
+    LOG(INFO) << "End ascend InitGlobalResouces";
+  }
+
+  void DestroyGlobalResouces() {
+    LOG(INFO) << "Begin ascend DestroyGlobalResouces";
+    session_ = nullptr;
+    LOG(INFO) << "End ascend DestroyGlobalResouces";
   }
 
   static std::shared_ptr<AscendInstance> GetInstance() {
diff --git a/paddle/fluid/pybind/ascend_wrapper_py.cc b/paddle/fluid/pybind/ascend_wrapper_py.cc
index cfab436a6446a6c5258143be64dc6c6253d08335..d21fd838f48dbf986feb77ffe4d9e056f872e1fd 100644
--- a/paddle/fluid/pybind/ascend_wrapper_py.cc
+++ b/paddle/fluid/pybind/ascend_wrapper_py.cc
@@ -55,6 +55,9 @@ void BindAscendWrapper(py::module *m) {
       .def("init_global_resources",
           &framework::AscendInstance::InitGlobalResouces,
          py::call_guard<py::gil_scoped_release>())
+      .def("destroy_global_resources",
+          &framework::AscendInstance::DestroyGlobalResouces,
+          py::call_guard<py::gil_scoped_release>())
       .def("add_ascend_subgraph", &framework::AscendInstance::AddAscendSubgraph,
           py::call_guard<py::gil_scoped_release>());
 }
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 2a9fa070f67a2e13dc7b2a70d4c93d523c54133a..4bf1edf16363e13ad43fa5252635b741c7a64256 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -121,8 +121,7 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         "--run_mode",
         type=str,
         default="collective",
-        help="run mode of job, can be:collective/ps/ps-heter"
-    )
+        help="run mode of job, can be:collective/ps/ps-heter")
 
     base_group.add_argument(
         "--ascend_npus",
@@ -133,7 +132,6 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         "--ascend_npus=\"0,1,2,3\" will launch four training processes each bound to one gpu."
     )
-
     base_group.add_argument("--selected_gpus", dest="gpus")
 
     base_group.add_argument(
@@ -250,6 +248,9 @@ def launch_collective(args):
         log_dir=args.log_dir,
         envs=global_envs)
 
+    for idx, proc in enumerate(procs):
+        print("launch proc_id:{} idx:{}".format(proc.proc.pid, idx))
+
     while True:
         alive = watch_local_trainers(procs, cluster.trainers_nranks())
 
diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py
index 71b22d9519d6bff6b13cfe41df863051d57254c3..978899604eaf8c2ee45c03f866f2d5a081a7e502 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py
@@ -182,9 +182,14 @@ class AscendOptimizer(Optimizer):
     def __init__(self, optimizer, fetch_list=[]):
         self.inner_opt = optimizer
         self.fetch_list = fetch_list
+        self.ascend_instance = None
 
     def __del__(self):
+        print("begin AscendOptimizer del")
+        if self.ascend_instance is not None:
+            self.ascend_instance.destroy_global_resources()
         core.ge_finalize()
+        print("end AscendOptimizer del")
 
     def _can_apply(self):
         if not self.user_defined_strategy.ascend:
diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py
index 8e2f5b60ab290201af349c374ea9b3b88f7c9ab5..d7ba61a8e40144fd5f75156788a95c4b4cb235ea 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py
@@ -16,6 +16,7 @@ from paddle.fluid.optimizer import Optimizer
 import paddle.fluid.core as core
 import numpy as np
 from paddle.distributed import fleet
+from functools import reduce
 
 registerd_op = {## forwards
     "elementwise_add": "AddParser",
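
Note on the teardown path added by this patch: destroy_global_resources is the counterpart of init_global_resources, so the ge::Session is released before core.ge_finalize() runs in AscendOptimizer.__del__. The following is a minimal usage sketch of that ordering from Python, not part of the patch; it assumes an Ascend/GE-enabled build of Paddle and that core.AscendInstance() is the pybind handle to the AscendInstance singleton (only init_global_resources, destroy_global_resources, and core.ge_finalize come from this diff).

    import paddle.fluid.core as core

    # Sketch only: core.AscendInstance() is assumed to return the singleton
    # bound in ascend_wrapper_py.cc; adjust to however the binding exposes it.
    instance = core.AscendInstance()
    instance.init_global_resources()      # creates the ge::Session
    # ... add and run Ascend subgraphs here ...
    instance.destroy_global_resources()   # drop the ge::Session first
    core.ge_finalize()                     # then finalize GE itself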