Unverified · Commit f9c97dd7 authored by gongweibao, committed by GitHub

Add distribution supported (#30578)

Add distribution supported
Parent 1882f2ce
@@ -37,13 +37,18 @@ set(ATLAS_ACL_DIR ${ASCEND_DIR}/ascend-toolkit/latest/acllib/lib64)
 set(ATLAS_ATC_DIR ${ASCEND_DIR}/ascend-toolkit/latest/atc/lib64)
 set(ATLAS_MS_RUNTIME_PATH ${ATLAS_RUNTIME_DIR} ${ATLAS_ACL_DIR} ${ATLAS_ATC_DIR})
-set(atlas_graph ${ATLAS_RUNTIME_DIR}/libgraph.so)
+set(atlas_graph_lib ${ATLAS_RUNTIME_DIR}/libgraph.so)
-set(atlas_ge_runner ${ATLAS_RUNTIME_DIR}/libge_runner.so)
+set(atlas_ge_runner_lib ${ATLAS_RUNTIME_DIR}/libge_runner.so)
+set(atlas_acl_lib ${ATLAS_RUNTIME_DIR}/libascendcl.so)
 INCLUDE_DIRECTORIES(${ATLAS_RUNTIME_INC_DIR})
 ADD_LIBRARY(ascend_ge SHARED IMPORTED GLOBAL)
-SET_PROPERTY(TARGET ascend_ge PROPERTY IMPORTED_LOCATION ${atlas_ge_runner})
+SET_PROPERTY(TARGET ascend_ge PROPERTY IMPORTED_LOCATION ${atlas_ge_runner_lib})
 ADD_LIBRARY(ascend_graph SHARED IMPORTED GLOBAL)
-SET_PROPERTY(TARGET ascend_graph PROPERTY IMPORTED_LOCATION ${atlas_graph})
+SET_PROPERTY(TARGET ascend_graph PROPERTY IMPORTED_LOCATION ${atlas_graph_lib})
-add_custom_target(extern_ascend DEPENDS ascend_ge ascend_graph)
+ADD_LIBRARY(atlas_acl SHARED IMPORTED GLOBAL)
+SET_PROPERTY(TARGET atlas_acl PROPERTY IMPORTED_LOCATION ${atlas_acl_lib})
+add_custom_target(extern_ascend DEPENDS ascend_ge ascend_graph atlas_acl)
@@ -26,6 +26,8 @@ if (WITH_GPU)
     set(AllocatorFacadeDeps gpu_info cuda_allocator pinned_allocator cuda_device_guard thread_local_allocator)
 elseif(WITH_XPU)
     set(AllocatorFacadeDeps xpu_info)
+elseif(WITH_ASCEND)
+    set(AllocatorFacadeDeps ascend_npu_info)
 else ()
     set(AllocatorFacadeDeps)
 endif()
...
@@ -20,6 +20,11 @@ if(WITH_NCCL)
     op_library(gen_nccl_id_op DEPS ${COLLECTIVE_DEPS} gen_nccl_id_op_helper)
 endif()
+if(WITH_ASCEND)
+    op_library(gen_nccl_id_op)
+endif()
 if(WITH_GLOO)
     set(COLLECTIVE_DEPS ${COLLECTIVE_DEPS} gloo_wrapper)
 endif()
...
@@ -32,6 +32,8 @@ limitations under the License. */
 namespace paddle {
 namespace operators {
+#ifdef PADDLE_WITH_NCCL
 class GenNCCLIdOp : public framework::OperatorBase {
  public:
  GenNCCLIdOp(const std::string& type, const framework::VariableNameMap& inputs,
@@ -159,6 +161,21 @@ class GenNCCLIdOp : public framework::OperatorBase {
   }
 };
+#else
+// Stub used when Paddle is built without NCCL (e.g. an Ascend-only build):
+// the op is still registered, but running it is a no-op.
+class GenNCCLIdOp : public framework::OperatorBase {
+ public:
+  GenNCCLIdOp(const std::string& type, const framework::VariableNameMap& inputs,
+              const framework::VariableNameMap& outputs,
+              const framework::AttributeMap& attrs)
+      : OperatorBase(type, inputs, outputs, attrs) {}
+
+  void RunImpl(const framework::Scope& scope,
+               const platform::Place& dev_place) const override {}
+};
+#endif
 class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
  void Make() override {
...
@@ -10,6 +10,12 @@ ELSE()
   set(XPU_CTX_DEPS)
 endif(WITH_XPU)
+if(WITH_ASCEND)
+  set(ASCEND_DEPS xpulib)
+ELSE()
+  set(ASCEND_DEPS)
+endif(WITH_ASCEND)
 if (WITH_PYTHON)
   py_proto_compile(profiler_py_proto SRCS profiler.proto)
   add_custom_target(profiler_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
@@ -61,6 +67,10 @@ if(WITH_XPU)
   cc_library(xpu_info SRCS xpu_info.cc DEPS gflags glog enforce)
 endif()
+if(WITH_ASCEND)
+  cc_library(ascend_npu_info SRCS ascend_npu_info.cc DEPS gflags glog enforce atlas_acl)
+endif()
 add_subdirectory(dynload)
 add_subdirectory(stream)
...
paddle/fluid/platform/ascend_npu_info.cc (new file):

/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <glog/logging.h>
#include "acl/acl_rt.h"
#include "paddle/fluid/platform/ascend_npu_info.h"

namespace paddle {
namespace platform {
namespace ascend {

int NPUDevice::GetDeviceCount() {
  uint32_t count = 0;
  aclError status = aclrtGetDeviceCount(&count);
  if (status != 0) {
    LOG(ERROR) << "aclrtGetDeviceCount error code: " << status;
    return -1;
  }
  return count;
}

}  // namespace ascend
}  // namespace platform
}  // namespace paddle
paddle/fluid/platform/ascend_npu_info.h (new file):

/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#ifdef PADDLE_WITH_ASCEND
namespace paddle {
namespace platform {
namespace ascend {

class NPUDevice {
 public:
  //! Get the total number of NPU devices in the system.
  static int GetDeviceCount();
};

}  // namespace ascend
}  // namespace platform
}  // namespace paddle
#endif
@@ -33,6 +33,7 @@ limitations under the License. */
 #include <vector>
 #include "paddle/fluid/framework/fleet/ascend_wrapper.h"
 #include "paddle/fluid/pybind/ascend_wrapper_py.h"
+#include "paddle/fluid/platform/ascend_npu_info.h"
 #include "paddle/fluid/platform/enforce.h"
 using namespace ge;  // NOLINT
@@ -96,6 +97,12 @@ enum AttrType {
   AT_NAMEATTR
 };
+void BindAscendDevice(py::module* m) {
+  py::class_<platform::ascend::NPUDevice>(*m, "NPUDevice")
+      .def_static("get_device_count",
+                  static_cast<int (*)()>(
+                      &platform::ascend::NPUDevice::GetDeviceCount));
+}
 void BindAscendGraph(py::module *m) {
   m->def("ge_initialize", &ge_initialize, "GEInitialize");
   m->def("ge_finalize", &GEFinalize, "GEFinalize");
@@ -712,6 +719,7 @@ void BindAscendGraph(py::module *m) {
       })
       .def_static("is_exist_op",
                   static_cast<bool (*)(const char*)>(&OperatorFactory::IsExistOp));
 }
 }  // end namespace pybind
...
@@ -25,6 +25,7 @@ namespace pybind {
 void BindAscendGraph(py::module* m);
 void BindAscendWrapper(py::module* m);
+void BindAscendDevice(py::module* m);
 }  // namespace pybind
 }  // namespace paddle
...
@@ -134,6 +134,14 @@ bool IsCompiledWithCUDA() {
 #endif
 }
+bool IsCompiledWithAscend() {
+#ifndef PADDLE_WITH_ASCEND
+  return false;
+#else
+  return true;
+#endif
+}
 bool IsCompiledWithXPU() {
 #ifndef PADDLE_WITH_XPU
   return false;
@@ -1439,6 +1447,7 @@ All parameter, weight, gradient are variables in Paddle.
       .def("__repr__", string::to_string<const platform::CUDAPlace &>)
       .def("__str__", string::to_string<const platform::CUDAPlace &>);
   py::class_<platform::XPUPlace>(m, "XPUPlace", R"DOC(
     **Note**:
     Examples:
@@ -1727,6 +1736,7 @@ All parameter, weight, gradient are variables in Paddle.
   m.def("init_devices", []() { framework::InitDevices(); });
   m.def("is_compiled_with_cuda", IsCompiledWithCUDA);
+  m.def("is_compiled_with_ascend", IsCompiledWithAscend);
   m.def("is_compiled_with_xpu", IsCompiledWithXPU);
   m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN);
   m.def("supports_bfloat16", SupportsBfloat16);
@@ -2843,6 +2853,7 @@ All parameter, weight, gradient are variables in Paddle.
 #ifdef PADDLE_WITH_ASCEND
   BindAscendWrapper(&m);
   BindAscendGraph(&m);
+  BindAscendDevice(&m);
 #endif
 #ifdef PADDLE_WITH_CRYPTO
   BindCrypto(&m);
...
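Taken together, these bindings let Python probe the Ascend build before any launch logic runs. A minimal sketch, assuming Paddle was built with WITH_ASCEND=ON (the names are exactly those bound above):

    import paddle.fluid as fluid

    if fluid.core.is_compiled_with_ascend():
        # GetDeviceCount() returns -1 when the ACL runtime reports an error
        print("visible NPUs:", fluid.core.NPUDevice.get_device_count())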
@@ -40,6 +40,11 @@ init = fleet.init
 is_first_worker = fleet.is_first_worker
 worker_index = fleet.worker_index
 worker_num = fleet.worker_num
+node_num = fleet.node_num
+rank = fleet.worker_index
+nranks = fleet.worker_num
+world_size = fleet.worker_num
+rank_in_node = fleet.rank_in_node
 is_worker = fleet.is_worker
 worker_endpoints = fleet.worker_endpoints
 server_num = fleet.server_num
...
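These aliases expose the distributed topology under the names most scripts expect. A hedged usage sketch, assuming the process was started by the fleet launcher and fleet.init() has run:

    from paddle.distributed import fleet

    fleet.init(is_collective=True)
    print("rank:", fleet.rank())              # alias of worker_index
    print("world size:", fleet.world_size())  # alias of worker_num
    print("nodes:", fleet.node_num())
    print("local rank:", fleet.rank_in_node())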
@@ -288,6 +288,12 @@ class Fleet(object):
         """
         return self._role_maker._worker_num()

+    def node_num(self):
+        """Return the number of physical nodes in the job."""
+        return self._role_maker._get_node_num()
+
+    def rank_in_node(self):
+        """Return this worker's rank within its own node."""
+        return self._role_maker._get_rank_in_node()
+
     def is_worker(self):
         """
         Check whether the node is an instance of worker.
...
@@ -614,7 +614,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
         return len(self._get_pserver_endpoints(
         )) if self._get_pserver_endpoints() is not None else 0

-    def _node_num(self):
+    def _get_node_num(self):
         """
         return the training node number
         """
@@ -622,6 +622,11 @@ class PaddleCloudRoleMaker(RoleMakerBase):
             self._generate_role()
         return self._nodes_num

+    def _get_rank_in_node(self):
+        if not self._role_is_generated:
+            self._generate_role()
+        return self._rank_in_node
+
     def _get_trainer_endpoints(self):
         """
         get endpoint of all trainers
@@ -782,6 +787,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
         self._trainers_num = len(self._worker_endpoints)
         self._nodes_num = len(
             set([x.split(':')[0] for x in self._worker_endpoints]))
+        self._rank_in_node = os.getenv("PADDLE_RANK_IN_NODE")

     def _gloo_init(self):
         # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier
...
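One subtlety worth noting: the role maker reads PADDLE_RANK_IN_NODE back as a raw string, not an int. A tiny sketch of the launcher/role-maker contract (values illustrative; the launcher side is the start_local_trainers change further below):

    import os

    # the launcher exports this per local trainer process
    os.environ["PADDLE_RANK_IN_NODE"] = "0"

    # the role maker stores the raw string; downstream users convert as needed
    assert os.getenv("PADDLE_RANK_IN_NODE") == "0"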
@@ -117,6 +117,23 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         "--gpus=\"0,1,2,3\" will launch four training processes each bound to one gpu."
     )

+    base_group.add_argument(
+        "--run_mode",
+        type=str,
+        default="collective",
+        help="run mode of the job, one of: collective/ps/ps-heter")
+
+    base_group.add_argument(
+        "--ascend_npus",
+        type=str,
+        default=None,
+        help="It's for ascend npu training. For example: "
+        "--ascend_npus=\"0,1,2,3\" will launch four training processes, "
+        "each bound to one npu.")
+
     base_group.add_argument("--selected_gpus", dest="gpus")

     base_group.add_argument(
@@ -266,6 +283,16 @@ def launch_ps(args, distribute_mode):
 def which_distributed_mode(args):
+    if args.run_mode is not None:
+        assert args.run_mode in ["collective", "ps", "ps-heter"]
+        if args.run_mode == "collective":
+            return DistributeMode.COLLECTIVE
+        elif args.run_mode == "ps":
+            return DistributeMode.PS
+        elif args.run_mode == "ps-heter":
+            return DistributeMode.PS_HETER
+
     ps_args = [
         '--worker_num', '--server_num', '--heter_worker_num', '--servers',
         '--workers', '--heter_workers', '--http_port'
@@ -288,22 +315,24 @@ def which_distributed_mode(args):
     )

     if fluid.core.is_compiled_with_cuda():
-        cuda_device_num = fluid.core.get_cuda_device_count()
+        accelerators = fluid.core.get_cuda_device_count()
+    elif fluid.core.is_compiled_with_ascend():
+        accelerators = fluid.core.NPUDevice.get_device_count()
     else:
-        cuda_device_num = 0
+        accelerators = 0

     if len(has_ps_args) > 0:
         logger.info(
-            "Run parameter-sever mode. pserver arguments:{}, cuda count:{}".
-            format(has_ps_args, cuda_device_num))
+            "Run parameter-server mode. pserver arguments:{}, accelerators count:{}".
+            format(has_ps_args, accelerators))
         has_ps_heter_args = list(set(has_ps_args) & set(ps_heter_args))
         if len(has_ps_heter_args) > 0:
             return DistributeMode.PS_HETER
         else:
             return DistributeMode.PS
     elif len(has_collective_args) > 0:
-        logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}".
-                    format(has_collective_args, cuda_device_num))
+        logger.info("Run collective mode. accelerator arguments:{}, accelerators count:{}".
+                    format(has_collective_args, accelerators))
         return DistributeMode.COLLECTIVE
     else:
         if not fluid.core.is_compiled_with_cuda():
...
@@ -50,6 +50,7 @@ class DeviceMode():
     CPU = 0
     GPU = 1
     KUNLUN = 2
+    ASCEND_NPU = 3
     UNKNOWN = 4  # bumped so it no longer collides with ASCEND_NPU
@@ -131,23 +132,23 @@ class JobServer(object):
 class Trainer(object):
     def __init__(self):
-        self.gpus = []
+        self.accelerators = []
         self.endpoint = None
         self.rank = None

     def __str__(self):
-        return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint,
-                                                   self.rank)
+        return "accelerator:{} endpoint:{} rank:{}".format(
+            self.accelerators, self.endpoint, self.rank)

     def __eq__(self, t):
-        if len(self.gpus) != len(t.gpus):
+        if len(self.accelerators) != len(t.accelerators):
             return False

         if self.endpoint != t.endpoint or \
                 self.rank != t.rank:
             return False

-        for a, b in zip(self.gpus, t.gpus):
+        for a, b in zip(self.accelerators, t.accelerators):
             if a != b:
                 return False
@@ -170,12 +171,13 @@ class Pod(object):
         self.servers = []
         self.workers = []
         self.heter_workers = []
-        self.gpus = []
+        self.accelerators = []
+        self.device_mode = None

     def __str__(self):
-        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{} servers:{} \
+        return "rank:{} id:{} addr:{} port:{} visible_accelerator:{} trainers:{} servers:{} \
 workers:{} heter_workers:{}".format(
-            self.rank, self.id, self.addr, self.port, self.gpus, [
+            self.rank, self.id, self.addr, self.port, self.accelerators, [
                 str(t) for t in self.trainers
             ], [str(s) for s in self.servers], [str(w) for w in self.workers],
             [str(h) for h in self.heter_workers])
@@ -230,12 +232,12 @@ class Pod(object):
     def rank(self):
         return self.rank

-    def get_visible_gpus(self):
+    def get_visible_accelerators(self):
         r = ""
-        for g in self.gpus:
+        for g in self.accelerators:
             r += "{},".format(g)

-        assert r != "", "this pod {} can't see any gpus".format(self)
+        assert r != "", "this pod {} can't see any accelerators".format(self)

         r = r[:-1]
         return r
@@ -263,18 +265,23 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
     pod = Pod()
     pod.rank = node_rank
     pod.addr = ip
+    pod.device_mode = device_mode
     cur_node_endpoints = trainer_endpoints[node_rank]
     # when use paddlecloud, endpoints may > devices_per_proc(user_defined)
     assert len(cur_node_endpoints) >= len(
         devices_per_proc
-    ), "current trainer_endpoints size should be greater equal than selected_gpus size."
+    ), "current trainer_endpoints size should be greater than or equal to accelerators size."
     for i in range(len(devices_per_proc)):
         trainer = Trainer()
-        if device_mode == DeviceMode.GPU:
+        if device_mode == DeviceMode.GPU or device_mode == DeviceMode.ASCEND_NPU:
             if isinstance(devices_per_proc[i], (list, tuple)):
-                trainer.gpus.extend(devices_per_proc[i])
+                trainer.accelerators.extend(devices_per_proc[i])
+                pod.accelerators.extend(devices_per_proc[i])
             else:
-                trainer.gpus.append(devices_per_proc[i])
+                trainer.accelerators.append(devices_per_proc[i])
+                pod.accelerators.append(devices_per_proc[i])
         trainer.endpoint = "%s" % (cur_node_endpoints[i])
         trainer.rank = trainer_rank
         trainer_rank += 1
@@ -451,12 +458,17 @@ def start_local_trainers(cluster,
             "PADDLE_TRAINER_ID": "%d" % t.rank,
             "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
             "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
-            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
+            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
+            "PADDLE_RANK_IN_NODE": str(idx)
         }

-        if len(t.gpus) > 0:
+        if len(t.accelerators) > 0 and pod.device_mode == DeviceMode.GPU:
             proc_env["FLAGS_selected_gpus"] = "%s" % ",".join(
-                [str(g) for g in t.gpus])
+                [str(g) for g in t.accelerators])
+
+        if len(t.accelerators) > 0:
+            proc_env["FLAGS_selected_accelerators"] = "%s" % ",".join(
+                [str(g) for g in t.accelerators])

         current_env.update(proc_env)
@@ -555,6 +567,16 @@ def watch_local_trainers(procs, nranks):
     return alive

+def get_ascend_npus(npus):
+    if npus is None:
+        count = fluid.core.NPUDevice.get_device_count()
+        if count <= 0:
+            return []  # no visible devices (was an undefined-name `return ret`)
+        ret = [x for x in range(count)]
+    else:
+        ret = [x.strip() for x in npus.split(',')]
+    return ret
 def get_gpus(gpus):
     if gpus is None:
         gpus_num = fluid.core.get_cuda_device_count()
@@ -585,15 +607,18 @@ def get_gpus(gpus):

 def get_device_mode():
-    #TODO(gongwb):Add XPU supported
-    if not fluid.core.is_compiled_with_cuda(
-    ) or fluid.core.get_cuda_device_count() <= 0:
-        print("launch train in CPU mode")
-        return DeviceMode.CPU
-
-    print("launch train in GPU mode")
-    return DeviceMode.GPU
+    if fluid.core.is_compiled_with_ascend() and \
+            fluid.core.NPUDevice.get_device_count() > 0:
+        print("launch train in ascend npu mode!")
+        return DeviceMode.ASCEND_NPU
+
+    if fluid.core.is_compiled_with_cuda() and \
+            fluid.core.get_cuda_device_count() > 0:
+        print("launch train in GPU mode!")
+        return DeviceMode.GPU
+
+    print("launch train in CPU mode!")
+    return DeviceMode.CPU
 def get_device_proc_info(args):
     # device_mode
@@ -613,6 +638,10 @@ def get_device_proc_info(args):
         ]
         else:
             devices_per_proc = gpus
+    elif device_mode == DeviceMode.ASCEND_NPU:
+        npus = get_ascend_npus(args.ascend_npus)
+        assert args.nproc_per_node is None, "ascend_npus doesn't need the nproc_per_node argument"
+        devices_per_proc = npus
     elif device_mode == DeviceMode.CPU:
         if args.nproc_per_node is None:
             devices_per_proc = [0]
...
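The device-selection rule in get_ascend_npus is small but easy to misread, so here is a standalone restatement (parse_npus and device_count are hypothetical stand-ins for the function above and for fluid.core.NPUDevice.get_device_count()):

    def parse_npus(npus, device_count):
        # npus is None: select every device the runtime reports (as ints)
        if npus is None:
            return [] if device_count <= 0 else list(range(device_count))
        # explicit list: honor the user's selection (kept as strings)
        return [x.strip() for x in npus.split(',')]

    assert parse_npus(None, 4) == [0, 1, 2, 3]
    assert parse_npus("0,1", 4) == ["0", "1"]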
@@ -16,7 +16,7 @@ import paddle.fluid.framework as framework
 from paddle.fluid.optimizer import Optimizer
 import paddle.fluid.core as core
 import numpy as np
-import ascend_parser
+from . import ascend_parser

 class AscendIRParser(object):
@@ -27,37 +27,25 @@ class AscendIRParser(object):
         ret_map = {}
         ge_in_operator = []
         for id, var in enumerate(input_varlist):
             if var.is_data:  # input data
-                ge_input = core.GEOperatorFactory.create_operator(
-                    var.name, "Data").set_attr_int32("index", id)
+                ge_input = core.GEOperatorFactory.create_operator(var.name, "Data").set_attr_int32("index", id)
                 ret_map[var.name] = ge_input
                 ge_in_operator.append(ge_input)
             else:  # param, learning ...
-                ge_input = core.GEOperatorFactory.create_operator(var.name,
-                                                                  "Variable")
-                ge_input.update_output_desc("y",
-                                            core.GETensorDesc(
-                                                core.GEShape(var.shape),
-                                                core.GEFormat.FORMAT_ND,
-                                                core.GEDataType.DT_FLOAT))
+                ge_input = core.GEOperatorFactory.create_operator(var.name, "Variable")
+                ge_input.update_output_desc("y", core.GETensorDesc(core.GEShape(var.shape), core.GEFormat.FORMAT_ND, core.GEDataType.DT_FLOAT))
                 ret_map[var.name] = ge_input
         return ge_in_operator, ret_map

     def parse_op(self, op):
         if op.type in ascend_parser.registerd_op:
             print("Op[%s] has been registered, begin to parse it" % (op.type))
-            op_parser = self.parser_factory.create_parse(
-                ascend_parser.registerd_op[op.type])
+            op_parser = self.parser_factory.create_parse(ascend_parser.registerd_op[op.type])
             op_parser.apply(op)
         else:
-            print("Op[%s] has not been registered, so we have to skip it" %
-                  (op.type))
+            print("Op[%s] has not been registered, so we have to skip it" % (op.type))

-    def _parse_program(self,
-                       graph_name,
-                       program,
-                       input_varlist=[],
-                       fetch_list=[]):
+    def _parse_program(self, graph_name, program, input_varlist=[], fetch_list=[]):
         begin_graph_idx = self.graph_idx
         ge_in_operator = []
         ge_out_operator = []
@@ -72,8 +60,7 @@ class AscendIRParser(object):
         ge_in_operator, self.var2geop = self._construct_input_map(input_varlist)

-        self.parser_factory = ascend_parser.AscendParserFactory(graph,
-                                                                self.var2geop)
+        self.parser_factory = ascend_parser.AscendParserFactory(graph, self.var2geop)
         for i, curop in list(enumerate(block.ops)):
             self.parse_op(curop)
@@ -110,11 +97,9 @@ class AscendIRParser(object):
         self.graph_idx += 1
         return graph

-    def parse_program(self, startup_program, main_program, input_varlist,
-                      fetch_list):
+    def parse_program(self, startup_program, main_program, input_varlist, fetch_list):
         startup_graph = self._parse_program("startup", startup_program)
-        main_graph = self._parse_program("main", main_program, input_varlist,
-                                         fetch_list)
+        main_graph = self._parse_program("main", main_program, input_varlist, fetch_list)
         return startup_graph, main_graph
@@ -138,7 +123,7 @@ class AscendOptimizer(Optimizer):
         dist_strategy.ascend = False
         dist_strategy.ascend_configs = {}

-    def _get_input_varlist(program):
+    def _get_input_varlist(self, program):
         ret_list = []
         for var in program.list_vars():
             if var.is_data or var.persistable:
@@ -149,18 +134,26 @@ class AscendOptimizer(Optimizer):
                  loss,
                  startup_program=None,
                  parameter_list=None,
-                 no_grad_set=None):
-        minimized = self.inner_opt.minimize(
-            loss, startup_program=startup_program)
+                 no_grad_set=None,
+                 auto_dp=False):
+        minimized = self.inner_opt.minimize(loss, startup_program=startup_program)

         self.ascend_instance = core.AscendInstance()

+        from paddle.distributed import fleet
+        if auto_dp and fleet.worker_num() > 1:
+            from paddle.fluid.transpiler import ascend_transpiler
+            t = ascend_transpiler.AscendTranspiler(startup_program,
+                                                   loss.block.program)
+            t.transpile()
+            print(loss.block.program)
+
         # Config about Graph Engine can be found in https://support.huaweicloud.com/
         config = {
-            "ge.exec.deviceId": "0",
+            "ge.exec.deviceId": str(fleet.rank_in_node()),
             "ge.graphRunMode": "1",
             "ge.exec.precision_mode": "must_keep_origin_dtype"
         }
+        print("ge_initialize config:", config)
         core.ge_initialize(config)

         # Init Session
@@ -169,7 +162,7 @@ class AscendOptimizer(Optimizer):
         main_block = loss.block
         self.parser = AscendIRParser()

-        input_varlist = _get_input_varlist(main_block.program)
+        input_varlist = self._get_input_varlist(main_block.program)

         startup_graph, main_graph = self.parser.parse_program(
             startup_program, main_block.program, input_varlist, self.fetch_list)
...
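A hedged sketch of driving this optimizer end to end. The AscendOptimizer(inner_opt, fetch_list) constructor signature is assumed from the attributes used above, and loss is a variable from a model you have already built:

    import paddle.fluid as fluid
    from paddle.distributed import fleet
    from paddle.distributed.fleet.meta_optimizers.ascend.ascend_optimizer import AscendOptimizer

    fleet.init(is_collective=True)
    opt = AscendOptimizer(fluid.optimizer.SGD(learning_rate=0.01), fetch_list=[])
    # auto_dp=True lets AscendTranspiler insert allreduce ops when worker_num() > 1
    opt.minimize(loss, startup_program=fluid.default_startup_program(), auto_dp=True)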
@@ -29,6 +29,8 @@ registerd_op = {
     "reduce_sum_grad": "ReduceSumGradParser",
     "matmul_grad": "MatMulGradParser",
     "mul_grad": "MulGradParser",
+    "reshape2": "ReshapeParser",
+    "scale": "ScaleParser",
     "relu_grad": "ReluGradParser",
     "softmax_with_cross_entropy_grad": "SoftmaxWithCrossEntropyGradParser",
     "truncated_gaussian_random": "TruncatedNormalParser",
@@ -60,13 +62,11 @@ class AscendHelper(object):
     }

     def dtype2ge(self, dtype):
-        assert dtype in self.dtype2ge_map, "dtype[%d] is not supported %d" % (
-            dtype)
+        assert dtype in self.dtype2ge_map, "dtype[%d] is not supported" % (dtype)
         return self.dtype2ge_map[dtype]

     def dtype2np(self, index):
-        assert index in self.dtype2np_map, "index[%d] is not supported %d" % (
-            dtype)
+        assert index in self.dtype2np_map, "index[%d] is not supported" % (index)
         return self.dtype2np_map[index]
@@ -91,8 +91,7 @@ class AscendParserBase(object):
         self.ascend_helper = AscendHelper()

     def _get_ge_input(self, input_var_name):
-        assert input_var_name in self.var2geop, "var %s not created before" % (
-            input_var_name)
+        assert input_var_name in self.var2geop, "var %s not created before" % (input_var_name)
         return self.var2geop[input_var_name]

     def update_output(self, geop_list, index_list):
@@ -113,8 +112,7 @@ class AscendParserBase(object):
             for i in range(len(arguments)):
                 print("assign index_list[%d][%d] to %s" %
                       (output_id, i, arguments[i]))
-                self.var2geop[arguments[i]] = geop_list[index_list[
-                    output_id][i]]
+                self.var2geop[arguments[i]] = geop_list[index_list[output_id][i]]

         for geop in geop_list:
             self.graph.add_op(geop)
@@ -478,15 +476,11 @@ class TruncatedNormalParser(AscendParserBase):
             "const" + self._accumulated_op_id(), "Const").set_attr_tensor(
                 "value", tensor3)

         tensor4 = self._create_ge_tensor([1], dtype, mean - 2 * std)
-        min_tensor = core.GEOperatorFactory.create_operator(
-            "const" + self._accumulated_op_id(), "Const").set_attr_tensor(
-                "value", tensor4)
+        min_tensor = core.GEOperatorFactory.create_operator("const" + self._accumulated_op_id(), "Const").set_attr_tensor("value", tensor4)

         tensor5 = self._create_ge_tensor([1], dtype, mean + 2 * std)
-        max_tensor = core.GEOperatorFactory.create_operator(
-            "const" + self._accumulated_op_id(), "Const").set_attr_tensor(
-                "value", tensor5)
+        max_tensor = core.GEOperatorFactory.create_operator("const" + self._accumulated_op_id(), "Const").set_attr_tensor("value", tensor5)

         self._mark_as_input(shape_tensor)
         self._mark_as_input(mean_tensor)
@@ -527,3 +521,43 @@ class TruncatedNormalParser(AscendParserBase):
                 "self.op.output('Out')[0] is not persistable in truncated_normal"
             )
         return [truncated_normal], [[0]]  # [assign]
+class ScaleParser(AscendParserBase):
+    def __init__(self, graph, var2geop):
+        super(ScaleParser, self).__init__(graph, var2geop)
+        self.parser_name = "scale"
+
+    def _apply(self):
+        x = self._get_ge_input(self.op.input_arg_names[0])
+        scale = self.op.attr("scale")
+        bias = self.op.attr("bias")
+        bias_after_scale = self.op.attr("bias_after_scale")
+        if bias_after_scale:
+            # out = scale * x + bias, as a single GE Power op
+            scale_value = core.GEOperatorFactory.create_operator(
+                "scale" + self._accumulated_op_id(), "Power").set_input(
+                    "x", x).set_attr_float("power", 1.0).set_attr_float(
+                        "scale", scale).set_attr_float("shift", bias)
+        else:
+            # out = scale * (x + bias): add the bias first, then scale
+            x_add_bias = core.GEOperatorFactory.create_operator(
+                "adds" + self._accumulated_op_id(), "Adds").set_input(
+                    "x", x).set_attr_float("value", bias)
+            scale_value = core.GEOperatorFactory.create_operator(
+                "scale" + self._accumulated_op_id(), "Power").set_input(
+                    "x", x_add_bias).set_attr_float("power", 1.0).set_attr_float(
+                        "scale", scale).set_attr_float("shift", 0.0)
+        return [scale_value], [[0]]
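Paddle's scale op computes out = scale * x + bias when bias_after_scale is true and out = scale * (x + bias) otherwise, which is exactly the branch above: one Power op with shift=bias versus Adds followed by Power with shift=0. A worked check of the two orderings:

    x, s, b = 3.0, 2.0, 1.0
    assert s * x + b == 7.0    # bias_after_scale=True:  Power(scale=s, shift=b)
    assert s * (x + b) == 8.0  # bias_after_scale=False: Adds(b), then Power(scale=s, shift=0)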
+class ReshapeParser(AscendParserBase):
+    def __init__(self, graph, var2geop):
+        super(ReshapeParser, self).__init__(graph, var2geop)
+        self.parser_name = "reshape2"
+
+    def _apply(self):
+        print("swbuf:", self.op.input_arg_names)
+        shape = self.op.attr("shape")
+        axis = 0
+        if shape[0] == -1:
+            # drop the inferred batch dimension and reshape from axis 1 onward
+            axis = 1
+            shape = shape[1:]
+        print("shape: ", shape)
+        data_x1_shape = self._get_ge_input(self.op.input_arg_names[0])
+        tensor = self._create_ge_tensor([len(shape)], 2, shape)
+        const_shape = core.GEOperatorFactory.create_operator(
+            "shape" + self._accumulated_op_id(), "Const").set_attr_tensor(
+                "value", tensor)
+        reshape = core.GEOperatorFactory.create_operator(
+            "reshape" + self._accumulated_op_id(), "Reshape").set_input(
+                "x", data_x1_shape).set_input(
+                    "shape", const_shape).set_attr_int32("axis", axis)
+        return [reshape, reshape], [[0], [1]]
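The -1 handling above mirrors how reshape2 carries an inferred batch dimension: the parser strips it and tells GE to reshape starting at axis 1. The shape rewrite in isolation:

    shape = [-1, 28, 28]  # reshape2's attr, leading dim inferred
    axis = 0
    if shape[0] == -1:
        axis = 1
        shape = shape[1:]
    assert (axis, shape) == (1, [28, 28])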
@@ -61,8 +61,9 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
         trainer_endpoints_env = ",".join(trainer_endpoints)
         trainers_num = self.role_maker._worker_num()

-        if trainer_id == 0:
-            wait_server_ready(other_trainers)
+        # FIXME(wangxi): approve this.
+        #if trainer_id == 0:
+        #    wait_server_ready(other_trainers)

         nccl_id_var = startup_program.global_block().create_var(
             name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
...
@@ -38,6 +38,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleetrun)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud)
+list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
 list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer)
@@ -521,6 +522,7 @@ if(WITH_DISTRIBUTE)
     bash_test_modules(test_fleet_run_random_port START_BASH test_fleet_run_random_port.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     bash_test_modules(test_fleet_launch_async START_BASH test_fleet_launch_async.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
+    bash_test_modules(test_fleet_launch_ascend START_BASH test_fleet_launch_ascend.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})

 # port range (20000, 23000) is reserved for dist-ops
...
ascend_multi_process_collective.py (new file):

# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
def train(prefix):
selected_accelerators = os.getenv("FLAGS_selected_accelerators")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
details = "selected_accelerators:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
.format(selected_accelerators, worker_endpoints, trainers_num, current_endpoint,trainer_id)
print(details)
with open("multi_process_{}.check_{}.log".format(prefix, trainer_id), "w") as f:
f.write(details)
if __name__ == '__main__':
prefix = sys.argv[1]
train(prefix)
test_fleet_launch_ascend.sh (new file):

#!/bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# use paddlecloud
echo "begin test use paddlecloud"
cluster_node_ips="127.0.0.1,127.0.0.2"
export PADDLE_TRAINERS_NUM=2
export POD_IP=127.0.0.1
export PADDLE_TRAINERS=127.0.0.1,127.0.0.2
export PADDLE_TRAINER_ID=0
export PADDLE_PORT=35789
export TRAINER_PORTS_NUM=2
distributed_args="--ips=${cluster_node_ips} --ascend_npus=0,1 --log_dir=testlog"
python -m paddle.distributed.fleet.launch ${distributed_args} ascend_multi_process_collective.py fleetlaunchascend
str1="selected_accelerators:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0"
str2="selected_accelerators:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1"
file_0="multi_process_fleetlaunchascend.check_0.log"
file_1="multi_process_fleetlaunchascend.check_1.log"
echo "paddlecloud params test"
if grep -q "$str1" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if grep -q "$str2" "$file_1"; then
echo "find trainer 1"
else
echo "not find trainer 1"
exit -1
fi
# test async poll process
if [ -f $file_0 ]; then
rm $file_0
fi
if [ -f $file_1 ]; then
rm $file_1
fi
python/paddle/fluid/transpiler/ascend_transpiler.py (new file):

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import collective
from .. import core
OpRole = core.op_proto_and_checker_maker.OpRole
class AscendTranspiler(collective.Collective):
def __init__(self, startup_program, main_program):
self.nrings = 1
super(AscendTranspiler, self).__init__(self.nrings)
self._startup_program = startup_program
self._main_program = main_program
def _insert_allreduce_ops(self):
block = self._main_program.global_block()
ring_id = -1
grad = None
for idx, op in reversed(list(enumerate(block.ops))):
if self._is_backward_op(op) and \
self.op_role_var_key in op.attr_names:
op_role_var = op.all_attrs()[self.op_role_var_key]
if len(op_role_var) == 0:
continue
assert len(op_role_var) % 2 == 0
offset = idx
for i in range(0, len(op_role_var), 2):
param = block.vars[op_role_var[i]]
grad = block.vars[op_role_var[i + 1]]
if param.is_distributed:
continue
# As we search ops in reverse order, insert the allreduce ops
# in the same way to keep the ring_id alternating
ring_id = (ring_id + 1) % self.nrings
block._insert_op(
offset + 1,
type='allreduce',
inputs={'X': grad},
outputs={'Out': grad},
attrs={
'ring_id': ring_id,
self.op_role_key: OpRole.Backward
})
if grad is None:
return
def transpile(self):
self._insert_allreduce_ops()
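For context, this is how AscendOptimizer.minimize invokes the transpiler when auto_dp is set (a hedged sketch; the programs are assumed to come from an already-built model):

    import paddle.fluid as fluid
    from paddle.fluid.transpiler import ascend_transpiler

    t = ascend_transpiler.AscendTranspiler(fluid.default_startup_program(),
                                           fluid.default_main_program())
    t.transpile()  # appends an allreduce op after each parameter's gradient op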
@@ -148,6 +148,7 @@ packages=['paddle',
           'paddle.distributed.fleet.base',
           'paddle.distributed.fleet.meta_optimizers',
           'paddle.distributed.fleet.meta_optimizers.sharding',
+          'paddle.distributed.fleet.meta_optimizers.ascend',
           'paddle.distributed.fleet.runtime',
           'paddle.distributed.fleet.dataset',
           'paddle.distributed.fleet.data_generator',
...