未验证 提交 25bd5ed8 编写于 作者: K Kim Yann 提交者: GitHub

rem cncl (#52434)

上级 dfcba7f4
......@@ -272,7 +272,6 @@ option(WITH_CINN "Compile PaddlePaddle with CINN" OFF)
option(WITH_NCCL "Compile PaddlePaddle with NCCL support" ON)
option(WITH_RCCL "Compile PaddlePaddle with RCCL support" ON)
option(WITH_XPU_BKCL "Compile PaddlePaddle with BAIDU KUNLUN XPU BKCL" OFF)
option(WITH_CNCL "Compile PaddlePaddle with CNCL support" OFF)
option(WITH_CRYPTO "Compile PaddlePaddle with crypto support" ON)
option(WITH_ARM "Compile PaddlePaddle with arm support" OFF)
option(WITH_SW "Compile PaddlePaddle with sw support" OFF)
......
......@@ -43,10 +43,6 @@
#include "xpu/bkcl.h"
#endif
#if defined(PADDLE_WITH_CNCL)
#include <cncl.h>
#endif
namespace phi {
class DenseTensor;
class SelectedRows;
......@@ -199,9 +195,6 @@ using VarTypeRegistry = detail::VarTypeRegistryImpl<
#if defined(PADDLE_WITH_XPU_BKCL)
BKCLUniqueId,
platform::BKCLCommunicator,
#endif
#if defined(PADDLE_WITH_CNCL)
cnclCliqueId,
#endif
std::vector<std::unique_ptr<operators::CUDAGraphWithInOuts>>,
int,
......
......@@ -123,16 +123,7 @@ if(NOT WIN32)
SRCS reducer.cc
DEPS layer)
endif()
if(WITH_CNCL)
cc_library(
cncl_context
SRCS cncl_context.cc
DEPS collective_helper device_context tensor var_type_traits)
cc_library(
reducer
SRCS reducer.cc
DEPS layer)
endif()
if(WITH_NCCL
OR WITH_RCCL
OR WITH_XPU_BKCL)
......@@ -155,8 +146,7 @@ if(WITH_GLOO)
OR (NOT
(WITH_NCCL
OR WITH_RCCL
OR WITH_XPU_BKCL
OR WITH_CNCL)
OR WITH_XPU_BKCL)
))
cc_library(
reducer
......
/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/imperative/cncl_context.h"
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#include "paddle/fluid/platform/device/mlu/mlu_info.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle
namespace paddle {
namespace imperative {
static void AllReduce(const phi::DenseTensor &src,
phi::DenseTensor *dst,
const mluStream stream,
const platform::CNCLComm *comm) {
const auto &place = src.place();
PADDLE_ENFORCE_EQ(
platform::is_mlu_place(place),
true,
platform::errors::Unimplemented(
"Imperative mode does not support multi-CPU training yet."));
const void *src_ptr = src.data();
dst->Resize(src.dims());
auto *dst_ptr = dst->mutable_data(src.place(), src.dtype());
auto cncl_dtype =
platform::ToCNCLDataType(framework::TransToProtoVarType(src.dtype()));
PADDLE_ENFORCE_MLU_SUCCESS(cnclAllReduce(src_ptr,
dst_ptr,
src.numel(),
cncl_dtype,
cnclSum,
comm->comm(),
stream));
}
void CNCLParallelContext::BcastCNCLId(
std::vector<cnclCliqueId> &cncl_ids, // NOLINT
int root,
int server_fd) {
if (strategy_.local_rank_ == root) {
std::vector<std::string> other_trainers;
for (auto &ep : strategy_.trainer_endpoints_) {
if (ep != strategy_.current_endpoint_) {
other_trainers.push_back(ep);
}
}
platform::SendBroadCastCommID(other_trainers, &cncl_ids);
} else {
platform::RecvBroadCastCommID(
server_fd, strategy_.current_endpoint_, &cncl_ids);
}
}
void CNCLParallelContext::Init() {
int server_fd = -1;
std::vector<cnclCliqueId> cncl_ids;
cncl_ids.resize(strategy_.nrings_);
if (strategy_.local_rank_ == 0) {
// generate the unique cnclid on the root worker
for (size_t i = 0; i < cncl_ids.size(); ++i) {
PADDLE_ENFORCE_MLU_SUCCESS(cnclGetCliqueId(&cncl_ids[i]));
}
} else {
server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_)
.socket();
}
BcastCNCLId(cncl_ids, 0, server_fd);
int mlu_id = place_.device;
for (int ring_id = 0; ring_id < strategy_.nrings_; ++ring_id) {
VLOG(0) << "init cncl context nranks: " << strategy_.nranks_
<< " local rank: " << strategy_.local_rank_ << " mlu id: " << mlu_id
<< " ring id: " << ring_id;
// it will assign cncl_comm in MLUDeviceContext within ring_id
platform::CNCLCommContext::Instance().CreateComm(&cncl_ids[ring_id],
strategy_.nranks_,
strategy_.local_rank_,
mlu_id,
ring_id);
compute_events_.emplace_back(
platform::MluEventResourcePool::Instance().New(place_.device));
comm_events_.emplace_back(
platform::MluEventResourcePool::Instance().New(place_.device));
}
}
void CNCLParallelContext::InitWithRingID(int ring_id) {
int server_fd = -1;
std::vector<cnclCliqueId> cncl_ids;
cncl_ids.resize(1);
if (strategy_.local_rank_ == 0) {
// generate the unique cnclid on the root worker
PADDLE_ENFORCE_MLU_SUCCESS(cnclGetCliqueId(&cncl_ids[0]));
} else {
server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_)
.socket();
}
BcastCNCLId(cncl_ids, 0, server_fd);
int mlu_id = place_.device;
VLOG(0) << "init cncl context nranks: " << strategy_.nranks_
<< " local rank: " << strategy_.local_rank_ << " mlu id: " << mlu_id
<< " ring id: " << ring_id;
// it will assign cncl_comm in MLUDeviceContext within ring_id
platform::CNCLCommContext::Instance().CreateComm(
&cncl_ids[0], strategy_.nranks_, strategy_.local_rank_, mlu_id, ring_id);
compute_events_.emplace_back(
platform::MluEventResourcePool::Instance().New(place_.device));
comm_events_.emplace_back(
platform::MluEventResourcePool::Instance().New(place_.device));
}
void CNCLParallelContext::AllReduceByStream(const framework::Variable &src,
framework::Variable *dst,
int ring_id,
bool use_calc_stream) {
PADDLE_ENFORCE_EQ(
platform::is_mlu_place(place_),
true,
platform::errors::Unimplemented(
"Dynamic graph mode does not support multi-CPU training yet."));
auto *dev_ctx = static_cast<platform::MLUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_));
platform::CNCLComm *comm =
platform::CNCLCommContext::Instance().Get(ring_id, place_);
mluStream stream = (use_calc_stream ? dev_ctx->stream() : comm->stream());
if (src.IsType<phi::DenseTensor>()) {
if (!dst->IsType<phi::DenseTensor>()) {
dst->Clear();
}
AllReduce(src.Get<phi::DenseTensor>(),
dst->GetMutable<phi::DenseTensor>(),
stream,
comm);
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Unsupported variable type %s for imperative allreduce, only "
"LoDTensor is supported.",
platform::demangle(framework::ToTypeName(src.Type()))));
}
}
void CNCLParallelContext::Broadcast(framework::Variable *src, int ring_id) {
VLOG(3) << "/// DEBUG /// start inter broadcast with ring_id: " << ring_id;
phi::DenseTensor *src_tensor = src->GetMutable<phi::DenseTensor>();
const auto &place = src_tensor->place();
platform::CNCLComm *comm =
platform::CNCLCommContext::Instance().Get(ring_id, place);
mluStream stream = comm->stream();
void *src_ptr = src_tensor->data();
auto cncl_dtype = platform::ToCNCLDataType(
framework::TransToProtoVarType(src_tensor->dtype()));
PADDLE_ENFORCE_MLU_SUCCESS(cnclBcast(
src_ptr, src_tensor->numel(), cncl_dtype, 0, comm->comm(), stream));
}
paddle::platform::DeviceContext *CNCLParallelContext::GetDeviceContext(
int ring_id) {
return static_cast<platform::DeviceContext *>(
platform::CNCLCommContext::Instance()
.Get(ring_id, place_)
->dev_context());
}
void CNCLParallelContext::WaitCompute(int ring_id) {
PADDLE_ENFORCE_GE(
ring_id,
0,
platform::errors::OutOfRange("ring id must >= 0, but got %d", ring_id));
PADDLE_ENFORCE_LT(ring_id,
compute_events_.size(),
platform::errors::OutOfRange(
"ring id must < compute events size,"
"but got ring id = %d, compute events size = %d",
ring_id,
compute_events_.size()));
auto compute_stream = static_cast<platform::MLUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
auto comm_stream =
platform::CNCLCommContext::Instance().Get(ring_id, place_)->stream();
auto event = compute_events_[ring_id].get();
// compute_stream-->event-->comm_stream
PADDLE_ENFORCE_MLU_SUCCESS(cnrtPlaceNotifier(event, compute_stream));
PADDLE_ENFORCE_MLU_SUCCESS(cnrtQueueWaitNotifier(event, comm_stream, 0));
}
void CNCLParallelContext::WaitComm(int ring_id) {
PADDLE_ENFORCE_GE(
ring_id,
0,
platform::errors::OutOfRange("ring id must >= 0, but got %d", ring_id));
PADDLE_ENFORCE_LT(ring_id,
comm_events_.size(),
platform::errors::OutOfRange(
"ring id must < comm events size,"
"but got ring id = %d, comm events size = %d",
ring_id,
comm_events_.size()));
auto compute_stream = static_cast<platform::MLUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
auto comm_stream =
platform::CNCLCommContext::Instance().Get(ring_id, place_)->stream();
auto event = comm_events_[ring_id].get();
// comm_stream-->event-->compute_stream
PADDLE_ENFORCE_MLU_SUCCESS(cnrtPlaceNotifier(event, comm_stream));
PADDLE_ENFORCE_MLU_SUCCESS(cnrtQueueWaitNotifier(event, compute_stream, 0));
}
void CNCLParallelContext::SynchronizeCompute() {
auto *compute_dev_ctx = static_cast<platform::MLUDeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_));
compute_dev_ctx->Wait();
}
} // namespace imperative
} // namespace paddle
#endif
/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#if defined(PADDLE_WITH_CNCL)
#include <cncl.h>
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/imperative/parallel_context.h"
#include "paddle/fluid/platform/device/mlu/mlu_resource_pool.h"
namespace paddle {
namespace framework {
class Variable;
} // namespace framework
} // namespace paddle
namespace paddle {
namespace imperative {
class CNCLParallelContext : public ParallelContext {
public:
explicit CNCLParallelContext(const ParallelStrategy& strategy,
const platform::Place& place)
: ParallelContext(strategy, place) {}
~CNCLParallelContext() override = default;
void BcastCNCLId(std::vector<cnclCliqueId>& cncl_ids,
int root, // NOLINT
int server_fd);
void Init() override;
void InitWithRingID(int ring_id) override;
void AllReduceByStream(const framework::Variable& src,
framework::Variable* dst,
int ring_id,
bool use_calc_stream) override;
void Broadcast(framework::Variable* src, int ring_id) override;
paddle::platform::DeviceContext* GetDeviceContext(int ring_id) override;
void WaitCompute(int ring_id) override;
void WaitComm(int ring_id) override;
void SynchronizeCompute() override;
private:
// used for comm wait compute, compute_stream-->event-->comm_stream[ring_id]
std::vector<std::shared_ptr<platform::MluEventObject>> compute_events_;
// used for compute wait comm, comm_stream[ring_id]-->event-->compute_stream
std::vector<std::shared_ptr<platform::MluEventObject>> comm_events_;
};
} // namespace imperative
} // namespace paddle
#endif
......@@ -30,8 +30,7 @@ namespace paddle {
namespace imperative {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) || \
defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
// div the nranks
void Group::DivNRanks(const platform::DeviceContext &context, int64_t nranks) {
phi::DenseTensor *tensor =
......@@ -229,56 +228,6 @@ void SplitTensorsWithType<platform::XPUDeviceContext>(
}
#endif
#ifdef PADDLE_WITH_CNCL
// context is used to select the stream for concat
template <>
void ConcatTensorsWithType<platform::MLUDeviceContext>(
const platform::MLUDeviceContext &context,
const std::vector<phi::DenseTensor> &dense_tensors_,
framework::Variable *p_dense_contents,
framework::proto::VarType::Type type) {
switch (type) {
case framework::proto::VarType::FP16:
ConcatTensorsForAllReduce<platform::MLUDeviceContext, platform::float16>(
context, dense_tensors_, p_dense_contents);
break;
case framework::proto::VarType::FP32:
ConcatTensorsForAllReduce<platform::MLUDeviceContext, float>(
context, dense_tensors_, p_dense_contents);
break;
default:
PADDLE_THROW(platform::errors::Unimplemented(
"Data type (%s) is not supported when it concats tensors for "
"allreduce.",
framework::DataTypeToString(type)));
}
}
// context is used to select the stream for split
template <>
void SplitTensorsWithType<platform::MLUDeviceContext>(
const platform::MLUDeviceContext &context,
framework::Variable *p_dense_contents,
std::vector<phi::DenseTensor> *p_dense_tensors,
framework::proto::VarType::Type type) {
switch (type) {
case framework::proto::VarType::FP16:
SplitTensorsForAllReduce<platform::MLUDeviceContext, platform::float16>(
context, p_dense_contents, p_dense_tensors);
break;
case framework::proto::VarType::FP32:
SplitTensorsForAllReduce<platform::MLUDeviceContext, float>(
context, p_dense_contents, p_dense_tensors);
break;
default:
PADDLE_THROW(platform::errors::Unimplemented(
"Data type (%s) is not supported when it splits tensors for "
"allreduce.",
framework::DataTypeToString(type)));
}
}
#endif
void Group::ConcatTensors(const platform::DeviceContext &context) {
auto place = context.GetPlace();
if (platform::is_gpu_place(place)) {
......@@ -308,19 +257,6 @@ void Group::ConcatTensors(const platform::DeviceContext &context) {
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat npu grads since it's not compiled with HCCL,"
"Please recompile or reinstall Paddle with HCCL support."));
} else if (platform::is_mlu_place(place)) {
#ifdef PADDLE_WITH_CNCL
ConcatTensorsWithType(
static_cast<const platform::MLUDeviceContext &>(context),
dense_tensors_,
&dense_contents_,
dtype_);
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat mlu grads since it's not compiled with CNCL,"
"Please recompile or reinstall Paddle with CNCL support."));
#endif
} else if (platform::is_cpu_place(place)) {
ConcatTensorsWithType(static_cast<const phi::CPUContext &>(context),
dense_tensors_,
......@@ -361,19 +297,6 @@ void Group::SplitTensors(const platform::DeviceContext &context) {
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split npu grad since it's not compiled with HCCL,"
"Please recompile or reinstall Paddle with HCCL support."));
} else if (platform::is_mlu_place(place)) {
#ifdef PADDLE_WITH_CNCL
SplitTensorsWithType(
static_cast<const platform::MLUDeviceContext &>(context),
&dense_contents_,
&dense_tensors_,
dtype_);
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split mlu grad since it's not compiled with CNCL,"
"Please recompile or reinstall Paddle with CNCL support."));
#endif
} else if (platform::is_cpu_place(place)) {
SplitTensorsWithType(static_cast<const phi::CPUContext &>(context),
&dense_contents_,
......@@ -850,11 +773,6 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
PADDLE_ENFORCE_XPU_SUCCESS(xpu_wait(dev_ctx->stream()));
}
}
#elif defined(PADDLE_WITH_CNCL)
if (platform::is_mlu_place(group_tensor.place())) {
// TODO(liuyuhui) support MLU set constant
VLOG(3) << "MLU doesn't support set_constant";
}
#else
auto *dev_ctx = platform::DeviceContextPool::Instance().Get(place_);
if (HasGrad(var_index)) {
......@@ -1116,7 +1034,7 @@ void Reducer::FinalizeBackward() {
if (find_unused_vars_each_step_) {
// TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_GLOO) || defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_GLOO)
ProcessUnusedDenseVars();
#endif
// Initialize local used vars
......
......@@ -45,8 +45,7 @@ namespace paddle {
namespace imperative {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) || \
defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
template <typename T>
struct DivNRanksFunctor {
......
......@@ -27,7 +27,6 @@ register_operators(
gen_bkcl_id_op
c_gen_nccl_id_op
gen_nccl_id_op
c_gen_cncl_id_op
DEPS
${COLLECTIVE_DEPS})
......@@ -44,11 +43,6 @@ if(WITH_XPU_BKCL)
op_library(gen_bkcl_id_op DEPS ${COLLECTIVE_DEPS})
endif()
if(WITH_CNCL)
set(COLLECTIVE_DEPS ${COLLECTIVE_DEPS} collective_helper)
op_library(c_gen_cncl_id_op DEPS ${COLLECTIVE_DEPS})
endif()
set(OPERATOR_DEPS
${OPERATOR_DEPS} ${COLLECTIVE_DEPS}
PARENT_SCOPE)
......
......@@ -25,7 +25,7 @@ limitations under the License. */
#include "paddle/phi/api/include/tensor.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/collective_helper.h"
#endif
......@@ -39,14 +39,9 @@ limitations under the License. */
#if defined(PADDLE_WITH_GLOO)
#include <gloo/allreduce.h>
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif
#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#endif
namespace paddle {
namespace operators {
......@@ -360,82 +355,6 @@ class CAllReduceOpCUDAKernel : public framework::OpKernel<T> {
template <typename T, typename DeviceContext> \
class op_name##CUDAKernel : public CAllReduceOpCUDAKernel<red_type, T> {};
template <ReduceType red_type, typename T>
class CAllReduceOpMLUKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_CNCL)
auto in = ctx.Input<phi::DenseTensor>("X");
auto out = ctx.Output<phi::DenseTensor>("Out");
if (ctx.HasInput("Cond")) {
auto cond = ctx.Input<phi::DenseTensor>("Cond");
auto place = cond->place();
PADDLE_ENFORCE_EQ(platform::is_cpu_place(place),
true,
platform::errors::PreconditionNotMet(
"The input `cond` tensor should be on cpu place"));
PADDLE_ENFORCE_EQ(cond->numel(),
1,
platform::errors::PreconditionNotMet(
"The input `cond` should be shape [1]"));
if (!cond->data<bool>()[0]) {
VLOG(4) << "Skip all reduce Op since cond is 0";
return;
}
}
auto place = ctx.GetPlace();
cnclDataType_t dtype =
platform::ToCNCLDataType(framework::TransToProtoVarType(in->dtype()));
int64_t numel = in->numel();
const void* sendbuff = in->data<T>();
out->Resize(in->dims());
void* recvbuff = out->mutable_data<T>(place);
int rid = ctx.Attr<int>("ring_id");
auto comm = platform::CNCLCommContext::Instance().Get(rid, place);
mluStream stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::MLUDeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
cnclReduceOp_t cncl_red_type = cnclSum;
switch (red_type) {
case kRedSum:
cncl_red_type = cnclSum;
break;
case kRedMax:
cncl_red_type = cnclMax;
break;
case kRedMin:
cncl_red_type = cnclMin;
break;
case kRedProd:
cncl_red_type = cnclProd;
break;
default:
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid reduce type: %d", red_type));
}
PADDLE_ENFORCE_MLU_SUCCESS(cnclAllReduce(
sendbuff, recvbuff, numel, dtype, cncl_red_type, comm->comm(), stream));
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with MLU."));
#endif
}
};
class CAllReduceOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() {
......
......@@ -20,15 +20,12 @@ limitations under the License. */
#if defined(PADDLE_WITH_XPU_BKCL)
#include "xpu/bkcl.h"
#endif
#if defined(PADDLE_WITH_CNCL)
#include <cncl.h>
#endif
#include <string>
#include "paddle/fluid/framework/op_registry.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/collective_helper.h"
#endif
......@@ -58,9 +55,6 @@ class CCommInitOp : public framework::OperatorBase {
#elif defined(PADDLE_WITH_XPU_BKCL)
using UniqueId = BKCLUniqueId;
using CommContext = platform::BKCLCommContext;
#elif defined(PADDLE_WITH_CNCL)
using UniqueId = cnclCliqueId;
using CommContext = platform::CNCLCommContext;
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should be compiled with GPU or XPU or MLU."));
......@@ -74,7 +68,7 @@ class CCommInitOp : public framework::OperatorBase {
"CCommInitOp can run on gpu or xpu or mlu place only."));
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL)
auto var = scope.FindVar(Input("X"));
PADDLE_ENFORCE_NOT_NULL(
var, platform::errors::InvalidArgument("Input con not be empty."));
......
/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <cncl.h>
#include <string>
#include "paddle/fluid/framework/op_proto_maker.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/var_type_traits.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace operators {
static void GenCNCLID(std::vector<cnclCliqueId>* cncl_ids) {
for (size_t i = 0; i < cncl_ids->size(); ++i) {
PADDLE_ENFORCE_MLU_SUCCESS(cnclGetCliqueId(&(*cncl_ids)[i]));
}
}
static void CopyCNCLIDToVar(const std::vector<cnclCliqueId>& cncl_ids,
std::function<std::string(size_t)> func,
const framework::Scope& scope) {
for (size_t i = 0; i < cncl_ids.size(); ++i) {
std::string var_name = func(i);
auto var = scope.FindVar(var_name);
PADDLE_ENFORCE_NOT_NULL(
var,
platform::errors::NotFound("Variable with name %s is not found",
var_name.c_str()));
auto cncl_id = var->GetMutable<cnclCliqueId>();
memcpy(cncl_id, &cncl_ids[i], sizeof(cnclCliqueId));
}
}
class CGenCNCLIdOp : public framework::OperatorBase {
public:
CGenCNCLIdOp(const std::string& type,
const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
int rank = Attr<int>("rank");
int ring_id = Attr<int>("ring_id");
std::function<std::string(size_t)> func = [&](size_t i) -> std::string {
return Output("Out");
};
std::string endpoint = Attr<std::string>("endpoint");
int server_fd = platform::SocketServer::GetInstance(endpoint).socket();
std::vector<cnclCliqueId> cncl_ids;
cncl_ids.resize(1);
if (rank == 0) {
GenCNCLID(&cncl_ids);
std::vector<std::string> endpoint_list =
Attr<std::vector<std::string>>("other_endpoints");
platform::SendBroadCastCommID(endpoint_list, &cncl_ids, ring_id);
} else {
platform::RecvBroadCastCommID(server_fd, endpoint, &cncl_ids, ring_id);
}
CopyCNCLIDToVar(cncl_ids, func, scope);
}
};
class CGenCNCLIdOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddOutput("Out", "Raw variable contains a CNCL CliqueId instaces.");
AddComment(R"DOC(
CGenCNCLId operator
For trainer 0: generate a new CliqueId and send it to all the other trainers.
For trainer 1~n: start a gRPC server to get the CliqueId, once got, stop the server.
)DOC");
AddAttr<std::string>("endpoint",
"(string), e.g. 127.0.0.1:6175 "
"current listen endpoint");
AddAttr<std::vector<std::string>>(
"other_endpoints",
"['trainer1_ip:port', 'trainer2_ip:port', ...] "
"list of other trainer endpoints")
.SetDefault({});
AddAttr<int>("rank",
"(int default 0) "
"The rank of the trainer in distributed training.")
.SetDefault(0);
AddAttr<int>("ring_id", "(int default 0) user specified ring id")
.SetDefault(0);
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(c_gen_cncl_id, ops::CGenCNCLIdOp, ops::CGenCNCLIdOpMaker);
......@@ -44,10 +44,6 @@ limitations under the License. */
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif
#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#endif
namespace paddle {
namespace operators {
......@@ -286,73 +282,6 @@ class CReduceOpCUDAKernel : public framework::OpKernel<T> {
template <typename T, typename DeviceContext> \
class op_name##CUDAKernel : public CReduceOpCUDAKernel<red_type, T> {};
template <ReduceType red_type, typename T>
class CReduceOpMLUKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_CNCL)
auto in = ctx.Input<phi::DenseTensor>("X");
auto out = ctx.Output<phi::DenseTensor>("Out");
auto place = ctx.GetPlace();
cnclDataType_t dtype =
platform::ToCNCLDataType(framework::TransToProtoVarType(in->dtype()));
int64_t numel = in->numel();
const void* sendbuff = in->data();
out->Resize(in->dims());
void* recvbuff = out->mutable_data<T>(place);
int rid = ctx.Attr<int>("ring_id");
int root = ctx.Attr<int>("root_id");
auto comm = paddle::platform::CNCLCommContext::Instance().Get(rid, place);
mluStream stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::MLUDeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
cnclReduceOp_t cncl_red_type = cnclSum;
switch (red_type) {
case kRedSum:
cncl_red_type = cnclSum;
break;
case kRedMax:
cncl_red_type = cnclMax;
break;
case kRedMin:
cncl_red_type = cnclMin;
break;
case kRedProd:
cncl_red_type = cnclProd;
break;
default:
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid reduce type: %d", red_type));
}
PADDLE_ENFORCE_MLU_SUCCESS(cnclReduce(sendbuff,
recvbuff,
numel,
dtype,
cncl_red_type,
root,
comm->comm(),
stream));
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with MLU."));
#endif
}
};
class CReduceOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() {
......
......@@ -47,16 +47,6 @@ class CSyncCalcStreamKernel : public framework::OpKernel<T> {
platform::GpuStreamSync(dev_ctx->stream());
#elif defined(PADDLE_WITH_CNCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_mlu_place(place),
true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on mlu place only for now."));
auto dev_ctx = static_cast<platform::MLUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
platform::MLUStreamSync(dev_ctx->stream());
#elif defined(PADDLE_WITH_XPU_BKCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_xpu_place(place),
......
......@@ -22,10 +22,6 @@ limitations under the License. */
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/collective_helper.h"
#endif
......@@ -45,16 +41,6 @@ class CSyncCommStreamKernel : public framework::OpKernel<T> {
platform::GpuStreamSync(stream);
#elif defined(PADDLE_WITH_CNCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_mlu_place(place),
true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on mlu place only for now."));
int ring_id = ctx.Attr<int>("ring_id");
auto stream =
platform::CNCLCommContext::Instance().Get(ring_id, place)->stream();
platform::MLUStreamSync(stream);
#elif defined(PADDLE_WITH_XPU_BKCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_xpu_place(place),
......
......@@ -139,10 +139,6 @@ cc_library(
SRCS collective_helper.cc gen_comm_id_helper.cc
DEPS framework_proto device_context enforce)
if(WITH_CNCL)
target_link_libraries(collective_helper mlu_collective_helper)
endif()
if(WITH_GPU OR WITH_ROCM)
target_link_libraries(device_context gpu_resource_pool)
endif()
......
......@@ -23,9 +23,6 @@
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/utils/variant.h"
#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/device_context.h"
#endif
namespace paddle {
namespace platform {
......@@ -246,108 +243,5 @@ class BKCLCommContext {
DISABLE_COPY_AND_ASSIGN(BKCLCommContext);
};
#endif
#if defined(PADDLE_WITH_CNCL)
// In order to apply hierarchical communication with CNCL, we need
// a communication ring contains CNCL communicators associated to a global
// cnclUniqueId. E.g. for a hierarchical case,
//
// 11 - 12 21 - 22
// | | | |
// 13 - 14 - 23 - 24
// | |
// 31 - 32 - 41 - 42
// | | | |
// 33 - 34 43 - 44
//
// we group (14,23,32,41) as the top, and (11,12,13,14), (21,22,23,24),
// (31,32,33,34), (41,42,43,44) as bottoms respectively.
//
// We could also use a single communication ring for the flatten case
//
// The CNCLComm instance is created and reversed in the CNCLCommContext
// singleton with a global user specified group id.
class MLUDeviceContext;
class CNCLComm {
public:
virtual int ring_id() const = 0;
virtual int nranks() const = 0;
virtual int rank() const = 0;
virtual int device_id() const = 0;
virtual cnclComm_t comm() const = 0;
virtual mluStream stream() const = 0;
virtual MLUDeviceContext* dev_context() const = 0;
virtual ~CNCLComm() = default;
};
// A singleton CNCL communicator context reserves communication ring ids
class CNCLCommContext {
public:
static CNCLCommContext& Instance() {
static CNCLCommContext comm_ctx;
return comm_ctx;
}
CNCLComm* CreateComm(
cnclCliqueId* cncl_id, int nranks, int rank, int dev_id, int ring_id = 0);
void CreateAllCNCLComms(const std::vector<int>& dev_ids, int ring_id = 0);
// a latter comm with the same dev_id and the same ring_id
// will override the former
CNCLComm* AssignCNCLComm(
cnclComm_t comm, int nranks, int rank, int dev_id, int ring_id = 0);
// retrieve a communicator by the ring id in multiprocessing mode
CNCLComm* Get(int ring_id) const {
PADDLE_ENFORCE_GT(
comm_map_.count(ring_id),
0,
platform::errors::InvalidArgument(
"Communicator in ring id %d has not been initialized.", ring_id));
PADDLE_ENFORCE_EQ(comm_map_.at(ring_id).size(),
1,
platform::errors::InvalidArgument(
"One device id should be specified to retrieve from "
"multiple communicators."));
return comm_map_.at(ring_id).begin()->second.get();
}
// retrieve a communicator by the ring id and the device id
CNCLComm* Get(int ring_id, int dev_id) const {
PADDLE_ENFORCE_GT(
comm_map_.count(ring_id),
0,
platform::errors::InvalidArgument(
"Communicator of ring id %d has not been initialized.", ring_id));
PADDLE_ENFORCE_GT(
comm_map_.at(ring_id).count(dev_id),
0,
platform::errors::InvalidArgument(
"Communicator at device id %d has not been initialized in ring %d.",
dev_id,
ring_id));
return comm_map_.at(ring_id).at(dev_id).get();
}
// retrieve a communicator by the ring id and place
CNCLComm* Get(int ring_id, Place place) const {
return Get(ring_id, place.device);
}
private:
std::once_flag once_flag_;
std::mutex comm_map_mutex_;
// ring id to dev-CNCLComm
std::map<int, std::map<int, std::unique_ptr<CNCLComm>>> comm_map_;
void ReleaseCNCLComms();
CNCLCommContext() = default;
DISABLE_COPY_AND_ASSIGN(CNCLCommContext);
};
#endif
} // namespace platform
} // namespace paddle
......@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#include <arpa/inet.h>
......@@ -34,10 +34,6 @@ limitations under the License. */
#include "xpu/bkcl.h"
#endif
#if defined(PADDLE_WITH_CNCL)
#include <cncl.h>
#endif
DECLARE_int32(get_host_by_name_time);
namespace paddle {
......@@ -448,10 +444,6 @@ INSTANT_TEMPLATE(ncclUniqueId)
#ifdef PADDLE_WITH_XPU_BKCL
INSTANT_TEMPLATE(BKCLUniqueId)
#endif
#ifdef PADDLE_WITH_CNCL
INSTANT_TEMPLATE(cnclCliqueId)
#endif
} // namespace platform
} // namespace paddle
......
......@@ -15,7 +15,7 @@ limitations under the License. */
#pragma once
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL)
#include <functional>
#include <memory>
#include <mutex>
......
......@@ -91,11 +91,6 @@ if(WITH_XPU_BKCL)
set(PYBIND_DEPS ${PYBIND_DEPS} heter_ccl_context)
endif()
if(WITH_CNCL)
set(PYBIND_DEPS ${PYBIND_DEPS} reducer)
set(PYBIND_DEPS ${PYBIND_DEPS} cncl_context)
endif()
if(NOT WIN32)
set(PYBIND_DEPS ${PYBIND_DEPS} data_loader)
if(WITH_NCCL OR WITH_RCCL)
......@@ -259,10 +254,6 @@ if(WITH_PYTHON)
list(APPEND OP_FUNCTION_GENERETOR_DEPS bkcl_context)
endif()
if(WITH_CNCL)
list(APPEND OP_FUNCTION_GENERETOR_DEPS cncl_context)
endif()
if(NOT ((NOT WITH_PYTHON) AND ON_INFER))
list(APPEND OP_FUNCTION_GENERETOR_DEPS ${PYTHON_LIBRARIES})
endif()
......
......@@ -41,7 +41,6 @@ limitations under the License. */
#include "paddle/fluid/imperative/amp_auto_cast.h"
#include "paddle/fluid/imperative/basic_engine.h"
#include "paddle/fluid/imperative/bkcl_context.h"
#include "paddle/fluid/imperative/cncl_context.h"
#include "paddle/fluid/imperative/data_loader.h"
#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/imperative/hccl_context.h"
......@@ -2491,8 +2490,7 @@ void BindImperative(py::module *m_ptr) {
py::call_guard<py::gil_scoped_release>());
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) || \
defined(PADDLE_WITH_CNCL)
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
py::class_<imperative::ParallelContext,
std::shared_ptr<imperative::ParallelContext>>(m,
"ParallelContext");
......
......@@ -2759,7 +2759,6 @@ class Operator:
'c_wait_comm',
'c_wait_compute',
'copy_cross_scope',
'c_gen_cncl_id',
}
def __init__(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册