From f9c97dd7287119f7546c90d9813ec825b3c956d2 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 21 Jan 2021 10:29:10 +0800 Subject: [PATCH] Add distribution supported (#30578) Add distribution supported --- cmake/external/ascend.cmake | 15 ++-- paddle/fluid/memory/allocation/CMakeLists.txt | 2 + .../fluid/operators/collective/CMakeLists.txt | 5 ++ .../operators/collective/gen_nccl_id_op.cc | 17 ++++ paddle/fluid/platform/CMakeLists.txt | 10 +++ paddle/fluid/platform/ascend_npu_info.cc | 35 +++++++++ paddle/fluid/platform/ascend_npu_info.h | 28 +++++++ paddle/fluid/pybind/ascend_wrapper_py.cc | 8 ++ paddle/fluid/pybind/ascend_wrapper_py.h | 1 + paddle/fluid/pybind/pybind.cc | 11 +++ python/paddle/distributed/fleet/__init__.py | 5 ++ .../distributed/fleet/base/fleet_base.py | 6 ++ .../distributed/fleet/base/role_maker.py | 8 +- python/paddle/distributed/fleet/launch.py | 41 ++++++++-- .../paddle/distributed/fleet/launch_utils.py | 77 +++++++++++++------ .../fleet/meta_optimizers/ascend/__init__.py | 0 .../ascend/ascend_optimizer.py | 61 +++++++-------- .../meta_optimizers/ascend/ascend_parser.py | 66 ++++++++++++---- .../graph_execution_optimizer.py | 5 +- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../ascend_multi_process_collective.py | 37 +++++++++ .../unittests/test_fleet_launch_ascend.sh | 59 ++++++++++++++ .../fluid/transpiler/ascend_transpiler.py | 64 +++++++++++++++ python/setup.py.in | 1 + 24 files changed, 476 insertions(+), 88 deletions(-) create mode 100644 paddle/fluid/platform/ascend_npu_info.cc create mode 100644 paddle/fluid/platform/ascend_npu_info.h create mode 100644 python/paddle/distributed/fleet/meta_optimizers/ascend/__init__.py create mode 100644 python/paddle/fluid/tests/unittests/ascend_multi_process_collective.py create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch_ascend.sh create mode 100644 python/paddle/fluid/transpiler/ascend_transpiler.py diff --git a/cmake/external/ascend.cmake b/cmake/external/ascend.cmake index e496ff52155..656007a5b96 100644 --- a/cmake/external/ascend.cmake +++ b/cmake/external/ascend.cmake @@ -37,13 +37,18 @@ set(ATLAS_ACL_DIR ${ASCEND_DIR}/ascend-toolkit/latest/acllib/lib64) set(ATLAS_ATC_DIR ${ASCEND_DIR}/ascend-toolkit/latest/atc/lib64) set(ATLAS_MS_RUNTIME_PATH ${ATLAS_RUNTIME_DIR} ${ATLAS_ACL_DIR} ${ATLAS_ATC_DIR}) -set(atlas_graph ${ATLAS_RUNTIME_DIR}/libgraph.so) -set(atlas_ge_runner ${ATLAS_RUNTIME_DIR}/libge_runner.so) +set(atlas_graph_lib ${ATLAS_RUNTIME_DIR}/libgraph.so) +set(atlas_ge_runner_lib ${ATLAS_RUNTIME_DIR}/libge_runner.so) +set(atlas_acl_lib ${ATLAS_RUNTIME_DIR}/libascendcl.so) INCLUDE_DIRECTORIES(${ATLAS_RUNTIME_INC_DIR}) ADD_LIBRARY(ascend_ge SHARED IMPORTED GLOBAL) -SET_PROPERTY(TARGET ascend_ge PROPERTY IMPORTED_LOCATION ${atlas_ge_runner}) +SET_PROPERTY(TARGET ascend_ge PROPERTY IMPORTED_LOCATION ${atlas_ge_runner_lib}) ADD_LIBRARY(ascend_graph SHARED IMPORTED GLOBAL) -SET_PROPERTY(TARGET ascend_graph PROPERTY IMPORTED_LOCATION ${atlas_graph}) -add_custom_target(extern_ascend DEPENDS ascend_ge ascend_graph) +SET_PROPERTY(TARGET ascend_graph PROPERTY IMPORTED_LOCATION ${atlas_graph_lib}) + +ADD_LIBRARY(atlas_acl SHARED IMPORTED GLOBAL) +SET_PROPERTY(TARGET atlas_acl PROPERTY IMPORTED_LOCATION ${atlas_acl_lib}) + +add_custom_target(extern_ascend DEPENDS ascend_ge ascend_graph atlas_acl) diff --git a/paddle/fluid/memory/allocation/CMakeLists.txt b/paddle/fluid/memory/allocation/CMakeLists.txt index 108e1240c5d..c93f637af1a 100644 --- a/paddle/fluid/memory/allocation/CMakeLists.txt +++ b/paddle/fluid/memory/allocation/CMakeLists.txt @@ -26,6 +26,8 @@ if (WITH_GPU) set(AllocatorFacadeDeps gpu_info cuda_allocator pinned_allocator cuda_device_guard thread_local_allocator) elseif(WITH_XPU) set(AllocatorFacadeDeps xpu_info) +elseif(WITH_ASCEND) + set(AllocatorFacadeDeps ascend_npu_info) else () set(AllocatorFacadeDeps) endif() diff --git a/paddle/fluid/operators/collective/CMakeLists.txt b/paddle/fluid/operators/collective/CMakeLists.txt index 09d4adee947..55ad52f6447 100644 --- a/paddle/fluid/operators/collective/CMakeLists.txt +++ b/paddle/fluid/operators/collective/CMakeLists.txt @@ -20,6 +20,11 @@ if(WITH_NCCL) op_library(gen_nccl_id_op DEPS ${COLLECTIVE_DEPS} gen_nccl_id_op_helper) endif() +if(WITH_ASCEND) + op_library(gen_nccl_id_op) +endif() + + if(WITH_GLOO) set(COLLECTIVE_DEPS ${COLLECTIVE_DEPS} gloo_wrapper) endif() diff --git a/paddle/fluid/operators/collective/gen_nccl_id_op.cc b/paddle/fluid/operators/collective/gen_nccl_id_op.cc index a985da5d5d0..1b30a29df25 100644 --- a/paddle/fluid/operators/collective/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/collective/gen_nccl_id_op.cc @@ -32,6 +32,8 @@ limitations under the License. */ namespace paddle { namespace operators { +#ifdef PADDLE_WITH_NCCL + class GenNCCLIdOp : public framework::OperatorBase { public: GenNCCLIdOp(const std::string& type, const framework::VariableNameMap& inputs, @@ -159,6 +161,21 @@ class GenNCCLIdOp : public framework::OperatorBase { } }; +#else +class GenNCCLIdOp : public framework::OperatorBase { + public: + GenNCCLIdOp(const std::string& type, const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + } +}; + +#endif + class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { diff --git a/paddle/fluid/platform/CMakeLists.txt b/paddle/fluid/platform/CMakeLists.txt index 6ae1f52ec03..63ff4852f16 100644 --- a/paddle/fluid/platform/CMakeLists.txt +++ b/paddle/fluid/platform/CMakeLists.txt @@ -10,6 +10,12 @@ ELSE() set(XPU_CTX_DEPS) endif(WITH_XPU) +if(WITH_ASCEND) + set(ASCEND_DEPS xpulib) +ELSE() + set(ASCEND_DEPS) +endif(WITH_ASCEND) + if (WITH_PYTHON) py_proto_compile(profiler_py_proto SRCS profiler.proto) add_custom_target(profiler_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) @@ -61,6 +67,10 @@ if(WITH_XPU) cc_library(xpu_info SRCS xpu_info.cc DEPS gflags glog enforce) endif() +if(WITH_ASCEND) + cc_library(ascend_npu_info SRCS ascend_npu_info.cc DEPS gflags glog enforce atlas_acl) +endif() + add_subdirectory(dynload) add_subdirectory(stream) diff --git a/paddle/fluid/platform/ascend_npu_info.cc b/paddle/fluid/platform/ascend_npu_info.cc new file mode 100644 index 00000000000..a6a5e4e8631 --- /dev/null +++ b/paddle/fluid/platform/ascend_npu_info.cc @@ -0,0 +1,35 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include +#include "acl/acl_rt.h" +#include "paddle/fluid/platform/ascend_npu_info.h" + +namespace paddle { +namespace platform { +namespace ascend{ + +int NPUDevice::GetDeviceCount() { + uint32_t count = 0; + aclError status = aclrtGetDeviceCount(&count); + if(status != 0){ + LOG(ERROR) << "aclrtGetDeviceCount error code:" << status; + return -1; + } + + return count; +} + + +} // namespace ascend +} // namespace platform +} // namespace paddle + + diff --git a/paddle/fluid/platform/ascend_npu_info.h b/paddle/fluid/platform/ascend_npu_info.h new file mode 100644 index 00000000000..faf3efb2e64 --- /dev/null +++ b/paddle/fluid/platform/ascend_npu_info.h @@ -0,0 +1,28 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#pragma once + +#ifdef PADDLE_WITH_ASCEND + +namespace paddle { +namespace platform { +namespace ascend{ + +class NPUDevice{ +public: + //! Get the total number of XPU devices in system. + static int GetDeviceCount(); +}; + +} // namespace ascend +} // namespace platform +} // namespace paddle +#endif diff --git a/paddle/fluid/pybind/ascend_wrapper_py.cc b/paddle/fluid/pybind/ascend_wrapper_py.cc index 26835ac1d70..492eb7fb5d3 100644 --- a/paddle/fluid/pybind/ascend_wrapper_py.cc +++ b/paddle/fluid/pybind/ascend_wrapper_py.cc @@ -33,6 +33,7 @@ limitations under the License. */ #include #include "paddle/fluid/framework/fleet/ascend_wrapper.h" #include "paddle/fluid/pybind/ascend_wrapper_py.h" +#include "paddle/fluid/platform/ascend_npu_info.h" #include "paddle/fluid/platform/enforce.h" using namespace ge; // NOLINT @@ -96,6 +97,12 @@ enum AttrType { AT_NAMEATTR }; +void BindAscendDevice(py::module* m) { + py::class_(*m, "NPUDevice") + .def_static("get_device_count", + static_cast(&platform::ascend::NPUDevice::GetDeviceCount)); +} + void BindAscendGraph(py::module *m) { m->def("ge_initialize", &ge_initialize, "GEInitialize"); m->def("ge_finalize", &GEFinalize, "GEFinalize"); @@ -712,6 +719,7 @@ void BindAscendGraph(py::module *m) { }) .def_static("is_exist_op", static_cast(&OperatorFactory::IsExistOp)); + } } // end namespace pybind diff --git a/paddle/fluid/pybind/ascend_wrapper_py.h b/paddle/fluid/pybind/ascend_wrapper_py.h index 4af96d6ef4b..e999080544c 100644 --- a/paddle/fluid/pybind/ascend_wrapper_py.h +++ b/paddle/fluid/pybind/ascend_wrapper_py.h @@ -25,6 +25,7 @@ namespace pybind { void BindAscendGraph(py::module* m); void BindAscendWrapper(py::module* m); +void BindAscendDevice(py::module* m); } // namespace pybind } // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 72b3c9645ba..e1a638adf50 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -134,6 +134,14 @@ bool IsCompiledWithCUDA() { #endif } +bool IsCompiledWithAscend() { +#ifndef PADDLE_WITH_ASCEND + return false; +#else + return true; +#endif +} + bool IsCompiledWithXPU() { #ifndef PADDLE_WITH_XPU return false; @@ -1439,6 +1447,7 @@ All parameter, weight, gradient are variables in Paddle. .def("__repr__", string::to_string) .def("__str__", string::to_string); + py::class_(m, "XPUPlace", R"DOC( **Note**: Examples: @@ -1727,6 +1736,7 @@ All parameter, weight, gradient are variables in Paddle. m.def("init_devices", []() { framework::InitDevices(); }); m.def("is_compiled_with_cuda", IsCompiledWithCUDA); + m.def("is_compiled_with_ascend", IsCompiledWithAscend); m.def("is_compiled_with_xpu", IsCompiledWithXPU); m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN); m.def("supports_bfloat16", SupportsBfloat16); @@ -2843,6 +2853,7 @@ All parameter, weight, gradient are variables in Paddle. #ifdef PADDLE_WITH_ASCEND BindAscendWrapper(&m); BindAscendGraph(&m); + BindAscendDevice(&m); #endif #ifdef PADDLE_WITH_CRYPTO BindCrypto(&m); diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 0b7e8da101b..970a932fcc0 100644 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -40,6 +40,11 @@ init = fleet.init is_first_worker = fleet.is_first_worker worker_index = fleet.worker_index worker_num = fleet.worker_num +node_num=fleet.node_num +rank=fleet.worker_index +nranks=fleet.worker_num +world_size=fleet.worker_num +rank_in_node=fleet.rank_in_node is_worker = fleet.is_worker worker_endpoints = fleet.worker_endpoints server_num = fleet.server_num diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py index a45cdd6f38f..547fe76063c 100644 --- a/python/paddle/distributed/fleet/base/fleet_base.py +++ b/python/paddle/distributed/fleet/base/fleet_base.py @@ -288,6 +288,12 @@ class Fleet(object): """ return self._role_maker._worker_num() + def node_num(self): + return self._role_maker._get_node_num() + + def rank_in_node(self): + return self._role_maker._get_rank_in_node() + def is_worker(self): """ Check whether the node is an instance of worker. diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index a8683aea97f..3da3e546352 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -614,7 +614,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): return len(self._get_pserver_endpoints( )) if self._get_pserver_endpoints() is not None else 0 - def _node_num(self): + def _get_node_num(self): """ return the training node number """ @@ -622,6 +622,11 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._generate_role() return self._nodes_num + def _get_rank_in_node(self): + if not self._role_is_generated: + self._generate_role() + return self._rank_in_node + def _get_trainer_endpoints(self): """ get endpoint of all trainers @@ -782,6 +787,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._trainers_num = len(self._worker_endpoints) self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) + self._rank_in_node = os.getenv("PADDLE_RANK_IN_NODE") def _gloo_init(self): # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index c7c60a3fbde..2a9fa070f67 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -117,6 +117,23 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra "--gpus=\"0,1,2,3\" will launch four training processes each bound to one gpu." ) + base_group.add_argument( + "--run_mode", + type=str, + default="collective", + help="run mode of job, can be:collective/ps/ps-heter" + ) + + base_group.add_argument( + "--ascend_npus", + type=str, + default=None, + help="It's for ascend npu training." + "For example:" + "--ascend_npus=\"0,1,2,3\" will launch four training processes each bound to one gpu." + ) + + base_group.add_argument("--selected_gpus", dest="gpus") base_group.add_argument( @@ -266,6 +283,16 @@ def launch_ps(args, distribute_mode): def which_distributed_mode(args): + if args.run_mode is not None: + assert args.run_mode in ["collective", "ps", "ps-heter"] + + if args.run_mode == "collective": + return DistributeMode.COLLECTIVE + elif args.run_mode == "ps": + return DistributeMode.PS + elif args.run_mode == "ps-heter": + return DistributeMode.PS_HETER + ps_args = [ '--worker_num', '--server_num', '--heter_worker_num', '--servers', '--workers', '--heter_workers', '--http_port' @@ -288,22 +315,24 @@ def which_distributed_mode(args): ) if fluid.core.is_compiled_with_cuda(): - cuda_device_num = fluid.core.get_cuda_device_count() + accelerators = fluid.core.get_cuda_device_count() + if fluid.core.is_compiled_with_ascend(): + accelerators = fluid.core.NPUDevice.get_device_count() else: - cuda_device_num = 0 + accelerators = 0 if len(has_ps_args) > 0: logger.info( - "Run parameter-sever mode. pserver arguments:{}, cuda count:{}". - format(has_ps_args, cuda_device_num)) + "Run parameter-sever mode. pserver arguments:{}, accelerators count:{}". + format(has_ps_args, accelerators)) has_ps_heter_args = list(set(has_ps_args) & set(ps_heter_args)) if len(has_ps_heter_args) > 0: return DistributeMode.PS_HETER else: return DistributeMode.PS elif len(has_collective_args) > 0: - logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}". - format(has_collective_args, cuda_device_num)) + logger.info("Run collective mode. gpu arguments:{}, cuda count:{}". + format(has_collective_args, accelerators)) return DistributeMode.COLLECTIVE else: if not fluid.core.is_compiled_with_cuda(): diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index 625e8a476b5..93e39249059 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -50,6 +50,7 @@ class DeviceMode(): CPU = 0 GPU = 1 KUNLUN = 2 + ASCEND_NPU = 3 UNKNOWN = 3 @@ -131,23 +132,23 @@ class JobServer(object): class Trainer(object): def __init__(self): - self.gpus = [] + self.accelerators = [] self.endpoint = None self.rank = None def __str__(self): - return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint, + return "accelerator:{} endpoint:{} rank:{}".format(self.accelerators, self.endpoint, self.rank) def __eq__(self, t): - if len(self.gpus) != len(t.gpus): + if len(self.accelerators) != len(t.accelerators): return False if self.endpoint != t.endpoint or \ self.rank != t.rank: return False - for a, b in zip(self.gpus, t.gpus): + for a, b in zip(self.accelerators, t.accelerators): if a != b: return False @@ -170,12 +171,13 @@ class Pod(object): self.servers = [] self.workers = [] self.heter_workers = [] - self.gpus = [] + self.accelerators = [] + self.device_mode = None def __str__(self): - return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{} servers:{} \ + return "rank:{} id:{} addr:{} port:{} visible_accelerator:{} trainers:{} servers:{} \ workers:{} heter_workers:{}".format( - self.rank, self.id, self.addr, self.port, self.gpus, [ + self.rank, self.id, self.addr, self.port, self.accelerators, [ str(t) for t in self.trainers ], [str(s) for s in self.servers], [str(w) for w in self.workers], [str(h) for h in self.heter_workers]) @@ -230,12 +232,12 @@ class Pod(object): def rank(self): return self.rank - def get_visible_gpus(self): + def get_visible_accelerators(self): r = "" - for g in self.gpus: + for g in self.accelerators: r += "{},".format(g) - assert r != "", "this pod {} can't see any gpus".format(self) + assert r != "", "this pod {} can't see any accelerators".format(self) r = r[:-1] return r @@ -263,18 +265,23 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, device_mode, pod = Pod() pod.rank = node_rank pod.addr = ip + pod.device_mode=device_mode + cur_node_endpoints = trainer_endpoints[node_rank] # when use paddlecloud, endpoints may > devices_per_proc(user_defined) assert len(cur_node_endpoints) >= len( devices_per_proc - ), "current trainer_endpoints size should be greater equal than selected_gpus size." + ), "current trainer_endpoints size should be greater equal than acclerators size." for i in range(len(devices_per_proc)): trainer = Trainer() - if device_mode == DeviceMode.GPU: + if device_mode == DeviceMode.GPU or device_mode == DeviceMode.ASCEND_NPU: if isinstance(devices_per_proc[i], (list, tuple)): - trainer.gpus.extend(devices_per_proc[i]) + trainer.accelerators.extend(devices_per_proc[i]) + pod.accelerators.extend(devices_per_proc[i]) else: - trainer.gpus.append(devices_per_proc[i]) + trainer.accelerators.append(devices_per_proc[i]) + pod.accelerators.append(devices_per_proc[i]) + trainer.endpoint = "%s" % (cur_node_endpoints[i]) trainer.rank = trainer_rank trainer_rank += 1 @@ -451,12 +458,17 @@ def start_local_trainers(cluster, "PADDLE_TRAINER_ID": "%d" % t.rank, "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), - "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) + "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()), + "PADDLE_RANK_IN_NODE": str(idx) } - if len(t.gpus) > 0: + if len(t.accelerators) > 0 and pod.device_mode==DeviceMode.GPU: proc_env["FLAGS_selected_gpus"] = "%s" % ",".join( - [str(g) for g in t.gpus]) + [str(g) for g in t.accelerators]) + + if len(t.accelerators) > 0: + proc_env["FLAGS_selected_accelerators"] = "%s" % ",".join( + [str(g) for g in t.accelerators]) current_env.update(proc_env) @@ -555,6 +567,16 @@ def watch_local_trainers(procs, nranks): return alive +def get_ascend_npus(npus): + if npus is None: + count = fluid.core.NPUDevice.get_device_count() + if count <= 0: + return ret + ret = [x for x in range(count)] + else: + ret = [x.strip() for x in npus.split(',')] + return ret + def get_gpus(gpus): if gpus is None: gpus_num = fluid.core.get_cuda_device_count() @@ -585,15 +607,18 @@ def get_gpus(gpus): def get_device_mode(): - #TODO(gongwb):Add XPU supported - if not fluid.core.is_compiled_with_cuda( - ) or fluid.core.get_cuda_device_count() <= 0: - print("launch train in CPU mode") - return DeviceMode.CPU + if fluid.core.is_compiled_with_ascend() and \ + fluid.core.NPUDevice.get_device_count() > 0: + print("launch train in ascend npu mode!") + return DeviceMode.ASCEND_NPU - print("launch train in GPU mode") - return DeviceMode.GPU + if fluid.core.is_compiled_with_cuda() and \ + fluid.core.get_cuda_device_count() > 0: + print("launch train in GPU mode!") + return DeviceMode.GPU + print("launch train in CPU mode!") + return DeviceMode.CPU def get_device_proc_info(args): # device_mode @@ -613,6 +638,10 @@ def get_device_proc_info(args): ] else: devices_per_proc = gpus + elif device_mode == DeviceMode.ASCEND_NPU: + npus = get_ascend_npus(args.ascend_npus) + assert args.nproc_per_node is None, "ascend_npus need't nproc_per_node arguments" + devices_per_proc=npus elif device_mode == DeviceMode.CPU: if args.nproc_per_node is None: devices_per_proc = [0] diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/__init__.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py index d7ac81bb5c5..d99ee9c1e9b 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py @@ -16,7 +16,7 @@ import paddle.fluid.framework as framework from paddle.fluid.optimizer import Optimizer import paddle.fluid.core as core import numpy as np -import ascend_parser +from . import ascend_parser class AscendIRParser(object): @@ -27,37 +27,25 @@ class AscendIRParser(object): ret_map = {} ge_in_operator = [] for id, var in enumerate(input_varlist): - if var.is_data: # input data - ge_input = core.GEOperatorFactory.create_operator( - var.name, "Data").set_attr_int32("index", id) + if var.is_data: # input data + ge_input = core.GEOperatorFactory.create_operator(var.name, "Data").set_attr_int32("index", id) ret_map[var.name] = ge_input ge_in_operator.append(ge_input) - else: # param, learning ... - ge_input = core.GEOperatorFactory.create_operator(var.name, - "Variable") - ge_input.update_output_desc("y", - core.GETensorDesc( - core.GEShape(var.shape), - core.GEFormat.FORMAT_ND, - core.GEDataType.DT_FLOAT)) + else: # param, learning ... + ge_input = core.GEOperatorFactory.create_operator(var.name, "Variable") + ge_input.update_output_desc("y", core.GETensorDesc(core.GEShape(var.shape), core.GEFormat.FORMAT_ND, core.GEDataType.DT_FLOAT)) ret_map[var.name] = ge_input return ge_in_operator, ret_map def parse_op(self, op): if op.type in ascend_parser.registerd_op: print("Op[%s] has been registered, begin to parse it" % (op.type)) - op_parser = self.parser_factory.create_parse( - ascend_parser.registerd_op[op.type]) + op_parser = self.parser_factory.create_parse(ascend_parser.registerd_op[op.type]) op_parser.apply(op) else: - print("Op[%s] has not been registered, so we have to skip it" % - (op.type)) - - def _parse_program(self, - graph_name, - program, - input_varlist=[], - fetch_list=[]): + print("Op[%s] has not been registered, so we have to skip it" % (op.type)) + + def _parse_program(self, graph_name, program, input_varlist=[], fetch_list=[]): begin_graph_idx = self.graph_idx ge_in_operator = [] ge_out_operator = [] @@ -72,8 +60,7 @@ class AscendIRParser(object): ge_in_operator, self.var2geop = self._construct_input_map(input_varlist) - self.parser_factory = ascend_parser.AscendParserFactory(graph, - self.var2geop) + self.parser_factory = ascend_parser.AscendParserFactory(graph, self.var2geop) for i, curop in list(enumerate(block.ops)): self.parse_op(curop) @@ -110,11 +97,9 @@ class AscendIRParser(object): self.graph_idx += 1 return graph - def parse_program(self, startup_program, main_program, input_varlist, - fetch_list): + def parse_program(self, startup_program, main_program, input_varlist, fetch_list): startup_graph = self._parse_program("startup", startup_program) - main_graph = self._parse_program("main", main_program, input_varlist, - fetch_list) + main_graph = self._parse_program("main", main_program, input_varlist, fetch_list) return startup_graph, main_graph @@ -138,7 +123,7 @@ class AscendOptimizer(Optimizer): dist_strategy.ascend = False dist_strategy.ascend_configs = {} - def _get_input_varlist(program): + def _get_input_varlist(self, program): ret_list = [] for var in program.list_vars(): if var.is_data or var.persistable: @@ -149,18 +134,26 @@ class AscendOptimizer(Optimizer): loss, startup_program=None, parameter_list=None, - no_grad_set=None): - minimized = self.inner_opt.minimize( - loss, startup_program=startup_program) + no_grad_set=None, + auto_dp=False): + minimized = self.inner_opt.minimize(loss, startup_program=startup_program) self.ascend_instance = core.AscendInstance() + from paddle.distributed import fleet + if auto_dp and fleet.worker_num() > 1: + from paddle.fluid.transpiler import ascend_transpiler + t = ascend_transpiler.AscendTranspiler(startup_program, loss.block.program) + t.transpile() + print(loss.block.program) + # Config about Graph Engine can be found in https://support.huaweicloud.com/ config = { - "ge.exec.deviceId": "0", + "ge.exec.deviceId": str(fleet.rank_in_node()), "ge.graphRunMode": "1", "ge.exec.precision_mode": "must_keep_origin_dtype" } + print("ge_initialize config:", config) core.ge_initialize(config) # Init Session @@ -169,7 +162,7 @@ class AscendOptimizer(Optimizer): main_block = loss.block self.parser = AscendIRParser() - input_varlist = _get_input_varlist(main_block.program) + input_varlist = self._get_input_varlist(main_block.program) startup_graph, main_graph = self.parser.parse_program( startup_program, main_block.program, input_varlist, self.fetch_list) diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py index b497b5eecda..7921b3d2216 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_parser.py @@ -29,6 +29,8 @@ registerd_op = { "reduce_sum_grad": "ReduceSumGradParser", "matmul_grad": "MatMulGradParser", "mul_grad": "MulGradParser", + "reshape2": "ReshapeParser", + "scale": "ScaleParser", "relu_grad": "ReluGradParser", "softmax_with_cross_entropy_grad": "SoftmaxWithCrossEntropyGradParser", "truncated_gaussian_random": "TruncatedNormalParser", @@ -60,13 +62,11 @@ class AscendHelper(object): } def dtype2ge(self, dtype): - assert dtype in self.dtype2ge_map, "dtype[%d] is not supported %d" % ( - dtype) + assert dtype in self.dtype2ge_map, "dtype[%d] is not supported %d" % (dtype) return self.dtype2ge_map[dtype] def dtype2np(self, index): - assert index in self.dtype2np_map, "index[%d] is not supported %d" % ( - dtype) + assert index in self.dtype2np_map, "index[%d] is not supported %d" % (dtype) return self.dtype2np_map[index] @@ -91,8 +91,7 @@ class AscendParserBase(object): self.ascend_helper = AscendHelper() def _get_ge_input(self, input_var_name): - assert input_var_name in self.var2geop, "var %s not created before" % ( - input_var_name) + assert input_var_name in self.var2geop, "var %s not created before" % (input_var_name) return self.var2geop[input_var_name] def update_output(self, geop_list, index_list): @@ -113,8 +112,7 @@ class AscendParserBase(object): for i in range(len(arguments)): print("assgin index_list[%d][%d] to %s" % (output_id, i, arguments[i])) - self.var2geop[arguments[i]] = geop_list[index_list[ - output_id][i]] + self.var2geop[arguments[i]] = geop_list[index_list[output_id][i]] for geop in geop_list: self.graph.add_op(geop) @@ -478,15 +476,11 @@ class TruncatedNormalParser(AscendParserBase): "const" + self._accumulated_op_id(), "Const").set_attr_tensor( "value", tensor3) - tensor4 = self._create_ge_tensor([1], dtype, mean - 2 * std) - min_tensor = core.GEOperatorFactory.create_operator( - "const" + self._accumulated_op_id(), "Const").set_attr_tensor( - "value", tensor4) + tensor4 = self._create_ge_tensor([1], dtype, mean-2*std) + min_tensor = core.GEOperatorFactory.create_operator("const" + self._accumulated_op_id(), "Const").set_attr_tensor("value", tensor4) - tensor5 = self._create_ge_tensor([1], dtype, mean + 2 * std) - max_tensor = core.GEOperatorFactory.create_operator( - "const" + self._accumulated_op_id(), "Const").set_attr_tensor( - "value", tensor5) + tensor5 = self._create_ge_tensor([1], dtype, mean+2*std) + max_tensor = core.GEOperatorFactory.create_operator("const" + self._accumulated_op_id(), "Const").set_attr_tensor("value", tensor5) self._mark_as_input(shape_tensor) self._mark_as_input(mean_tensor) @@ -527,3 +521,43 @@ class TruncatedNormalParser(AscendParserBase): "self.op.output('Out')[0] is not persistable in truncated_noraml" ) return [truncated_normal], [[0]] #[assign] + +class ScaleParser(AscendParserBase): + def __init__(self, graph, var2geop): + super(ScaleParser, self).__init__(graph, var2geop) + self.parser_name = "scale" + + def _apply(self): + x = self._get_ge_input(self.op.input_arg_names[0]) + scale = self.op.attr("scale") #self.get_ge_input(self.op.input_arg_names[1]) + bias = self.op.attr("bias") + bias_after_scale = self.op.attr("bias_after_scale") + if bias_after_scale: + scale_value = core.GEOperatorFactory.create_operator("scale" + self._accumulated_op_id(), "Power").set_input("x", x).set_attr_float("power", 1.0).set_attr_float("scale", scale).set_attr_float("shift", bias) + else: + x_add_bias = core.GEOperatorFactory.create_operator("adds" + self._accumulated_op_id(), "Adds").set_input("x", x).set_attr_float("value", bias) #set_input("x2", bias) + scale_value = core.GEOperatorFactory.create_operator("scale" + self._accumulated_op_id(), "Power").set_input("x", x_add_bias).set_attr_float("power", 1.0).set_attr_float("scale", scale).set_attr_float("shift", 0.0) + #tensor_zeros = core.GEOperatorFactory.create_operator("zeroslike" + self.getid(), "ZerosLike").set_input("x", x) + #bias_ = self.create_ge_tensor([1], 5, bias) + #const_bias = core.GEOperatorFactory.create_operator("const" + self.getid(), "Const").set_attr_tensor("value", tensor_bias) + return [scale_value],[[0]] + +class ReshapeParser(AscendParserBase): + def __init__(self, graph, var2geop): + super(ReshapeParser, self).__init__(graph, var2geop) + self.parser_name = "reshape2" + + def _apply(self): + print("swbuf:", self.op.input_arg_names) + shape = self.op.attr("shape") + axis = 0 + if shape[0] == -1: + axis = 1 + shape = shape[1:] + print("shape: ", shape) + data_x1_shape = self._get_ge_input(self.op.input_arg_names[0]) + tensor = self._create_ge_tensor([len(shape)], 2, shape) + const_shape = core.GEOperatorFactory.create_operator("shape" + self._accumulated_op_id(), "Const").set_attr_tensor("value", tensor) + reshape = core.GEOperatorFactory.create_operator("reshape" + self._accumulated_op_id(), "Reshape").set_input("x", data_x1_shape).set_input("shape", const_shape).set_attr_int32("axis", axis) + + return [reshape, reshape], [[0],[1]] diff --git a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py index 7ee184cfc5e..bce645ebc5c 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py @@ -61,8 +61,9 @@ class GraphExecutionOptimizer(MetaOptimizerBase): trainer_endpoints_env = ",".join(trainer_endpoints) trainers_num = self.role_maker._worker_num() - if trainer_id == 0: - wait_server_ready(other_trainers) + # FIXME(wangxi): approve this. + #if trainer_id == 0: + # wait_server_ready(other_trainers) nccl_id_var = startup_program.global_block().create_var( name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 5af27ed047e..129453149c6 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -38,6 +38,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleetrun) list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend) list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer) @@ -521,6 +522,7 @@ if(WITH_DISTRIBUTE) bash_test_modules(test_fleet_run_random_port START_BASH test_fleet_run_random_port.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_fleet_launch_async START_BASH test_fleet_launch_async.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_launch_ascend START_BASH test_fleet_launch_ascend.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) # port range (20000, 23000) is reserved for dist-ops diff --git a/python/paddle/fluid/tests/unittests/ascend_multi_process_collective.py b/python/paddle/fluid/tests/unittests/ascend_multi_process_collective.py new file mode 100644 index 00000000000..435af572973 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ascend_multi_process_collective.py @@ -0,0 +1,37 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time + +def train(prefix): + selected_accelerators = os.getenv("FLAGS_selected_accelerators") + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS") + current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") + worker_endpoints = worker_endpoints_env + trainers_num = len(worker_endpoints.split(',')) + + details = "selected_accelerators:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\ + .format(selected_accelerators, worker_endpoints, trainers_num, current_endpoint,trainer_id) + + print(details) + with open("multi_process_{}.check_{}.log".format(prefix, trainer_id), "w") as f: + f.write(details) + + +if __name__ == '__main__': + prefix = sys.argv[1] + train(prefix) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_ascend.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_ascend.sh new file mode 100644 index 00000000000..233fe7f7f25 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_ascend.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# use paddlecloud +echo "begin test use paddlecloud" +cluster_node_ips="127.0.0.1,127.0.0.2" +export PADDLE_TRAINERS_NUM=2 +export POD_IP=127.0.0.1 +export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 +export PADDLE_TRAINER_ID=0 + +export PADDLE_PORT=35789 +export TRAINER_PORTS_NUM=2 + +distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1 --log_dir=testlog" +python -m paddle.distributed.fleet.launch ${distributed_args} ascend_multi_process_collective.py fleetlaunchascend + +str1="selected_accelerators:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0" +str2="selected_accelerators:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1" +file_0="multi_process_fleetlaunchascend.check_0.log" +file_1="multi_process_fleetlaunchascend.check_1.log" + +echo "paddlecloud params test" +if grep -q "$str1" "$file_0"; then + echo "find trainer 0" +else + echo "not find trainer 0" + exit -1 +fi + +if grep -q "$str2" "$file_1"; then + echo "find trainer 1" +else + echo "not find trainer 1" + exit -1 +fi + +# test async poll process +if [ -f $file_0 ]; then + rm $file_0 +fi +if [ -f $file_1 ]; then + rm $file_1 +fi diff --git a/python/paddle/fluid/transpiler/ascend_transpiler.py b/python/paddle/fluid/transpiler/ascend_transpiler.py new file mode 100644 index 00000000000..ff814161050 --- /dev/null +++ b/python/paddle/fluid/transpiler/ascend_transpiler.py @@ -0,0 +1,64 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import collective +from .. import core +OpRole = core.op_proto_and_checker_maker.OpRole + +class AscendTranspiler(collective.Collective): + def __init__(self, startup_program, main_program): + self.nrings = 1 + super(AscendTranspiler, self).__init__(self.nrings) + self._startup_program = startup_program + self._main_program = main_program + + + def _insert_allreduce_ops(self): + block = self._main_program.global_block() + ring_id = -1 + grad = None + for idx, op in reversed(list(enumerate(block.ops))): + if self._is_backward_op(op) and \ + self.op_role_var_key in op.attr_names: + op_role_var = op.all_attrs()[self.op_role_var_key] + + if len(op_role_var) == 0: + continue + assert len(op_role_var) % 2 == 0 + + offset = idx + for i in range(0, len(op_role_var), 2): + param = block.vars[op_role_var[i]] + grad = block.vars[op_role_var[i + 1]] + if param.is_distributed: + continue + + # As we search ops reversedly, we should insert c_allreduce_sum + # op in the same way to keep the ring_id alternate + ring_id = (ring_id + 1) % self.nrings + block._insert_op( + offset + 1, + type='allreduce', + inputs={'X': grad}, + outputs={'Out': grad}, + attrs={ + 'ring_id': ring_id, + self.op_role_key: OpRole.Backward + }) + + if grad is None: + return + + def transpile(self): + self._insert_allreduce_ops() diff --git a/python/setup.py.in b/python/setup.py.in index 3c13f55d4d3..9f31f97b531 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -148,6 +148,7 @@ packages=['paddle', 'paddle.distributed.fleet.base', 'paddle.distributed.fleet.meta_optimizers', 'paddle.distributed.fleet.meta_optimizers.sharding', + 'paddle.distributed.fleet.meta_optimizers.ascend', 'paddle.distributed.fleet.runtime', 'paddle.distributed.fleet.dataset', 'paddle.distributed.fleet.data_generator', -- GitLab