Unverified commit 6f69a4cb, authored by T Thunderbrook, committed by GitHub

add xpu in heter mode (#27000)

* add xpu in heter mode
test=develop

* BOOST_CONST_GET; PADDLE_THROW
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* refine
test=develop

* refine
test=develop

* refine
test=develop

* refine code
test=develop
Parent: 8daccc9e
@@ -270,6 +270,10 @@ if(WITH_PSLIB)
     endif()
 endif(WITH_PSLIB)
 
+if(NOT WIN32 AND NOT APPLE)
+    include(external/gloo)
+    list(APPEND third_party_deps extern_gloo)
+endif()
+
 if(WITH_BOX_PS)
     include(external/box_ps)
@@ -277,10 +281,6 @@ if(WITH_BOX_PS)
 endif(WITH_BOX_PS)
 
 if(WITH_DISTRIBUTE)
-    if(WITH_GLOO)
-        include(external/gloo)
-        list(APPEND third_party_deps extern_gloo)
-    endif()
     if(WITH_GRPC)
         list(APPEND third_party_deps extern_grpc)
......
@@ -74,7 +74,9 @@ class PullDenseWorker {
   virtual void Initialize(const TrainerDesc& param);
 #ifdef PADDLE_WITH_CUDA
   void AddStream(const cudaStream_t stream) { copy_streams_.push_back(stream); }
+#endif
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
   void AddPlace(const paddle::platform::Place place) {
     places_.push_back(place);
   }
@@ -135,9 +137,9 @@ class PullDenseWorker {
 #ifdef PADDLE_WITH_CUDA
   std::vector<cudaStream_t> copy_streams_;
+#endif
   std::vector<paddle::platform::Place> places_;
   std::vector<Scope*> thread_scopes_;
-#endif
 };
 
 // should incorporate different type of device
@@ -161,6 +163,7 @@ class DeviceWorker {
   virtual void SetDataFeed(DataFeed* data_feed);
   virtual void SetWorkerNum(int num) {}
   virtual void CacheProgram(const ProgramDesc& main_program) {}
+  virtual void GetXpuOpIndex() {}
   virtual void SetNeedDumpField(bool need_dump_field) {
     need_dump_field_ = need_dump_field;
   }
......
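With the guard split above, AddPlace(), places_ and thread_scopes_ are now compiled for both CUDA and XPU builds, while the stream APIs stay CUDA-only. A minimal caller sketch (it mirrors the registration HeterXpuTrainer::InitOtherEnv does later in this patch; the member names are assumed from that code):

  // Register per-device resources with the PullDenseWorker (sketch only).
  pull_dense_worker_->AddPlace(places_[i]);           // CUDA and XPU builds
  #ifdef PADDLE_WITH_CUDA
  pull_dense_worker_->AddStream(copy_streams_[i]);    // CUDA builds only
  #endif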
@@ -745,7 +745,57 @@ void FleetWrapper::PushDenseVarsAsync(
     push_sparse_status->push_back(std::move(status));
   }
 }
+#endif
+
+#ifdef PADDLE_WITH_XPU
+void FleetWrapper::PushDenseVarsAsync(
+    const Scope& scope, const uint64_t table_id,
+    const std::vector<std::string>& var_names,
+    std::vector<::std::future<int32_t>>* push_sparse_status,
+    float scale_datanorm, int batch_size,
+    const paddle::platform::Place& place) {
+#ifdef PADDLE_WITH_PSLIB
+  std::vector<paddle::ps::Region> regions;
+  for (auto& t : var_names) {
+    Variable* var = scope.FindVar(t);
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int count = tensor->numel();
+    float* g_data = tensor->data<float>();
+    Variable* pin_var = scope.FindVar(t + "pin");
+    LoDTensor* pin_tensor = pin_var->GetMutable<LoDTensor>();
+    float* pin_g =
+        pin_tensor->mutable_data<float>(tensor->dims(), platform::CPUPlace());
+    memory::Copy(platform::CPUPlace(), pin_g,
+                 BOOST_GET_CONST(platform::XPUPlace, place), g_data,
+                 sizeof(float) * count);
+    float* g = pin_g;
+    if (scale_datanorm >= 0) {
+      if (t.find(".batch_size@GRAD") != std::string::npos ||
+          t.find(".batch_sum@GRAD") != std::string::npos) {
+        Eigen::Map<Eigen::MatrixXf> mat(g, 1, count);
+        float scale = 1.0 / batch_size;
+        mat *= scale;
+      } else if (t.find(".batch_square_sum@GRAD") != std::string::npos) {
+        VLOG(3) << "epsilon: " << scale_datanorm;
+        for (int i = 0; i < count; ++i) {
+          g[i] = (g[i] - batch_size * scale_datanorm) / batch_size +
+                 batch_size * scale_datanorm;
+        }
+      }
+    }
+    paddle::ps::Region reg(g, count);
+    regions.emplace_back(std::move(reg));
+  }
+  auto status = pslib_ptr_->_worker_ptr->push_dense(regions.data(),
+                                                    regions.size(), table_id);
+  if (push_sparse_status) {
+    push_sparse_status->push_back(std::move(status));
+  }
+#endif
+}
 #endif
 
 void FleetWrapper::PushDenseVarsAsync(
     const Scope& scope, const uint64_t table_id,
......
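The XPU overload of PushDenseVarsAsync above mirrors the CUDA path: each gradient is staged into a CPU-side "<name>pin" tensor with a synchronous memory::Copy (no stream or event), the data-norm statistics are optionally rescaled, and the staged buffers are pushed to PSLib as CPU regions. A standalone sketch of just the rescaling arithmetic (hypothetical helper, not part of the patch; epsilon stands for scale_datanorm):

  #include <string>

  // Rescale a data-norm gradient buffer in place; a negative epsilon disables it.
  void ScaleDataNormGrad(const std::string& name, float* g, int count,
                         int batch_size, float epsilon) {
    if (epsilon < 0) return;
    if (name.find(".batch_size@GRAD") != std::string::npos ||
        name.find(".batch_sum@GRAD") != std::string::npos) {
      for (int i = 0; i < count; ++i) g[i] /= batch_size;  // average over batch
    } else if (name.find(".batch_square_sum@GRAD") != std::string::npos) {
      for (int i = 0; i < count; ++i) {
        g[i] = (g[i] - batch_size * epsilon) / batch_size + batch_size * epsilon;
      }
    }
  }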
@@ -160,6 +160,14 @@ class FleetWrapper {
       float scale_datanorm, int batch_size,
       const paddle::platform::Place& place, cudaStream_t stream,
       cudaEvent_t event);
+#endif
+#ifdef PADDLE_WITH_XPU
+  void PushDenseVarsAsync(
+      const Scope& scope, const uint64_t table_id,
+      const std::vector<std::string>& var_names,
+      std::vector<::std::future<int32_t>>* push_sparse_status,
+      float scale_datanorm, int batch_size,
+      const paddle::platform::Place& place);
 #endif
   void PushDenseVarsAsync(
       const Scope& scope, const uint64_t table_id,
......
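Callers pick whichever overload matches their build; this is the dispatch HeterXpuTrainer::RunTask adds further down. Shown here only as a usage sketch (fleet_ptr_, copy_stream and event are assumed trainer members, not part of this header):

  #ifdef PADDLE_WITH_CUDA
    fleet_ptr_->PushDenseVarsAsync(scope, table_id, grad_names, &push_status,
                                   scale_datanorm, batch_size, place,
                                   copy_stream, event);
  #endif
  #ifdef PADDLE_WITH_XPU
    fleet_ptr_->PushDenseVarsAsync(scope, table_id, grad_names, &push_status,
                                   scale_datanorm, batch_size, place);
  #endif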
@@ -113,30 +113,66 @@ void HeterWrapper::SerializeToReq(const std::string& varname, Scope* scope,
   if (platform::is_cpu_place(tensor->place())) {
     memcpy(data_ptr, tensor->data<void>(),
            tensor->numel() * SizeOfType(tensor->type()));
-#ifdef PADDLE_WITH_CUDA
   } else {
+#ifdef PADDLE_WITH_CUDA
     memory::Copy(platform::CPUPlace(), data_ptr,
                  BOOST_GET_CONST(platform::CUDAPlace, tensor->place()),
                  tensor->data<void>(),
                  tensor->numel() * SizeOfType(tensor->type()), nullptr);
-  }
-#else
-  }
 #endif
+#ifdef PADDLE_WITH_XPU
+    memory::Copy(platform::CPUPlace(), data_ptr,
+                 BOOST_GET_CONST(platform::XPUPlace, tensor->place()),
+                 tensor->data<void>(),
+                 tensor->numel() * SizeOfType(tensor->type()));
+#endif
+  }
 }
 
+// void HeterWrapper::DeSerializeToTensor(Scope* scope,
+//                                        const HeterRequest* request) {
 #ifdef PADDLE_WITH_CUDA
 void HeterWrapper::DeSerializeToTensor(Scope* scope,
                                        const VariableMessage& req_var,
                                        platform::Place place,
                                        cudaStream_t stream) {
+  // const VariableMessage& req_var = request->vars();
+  auto* var = scope->FindVar(req_var.varname());
+  auto* tensor = var->GetMutable<LoDTensor>();
+  std::vector<int> vec_dim;
+  for (auto& x : req_var.dims()) {
+    vec_dim.push_back(x);
+  }
+  tensor->Resize(make_ddim(vec_dim));
+  LoD lod;
+  for (int i = 0; i < req_var.lod_level(); ++i) {
+    framework::Vector<size_t> v;
+    for (int j = 0; j < req_var.lod(i).lod_data_size(); ++j) {
+      v.push_back(req_var.lod(i).lod_data(j));
+    }
+    lod.push_back(v);
+  }
+  tensor->set_lod(lod);
+  void* tensor_data =
+      tensor->mutable_data(place, ToVarType(req_var.data_type()));
+#ifdef PADDLE_WITH_CUDA
+  memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
+               platform::CPUPlace(), req_var.data().data(),
+               tensor->numel() * SizeOfType(tensor->type()), stream);
 #else
+  memcpy(tensor_data, req_var.data().data(),
+         tensor->numel() * SizeOfType(tensor->type()));
+#endif
+}
+#endif
+
+// void HeterWrapper::DeSerializeToTensor(Scope* scope,
+//                                        const HeterRequest* request) {
 void HeterWrapper::DeSerializeToTensor(Scope* scope,
                                        const VariableMessage& req_var,
                                        platform::Place place) {
-#endif
   // const VariableMessage& req_var = request->vars();
   auto* var = scope->FindVar(req_var.varname());
   auto* tensor = var->GetMutable<LoDTensor>();
@@ -160,10 +196,10 @@ void HeterWrapper::DeSerializeToTensor(Scope* scope,
   void* tensor_data =
       tensor->mutable_data(place, ToVarType(req_var.data_type()));
-#ifdef PADDLE_WITH_CUDA
-  memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
+#ifdef PADDLE_WITH_XPU
+  memory::Copy(BOOST_GET_CONST(platform::XPUPlace, place), tensor_data,
                platform::CPUPlace(), req_var.data().data(),
-               tensor->numel() * SizeOfType(tensor->type()), stream);
+               tensor->numel() * SizeOfType(tensor->type()));
 #else
   memcpy(tensor_data, req_var.data().data(),
          tensor->numel() * SizeOfType(tensor->type()));
@@ -184,7 +220,8 @@ framework::proto::VarType::Type HeterWrapper::ToVarType(
     case VariableMessage::BOOL:
       return framework::proto::VarType::BOOL;  // NOLINT
     default:
-      VLOG(0) << "Not support type " << type;
+      PADDLE_THROW(platform::errors::InvalidArgument(
+          "ToVarType:Unsupported type %d", type));
   }
 }
......
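After this change the stream-taking DeSerializeToTensor overload exists only in CUDA builds, while the two-argument overload serves XPU (and the CPU fallback) and copies synchronously. A usage sketch of the receive path, mirroring HeterXpuTrainer::RunTask below (variable names assumed from that code):

  for (int i = 0; i < request->vars_size(); ++i) {
  #ifdef PADDLE_WITH_CUDA
    heter_ptr_->DeSerializeToTensor(scope, request->vars(i), place, copy_stream);
  #endif
  #ifdef PADDLE_WITH_XPU
    heter_ptr_->DeSerializeToTensor(scope, request->vars(i), place);
  #endif
  }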
@@ -12,9 +12,21 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#include <cstdlib>
+#include <ctime>
+#include <string>
+#include <vector>
+#include "io/fs.h"
+#include "paddle/fluid/framework/data_feed_factory.h"
+#include "paddle/fluid/framework/data_set.h"
+#include "paddle/fluid/framework/device_worker_factory.h"
+#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
+#include "paddle/fluid/framework/trainer.h"
+#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
+    (defined PADDLE_WITH_PSLIB)
+#ifdef PADDLE_WITH_CUDA
 #include "paddle/fluid/platform/cuda_device_guard.h"
+#endif
 
 namespace paddle {
 namespace framework {
@@ -34,6 +46,7 @@ void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
   int place_num = trainer_desc.worker_places_size();
   for (int i = 0; i < place_num; ++i) {
     int num = trainer_desc.worker_places(i);
+#ifdef PADDLE_WITH_CUDA
     platform::CUDAPlace place = platform::CUDAPlace(num);
     platform::CUDADeviceGuard guard(place.device);
     cudaStream_t stream;
@@ -44,6 +57,11 @@ void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
     events_.push_back(event);
+#endif
+#ifdef PADDLE_WITH_XPU
+    platform::XPUPlace place = platform::XPUPlace(num);
+    places_.push_back(place);
+#endif
   }
   // thread_num_ = trainer_desc.thread_num();
   // SetDataset(dataset);
@@ -95,11 +113,17 @@ void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
 void HeterXpuTrainer::CreateThreadParam(const ProgramDesc& program, int num) {
   auto place = places_[num];
   Scope* scope = place_scopes_[num];
+#ifdef PADDLE_WITH_CUDA
   auto stream = copy_streams_[num];
   auto event = events_[num];
   auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
   platform::CUDADeviceGuard guard(dev_id);
+#endif
+#ifdef PADDLE_WITH_XPU
+  xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
+#endif
   auto& block = program.Block(0);
   for (auto& var : block.AllVars()) {
     if (var->Persistable()) {
@@ -116,13 +140,28 @@ void HeterXpuTrainer::CreateThreadParam(const ProgramDesc& program, int num) {
       HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place, stream); \
     }                                                                   \
   } while (0)
+
+#define HeterMemcpyXpuFunc(cpp_type, proto_type)                \
+  do {                                                          \
+    if (root_tensor->type() == proto_type) {                    \
+      HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place); \
+    }                                                           \
+  } while (0)
+#ifdef PADDLE_WITH_CUDA
       _ForEachDataType_(HeterMemcpyFunc);
+#endif
+#ifdef PADDLE_WITH_XPU
      _ForEachDataType_(HeterMemcpyXpuFunc);
+#endif
     }
   }
+#ifdef PADDLE_WITH_CUDA
   PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
   cudaEventSynchronize(event);
+#endif
 }
+#ifdef PADDLE_WITH_CUDA
 template <typename T>
 void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
                                   LoDTensor* root_tensor,
@@ -141,6 +180,27 @@ void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
                  root_ptr, sizeof(T) * root_tensor->numel(), stream);
   }
 }
+#endif
+
+#ifdef PADDLE_WITH_XPU
+template <typename T>
+void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
+                                  LoDTensor* root_tensor,
+                                  const paddle::platform::Place& thread_place) {
+  T* thread_ptr =
+      thread_tensor->mutable_data<T>(root_tensor->dims(), thread_place);
+  T* root_ptr = root_tensor->data<T>();
+  if (platform::is_cpu_place(root_tensor->place())) {
+    memory::Copy(BOOST_GET_CONST(platform::XPUPlace, thread_place), thread_ptr,
+                 platform::CPUPlace(), root_ptr,
+                 sizeof(T) * root_tensor->numel());
+  } else {
+    memory::Copy(BOOST_GET_CONST(platform::XPUPlace, thread_place), thread_ptr,
+                 BOOST_GET_CONST(platform::XPUPlace, root_tensor->place()),
+                 root_ptr, sizeof(T) * root_tensor->numel());
+  }
+}
+#endif
 
 void HeterXpuTrainer::DumpWork(int tid) {}
@@ -171,13 +231,16 @@ void HeterXpuTrainer::InitOtherEnv(const ProgramDesc& main_program) {
     CreateThreadParam(main_program, i);
     pull_dense_worker_->AddThreadScope(scope);
     pull_dense_worker_->AddPlace(places_[i]);
+#ifdef PADDLE_WITH_CUDA
     pull_dense_worker_->AddStream(copy_streams_[i]);
+#endif
   }
   pull_dense_worker_->Start();
+#ifdef PADDLE_WITH_CUDA
   for (auto& stream : copy_streams_) {
     cudaStreamSynchronize(stream);
   }
+#endif
   op_names_.clear();
   for (auto& op_desc : block.AllOps()) {
     std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
@@ -220,10 +283,12 @@ void HeterXpuTrainer::InitOtherEnv(const ProgramDesc& main_program) {
       OperatorBase* local_op_ptr = local_op.release();
       (context->ops_).push_back(local_op_ptr);
     }
+#ifdef PADDLE_WITH_CUDA
     auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
     platform::CUDADeviceGuard guard(dev_id);
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
+#endif
     object_pool_.Push(context);
   }
 }
@@ -267,12 +332,25 @@ int HeterXpuTrainer::EndPass(const HeterRequest* request,
     }                                            \
   } while (0)
   _ForEachDataType_(MergeCallback);
-  if (platform::is_gpu_place(thread_tensor->place())) {
+  if (!platform::is_cpu_place(thread_tensor->place())) {
+#ifdef PADDLE_WITH_CUDA
     auto dev_id =
         BOOST_GET_CONST(platform::CUDAPlace, thread_tensor->place()).device;
     platform::CUDADeviceGuard guard(dev_id);
     cudaMemset(thread_tensor->data<void>(), 0,
                thread_tensor->numel() * SizeOfType(thread_tensor->type()));
+#endif
+#ifdef PADDLE_WITH_XPU
+    auto place = thread_tensor->place();
+    xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
+    platform::DeviceContextPool& pool =
+        platform::DeviceContextPool::Instance();
+    platform::DeviceContext* dev_ctx = pool.Get(place);
+    const platform::XPUDeviceContext* xpu_ctx =
+        reinterpret_cast<const platform::XPUDeviceContext*>(dev_ctx);
+    xpu::memset(xpu_ctx->x_context(), thread_tensor->data<void>(), 0,
+                thread_tensor->numel() * SizeOfType(thread_tensor->type()));
+#endif
   } else {
     memset(thread_tensor->data<void>(), 0,
            thread_tensor->numel() * SizeOfType(thread_tensor->type()));
@@ -281,12 +359,25 @@ int HeterXpuTrainer::EndPass(const HeterRequest* request,
     auto* merge_var = response->add_vars();
     heter_ptr_->SerializeToReq(need_merge_var_names_[i], root_scope_,
                                merge_var);
-    if (platform::is_gpu_place(root_tensor->place())) {
+    if (!platform::is_cpu_place(root_tensor->place())) {
+#ifdef PADDLE_WITH_CUDA
       auto dev_id =
           BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()).device;
       platform::CUDADeviceGuard guard(dev_id);
       cudaMemset(root_tensor->data<void>(), 0,
                  root_tensor->numel() * SizeOfType(root_tensor->type()));
+#endif
+#ifdef PADDLE_WITH_XPU
+      auto place = root_tensor->place();
+      xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
+      platform::DeviceContextPool& pool =
+          platform::DeviceContextPool::Instance();
+      platform::DeviceContext* dev_ctx = pool.Get(place);
+      const platform::XPUDeviceContext* xpu_ctx =
+          reinterpret_cast<const platform::XPUDeviceContext*>(dev_ctx);
+      xpu::memset(xpu_ctx->x_context(), root_tensor->data<void>(), 0,
+                  root_tensor->numel() * SizeOfType(root_tensor->type()));
+#endif
     } else {
       memset(root_tensor->data<void>(), 0,
              root_tensor->numel() * SizeOfType(root_tensor->type()));
@@ -346,11 +437,12 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
       OperatorBase* local_op_ptr = local_op.release();
       (context->ops_).push_back(local_op_ptr);
     }
+#ifdef PADDLE_WITH_CUDA
     auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
     platform::CUDADeviceGuard guard(dev_id);
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
+#endif
   }
 
   context->Reset();
@@ -359,15 +451,22 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
     auto deserial_timer =
         std::make_shared<paddle::ps::CostTimer>("xpu_service_deserial");
     for (int i = 0; i < request->vars_size(); ++i) {
+#ifdef PADDLE_WITH_CUDA
       heter_ptr_->DeSerializeToTensor(context->scope_, request->vars(i), place,
                                       copy_streams_[context->place_num_]);
+#endif
+#ifdef PADDLE_WITH_XPU
+      heter_ptr_->DeSerializeToTensor(context->scope_, request->vars(i), place);
+#endif
     }
+#ifdef PADDLE_WITH_CUDA
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventRecord(context->event_, copy_streams_[context->place_num_]));
     while (cudaEventQuery(context->event_) != cudaSuccess) {
       VLOG(3) << "wait for kernel";
       bthread_yield();
     }
+#endif
   }
 
   {
@@ -378,6 +477,7 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
       op->Run(*(context->scope_), place);
     }
   }
+#ifdef PADDLE_WITH_CUDA
   auto* dev_ctx = static_cast<platform::CUDADeviceContext*>(
       platform::DeviceContextPool::Instance().Get(place));
   PADDLE_ENFORCE_CUDA_SUCCESS(
@@ -391,6 +491,10 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
       bthread_yield();
     }
   }
+#endif
+#ifdef PADDLE_WITH_XPU
+  xpu_wait();
+#endif
 
   for (int i = 0; i < trainer_desc_.xpu_send_list_size(); ++i) {
     const std::string& varname = trainer_desc_.xpu_send_list(i);
@@ -407,11 +511,19 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
        ++i) {
     uint64_t tid =
         static_cast<uint64_t>(param_.program_config(0).push_dense_table_id(i));
+#ifdef PADDLE_WITH_CUDA
     fleet_ptr_->PushDenseVarsAsync(
         *(context->scope_), tid, dense_grad_names_[tid],
         &(context->push_dense_status_), scale_datanorm_, request->cur_batch(),
        places_[context->place_num_], copy_streams_[context->place_num_],
        context->event_);
+#endif
+#ifdef PADDLE_WITH_XPU
+    fleet_ptr_->PushDenseVarsAsync(
+        *(context->scope_), tid, dense_grad_names_[tid],
+        &(context->push_dense_status_), scale_datanorm_, request->cur_batch(),
+        places_[context->place_num_]);
+#endif
   }
   for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
        ++i) {
@@ -453,7 +565,6 @@ void HeterXpuTrainer::Finalize() {
   pull_dense_worker_->Stop();
   root_scope_->DropKids();
 }
-
 }  // namespace framework
 }  // namespace paddle
 #endif
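The EndPass changes above zero out merged tensors on whichever device owns them. A compact helper assembled from that code, for illustration only (no such function exists in the patch):

  // Zero a LoDTensor in place, dispatching on the device that owns it.
  void ZeroTensor(LoDTensor* t) {
    size_t bytes = t->numel() * SizeOfType(t->type());
    if (platform::is_cpu_place(t->place())) {
      memset(t->data<void>(), 0, bytes);
      return;
    }
  #ifdef PADDLE_WITH_CUDA
    platform::CUDADeviceGuard guard(
        BOOST_GET_CONST(platform::CUDAPlace, t->place()).device);
    cudaMemset(t->data<void>(), 0, bytes);
  #endif
  #ifdef PADDLE_WITH_XPU
    auto place = t->place();
    xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
    auto* xpu_ctx = reinterpret_cast<const platform::XPUDeviceContext*>(
        platform::DeviceContextPool::Instance().Get(place));
    xpu::memset(xpu_ctx->x_context(), t->data<void>(), 0, bytes);
  #endif
  }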
@@ -62,13 +62,15 @@ void PullDenseWorker::Initialize(const TrainerDesc& param) {
   fleet_ptr_ = FleetWrapper::GetInstance();
 #ifdef PADDLE_WITH_CUDA
   copy_streams_.clear();
+#endif
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
   places_.clear();
   thread_scopes_.clear();
 #endif
 }
 
 void PullDenseWorker::CreatePinVar() {
-#ifdef PADDLE_WITH_CUDA
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_PSLIB)
   // for (auto& v : dense_value_names_) {
   //  for (auto& name : v.second) {
   for (int i = 0; i < dwp_param_.program_config(0).pull_dense_table_id_size();
@@ -83,8 +85,13 @@ void PullDenseWorker::CreatePinVar() {
       auto* ptr = root_scope_->Var(name + "pin");
       InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
       LoDTensor* pin_tensor = ptr->GetMutable<LoDTensor>();
+#ifdef PADDLE_WITH_CUDA
       pin_tensor->mutable_data<float>(tensor->dims(),
                                       platform::CUDAPinnedPlace());
+#endif
+#ifdef PADDLE_WITH_XPU
+      pin_tensor->mutable_data<float>(tensor->dims(), platform::CPUPlace());
+#endif
     }
   }
 #endif
@@ -107,7 +114,7 @@ void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
     exit(-1);
   }
   status_vec->resize(0);
-#ifdef PADDLE_WITH_CUDA
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
   for (size_t i = 0; i < places_.size(); ++i) {
     // for (auto& v : dense_value_names_) {
@@ -125,9 +132,16 @@ void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
       Variable* var = thread_scopes_[i]->FindVar(name);
       LoDTensor* tensor = var->GetMutable<LoDTensor>();
       float* w = tensor->data<float>();
+#ifdef PADDLE_WITH_CUDA
       memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, places_[i]), w,
                    platform::CUDAPinnedPlace(), pin_w,
                    sizeof(float) * tensor->numel(), copy_streams_[i]);
+#endif
+#ifdef PADDLE_WITH_XPU
+      memory::Copy(BOOST_GET_CONST(platform::XPUPlace, places_[i]), w,
+                   platform::CPUPlace(), pin_w,
+                   sizeof(float) * tensor->numel());
+#endif
     }
   }
 }
@@ -148,7 +162,7 @@ void PullDenseWorker::PullDense(bool force_update) {
     uint64_t tid = static_cast<uint64_t>(
         dwp_param_.program_config(0).pull_dense_table_id(i));
     if (force_update || CheckUpdateParam(tid)) {
-#ifdef PADDLE_WITH_CUDA
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
       VLOG(3) << "pull dense " << force_update << " " << tid;
       fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid],
                                      &pull_dense_status_, false);
......
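For XPU, the pull-dense round trip now stages through ordinary host memory instead of CUDA pinned memory: CreatePinVar() allocates the "<var>pin" tensor on CPUPlace, the pull fills it, and Wait() copies it to the device synchronously. A minimal sketch of the copy-back step (variable names follow the hunk above; not a literal excerpt):

  float* pin_w = pin_tensor->data<float>();   // host-side staging buffer
  float* w = tensor->data<float>();           // parameter tensor on the XPU
  memory::Copy(BOOST_GET_CONST(platform::XPUPlace, places_[i]), w,
               platform::CPUPlace(), pin_w, sizeof(float) * tensor->numel());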
@@ -138,7 +138,8 @@ class DistMultiTrainer : public MultiTrainer {
   std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
 };
 
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
+    (defined PADDLE_WITH_PSLIB)
 class HeterServiceContext {
  public:
   HeterServiceContext() {}
@@ -151,7 +152,9 @@ class HeterServiceContext {
   void Reset() { push_dense_status_.clear(); }
   int place_num_;
   Scope* scope_{nullptr};
+#ifdef PADDLE_WITH_CUDA
   cudaEvent_t event_;
+#endif
   std::vector<OperatorBase*> ops_;
   std::vector<::std::future<int32_t>> push_dense_status_;
 };
@@ -178,10 +181,18 @@ class HeterXpuTrainer : public TrainerBase {
   virtual void CacheProgram(const ProgramDesc& main_program) {
     new (&program_) ProgramDesc(main_program);
   }
+  virtual std::string GetDumpPath(int tid) { return ""; }
+  virtual void InitDumpEnv() {}
   template <typename T>
+#ifdef PADDLE_WITH_CUDA
   void HeterMemCpy(LoDTensor* tensor, LoDTensor* root_tensor,
                    const paddle::platform::Place& thread_place,
                    cudaStream_t stream);
+#endif
+#ifdef PADDLE_WITH_XPU
+  void HeterMemCpy(LoDTensor* thread_tensor, LoDTensor* root_tensor,
+                   const paddle::platform::Place& thread_place);
+#endif
   void CreateThreadParam(const ProgramDesc& program, int num);
   template <typename T>
   void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
@@ -207,9 +218,11 @@ class HeterXpuTrainer : public TrainerBase {
   std::vector<std::string> op_names_;
   std::vector<Scope*> place_scopes_;
   BtObjectPool<HeterServiceContext> object_pool_;
-  std::vector<cudaStream_t> copy_streams_;
   std::vector<platform::Place> places_;
+#ifdef PADDLE_WITH_CUDA
+  std::vector<cudaStream_t> copy_streams_;
   std::vector<cudaEvent_t> events_;
+#endif
 };
 #endif
......
@@ -63,7 +63,8 @@ std::shared_ptr<TrainerBase> TrainerFactory::CreateTrainer(
 REGISTER_TRAINER_CLASS(MultiTrainer);
 REGISTER_TRAINER_CLASS(DistMultiTrainer);
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
+    (defined PADDLE_WITH_PSLIB)
 REGISTER_TRAINER_CLASS(HeterXpuTrainer);
 #endif
 #if defined(PADDLE_WITH_NCCL)
......
@@ -1355,7 +1355,7 @@ class Executor(object):
         if not program._fleet_opt is None:
             if program._fleet_opt.get("worker_class", "") == "HeterCpuWorker":
                 is_heter = 1
-            if program._fleet_opt("trainer", "") == "HeterXpuTrainer":
+            if program._fleet_opt.get("trainer", "") == "HeterXpuTrainer":
                 is_heter = 1
         if scope is None:
             scope = global_scope()
......