Unverified commit 040f8aa5, authored by ronnywang, committed by GitHub

[CustomDevice] add pipeline parallel support (#53220)

* [CustomDevice] add pipeline parallel support

* update

* update
Parent 993bc412
......@@ -576,6 +576,150 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(
false);
}
void CheckTensorsInDifferentCustomDevices(
const std::vector<phi::DenseTensor>& tensors, const size_t num_devices) {
PADDLE_ENFORCE_EQ(
tensors.size() == 0,
false,
phi::errors::InvalidArgument("Tensor list must be nonempty."));
PADDLE_ENFORCE_LE(
tensors.size(),
num_devices,
phi::errors::InvalidArgument("Tensor list mustn't be larger than the "
"number of available CustomDevice."));
std::set<Place> used_devices;
for (const auto& t : tensors) {
PADDLE_ENFORCE_EQ(platform::is_custom_place(t.place()),
true,
phi::errors::InvalidArgument(
"Tensors must be CustomDevice and dense tensor."));
const auto inserted = used_devices.insert(t.place()).second;
PADDLE_ENFORCE_EQ(inserted,
true,
phi::errors::InvalidArgument(
"Tensors must be on distinct custom devices."));
}
}
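The device-uniqueness check above leans on std::set::insert, whose returned pair carries false in its second member when the key already exists. A minimal, self-contained sketch of the same idiom (hypothetical string stand-in for phi::Place, not Paddle code):

#include <cassert>
#include <set>
#include <string>
#include <vector>

int main() {
  // Stand-in for phi::Place: one string per custom device.
  std::vector<std::string> places = {"custom:0", "custom:1", "custom:0"};
  std::set<std::string> used_devices;
  for (const auto& p : places) {
    // insert().second is false if the device was already seen, which is
    // exactly the condition the PADDLE_ENFORCE_EQ above rejects.
    const bool inserted = used_devices.insert(p).second;
    if (!inserted) {
      // Mirrors "Tensors must be on distinct custom devices."
      assert(p == "custom:0");
    }
  }
  return 0;
}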
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Recv(
phi::DenseTensor* tensor,
int src_rank,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) {
// numel > 0 indicates the tensor needs to be sliced
phi::DenseTensor partial_tensor;
if (numel > 0) {
partial_tensor = GetPartialTensor(*tensor, offset, numel);
tensor = &partial_tensor;
}
phi::distributed::CommStaticCheck::CheckShape(
*tensor, rank_, size_, phi::AllocationType::CUSTOM);
std::vector<phi::DenseTensor> in_wrapper{*tensor};
std::vector<phi::DenseTensor> out_wrapper{*tensor};
return Collective(
in_wrapper,
out_wrapper,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
phi::ccl::CCLComm comm,
const phi::stream::Stream& stream) {
phi::DeviceManager::CCLRecv(device_type_,
output.data(),
output.numel(),
phi::ccl::ToCCLDataType(output.dtype()),
src_rank,
comm,
stream);
},
CommType::RECV,
sync_op,
use_calc_stream);
}
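In the Recv overload above, numel > 0 selects a contiguous slice of the staging tensor rather than the whole buffer. GetPartialTensor's real implementation lives in Paddle; the helper below is a hypothetical illustration of the same offset/numel arithmetic over a raw buffer:

#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in: view a contiguous sub-range [offset, offset + numel)
// of a flat element buffer, the way a partial tensor views its parent.
struct PartialView {
  float* data;
  std::int64_t numel;
};

PartialView GetPartialView(std::vector<float>& buf,
                           std::int64_t offset,
                           std::int64_t numel) {
  assert(offset >= 0 && numel > 0);
  assert(offset + numel <= static_cast<std::int64_t>(buf.size()));
  return {buf.data() + offset, numel};  // no copy, just a shifted pointer
}

int main() {
  std::vector<float> staging(8, 0.0f);
  // Receive only elements [2, 6) of the staging buffer, as Recv does
  // when numel > 0.
  PartialView view = GetPartialView(staging, /*offset=*/2, /*numel=*/4);
  assert(view.numel == 4);
  return 0;
}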
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Recv(
std::vector<phi::DenseTensor>& tensors, int src_rank) {
CheckTensorsInDifferentCustomDevices(tensors, static_cast<size_t>(GetSize()));
return Collective(
tensors,
tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
phi::ccl::CCLComm comm,
const phi::stream::Stream& stream) {
phi::DeviceManager::CCLRecv(device_type_,
output.data(),
output.numel(),
phi::ccl::ToCCLDataType(output.dtype()),
src_rank,
comm,
stream);
},
CommType::RECV,
false,
false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Send(
const phi::DenseTensor& tensor,
int dst_rank,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) {
// numel > 0 indicates the tensor needs to be sliced
const phi::DenseTensor& tensor_maybe_partial =
numel > 0 ? GetPartialTensor(tensor, offset, numel) : tensor;
phi::distributed::CommStaticCheck::CheckShape(
tensor_maybe_partial, rank_, size_, phi::AllocationType::CUSTOM);
std::vector<phi::DenseTensor> in_wrapper{tensor_maybe_partial};
std::vector<phi::DenseTensor> out_wrapper{tensor_maybe_partial};
return Collective(
in_wrapper,
out_wrapper,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
phi::ccl::CCLComm comm,
const phi::stream::Stream& stream) {
phi::DeviceManager::CCLSend(device_type_,
input.data(),
input.numel(),
phi::ccl::ToCCLDataType(input.dtype()),
dst_rank,
comm,
stream);
},
CommType::SEND,
sync_op,
use_calc_stream);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Send(
std::vector<phi::DenseTensor>& tensors, int dst_rank) {
CheckTensorsInDifferentCustomDevices(tensors, static_cast<size_t>(GetSize()));
return Collective(
tensors,
tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
phi::ccl::CCLComm comm,
const phi::stream::Stream& stream) {
phi::DeviceManager::CCLSend(device_type_,
input.data(),
input.numel(),
phi::ccl::ToCCLDataType(input.dtype()),
dst_rank,
comm,
stream);
},
CommType::SEND,
false,
false);
}
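With both Send and Recv overloads in place, a pipeline schedule pairs each stage's Send with the downstream stage's Recv. The real scheduling lives in Paddle's Python pipeline layer; the sketch below only illustrates the rank arithmetic such a schedule feeds into the dst_rank/src_rank arguments (hypothetical helper names):

#include <cassert>

// For pipeline parallelism, stage i sends activations to stage i + 1
// and receives from stage i - 1; the first and last stages have no
// upstream/downstream peer.
int NextStageRank(int rank, int world_size) {
  return (rank + 1 < world_size) ? rank + 1 : -1;  // -1: no next stage
}

int PrevStageRank(int rank) {
  return (rank > 0) ? rank - 1 : -1;  // -1: no previous stage
}

int main() {
  const int world_size = 4;
  // Stage 1 sends forward outputs to stage 2 and receives from stage 0,
  // matching the dst_rank/src_rank arguments of Send/Recv above.
  assert(NextStageRank(1, world_size) == 2);
  assert(PrevStageRank(1) == 0);
  assert(NextStageRank(3, world_size) == -1);  // last stage
  return 0;
}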
std::shared_ptr<ProcessGroupCustom>
ProcessGroupCustom::CreateProcessGroupCustom(
const std::shared_ptr<phi::distributed::Store>& store,
......
......@@ -143,6 +143,26 @@ class ProcessGroupCustom : public ProcessGroupWithStream {
const BroadcastOptions& opts,
bool sync_op) override;
std::shared_ptr<ProcessGroup::Task> Send(const phi::DenseTensor& tensor,
int dst_rank,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& tensors, int dst_rank) override;
std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
int64_t offset,
int64_t numel,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, int src_rank) override;
protected:
virtual std::shared_ptr<ProcessGroupCustom::CustomTask> CreateTask(
std::vector<Place> places,
......
......@@ -12,6 +12,11 @@ set(EAGER_GENERETOR_DEPS
imperative_profiler
imperative_flag)
if(WITH_CUSTOM_DEVICE)
set(EAGER_GENERETOR_DEPS ${EAGER_GENERETOR_DEPS}
custom_device_common_op_registry)
endif()
add_executable(eager_generator eager_generator.cc)
target_link_libraries(eager_generator ${EAGER_GENERETOR_DEPS})
......
......@@ -23,6 +23,7 @@
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/custom_device_common_op_registry.h"
#include "paddle/fluid/pybind/eager_generator.h"
#include "paddle/fluid/pybind/pybind.h"
#include "paddle/fluid/string/string_helper.h"
......@@ -3306,6 +3307,11 @@ int main(int argc, char* argv[]) {
std::cerr << "argc must be 3" << std::endl;
return -1;
}
#ifdef PADDLE_WITH_CUSTOM_DEVICE
// We need a fake device to trigger registration of the common kernels and
// generate the API
paddle::operators::RegisterCustomDeviceCommonKernel("fake_device");
#endif
std::string eager_root = argv[1];
int split_count = atoi(argv[2]);
......
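eager_generator runs on a build machine with no real custom device attached; registering the common kernels under a placeholder name is done purely for the side effect that the generator can then enumerate the ops and emit their APIs. A hedged sketch of that register-then-enumerate pattern (hypothetical registry, not Paddle's):

#include <iostream>
#include <map>
#include <string>

// Hypothetical kernel registry keyed by op name.
std::map<std::string, std::string>& Registry() {
  static std::map<std::string, std::string> r;
  return r;
}

void RegisterCommonKernels(const std::string& device) {
  // Registration is the side effect the generator needs; the device
  // name never has to correspond to real hardware.
  Registry()["c_sync_calc_stream"] = device;
  Registry()["c_identity"] = device;
}

int main() {
  RegisterCommonKernels("fake_device");
  // The generator can now see the ops and emit API stubs for them.
  for (const auto& [op, dev] : Registry()) {
    std::cout << "generate API for " << op << " (" << dev << ")\n";
  }
  return 0;
}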
......@@ -463,7 +463,7 @@ void Tensor::CopyToCpuImpl(T *data,
t_data,
ele_num * sizeof(T),
dev_ctx->stream());
// TODO(wangran16): sync_stream
dev_ctx->GetStream()->Synchronize();
#else
PADDLE_THROW(paddle::platform::errors::InvalidArgument(
"The analysis predictor supports CPU, GPU, NPU and XPU now."));
......
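The CopyToCpuImpl change resolves the old sync_stream TODO with an explicit wait: the device-to-host memcpy is enqueued asynchronously on the context's stream, so the host must synchronize before reading the destination buffer. A minimal sketch of the pattern, using std::async as a stand-in for the device stream (hypothetical; the real code calls dev_ctx->GetStream()->Synchronize()):

#include <cassert>
#include <cstring>
#include <future>
#include <vector>

int main() {
  std::vector<float> device(4, 1.5f);  // pretend device memory
  std::vector<float> host(4, 0.0f);

  // Enqueue the "device-to-host" copy asynchronously, like a memcpy
  // placed on dev_ctx->stream().
  auto pending = std::async(std::launch::async, [&] {
    std::memcpy(host.data(), device.data(), device.size() * sizeof(float));
  });

  // Without this wait the host could read host[] before the copy lands;
  // pending.wait() plays the role of GetStream()->Synchronize().
  pending.wait();
  assert(host[0] == 1.5f);
  return 0;
}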
......@@ -486,6 +486,17 @@ class CSoftmaxWithCrossEntropyGradCustomDeviceKernel
}
};
template <typename DeviceContext, typename T>
class CSyncCalcStreamCustomDeviceKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto place = ctx.GetPlace();
auto dev_ctx = static_cast<DeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
dev_ctx->GetStream()->Synchronize();
}
};
template <typename Context>
void FeedDenseTensorKernel(const Context& dev_ctx,
const phi::ExtendedTensor& x,
......@@ -604,7 +615,27 @@ void RegisterCustomDeviceCommonKernel(const std::string& dev_type) {
paddle::operators::CIdentityOpKernel<
paddle::platform::float16,
paddle::platform::CustomDeviceContext>) {}
REGISTER_OP_CUSTOM_DEVICE_KERNEL(
c_sync_calc_stream,
device_type,
paddle::operators::CSyncCalcStreamCustomDeviceKernel<
paddle::platform::CustomDeviceContext,
int16_t>,
paddle::operators::CSyncCalcStreamCustomDeviceKernel<
paddle::platform::CustomDeviceContext,
int32_t>,
paddle::operators::CSyncCalcStreamCustomDeviceKernel<
paddle::platform::CustomDeviceContext,
int64_t>,
paddle::operators::CSyncCalcStreamCustomDeviceKernel<
paddle::platform::CustomDeviceContext,
float>,
paddle::operators::CSyncCalcStreamCustomDeviceKernel<
paddle::platform::CustomDeviceContext,
double>,
paddle::operators::CSyncCalcStreamCustomDeviceKernel<
paddle::platform::CustomDeviceContext,
paddle::platform::float16>) {}
#endif
}
......
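Note that c_sync_calc_stream does no dtype-dependent work (Compute only synchronizes the stream), yet it is registered once per supported data type because the fluid framework dispatches kernels by the tensor's dtype. A hedged sketch of that shape (hypothetical names):

#include <iostream>
#include <typeinfo>

// The kernel body is type-independent, yet one instantiation per dtype
// is registered because dispatch keys on the tensor's data type.
template <typename T>
struct SyncCalcStreamKernel {
  void Compute() const {
    // Real kernel: dev_ctx->GetStream()->Synchronize(); dtype unused.
    std::cout << "sync (registered for " << typeid(T).name() << ")\n";
  }
};

int main() {
  SyncCalcStreamKernel<float>{}.Compute();
  SyncCalcStreamKernel<double>{}.Compute();
  return 0;
}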
......@@ -258,6 +258,11 @@ if(WITH_PYTHON)
list(APPEND OP_FUNCTION_GENERETOR_DEPS ${PYTHON_LIBRARIES})
endif()
if(WITH_CUSTOM_DEVICE)
set(OP_FUNCTION_GENERETOR_DEPS ${OP_FUNCTION_GENERETOR_DEPS}
custom_device_common_op_registry)
endif()
add_executable(eager_legacy_op_function_generator
eager_legacy_op_function_generator.cc)
target_link_libraries(eager_legacy_op_function_generator
......
......@@ -26,6 +26,7 @@
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/custom_device_common_op_registry.h"
#include "paddle/fluid/pybind/eager_generator.h"
#include "paddle/fluid/pybind/pybind.h"
#include "paddle/fluid/string/string_helper.h"
......@@ -481,6 +482,11 @@ int main(int argc, char* argv[]) {
std::cerr << "argc must be 2" << std::endl;
return -1;
}
#ifdef PADDLE_WITH_CUSTOM_DEVICE
// We need a fake device to trigger registration of the common kernels and
// generate the API
paddle::operators::RegisterCustomDeviceCommonKernel("fake_device");
#endif
std::vector<std::string> headers{
"<Python.h>",
......