未验证 提交 4427df37 编写于 作者: L liuyuhui 提交者: GitHub

[Kunlun] PR2: Support MultiDevicePass and BKCL in parallel executor (#29574)

上级 0b74428d
......@@ -29,7 +29,7 @@ include(generic) # simplify cmake module
find_package(CUDA QUIET)
option(WITH_GPU "Compile PaddlePaddle with NVIDIA GPU" ${CUDA_FOUND})
option(WITH_TENSORRT "Compile PaddlePaddle with NVIDIA TensorRT" OFF)
option(WITH_XPU "Compile PaddlePaddle with BAIDU KUNLUN" OFF)
option(WITH_XPU "Compile PaddlePaddle with BAIDU KUNLUN XPU" OFF)
option(WITH_WIN_DUMP_DBG "Compile with windows core dump debug mode" OFF)
if (WITH_GPU AND WITH_XPU)
message(FATAL_ERROR "Error when compile GPU and XPU at the same time")
......@@ -166,6 +166,7 @@ option(WITH_DGC "Use DGC(Deep Gradient Compression) or not" ${WITH_DISTRIBUTE}
option(SANITIZER_TYPE "Choose the type of sanitizer, options are: Address, Leak, Memory, Thread, Undefined" OFF)
option(WITH_LITE "Compile Paddle Fluid with Lite Engine" OFF)
option(WITH_NCCL "Compile PaddlePaddle with NCCL support" ON)
option(WITH_XPU_BKCL "Compile PaddlePaddle with BAIDU KUNLUN XPU BKCL" OFF)
option(WITH_CRYPTO "Compile PaddlePaddle with crypto support" ON)
option(WITH_ARM "Compile PaddlePaddle with arm support" OFF)
option(WITH_SW "Compile PaddlePaddle with sw support" OFF)
......@@ -213,6 +214,13 @@ if (NOT WITH_GPU AND WITH_NCCL)
"Disable NCCL when compiling without GPU" FORCE)
endif()
if (NOT WITH_XPU AND WITH_XPU_BKCL)
MESSAGE(WARNING
"Disable BKCL when compiling without XPU. Force WITH_XPU_BKCL=OFF.")
set(WITH_XPU_BKCL OFF CACHE STRING
"Disable BKCL when compiling without XPU" FORCE)
endif()
if(WITH_NCCL)
add_definitions("-DPADDLE_WITH_NCCL")
include(nccl)
......
......@@ -47,4 +47,18 @@ set_property(TARGET shared_xpuapi PROPERTY IMPORTED_LOCATION "${XPU_API_LIB}")
generate_dummy_static_lib(LIB_NAME "xpulib" GENERATOR "xpu.cmake")
TARGET_LINK_LIBRARIES(xpulib ${XPU_API_LIB} ${XPU_RT_LIB})
if (WITH_XPU_BKCL)
MESSAGE(STATUS "Compile with XPU BKCL!")
ADD_DEFINITIONS(-DPADDLE_WITH_XPU_BKCL)
SET(XPU_BKCL_LIB_NAME "libbkcl.so")
SET(XPU_BKCL_LIB "${XPU_LIB_DIR}/${XPU_BKCL_LIB_NAME}")
SET(XPU_BKCL_INC_DIR "${THIRD_PARTY_PATH}/install/xpu/include")
INCLUDE_DIRECTORIES(${XPU_BKCL_INC_DIR})
TARGET_LINK_LIBRARIES(xpulib ${XPU_API_LIB} ${XPU_RT_LIB} ${XPU_BKCL_LIB})
else(WITH_XPU_BKCL)
TARGET_LINK_LIBRARIES(xpulib ${XPU_API_LIB} ${XPU_RT_LIB} )
endif(WITH_XPU_BKCL)
ADD_DEPENDENCIES(xpulib ${XPU_PROJECT})
......@@ -43,6 +43,19 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
"number of local scopes is %d.",
places_.size(), local_scopes_.size()));
}
#elif defined(PADDLE_WITH_XPU_BKCL)
AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::BKCLCommunicator *ctxs)
: BKCLOpHandleBase(node, places, ctxs), local_scopes_(local_scopes) {
PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size(),
platform::errors::InvalidArgument(
"The number of places and the number of local scopes "
"should be equal, but got number of places is %d and "
"number of local scopes is %d.",
places_.size(), local_scopes_.size()));
}
#else
AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
......@@ -98,6 +111,9 @@ void AllReduceOpHandle::AllReduceImpl(
places.reserve(num_places);
int64_t numel = -1;
bool is_gpu_place = false;
#if defined(PADDLE_WITH_XPU_BKCL)
bool is_xpu_place = false;
#endif
auto dtype = static_cast<framework::proto::VarType::Type>(0);
for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
auto &local_scope = local_exec_scopes_[i];
......@@ -117,6 +133,9 @@ void AllReduceOpHandle::AllReduceImpl(
in_var_handles[i]->name(), numel));
dtype = lod_tensor.type();
is_gpu_place = platform::is_gpu_place(lod_tensor.place());
#if defined(PADDLE_WITH_XPU_BKCL)
is_xpu_place = platform::is_xpu_place(lod_tensor.place());
#endif
}
PADDLE_ENFORCE_EQ(
numel, static_cast<int64_t>(lod_tensor.numel()),
......@@ -128,6 +147,12 @@ void AllReduceOpHandle::AllReduceImpl(
platform::errors::PreconditionNotMet(
"The dtype of tensors of the same variable in different local "
"scopes should be equal."));
#if defined(PADDLE_WITH_XPU_BKCL)
PADDLE_ENFORCE_EQ(is_xpu_place, platform::is_xpu_place(lod_tensor.place()),
platform::errors::PreconditionNotMet(
"The place type of tensors of the same variable "
"in different local scopes should be equal."));
#endif
PADDLE_ENFORCE_EQ(is_gpu_place, platform::is_gpu_place(lod_tensor.place()),
platform::errors::PreconditionNotMet(
"The place type of tensors of the same variable "
......@@ -179,6 +204,25 @@ void AllReduceOpHandle::AllReduceFunc(
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
} else if (is_xpu_place(places[0])) {
#if defined(PADDLE_WITH_XPU_BKCL)
PADDLE_ENFORCE_NOT_NULL(bkcl_ctxs_,
platform::errors::InvalidArgument(
"The bkcl context should not be NULL."));
BKCLDataType bkcl_dtype = platform::ToBKCLDataType(dtype);
std::vector<std::function<void()>> all_reduce_calls;
for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
auto &p = places[i];
void *buffer = const_cast<void *>(lod_tensor_data.at(i));
all_reduce_calls.emplace_back([=] {
BKCLAllReduce(p, buffer, buffer, numel, bkcl_dtype, BKCL_ADD);
});
}
BKCLAllReduceFunc(all_reduce_calls);
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with BKCL."));
#endif
} else { // Special handle CPU only Operator's gradient. Like CRF
auto &trg = *local_exec_scopes_[0]
......@@ -205,6 +249,27 @@ void AllReduceOpHandle::AllReduceFunc(
VLOG(10) << Name() << " size:" << numel * SizeOfType(dtype);
}
#if defined(PADDLE_WITH_XPU_BKCL)
void AllReduceOpHandle::BKCLAllReduceFunc(
const std::vector<std::function<void()>> &all_reduce_calls) {
this->RunAndRecordEvent([&] {
if (all_reduce_calls.size() == 1UL) {
all_reduce_calls[0]();
} else {
PADDLE_ENFORCE_EQ(
bkcl_group_start(), BKCL_SUCCESS,
platform::errors::PreconditionNotMet("bkcl_group_start failed"));
for (auto &call : all_reduce_calls) {
call();
}
PADDLE_ENFORCE_EQ(
bkcl_group_end(), BKCL_SUCCESS,
platform::errors::PreconditionNotMet("bkcl_group_end failed"));
}
});
}
#endif
#if defined(PADDLE_WITH_NCCL)
void AllReduceOpHandle::NCCLAllReduceFunc(
const std::vector<std::function<void()>> &all_reduce_calls) {
......
......@@ -34,6 +34,9 @@ class NCCLCommunicator;
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/framework/details/nccl_op_handle.h"
#include "paddle/fluid/platform/nccl_helper.h"
#elif defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/framework/details/bkcl_op_handle.h"
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
......@@ -46,6 +49,12 @@ class AllReduceOpHandle : public NCCLOpHandleBase {
AllReduceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLCommunicator *ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
class AllReduceOpHandle : public BKCLOpHandleBase {
public:
AllReduceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::BKCLCommunicator *ctxs);
#else
class AllReduceOpHandle : public OpHandleBase {
public:
......@@ -65,8 +74,8 @@ class AllReduceOpHandle : public OpHandleBase {
std::vector<Scope *> local_scopes_;
#ifndef PADDLE_WITH_NCCL
// NCCLOpHandleBase already have these attributes.
#if !(PADDLE_WITH_NCCL || PADDLE_WITH_XPU_BKCL)
// NCCLOpHandleBase and BKCLOpHandleBase already have these attributes.
// Will polish it by class inheritance framework.
std::vector<platform::Place> places_;
#endif
......@@ -78,6 +87,11 @@ class AllReduceOpHandle : public OpHandleBase {
void SyncNCCLAllReduce();
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
void BKCLAllReduceFunc(
const std::vector<std::function<void()>> &all_reduce_calls);
#endif
void AllReduceImpl(const std::vector<VarHandle *> &in_var_handles,
const std::vector<VarHandle *> &out_var_handles);
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "xpu/bkcl.h"
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/bkcl_helper.h"
DECLARE_bool(sync_bkcl_allreduce);
namespace paddle {
namespace framework {
namespace details {
class BKCLOpHandleBase : public OpHandleBase {
public:
BKCLOpHandleBase(ir::Node* node, const std::vector<platform::Place>& places,
const platform::BKCLCommunicator* bkcl_ctxs)
: OpHandleBase(node), places_(places), bkcl_ctxs_(bkcl_ctxs) {
if (bkcl_ctxs == nullptr) {
return;
}
// init device context
auto default_bkcl_ctxs = bkcl_ctxs_->DefaultFlatCtx();
for (auto& p : places_) {
this->SetDeviceContext(p, default_bkcl_ctxs->DevCtx(p));
}
}
virtual ~BKCLOpHandleBase() {}
void SetRunEnv(int run_order, bool use_hierarchical_allreduce) {
PADDLE_ENFORCE_GE(
run_order, 0,
platform::errors::InvalidArgument(
"The argument run_order must be >= 0, but got %d.", run_order));
PADDLE_ENFORCE_NE(use_hierarchical_allreduce, true,
platform::errors::Unimplemented(
"xpu doesn't support hierarchical_allreduce"));
run_order_ = run_order;
use_hierarchical_allreduce_ = use_hierarchical_allreduce;
VLOG(10) << "SetRunEnv "
<< " run_order:" << run_order
<< ", use_hierarchical_allreduce:" << use_hierarchical_allreduce;
if (bkcl_ctxs_ == nullptr) {
return;
}
if (!use_hierarchical_allreduce_) {
auto ctxs = bkcl_ctxs_->GetFlatCtx(run_order);
for (auto& p : places_) {
this->SetDeviceContext(p, ctxs->DevCtx(p));
}
return;
}
}
void FlatBKCLAllReduce(platform::Place place, const void* sendbuff,
void* recvbuff, size_t count, BKCLDataType datatype,
BKCLOp op) {
PADDLE_ENFORCE_GE(
run_order_, 0,
platform::errors::InvalidArgument(
"The argument run_order_ must be >= 0, but got %d.", run_order_));
auto flat_bkcl_ctxs = bkcl_ctxs_->GetFlatCtx(run_order_);
int dev_id = BOOST_GET_CONST(platform::XPUPlace, place).device;
auto& bkcl_ctx = flat_bkcl_ctxs->at(dev_id);
auto comm = bkcl_ctx.comm_;
VLOG(10) << "before all reduce buffer:" << sendbuff << ", numel:" << count
<< ", dev_id:" << dev_id << ", dtype:" << datatype
<< ", place:" << place;
PADDLE_ENFORCE_EQ(
bkcl_all_reduce(comm, sendbuff, recvbuff, count, datatype, op, NULL),
BKCL_SUCCESS,
platform::errors::PreconditionNotMet("bckl all reduce failed"));
}
void BKCLAllReduce(platform::Place place, const void* sendbuff,
void* recvbuff, size_t count, BKCLDataType datatype,
BKCLOp op) {
PADDLE_ENFORCE_GE(
run_order_, 0,
platform::errors::InvalidArgument(
"The argument run_order_ must be >= 0, but got %d.", run_order_));
PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_, false,
platform::errors::Unimplemented(
"xpu doesn't support hierarchical all reduce"));
if (!use_hierarchical_allreduce_) {
FlatBKCLAllReduce(place, sendbuff, recvbuff, count, datatype, op);
return;
}
}
protected:
std::vector<platform::Place> places_;
const platform::BKCLCommunicator* bkcl_ctxs_{nullptr};
// When multi trainer call collective function, they need run the same order.
// Or the program will hang.So we use allreduce_deps_pass to set this
// run_order_.
int run_order_{0};
// Use 2d allreduce or not.
bool use_hierarchical_allreduce_{false};
};
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -80,7 +80,7 @@ void BroadcastOpHandle::BroadcastOneVar(
&VariableVisitor::GetMutableTensor(out_var));
});
}
} else {
} else if (platform::is_gpu_place(in_tensor.place())) {
#if defined(PADDLE_WITH_NCCL)
VarHandle *out_handle = nullptr;
int root_id =
......@@ -141,6 +141,72 @@ void BroadcastOpHandle::BroadcastOneVar(
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with NCLL."));
#endif
} else {
#if defined(PADDLE_WITH_XPU_BKCL)
VarHandle *out_handle = nullptr;
int root_id = BOOST_GET_CONST(platform::XPUPlace, in_tensor.place()).device;
std::vector<std::function<void()>> broadcast_calls;
int type = platform::ToBKCLDataType(in_tensor.type());
size_t numel = static_cast<size_t>(in_tensor.numel());
for (auto out_var_handle : out_var_handles) {
Variable *out_var = var_scopes.at(out_var_handle->scope_idx())
->FindVar(out_var_handle->name());
int dst_id =
BOOST_GET_CONST(platform::XPUPlace, out_var_handle->place()).device;
auto &bkcl_ctx = bkcl_ctxs_->at(dst_id);
void *send_recv_buffer = nullptr;
if (root_id == dst_id) {
send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
out_handle = out_var_handle;
} else {
send_recv_buffer = VariableVisitor::GetMutableTensor(out_var)
.Resize(in_tensor.dims())
.mutable_data(out_var_handle->place());
}
broadcast_calls.emplace_back([send_recv_buffer, numel, type, root_id,
&bkcl_ctx] {
PADDLE_ENFORCE_EQ(
bkcl_broadcast(bkcl_ctx.comm(), send_recv_buffer, send_recv_buffer,
numel, static_cast<BKCLDataType>(type), root_id,
nullptr),
BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_broadcast failed"));
});
}
WaitInputVarGenerated();
this->RunAndRecordEvent([&] {
{
PADDLE_ENFORCE_EQ(
bkcl_group_start(), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_group_start failed"));
for (auto &call : broadcast_calls) {
call();
}
PADDLE_ENFORCE_EQ(
bkcl_group_end(), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_group_end failed"));
}
if (!out_handle->IsTheSameVar(in_var_handle)) {
auto out_var = var_scopes.at(in_var_handle.scope_idx())
->FindVar(out_var_handles[0]->name());
paddle::framework::TensorCopy(
in_tensor, in_var_handle.place(),
*(dev_ctxes_.at(in_var_handle.place())),
&VariableVisitor::GetMutableTensor(out_var));
}
});
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with BKCL."));
#endif
}
}
......
......@@ -34,12 +34,19 @@ class Node;
} // namespace ir
} // namespace framework
namespace platform {
#if defined(PADDLE_WITH_NCCL)
struct NCCLContextMap;
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
struct BKCLContextMap;
#endif
} // namespace platform
} // namespace paddle
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/platform/nccl_helper.h"
#elif defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
......@@ -63,11 +70,26 @@ struct BroadcastOpHandle : public OpHandleBase {
}
}
}
#else
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
BroadcastOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::BKCLContextMap *bkcl_ctxs)
: OpHandleBase(node),
local_scopes_(local_scopes),
places_(places),
bkcl_ctxs_(bkcl_ctxs) {
if (bkcl_ctxs_) {
for (auto &p_ctx : bkcl_ctxs_->contexts_) {
this->SetDeviceContext(platform::XPUPlace(p_ctx.first),
p_ctx.second.ctx_.get());
}
}
}
#endif
BroadcastOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: OpHandleBase(node), local_scopes_(local_scopes), places_(places) {}
#endif
std::string Name() const override;
......@@ -86,6 +108,8 @@ struct BroadcastOpHandle : public OpHandleBase {
std::vector<platform::Place> places_;
#if defined(PADDLE_WITH_NCCL)
const platform::NCCLContextMap *nccl_ctxs_;
#elif defined(PADDLE_WITH_XPU_BKCL)
const platform::BKCLContextMap *bkcl_ctxs_;
#endif
void InitOutputValue(const VarHandle &in_var_handle,
......
......@@ -18,10 +18,12 @@ namespace paddle {
namespace framework {
namespace details {
using DeviceType = paddle::platform::DeviceType;
TEST(BroadcastTester, TestCPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitCtxOnDevice(p::kCPU);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}
......@@ -29,7 +31,7 @@ TEST(BroadcastTester, TestCPUBroadcastTestLodTensor) {
TEST(BroadcastTester, TestCPUBroadcastTestSelectedRows) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitCtxOnDevice(p::kCPU);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastSelectedRows(input_scope_idx);
}
......@@ -38,7 +40,7 @@ TEST(BroadcastTester, TestCPUBroadcastTestSelectedRows) {
TEST(BroadcastTester, TestGPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitCtxOnDevice(p::kCUDA);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}
......@@ -46,12 +48,22 @@ TEST(BroadcastTester, TestGPUBroadcastTestLodTensor) {
TEST(BroadcastTester, TestGPUBroadcastTestSelectedRows) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitCtxOnDevice(p::kCUDA);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastSelectedRows(input_scope_idx);
}
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
TEST(BroadcastTester, TestXPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnDevice(p::kXPU);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}
#endif
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -33,7 +33,7 @@ struct VarHandle;
namespace f = paddle::framework;
namespace p = paddle::platform;
using UseDevice = paddle::framework::details::ExecutionStrategy::UseDevice;
using DeviceType = paddle::platform::DeviceType;
// test data amount
const f::DDim kDims = {20, 20};
......@@ -47,11 +47,15 @@ struct TestBroadcastOpHandle {
std::vector<VarHandleBase*> vars_;
std::vector<std::unique_ptr<ir::Node>> nodes_;
std::vector<p::Place> place_list_;
bool use_gpu_;
DeviceType use_device_;
#if defined(PADDLE_WITH_NCCL)
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
std::unique_ptr<platform::BKCLContextMap> bkcl_ctxs_;
#endif
void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
......@@ -60,12 +64,36 @@ struct TestBroadcastOpHandle {
if (nccl_ctxs_) {
nccl_ctxs_->WaitAll();
}
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
if (bkcl_ctxs_) {
bkcl_ctxs_->WaitAll();
}
#endif
}
void InitCtxOnGpu(bool use_gpu) {
use_gpu_ = use_gpu;
if (use_gpu_) {
void InitCtxOnDevice(DeviceType use_device) {
use_device_ = use_device;
if (use_device_ == p::kXPU) {
#if defined(PADDLE_WITH_XPU_BKCL)
int count = p::GetXPUDeviceCount();
if (count <= 1) {
LOG(WARNING) << "Cannot test multi-xpu Broadcast, because the XPU "
"device count is "
<< count;
exit(0);
}
for (int i = 0; i < count; ++i) {
auto p = p::XPUPlace(i);
place_list_.push_back(p);
ctxs_.emplace_back(new p::XPUDeviceContext(p));
}
bkcl_ctxs_.reset(new platform::BKCLContextMap(place_list_));
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with BKCL."));
#endif
} else if (use_device_ == p::kCUDA) {
#if defined(PADDLE_WITH_NCCL)
int count = p::GetCUDADeviceCount();
if (count <= 1) {
......@@ -91,6 +119,9 @@ struct TestBroadcastOpHandle {
place_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
#if defined(PADDLE_WITH_XPU_BKCL)
bkcl_ctxs_.reset(nullptr);
#endif
#if defined(PADDLE_WITH_NCCL)
nccl_ctxs_.reset(nullptr);
#endif
......@@ -111,22 +142,25 @@ struct TestBroadcastOpHandle {
nodes_.emplace_back(
ir::CreateNodeForTest("node0", ir::Node::Type::kOperation));
if (use_gpu_) {
if (use_device_ == p::kCUDA) {
#if defined(PADDLE_WITH_NCCL)
op_handle_ = new BroadcastOpHandle(nodes_.back().get(), local_scopes_,
place_list_, nccl_ctxs_.get());
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with NCLL."));
platform::errors::PreconditionNotMet("Not compiled with NCCL."));
#endif
} else {
#if defined(PADDLE_WITH_NCCL)
} else if (use_device_ == p::kXPU) {
#if defined(PADDLE_WITH_XPU_BKCL)
op_handle_ = new BroadcastOpHandle(nodes_.back().get(), local_scopes_,
place_list_, nccl_ctxs_.get());
place_list_, bkcl_ctxs_.get());
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with BKCL."));
#endif
} else {
op_handle_ = new BroadcastOpHandle(nodes_.back().get(), local_scopes_,
place_list_);
#endif
}
op_handle_->SetLocalExecScopes(scope_map);
......@@ -149,7 +183,7 @@ struct TestBroadcastOpHandle {
op_handle_->AddInput(dummy_var_handle);
for (size_t j = 0; j < place_list_.size(); ++j) {
if (!use_gpu_) {
if (use_device_ != p::kCUDA) {
op_handle_->SetDeviceContext(place_list_[j], ctxs_[j].get());
}
nodes_.emplace_back(
......@@ -275,7 +309,7 @@ struct TestBroadcastOpHandle {
f::LoD lod{{0, 10, 20}};
auto send_vector = InitLoDTensor("input", input_scope_idx, lod);
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......@@ -290,7 +324,7 @@ struct TestBroadcastOpHandle {
int height = static_cast<int>(kDims[0] * 2);
auto send_vector = InitSelectedRows("input", input_scope_idx, rows, height);
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......
......@@ -313,10 +313,13 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
const std::vector<Scope *> &local_scopes,
const size_t &nranks,
#if defined(PADDLE_WITH_NCCL)
const bool use_cuda,
DeviceType use_device,
platform::NCCLCommunicator *nccl_ctxs) const {
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
DeviceType use_device,
platform::BKCLCommunicator *bkcl_ctxs) const {
#else
const bool use_cuda) const {
DeviceType use_device) const {
#endif
VLOG(1) << "apply all passes";
// Create a default one if not finalized by user.
......@@ -336,9 +339,16 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
pass->Set<size_t>(kNRanks, new size_t(nranks));
#if defined(PADDLE_WITH_NCCL)
platform::NCCLCommunicator *nctx = use_cuda ? nccl_ctxs : nullptr;
platform::NCCLCommunicator *nctx =
(use_device == p::kCUDA) ? nccl_ctxs : nullptr;
pass->Erase(kNCCLCtxs);
pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
// ToDo: more check
platform::BKCLCommunicator *bkcl_ctx =
(use_device == p::kXPU) ? bkcl_ctxs : nullptr;
pass->Erase(kBKCLCtxs);
pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, bkcl_ctx);
#endif
} else if (pass->Type() == "fuse_all_reduce_op_pass") {
pass->Erase(kNRanks);
......@@ -349,12 +359,24 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
pass->SetNotOwned<const std::vector<Scope *>>(kLocalScopes,
&local_scopes);
#if defined(PADDLE_WITH_NCCL)
platform::NCCLCommunicator *nctx = use_cuda ? nccl_ctxs : nullptr;
platform::NCCLCommunicator *nctx =
(use_device == p::kCUDA) ? nccl_ctxs : nullptr;
pass->Erase(kNCCLCtxs);
pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
platform::BKCLCommunicator *nctx =
(use_device == p::kXPU) ? bkcl_ctxs : nullptr;
pass->Erase(kBKCLCtxs);
pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_, false,
platform::errors::Unimplemented(
"xpu doesn't support hierarchical_allreduce"));
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#endif
} else if (pass->Type() == "coalesce_grad_tensor_pass") {
pass->Erase(kNRanks);
......@@ -364,35 +386,47 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
<< enable_sequential_execution_;
} else if (pass->Type() == "all_reduce_deps_pass") {
#if defined(PADDLE_WITH_NCCL)
platform::NCCLCommunicator *nctx = use_cuda ? nccl_ctxs : nullptr;
platform::NCCLCommunicator *nctx =
(use_device == p::kCUDA) ? nccl_ctxs : nullptr;
pass->Erase(kNCCLCtxs);
pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
platform::BKCLCommunicator *nctx =
(use_device == p::kXPU) ? bkcl_ctxs : nullptr;
pass->Erase(kBKCLCtxs);
pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_, false,
platform::errors::Unimplemented(
"xpu doesn't support hierarchical_allreduce"));
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#endif
VLOG(1) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this)
<< ", num_trainers:" << num_trainers_;
} else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
if (!use_cuda) {
if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on "
"GPU, skipped.";
continue;
}
} else if (pass->Type() == "fusion_group_pass") {
pass->Set<bool>("use_gpu", new bool(use_cuda));
if (!use_cuda) {
pass->Set<bool>("use_gpu", new bool((use_device == p::kCUDA)));
if (use_device != p::kCUDA) {
LOG(WARNING) << "fusion_group_pass is only supported on GPU, skipped.";
continue;
}
} else if (pass->Type() == "fuse_bn_act_pass") {
if (!use_cuda) {
if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_bn_act_pass is only supported on "
"GPU, skipped.";
continue;
}
} else if (pass->Type() == "fuse_bn_add_act_pass") {
if (!use_cuda) {
if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_bn_add_act_pass is only supported on "
"GPU, skipped.";
continue;
......@@ -401,7 +435,7 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
pass->Set("mkldnn_enabled_op_types",
new std::unordered_set<std::string>(mkldnn_enabled_op_types_));
} else if (pass->Type() == "backward_optimizer_op_deps_pass") {
if (!use_cuda) {
if (use_device != p::kCUDA) {
VLOG(1) << "backward_optimizer_op_deps_pass is only supported on "
"GPU, skipped.";
continue;
......
......@@ -41,11 +41,15 @@ class NCCLCommunicator;
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/platform/nccl_helper.h"
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
namespace framework {
namespace details {
using DeviceType = paddle::platform::DeviceType;
namespace p = paddle::platform;
struct BuildStrategy {
// ParallelExecutor supports two modes of ReduceStrategy, kAllReduce and
......@@ -147,6 +151,7 @@ struct BuildStrategy {
// NCCL config
size_t nccl_comm_num_{1};
size_t bkcl_comm_num_{1};
// The picture is here:
// https://github.com/PaddlePaddle/Paddle/pull/17263#discussion_r285411396
bool use_hierarchical_allreduce_{false};
......@@ -181,10 +186,13 @@ struct BuildStrategy {
const std::vector<Scope *> &local_scopes,
const size_t &nranks,
#if defined(PADDLE_WITH_NCCL)
const bool use_cuda,
DeviceType use_device,
platform::NCCLCommunicator *nccl_ctxs) const;
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
DeviceType use_device,
platform::BKCLCommunicator *bkcl_ctxs) const;
#else
const bool use_cuda) const;
DeviceType use_device) const;
#endif
// If set true, ParallelExecutor would build the main_program into multiple
......
......@@ -14,22 +14,19 @@
#pragma once
#include <cstddef> // for size_t
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
using DeviceType = paddle::platform::DeviceType;
namespace p = paddle::platform;
struct ExecutionStrategy {
enum ExecutorType { kDefault = 0, kExperimental = 1 };
enum UseDevice {
kCPU = 0,
kCUDA = 1,
kXPU = 2,
};
// num_threads indicates the size of thread pool.
size_t num_threads_{0};
UseDevice use_device_{kCUDA};
DeviceType use_device_ = p::kCUDA;
// Note that allow_op_delay is invalid now.
bool allow_op_delay_{false};
// num_iteration_per_drop_scope indicates how many
......
......@@ -37,6 +37,13 @@ FusedAllReduceOpHandle::FusedAllReduceOpHandle(
const platform::NCCLCommunicator *ctxs)
: AllReduceOpHandle(node, local_scopes, places, ctxs),
num_of_all_reduce_(num_of_all_reduce) {}
#elif defined(PADDLE_WITH_XPU_BKCL)
FusedAllReduceOpHandle::FusedAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
const platform::BKCLCommunicator *ctxs)
: AllReduceOpHandle(node, local_scopes, places, ctxs),
num_of_all_reduce_(num_of_all_reduce) {}
#else
FusedAllReduceOpHandle::FusedAllReduceOpHandle(
ir::Node *node, const std::vector<Scope *> &local_scopes,
......@@ -73,9 +80,14 @@ void FusedAllReduceOpHandle::RunImpl() {
"handles is %d, and the number of output variable handles is %d.",
in_var_handles.size(), out_var_handles.size()));
// Note: some gradient op doesn't have CUDAKernel, so the gradients of
// those op are in CPUPlace, in this case, the all reduce should not be fused.
// Note: some gradient op doesn't have CUDAKernel, so the gradients of
// those op are in CPUPlace, in this case, the all reduce should not be fused.
#if defined(PADDLE_WITH_XPU_BKCL)
// TODO(liuyuhui): XPU don't support fuse all reduce for now
if (InputIsInDifferentPlace(in_var_handles) || true) {
#else
if (InputIsInDifferentPlace(in_var_handles)) {
#endif
for (size_t j = 0; j < num_of_all_reduce_; ++j) {
std::vector<VarHandle *> dev_inputs;
std::vector<VarHandle *> dev_outputs;
......
......@@ -36,6 +36,8 @@ class NCCLCommunicator;
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/framework/details/nccl_op_handle.h"
#include "paddle/fluid/platform/nccl_helper.h"
#elif defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
......@@ -49,6 +51,13 @@ struct FusedAllReduceOpHandle : public AllReduceOpHandle {
const std::vector<platform::Place> &places,
const size_t num_of_all_reduce,
const platform::NCCLCommunicator *ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
struct FusedAllReduceOpHandle : public AllReduceOpHandle {
FusedAllReduceOpHandle(ir::Node *node,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const size_t num_of_all_reduce,
const platform::BKCLCommunicator *ctxs);
#else
struct FusedAllReduceOpHandle : public AllReduceOpHandle {
FusedAllReduceOpHandle(ir::Node *node,
......
......@@ -52,11 +52,18 @@ struct FusedBroadcastOpHandle : public BroadcastOpHandle {
const std::vector<platform::Place> &places,
const platform::NCCLContextMap *nccl_ctx)
: BroadcastOpHandle(node, local_scopes, places, nccl_ctx) {}
#else
FusedBroadcastOpHandle(ir::Node* node, const std::vector<Scope*> local_scopes,
const std::vector<platform::Place>& places)
: BroadcastOpHandle(node, local_scopes, places) {}
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
FusedBroadcastOpHandle(ir::Node *node,
const std::vector<Scope *> local_scopes,
const std::vector<platform::Place> &places,
const platform::BKCLContextMap *bkcl_ctx)
: BroadcastOpHandle(node, local_scopes, places, bkcl_ctx) {}
#endif
FusedBroadcastOpHandle(ir::Node *node,
const std::vector<Scope *> local_scopes,
const std::vector<platform::Place> &places)
: BroadcastOpHandle(node, local_scopes, places) {}
std::string Name() const override;
protected:
......
......@@ -32,7 +32,7 @@ namespace framework {
namespace details {
struct VarHandle;
using UseDevice = paddle::framework::details::ExecutionStrategy::UseDevice;
using DeviceType = paddle::platform::DeviceType;
struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
std::vector<std::string> out_varnames_;
......@@ -56,7 +56,7 @@ struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
// create op handle node
nodes_.emplace_back(
ir::CreateNodeForTest("fused_broadcast", ir::Node::Type::kOperation));
if (use_gpu_) {
if (use_device_ == p::kCUDA) {
#if defined(PADDLE_WITH_NCCL)
op_handle_ = new FusedBroadcastOpHandle(
nodes_.back().get(), local_scopes_, place_list_, nccl_ctxs_.get());
......@@ -64,14 +64,17 @@ struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
} else {
#if defined(PADDLE_WITH_NCCL)
} else if (use_device_ == p::kXPU) {
#if defined(PADDLE_WITH_XPU_BKCL)
op_handle_ = new FusedBroadcastOpHandle(
nodes_.back().get(), local_scopes_, place_list_, nccl_ctxs_.get());
nodes_.back().get(), local_scopes_, place_list_, bkcl_ctxs_.get());
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with XPU."));
#endif
} else {
op_handle_ = new FusedBroadcastOpHandle(nodes_.back().get(),
local_scopes_, place_list_);
#endif
}
op_handle_->SetLocalExecScopes(scope_map);
......@@ -109,7 +112,7 @@ struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
InitLoDTensor(varname, input_scope_idxes[i], lod, val_scalar));
}
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......@@ -133,7 +136,7 @@ struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
rows, height, val_scalar));
}
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......@@ -150,7 +153,7 @@ struct TestFusedBroadcastOpHandle : TestBroadcastOpHandle {
TEST(FusedBroadcastTester, CPULodTensor) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(false);
test_op.InitCtxOnDevice(p::kCPU);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastLoDTensor(input_scope_idxes);
}
......@@ -158,7 +161,7 @@ TEST(FusedBroadcastTester, CPULodTensor) {
TEST(FusedBroadcastTester, CPUSelectedRows) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(false);
test_op.InitCtxOnDevice(p::kCPU);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastSelectedRows(input_scope_idxes);
}
......@@ -167,7 +170,7 @@ TEST(FusedBroadcastTester, CPUSelectedRows) {
TEST(FusedBroadcastTester, GPULodTensor) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(true);
test_op.InitCtxOnDevice(p::kCUDA);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastLoDTensor(input_scope_idxes);
}
......@@ -175,12 +178,22 @@ TEST(FusedBroadcastTester, GPULodTensor) {
TEST(FusedBroadcastTester, GPUSelectedRows) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnGpu(true);
test_op.InitCtxOnDevice(p::kCUDA);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastSelectedRows(input_scope_idxes);
}
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
TEST(FusedBroadcastTester, XPULodTensor) {
TestFusedBroadcastOpHandle test_op;
std::vector<size_t> input_scope_idxes = {0, 1};
test_op.InitCtxOnDevice(p::kXPU);
test_op.InitFusedBroadcastOp(input_scope_idxes);
test_op.TestFusedBroadcastLoDTensor(input_scope_idxes);
}
#endif
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -27,7 +27,7 @@ struct DummyVarHandle;
namespace f = paddle::framework;
namespace p = paddle::platform;
using UseDevice = paddle::framework::details::ExecutionStrategy::UseDevice;
using DeviceType = paddle::platform::DeviceType;
// test data amount
const f::DDim kDims = {20, 20};
......@@ -173,7 +173,7 @@ struct TestGatherOpHandle {
out_selected_rows->mutable_value()->ShareDataWith(
in_selected_rows->value());
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......
......@@ -55,6 +55,7 @@ constexpr char kPlaces[] = "places";
constexpr char kGlobalScope[] = "global_scope";
constexpr char kLocalScopes[] = "local_scopes";
constexpr char kNCCLCtxs[] = "nccl_ctxs";
constexpr char kBKCLCtxs[] = "bkcl_ctxs";
constexpr char kUseHierarchicalAllReduce[] = "use_hierarchical_allreduce";
// aux variables to represent dependency. Useful to resolve data hazard.
......
......@@ -82,21 +82,74 @@ void OpHandleBase::InitCUDA() {
}
}
}
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't use CUDA device since it's not compiled with CUDA,"
"Please recompile or reinstall Paddle with GPU support."));
#endif
}
void OpHandleBase::InitXPU() {
#ifdef PADDLE_WITH_XPU
if (IsMultiDeviceTransfer() && dev_ctxes_.size() > 0) {
for (auto &out_var : outputs_) {
auto *out_var_handle = dynamic_cast<VarHandle *>(out_var);
if (out_var_handle) {
// TODO(liuyuhui): XPU now don't support sync events, add later.
}
}
} else {
PADDLE_ENFORCE_EQ(dev_ctxes_.size(), 1UL,
platform::errors::InvalidArgument(
"%s should have only one dev_ctx.", Name()));
auto &place = dev_ctxes_.begin()->first;
int dev_id = BOOST_GET_CONST(platform::XPUPlace, place).device;
PADDLE_ENFORCE_EQ(
xpu_set_device(dev_id), XPU_SUCCESS,
platform::errors::PreconditionNotMet("xpu_set_device failed"));
for (auto &out_var : outputs_) {
auto *out_var_handle = dynamic_cast<VarHandle *>(out_var);
if (out_var_handle) {
PADDLE_ENFORCE_EQ(
platform::is_same_place(place, out_var_handle->place()), true,
platform::errors::InvalidArgument(
"The place of output(%s) is not consistent with the "
"place of current op(%s).",
out_var_handle->Name(), Name()));
}
}
}
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't use XPU device since it's not compiled with XPU,"
"Please recompile or reinstall Paddle with XPU support."));
#endif
}
void OpHandleBase::Run(ExecutionStrategy::UseDevice use_device) {
void OpHandleBase::Run(DeviceType use_device) {
#ifdef PADDLE_WITH_CUDA
if (events_.empty() && use_device == ExecutionStrategy::UseDevice::kCUDA &&
dev_ctxes_.size() > 0) {
if (events_.empty() && use_device == p::kCUDA && dev_ctxes_.size() > 0) {
InitCUDA();
}
#else
PADDLE_ENFORCE_NE(use_device, ExecutionStrategy::UseDevice::kCUDA,
platform::errors::InvalidArgument(
"Argument use_cuda should be false when Paddle is not "
"compiled with CUDA."));
PADDLE_ENFORCE_NE(
use_device, p::kCUDA,
platform::errors::InvalidArgument(
"Argument use_device should not be kCUDA when Paddle is not "
"compiled with CUDA."));
#endif
if (use_device == p::kXPU && dev_ctxes_.size() > 0) {
#ifdef PADDLE_WITH_XPU
InitXPU();
#else
PADDLE_ENFORCE_NE(
use_device, p::kXPU,
platform::errors::InvalidArgument(
"Argument use_device should not be kXPU when Paddle is not "
"compiled with XPU."));
#endif
}
// skip running current op, used with inplace_addto_op_pass
if (skip_running_) {
......
......@@ -43,7 +43,8 @@ class Node;
} // namespace ir
namespace details {
using DeviceType = paddle::platform::DeviceType;
namespace p = paddle::platform;
// Wraps ir::Node and provide helper utilities.
// It's responsible for populating necessary fields of ir::Node.
class OpHandleBase {
......@@ -72,7 +73,7 @@ class OpHandleBase {
virtual std::string Name() const = 0;
void Run(ExecutionStrategy::UseDevice use_device);
void Run(DeviceType use_device);
virtual void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx);
......@@ -145,6 +146,7 @@ class OpHandleBase {
virtual void RunImpl() = 0;
virtual void InitCUDA();
virtual void InitXPU();
ir::Node *node_;
std::vector<VarHandleBase *> inputs_;
......
......@@ -212,10 +212,64 @@ void ReduceOpHandle::RunImpl() {
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
} else if (paddle::platform::is_xpu_place(lod_tensors[0]->place())) {
#if defined(PADDLE_WITH_XPU_BKCL)
auto pre_in = pre_in_var->Get<framework::LoDTensor>();
VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out_var_handle->place(), pre_in.type());
auto out_p = out_var_handle->place();
int root_id = BOOST_GET_CONST(platform::XPUPlace, out_p).device;
std::vector<std::function<void()>> all_reduce_calls;
for (size_t i = 0; i < var_scopes.size(); ++i) {
auto &p = in_places[i];
auto &lod_tensor = *lod_tensors[i];
int dev_id = BOOST_GET_CONST(platform::XPUPlace, p).device;
auto &bkcl_ctx = bkcl_ctxs_->at(dev_id);
void *buffer = const_cast<void *>(lod_tensor.data<void>());
void *recvbuffer = nullptr;
if (root_id == dev_id) {
recvbuffer =
out_var->GetMutable<framework::LoDTensor>()->mutable_data(
out_var_handle->place());
}
int type = platform::ToBKCLDataType(lod_tensor.type());
size_t numel = static_cast<size_t>(lod_tensor.numel());
all_reduce_calls.emplace_back([buffer, recvbuffer, type, numel, root_id,
&bkcl_ctx] {
PADDLE_ENFORCE_EQ(bkcl_reduce(bkcl_ctx.comm(), buffer, recvbuffer,
numel, static_cast<BKCLDataType>(type),
BKCL_ADD, root_id, nullptr),
BKCL_SUCCESS, platform::errors::Unavailable(
"bkcl_all_reduce failed"));
});
}
WaitInputVarGenerated();
this->RunAndRecordEvent([&] {
PADDLE_ENFORCE_EQ(
bkcl_group_start(), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_group_start failed"));
for (auto &call : all_reduce_calls) {
call();
}
PADDLE_ENFORCE_EQ(
bkcl_group_end(), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_group_end failed"));
});
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with XPU."));
#endif
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"The place of tensor should be CPUPlace or CUDAPlace, but got %s.",
"The place of tensor should be CPUPlace, CUDAPlace or XPUPlace, but "
"got %s.",
lod_tensors[0]->place()));
}
}
......
......@@ -41,6 +41,8 @@ struct NCCLContextMap;
} // namespace paddle
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/platform/nccl_helper.h"
#elif defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
......@@ -93,6 +95,22 @@ struct ReduceOpHandle : public OpHandleBase {
}
}
}
#elif defined(PADDLE_WITH_XPU_BKCL)
const platform::BKCLContextMap *bkcl_ctxs_;
ReduceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::BKCLContextMap *bkcl_ctxs)
: OpHandleBase(node),
local_scopes_(local_scopes),
places_(places),
bkcl_ctxs_(bkcl_ctxs) {
if (bkcl_ctxs_) {
for (auto &p_ctx : bkcl_ctxs_->contexts_) {
this->SetDeviceContext(platform::XPUPlace(p_ctx.first),
p_ctx.second.ctx_.get());
}
}
}
#else
ReduceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
......
......@@ -25,7 +25,7 @@ namespace details {
namespace f = paddle::framework;
namespace p = paddle::platform;
using UseDevice = paddle::framework::details::ExecutionStrategy::UseDevice;
using DeviceType = paddle::platform::DeviceType;
// test data amount
const f::DDim kDims = {20, 20};
......@@ -198,7 +198,7 @@ struct TestReduceOpHandle {
out_selected_rows->mutable_value()->ShareDataWith(
in_selected_rows->value());
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......@@ -263,7 +263,7 @@ struct TestReduceOpHandle {
out_lodtensor->ShareDataWith(in_lodtensor);
UseDevice use_device = UseDevice::kCPU;
DeviceType use_device = p::kCPU;
op_handle_->Run(use_device);
WaitAll();
......
......@@ -30,6 +30,7 @@ DECLARE_double(eager_delete_tensor_gb);
namespace paddle {
namespace framework {
namespace p = paddle::platform;
static std::vector<platform::Place> CreatePlaces(size_t num, bool use_cuda) {
std::vector<platform::Place> result;
......@@ -88,8 +89,7 @@ class ReferenceCountPassTestHelper {
FLAGS_eager_delete_tensor_gb = -1;
details::ExecutionStrategy exec_strategy;
exec_strategy.use_device_ =
use_cuda ? (ExecutionStrategy::kCUDA) : (ExecutionStrategy::kCPU);
exec_strategy.use_device_ = use_cuda ? p::kCUDA : p::kCPU;
executor_.reset(new ParallelExecutor(CreatePlaces(1, use_cuda), {}, "",
&scope_, {}, exec_strategy,
......
......@@ -41,6 +41,9 @@ class FuseAllReduceOpPass : public ir::Pass {
#if defined(PADDLE_WITH_NCCL)
auto *multi_nccl_ctxs =
&Get<platform::NCCLCommunicator>(details::kNCCLCtxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
auto *multi_bkcl_ctxs =
&Get<platform::BKCLCommunicator>(details::kBKCLCtxs);
#endif
ir::Graph &result = *graph;
......@@ -92,6 +95,9 @@ class FuseAllReduceOpPass : public ir::Pass {
#if defined(PADDLE_WITH_NCCL)
InsertFusedAllReduce(places, local_scopes, group_size,
group_all_reduce_ops, multi_nccl_ctxs, &result);
#elif defined(PADDLE_WITH_XPU_BKCL)
InsertFusedAllReduce(places, local_scopes, group_size,
group_all_reduce_ops, multi_bkcl_ctxs, &result);
#else
InsertFusedAllReduce(places, local_scopes, group_size,
group_all_reduce_ops, &result);
......@@ -154,6 +160,8 @@ class FuseAllReduceOpPass : public ir::Pass {
const std::vector<ir::Node *> &all_reduce_ops,
#if defined(PADDLE_WITH_NCCL)
const platform::NCCLCommunicator *multi_nccl_ctxs,
#elif defined(PADDLE_WITH_XPU_BKCL)
const platform::BKCLCommunicator *multi_bkcl_ctxs,
#endif
ir::Graph *result) const {
std::vector<details::VarHandleBase *> inputs;
......@@ -182,6 +190,9 @@ class FuseAllReduceOpPass : public ir::Pass {
#if defined(PADDLE_WITH_NCCL)
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
local_scopes, multi_nccl_ctxs, result);
#elif defined(PADDLE_WITH_XPU_BKCL)
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
local_scopes, multi_bkcl_ctxs, result);
#else
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
local_scopes, result);
......@@ -197,12 +208,18 @@ class FuseAllReduceOpPass : public ir::Pass {
const std::vector<Scope *> &local_scopes,
#if defined(PADDLE_WITH_NCCL)
const platform::NCCLCommunicator *multi_nccl_ctxs,
#elif defined(PADDLE_WITH_XPU_BKCL)
const platform::BKCLCommunicator *multi_bkcl_ctxs,
#endif
ir::Graph *result) const {
#if defined(PADDLE_WITH_NCCL)
auto *op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, multi_nccl_ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
auto *op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs);
#else
auto *op_handle = new details::FusedAllReduceOpHandle(
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
......@@ -221,6 +238,10 @@ class FuseAllReduceOpPass : public ir::Pass {
if (!multi_nccl_ctxs) {
SetCommunicationContext(places, op_handle);
}
#elif defined(PADDLE_WITH_XPU_BKCL)
if (!multi_bkcl_ctxs) {
SetCommunicationContext(places, op_handle);
}
#else
SetCommunicationContext(places, op_handle);
#endif
......
......@@ -162,6 +162,12 @@ void MultiDevSSAGraphBuilderBase::Init() const {
if (multi_nccl_ctxs_) {
nccl_ctxs_ = multi_nccl_ctxs_->DefaultFlatCtx();
}
#elif defined(PADDLE_WITH_XPU_BKCL)
multi_bkcl_ctxs_ = &Get<platform::BKCLCommunicator>(details::kBKCLCtxs);
bkcl_ctxs_ = nullptr;
if (multi_bkcl_ctxs_) {
bkcl_ctxs_ = multi_bkcl_ctxs_->DefaultFlatCtx();
}
#endif
PADDLE_ENFORCE_EQ(
places_.size(), local_scopes_.size(),
......@@ -371,6 +377,11 @@ void MultiDevSSAGraphBuilderBase::SetCommunicationContext(
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
}
#elif defined(PADDLE_WITH_XPU_BKCL)
if (bkcl_ctxs_ == nullptr) {
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
}
#else
op_handle->SetDeviceContext(p,
platform::DeviceContextPool::Instance().Get(p));
......@@ -384,6 +395,10 @@ void MultiDevSSAGraphBuilderBase::CreateBroadcastOp(ir::Graph *result,
auto *op_handle = new details::BroadcastOpHandle(
result->CreateEmptyNode("broadcast", ir::Node::Type::kOperation),
local_scopes_, places_, nccl_ctxs_);
#elif defined(PADDLE_WITH_XPU_BKCL)
auto *op_handle = new details::BroadcastOpHandle(
result->CreateEmptyNode("broadcast", ir::Node::Type::kOperation),
local_scopes_, places_, bkcl_ctxs_);
#else
auto *op_handle = new details::BroadcastOpHandle(
result->CreateEmptyNode("broadcast", ir::Node::Type::kOperation),
......@@ -417,6 +432,10 @@ void MultiDevSSAGraphBuilderBase::CreateFusedBroadcastOp(
auto *op_handle = new details::FusedBroadcastOpHandle(
result->CreateEmptyNode("fused_broadcast", ir::Node::Type::kOperation),
local_scopes_, places_, nccl_ctxs_);
#elif defined(PADDLE_WITH_XPU_BKCL)
auto *op_handle = new details::FusedBroadcastOpHandle(
result->CreateEmptyNode("fused_broadcast", ir::Node::Type::kOperation),
local_scopes_, places_, bkcl_ctxs_);
#else
auto *op_handle = new details::FusedBroadcastOpHandle(
result->CreateEmptyNode("fused_broadcast", ir::Node::Type::kOperation),
......@@ -487,6 +506,11 @@ void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result,
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_nccl_ctxs_));
#elif defined(PADDLE_WITH_XPU_BKCL)
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
scopes, places, multi_bkcl_ctxs_));
#else
result->Get<GraphOps>(kGraphOps).emplace_back(
new details::AllReduceOpHandle(
......@@ -565,6 +589,10 @@ details::VarHandle *MultiDevSSAGraphBuilderBase::CreateReduceOp(
result->Get<GraphOps>(kGraphOps).emplace_back(new details::ReduceOpHandle(
result->CreateEmptyNode("reduce", ir::Node::Type::kOperation),
local_scopes_, places_, nccl_ctxs_));
#elif defined(PADDLE_WITH_XPU_BKCL)
result->Get<GraphOps>(kGraphOps).emplace_back(new details::ReduceOpHandle(
result->CreateEmptyNode("reduce", ir::Node::Type::kOperation),
local_scopes_, places_, bkcl_ctxs_));
#else
result->Get<GraphOps>(kGraphOps).emplace_back(new details::ReduceOpHandle(
result->CreateEmptyNode("reduce", ir::Node::Type::kOperation),
......
......@@ -41,6 +41,8 @@ namespace paddle {
namespace platform {
class NCCLContextMap;
class NCCLCommunicator;
class BKCLContextMap;
class BKCLCommunicator;
}
namespace framework {
......@@ -114,6 +116,9 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
#if defined(PADDLE_WITH_NCCL)
mutable platform::NCCLContextMap *nccl_ctxs_{nullptr};
mutable platform::NCCLCommunicator *multi_nccl_ctxs_{nullptr};
#elif defined(PADDLE_WITH_XPU_BKCL)
mutable platform::BKCLContextMap *bkcl_ctxs_{nullptr};
mutable platform::BKCLCommunicator *multi_bkcl_ctxs_{nullptr};
#endif
mutable std::string loss_var_name_;
......
......@@ -63,8 +63,6 @@ static bool gProfileStarted = false;
std::once_flag p2p_init_flag;
#endif
using UseDevice = paddle::framework::details::ExecutionStrategy::UseDevice;
class ParallelExecutorPrivate {
public:
ParallelExecutorPrivate(const std::vector<platform::Place> &places,
......@@ -95,7 +93,7 @@ class ParallelExecutorPrivate {
}
}
bool IsUseCUDA(UseDevice use_device);
bool IsUseCUDA(DeviceType use_device);
void SetHasFeed(size_t dev_idx, bool has_feed = true);
......@@ -272,6 +270,90 @@ class ParallelExecutorPrivate {
}
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
void InitBKCLCtxs(framework::Scope *scope, const BuildStrategy &bst) {
VLOG(1) << "bkcl comm num:" << bst.bkcl_comm_num_ << ", nranks:" << nranks_
<< ", num_trainers:" << bst.num_trainers_
<< ", trainer_id:" << bst.trainer_id_;
PADDLE_ENFORCE_EQ(bst.use_hierarchical_allreduce_, false,
platform::errors::Unimplemented(
"xpu doesn't support use_hierarchical_allreduce"));
std::vector<BKCLUniqueId *> flat_bkcl_ids;
if (nranks_ == 1) {
// FIXME(gongwb): need not to create bkclid when nranks==1
bkcl_ctxs_->InitFlatCtxs(places_, flat_bkcl_ids, bst.num_trainers_,
bst.trainer_id_);
return;
}
if (bst.enable_parallel_graph_) {
VLOG(1) << "use only one bkclid in pg model";
BKCLUniqueId *bkcl_id = nullptr;
std::string var_name = platform::GetFlatBKCLVarName(0);
auto bkcl_id_var = scope->FindVar(var_name);
std::unique_ptr<BKCLUniqueId> id(new BKCLUniqueId());
if (bkcl_id_var) {
bkcl_id = bkcl_id_var->GetMutable<BKCLUniqueId>();
} else {
PADDLE_ENFORCE_EQ(
bkcl_get_unique_id(id.get()), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl get unique id failed"));
bkcl_id = id.get();
}
flat_bkcl_ids.push_back(bkcl_id);
bkcl_ctxs_->InitFlatCtxs(places_, flat_bkcl_ids, bst.num_trainers_,
bst.trainer_id_);
VLOG(1) << "init bst bkcl context complete!";
return;
}
// num_trainers ==1 && places > 1
if (bst.num_trainers_ == 1) {
bkcl_ctxs_->InitFlatCtxs(places_, flat_bkcl_ids, bst.num_trainers_,
bst.trainer_id_);
return;
}
for (int i = 0; i < static_cast<int>(bst.bkcl_comm_num_); i++) {
std::string var_name = platform::GetFlatBKCLVarName(i);
auto bkcl_id_var = scope->FindVar(var_name);
PADDLE_ENFORCE_NOT_NULL(
bkcl_id_var,
platform::errors::NotFound("can't find %s bkcl_id_var", var_name));
auto bkcl_id = bkcl_id_var->GetMutable<BKCLUniqueId>();
flat_bkcl_ids.push_back(bkcl_id);
}
bkcl_ctxs_->InitFlatCtxs(places_, flat_bkcl_ids, bst.num_trainers_,
bst.trainer_id_);
}
void InitOrGetBKCLCommunicator(framework::Scope *scope,
const BuildStrategy &bst) {
const std::string var_name = "BKCLCommunicator";
auto var = scope->FindVar(var_name);
if (var != nullptr) {
PADDLE_ENFORCE_EQ(var->IsInitialized(), true,
platform::errors::PreconditionNotMet(
"if %s exists, it must be initialized", var_name));
VLOG(1) << "find " << var_name
<< " in scope, so use it and does not recreate!";
bkcl_ctxs_ = var->GetMutable<platform::BKCLCommunicator>();
return;
}
VLOG(1) << "not find " << var_name << " in scope, so recreate it!";
bkcl_ctxs_ = scope->Var(var_name)->GetMutable<platform::BKCLCommunicator>();
InitBKCLCtxs(scope, bst);
}
#endif
inline bool IsPersistable(const std::string &name) const {
auto iter = is_persistable_.find(name);
return iter != is_persistable_.end() && iter->second;
......@@ -288,9 +370,11 @@ class ParallelExecutorPrivate {
#if defined(PADDLE_WITH_NCCL)
platform::NCCLCommunicator *nccl_ctxs_{nullptr};
#elif defined(PADDLE_WITH_XPU_BKCL)
platform::BKCLCommunicator *bkcl_ctxs_{nullptr};
#endif
bool own_local_scope_;
UseDevice use_device_;
DeviceType use_device_;
bool use_all_reduce_;
size_t nranks_;
......@@ -300,8 +384,8 @@ class ParallelExecutorPrivate {
details::ParallelSSAGraphExecutor *inference_executor_{nullptr};
};
bool ParallelExecutorPrivate::IsUseCUDA(UseDevice use_device) {
return use_device == UseDevice::kCUDA;
bool ParallelExecutorPrivate::IsUseCUDA(DeviceType use_device) {
return use_device == p::kCUDA;
}
void ParallelExecutorPrivate::SetHasFeed(size_t dev_idx, bool has_feed) {
......@@ -348,7 +432,7 @@ ir::Graph *ParallelExecutorPrivate::ApplyMemoryOptimizePass(ir::Graph *graph) {
auto addto_pass = ir::PassRegistry::Instance().Get("inplace_addto_op_pass");
addto_pass->SetNotOwned(ir::kMemOptVarInfoMapList, &mem_opt_var_infos_);
addto_pass->SetNotOwned(ir::kLastLiveOpsOfVars, &last_live_ops_of_vars);
addto_pass->Set(ir::kUseCuda, new bool(use_device_ == UseDevice::kCUDA));
addto_pass->Set(ir::kUseCuda, new bool(use_device_ == p::kCUDA));
VLOG(10) << "Start to apply inplace_addto_op_pass";
graph = addto_pass->Apply(graph);
VLOG(10) << "inplace_addto_op_pass Applied";
......@@ -359,7 +443,7 @@ ir::Graph *ParallelExecutorPrivate::ApplyMemoryOptimizePass(ir::Graph *graph) {
ir::PassRegistry::Instance().Get("buffer_shared_inplace_pass");
inplace_pass->SetNotOwned(ir::kMemOptVarInfoMapList, &mem_opt_var_infos_);
inplace_pass->SetNotOwned(ir::kLastLiveOpsOfVars, &last_live_ops_of_vars);
inplace_pass->Set(ir::kUseCuda, new bool(use_device_ == UseDevice::kCUDA));
inplace_pass->Set(ir::kUseCuda, new bool(use_device_ == p::kCUDA));
VLOG(10) << "Start to apply buffer_shared_inplace_pass";
graph = inplace_pass->Apply(graph);
VLOG(10) << "buffer_shared_inplace_pass Applied";
......@@ -375,7 +459,7 @@ ir::Graph *ParallelExecutorPrivate::ApplyMemoryOptimizePass(ir::Graph *graph) {
cross_op_memory_reuse_pass->SetNotOwned(ir::kLastLiveOpsOfVars,
&last_live_ops_of_vars);
cross_op_memory_reuse_pass->Set(ir::kUseCuda,
new bool(use_device_ == UseDevice::kCUDA));
new bool(use_device_ == p::kCUDA));
VLOG(10) << "Start to apply buffer_shared_cross_op_memory_reuse_pass";
graph = cross_op_memory_reuse_pass->Apply(graph);
VLOG(10) << "buffer_shared_cross_op_memory_reuse_pass Applied";
......@@ -564,9 +648,9 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
#endif
std::string device_name;
if (member_->use_device_ == UseDevice::kCPU) {
if (member_->use_device_ == p::kCPU) {
device_name = "CPU";
} else if (member_->use_device_ == UseDevice::kCUDA) {
} else if (member_->use_device_ == p::kCUDA) {
device_name = "CUDA";
} else {
device_name = "XPU";
......@@ -642,6 +726,27 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
auto &nccl_ctx = nccl_ctxs->at(member_->places_[dev_id]);
dev_ctx->set_nccl_comm(nccl_ctx.comm());
}
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
}
if (member_->use_device_ == p::kXPU && member_->nranks_ > 1) {
#if defined(PADDLE_WITH_XPU_BKCL)
member_->InitOrGetBKCLCommunicator(scope, member_->build_strategy_);
auto *bkcl_ctxs =
member_->bkcl_ctxs_->GetSyncBatchNormCtx(scope, member_->places_);
auto &pool = platform::DeviceContextPool::Instance();
for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) {
auto *dev_ctx = static_cast<platform::XPUDeviceContext *>(
pool.Get(member_->places_[dev_id]));
auto &bkcl_ctx = bkcl_ctxs->at(member_->places_[dev_id]);
dev_ctx->set_bkcl_context(bkcl_ctx.comm());
}
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with XPU."));
#endif
}
// broadcast parameters from the 0th device to others:
......@@ -671,39 +776,55 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
VLOG(3) << "use local async mode";
graph = member_->build_strategy_.Apply(
graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1,
member_->IsUseCUDA(member_->use_device_), member_->nccl_ctxs_);
{member_->local_scopes_[0]}, 1, member_->use_device_,
member_->nccl_ctxs_);
for (size_t i = 1; i < member_->places_.size(); ++i) {
graphs[i] = member_->build_strategy_.Apply(
graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1,
member_->IsUseCUDA(member_->use_device_), member_->nccl_ctxs_);
{member_->local_scopes_[i]}, 1, member_->use_device_,
member_->nccl_ctxs_);
async_graphs[i] = graphs[i];
}
} else {
graph = member_->build_strategy_.Apply(
graph, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->IsUseCUDA(member_->use_device_),
member_->nccl_ctxs_);
member_->nranks_, member_->use_device_, member_->nccl_ctxs_);
}
#elif defined(PADDLE_WITH_XPU_BKCL)
if (member_->build_strategy_.async_mode_) {
VLOG(3) << "use local async mode";
graph = member_->build_strategy_.Apply(
graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1, member_->use_device_,
member_->bkcl_ctxs_);
for (size_t i = 1; i < member_->places_.size(); ++i) {
graphs[i] = member_->build_strategy_.Apply(
graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1, member_->use_device_,
member_->bkcl_ctxs_);
async_graphs[i] = graphs[i];
}
} else {
graph = member_->build_strategy_.Apply(
graph, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->use_device_, member_->bkcl_ctxs_);
}
#else
if (member_->build_strategy_.async_mode_) {
VLOG(3) << "use local async mode";
graph = member_->build_strategy_.Apply(
graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1,
member_->IsUseCUDA(member_->use_device_));
{member_->local_scopes_[0]}, 1, member_->use_device_);
for (size_t i = 1; i < member_->places_.size(); ++i) {
graphs[i] = member_->build_strategy_.Apply(
graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1,
member_->IsUseCUDA(member_->use_device_));
{member_->local_scopes_[i]}, 1, member_->use_device_);
async_graphs[i] = graphs[i];
}
} else {
graph = member_->build_strategy_.Apply(
graph, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->IsUseCUDA(member_->use_device_));
member_->nranks_, member_->use_device_);
}
#endif
......@@ -847,6 +968,9 @@ void ParallelExecutor::BCastParamsToDevices(
continue;
}
auto &dims = main_tensor.dims();
VLOG(1) << "bcast var=" << var;
if (paddle::platform::is_gpu_place(main_tensor.place())) {
#if defined(PADDLE_WITH_NCCL)
std::vector<void *> buffers;
......@@ -883,6 +1007,58 @@ void ParallelExecutor::BCastParamsToDevices(
}
nccl_ctxs->WaitAll();
}
#endif
} else if (paddle::platform::is_xpu_place(main_tensor.place())) {
#if defined(PADDLE_WITH_XPU_BKCL)
std::vector<void *> buffers;
buffers.reserve(member_->places_.size());
size_t numel = main_tensor.numel();
BKCLDataType data_type = BKCL_FLOAT;
// BKCLDataType data_type = platform::ToBKCLDataType(main_tensor.type());
for (size_t i = 0; i < member_->places_.size(); ++i) {
auto place = member_->places_[i];
void *buffer;
if (i == 0 && trainer_id == 0) {
buffer = const_cast<void *>(main_tensor.data<void>());
} else {
auto local_scope = member_->local_scopes_[i];
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
t->Resize(dims);
buffer = t->mutable_data(place, main_tensor.type());
}
buffers.push_back(buffer);
}
PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
platform::errors::PreconditionNotMet(
"variables' buffer size to bcast is %d, which is "
"NOT equal to places size %d",
buffers.size(), member_->places_.size()));
{
auto *bkcl_ctxs = member_->bkcl_ctxs_->DefaultFlatCtx();
PADDLE_ENFORCE_EQ(
bkcl_group_start(), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_group_start failed"));
for (size_t i = 0; i < member_->places_.size(); ++i) {
auto &bkcl_ctx = bkcl_ctxs->at(member_->places_[i]);
if (main_tensor.type() == framework::proto::VarType::INT64) {
numel *= 2;
}
PADDLE_ENFORCE_EQ(
bkcl_broadcast(bkcl_ctx.comm(), buffers[i], buffers[i], numel,
data_type, 0, NULL),
BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_broadcast failed"));
}
PADDLE_ENFORCE_EQ(
bkcl_group_end(), BKCL_SUCCESS,
platform::errors::Unavailable("bkcl_group_end failed"));
}
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with BKCL."));
#endif
} else {
platform::CPUPlace cpu;
......
......@@ -43,6 +43,8 @@ class ParallelExecutorPrivate;
using details::BuildStrategy;
using details::ExecutionStrategy;
namespace p = paddle::platform;
using DeviceType = paddle::platform::DeviceType;
class ParallelExecutor {
DISABLE_COPY_AND_ASSIGN(ParallelExecutor);
......
......@@ -30,6 +30,10 @@
#include "paddle/fluid/operators/cudnn_rnn_cache.h"
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
namespace framework {
......
......@@ -31,6 +31,10 @@
#endif
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "xpu/bkcl.h"
#endif
// Users should add forward declarations here
namespace paddle {
......@@ -41,6 +45,10 @@ class Communicator;
class NCCLCommunicator;
#endif
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
class BKCLCommunicator;
#endif
} // namespace platform
namespace framework {
......@@ -148,6 +156,9 @@ using VarTypeRegistry = detail::VarTypeRegistryImpl<
ncclUniqueId, platform::Communicator, platform::NCCLCommunicator,
#endif
operators::CudnnRNNCache,
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
BKCLUniqueId, platform::BKCLCommunicator,
#endif
int, float>;
......
......@@ -31,6 +31,9 @@
#include "paddle/fluid/operators/conv_cudnn_op_cache.h"
#include "paddle/fluid/operators/cudnn_rnn_cache.h"
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "paddle/fluid/platform/bkcl_helper.h"
#endif
namespace paddle {
namespace framework {
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _WIN32
#if defined(PADDLE_WITH_XPU_BKCL)
#pragma once
#include <stdio.h>
#include <memory>
#include <string>
#include <thread> // NOLINT
#include <typeindex>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/float16.h"
#include "paddle/fluid/platform/place.h"
#include "xpu/bkcl.h"
#include "xpu/runtime.h"
#define BKCL_ID_VARNAME "BKCLID"
namespace paddle {
namespace platform {
inline BKCLDataType ToBKCLDataType(framework::proto::VarType::Type type) {
if (type == framework::proto::VarType::FP32) {
return BKCL_FLOAT;
} else {
PADDLE_THROW(
platform::errors::Unimplemented("BKCL currently only support FP32, "
"other data types are not supported."));
}
}
struct BKCLContext {
std::unique_ptr<platform::XPUDeviceContext> ctx_;
BKCLContext_t comm_;
explicit BKCLContext(int dev_id)
: ctx_(new platform::XPUDeviceContext(XPUPlace(dev_id))),
comm_{nullptr} {}
BKCLContext_t comm() const { return comm_; }
int device_id() const {
return BOOST_GET_CONST(platform::XPUPlace, ctx_->GetPlace()).device;
}
};
struct InitBKCLPara {
BKCLUniqueId *bkcl_id;
int rank;
int nranks;
int dev_id;
BKCLContext_t *ctx;
};
static void *init_bkcl_context_func(void *args) {
struct InitBKCLPara *para = (struct InitBKCLPara *)args;
PADDLE_ENFORCE_EQ(xpu_set_device(para->dev_id), XPU_SUCCESS,
platform::errors::PreconditionNotMet(
"xpu_set_device failed[%d]", para->dev_id));
PADDLE_ENFORCE_EQ(
bkcl_init_rank(para->ctx, para->rank, para->nranks, para->bkcl_id),
BKCL_SUCCESS,
platform::errors::PreconditionNotMet("bkcl_init_rank failed"));
return nullptr;
}
struct BKCLContextMap {
std::unordered_map<int, BKCLContext> contexts_;
std::vector<int> order_;
std::vector<platform::Place> places_;
size_t num_trainers_;
size_t trainer_id_;
BKCLUniqueId *bkcl_id_;
explicit BKCLContextMap(const std::vector<platform::Place> &places,
BKCLUniqueId *bkcl_id = nullptr,
size_t num_trainers = 1, size_t trainer_id = 0) {
places_ = places;
bkcl_id_ = bkcl_id;
num_trainers_ = num_trainers;
trainer_id_ = trainer_id;
}
// Synchronization is required and can only be initialized with
// multithreading.
int init() {
PADDLE_ENFORCE_EQ(!places_.empty(), true,
platform::errors::InvalidArgument(
"The BKCL place should not be empty."));
order_.reserve(places_.size());
for (auto &p : places_) {
int dev_id = BOOST_GET_CONST(platform::XPUPlace, p).device;
order_.emplace_back(dev_id);
contexts_.emplace(dev_id, BKCLContext(dev_id));
}
PADDLE_ENFORCE_EQ(
order_.size(), contexts_.size(),
platform::errors::Unavailable("BKCL Context Map does not support "
"contain two or more same device"));
std::unique_ptr<BKCLContext_t[]> comms(new BKCLContext_t[order_.size()]);
std::unique_ptr<InitBKCLPara[]> paras(new InitBKCLPara[order_.size()]);
std::unique_ptr<pthread_t[]> pids(new pthread_t[order_.size()]);
BKCLResult_t ret;
BKCLUniqueId id;
// if num_trainers == 1, should create a new bkcl id for local comms.
if (num_trainers_ == 1 && bkcl_id_ == nullptr) {
ret = bkcl_get_unique_id(&id);
PADDLE_ENFORCE_EQ(BKCL_SUCCESS, ret,
platform::errors::PreconditionNotMet(
"bkcl get unique id failed [%d]", ret));
bkcl_id_ = &id;
}
PADDLE_ENFORCE_NOT_NULL(bkcl_id_, platform::errors::InvalidArgument(
"The BKCL id should not be null."));
{
int nranks = num_trainers_ * order_.size();
for (size_t i = 0; i < order_.size(); ++i) {
int rank;
if (order_.size() > 1) {
rank = trainer_id_ * order_.size() + i;
} else {
rank = trainer_id_;
}
VLOG(1) << "init bkcl rank:" << rank << ", nranks:" << nranks
<< ", xpu_id:" << order_[i];
paras[i].rank = rank;
paras[i].nranks = nranks;
paras[i].dev_id = order_[i];
paras[i].bkcl_id = bkcl_id_;
paras[i].ctx = &comms[i];
PADDLE_ENFORCE_EQ(
pthread_create(&pids[i], nullptr, init_bkcl_context_func,
reinterpret_cast<void *>(&paras[i])),
0, platform::errors::External("pthread_create failed"));
}
for (size_t i = 0; i < order_.size(); i++) {
pthread_join(pids[i], nullptr);
}
}
int i = 0;
for (auto &dev_id : order_) {
contexts_.at(dev_id).comm_ = comms[i++];
}
return 0;
}
BKCLContextMap(const BKCLContextMap &other) = delete;
BKCLContextMap &operator=(const BKCLContextMap &other) = delete;
XPUDeviceContext *DevCtx(int dev_id) const { return at(dev_id).ctx_.get(); }
XPUDeviceContext *DevCtx(platform::Place p) const {
return DevCtx(BOOST_GET_CONST(platform::XPUPlace, p).device);
}
const BKCLContext &at(platform::Place p) const {
return this->at(BOOST_GET_CONST(platform::XPUPlace, p).device);
}
const BKCLContext &at(int dev_id) const { return contexts_.at(dev_id); }
void WaitAll() {
for (auto &p : contexts_) {
p.second.ctx_->Wait();
}
}
};
inline std::string GetFlatBKCLVarName(size_t pos) {
if (pos == 0) {
return BKCL_ID_VARNAME;
}
return string::Sprintf("%s_%d", BKCL_ID_VARNAME, static_cast<int>(pos));
}
class BKCLCommunicator {
public:
BKCLCommunicator() {}
virtual ~BKCLCommunicator() {}
BKCLContextMap *DefaultFlatCtx() const {
if (flat_ctxs_.size() == 0) {
return nullptr;
}
return flat_ctxs_[0].get();
}
std::vector<std::unique_ptr<BKCLContextMap>> *GetFlatCtxs() {
return &flat_ctxs_;
}
BKCLContextMap *GetFlatCtx(size_t run_order) const {
return flat_ctxs_[run_order % flat_ctxs_.size()].get();
}
BKCLContextMap *GetRunEnvBKCLCtx(size_t run_order,
bool use_hierarchical_allreduce) const {
PADDLE_ENFORCE_EQ(use_hierarchical_allreduce, false,
platform::errors::Unimplemented(
"Hierarchical all reduce is not support for XPU"));
return GetFlatCtx(run_order);
}
/*
*It meets error when allreduce ophandle and sync_batch_norm_op use
*bkcl_all_reduce
*parallelly. So create a new bkcl comm for sync_batch_norm_op. And these
*codes should be polished with a unified bkcl management.
*/
BKCLContextMap *GetSyncBatchNormCtx(
framework::Scope *scope, const std::vector<platform::Place> &places) {
auto *bkcl_id_var = scope->FindVar(BKCL_ID_VARNAME);
if (bkcl_id_var != nullptr) {
return DefaultFlatCtx();
}
if (sync_batch_norm_ctx_.get() == nullptr) {
sync_batch_norm_ctx_.reset(new BKCLContextMap(places));
sync_batch_norm_ctx_->init();
}
return sync_batch_norm_ctx_.get();
}
void InitFlatCtxs(const std::vector<platform::Place> &places,
const std::vector<BKCLUniqueId *> &bkcl_ids,
size_t trainers_num, size_t trainer_id) {
if (bkcl_ids.size() == 0) {
auto ptr = new platform::BKCLContextMap(places);
ptr->init();
VLOG(1) << "init local trainer";
flat_ctxs_.emplace_back(ptr);
return;
}
PADDLE_ENFORCE_EQ(bkcl_ids.size(), 1,
platform::errors::Unimplemented(
"Multi-all-reduce-ring is not support for XPU"));
for (size_t i = 0; i < bkcl_ids.size(); i++) {
auto ptr = new platform::BKCLContextMap(places, bkcl_ids[i], trainers_num,
trainer_id);
ptr->init();
VLOG(1) << "init trainer_id:" << trainer_id << ", comm no:" << i;
flat_ctxs_.emplace_back(ptr);
}
}
protected:
// Support multi bkcl comm on default bkcl ring while BKCLContextMap can't.
std::vector<std::unique_ptr<BKCLContextMap>> flat_ctxs_;
// just used for sync_batch_norm op.
std::unique_ptr<BKCLContextMap> sync_batch_norm_ctx_;
};
} // namespace platform
} // namespace paddle
#endif // PADDLE_WITH_XPU_BKCL
#endif
......@@ -30,6 +30,10 @@ limitations under the License. */
#include "paddle/fluid/platform/gpu_info.h"
#endif
#if defined(PADDLE_WITH_XPU_BKCL)
#include "xpu/bkcl.h"
#endif
#ifdef PADDLE_WITH_MKLDNN
#include "mkldnn.hpp"
#include "paddle/fluid/framework/data_layout.h"
......@@ -52,6 +56,7 @@ struct GpuDevice;
#ifdef PADDLE_WITH_XPU
#include "paddle/fluid/platform/xpu_header.h"
#include "paddle/fluid/platform/xpu_info.h"
#endif
namespace paddle {
......@@ -64,6 +69,16 @@ void SetAllowTF32Cublas(bool active);
bool AllowTF32Cublas();
#endif // PADDLE_WITH_CUDA
enum DeviceType {
CPU = 0,
CUDA = 1,
XPU = 2,
};
constexpr DeviceType kCPU = DeviceType::CPU;
constexpr DeviceType kCUDA = DeviceType::CUDA;
constexpr DeviceType kXPU = DeviceType::XPU;
class DeviceContext {
public:
virtual ~DeviceContext() PADDLE_MAY_THROW {}
......@@ -107,9 +122,20 @@ class XPUDeviceContext : public DeviceContext {
/*! \brief Wait for all operations completion in the stream. */
void Wait() const override;
#ifdef PADDLE_WITH_XPU_BKCL
/*! \brief Return nccl context. */
BKCLContext_t bkcl_context() const { return bkcl_context_; }
/*! \brief Set bkcl context. */
void set_bkcl_context(BKCLContext_t context) { bkcl_context_ = context; }
#endif
private:
XPUPlace place_;
xpu::Context* context_;
#ifdef PADDLE_WITH_XPU_BKCL
BKCLContext_t bkcl_context_;
#endif
// Need to be the same with other DeviceContext,
// Eventhough eigen_device_ is not used in XPU
......@@ -552,8 +578,8 @@ class MKLDNNDeviceContext : public CPUDeviceContext {
const std::string& GetKeySuffix(void) const { return key_suffix_; }
// Disable adding thread ID to the key
void DisableThreadInfoInKey(void) { key_attach_thread_id_ = false; };
bool IsThreadIdUsedInKey(void) const { return key_attach_thread_id_; };
void DisableThreadInfoInKey(void) { key_attach_thread_id_ = false; }
bool IsThreadIdUsedInKey(void) const { return key_attach_thread_id_; }
// Prevent next ResetBlobMap()
void BlockNextCacheClearing();
......
......@@ -1308,6 +1308,7 @@ All parameter, weight, gradient are variables in Paddle.
"The module will return special predefined variable name in Paddle")
.def("empty", []() { return kEmptyVarName; })
.def("temp", []() { return kTempVarName; });
// clang-format off
py::class_<paddle::platform::DeviceContext>(m, "DeviceContext")
.def_static("create",
......@@ -2080,10 +2081,10 @@ All parameter, weight, gradient are variables in Paddle.
exec_strategy=exec_strategy)
)DOC");
py::enum_<ExecutionStrategy::UseDevice>(exec_strategy, "UseDevice")
.value("CPU", ExecutionStrategy::UseDevice::kCPU)
.value("CUDA", ExecutionStrategy::UseDevice::kCUDA)
.value("XPU", ExecutionStrategy::UseDevice::kXPU);
py::enum_<paddle::platform::DeviceType>(m, "DeviceType", py::arithmetic())
.value("CPU", paddle::platform::DeviceType::CPU)
.value("CUDA", paddle::platform::DeviceType::CUDA)
.value("XPU", paddle::platform::DeviceType::XPU);
exec_strategy.def(py::init())
.def_property(
......@@ -2117,7 +2118,7 @@ All parameter, weight, gradient are variables in Paddle.
.def_property(
"_use_device",
[](const ExecutionStrategy &self) { return self.use_device_; },
[](ExecutionStrategy &self, ExecutionStrategy::UseDevice use_device) {
[](ExecutionStrategy &self, paddle::platform::DeviceType use_device) {
self.use_device_ = use_device;
}) // NOTE(liuyuhui): Doesn't add doc for 'use_device', because
// use_device isn‘t exposed to users.
......
......@@ -28,6 +28,7 @@ ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
BuildStrategy = core.ParallelExecutor.BuildStrategy
InferNativeConfig = core.NativeConfig
InferAnalysisConfig = core.AnalysisConfig
DeviceType = core.DeviceType
def _place_obj(place):
......@@ -345,17 +346,17 @@ class CompiledProgram(object):
self._exec_strategy._use_device = use_device
if self._exec_strategy.num_threads == 0:
if self._exec_strategy._use_device == ExecutionStrategy.UseDevice.CUDA:
if self._exec_strategy._use_device == DeviceType.CUDA:
# Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future.
self._exec_strategy.num_threads = len(places) * 4
elif self._exec_strategy._use_device == ExecutionStrategy.UseDevice.XPU:
elif self._exec_strategy._use_device == DeviceType.XPU:
# Currently only single thread is supported in Kunlun XPU.
self._exec_strategy.num_threads = 1
else:
self._exec_strategy.num_threads = len(places) * 2
if self._exec_strategy._use_device == ExecutionStrategy.UseDevice.XPU:
if self._exec_strategy._use_device == DeviceType.XPU:
assert self._exec_strategy.num_threads == 1, \
"Currently only single thread is supported in Kunlun XPU."
......@@ -384,7 +385,7 @@ class CompiledProgram(object):
self._build_strategy.enable_sequential_execution = True
if self._program is not None and self._program._enable_dgc:
assert self._exec_strategy._use_device == ExecutionStrategy.UseDevice.CUDA, "DGC only used under CUDA environment."
assert self._exec_strategy._use_device == DeviceType.CUDA, "DGC only used under CUDA environment."
assert self._build_strategy.num_trainers * len(
places) > 1, "DGC is not avaliable for single card training."
assert self._build_strategy.reduce_strategy == BuildStrategy.ReduceStrategy.AllReduce, "DGC \
......@@ -455,11 +456,11 @@ class CompiledProgram(object):
"If optimizer is used in control flow, "
"training on multi-places is not supported now.")
if isinstance(self._place, core.CUDAPlace):
use_device = ExecutionStrategy.UseDevice.CUDA
use_device = DeviceType.CUDA
elif isinstance(self._place, core.XPUPlace):
use_device = ExecutionStrategy.UseDevice.XPU
use_device = DeviceType.XPU
else:
use_device = ExecutionStrategy.UseDevice.CPU
use_device = DeviceType.CPU
self._executor = self._compile_data_parallel(
use_device=use_device, scope=self._scope, places=self._places)
return self
......
......@@ -462,6 +462,7 @@ def xpu_places(device_ids=None):
list of paddle.XPUPlace: Created XPU place list.
Examples:
.. code-block:: python
import paddle
import paddle.static as static
......
......@@ -30,11 +30,17 @@ from feed_data_reader import FeedDataReader
__all__ = ['TestParallelExecutorBase']
class DeviceType:
CPU = 1
GPU = 2
XPU = 3
class TestParallelExecutorBase(unittest.TestCase):
@classmethod
def check_network_convergence(cls,
method,
use_cuda=True,
use_device=DeviceType.GPU,
iter=5,
batch_size=None,
feed_dict=None,
......@@ -74,7 +80,9 @@ class TestParallelExecutorBase(unittest.TestCase):
feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder,
main, method, optimizer)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
place = fluid.CUDAPlace(
0) if use_device == DeviceType.GPU else fluid.XPUPlace(
0) if use_device == DeviceType.XPU else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
......@@ -82,7 +90,7 @@ class TestParallelExecutorBase(unittest.TestCase):
enable_inplace, enable_sequential_execution, fuse_all_optimizer_ops,
fuse_all_reduce_ops, fuse_elewise_add_act_ops,
fuse_relu_depthwise_conv, use_fast_executor, use_ir_memory_optimize,
use_reduce, use_cuda)
use_reduce, use_device)
if use_parallel_executor:
binary = compiler.CompiledProgram(main).with_data_parallel(
......@@ -94,7 +102,8 @@ class TestParallelExecutorBase(unittest.TestCase):
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count(
) if use_cuda else int(
) if use_device == DeviceType.GPU else fluid.core.get_xpu_device_count(
) if use_device == DeviceType.XPU else int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
begin = time.time()
......@@ -123,7 +132,7 @@ class TestParallelExecutorBase(unittest.TestCase):
@classmethod
def check_pass_conflict(cls,
method,
use_cuda=True,
use_device=DeviceType.GPU,
feed_dict=None,
get_data_from_feeder=None,
use_reduce=False,
......@@ -143,7 +152,9 @@ class TestParallelExecutorBase(unittest.TestCase):
feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder,
main, method, optimizer)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
place = fluid.CUDAPlace(
0) if use_device == DeviceType.GPU else fluid.XPUPlace(
0) if use_device == DeviceType.XPU else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
......@@ -151,7 +162,7 @@ class TestParallelExecutorBase(unittest.TestCase):
enable_inplace, enable_sequential_execution, fuse_all_optimizer_ops,
fuse_all_reduce_ops, fuse_elewise_add_act_ops,
fuse_relu_depthwise_conv, use_fast_executor, use_ir_memory_optimize,
use_reduce, use_cuda)
use_reduce, use_device)
binary = compiler.CompiledProgram(main).with_data_parallel(
loss_name=loss.name,
......@@ -165,7 +176,7 @@ class TestParallelExecutorBase(unittest.TestCase):
fuse_all_optimizer_ops, fuse_all_reduce_ops,
fuse_elewise_add_act_ops, fuse_relu_depthwise_conv,
use_fast_executor, use_ir_memory_optimize, use_reduce,
use_cuda):
use_device):
exec_strategy = fluid.ExecutionStrategy()
if use_fast_executor:
exec_strategy.use_experimental_executor = True
......@@ -180,8 +191,17 @@ class TestParallelExecutorBase(unittest.TestCase):
build_strategy.enable_inplace = enable_inplace
build_strategy.enable_sequential_execution = enable_sequential_execution
if use_cuda and core.is_compiled_with_cuda():
if use_device == DeviceType.GPU and core.is_compiled_with_cuda():
build_strategy.remove_unnecessary_lock = True
if use_device == DeviceType.XPU and core.is_compiled_with_xpu():
build_strategy.fuse_elewise_add_act_ops = False
build_strategy.fuse_relu_depthwise_conv = False
build_strategy.fuse_all_optimizer_ops = False
build_strategy.fuse_all_reduce_ops = False
build_strategy.memory_optimize = False
build_strategy.enable_inplace = False
build_strategy.enable_sequential_execution = False
return build_strategy, exec_strategy
@classmethod
......
......@@ -19,6 +19,7 @@ fluid.core._set_eager_deletion_mode(-1, -1, False)
import paddle.fluid.layers.ops as ops
from paddle.fluid.layers.learning_rate_scheduler import cosine_decay
from simple_nets import init_data
from seresnext_test_base import DeviceType
import math
import os
os.environ['CPU_NUM'] = str(4)
......@@ -169,28 +170,32 @@ def optimizer(learning_rate=0.01):
model = SE_ResNeXt50Small
def batch_size(use_cuda):
if use_cuda:
def batch_size(use_device):
if use_device == DeviceType.GPU:
# Paddle uses 8GB P4 GPU for unittest so we decreased the batch size.
return 8
return 12
def iter(use_cuda):
if use_cuda:
def iter(use_device):
if use_device == DeviceType.GPU:
return 10
return 1
gpu_img, gpu_label = init_data(
batch_size=batch_size(use_cuda=True), img_shape=img_shape, label_range=999)
batch_size=batch_size(use_device=DeviceType.GPU),
img_shape=img_shape,
label_range=999)
cpu_img, cpu_label = init_data(
batch_size=batch_size(use_cuda=False), img_shape=img_shape, label_range=999)
batch_size=batch_size(use_device=DeviceType.CPU),
img_shape=img_shape,
label_range=999)
feed_dict_gpu = {"image": gpu_img, "label": gpu_label}
feed_dict_cpu = {"image": cpu_img, "label": cpu_label}
def feed_dict(use_cuda):
if use_cuda:
def feed_dict(use_device):
if use_device == DeviceType.GPU:
return feed_dict_gpu
return feed_dict_cpu
......@@ -15,34 +15,35 @@
from __future__ import print_function
import seresnext_net
import paddle.fluid.core as core
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
from parallel_executor_test_base import DeviceType
import numpy as np
class TestResnetBase(TestParallelExecutorBase):
def _compare_result_with_origin_model(self,
check_func,
use_cuda,
use_device,
delta2=1e-5,
compare_seperately=True):
if use_cuda and not core.is_compiled_with_cuda():
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
func_1_first_loss, func_1_last_loss = self.check_network_convergence(
seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda),
iter=seresnext_net.iter(use_cuda),
batch_size=seresnext_net.batch_size(use_cuda),
use_cuda=use_cuda,
feed_dict=seresnext_net.feed_dict(use_device),
iter=seresnext_net.iter(use_device),
batch_size=seresnext_net.batch_size(use_device),
use_device=use_device,
use_reduce=False,
optimizer=seresnext_net.optimizer)
func_2_first_loss, func_2_last_loss = check_func(
seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda),
iter=seresnext_net.iter(use_cuda),
batch_size=seresnext_net.batch_size(use_cuda),
use_cuda=use_cuda)
feed_dict=seresnext_net.feed_dict(use_device),
iter=seresnext_net.iter(use_device),
batch_size=seresnext_net.batch_size(use_device),
use_device=use_device)
if compare_seperately:
for loss in zip(func_1_first_loss, func_2_first_loss):
......
......@@ -14,7 +14,7 @@
from simple_nets import simple_fc_net, fc_with_batchnorm, init_data, bow_net
from fake_reader import fake_imdb_reader
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
import paddle.fluid as fluid
import paddle.fluid.core as core
from functools import partial
......@@ -30,12 +30,12 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase):
def compare_fuse_all_reduce_ops(self,
model,
use_cuda,
use_device,
init_feed_dict=None,
get_data_from_feeder=None,
optimizer=None,
fuse_all_optimizer_ops=False):
if use_cuda and not core.is_compiled_with_cuda():
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
feed_dict_data = None
......@@ -47,7 +47,7 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase):
model,
feed_dict=feed_dict_data,
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
use_device=use_device,
fuse_all_reduce_ops=False,
fuse_all_optimizer_ops=fuse_all_optimizer_ops,
optimizer=optimizer)
......@@ -55,7 +55,7 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase):
model,
feed_dict=feed_dict_data,
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
use_device=use_device,
fuse_all_reduce_ops=True,
fuse_all_optimizer_ops=fuse_all_optimizer_ops,
optimizer=optimizer)
......@@ -73,28 +73,30 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase):
class TestFuseAllReduceOps(TestFuseAllReduceOpsBase):
def _decorate_compare_fused_all_reduce(self, model, use_cuda):
def _decorate_compare_fused_all_reduce(self, model, use_device):
self.compare_fuse_all_reduce_ops(
model,
use_cuda,
use_device,
init_feed_dict=init_data,
optimizer=self.optimizer,
fuse_all_optimizer_ops=True)
def test_simple_fc_with_fuse_all_reduce(self):
self._decorate_compare_fused_all_reduce(simple_fc_net, True)
self._decorate_compare_fused_all_reduce(simple_fc_net, False)
self._decorate_compare_fused_all_reduce(simple_fc_net, DeviceType.GPU)
self._decorate_compare_fused_all_reduce(simple_fc_net, DeviceType.CPU)
def test_batchnorm_fc_with_fuse_all_reduce(self):
self._decorate_compare_fused_all_reduce(fc_with_batchnorm, True)
self._decorate_compare_fused_all_reduce(fc_with_batchnorm, False)
self._decorate_compare_fused_all_reduce(fc_with_batchnorm,
DeviceType.GPU)
self._decorate_compare_fused_all_reduce(fc_with_batchnorm,
DeviceType.CPU)
class TestFuseAllReduceOpsAndOptiOps(TestFuseAllReduceOps):
def _decorate_compare_fused_all_reduce(self, model, use_cuda):
def _decorate_compare_fused_all_reduce(self, model, use_device):
self.compare_fuse_all_reduce_ops(
model,
use_cuda,
use_device,
init_feed_dict=init_data,
optimizer=self.optimizer,
fuse_all_optimizer_ops=True)
......@@ -115,17 +117,17 @@ class TestFuseAllReduceOpsWithSparseGrad(TestFuseAllReduceOpsBase):
feeder = fluid.DataFeeder(feed_list=["words", "label"], place=place)
return feeder.feed(self.train_data)
def _decorate_compare_fused_all_reduce(self, model, use_cuda):
def _decorate_compare_fused_all_reduce(self, model, use_device):
self.compare_fuse_all_reduce_ops(
model,
use_cuda,
use_device,
get_data_from_feeder=self.get_data_from_feeder,
optimizer=self.optimizer)
def test_simple_bow_net_with_fuse_all_reduce(self):
model = partial(bow_net, dict_dim=self.word_dict_len, is_sparse=True)
self._decorate_compare_fused_all_reduce(model, True)
self._decorate_compare_fused_all_reduce(model, False)
self._decorate_compare_fused_all_reduce(model, DeviceType.GPU)
self._decorate_compare_fused_all_reduce(model, DeviceType.CPU)
if __name__ == '__main__':
......
......@@ -13,7 +13,7 @@
# limitations under the License.
from simple_nets import simple_fc_net, fc_with_batchnorm, init_data
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
import paddle.fluid as fluid
import paddle.fluid.core as core
import unittest
......@@ -25,8 +25,8 @@ class TestMNIST(TestParallelExecutorBase):
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
def _compare_fuse_elewise_add_act_ops(self, model, use_cuda):
if use_cuda and not core.is_compiled_with_cuda():
def _compare_fuse_elewise_add_act_ops(self, model, use_device):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
img, label = init_data()
......@@ -45,7 +45,7 @@ class TestMNIST(TestParallelExecutorBase):
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
fuse_elewise_add_act_ops=False,
use_ir_memory_optimize=False,
enable_inplace=False,
......@@ -54,7 +54,7 @@ class TestMNIST(TestParallelExecutorBase):
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
fuse_elewise_add_act_ops=True,
use_ir_memory_optimize=False,
enable_inplace=False,
......@@ -66,12 +66,14 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
def test_simple_fc_with_fuse_op(self):
self._compare_fuse_elewise_add_act_ops(simple_fc_net, True)
self._compare_fuse_elewise_add_act_ops(simple_fc_net, False)
self._compare_fuse_elewise_add_act_ops(simple_fc_net, DeviceType.GPU)
self._compare_fuse_elewise_add_act_ops(simple_fc_net, DeviceType.CPU)
def test_batchnorm_fc_with_fuse_op(self):
self._compare_fuse_elewise_add_act_ops(fc_with_batchnorm, True)
self._compare_fuse_elewise_add_act_ops(fc_with_batchnorm, False)
self._compare_fuse_elewise_add_act_ops(fc_with_batchnorm,
DeviceType.GPU)
self._compare_fuse_elewise_add_act_ops(fc_with_batchnorm,
DeviceType.CPU)
if __name__ == '__main__':
......
......@@ -14,7 +14,7 @@
from simple_nets import simple_fc_net, fc_with_batchnorm, init_data, bow_net
from fake_reader import fake_imdb_reader
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
from functools import partial
import paddle
import paddle.fluid as fluid
......@@ -34,25 +34,25 @@ class TestFuseOptimizationOps(TestParallelExecutorBase):
def _compare_fused_optimizer_ops(self,
model,
use_cuda,
use_device,
feed_dict=None,
get_data_from_feeder=None,
optimizer=fluid.optimizer.Adam):
if use_cuda and not core.is_compiled_with_cuda():
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
not_fuse_op_first_loss, not_fuse_op_last_loss = self.check_network_convergence(
model,
feed_dict=feed_dict,
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
use_device=use_device,
fuse_all_optimizer_ops=False,
optimizer=optimizer)
fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence(
model,
feed_dict=feed_dict,
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
use_device=use_device,
fuse_all_optimizer_ops=True,
optimizer=optimizer)
......@@ -61,10 +61,11 @@ class TestFuseOptimizationOps(TestParallelExecutorBase):
for loss in zip(not_fuse_op_last_loss, fuse_op_last_loss):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
def _decorate_compare_fused_optimizer_ops(self, model, use_cuda, optimizer):
def _decorate_compare_fused_optimizer_ops(self, model, use_device,
optimizer):
self._compare_fused_optimizer_ops(
model,
use_cuda,
use_device,
feed_dict=self._get_feed_dict(),
optimizer=optimizer)
......@@ -75,9 +76,9 @@ class TestFuseAdamOps(TestFuseOptimizationOps):
def test_batchnorm_fc_with_fuse_op(self):
self._decorate_compare_fused_optimizer_ops(
fc_with_batchnorm, True, optimizer=self.optimizer)
fc_with_batchnorm, DeviceType.GPU, optimizer=self.optimizer)
self._decorate_compare_fused_optimizer_ops(
fc_with_batchnorm, False, optimizer=self.optimizer)
fc_with_batchnorm, DeviceType.CPU, optimizer=self.optimizer)
class TestFuseSGDOps(TestFuseAdamOps):
......@@ -106,10 +107,11 @@ class TestSpareFuseAdamOps(TestFuseOptimizationOps):
feeder = fluid.DataFeeder(feed_list=["words", "label"], place=place)
return feeder.feed(self.train_data)
def _decorate_compare_fused_optimizer_ops(self, model, use_cuda, optimizer):
def _decorate_compare_fused_optimizer_ops(self, model, use_device,
optimizer):
self._compare_fused_optimizer_ops(
model,
use_cuda,
use_device,
get_data_from_feeder=self._get_data_from_feeder,
optimizer=optimizer)
......@@ -119,9 +121,9 @@ class TestSpareFuseAdamOps(TestFuseOptimizationOps):
def test_simple_bow_net_with_fuse_op(self):
model = partial(bow_net, dict_dim=self.word_dict_len, is_sparse=True)
self._decorate_compare_fused_optimizer_ops(
model, True, optimizer=self.optimizer)
model, DeviceType.GPU, optimizer=self.optimizer)
self._decorate_compare_fused_optimizer_ops(
model, False, optimizer=self.optimizer)
model, DeviceType.CPU, optimizer=self.optimizer)
class TestSpareFuseSGDOps(TestSpareFuseAdamOps):
......@@ -138,18 +140,18 @@ class TestSpareFuseMomentumOps(TestSpareFuseAdamOps):
class TestPassConflictBase(TestFuseAdamOps):
def _compare_fused_optimizer_ops(self,
model,
use_cuda,
use_device,
feed_dict=None,
get_data_from_feeder=None,
optimizer=fluid.optimizer.Adam):
if use_cuda and not core.is_compiled_with_cuda():
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
self.check_pass_conflict(
model,
feed_dict=feed_dict,
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
use_device=use_device,
fuse_all_optimizer_ops=True,
optimizer=optimizer,
enable_sequential_execution=True)
......@@ -161,9 +163,9 @@ class TestFuseAdamOpsPassConflict(TestPassConflictBase):
def test_batchnorm_fc_with_fuse_op(self):
self._decorate_compare_fused_optimizer_ops(
fc_with_batchnorm, True, optimizer=self.optimizer)
fc_with_batchnorm, DeviceType.CPU, optimizer=self.optimizer)
self._decorate_compare_fused_optimizer_ops(
fc_with_batchnorm, False, optimizer=self.optimizer)
fc_with_batchnorm, DeviceType.GPU, optimizer=self.optimizer)
class TestFuseSGDOpsPassConflict(TestFuseAdamOpsPassConflict):
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
import paddle.fluid as fluid
import paddle.fluid.core as core
import numpy as np
......@@ -72,8 +72,8 @@ class TestMNIST(TestParallelExecutorBase):
label = np.ones(shape=[32, 1], dtype='int64')
return img, label
def _compare(self, model, use_cuda, random_data=True, only_forward=False):
if use_cuda and not core.is_compiled_with_cuda():
def _compare(self, model, use_device, random_data=True, only_forward=False):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
img, label = self._init_data(random_data)
......@@ -90,7 +90,7 @@ class TestMNIST(TestParallelExecutorBase):
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
fuse_relu_depthwise_conv=True,
use_ir_memory_optimize=True,
optimizer=_optimizer)
......@@ -98,7 +98,7 @@ class TestMNIST(TestParallelExecutorBase):
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
fuse_relu_depthwise_conv=False,
optimizer=_optimizer)
......@@ -108,12 +108,12 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
def test_simple_depthwise_with_fuse_op(self):
self._compare(simple_depthwise_net, True)
self._compare(simple_depthwise_net, False)
self._compare(simple_depthwise_net, DeviceType.GPU)
self._compare(simple_depthwise_net, DeviceType.CPU)
def test_simple_depthwise_with_fuse_op_only_forward(self):
self._compare(simple_depthwise_net, True, only_forward=True)
self._compare(simple_depthwise_net, False, only_forward=True)
self._compare(simple_depthwise_net, DeviceType.GPU, only_forward=True)
self._compare(simple_depthwise_net, DeviceType.CPU, only_forward=True)
if __name__ == '__main__':
......
......@@ -19,7 +19,7 @@ import unittest
import numpy as np
import paddle.fluid.core as core
import paddle.fluid as fluid
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
def fc_with_batchnorm(use_feed):
......@@ -58,7 +58,7 @@ class TestIrInplace(TestParallelExecutorBase):
fc_with_batchnorm,
feed_dict={"image": img,
"label": label},
use_cuda=True,
use_device=DeviceType.GPU,
use_ir_memory_optimize=ir_memory_optimize,
enable_inplace=enable_inplace)
......
......@@ -75,7 +75,7 @@ class TestIrMemoryOptimizeIfElseOp(unittest.TestCase):
exe = Executor(place)
exec_strategy = fluid.ExecutionStrategy()
exec_strategy._use_device = fluid.ExecutionStrategy.UseDevice.CUDA if use_cuda else fluid.ExecutionStrategy.UseDevice.CPU
exec_strategy._use_device = core.DeviceType.CUDA if use_cuda else core.DeviceType.CPU
build_strategy = fluid.BuildStrategy()
build_strategy.memory_optimize = use_mem_opt
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
import paddle.fluid as fluid
import paddle.fluid.core as core
import numpy as np
......@@ -60,8 +60,8 @@ class TestMNIST(TestParallelExecutorBase):
label = np.ones(shape=[32, 1], dtype='int64')
return img, label
def _compare_ir_memory_optimize(self, model, use_cuda):
if use_cuda and not core.is_compiled_with_cuda():
def _compare_ir_memory_optimize(self, model, use_device):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
img, label = self._dummy_data()
......@@ -69,13 +69,13 @@ class TestMNIST(TestParallelExecutorBase):
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_ir_memory_optimize=False)
first_loss1, last_loss1 = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_ir_memory_optimize=True)
for loss in zip(first_loss0, first_loss1):
self.assertAlmostEqual(loss[0], loss[1], delta=1e-6)
......@@ -83,12 +83,12 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEqual(loss[0], loss[1], delta=1e-6)
def test_simple_fc_net(self):
self._compare_ir_memory_optimize(simple_fc_net, False)
self._compare_ir_memory_optimize(simple_fc_net, True)
self._compare_ir_memory_optimize(simple_fc_net, DeviceType.CPU)
self._compare_ir_memory_optimize(simple_fc_net, DeviceType.GPU)
def test_fc_with_reshape_net(self):
self._compare_ir_memory_optimize(fc_with_inplace_net, False)
self._compare_ir_memory_optimize(fc_with_inplace_net, True)
self._compare_ir_memory_optimize(fc_with_inplace_net, DeviceType.CPU)
self._compare_ir_memory_optimize(fc_with_inplace_net, DeviceType.GPU)
if __name__ == '__main__':
......
......@@ -23,7 +23,7 @@ import paddle.dataset.wmt16 as wmt16
os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
from test_parallel_executor_transformer import get_feed_data_reader, transformer
......@@ -35,14 +35,14 @@ class TestTransformerWithIR(TestParallelExecutorBase):
# check python transpiler
self.check_network_convergence(
transformer,
use_cuda=True,
use_device=DeviceType.GPU,
feed_data_reader=get_feed_data_reader(),
use_ir_memory_optimize=False,
iter=2)
# check IR memory optimize
self.check_network_convergence(
transformer,
use_cuda=True,
use_device=DeviceType.GPU,
feed_data_reader=get_feed_data_reader(),
use_ir_memory_optimize=True,
iter=2)
......
......@@ -24,7 +24,7 @@ import numpy as np
import paddle
import paddle.fluid as fluid
from simple_nets import init_data
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
batch_size = 12
img_shape = [1, 28, 28]
......@@ -68,7 +68,7 @@ def _optimizer(learning_rate=1e-6):
class TestResnet(TestParallelExecutorBase):
def check_model(self, use_cuda):
def check_model(self, use_device):
img, label = init_data(
batch_size=batch_size, img_shape=img_shape, label_range=9)
img = np.float16(img)
......@@ -78,13 +78,13 @@ class TestResnet(TestParallelExecutorBase):
conv_net,
feed_dict=feed_dict,
iter=10,
use_cuda=use_cuda,
use_device=use_device,
fuse_all_reduce_ops=True,
optimizer=_optimizer)
def test_model(self):
if core.is_compiled_with_cuda():
self.check_model(True)
self.check_model(DeviceType.GPU)
if __name__ == '__main__':
......
......@@ -18,9 +18,11 @@ import unittest
import numpy as np
import paddle.fluid.core as core
import paddle
import os
import paddle.fluid as fluid
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
from parallel_executor_test_base import DeviceType
def simple_fc_net(use_feed):
......@@ -76,10 +78,13 @@ class TestMNIST(TestParallelExecutorBase):
def _compare_reduce_and_allreduce(self,
model,
use_cuda,
use_device,
delta1=1e-6,
delta2=1e-4):
if use_cuda and not core.is_compiled_with_cuda():
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
return
img, label = self._init_data()
......@@ -88,14 +93,14 @@ class TestMNIST(TestParallelExecutorBase):
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_reduce=False)
reduce_first_loss, reduce_last_loss = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_reduce=True)
for loss in zip(all_reduce_first_loss, reduce_first_loss):
......@@ -104,8 +109,11 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEqual(loss[0], loss[1], delta=delta2)
# simple_fc
def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
if use_cuda and not core.is_compiled_with_cuda():
def check_simple_fc_convergence(self, use_device, use_reduce=False):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
return
img, label = self._init_data()
......@@ -114,23 +122,26 @@ class TestMNIST(TestParallelExecutorBase):
simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_reduce=use_reduce)
def test_simple_fc(self):
# use_cuda
self.check_simple_fc_convergence(True)
self.check_simple_fc_convergence(False)
# use_device
self.check_simple_fc_convergence(DeviceType.GPU)
self.check_simple_fc_convergence(DeviceType.CPU)
self.check_simple_fc_convergence(DeviceType.XPU)
def test_simple_fc_with_new_strategy(self):
# use_cuda, use_reduce
# use_device, use_reduce
# NOTE: the computation result of nccl_reduce is non-deterministic,
# related issue: https://github.com/NVIDIA/nccl/issues/157
self._compare_reduce_and_allreduce(simple_fc_net, True, 1e-5, 1e-2)
self._compare_reduce_and_allreduce(simple_fc_net, False, 1e-5, 1e-2)
self._compare_reduce_and_allreduce(simple_fc_net, DeviceType.GPU, 1e-5,
1e-2)
self._compare_reduce_and_allreduce(simple_fc_net, DeviceType.CPU, 1e-5,
1e-2)
def check_simple_fc_parallel_accuracy(self, use_cuda):
if use_cuda and not core.is_compiled_with_cuda():
def check_simple_fc_parallel_accuracy(self, use_device):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
img, label = self._init_data()
......@@ -139,13 +150,13 @@ class TestMNIST(TestParallelExecutorBase):
method=simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_parallel_executor=False)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
method=simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_parallel_executor=True)
self.assertAlmostEquals(
......@@ -156,33 +167,38 @@ class TestMNIST(TestParallelExecutorBase):
np.mean(parallel_last_loss), single_last_loss, delta=1e-6)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy(True)
self.check_simple_fc_parallel_accuracy(False)
self.check_simple_fc_parallel_accuracy(DeviceType.GPU)
self.check_simple_fc_parallel_accuracy(DeviceType.CPU)
def check_batchnorm_fc_convergence(self, use_cuda, use_fast_executor):
if use_cuda and not core.is_compiled_with_cuda():
def check_batchnorm_fc_convergence(self, use_device, use_fast_executor):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
if use_device == DeviceType.XPU and not core.is_compiled_with_xpu():
return
img, label = self._init_data()
self.check_network_convergence(
fc_with_batchnorm,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_fast_executor=use_fast_executor)
def test_batchnorm_fc(self):
for use_cuda in (False, True):
for use_device in (DeviceType.CPU, DeviceType.GPU):
for use_fast_executor in (False, True):
self.check_batchnorm_fc_convergence(use_cuda, use_fast_executor)
self.check_batchnorm_fc_convergence(use_device,
use_fast_executor)
def test_batchnorm_fc_with_new_strategy(self):
# NOTE: the computation result of nccl_reduce is non-deterministic,
# related issue: https://github.com/NVIDIA/nccl/issues/157
self._compare_reduce_and_allreduce(fc_with_batchnorm, True, 1e-5, 1e-2)
self._compare_reduce_and_allreduce(fc_with_batchnorm, False, 1e-5, 1e-2)
self._compare_reduce_and_allreduce(fc_with_batchnorm, DeviceType.GPU,
1e-5, 1e-2)
self._compare_reduce_and_allreduce(fc_with_batchnorm, DeviceType.CPU,
1e-5, 1e-2)
if __name__ == '__main__':
paddle.enable_static()
unittest.main()
......@@ -21,7 +21,7 @@ import os
os.environ['FLAGS_enable_parallel_graph'] = str(1)
import paddle.fluid.core as core
import os
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
from simple_nets import simple_fc_net, init_data
......@@ -31,8 +31,8 @@ class TestMNIST(TestParallelExecutorBase):
os.environ['CPU_NUM'] = str(4)
# simple_fc
def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
if use_cuda and not core.is_compiled_with_cuda():
def check_simple_fc_convergence(self, use_device, use_reduce=False):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
img, label = init_data()
......@@ -40,15 +40,15 @@ class TestMNIST(TestParallelExecutorBase):
simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_reduce=use_reduce)
def test_simple_fc(self):
# use_cuda
# use_device
self.check_simple_fc_convergence(True)
def check_simple_fc_parallel_accuracy(self, use_cuda):
if use_cuda and not core.is_compiled_with_cuda():
def check_simple_fc_parallel_accuracy(self, use_device):
if use_device and not core.is_compiled_with_cuda():
return
img, label = init_data()
......@@ -56,13 +56,13 @@ class TestMNIST(TestParallelExecutorBase):
method=simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_parallel_executor=False)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
method=simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_device=use_device,
use_parallel_executor=True)
self.assertAlmostEquals(
......@@ -73,7 +73,7 @@ class TestMNIST(TestParallelExecutorBase):
np.mean(parallel_last_loss), single_last_loss, delta=1e-6)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy(True)
self.check_simple_fc_parallel_accuracy(DeviceType.GPU)
if __name__ == '__main__':
......
......@@ -15,7 +15,7 @@
from __future__ import print_function
import unittest
import seresnext_net
from seresnext_test_base import TestResnetBase
from seresnext_test_base import TestResnetBase, DeviceType
from functools import partial
......@@ -30,7 +30,10 @@ class TestResnetCPU(TestResnetBase):
optimizer=seresnext_net.optimizer,
use_parallel_executor=False)
self._compare_result_with_origin_model(
check_func, use_cuda=False, compare_seperately=False, delta2=1e-3)
check_func,
use_device=DeviceType.CPU,
compare_seperately=False,
delta2=1e-3)
if __name__ == '__main__':
......
......@@ -15,7 +15,7 @@
from __future__ import print_function
import unittest
import seresnext_net
from seresnext_test_base import TestResnetBase
from seresnext_test_base import TestResnetBase, DeviceType
from functools import partial
......@@ -30,7 +30,7 @@ class TestResnetGPU(TestResnetBase):
optimizer=seresnext_net.optimizer,
use_parallel_executor=False)
self._compare_result_with_origin_model(
check_func, use_cuda=True, compare_seperately=False)
check_func, use_device=DeviceType.GPU, compare_seperately=False)
if __name__ == '__main__':
......
......@@ -19,7 +19,7 @@ fluid.core._set_fuse_parameter_memory_size(131072)
import unittest
import seresnext_net
from seresnext_test_base import TestResnetBase
from seresnext_test_base import TestResnetBase, DeviceType
from functools import partial
......@@ -31,7 +31,8 @@ class TestResnetWithFuseAllReduceCPU(TestResnetBase):
self.check_network_convergence,
optimizer=seresnext_net.optimizer,
fuse_all_reduce_ops=True)
self._compare_result_with_origin_model(check_func, use_cuda=False)
self._compare_result_with_origin_model(
check_func, use_device=DeviceType.CPU)
if __name__ == '__main__':
......
......@@ -19,7 +19,7 @@ fluid.core._set_fuse_parameter_memory_size(131072)
import unittest
import seresnext_net
from seresnext_test_base import TestResnetBase
from seresnext_test_base import TestResnetBase, DeviceType
from functools import partial
......@@ -32,7 +32,7 @@ class TestResnetWithFuseAllReduceGPU(TestResnetBase):
optimizer=seresnext_net.optimizer,
fuse_all_reduce_ops=True)
self._compare_result_with_origin_model(
check_func, use_cuda=True, delta2=1e-2)
check_func, use_device=DeviceType.GPU, delta2=1e-2)
if __name__ == '__main__':
......
......@@ -14,30 +14,30 @@
from __future__ import print_function
import unittest
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
import seresnext_net
import paddle.fluid.core as core
class TestResnetWithReduceBase(TestParallelExecutorBase):
def _compare_reduce_and_allreduce(self, use_cuda, delta2=1e-5):
if use_cuda and not core.is_compiled_with_cuda():
def _compare_reduce_and_allreduce(self, use_device, delta2=1e-5):
if use_device == DeviceType.GPU and not core.is_compiled_with_cuda():
return
all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda),
iter=seresnext_net.iter(use_cuda),
batch_size=seresnext_net.batch_size(use_cuda),
use_cuda=use_cuda,
feed_dict=seresnext_net.feed_dict(use_device),
iter=seresnext_net.iter(use_device),
batch_size=seresnext_net.batch_size(use_device),
use_device=use_device,
use_reduce=False,
optimizer=seresnext_net.optimizer)
reduce_first_loss, reduce_last_loss = self.check_network_convergence(
seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda),
iter=seresnext_net.iter(use_cuda),
batch_size=seresnext_net.batch_size(use_cuda),
use_cuda=use_cuda,
feed_dict=seresnext_net.feed_dict(use_device),
iter=seresnext_net.iter(use_device),
batch_size=seresnext_net.batch_size(use_device),
use_device=use_device,
use_reduce=True,
optimizer=seresnext_net.optimizer)
......@@ -46,25 +46,25 @@ class TestResnetWithReduceBase(TestParallelExecutorBase):
for loss in zip(all_reduce_last_loss, reduce_last_loss):
self.assertAlmostEquals(loss[0], loss[1], delta=loss[0] * delta2)
if not use_cuda:
if not use_device:
return
all_reduce_first_loss_seq, all_reduce_last_loss_seq = self.check_network_convergence(
seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda),
iter=seresnext_net.iter(use_cuda),
batch_size=seresnext_net.batch_size(use_cuda),
use_cuda=use_cuda,
feed_dict=seresnext_net.feed_dict(use_device),
iter=seresnext_net.iter(use_device),
batch_size=seresnext_net.batch_size(use_device),
use_device=use_device,
use_reduce=False,
optimizer=seresnext_net.optimizer,
enable_sequential_execution=True)
reduce_first_loss_seq, reduce_last_loss_seq = self.check_network_convergence(
seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda),
iter=seresnext_net.iter(use_cuda),
batch_size=seresnext_net.batch_size(use_cuda),
use_cuda=use_cuda,
feed_dict=seresnext_net.feed_dict(use_device),
iter=seresnext_net.iter(use_device),
batch_size=seresnext_net.batch_size(use_device),
use_device=use_device,
use_reduce=True,
optimizer=seresnext_net.optimizer,
enable_sequential_execution=True)
......@@ -87,7 +87,8 @@ class TestResnetWithReduceBase(TestParallelExecutorBase):
class TestResnetWithReduceCPU(TestResnetWithReduceBase):
def test_seresnext_with_reduce(self):
self._compare_reduce_and_allreduce(use_cuda=False, delta2=1e-3)
self._compare_reduce_and_allreduce(
use_device=DeviceType.CPU, delta2=1e-3)
if __name__ == '__main__':
......
......@@ -14,12 +14,13 @@
from __future__ import print_function
import unittest
from test_parallel_executor_seresnext_with_reduce_cpu import TestResnetWithReduceBase
from test_parallel_executor_seresnext_with_reduce_cpu import TestResnetWithReduceBase, DeviceType
class TestResnetWithReduceGPU(TestResnetWithReduceBase):
def test_seresnext_with_reduce(self):
self._compare_reduce_and_allreduce(use_cuda=True, delta2=1e-2)
self._compare_reduce_and_allreduce(
use_device=DeviceType.GPU, delta2=1e-2)
if __name__ == '__main__':
......
......@@ -17,7 +17,7 @@ from __future__ import print_function
import paddle.fluid as fluid
import transformer_model
import numpy as np
from parallel_executor_test_base import TestParallelExecutorBase
from parallel_executor_test_base import TestParallelExecutorBase, DeviceType
import unittest
import paddle
import paddle.fluid.core as core
......@@ -191,16 +191,16 @@ class TestTransformer(TestParallelExecutorBase):
if core.is_compiled_with_cuda():
self.check_network_convergence(
transformer,
use_cuda=True,
use_device=DeviceType.GPU,
feed_data_reader=get_feed_data_reader())
self.check_network_convergence(
transformer,
use_cuda=True,
use_device=DeviceType.GPU,
enable_sequential_execution=True,
feed_data_reader=get_feed_data_reader())
self.check_network_convergence(
transformer,
use_cuda=False,
use_device=DeviceType.CPU,
iter=2,
feed_data_reader=get_feed_data_reader())
......
......@@ -22,7 +22,7 @@ import paddle.fluid as fluid
import paddle.fluid.core as core
from simple_nets import init_data, simple_fc_net, fc_with_batchnorm
import seresnext_net
from test_parallel_executor_transformer import transformer, get_feed_data_reader
from test_parallel_executor_transformer import transformer, get_feed_data_reader, DeviceType
from fake_reader import fake_imdb_reader
......@@ -219,7 +219,7 @@ class TestProgramPruneBackward(unittest.TestCase):
with self.program_scope_guard():
self.check_prune_correctness(
method=seresnext_net.model,
feed_dict=seresnext_net.feed_dict(use_cuda=False),
feed_dict=seresnext_net.feed_dict(use_device=DeviceType.CPU),
optimizer=seresnext_net.optimizer)
def test_transformer(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册