diff --git a/paddle/fluid/operators/collective/CMakeLists.txt b/paddle/fluid/operators/collective/CMakeLists.txt
index 6df7bd3df563981030081ad47dc658f8107d811c..3a220a485281c7586e4d76ff0d2747db8b40beaa 100644
--- a/paddle/fluid/operators/collective/CMakeLists.txt
+++ b/paddle/fluid/operators/collective/CMakeLists.txt
@@ -53,4 +53,8 @@ if(WITH_ASCEND_CL)
         DEPS send_v2_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
   cc_test(recv_v2_op_npu_test SRCS recv_v2_op_npu_test.cc
         DEPS recv_v2_op ${COLLECTIVE_DEPS} ${COMMON_TEST_DEPS_FOR_HCOM})
+  cc_test(c_sync_comm_stream_op_npu_test SRCS c_sync_comm_stream_op_npu_test.cc
+        DEPS op_registry c_broadcast_op c_comm_init_hcom_op c_sync_comm_stream_op ${COLLECTIVE_DEPS} ascend_hccl dynamic_loader dynload_warpctc scope device_context enforce executor)
+  cc_test(c_sync_calc_stream_op_npu_test SRCS c_sync_calc_stream_op_npu_test.cc
+        DEPS op_registry elementwise_add_op c_sync_calc_stream_op ${COLLECTIVE_DEPS} ascend_hccl dynamic_loader dynload_warpctc scope device_context enforce executor)
 endif()
diff --git a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc
index bdffe96acd75d9ef204ec6885af1b578335c07e6..b54596e3b5f1d2ee675b3234df3305529f08fc2a 100644
--- a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc
+++ b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc
@@ -34,13 +34,23 @@ class CSyncCalcStreamOp : public framework::OperatorBase {
 
   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
+
+#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
     PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
                       platform::errors::PreconditionNotMet(
                           "Sync stream op can run on gpu place only for now."));
-#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+
     auto dev_ctx = static_cast<platform::CUDADeviceContext*>(
         platform::DeviceContextPool::Instance().Get(place));
     PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(dev_ctx->stream()));
+#elif defined(PADDLE_WITH_ASCEND_CL) && !defined(_WIN32)
+    PADDLE_ENFORCE_EQ(is_npu_place(place), true,
+                      platform::errors::PreconditionNotMet(
+                          "Sync stream op can run on npu place only for now."));
+
+    auto dev_ctx = static_cast<platform::NPUDeviceContext*>(
+        platform::DeviceContextPool::Instance().Get(place));
+    PADDLE_ENFORCE_NPU_SUCCESS(aclrtSynchronizeStream(dev_ctx->stream()));
 #else
     PADDLE_THROW(platform::errors::PreconditionNotMet(
         "PaddlePaddle should compile with GPU."));
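Note: the new ASCEND_CL branch mirrors the CUDA branch one-for-one: verify the
place, fetch the device context from the pool, and block on its stream. As a
minimal sketch (an illustration, not part of the patch), this is how the op is
driven once registered; the scope is assumed to already hold variables named
"X" and "Out", as in the tests below:

    #include "paddle/fluid/framework/op_registry.h"

    namespace f = paddle::framework;
    namespace p = paddle::platform;

    // Blocks the host until the place's compute stream has drained;
    // on NPU builds, RunImpl ends in aclrtSynchronizeStream.
    void SyncCalcStream(f::Scope* scope, const p::Place& place) {
      auto sync_op = f::OpRegistry::CreateOp(
          "c_sync_calc_stream", {{"X", {"X"}}}, {{"Out", {"Out"}}},
          f::AttributeMap{});
      sync_op->Run(*scope, place);
    }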
diff --git a/paddle/fluid/operators/collective/c_sync_calc_stream_op_npu_test.cc b/paddle/fluid/operators/collective/c_sync_calc_stream_op_npu_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..4b1f7bb340178748d302f9ec5a5c987a25dae2e3
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_sync_calc_stream_op_npu_test.cc
@@ -0,0 +1,107 @@
+/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
+#include <stdio.h>
+
+#include <string>
+#include <thread>  // NOLINT
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/operator.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/operators/math/math_function.h"
+#include "paddle/fluid/string/printf.h"
+
+namespace f = paddle::framework;
+namespace p = paddle::platform;
+namespace m = paddle::operators::math;
+
+USE_OP(elementwise_add);
+USE_OP_DEVICE_KERNEL(elementwise_add, NPU);
+USE_NO_KERNEL_OP(c_sync_calc_stream);
+
+template <typename T>
+void Compare(f::Scope* scope, const p::DeviceContext& ctx) {
+  // init
+  auto x = scope->Var("X");
+  auto tensor_x = x->GetMutable<f::LoDTensor>();
+
+  auto y = scope->Var("Y");
+  auto tensor_y = y->GetMutable<f::LoDTensor>();
+
+  std::vector<T> init_x;
+  for (int64_t i = 0; i < 10 * 10; ++i) {
+    init_x.push_back(static_cast<T>(1.0));
+  }
+
+  std::vector<T> init_y;
+  for (int64_t i = 0; i < 10 * 10; ++i) {
+    init_y.push_back(static_cast<T>(2.0));
+  }
+
+  TensorFromVector(init_x, ctx, tensor_x);
+  tensor_x->Resize({10, 10});
+  TensorFromVector(init_y, ctx, tensor_y);
+  tensor_y->Resize({10, 10});
+
+  f::AttributeMap attrs;
+  auto place = ctx.GetPlace();
+  auto out = scope->Var("Out");
+  auto tensor_out = out->GetMutable<f::LoDTensor>();
+
+  // sync the H2D copies issued by TensorFromVector
+  auto sync_op0 = f::OpRegistry::CreateOp("c_sync_calc_stream", {{"X", {"X"}}},
+                                          {{"Out", {"Out"}}}, attrs);
+  sync_op0->Run(*scope, place);
+
+  // run elementwise_add on the calc stream
+  auto op =
+      f::OpRegistry::CreateOp("elementwise_add", {{"X", {"X"}}, {"Y", {"Y"}}},
+                              {{"Out", {"Out"}}}, attrs);
+  op->Run(*scope, place);
+
+  // sync after the op run
+  auto sync_op = f::OpRegistry::CreateOp("c_sync_calc_stream", {{"X", {"X"}}},
+                                         {{"Out", {"Out"}}}, attrs);
+  sync_op->Run(*scope, place);
+
+  std::vector<T> out_vec;
+  TensorToVector(*tensor_out, ctx, &out_vec);
+
+  // sync after the D2H copy
+  auto sync_op2 = f::OpRegistry::CreateOp("c_sync_calc_stream", {{"X", {"X"}}},
+                                          {{"Out", {"Out"}}}, attrs);
+  sync_op2->Run(*scope, place);
+
+  float expected = 3.0;
+
+  EXPECT_EQ(out_vec.size(), init_x.size());
+  for (uint32_t i = 0; i < out_vec.size(); i++) {
+    EXPECT_EQ(out_vec[i], static_cast<T>(expected));
+  }
+}
+
+TEST(c_sync_calc_stream, NPU_fp32) {
+  f::Scope scope;
+  p::NPUDeviceContext ctx(p::NPUPlace(0));
+  Compare<float>(&scope, ctx);
+}
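The Compare<T> helper is type-parameterized, so covering another dtype only
needs one more TEST stanza. A hypothetical fp16 variant is sketched below; it
assumes the NPU elementwise_add kernel is also registered for
paddle::platform::float16, which this patch does not itself establish:

    TEST(c_sync_calc_stream, NPU_fp16) {
      f::Scope scope;
      p::NPUDeviceContext ctx(p::NPUPlace(0));
      Compare<p::float16>(&scope, ctx);
    }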
diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc
index ad9884565b638cc22de00d28d1e0b87bed9965d7..cf96edea9f4b105dd8ab2838bcc852c56715af3b 100644
--- a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc
+++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc
@@ -24,6 +24,11 @@ class Scope;
 #include "paddle/fluid/platform/collective_helper.h"
 #endif
 
+#if defined(PADDLE_WITH_ASCEND_CL)
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/hccl_helper.h"
+#endif
+
 namespace paddle {
 namespace operators {
 
@@ -37,22 +42,34 @@ class CSyncCommStreamOp : public framework::OperatorBase {
 
   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
+
+#if defined(PADDLE_WITH_NCCL)
     PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
                       platform::errors::PreconditionNotMet(
                           "Sync stream op can run on gpu place only for now."));
-#if defined(PADDLE_WITH_NCCL)
     int ring_id = Attr<int>("ring_id");
     auto stream =
         platform::NCCLCommContext::Instance().Get(ring_id, place)->stream();
     PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream));
+
+#elif defined(PADDLE_WITH_ASCEND_CL)
+    PADDLE_ENFORCE_EQ(is_npu_place(place), true,
+                      platform::errors::PreconditionNotMet(
+                          "Sync stream op can run on npu place only for now."));
+    int ring_id = Attr<int>("ring_id");
+    auto stream =
+        platform::HCCLCommContext::Instance().Get(ring_id, place)->stream();
+    PADDLE_ENFORCE_NPU_SUCCESS(aclrtSynchronizeStream(stream));
 #else
     PADDLE_THROW(platform::errors::PreconditionNotMet(
-        "PaddlePaddle should compile with GPU."));
+        "PaddlePaddle should compile with GPU or NPU."));
 #endif
+
   }
 };
 
+
 class CSyncCommStreamOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() {
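Note the asymmetry with c_sync_calc_stream: that op synchronizes the device
context's compute stream, while this one synchronizes the stream owned by the
HCCL communicator selected by ring_id (bound earlier by c_comm_init_hcom). A
condensed side-by-side sketch, using the same calls as the hunks above:

    // calc stream: owned by the per-device context, used by ordinary kernels
    auto* dev_ctx = static_cast<platform::NPUDeviceContext*>(
        platform::DeviceContextPool::Instance().Get(place));
    PADDLE_ENFORCE_NPU_SUCCESS(aclrtSynchronizeStream(dev_ctx->stream()));

    // comm stream: owned by the per-ring communicator, used by HCCL
    // collectives such as c_broadcast
    auto comm_stream =
        platform::HCCLCommContext::Instance().Get(ring_id, place)->stream();
    PADDLE_ENFORCE_NPU_SUCCESS(aclrtSynchronizeStream(comm_stream));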
diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op_npu_test.cc b/paddle/fluid/operators/collective/c_sync_comm_stream_op_npu_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..371bcc4cfcfef427ed0efc09a3ba808d74b01577
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op_npu_test.cc
@@ -0,0 +1,131 @@
+/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
+#include <stdio.h>
+
+#include <string>
+#include <thread>  // NOLINT
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/operator.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/operators/dropout_op.h"
+#include "paddle/fluid/operators/math/math_function.h"
+#include "paddle/fluid/string/printf.h"
+
+#if defined(PADDLE_WITH_ASCEND_CL)
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/hccl_helper.h"
+#endif
+
+namespace f = paddle::framework;
+namespace p = paddle::platform;
+namespace m = paddle::operators::math;
+
+USE_OP(c_broadcast);
+USE_NO_KERNEL_OP(c_sync_comm_stream);
+USE_NO_KERNEL_OP(c_comm_init_hcom);
+USE_OP_DEVICE_KERNEL(c_broadcast, NPU);
+
+void Prepare(f::Scope* scope, const p::DeviceContext& ctx) {
+  int rank_id = atoi(getenv("RANK_ID"));
+  int device_id = atoi(getenv("DEVICE_ID"));
+
+  printf("rank_id = %d, device_id = %d\n", rank_id, device_id);
+
+  std::vector<int> rank_ids{0, 1};
+  f::AttributeMap comm_init_attrs;
+  comm_init_attrs["ring_id"] = 0;
+  comm_init_attrs["nranks"] = 2;
+  comm_init_attrs["rank"] = rank_id;
+  comm_init_attrs["device_id"] = device_id;
+  comm_init_attrs["rank_ids"] = rank_ids;
+  auto comm_init_op =
+      f::OpRegistry::CreateOp("c_comm_init_hcom", {}, {}, comm_init_attrs);
+  auto place = ctx.GetPlace();
+  comm_init_op->Run(*scope, place);
+  ctx.Wait();
+}
+
+void TestHCCLBroadcastOp(f::Scope* scope, const p::DeviceContext& ctx) {
+  std::cout << "BEGIN TEST:" << __FUNCTION__ << std::endl;
+  // init
+  auto x = scope->Var("X");
+  auto tensor_x = x->GetMutable<f::LoDTensor>();
+  int num = 2;
+  std::vector<float> init;
+  int rank_id = atoi(getenv("RANK_ID"));
+  std::cout << "rank_id:" << rank_id << std::endl;
+  for (int64_t i = 0; i < num * num; ++i) {
+    init.push_back(1.0 + rank_id);
+    std::cout << init[i];
+  }
+  std::cout << std::endl;
+
+  TensorFromVector(init, ctx, tensor_x);
+  tensor_x->Resize({num, num});
+
+  ctx.Wait();
+
+  auto place = ctx.GetPlace();
+  auto out = scope->Var("Out");
+  auto tensor_out = out->GetMutable<f::LoDTensor>();
+  tensor_out->Resize({num, num});
+  tensor_out->mutable_data<float>(place);  // allocate
+
+  ctx.Wait();
+
+  // run
+  f::AttributeMap attrs;
+  attrs["tag"] = std::string("tagx");
+  attrs["root"] = 0;
+  attrs["ring_id"] = 0;
+
+  auto op = f::OpRegistry::CreateOp("c_broadcast", {{"X", {"X"}}},
+                                    {{"Out", {"Out"}}}, attrs);
+  op->Run(*scope, place);
+
+  // sync the communication stream before reading the result back
+  auto sync_op = f::OpRegistry::CreateOp("c_sync_comm_stream", {{"X", {"X"}}},
+                                         {{"Out", {"Out"}}}, attrs);
+  sync_op->Run(*scope, place);
+
+  // ctx.Wait();
+
+  std::vector<float> out_vec;
+  TensorToVector(*tensor_out, ctx, &out_vec);
+
+  EXPECT_EQ(out_vec.size(), init.size());
+  for (uint32_t i = 0; i < out_vec.size(); i++) {
+    EXPECT_EQ(out_vec[i], 1.0);
+  }
+}
+
+TEST(c_broadcast, NPU) {
+  f::Scope scope;
+  char* npu_id = getenv("FLAGS_selected_npus");
+
+  p::NPUDeviceContext ctx(p::NPUPlace(atoi(npu_id)));
+
+  Prepare(&scope, ctx);
+  TestHCCLBroadcastOp(&scope, ctx);
+}
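Usage note: TEST(c_broadcast, NPU) dereferences getenv("FLAGS_selected_npus"),
and Prepare() does the same for RANK_ID and DEVICE_ID, so the binary must be
launched once per rank with all three variables exported (ranks 0 and 1,
matching rank_ids{0, 1}). A defensive variant could skip instead of crashing
when the environment is missing; this is a hypothetical rewrite and assumes
the bundled googletest is new enough (>= 1.10) for GTEST_SKIP:

    TEST(c_broadcast, NPU) {
      char* npu_id = getenv("FLAGS_selected_npus");
      if (!npu_id || !getenv("RANK_ID") || !getenv("DEVICE_ID")) {
        GTEST_SKIP() << "export RANK_ID, DEVICE_ID and FLAGS_selected_npus";
      }
      f::Scope scope;
      p::NPUDeviceContext ctx(p::NPUPlace(atoi(npu_id)));
      Prepare(&scope, ctx);
      TestHCCLBroadcastOp(&scope, ctx);
    }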