Unverified commit c594f576, authored by lw921014, committed by GitHub

add c_reduce_sum op (#31793)

Parent 228bce12
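For context, a minimal sketch of how the new op is driven (this mirrors the unit test added in this commit; `scope` and `place` are assumed to be a prepared f::Scope and NPU place, with the HCCL communicator already initialized via c_comm_init_hcom):

// build and run c_reduce_sum through the op registry; the attribute
// names (tag, ring_id, root_id) match CReduceOpMaker in this diff
f::AttributeMap attrs;
attrs["tag"] = std::string("tagx_0");  // Ascend-only attribute
attrs["ring_id"] = 0;
attrs["root_id"] = 0;  // rank that receives the reduced sum
auto op = f::OpRegistry::CreateOp("c_reduce_sum",
                                  {{"X", {"X"}}},
                                  {{"Out", {"Out"}}},
                                  attrs);
op->Run(scope, place);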
@@ -448,6 +448,7 @@ function(cc_test TARGET_NAME)
"${TARGET_NAME}" STREQUAL "c_reducescatter_op_npu_test" OR
"${TARGET_NAME}" STREQUAL "c_allgather_op_npu_test" OR
"${TARGET_NAME}" STREQUAL "send_v2_op_npu_test" OR
"${TARGET_NAME}" STREQUAL "c_reduce_sum_op_npu_test" OR
"${TARGET_NAME}" STREQUAL "recv_v2_op_npu_test"))
cc_test_run(${TARGET_NAME}
COMMAND ${TARGET_NAME}
......
@@ -45,6 +45,8 @@ if(WITH_ASCEND_CL)
DEPS c_allreduce_sum_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
cc_test(c_allreduce_max_op_npu_test SRCS c_allreduce_max_op_npu_test.cc
DEPS c_allreduce_max_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
cc_test(c_reduce_sum_op_npu_test SRCS c_reduce_sum_op_npu_test.cc
DEPS c_reduce_sum_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
cc_test(c_reducescatter_op_npu_test SRCS c_reducescatter_op_npu_test.cc
DEPS c_reducescatter_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
cc_test(c_allgather_op_npu_test SRCS c_allgather_op_npu_test.cc
......
@@ -53,7 +53,7 @@ void PrintDebugInfo(const std::string preStr, const std::vector<T> &data){
for (auto ele : data) {
debugstring += std::to_string(ele) + std::string(",");
}
VLOG(2) << preStr << ":" << std::endl <<debugstring;
VLOG(3) << preStr << ":" << std::endl <<debugstring;
}
void Prepare(f::Scope* scope, const p::DeviceContext& ctx){
@@ -80,7 +80,7 @@ void Prepare(f::Scope* scope, const p::DeviceContext& ctx){
ctx.Wait();
}
void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx) {
void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx, int iter) {
// init
auto x = scope->Var("X");
auto tensor_x = x->GetMutable<f::LoDTensor>();
@@ -109,7 +109,7 @@ void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx) {
// run
f::AttributeMap attrs;
attrs["tag"]=std::string("tagx");
attrs["tag"]=std::string("tagx_"+ std::to_string(iter));
attrs["ring_id"]=0;
auto op = f::OpRegistry::CreateOp("c_allreduce_sum",
@@ -141,5 +141,8 @@ TEST(c_allreduce_sum, NPU) {
p::NPUDeviceContext ctx(p::NPUPlace(atoi(FLAGS_selected_npus.c_str())));
Prepare(&scope, ctx);
TestHCCLAllReduceOp(&scope, ctx);
for (int i = 0; i < 1; i++) {
VLOG(2) << "iter num: " << i;
TestHCCLAllReduceOp(&scope, ctx, i);
}
}
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct ASCENDPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_NPU_KERNEL(c_reduce_max,
ops::CReduceOpASCENDKernel<ops::kRedMax, int>,
ops::CReduceOpASCENDKernel<ops::kRedMax, int8_t>,
ops::CReduceOpASCENDKernel<ops::kRedMax, float>,
ops::CReduceOpASCENDKernel<ops::kRedMax, plat::float16>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct ASCENDPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_NPU_KERNEL(c_reduce_min,
ops::CReduceOpASCENDKernel<ops::kRedMin, int>,
ops::CReduceOpASCENDKernel<ops::kRedMin, int8_t>,
ops::CReduceOpASCENDKernel<ops::kRedMin, float>,
ops::CReduceOpASCENDKernel<ops::kRedMin, plat::float16>)
@@ -28,11 +28,17 @@ limitations under the License. */
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/nccl_helper.h"
#endif
#if defined(PADDLE_WITH_GLOO)
#include <gloo/reduce.h>
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/hccl_helper.h"
#endif
namespace paddle {
namespace operators {
@@ -110,6 +116,116 @@ class CReduceOpCPUKernel : public framework::OpKernel<T> {
}
};
template <ReduceType red_type, typename T>
class CReduceOpASCENDKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_ASCEND_CL)
// we need to pre-allocate 512 bytes before the data and 512 bytes
// after the data, so that the hccl allreduce can work. This is
// required according to our Huawei peers.
#define PRE_MALLOC_SIZE_BYTES 512
auto in = ctx.Input<framework::LoDTensor>("X");
auto out = ctx.Output<framework::LoDTensor>("Out");
auto place = ctx.GetPlace();
hcclDataType_t dtype = platform::ToHCCLDataType(in->type());
int64_t numel = in->numel();
int64_t pre_tmp_size = PRE_MALLOC_SIZE_BYTES / sizeof(T);
int64_t tmp_numel = numel + pre_tmp_size * 2;
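// resulting layout of the padded device buffers (elements of type T):
// [ pre_tmp_size pad | numel payload | pre_tmp_size pad ]
// sendbuff/recvbuff below point at the start of the payload region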
paddle::framework::LoDTensor tmp_in, tmp_out;
tmp_in.Resize({tmp_numel});
tmp_out.Resize({tmp_numel});
auto p_tmp_in = tmp_in.mutable_data<T>(place); // allocate
auto p_tmp_out = tmp_out.mutable_data<T>(place); // allocate
void* sendbuff = reinterpret_cast<void*>(tmp_in.data<T>() + pre_tmp_size);
void* recvbuff = reinterpret_cast<void*>(tmp_out.data<T>() + pre_tmp_size);
std::string tag = ctx.Attr<std::string>("tag");
int ring_id = ctx.Attr<int>("ring_id");
int root_id = ctx.Attr<int>("root_id");
std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
auto comm = paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
aclrtStream stream = nullptr;
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
if (ctx.Attr<bool>("use_calc_stream")) {
stream = static_cast<platform::NPUDeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
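// with use_calc_stream the collective runs on the compute stream, so it
// is implicitly ordered with the surrounding kernels; otherwise it runs
// on the communicator's dedicated stream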
int rank_id = comm->rank();
// we need to memset this memory first to avoid a core dump in hccl
platform::NPUMemsetAsync(static_cast<void*>(p_tmp_in), 0, tmp_numel*sizeof(T), stream);
platform::NPUMemsetAsync(static_cast<void*>(p_tmp_out), 0, tmp_numel*sizeof(T), stream);
auto npu_place = BOOST_GET_CONST(platform::NPUPlace, place);
memory::Copy(npu_place, sendbuff,
npu_place, reinterpret_cast<void*>(const_cast<T*>(in->data<T>())),
numel * sizeof(T),
stream);
hcclRedOp_t hccl_red_type = HCCL_REP_OP_SUM;
switch (red_type) {
case kRedSum:
hccl_red_type = HCCL_REP_OP_SUM;
break;
case kRedMax:
hccl_red_type = HCCL_REP_OP_MAX;
break;
case kRedMin:
hccl_red_type = HCCL_REP_OP_MIN;
break;
case kRedProd:
hccl_red_type = HCCL_REP_OP_PROD;
break;
default:
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid reduce type: %d", red_type));
}
VLOG(3) << "begin hccl reduce, parameter is: "
<< "input num: " << numel
<< "root_id: " << root_id
<< "dtype: " << dtype
<< "hccl_red_type: " << hccl_red_type
<< ", group is: " << group
<< ", tag is " << tag;
PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_all_reduce(
tag.c_str(), sendbuff, recvbuff, numel, dtype, hccl_red_type, group.c_str(), (void*)stream));
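// reduce is emulated with hcom_all_reduce: every rank now holds the full
// reduction in recvbuff, but only the root copies it into Out; non-root
// ranks pass their original input through unchanged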
if (rank_id == root_id) {
memory::Copy(npu_place, reinterpret_cast<void*>(out->data<T>()),
npu_place, recvbuff,
numel * sizeof(T),
stream);
} else {
memory::Copy(npu_place, reinterpret_cast<void*>(out->data<T>()),
npu_place, reinterpret_cast<void*>(const_cast<T*>(in->data<T>())),
numel * sizeof(T),
stream);
}
out->Resize(in->dims());
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should be compiled with NPU support."));
#endif
}
};
template <ReduceType red_type, typename T>
class CReduceOpCUDAKernel : public framework::OpKernel<T> {
public:
@@ -179,6 +295,10 @@ class CReduceOpMaker : public framework::OpProtoAndCheckerMaker {
AddOutput("Out", "(Tensor) the reduced result.");
AddAttr<int>("ring_id", "(int default 0) communication ring id.")
.SetDefault(0);
#if defined(PADDLE_WITH_ASCEND_CL)
AddAttr<std::string>("tag", "(string default tag) tag for reduce.")
.SetDefault("tag");
#endif
AddAttr<int>("root_id", "(int default 0) root id.").SetDefault(0);
AddAttr<bool>(
"use_calc_stream",
......
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct ASCENDPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_NPU_KERNEL(c_reduce_prod,
ops::CReduceOpASCENDKernel<ops::kRedProd, int>,
ops::CReduceOpASCENDKernel<ops::kRedProd, int8_t>,
ops::CReduceOpASCENDKernel<ops::kRedProd, float>,
ops::CReduceOpASCENDKernel<ops::kRedProd, plat::float16>)
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_reduce_op.h"
namespace paddle {
namespace platform {
struct ASCENDPlace;
struct float16;
} // namespace platform
} // namespace paddle
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_NPU_KERNEL(c_reduce_sum,
ops::CReduceOpASCENDKernel<ops::kRedSum, int>,
ops::CReduceOpASCENDKernel<ops::kRedSum, int8_t>,
ops::CReduceOpASCENDKernel<ops::kRedSum, float>,
ops::CReduceOpASCENDKernel<ops::kRedSum, plat::float16>)
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifndef _WIN32
#include <unistd.h>
#endif
#include <string>
#include <thread> // NOLINT
#include <vector>
#include <stdio.h>
#include "gtest/gtest.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/operators/dropout_op.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/collective/c_reduce_op.h"
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/hccl_helper.h"
#endif
namespace f = paddle::framework;
namespace p = paddle::platform;
namespace m = paddle::operators::math;
USE_OP(c_reduce_sum);
USE_NO_KERNEL_OP(c_comm_init_hcom);
USE_OP_DEVICE_KERNEL(c_reduce_sum, NPU);
DECLARE_string(selected_npus);
template<typename T>
void PrintDebugInfo(const std::string preStr, const std::vector<T> &data){
std::string debugstring = "";
for (auto ele : data) {
debugstring += std::to_string(ele) + std::string(",");
}
VLOG(3) << preStr << ":" << std::endl <<debugstring;
}
void Prepare(f::Scope* scope, const p::DeviceContext& ctx){
int rank_id = atoi(getenv("RANK_ID"));
int device_id = atoi(getenv("DEVICE_ID"));
VLOG(2) << "rank_id = " << rank_id
<< "; device_id = " << device_id
<< "; rank_id = " << rank_id
<< "; RANK_TABLE_FILE = " << atoi(getenv("RANK_TABLE_FILE"));
std::vector<int> rank_ids{0, 1};
f::AttributeMap comm_init_attrs;
comm_init_attrs["ring_id"] = 0;
comm_init_attrs["nranks"] = 2;
comm_init_attrs["rank"] = rank_id;
comm_init_attrs["device_id"] = device_id;
comm_init_attrs["rank_ids"] = rank_ids;
auto comm_init_op =
f::OpRegistry::CreateOp("c_comm_init_hcom", {}, {}, comm_init_attrs);
auto place = ctx.GetPlace();
comm_init_op->Run(*scope, place);
ctx.Wait();
}
void TestHCCLReduceOp(f::Scope* scope, const p::DeviceContext& ctx, int iter) {
// init
auto x = scope->Var("X");
auto tensor_x = x->GetMutable<f::LoDTensor>();
int rank_id = atoi(getenv("RANK_ID"));
int num1 = 3;
int num2 = 128;
std::vector<float> init;
for (int64_t i = 0; i < num1 * num2; ++i) {
init.push_back(1.0 + rank_id);
}
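// rank r fills X with (1.0 + r): rank 0 contributes 1.0, rank 1 contributes 2.0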
PrintDebugInfo("input data", init);
auto place = ctx.GetPlace();
TensorFromVector(init, ctx, tensor_x);
tensor_x->Resize({num1, num2});
ctx.Wait();
auto out = scope->Var("Out");
auto tensor_out = out->GetMutable<f::LoDTensor>();
tensor_out->Resize({num1, num2});
tensor_out->mutable_data<float>(place); // allocate
ctx.Wait();
// run
f::AttributeMap attrs;
attrs["tag"]=std::string("tagx_"+ std::to_string(iter));
attrs["ring_id"]=0;
int root_id = 0;
attrs["root_id"]=root_id;
auto op = f::OpRegistry::CreateOp("c_reduce_sum",
{{"X", {"X"}}},
{{"Out", {"Out"}}},
attrs);
op->Run(*scope, place);
ctx.Wait();
std::vector<float> out_vec;
TensorToVector(*tensor_out, ctx, &out_vec);
ctx.Wait();
PrintDebugInfo("output data", out_vec);
EXPECT_EQ(out_vec.size(), init.size());
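// with two ranks the root receives 1.0 + 2.0 = 3.0 per element;
// non-root ranks keep their own input (see the pass-through in the kernel)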
for (uint32_t i = 0; i < out_vec.size(); i++) {
if (rank_id == root_id) {
EXPECT_EQ(out_vec[i], 3.0);
} else {
EXPECT_EQ(out_vec[i], init[i]);
}
}
}
TEST(c_reduce_sum, NPU) {
f::Scope scope;
// only one device is supported; if more than one is available, the first is used by default
p::NPUDeviceContext ctx(p::NPUPlace(atoi(FLAGS_selected_npus.c_str())));
Prepare(&scope, ctx);
for (int i = 0; i < 2; i++) {
VLOG(2) << "iter num: " << i;
TestHCCLReduceOp(&scope, ctx, i);
}
}
@@ -119,7 +119,8 @@ void TestHCCLReduceScatterOp(f::Scope* scope, const p::DeviceContext& ctx) {
auto op = f::OpRegistry::CreateOp("c_reducescatter", {{"X", {"X"}}},
{{"Out", {"Out"}}}, attrs);
for (int i = 0; i < 10; i ++) {
int iter_num = 10;
for (int i = 0; i < iter_num; i ++) {
op->Run(*scope, place);
}
ctx.Wait();
@@ -131,7 +132,7 @@ void TestHCCLReduceScatterOp(f::Scope* scope, const p::DeviceContext& ctx) {
PrintDebugInfo("output data", out_vec);
EXPECT_EQ(out_vec.size(), init.size() / 2);
for (uint32_t i = 0; i < out_vec.size(); i++) {
EXPECT_EQ(out_vec[i], 2.0);
EXPECT_EQ(out_vec[i], iter_num + 1);
}
}
......