未验证 提交 f1fdddfd 编写于 作者: X xiayanming 提交者: GitHub

[NPU] Support npu kernel for c sync stream op (#31386)

* sync stream npu op

* add with_ascend_acl

* update c++ unittest
上级 e1c33a6d
...@@ -53,4 +53,8 @@ if(WITH_ASCEND_CL) ...@@ -53,4 +53,8 @@ if(WITH_ASCEND_CL)
DEPS send_v2_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM}) DEPS send_v2_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
cc_test(recv_v2_op_npu_test SRCS recv_v2_op_npu_test.cc cc_test(recv_v2_op_npu_test SRCS recv_v2_op_npu_test.cc
DEPS recv_v2_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM}) DEPS recv_v2_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
cc_test(c_sync_comm_stream_op_npu_test SRCS c_sync_comm_stream_op_npu_test.cc
DEPS op_registry c_broadcast_op c_comm_init_hcom_op c_sync_comm_stream_op ${COLLECTIVE_DEPS} ascend_hccl dynamic_loader dynload_warpctc scope device_context enforce executor)
cc_test(c_sync_calc_stream_op_npu_test SRCS c_sync_calc_stream_op_npu_test.cc
DEPS op_registry elementwise_add_op c_sync_calc_stream_op ${COLLECTIVE_DEPS} ascend_hccl dynamic_loader dynload_warpctc scope device_context enforce executor)
endif() endif()
...@@ -34,13 +34,23 @@ class CSyncCalcStreamOp : public framework::OperatorBase { ...@@ -34,13 +34,23 @@ class CSyncCalcStreamOp : public framework::OperatorBase {
void RunImpl(const framework::Scope& scope, void RunImpl(const framework::Scope& scope,
const platform::Place& place) const override { const platform::Place& place) const override {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
PADDLE_ENFORCE_EQ(is_gpu_place(place), true, PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
platform::errors::PreconditionNotMet( platform::errors::PreconditionNotMet(
"Sync stream op can run on gpu place only for now.")); "Sync stream op can run on gpu place only for now."));
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
auto dev_ctx = static_cast<platform::CUDADeviceContext*>( auto dev_ctx = static_cast<platform::CUDADeviceContext*>(
platform::DeviceContextPool::Instance().Get(place)); platform::DeviceContextPool::Instance().Get(place));
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(dev_ctx->stream())); PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(dev_ctx->stream()));
#elif defined(PADDLE_WITH_ASCEND_CL) && !defined(_WIN32)
PADDLE_ENFORCE_EQ(is_npu_place(place), true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on npu place only for now."));
auto dev_ctx = static_cast<platform::NPUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
PADDLE_ENFORCE_NPU_SUCCESS(aclrtSynchronizeStream(dev_ctx->stream()));
#else #else
PADDLE_THROW(platform::errors::PreconditionNotMet( PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU.")); "PaddlePaddle should compile with GPU."));
......
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifndef _WIN32
#include <unistd.h>
#endif
#include <stdio.h>
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/string/printf.h"
namespace f = paddle::framework;
namespace p = paddle::platform;
namespace m = paddle::operators::math;
USE_OP(elementwise_add);
USE_OP_DEVICE_KERNEL(elementwise_add, NPU);
USE_NO_KERNEL_OP(c_sync_calc_stream);
template <typename T>
void Compare(f::Scope* scope, const p::DeviceContext& ctx) {
// init
auto x = scope->Var("X");
auto tensor_x = x->GetMutable<f::LoDTensor>();
auto y = scope->Var("Y");
auto tensor_y = y->GetMutable<f::LoDTensor>();
std::vector<T> init_x;
for (int64_t i = 0; i < 10 * 10; ++i) {
init_x.push_back(static_cast<T>(1.0));
}
std::vector<T> init_y;
for (int64_t i = 0; i < 10 * 10; ++i) {
init_y.push_back(static_cast<T>(2.0));
}
TensorFromVector(init_x, ctx, tensor_x);
tensor_x->Resize({10, 10});
TensorFromVector(init_y, ctx, tensor_y);
tensor_y->Resize({10, 10});
f::AttributeMap attrs;
auto place = ctx.GetPlace();
auto out = scope->Var("Out");
auto tensor_out = out->GetMutable<f::LoDTensor>();
// sync data
auto sync_op0 = f::OpRegistry::CreateOp("c_sync_calc_stream", {{"X", {"X"}}},
{{"Out", {"Out"}}}, attrs);
sync_op0->Run(*scope, place);
// run
auto op =
f::OpRegistry::CreateOp("elementwise_add", {{"X", {"X"}}, {"Y", {"Y"}}},
{{"Out", {"Out"}}}, attrs);
op->Run(*scope, place);
// sync op run
auto sync_op = f::OpRegistry::CreateOp("c_sync_calc_stream", {{"X", {"X"}}},
{{"Out", {"Out"}}}, attrs);
sync_op->Run(*scope, place);
std::vector<T> out_vec;
TensorToVector(*tensor_out, ctx, &out_vec);
// sync op copy
auto sync_op2 = f::OpRegistry::CreateOp("c_sync_calc_stream", {{"X", {"X"}}},
{{"Out", {"Out"}}}, attrs);
sync_op2->Run(*scope, place);
float expected = 3.0;
EXPECT_EQ(out_vec.size(), init_x.size());
for (uint32_t i = 0; i < out_vec.size(); i++) {
EXPECT_EQ(out_vec[i], static_cast<T>(expected));
}
}
TEST(c_sync_calc_stream, NPU_fp32) {
f::Scope scope;
p::NPUDeviceContext ctx(p::NPUPlace(0));
Compare<float>(&scope, ctx);
}
...@@ -24,6 +24,11 @@ class Scope; ...@@ -24,6 +24,11 @@ class Scope;
#include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/collective_helper.h"
#endif #endif
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/hccl_helper.h"
#endif
namespace paddle { namespace paddle {
namespace operators { namespace operators {
...@@ -37,22 +42,34 @@ class CSyncCommStreamOp : public framework::OperatorBase { ...@@ -37,22 +42,34 @@ class CSyncCommStreamOp : public framework::OperatorBase {
void RunImpl(const framework::Scope& scope, void RunImpl(const framework::Scope& scope,
const platform::Place& place) const override { const platform::Place& place) const override {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(is_gpu_place(place), true, PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
platform::errors::PreconditionNotMet( platform::errors::PreconditionNotMet(
"Sync stream op can run on gpu place only for now.")); "Sync stream op can run on gpu place only for now."));
#if defined(PADDLE_WITH_NCCL)
int ring_id = Attr<int>("ring_id"); int ring_id = Attr<int>("ring_id");
auto stream = auto stream =
platform::NCCLCommContext::Instance().Get(ring_id, place)->stream(); platform::NCCLCommContext::Instance().Get(ring_id, place)->stream();
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream));
#elif defined(PADDLE_WITH_ASCEND_CL)
PADDLE_ENFORCE_EQ(is_npu_place(place), true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on npu place only for now."));
int ring_id = Attr<int>("ring_id");
auto stream =
platform::HCCLCommContext::Instance().Get(ring_id, place)->stream();
PADDLE_ENFORCE_NPU_SUCCESS(aclrtSynchronizeStream(stream));
#else #else
PADDLE_THROW(platform::errors::PreconditionNotMet( PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU.")); "PaddlePaddle should compile with GPU or NPU."));
#endif #endif
} }
}; };
class CSyncCommStreamOpMaker : public framework::OpProtoAndCheckerMaker { class CSyncCommStreamOpMaker : public framework::OpProtoAndCheckerMaker {
public: public:
void Make() { void Make() {
......
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifndef _WIN32
#include <unistd.h>
#endif
#include <stdio.h>
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/dropout_op.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/string/printf.h"
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/hccl_helper.h"
#endif
namespace f = paddle::framework;
namespace p = paddle::platform;
namespace m = paddle::operators::math;
USE_OP(c_broadcast);
USE_NO_KERNEL_OP(c_sync_comm_stream);
USE_NO_KERNEL_OP(c_comm_init_hcom);
USE_OP_DEVICE_KERNEL(c_broadcast, NPU);
void Prepare(f::Scope* scope, const p::DeviceContext& ctx) {
int rank_id = atoi(getenv("RANK_ID"));
int device_id = atoi(getenv("DEVICE_ID"));
printf("rank_id = %d, device_id = %d\n", rank_id, device_id);
std::vector<int> rank_ids{0, 1};
f::AttributeMap comm_init_attrs;
comm_init_attrs["ring_id"] = 0;
comm_init_attrs["nranks"] = 2;
comm_init_attrs["rank"] = rank_id;
comm_init_attrs["device_id"] = device_id;
comm_init_attrs["rank_ids"] = rank_ids;
auto comm_init_op =
f::OpRegistry::CreateOp("c_comm_init_hcom", {}, {}, comm_init_attrs);
auto place = ctx.GetPlace();
comm_init_op->Run(*scope, place);
ctx.Wait();
}
void TestHCCLBroadcastOp(f::Scope* scope, const p::DeviceContext& ctx) {
std::cout << "BEGIN TEST:" << __FUNCTION__ << std::endl;
// init
auto x = scope->Var("X");
auto tensor_x = x->GetMutable<f::LoDTensor>();
int num = 2;
std::vector<float> init;
int rank_id = atoi(getenv("RANK_ID"));
std::cout << "rank_id:" << rank_id << std::endl;
for (int64_t i = 0; i < num * num; ++i) {
init.push_back(1.0 + rank_id);
std::cout << init[0];
}
std::cout << std::endl;
TensorFromVector(init, ctx, tensor_x);
tensor_x->Resize({num, num});
ctx.Wait();
auto place = ctx.GetPlace();
auto out = scope->Var("Out");
auto tensor_out = out->GetMutable<f::LoDTensor>();
tensor_out->Resize({num, num});
tensor_out->mutable_data<float>(place); // allocate
ctx.Wait();
// run
f::AttributeMap attrs;
attrs["tag"] = std::string("tagx");
attrs["root"] = 0;
attrs["ring_id"] = 0;
auto op = f::OpRegistry::CreateOp("c_broadcast", {{"X", {"X"}}},
{{"Out", {"Out"}}}, attrs);
op->Run(*scope, place);
// comm sync
auto sync_op = f::OpRegistry::CreateOp("c_sync_comm_stream", {{"X", {"X"}}},
{{"Out", {"Out"}}}, attrs);
sync_op->Run(*scope, place);
// ctx.Wait();
std::vector<float> out_vec;
TensorToVector(*tensor_out, ctx, &out_vec);
EXPECT_EQ(out_vec.size(), init.size());
for (uint32_t i = 0; i < out_vec.size(); i++) {
EXPECT_EQ(out_vec[i], 1.0);
}
}
TEST(c_broadcast, NPU) {
f::Scope scope;
char* npu_id = getenv("FLAGS_selected_npus");
p::NPUDeviceContext ctx(p::NPUPlace(atoi(npu_id)));
Prepare(&scope, ctx);
TestHCCLBroadcastOp(&scope, ctx);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册