From 6f69a4cb059119176f556a0aac0253d2899c6b59 Mon Sep 17 00:00:00 2001
From: Thunderbrook <52529258+Thunderbrook@users.noreply.github.com>
Date: Fri, 25 Sep 2020 16:39:22 +0800
Subject: [PATCH] add xpu in heter mode (#27000)

* add xpu in heter mode
test=develop

* BOOST_CONST_GET; PADDLE_THROW
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* refine
test=develop

* refine
test=develop

* refine
test=develop

* refine code
test=develop
---
 cmake/third_party.cmake                       |   8 +-
 paddle/fluid/framework/device_worker.h        |   5 +-
 paddle/fluid/framework/fleet/fleet_wrapper.cc |  50 +++++++
 paddle/fluid/framework/fleet/fleet_wrapper.h  |   8 ++
 paddle/fluid/framework/fleet/heter_wrapper.cc |  59 ++++++--
 paddle/fluid/framework/heterxpu_trainer.cc    | 127 ++++++++++++++++--
 paddle/fluid/framework/pull_dense_worker.cc   |  20 ++-
 paddle/fluid/framework/trainer.h              |  17 ++-
 paddle/fluid/framework/trainer_factory.cc     |   3 +-
 python/paddle/fluid/executor.py               |   2 +-
 10 files changed, 268 insertions(+), 31 deletions(-)

diff --git a/cmake/third_party.cmake b/cmake/third_party.cmake
index ffd32cc78f..1eb2096af9 100644
--- a/cmake/third_party.cmake
+++ b/cmake/third_party.cmake
@@ -270,6 +270,10 @@ if(WITH_PSLIB)
     endif()
 endif(WITH_PSLIB)
 
+if(NOT WIN32 AND NOT APPLE)
+    include(external/gloo)
+    list(APPEND third_party_deps extern_gloo)
+endif()
 
 if(WITH_BOX_PS)
     include(external/box_ps)
@@ -277,10 +281,6 @@ if(WITH_BOX_PS)
 endif(WITH_BOX_PS)
 
 if(WITH_DISTRIBUTE)
-    if(WITH_GLOO)
-        include(external/gloo)
-        list(APPEND third_party_deps extern_gloo)
-    endif()
 
     if(WITH_GRPC)
         list(APPEND third_party_deps extern_grpc)
diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index ee2ef9a0c3..f6f3098613 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -74,7 +74,9 @@ class PullDenseWorker {
   virtual void Initialize(const TrainerDesc& param);
 #ifdef PADDLE_WITH_CUDA
   void AddStream(const cudaStream_t stream) { copy_streams_.push_back(stream); }
+#endif
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
   void AddPlace(const paddle::platform::Place place) {
     places_.push_back(place);
   }
@@ -135,9 +137,9 @@ class PullDenseWorker {
 
 #ifdef PADDLE_WITH_CUDA
   std::vector<cudaStream_t> copy_streams_;
+#endif
   std::vector<platform::Place> places_;
   std::vector<Scope*> thread_scopes_;
-#endif
 };
 
 // should incorporate different type of device
@@ -161,6 +163,7 @@ class DeviceWorker {
   virtual void SetDataFeed(DataFeed* data_feed);
   virtual void SetWorkerNum(int num) {}
   virtual void CacheProgram(const ProgramDesc& main_program) {}
+  virtual void GetXpuOpIndex() {}
   virtual void SetNeedDumpField(bool need_dump_field) {
     need_dump_field_ = need_dump_field;
   }
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 3c07680593..693073d1fc 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -745,7 +745,57 @@ void FleetWrapper::PushDenseVarsAsync(
     push_sparse_status->push_back(std::move(status));
   }
 }
+#endif
+
+#ifdef PADDLE_WITH_XPU
+void FleetWrapper::PushDenseVarsAsync(
+    const Scope& scope, const uint64_t table_id,
+    const std::vector<std::string>& var_names,
+    std::vector<::std::future<int32_t>>* push_sparse_status,
+    float scale_datanorm, int batch_size,
+    const paddle::platform::Place& place) {
+#ifdef PADDLE_WITH_PSLIB
+  std::vector<paddle::ps::Region> regions;
+  for (auto& t : var_names) {
+    Variable* var = scope.FindVar(t);
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int count = tensor->numel();
+    float* g_data = tensor->data<float>();
+
+    Variable* pin_var = scope.FindVar(t + "pin");
+    LoDTensor* pin_tensor = pin_var->GetMutable<LoDTensor>();
+    float* pin_g =
+        pin_tensor->mutable_data<float>(tensor->dims(), platform::CPUPlace());
+    memory::Copy(platform::CPUPlace(), pin_g,
+                 BOOST_GET_CONST(platform::XPUPlace, place), g_data,
+                 sizeof(float) * count);
+
+    float* g = pin_g;
+    if (scale_datanorm >= 0) {
+      if (t.find(".batch_size@GRAD") != std::string::npos ||
+          t.find(".batch_sum@GRAD") != std::string::npos) {
+        Eigen::Map<Eigen::MatrixXf> mat(g, 1, count);
+        float scale = 1.0 / batch_size;
+        mat *= scale;
+      } else if (t.find(".batch_square_sum@GRAD") != std::string::npos) {
+        VLOG(3) << "epsilon: " << scale_datanorm;
+        for (int i = 0; i < count; ++i) {
+          g[i] = (g[i] - batch_size * scale_datanorm) / batch_size +
+                 batch_size * scale_datanorm;
+        }
+      }
+    }
+    paddle::ps::Region reg(g, count);
+    regions.emplace_back(std::move(reg));
+  }
+  auto status = pslib_ptr_->_worker_ptr->push_dense(regions.data(),
+                                                    regions.size(), table_id);
+  if (push_sparse_status) {
+    push_sparse_status->push_back(std::move(status));
+  }
+#endif
+}
 #endif
 
 void FleetWrapper::PushDenseVarsAsync(
     const Scope& scope, const uint64_t table_id,
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h
index be87bdf1e7..ae86835f38 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.h
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.h
@@ -160,6 +160,14 @@ class FleetWrapper {
                           float scale_datanorm, int batch_size,
                           const paddle::platform::Place& place,
                           cudaStream_t stream, cudaEvent_t event);
+#endif
+#ifdef PADDLE_WITH_XPU
+  void PushDenseVarsAsync(
+      const Scope& scope, const uint64_t table_id,
+      const std::vector<std::string>& var_names,
+      std::vector<::std::future<int32_t>>* push_sparse_status,
+      float scale_datanorm, int batch_size,
+      const paddle::platform::Place& place);
 #endif
   void PushDenseVarsAsync(
       const Scope& scope, const uint64_t table_id,
diff --git a/paddle/fluid/framework/fleet/heter_wrapper.cc b/paddle/fluid/framework/fleet/heter_wrapper.cc
index 7a27b6a9d7..8e232560ab 100644
--- a/paddle/fluid/framework/fleet/heter_wrapper.cc
+++ b/paddle/fluid/framework/fleet/heter_wrapper.cc
@@ -113,30 +113,66 @@ void HeterWrapper::SerializeToReq(const std::string& varname, Scope* scope,
   if (platform::is_cpu_place(tensor->place())) {
     memcpy(data_ptr, tensor->data<void>(),
            tensor->numel() * SizeOfType(tensor->type()));
-#ifdef PADDLE_WITH_CUDA
   } else {
+#ifdef PADDLE_WITH_CUDA
     memory::Copy(platform::CPUPlace(), data_ptr,
                  BOOST_GET_CONST(platform::CUDAPlace, tensor->place()),
                  tensor->data<void>(),
                  tensor->numel() * SizeOfType(tensor->type()), nullptr);
-  }
-#else
-  }
 #endif
+#ifdef PADDLE_WITH_XPU
+    memory::Copy(platform::CPUPlace(), data_ptr,
+                 BOOST_GET_CONST(platform::XPUPlace, tensor->place()),
+                 tensor->data<void>(),
+                 tensor->numel() * SizeOfType(tensor->type()));
+#endif
+  }
 }
 
 // void HeterWrapper::DeSerializeToTensor(Scope* scope,
 //                                        const HeterRequest* request) {
 #ifdef PADDLE_WITH_CUDA
 void HeterWrapper::DeSerializeToTensor(Scope* scope,
                                        const VariableMessage& req_var,
                                        platform::Place place,
                                        cudaStream_t stream) {
-#else
+  // const VariableMessage& req_var = request->vars();
+  auto* var = scope->FindVar(req_var.varname());
+  auto* tensor = var->GetMutable<LoDTensor>();
+
+  std::vector<int> vec_dim;
+  for (auto& x : req_var.dims()) {
+    vec_dim.push_back(x);
+  }
+  tensor->Resize(make_ddim(vec_dim));
+
+  LoD lod;
+  for (int i = 0; i < req_var.lod_level(); ++i) {
+    framework::Vector<size_t> v;
+    for (int j = 0; j < req_var.lod(i).lod_data_size(); ++j) {
+      v.push_back(req_var.lod(i).lod_data(j));
+    }
+    lod.push_back(v);
+  }
+  tensor->set_lod(lod);
+
+  void* tensor_data =
+      tensor->mutable_data(place, ToVarType(req_var.data_type()));
+
+#ifdef PADDLE_WITH_CUDA
+  memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
+               platform::CPUPlace(), req_var.data().data(),
+               tensor->numel() * SizeOfType(tensor->type()), stream);
+#else
+  memcpy(tensor_data, req_var.data().data(),
+         tensor->numel() * SizeOfType(tensor->type()));
+#endif
+}
+#endif
+
+// void HeterWrapper::DeSerializeToTensor(Scope* scope,
+//                                        const HeterRequest* request) {
 void HeterWrapper::DeSerializeToTensor(Scope* scope,
                                        const VariableMessage& req_var,
                                        platform::Place place) {
-#endif
   // const VariableMessage& req_var = request->vars();
   auto* var = scope->FindVar(req_var.varname());
   auto* tensor = var->GetMutable<LoDTensor>();
@@ -160,10 +196,10 @@ void HeterWrapper::DeSerializeToTensor(Scope* scope,
   void* tensor_data =
       tensor->mutable_data(place, ToVarType(req_var.data_type()));
 
-#ifdef PADDLE_WITH_CUDA
-  memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
+#ifdef PADDLE_WITH_XPU
+  memory::Copy(BOOST_GET_CONST(platform::XPUPlace, place), tensor_data,
                platform::CPUPlace(), req_var.data().data(),
-               tensor->numel() * SizeOfType(tensor->type()), stream);
+               tensor->numel() * SizeOfType(tensor->type()));
 #else
   memcpy(tensor_data, req_var.data().data(),
          tensor->numel() * SizeOfType(tensor->type()));
 #endif
@@ -184,7 +220,8 @@ framework::proto::VarType::Type HeterWrapper::ToVarType(
     case VariableMessage::BOOL:
       return framework::proto::VarType::BOOL;  // NOLINT
     default:
-      VLOG(0) << "Not support type " << type;
+      PADDLE_THROW(platform::errors::InvalidArgument(
+          "ToVarType:Unsupported type %d", type));
   }
 }
diff --git a/paddle/fluid/framework/heterxpu_trainer.cc b/paddle/fluid/framework/heterxpu_trainer.cc
index fbed74800b..6bbbaacdde 100644
--- a/paddle/fluid/framework/heterxpu_trainer.cc
+++ b/paddle/fluid/framework/heterxpu_trainer.cc
@@ -12,9 +12,21 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#include <cstdlib>
+#include <ctime>
+#include <string>
+#include <vector>
+#include "io/fs.h"
+#include "paddle/fluid/framework/data_feed_factory.h"
+#include "paddle/fluid/framework/data_set.h"
+#include "paddle/fluid/framework/device_worker_factory.h"
+#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
+#include "paddle/fluid/framework/trainer.h"
+#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
+    (defined PADDLE_WITH_PSLIB)
+#ifdef PADDLE_WITH_CUDA
 #include "paddle/fluid/platform/cuda_device_guard.h"
-
+#endif
 namespace paddle {
 namespace framework {
@@ -34,6 +46,7 @@ void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
   int place_num = trainer_desc.worker_places_size();
   for (int i = 0; i < place_num; ++i) {
     int num = trainer_desc.worker_places(i);
+#ifdef PADDLE_WITH_CUDA
     platform::CUDAPlace place = platform::CUDAPlace(num);
     platform::CUDADeviceGuard guard(place.device);
     cudaStream_t stream;
@@ -44,6 +57,11 @@ void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
     events_.push_back(event);
+#endif
+#ifdef PADDLE_WITH_XPU
+    platform::XPUPlace place = platform::XPUPlace(num);
+    places_.push_back(place);
+#endif
   }
   // thread_num_ = trainer_desc.thread_num();
   // SetDataset(dataset);
@@ -95,11 +113,17 @@ void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
 void HeterXpuTrainer::CreateThreadParam(const ProgramDesc& program, int num) {
   auto place = places_[num];
   Scope* scope = place_scopes_[num];
+#ifdef PADDLE_WITH_CUDA
   auto stream = copy_streams_[num];
   auto event = events_[num];
-
   auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
   platform::CUDADeviceGuard guard(dev_id);
+#endif
+
+#ifdef PADDLE_WITH_XPU
+  xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
+#endif
+
   auto& block = program.Block(0);
   for (auto& var : block.AllVars()) {
     if (var->Persistable()) {
@@ -116,13 +140,28 @@ void HeterXpuTrainer::CreateThreadParam(const ProgramDesc& program, int num) {
         HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place, stream); \
       } \
     } while (0)
+
+#define HeterMemcpyXpuFunc(cpp_type, proto_type) \
+  do { \
+    if (root_tensor->type() == proto_type) { \
+      HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place); \
+    } \
+  } while (0)
+#ifdef PADDLE_WITH_CUDA
       _ForEachDataType_(HeterMemcpyFunc);
+#endif
+#ifdef PADDLE_WITH_XPU
+      _ForEachDataType_(HeterMemcpyXpuFunc);
+#endif
     }
   }
+#ifdef PADDLE_WITH_CUDA
   PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
   cudaEventSynchronize(event);
+#endif
 }
 
+#ifdef PADDLE_WITH_CUDA
 template <typename T>
 void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
                                   LoDTensor* root_tensor,
@@ -141,6 +180,27 @@ void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
                  root_ptr, sizeof(T) * root_tensor->numel(), stream);
   }
 }
+#endif
+
+#ifdef PADDLE_WITH_XPU
+template <typename T>
+void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
+                                  LoDTensor* root_tensor,
+                                  const paddle::platform::Place& thread_place) {
+  T* thread_ptr =
+      thread_tensor->mutable_data<T>(root_tensor->dims(), thread_place);
+  T* root_ptr = root_tensor->data<T>();
+  if (platform::is_cpu_place(root_tensor->place())) {
+    memory::Copy(BOOST_GET_CONST(platform::XPUPlace, thread_place), thread_ptr,
+                 platform::CPUPlace(), root_ptr,
+                 sizeof(T) * root_tensor->numel());
+  } else {
+    memory::Copy(BOOST_GET_CONST(platform::XPUPlace, thread_place), thread_ptr,
+                 BOOST_GET_CONST(platform::XPUPlace, root_tensor->place()),
+                 root_ptr, sizeof(T) * root_tensor->numel());
+  }
+}
+#endif
 
 void HeterXpuTrainer::DumpWork(int tid) {}
 
@@ -171,13 +231,16 @@ void HeterXpuTrainer::InitOtherEnv(const ProgramDesc& main_program) {
     CreateThreadParam(main_program, i);
     pull_dense_worker_->AddThreadScope(scope);
     pull_dense_worker_->AddPlace(places_[i]);
+#ifdef PADDLE_WITH_CUDA
    pull_dense_worker_->AddStream(copy_streams_[i]);
+#endif
   }
-
   pull_dense_worker_->Start();
+#ifdef PADDLE_WITH_CUDA
   for (auto& stream : copy_streams_) {
     cudaStreamSynchronize(stream);
   }
+#endif
   op_names_.clear();
   for (auto& op_desc : block.AllOps()) {
     std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
@@ -220,10 +283,12 @@ void HeterXpuTrainer::InitOtherEnv(const ProgramDesc& main_program) {
       OperatorBase* local_op_ptr = local_op.release();
       (context->ops_).push_back(local_op_ptr);
     }
+#ifdef PADDLE_WITH_CUDA
     auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
     platform::CUDADeviceGuard guard(dev_id);
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
+#endif
     object_pool_.Push(context);
   }
 }
@@ -267,12 +332,25 @@ int HeterXpuTrainer::EndPass(const HeterRequest* request,
         } \
       } while (0)
       _ForEachDataType_(MergeCallback);
-      if (platform::is_gpu_place(thread_tensor->place())) {
+      if (!platform::is_cpu_place(thread_tensor->place())) {
+#ifdef PADDLE_WITH_CUDA
         auto dev_id =
             BOOST_GET_CONST(platform::CUDAPlace, thread_tensor->place()).device;
         platform::CUDADeviceGuard guard(dev_id);
         cudaMemset(thread_tensor->data<void>(), 0,
                    thread_tensor->numel() * SizeOfType(thread_tensor->type()));
+#endif
+#ifdef PADDLE_WITH_XPU
+        auto place = thread_tensor->place();
+        xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
+        platform::DeviceContextPool& pool =
+            platform::DeviceContextPool::Instance();
+        platform::DeviceContext* dev_ctx = pool.Get(place);
+        const platform::XPUDeviceContext* xpu_ctx =
+            reinterpret_cast<const platform::XPUDeviceContext*>(dev_ctx);
+        xpu::memset(xpu_ctx->x_context(), thread_tensor->data<void>(), 0,
+                    thread_tensor->numel() * SizeOfType(thread_tensor->type()));
+#endif
       } else {
         memset(thread_tensor->data<void>(), 0,
                thread_tensor->numel() * SizeOfType(thread_tensor->type()));
@@ -281,12 +359,25 @@ int HeterXpuTrainer::EndPass(const HeterRequest* request,
     auto* merge_var = response->add_vars();
     heter_ptr_->SerializeToReq(need_merge_var_names_[i], root_scope_,
                                merge_var);
-    if (platform::is_gpu_place(root_tensor->place())) {
+    if (!platform::is_cpu_place(root_tensor->place())) {
+#ifdef PADDLE_WITH_CUDA
       auto dev_id =
           BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()).device;
       platform::CUDADeviceGuard guard(dev_id);
       cudaMemset(root_tensor->data<void>(), 0,
                  root_tensor->numel() * SizeOfType(root_tensor->type()));
+#endif
+#ifdef PADDLE_WITH_XPU
+      auto place = root_tensor->place();
+      xpu_set_device(BOOST_GET_CONST(platform::XPUPlace, place).device);
+      platform::DeviceContextPool& pool =
+          platform::DeviceContextPool::Instance();
+      platform::DeviceContext* dev_ctx = pool.Get(place);
+      const platform::XPUDeviceContext* xpu_ctx =
+          reinterpret_cast<const platform::XPUDeviceContext*>(dev_ctx);
+      xpu::memset(xpu_ctx->x_context(), root_tensor->data<void>(), 0,
+                  root_tensor->numel() * SizeOfType(root_tensor->type()));
+#endif
     } else {
       memset(root_tensor->data<void>(), 0,
              root_tensor->numel() * SizeOfType(root_tensor->type()));
@@ -346,11 +437,12 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
       OperatorBase* local_op_ptr = local_op.release();
       (context->ops_).push_back(local_op_ptr);
     }
-
+#ifdef PADDLE_WITH_CUDA
     auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
     platform::CUDADeviceGuard guard(dev_id);
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
+#endif
   }
 
   context->Reset();
@@ -359,15 +451,22 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
   {
     auto deserial_timer =
        std::make_shared<paddle::ps::CostTimer>("xpu_service_deserial");
     for (int i = 0; i < request->vars_size(); ++i) {
+#ifdef PADDLE_WITH_CUDA
      heter_ptr_->DeSerializeToTensor(context->scope_, request->vars(i), place,
                                      copy_streams_[context->place_num_]);
+#endif
+#ifdef PADDLE_WITH_XPU
+      heter_ptr_->DeSerializeToTensor(context->scope_, request->vars(i), place);
+#endif
     }
+#ifdef PADDLE_WITH_CUDA
     PADDLE_ENFORCE_CUDA_SUCCESS(
         cudaEventRecord(context->event_, copy_streams_[context->place_num_]));
     while (cudaEventQuery(context->event_) != cudaSuccess) {
       VLOG(3) << "wait for kernel";
       bthread_yield();
     }
+#endif
   }
   {
@@ -378,6 +477,7 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
       op->Run(*(context->scope_), place);
     }
   }
+#ifdef PADDLE_WITH_CUDA
   auto* dev_ctx = static_cast<platform::CUDADeviceContext*>(
       platform::DeviceContextPool::Instance().Get(place));
   PADDLE_ENFORCE_CUDA_SUCCESS(
@@ -391,6 +491,10 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
       bthread_yield();
     }
   }
+#endif
+#ifdef PADDLE_WITH_XPU
+  xpu_wait();
+#endif
 
   for (int i = 0; i < trainer_desc_.xpu_send_list_size(); ++i) {
     const std::string& varname = trainer_desc_.xpu_send_list(i);
@@ -407,11 +511,19 @@ int HeterXpuTrainer::RunTask(const HeterRequest* request,
        ++i) {
     uint64_t tid =
         static_cast<uint64_t>(param_.program_config(0).push_dense_table_id(i));
+#ifdef PADDLE_WITH_CUDA
     fleet_ptr_->PushDenseVarsAsync(
         *(context->scope_), tid, dense_grad_names_[tid],
         &(context->push_dense_status_), scale_datanorm_, request->cur_batch(),
         places_[context->place_num_], copy_streams_[context->place_num_],
         context->event_);
+#endif
+#ifdef PADDLE_WITH_XPU
+    fleet_ptr_->PushDenseVarsAsync(
+        *(context->scope_), tid, dense_grad_names_[tid],
+        &(context->push_dense_status_), scale_datanorm_, request->cur_batch(),
+        places_[context->place_num_]);
+#endif
   }
   for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
        ++i) {
@@ -453,7 +565,6 @@ void HeterXpuTrainer::Finalize() {
   pull_dense_worker_->Stop();
   root_scope_->DropKids();
 }
-
 }  // namespace framework
 }  // namespace paddle
 #endif
diff --git a/paddle/fluid/framework/pull_dense_worker.cc b/paddle/fluid/framework/pull_dense_worker.cc
index c399c5d02e..6aeef8a39b 100644
--- a/paddle/fluid/framework/pull_dense_worker.cc
+++ b/paddle/fluid/framework/pull_dense_worker.cc
@@ -62,13 +62,15 @@ void PullDenseWorker::Initialize(const TrainerDesc& param) {
   fleet_ptr_ = FleetWrapper::GetInstance();
 #ifdef PADDLE_WITH_CUDA
   copy_streams_.clear();
+#endif
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
   places_.clear();
   thread_scopes_.clear();
 #endif
 }
 
 void PullDenseWorker::CreatePinVar() {
-#ifdef PADDLE_WITH_CUDA
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_PSLIB)
   // for (auto& v : dense_value_names_) {
   //  for (auto& name : v.second) {
   for (int i = 0; i < dwp_param_.program_config(0).pull_dense_table_id_size();
@@ -83,8 +85,13 @@ void PullDenseWorker::CreatePinVar() {
       auto* ptr = root_scope_->Var(name + "pin");
       InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
       LoDTensor* pin_tensor = ptr->GetMutable<LoDTensor>();
+#ifdef PADDLE_WITH_CUDA
       pin_tensor->mutable_data<float>(tensor->dims(),
                                       platform::CUDAPinnedPlace());
+#endif
+#ifdef PADDLE_WITH_XPU
+      pin_tensor->mutable_data<float>(tensor->dims(), platform::CPUPlace());
+#endif
     }
   }
 #endif
@@ -107,7 +114,7 @@ void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
     exit(-1);
   }
   status_vec->resize(0);
-#ifdef PADDLE_WITH_CUDA
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
 
   for (size_t i = 0; i < places_.size(); ++i) {
     // for (auto& v : dense_value_names_) {
@@ -125,9 +132,16 @@ void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
       Variable* var = thread_scopes_[i]->FindVar(name);
       LoDTensor* tensor = var->GetMutable<LoDTensor>();
       float* w = tensor->data<float>();
+#ifdef PADDLE_WITH_CUDA
       memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, places_[i]), w,
                    platform::CUDAPinnedPlace(), pin_w,
                    sizeof(float) * tensor->numel(), copy_streams_[i]);
+#endif
+#ifdef PADDLE_WITH_XPU
+      memory::Copy(BOOST_GET_CONST(platform::XPUPlace, places_[i]), w,
+                   platform::CPUPlace(), pin_w,
+                   sizeof(float) * tensor->numel());
+#endif
     }
   }
 }
@@ -148,7 +162,7 @@ void PullDenseWorker::PullDense(bool force_update) {
     uint64_t tid = static_cast<uint64_t>(
         dwp_param_.program_config(0).pull_dense_table_id(i));
     if (force_update || CheckUpdateParam(tid)) {
-#ifdef PADDLE_WITH_CUDA
+#if (defined PADDLE_WITH_CUDA) || (defined PADDLE_WITH_XPU)
       VLOG(3) << "pull dense " << force_update << " " << tid;
       fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid],
                                      &pull_dense_status_, false);
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index d041ef48e2..ecaec49aa4 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -138,7 +138,8 @@ class DistMultiTrainer : public MultiTrainer {
   std::shared_ptr<PullDenseWorker> pull_dense_worker_;
 };
 
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
+    (defined PADDLE_WITH_PSLIB)
 class HeterServiceContext {
  public:
   HeterServiceContext() {}
@@ -151,7 +152,9 @@ class HeterServiceContext {
   void Reset() { push_dense_status_.clear(); }
   int place_num_;
   Scope* scope_{nullptr};
+#ifdef PADDLE_WITH_CUDA
   cudaEvent_t event_;
+#endif
   std::vector<OperatorBase*> ops_;
   std::vector<::std::future<int32_t>> push_dense_status_;
 };
@@ -178,10 +181,18 @@ class HeterXpuTrainer : public TrainerBase {
   virtual void CacheProgram(const ProgramDesc& main_program) {
     new (&program_) ProgramDesc(main_program);
   }
+  virtual std::string GetDumpPath(int tid) { return ""; }
+  virtual void InitDumpEnv() {}
   template <typename T>
+#ifdef PADDLE_WITH_CUDA
   void HeterMemCpy(LoDTensor* tensor, LoDTensor* root_tensor,
                    const paddle::platform::Place& thread_place,
                    cudaStream_t stream);
+#endif
+#ifdef PADDLE_WITH_XPU
+  void HeterMemCpy(LoDTensor* thread_tensor, LoDTensor* root_tensor,
+                   const paddle::platform::Place& thread_place);
+#endif
   void CreateThreadParam(const ProgramDesc& program, int num);
   template <typename T>
   void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
@@ -207,9 +218,11 @@ class HeterXpuTrainer : public TrainerBase {
   std::vector<std::string> op_names_;
   std::vector<Scope*> place_scopes_;
   BtObjectPool<HeterServiceContext> object_pool_;
-  std::vector<cudaStream_t> copy_streams_;
   std::vector<platform::Place> places_;
+#ifdef PADDLE_WITH_CUDA
+  std::vector<cudaStream_t> copy_streams_;
   std::vector<cudaEvent_t> events_;
+#endif
 };
 #endif
diff --git a/paddle/fluid/framework/trainer_factory.cc b/paddle/fluid/framework/trainer_factory.cc
index 15584620d8..cc92c50cc4 100644
--- a/paddle/fluid/framework/trainer_factory.cc
+++ b/paddle/fluid/framework/trainer_factory.cc
@@ -63,7 +63,8 @@ std::shared_ptr<TrainerBase> TrainerFactory::CreateTrainer(
 REGISTER_TRAINER_CLASS(MultiTrainer);
 REGISTER_TRAINER_CLASS(DistMultiTrainer);
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
+    (defined PADDLE_WITH_PSLIB)
 REGISTER_TRAINER_CLASS(HeterXpuTrainer);
 #endif
 #if defined(PADDLE_WITH_NCCL)
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 2e3f34f416..3dc30767e5 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1355,7 +1355,7 @@ class Executor(object):
         if not program._fleet_opt is None:
             if program._fleet_opt.get("worker_class", "") == "HeterCpuWorker":
                 is_heter = 1
-            if program._fleet_opt("trainer", "") == "HeterXpuTrainer":
+            if program._fleet_opt.get("trainer", "") == "HeterXpuTrainer":
                 is_heter = 1
         if scope is None:
             scope = global_scope()
--
GitLab