未验证 提交 2194ad15 编写于 作者: L liuyuhui 提交者: GitHub

[Kunlun]add collective ops for multi XPU cards training and add Kunlun multi XPU cards CI (#32302)

上级 9ff85561
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_allreduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_allreduce_max,
ops::CAllReduceOpXPUKernel<ops::kRedMax, float>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_allreduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_allreduce_min,
ops::CAllReduceOpXPUKernel<ops::kRedMin, float>)
...@@ -20,11 +20,19 @@ limitations under the License. */ ...@@ -20,11 +20,19 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/collective_helper.h"
#endif
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/platform/nccl_helper.h" #include "paddle/fluid/platform/nccl_helper.h"
#endif #endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
#if defined(PADDLE_WITH_GLOO) #if defined(PADDLE_WITH_GLOO)
#include <gloo/allreduce.h> #include <gloo/allreduce.h>
#include "paddle/fluid/framework/fleet/gloo_wrapper.h" #include "paddle/fluid/framework/fleet/gloo_wrapper.h"
...@@ -105,6 +113,68 @@ class CAllReduceOpCPUKernel : public framework::OpKernel<T> { ...@@ -105,6 +113,68 @@ class CAllReduceOpCPUKernel : public framework::OpKernel<T> {
} }
}; };
template <ReduceType red_type, typename T>
class CAllReduceOpXPUKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_XPU_BKCL)
auto in = ctx.Input<framework::Tensor>("X");
auto out = ctx.Output<framework::Tensor>("Out");
auto place = ctx.GetPlace();
BKCLDataType dtype = platform::ToBKCLDataType(in->type());
int64_t numel = in->numel();
const void* sendbuff = in->data<void>();
out->Resize(in->dims());
void* recvbuff = out->mutable_data<T>(place);
int rid = ctx.Attr<int>("ring_id");
auto comm = platform::BKCLCommContext::Instance().Get(rid, place);
XPUStream stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::XPUDeviceContext*>(dev_ctx)
->x_context()
->xpu_stream;
} else {
stream = comm->stream();
}
BKCLOp bkcl_red_type = BKCL_ADD;
switch (red_type) {
case kRedSum:
bkcl_red_type = BKCL_ADD;
break;
case kRedMax:
bkcl_red_type = BKCL_MAX;
break;
case kRedMin:
bkcl_red_type = BKCL_MIN;
break;
case kRedProd:
bkcl_red_type = BKCL_PRODUCT;
break;
default:
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid reduce type: %d", red_type));
}
PADDLE_ENFORCE_EQ(bkcl_all_reduce(comm->comm(), sendbuff, recvbuff, numel,
dtype, bkcl_red_type, stream),
BKCL_SUCCESS, platform::errors::PreconditionNotMet(
"BKCL all reduce failed"));
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should be compiled with XPU."));
#endif
}
};
template <ReduceType red_type, typename T> template <ReduceType red_type, typename T>
class CAllReduceOpCUDAKernel : public framework::OpKernel<T> { class CAllReduceOpCUDAKernel : public framework::OpKernel<T> {
public: public:
......
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_allreduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_allreduce_prod,
ops::CAllReduceOpXPUKernel<ops::kRedProd, float>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_allreduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_allreduce_sum,
ops::CAllReduceOpXPUKernel<ops::kRedSum, float>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_reduce_max,
ops::CReduceOpXPUKernel<ops::kRedMax, float>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_reduce_min,
ops::CReduceOpXPUKernel<ops::kRedMin, float>)
...@@ -24,10 +24,19 @@ limitations under the License. */ ...@@ -24,10 +24,19 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/collective_helper.h"
#endif
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/platform/nccl_helper.h" #include "paddle/fluid/platform/nccl_helper.h"
#endif #endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
#if defined(PADDLE_WITH_GLOO) #if defined(PADDLE_WITH_GLOO)
#include <gloo/reduce.h> #include <gloo/reduce.h>
#include "paddle/fluid/framework/fleet/gloo_wrapper.h" #include "paddle/fluid/framework/fleet/gloo_wrapper.h"
...@@ -110,6 +119,69 @@ class CReduceOpCPUKernel : public framework::OpKernel<T> { ...@@ -110,6 +119,69 @@ class CReduceOpCPUKernel : public framework::OpKernel<T> {
} }
}; };
template <ReduceType red_type, typename T>
class CReduceOpXPUKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_XPU_BKCL)
auto in = ctx.Input<framework::Tensor>("X");
auto out = ctx.Output<framework::Tensor>("Out");
auto place = ctx.GetPlace();
BKCLDataType dtype = platform::ToBKCLDataType(in->type());
int64_t numel = in->numel();
const void* sendbuff = in->data<void>();
out->Resize(in->dims());
void* recvbuff = out->mutable_data<T>(place);
int rid = ctx.Attr<int>("ring_id");
int root = ctx.Attr<int>("root_id");
auto comm = platform::BKCLCommContext::Instance().Get(rid, place);
XPUStream stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::XPUDeviceContext*>(dev_ctx)
->x_context()
->xpu_stream;
} else {
stream = comm->stream();
}
BKCLOp bkcl_red_type = BKCL_ADD;
switch (red_type) {
case kRedSum:
bkcl_red_type = BKCL_ADD;
break;
case kRedMax:
bkcl_red_type = BKCL_MAX;
break;
case kRedMin:
bkcl_red_type = BKCL_MIN;
break;
case kRedProd:
bkcl_red_type = BKCL_PRODUCT;
break;
default:
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid reduce type: %d", red_type));
}
PADDLE_ENFORCE_EQ(bkcl_reduce(comm->comm(), sendbuff, recvbuff, numel,
dtype, bkcl_red_type, root, stream),
BKCL_SUCCESS, platform::errors::PreconditionNotMet(
"BKCL all reduce failed"));
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should be compiled with XPU."));
#endif
}
};
template <ReduceType red_type, typename T> template <ReduceType red_type, typename T>
class CReduceOpCUDAKernel : public framework::OpKernel<T> { class CReduceOpCUDAKernel : public framework::OpKernel<T> {
public: public:
......
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_reduce_prod,
ops::CReduceOpXPUKernel<ops::kRedProd, float>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct XPUPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_XPU_KERNEL(c_reduce_sum,
ops::CReduceOpXPUKernel<ops::kRedSum, float>)
...@@ -627,6 +627,12 @@ if (WITH_XPU) ...@@ -627,6 +627,12 @@ if (WITH_XPU)
add_subdirectory(xpu) add_subdirectory(xpu)
endif() endif()
# dist xpu tests:
if (WITH_XPU_BKCL)
py_test(test_collective_reduce_api_xpu SRCS "test_collective_reduce_api.py")
py_test(test_collective_allreduce_api_xpu SRCS "test_collective_allreduce_api.py")
endif()
if (WITH_ASCEND_CL) if (WITH_ASCEND_CL)
add_subdirectory(npu) add_subdirectory(npu)
endif() endif()
......
...@@ -27,9 +27,15 @@ class TestCollectiveAllreduceAPI(TestDistBase): ...@@ -27,9 +27,15 @@ class TestCollectiveAllreduceAPI(TestDistBase):
pass pass
def test_allreduce_nccl(self): def test_allreduce_nccl(self):
if paddle.fluid.core.is_compiled_with_cuda():
self.check_with_place("collective_allreduce_api.py", "allreduce", self.check_with_place("collective_allreduce_api.py", "allreduce",
"nccl") "nccl")
def test_allreduce_bkcl(self):
if paddle.fluid.core.is_compiled_with_xpu():
self.check_with_place("collective_allreduce_api.py", "allreduce",
"bkcl")
def test_allreduce_gloo(self): def test_allreduce_gloo(self):
self.check_with_place("collective_allreduce_api.py", "allreduce", self.check_with_place("collective_allreduce_api.py", "allreduce",
"gloo", "2") "gloo", "2")
......
...@@ -50,6 +50,9 @@ class TestCollectiveAPIRunnerBase(object): ...@@ -50,6 +50,9 @@ class TestCollectiveAPIRunnerBase(object):
device_id = int(os.getenv("FLAGS_selected_gpus", "0")) device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace( place = fluid.CUDAPlace(
device_id) #if args.use_gpu else fluid.CPUPlace() device_id) #if args.use_gpu else fluid.CPUPlace()
elif args['backend'] == 'bkcl':
device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
place = fluid.XPUPlace(device_id)
else: else:
place = fluid.CPUPlace() place = fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
...@@ -71,7 +74,6 @@ class TestCollectiveAPIRunnerBase(object): ...@@ -71,7 +74,6 @@ class TestCollectiveAPIRunnerBase(object):
def runtime_main(test_class, col_type): def runtime_main(test_class, col_type):
args = {} args = {}
model = test_class() model = test_class()
args["deviceid"] = os.getenv("FLAGS_selected_gpus")
args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID")) args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID"))
args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM")) args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS') args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
...@@ -112,6 +114,7 @@ class TestDistBase(unittest.TestCase): ...@@ -112,6 +114,7 @@ class TestDistBase(unittest.TestCase):
worker_endpoints = self._ps_endpoints.split(",") worker_endpoints = self._ps_endpoints.split(",")
w0_ep, w1_ep = worker_endpoints w0_ep, w1_ep = worker_endpoints
#print("w0_ep:",w0_ep," w1_ep:",w1_ep) #print("w0_ep:",w0_ep," w1_ep:",w1_ep)
if core.is_compiled_with_cuda():
env0 = { env0 = {
"FLAGS_selected_gpus": "0", "FLAGS_selected_gpus": "0",
"PADDLE_TRAINER_ID": "0", "PADDLE_TRAINER_ID": "0",
...@@ -127,6 +130,22 @@ class TestDistBase(unittest.TestCase): ...@@ -127,6 +130,22 @@ class TestDistBase(unittest.TestCase):
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints, "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w1_ep "PADDLE_CURRENT_ENDPOINT": w1_ep
} }
elif core.is_compiled_with_xpu():
env0 = {
"FLAGS_selected_xpus": "0",
"PADDLE_TRAINER_ID": "0",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w0_ep
}
env1 = {
"FLAGS_selected_xpus": "1",
"PADDLE_TRAINER_ID": "1",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w1_ep
}
#update environment #update environment
env0.update(envs) env0.update(envs)
env1.update(envs) env1.update(envs)
...@@ -169,7 +188,10 @@ class TestDistBase(unittest.TestCase): ...@@ -169,7 +188,10 @@ class TestDistBase(unittest.TestCase):
path_id="0", path_id="0",
check_error_log=False, check_error_log=False,
need_envs={}): need_envs={}):
with_gloo = '0' if backend == "nccl" else '1' if backend == "nccl" or backend == "bkcl":
with_gloo = '0'
else:
with_gloo = '1'
required_envs = { required_envs = {
"FLAGS_fraction_of_gpu_memory_to_use": "0.15", "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_eager_delete_tensor_gb": "0.0", "FLAGS_eager_delete_tensor_gb": "0.0",
......
...@@ -27,8 +27,13 @@ class TestCollectiveReduceAPI(TestDistBase): ...@@ -27,8 +27,13 @@ class TestCollectiveReduceAPI(TestDistBase):
pass pass
def test_reduce_nccl(self): def test_reduce_nccl(self):
if paddle.fluid.core.is_compiled_with_cuda():
self.check_with_place("collective_reduce_api.py", "reduce", "nccl") self.check_with_place("collective_reduce_api.py", "reduce", "nccl")
def test_reduce_bkcl(self):
if paddle.fluid.core.is_compiled_with_xpu():
self.check_with_place("collective_reduce_api.py", "reduce", "bkcl")
def test_reduce_gloo(self): def test_reduce_gloo(self):
self.check_with_place("collective_reduce_api.py", "reduce", "gloo", "1") self.check_with_place("collective_reduce_api.py", "reduce", "gloo", "1")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册