Unverified commit 83e55cff authored by ronnywang, committed by GitHub

[NPU] Added HCCL backend support in dygraph mode (#36285)

* Added HCCL backend support in dynamic graph mode

* fix segmentation fault

* add ut
Parent e58ac121
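This change adds an HCCL-based ParallelContext so dynamic-graph (dygraph) data-parallel training can run on Ascend NPUs. A minimal usage sketch, assuming a Paddle build with WITH_ASCEND_CL=ON and the usual per-process distributed environment exported by the launcher (the model and optimizer below are illustrative, not part of this commit):

```python
# Minimal dygraph data-parallel sketch on NPU. Assumes a Paddle build with
# WITH_ASCEND_CL=ON; FLAGS_selected_npus / PADDLE_TRAINER_* are expected to
# be set per process by the distributed launcher.
import paddle
import paddle.distributed as dist


def main():
    # On NPU builds this now creates a core.HCCLParallelContext internally.
    dist.init_parallel_env()

    model = paddle.nn.Linear(10, 10)              # illustrative model
    model = paddle.DataParallel(model)            # grads all-reduced via HCCL
    opt = paddle.optimizer.SGD(learning_rate=0.01,
                               parameters=model.parameters())

    x = paddle.randn([4, 10])
    loss = model(x).mean()
    loss.backward()                               # reducer fuses + all-reduces
    opt.step()
    opt.clear_grad()


if __name__ == "__main__":
    main()
```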
......@@ -26,11 +26,15 @@ if(NOT WIN32)
cc_library(bkcl_context SRCS bkcl_context.cc DEPS collective_helper device_context tensor var_type_traits)
cc_library(reducer SRCS reducer.cc DEPS layer)
endif()
if(WITH_ASCEND_CL)
cc_library(hccl_context SRCS hccl_context.cc DEPS collective_helper device_context tensor var_type_traits)
cc_library(reducer SRCS reducer.cc DEPS layer)
endif()
cc_library(data_loader SRCS data_loader.cc DEPS enforce)
endif(NOT WIN32)
if(WITH_GLOO)
cc_library(imperative_gloo_context SRCS gloo_context.cc DEPS collective_helper device_context tensor var_type_traits)
if ( WIN32 OR (NOT (WITH_NCCL OR WITH_RCCL OR WITH_XPU_BKCL) ))
if ( WIN32 OR (NOT (WITH_NCCL OR WITH_RCCL OR WITH_XPU_BKCL OR WITH_ASCEND_CL) ))
cc_library(reducer SRCS reducer.cc DEPS layer)
endif()
endif()
......
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/imperative/hccl_context.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/hccl_helper.h"
namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle
namespace paddle {
namespace imperative {
static void AllReduce(const framework::Tensor &src, framework::Tensor *dst,
const aclrtStream stream,
const platform::HCCLComm *comm) {
const auto &place = src.place();
PADDLE_ENFORCE_EQ(
platform::is_npu_place(place), true,
platform::errors::Unimplemented(
"Imperative mode does not support multi-CPU training yet."));
void *src_ptr = const_cast<void *>(src.data<void>());
dst->Resize(src.dims());
void *dst_ptr = dst->mutable_data(src.place(), src.type());
HcclDataType hccl_dtype = platform::ToHCCLDataType(src.type());
PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclAllReduce(
src_ptr, dst_ptr, src.numel(), hccl_dtype, HCCL_REDUCE_SUM, comm->comm(),
reinterpret_cast<void *>(stream)));
}
void HCCLParallelContext::BcastHCCLId(
std::vector<HcclRootInfo> &hccl_ids, // NOLINT
int root, int server_fd) {
if (strategy_.local_rank_ == root) {
std::vector<std::string> other_trainers;
for (auto &ep : strategy_.trainer_endpoints_) {
if (ep != strategy_.current_endpoint_) {
other_trainers.push_back(ep);
}
}
platform::SendBroadCastCommID(other_trainers, &hccl_ids);
} else {
platform::RecvBroadCastCommID(server_fd, strategy_.current_endpoint_,
&hccl_ids);
}
}
void HCCLParallelContext::Init() {
int server_fd = -1;
std::vector<HcclRootInfo> hccl_ids;
hccl_ids.resize(strategy_.nrings_);
if (strategy_.local_rank_ == 0) {
// generate the unique hcclid on the root worker
for (size_t i = 0; i < hccl_ids.size(); ++i) {
platform::dynload::HcclGetRootInfo(&hccl_ids[i]);
}
} else {
server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_)
.socket();
}
BcastHCCLId(hccl_ids, 0, server_fd);
int npu_id = BOOST_GET_CONST(platform::NPUPlace, place_).device;
for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) {
VLOG(0) << "init hccl context nranks: " << strategy_.nranks_
<< " local rank: " << strategy_.local_rank_ << " npu id: " << npu_id
<< " ring id: " << ring_id;
// it will assign hccl_comm in NPUDeviceContext within ring_id
platform::HCCLCommContext::Instance().CreateHCCLComm(
&hccl_ids[ring_id], strategy_.nranks_, strategy_.local_rank_, npu_id,
ring_id);
compute_events_.emplace_back(platform::NpuEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::NPUPlace, place_).device));
comm_events_.emplace_back(platform::NpuEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::NPUPlace, place_).device));
}
}
void HCCLParallelContext::InitWithRingID(int ring_id) {
int server_fd = -1;
std::vector<HcclRootInfo> hccl_ids;
hccl_ids.resize(1);
if (strategy_.local_rank_ == 0) {
// generate the unique hcclid on the root worker
platform::dynload::HcclGetRootInfo(&hccl_ids[0]);
} else {
server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_)
.socket();
}
BcastHCCLId(hccl_ids, 0, server_fd);
int npu_id = BOOST_GET_CONST(platform::NPUPlace, place_).device;
VLOG(0) << "init hccl context nranks: " << strategy_.nranks_
<< " local rank: " << strategy_.local_rank_ << " npu id: " << npu_id
<< " ring id: " << ring_id;
// it will assign hccl_comm in NPUDeviceContext within ring_id
platform::HCCLCommContext::Instance().CreateHCCLComm(
&hccl_ids[0], strategy_.nranks_, strategy_.local_rank_, npu_id, ring_id);
compute_events_.emplace_back(platform::NpuEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::NPUPlace, place_).device));
comm_events_.emplace_back(platform::NpuEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::NPUPlace, place_).device));
}
void HCCLParallelContext::AllReduceByStream(const framework::Variable &src,
framework::Variable *dst,
int ring_id, bool use_calc_stream) {
PADDLE_ENFORCE_EQ(
platform::is_npu_place(place_), true,
platform::errors::Unimplemented(
"Dynamic graph mode does not support multi-CPU training yet."));
auto *dev_ctx = static_cast<platform::NPUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_));
platform::HCCLComm *comm =
platform::HCCLCommContext::Instance().Get(ring_id, place_);
aclrtStream stream = use_calc_stream ? dev_ctx->stream() : comm->stream();
if (src.IsType<framework::LoDTensor>()) {
if (!dst->IsType<framework::LoDTensor>()) {
dst->Clear();
}
AllReduce(src.Get<framework::LoDTensor>(),
dst->GetMutable<framework::LoDTensor>(), stream, comm);
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"XPU unsupported variable type %s for imperative allreduce, only "
"LoDTensor are supported.",
platform::demangle(framework::ToTypeName(src.Type()))));
}
}
paddle::platform::DeviceContext *HCCLParallelContext::GetDeviceContext(
int ring_id) {
return static_cast<platform::DeviceContext *>(
platform::HCCLCommContext::Instance()
.Get(ring_id, place_)
->dev_context());
}
void HCCLParallelContext::WaitCompute(int ring_id) {
PADDLE_ENFORCE_GE(ring_id, 0, platform::errors::OutOfRange(
"ring id must >= 0, but got %d", ring_id));
PADDLE_ENFORCE_LT(ring_id, compute_events_.size(),
platform::errors::OutOfRange(
"ring id must < compute events size,"
"but got ring id = %d, compute events size = %d",
ring_id, compute_events_.size()));
auto compute_stream = static_cast<platform::NPUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
auto comm_stream =
platform::HCCLCommContext::Instance().Get(ring_id, place_)->stream();
auto event = compute_events_[ring_id].get();
// compute_stream-->event-->comm_stream
PADDLE_ENFORCE_NPU_SUCCESS(aclrtRecordEvent(event, compute_stream));
PADDLE_ENFORCE_NPU_SUCCESS(aclrtStreamWaitEvent(comm_stream, event));
}
void HCCLParallelContext::WaitComm(int ring_id) {
PADDLE_ENFORCE_GE(ring_id, 0, platform::errors::OutOfRange(
"ring id must >= 0, but got %d", ring_id));
PADDLE_ENFORCE_LT(ring_id, comm_events_.size(),
platform::errors::OutOfRange(
"ring id must < comm events size,"
"but got ring id = %d, comm events size = %d",
ring_id, comm_events_.size()));
auto compute_stream = static_cast<platform::NPUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
auto comm_stream =
platform::HCCLCommContext::Instance().Get(ring_id, place_)->stream();
auto event = comm_events_[ring_id].get();
// comm_stream-->event-->compute_stream
PADDLE_ENFORCE_NPU_SUCCESS(aclrtRecordEvent(event, comm_stream));
PADDLE_ENFORCE_NPU_SUCCESS(aclrtStreamWaitEvent(compute_stream, event));
}
void HCCLParallelContext::SynchronizeCompute() {
auto *compute_dev_ctx = static_cast<platform::NPUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_));
compute_dev_ctx->Wait();
}
} // namespace imperative
} // namespace paddle
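For orientation, HCCLParallelContext::Init() above has rank 0 call HcclGetRootInfo once per communication ring and broadcast the resulting ids to the other trainers over TCP (SendBroadCastCommID / RecvBroadCastCommID); every rank then creates one HCCL communicator plus one compute event and one comm event per ring. A rough Python sketch of that rendezvous, with hypothetical helper names standing in for the C++ utilities:

```python
# Sketch of the id rendezvous in HCCLParallelContext::Init(). The helpers
# generate_root_info / send_ids / recv_ids are hypothetical stand-ins for
# HcclGetRootInfo, SendBroadCastCommID and RecvBroadCastCommID.
def init_rings(strategy, generate_root_info, send_ids, recv_ids):
    if strategy.local_rank == 0:
        # The root worker creates one unique id per ring ...
        hccl_ids = [generate_root_info() for _ in range(strategy.nrings)]
        others = [ep for ep in strategy.trainer_endpoints
                  if ep != strategy.current_endpoint]
        # ... and pushes them to every other trainer endpoint.
        send_ids(others, hccl_ids)
    else:
        # Non-root workers block on their listening socket until the ids arrive.
        hccl_ids = recv_ids(strategy.current_endpoint)
    # Every rank now holds identical ids and can create its communicators.
    return hccl_ids
```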
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef PADDLE_WITH_ASCEND_CL
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/imperative/parallel_context.h"
#include "paddle/fluid/platform/dynload/hccl.h"
#include "paddle/fluid/platform/npu_resource_pool.h"
namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle
namespace paddle {
namespace imperative {
class HCCLParallelContext : public ParallelContext {
public:
explicit HCCLParallelContext(const ParallelStrategy& strategy,
const platform::Place& place)
: ParallelContext(strategy, place) {}
~HCCLParallelContext() override = default;
void BcastHCCLId(std::vector<HcclRootInfo>& hccl_ids, int root, // NOLINT
int server_fd);
void Init() override;
void InitWithRingID(int ring_id) override;
void AllReduceByStream(const framework::Variable& src,
framework::Variable* dst, int ring_id,
bool use_calc_stream) override;
paddle::platform::DeviceContext* GetDeviceContext(int ring_id) override;
void WaitCompute(int ring_id) override;
void WaitComm(int ring_id) override;
void SynchronizeCompute() override;
private:
// used for comm wait compute, compute_stream-->event-->comm_stream[ring_id]
std::vector<std::shared_ptr<platform::NpuEventObject>> compute_events_;
// used for compute wait comm, comm_stream[ring_id]-->event-->compute_stream
std::vector<std::shared_ptr<platform::NpuEventObject>> comm_events_;
};
} // namespace imperative
} // namespace paddle
#endif
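The WaitCompute/WaitComm pair declared above implements the usual stream-ordering trick: an event is recorded on one stream and waited on by the other, so the communication stream never reads gradients before the compute stream has produced them, and later compute work never reads results before the all-reduce has finished. A small sketch with hypothetical Stream/Event wrappers standing in for aclrtStream/aclrtEvent:

```python
# Stream-ordering sketch for WaitCompute / WaitComm. `Stream` and `Event`
# are hypothetical wrappers around aclrtStream / aclrtEvent.
def wait_compute(compute_stream, comm_stream, event):
    # compute_stream --> event --> comm_stream: the all-reduce on the comm
    # stream may not start before the compute stream reaches this point.
    event.record(compute_stream)     # aclrtRecordEvent(event, compute_stream)
    comm_stream.wait_event(event)    # aclrtStreamWaitEvent(comm_stream, event)


def wait_comm(compute_stream, comm_stream, event):
    # comm_stream --> event --> compute_stream: later compute work only sees
    # the gradients after the all-reduce on the comm stream has completed.
    event.record(comm_stream)
    compute_stream.wait_event(event)
```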
......@@ -228,6 +228,16 @@ void Group::ConcatTensors(const platform::DeviceContext &context) {
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat xpu grads since it's not compiled with BKCL,"
"Please recompile or reinstall Paddle with BKCL support."));
#endif
} else if (platform::is_npu_place(place)) {
#ifdef PADDLE_WITH_ASCEND_CL
ConcatTensorsWithType(
static_cast<const platform::NPUDeviceContext &>(context),
dense_tensors_, &dense_contents_, dtype_);
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat npu grads since it's not compiled with HCCL,"
"Please recompile or reinstall Paddle with HCCL support."));
#endif
} else if (platform::is_cpu_place(place)) {
ConcatTensorsWithType(
......@@ -260,6 +270,16 @@ void Group::SplitTensors(const platform::DeviceContext &context) {
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split xpu grad since it's not compiled with BKCL,"
"Please recompile or reinstall Paddle with BKCL support."));
#endif
} else if (platform::is_npu_place(place)) {
#ifdef PADDLE_WITH_ASCEND_CL
SplitTensorsWithType(
static_cast<const platform::NPUDeviceContext &>(context),
&dense_contents_, &dense_tensors_, dtype_);
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split npu grad since it's not compiled with HCCL,"
"Please recompile or reinstall Paddle with HCCL support."));
#endif
} else if (platform::is_cpu_place(place)) {
SplitTensorsWithType(
......
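The reducer changes above extend gradient fusion to NPU: a group of dense gradients is concatenated into one contiguous buffer so a single collective call covers the whole group, then split back into the original shapes afterwards. A plain-Paddle illustration of that round trip (the idea only, not the C++ reducer itself):

```python
# Concat-then-split round trip behind Group::ConcatTensors / SplitTensors.
# Runs on any device; on NPU the fused buffer is what HCCL all-reduces.
import paddle

grads = [paddle.randn([3, 4]), paddle.randn([5]), paddle.randn([2, 2])]

flat = paddle.concat([g.flatten() for g in grads])    # ConcatTensors
# ... a single all-reduce would run on `flat` here ...
sizes = [int(g.numel()) for g in grads]
pieces = paddle.split(flat, sizes)                    # SplitTensors
restored = [p.reshape(g.shape) for p, g in zip(pieces, grads)]

assert all(r.shape == g.shape for r, g in zip(restored, grads))
```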
......@@ -44,7 +44,11 @@ if (WITH_ASCEND_CL)
endif()
# please add new math_library in alphabetical order
if (WITH_ASCEND_CL)
math_library(concat_and_split DEPS npu_op_runner)
else()
math_library(concat_and_split)
endif()
math_library(context_project DEPS im2col math_function)
math_library(cross_entropy)
math_library(cos_sim_functor)
......
......@@ -13,6 +13,9 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/math/concat_and_split.h"
#ifdef PADDLE_WITH_ASCEND_CL
#include "paddle/fluid/operators/npu_op_runner.h"
#endif
namespace paddle {
namespace framework {
......@@ -215,6 +218,79 @@ class SplitFunctor<platform::XPUDeviceContext, T> {
};
#endif
#ifdef PADDLE_WITH_ASCEND_CL
template <typename T>
class ConcatFunctor<platform::NPUDeviceContext, T> {
public:
void operator()(const platform::NPUDeviceContext& context,
const std::vector<framework::Tensor>& input, int axis,
framework::Tensor* output) {
int dev_id =
BOOST_GET_CONST(platform::NPUPlace, context.GetPlace()).GetDeviceId();
platform::NPUDeviceGuard guard(dev_id);
std::vector<std::string> names;
for (size_t i = 0; i < input.size(); ++i) {
names.push_back("x" + std::to_string(i));
}
NpuOpRunner runner{
"ConcatD",
{input},
{*output},
{{"concat_dim", axis}, {"N", static_cast<int>(input.size())}}};
runner.AddInputNames(names);
runner.Run(context.stream());
}
};
template <typename T>
class SplitFunctor<platform::NPUDeviceContext, T> {
public:
void operator()(const platform::NPUDeviceContext& context,
const framework::Tensor& input,
const std::vector<const framework::Tensor*>& ref_inputs,
const int axis, std::vector<framework::Tensor*>* outputs) {
if (input.numel() == 0) {
return;
}
size_t num = outputs->size();
int input_rows = 1;
auto dim_0 = ref_inputs[0]->dims();
for (int i = 0; i < axis; ++i) {
input_rows *= dim_0[i];
}
int input_cols = 0;
std::vector<int64_t> output_cols(outputs->size());
for (size_t i = 0; i < num; ++i) {
int t_cols = ref_inputs[i]->numel() / input_rows;
input_cols += t_cols;
output_cols[i] = t_cols;
}
auto npu_place = BOOST_GET_CONST(platform::NPUPlace, context.GetPlace());
// computation
for (int k = 0; k < input_rows; ++k) {
const T* src_ptr = input.data<T>() + k * input_cols;
int col_idx = 0;
for (size_t j = 0; j < num; ++j) {
int col_len = output_cols[j];
auto* out_tensor = outputs->at(j);
if (out_tensor != nullptr) {
T* dst_ptr = out_tensor->data<T>() + k * col_len;
memory::Copy(npu_place, dst_ptr, npu_place, src_ptr + col_idx,
sizeof(T) * col_len, context.stream());
}
col_idx += col_len;
}
}
}
};
#endif
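The NPU SplitFunctor above performs the split with plain strided copies: every dimension before `axis` collapses into rows, everything from `axis` onward collapses into columns, and each output receives a contiguous column slice of every row. The same bookkeeping in NumPy, as a correctness illustration rather than the device code:

```python
# NumPy illustration of the row/column arithmetic in the NPU SplitFunctor.
import numpy as np


def split_like_functor(x, ref_shapes, axis):
    rows = int(np.prod(x.shape[:axis]))                      # input_rows
    out_cols = [int(np.prod(s[axis:])) for s in ref_shapes]  # output_cols
    flat = x.reshape(rows, sum(out_cols))                    # rows x input_cols
    outs, col = [], 0
    for shape, cols in zip(ref_shapes, out_cols):
        outs.append(flat[:, col:col + cols].reshape(shape))  # per-row memory::Copy
        col += cols
    return outs


x = np.arange(24).reshape(2, 3, 4)
parts = split_like_functor(x, [(2, 1, 4), (2, 2, 4)], axis=1)
assert [p.shape for p in parts] == [(2, 1, 4), (2, 2, 4)]
assert np.array_equal(parts[0], x[:, :1, :]) and np.array_equal(parts[1], x[:, 1:, :])
```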
#define DEFINE_FUNCTOR(type) \
template class ConcatFunctor<platform::CPUDeviceContext, type>; \
template class SplitFunctor<platform::CPUDeviceContext, type>;
......@@ -229,6 +305,14 @@ FOR_ALL_TYPES(DEFINE_FUNCTOR);
DEFINE_XPU_FUNCTOR(float)
#endif
#ifdef PADDLE_WITH_ASCEND_CL
#define DEFINE_NPU_FUNCTOR(type) \
template class ConcatFunctor<platform::NPUDeviceContext, type>; \
template class SplitFunctor<platform::NPUDeviceContext, type>;
FOR_ALL_TYPES(DEFINE_NPU_FUNCTOR)
#endif
} // namespace math
} // namespace operators
} // namespace paddle
......@@ -104,6 +104,11 @@ inline void StridedNumelCopyWithAxis(const platform::DeviceContext& ctx,
reinterpret_cast<const platform::CUDADeviceContext&>(ctx);
memory::Copy(gpu_place, dst + i * dst_after, gpu_place,
src + i * src_after, sizeof(T) * size, cuda_ctx.stream());
#elif defined(PADDLE_WITH_ASCEND_CL)
auto& npu_place = BOOST_GET_CONST(platform::NPUPlace, place);
auto& npu_ctx = reinterpret_cast<const platform::NPUDeviceContext&>(ctx);
memory::Copy(npu_place, dst + i * dst_after, npu_place,
src + i * src_after, sizeof(T) * size, npu_ctx.stream());
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Paddle is not compiled with GPU."));
......
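The strided_memcpy change routes the per-axis copy through the NPU memory::Copy overload on the device stream. Judging from the loop body shown in the diff, StridedNumelCopyWithAxis copies `size` contiguous elements out of every `src_after`-sized block into the matching `dst_after`-sized block; the NumPy sketch below mirrors that pattern (the loop bound name `num_before` is my own, the full function is not shown here):

```python
# Rough NumPy picture of the copy pattern in StridedNumelCopyWithAxis:
# one contiguous chunk per leading index.
import numpy as np


def strided_numel_copy(dst, dst_after, src, src_after, size, num_before):
    for i in range(num_before):
        dst[i * dst_after:i * dst_after + size] = \
            src[i * src_after:i * src_after + size]


src = np.arange(12)                     # e.g. a (3, 4) tensor, flattened
dst = np.zeros(6, dtype=src.dtype)
strided_numel_copy(dst, dst_after=2, src=src, src_after=4, size=2, num_before=3)
assert np.array_equal(dst, np.array([0, 1, 4, 5, 8, 9]))   # first 2 columns
```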
......@@ -33,6 +33,9 @@ if(NOT WIN32)
if (WITH_NCCL OR WITH_RCCL)
set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context)
endif()
if (WITH_ASCEND_CL)
set(PYBIND_DEPS ${PYBIND_DEPS} hccl_context)
endif()
endif(NOT WIN32)
if(WITH_PYTHON)
......@@ -117,6 +120,10 @@ if(WITH_PYTHON)
list(APPEND OP_FUNCTION_GENERETOR_DEPS bkcl_context)
endif(WITH_XPU_BKCL)
if(WITH_ASCEND_CL)
list(APPEND OP_FUNCTION_GENERETOR_DEPS hccl_context)
endif(WITH_ASCEND_CL)
add_executable(op_function_generator op_function_generator.cc)
target_link_libraries(op_function_generator ${OP_FUNCTION_GENERETOR_DEPS})
......
......@@ -36,6 +36,7 @@ limitations under the License. */
#include "paddle/fluid/imperative/bkcl_context.h"
#include "paddle/fluid/imperative/data_loader.h"
#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/imperative/hccl_context.h"
#include "paddle/fluid/imperative/hooks.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/imperative/nccl_context.h"
......@@ -2333,6 +2334,18 @@ void BindImperative(py::module *m_ptr) {
py::arg("ring_id"));
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
py::class_<imperative::HCCLParallelContext, imperative::ParallelContext,
std::shared_ptr<imperative::HCCLParallelContext>>(
m, "HCCLParallelContext")
.def(py::init<const imperative::ParallelStrategy &,
const platform::NPUPlace &>())
.def("init", [](imperative::HCCLParallelContext &self) { self.Init(); })
.def("init_with_ring_id",
&imperative::HCCLParallelContext::InitWithRingID,
py::arg("ring_id"));
#endif
m.def("pylayer_apply",
[](const platform::CPUPlace &place, const py::object &cls,
const py::args args, const py::kwargs kwargs) {
......
......@@ -119,6 +119,7 @@ limitations under the License. */
#endif
#ifdef PADDLE_WITH_ASCEND_CL
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/npu_info.h"
#include "paddle/fluid/platform/npu_profiler.h"
#endif
......@@ -2453,6 +2454,8 @@ All parameter, weight, gradient are variables in Paddle.
#ifdef PADDLE_WITH_ASCEND_CL
m.def("get_npu_device_count", platform::GetNPUDeviceCount);
m.def("npu_finalize", []() {
platform::HCCLCommContext::Instance().ReleaseHCCLComms();
auto &pool = platform::DeviceContextPool::Instance();
auto devices = platform::GetSelectedNPUDevices();
for (size_t i = 0; i < devices.size(); ++i) {
......
......@@ -264,6 +264,10 @@ def new_group(ranks=None, backend=None):
place = core.CUDAPlace(genv.device_id)
core.NCCLParallelContext(strategy,
place).init_with_ring_id(ring_id)
elif core.is_compiled_with_npu():
place = core.NPUPlace(genv.device_id)
core.HCCLParallelContext(strategy,
place).init_with_ring_id(ring_id)
else:
assert False, ("no cuda device found")
else:
......
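With the collective.py change above, new_group() can build its communicator through core.HCCLParallelContext when Paddle is compiled for NPU. A hedged usage sketch (requires at least two launched trainer processes; ranks and shapes are illustrative):

```python
# Sketch: creating and using a sub-group on an NPU build.
import paddle
import paddle.distributed as dist

dist.init_parallel_env()               # HCCLParallelContext on NPU builds
group = dist.new_group(ranks=[0, 1])   # init_with_ring_id under the hood

data = paddle.ones([2, 2])
dist.all_reduce(data, group=group)     # runs over the new HCCL ring
```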
......@@ -58,8 +58,10 @@ def _start_kv_server(port, http_server_d, size):
def _is_cpuonly(backend):
check_backend(backend)
if backend in ['auto', 'nccl', 'bkcl'] and (core.is_compiled_with_cuda() or
core.is_compiled_with_xpu()):
if backend in ['auto', 'nccl', 'bkcl', 'hccl'] and (
core.is_compiled_with_cuda() or core.is_compiled_with_xpu() or
core.is_compiled_with_npu()):
# backend is 'auto' (or a device backend) and the build supports CUDA/XPU/NPU, so use the default device logic and return False
return False
else:
......@@ -142,7 +144,7 @@ def init_parallel_env():
is_cpu_only = _is_cpuonly(backend)
# 1. gpu xpu check, must be gpu or xpu,
if not (is_cpu_only or core.is_compiled_with_cuda() or
core.is_compiled_with_xpu()):
core.is_compiled_with_xpu() or core.is_compiled_with_npu()):
raise NotImplementedError(
"If you want to use CPU-only version, please use 'gloo' as backend")
......@@ -204,6 +206,8 @@ def init_parallel_env():
place = core.CUDAPlace(parallel_env.device_id)
elif core.is_compiled_with_xpu():
place = core.XPUPlace(parallel_env.device_id)
elif core.is_compiled_with_npu():
place = core.NPUPlace(parallel_env.device_id)
_set_expected_place(place)
# init nccl or bkcl context
......@@ -216,6 +220,9 @@ def init_parallel_env():
elif core.is_compiled_with_xpu():
parallel_helper._set_parallel_ctx(
core.BKCLParallelContext(strategy, place))
elif core.is_compiled_with_npu():
parallel_helper._set_parallel_ctx(
core.HCCLParallelContext(strategy, place))
other_endpoints = strategy.trainer_endpoints[:]
other_endpoints.remove(strategy.current_endpoint)
......
......@@ -207,9 +207,9 @@ class Conv2D(layers.Layer):
if core.is_compiled_with_npu():
if (self._num_channels == self._groups and
self._num_channels == self._num_filters):
l_type = 'depthwise_conv2d'
self._l_type = 'depthwise_conv2d'
else:
l_type = 'conv2d'
self._l_type = 'conv2d'
self._num_channels = num_channels
if self._groups is None:
......
......@@ -62,9 +62,12 @@ def prepare_context(strategy=None):
elif isinstance(place, core.XPUPlace):
parallel_helper._set_parallel_ctx(
core.BKCLParallelContext(strategy, place))
elif isinstance(place, core.NPUPlace):
parallel_helper._set_parallel_ctx(
core.HCCLParallelContext(strategy, place))
else:
# TODO(Yancey1989): add Gloo Parallel Context to support CPU parallel computation
assert ("Only support CUDAPlace or XPUPlace for now.")
assert ("Only support CUDAPlace or XPUPlace or NPUPlace for now.")
parallel_helper._init_parallel_ctx()
return strategy
......@@ -122,6 +125,9 @@ class ParallelEnv(object):
elif core.is_compiled_with_xpu():
selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",")
self._device_id = int(selected_xpus[0])
elif core.is_compiled_with_npu():
selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",")
self._device_id = int(selected_npus[0])
self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS",
"").split(",")
......
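ParallelEnv now also recognizes NPU builds: the local device id comes from FLAGS_selected_npus, analogous to FLAGS_selected_gpus and FLAGS_selected_xpus. The per-process environment a trainer is expected to see, normally exported by the launcher (values below are illustrative):

```python
# Illustrative per-process environment read by ParallelEnv / init_parallel_env
# on an NPU build; a launcher normally exports these.
import os

os.environ.update({
    "FLAGS_selected_npus": "0",                                  # local NPU id
    "PADDLE_TRAINER_ID": "0",
    "PADDLE_TRAINERS_NUM": "2",
    "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:6170,127.0.0.1:6171",
    "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:6170",
})

import paddle.distributed as dist
env = dist.ParallelEnv()
print(env.device_id, env.rank, env.world_size)   # 0 0 2
```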
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import unittest
sys.path.append("..")
from test_dist_base import TestDistBase
import paddle.fluid as fluid
flag_name = os.path.splitext(__file__)[0]
rank_table_file = b"""{
"status": "completed",
"version": "1.0",
"server_count": "1",
"server_list": [
{
"server_id": "127.0.0.1",
"device": [
{
"device_id": "0",
"device_ip": "192.1.184.23",
"rank_id": "0"
},
{
"device_id": "1",
"device_ip": "192.2.21.93",
"rank_id": "1"
}
]
}
]
}"""
need_envs = {
"ASCEND_AICPU_PATH":
os.getenv("ASCEND_AICPU_PATH", "/usr/local/Ascend/nnae/latest"),
"ASCEND_OPP_PATH":
os.getenv("ASCEND_OPP_PATH", "/usr/local/Ascend/nnae/latest/opp"),
"HCCL_CONNECT_TIMEOUT": "7200",
"HCCL_WHITELIST_DISABLE": "1",
"HCCL_SECURITY_MODE": "1",
"RANK_TABLE_FILE": "rank_table_file.json",
}
class TestParallelDygraphMnistNPU(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._hccl_mode = True
self._dygraph = True
self._enforce_place = "NPU"
def test_mnist(self):
with open("rank_table_file.json", "wb") as f:
f.write(rank_table_file)
if fluid.core.is_compiled_with_npu():
self.check_with_place(
os.path.abspath('../parallel_dygraph_mnist.py'),
delta=1e-3,
check_error_log=True,
need_envs=need_envs,
log_name=flag_name)
class TestFleetDygraphMnistNPU(TestParallelDygraphMnistNPU):
def _setup_config(self):
self._sync_mode = False
self._hccl_mode = True
self._dygraph = True
self._enforce_place = "NPU"
self._use_fleet_api = True
if __name__ == "__main__":
unittest.main()
......@@ -551,6 +551,9 @@ class TestParallelDyGraphRunnerBase(object):
elif fluid.core.is_compiled_with_xpu():
device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
place = fluid.XPUPlace(device_id)
elif fluid.core.is_compiled_with_npu():
device_id = int(os.getenv("FLAGS_selected_npus", "0"))
place = fluid.NPUPlace(device_id)
else:
assert ("Only support CUDAPlace or XPUPlace or CPU(Gloo) for now.")
......@@ -564,7 +567,7 @@ class TestParallelDyGraphRunnerBase(object):
nranks = len(args.endpoints.split(",")) if args.endpoints else 1
#if args.update_method == "nccl2":
if args.update_method == "nccl2" or args.update_method == "bkcl":
if args.update_method == "nccl2" or args.update_method == "bkcl" or args.update_method == "hccl":
strategy = dygraph.parallel.ParallelStrategy()
strategy.nranks = nranks
strategy.local_rank = args.trainer_id
......@@ -671,12 +674,12 @@ class TestParallelDyGraphRunnerBase(object):
strategy.find_unused_parameters = True
# 3. init parallel env
if args.update_method == "nccl2" or "bkcl":
if args.update_method == "nccl2" or "bkcl" or "hccl":
fleet.init(is_collective=True, strategy=strategy)
# 4. train model
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2" or "bkcl":
if args.update_method == "nccl2" or "bkcl" or "hccl":
opt = fleet.distributed_optimizer(opt)
model = fleet.distributed_model(model)
......@@ -706,7 +709,8 @@ def runtime_main(test_class):
type=str,
default="local",
choices=[
"pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo"
"pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo",
"hccl"
])
parser.add_argument('--trainer_id', type=int, required=False, default=0)
parser.add_argument('--trainers', type=int, required=False, default=1)
......@@ -728,6 +732,7 @@ def runtime_main(test_class):
parser.add_argument('--use_cpu', action='store_true')
parser.add_argument('--use_xpu', action='store_true')
parser.add_argument('--use_dgc', action='store_true')
parser.add_argument('--use_npu', action='store_true')
parser.add_argument('--accumulate_gradient', action='store_true')
parser.add_argument('--find_unused_parameters', action='store_true')
parser.add_argument('--use_reduce', action='store_true')
......@@ -784,13 +789,21 @@ class TestDistBase(unittest.TestCase):
self.__use_cuda = False
self.__use_xpu = False
self._use_dgc = False
self.__use_npu = False
elif self._enforce_place == "GPU":
self.__use_cuda = True
self.__use_xpu = False
self.__use_npu = False
elif self._enforce_place == "XPU":
self.__use_cuda = False
self.__use_xpu = True
self._use_dgc = False
self.__use_npu = False
elif self._enforce_place == "NPU":
self.__use_cuda = False
self.__use_xpu = False
self._use_dgc = False
self.__use_npu = True
else:
if fluid.core.is_compiled_with_cuda():
self.__use_cuda = True
......@@ -815,6 +828,7 @@ class TestDistBase(unittest.TestCase):
self._nccl2_mode = False
self._bkcl_mode = False
self._gloo_mode = False # now, support gloo backend
self._hccl_mode = False
self._pipeline_mode = False
self._mp_mode = False
self._diff_batch = False
......@@ -953,6 +967,13 @@ class TestDistBase(unittest.TestCase):
"PADDLE_TRAINERS_NUM": "1",
"PADDLE_TRAINER_ID": "0"
}
elif self.__use_npu:
cmd += " --use_npu"
env_local = {
"FLAGS_selected_npus": devices,
"PADDLE_TRAINERS_NUM": "1",
"PADDLE_TRAINER_ID": "0"
}
else:
env_local = {'CPU_NUM': '1'}
......@@ -1199,6 +1220,16 @@ class TestDistBase(unittest.TestCase):
"PADDLE_CURRENT_ENDPOINT": ep,
"GLOG_v": "2",
})
elif self.__use_npu:
tr_cmd += " --use_npu"
env.update({
"FLAGS_selected_npus": "{}".format(trainer_id),
"PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
"PADDLE_TRAINER_ID": "{}".format(trainer_id),
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": ep,
"GLOG_v": "2",
})
else:
env.update({'CPU_NUM': '1'})
......@@ -1471,6 +1502,13 @@ class TestDistBase(unittest.TestCase):
update_method='gloo',
check_error_log=check_error_log,
log_name=log_name)
elif self._hccl_mode:
tr0_losses, tr1_losses = self._run_cluster_nccl2(
model_file,
required_envs,
update_method='hccl',
check_error_log=check_error_log,
log_name=log_name)
elif self._pipeline_mode:
tr0_losses, tr1_losses = self._run_pipeline(
......