diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index c06260b72e6ee73c1e137e619aed43fc2b7a6532..cb7b16a0cfb081083786afd9354c4e0a3b19d4e2 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -261,7 +261,7 @@ if(WITH_DISTRIBUTE)
       dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
       heterxpu_trainer.cc
       data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
-      heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
+      ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
       pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
       device_context scope framework_proto trainer_desc_proto glog fs shell
       fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer
@@ -282,7 +282,7 @@ if(WITH_DISTRIBUTE)
       dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
       heterxpu_trainer.cc
       data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc
-      heterbox_worker.cc heterbox_trainer.cc downpour_worker.cc downpour_worker_opt.cc
+      downpour_worker.cc downpour_worker_opt.cc
      pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
       device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
       lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
@@ -296,7 +296,7 @@ if(WITH_DISTRIBUTE)
       dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
       heterxpu_trainer.cc
       data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
-      heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
+      ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
       pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
       device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
       lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
@@ -316,7 +316,7 @@ elseif(WITH_PSLIB)
       dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
       heterxpu_trainer.cc
       data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
-      heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
+      ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
       pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
       device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
       lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
@@ -326,7 +326,7 @@ else()
       dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
       heterxpu_trainer.cc
       data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
-      heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
+      ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
       pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
       device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
       lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index db83cd55889c43feadaab2dd4170b5e90d117435..b40099542cfd5d0f1475e16f3be977b45fbd7144 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -444,107 +444,6 @@ class HeterCpuWorker : public HogwildWorker {
 };
 #endif
 
-#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_HIP || \
-     defined PADDLE_WITH_XPU) && \
-    (defined PADDLE_WITH_PSLIB)
-class HeterBoxWorker : public HogwildWorker {
- public:
-  HeterBoxWorker() {}
-  virtual ~HeterBoxWorker() {}
-  virtual void Initialize(const TrainerDesc& desc);
-  virtual void TrainFiles();
-  virtual void SetNeedDump(bool need_dump_field);
-  virtual void SetChannelWriter(ChannelObject<std::string>* queue);
-  virtual void SetWorkerNum(int num) { worker_num_ = num; }
-  virtual void CacheProgram(const ProgramDesc& main_program) {
-    new (&program_) ProgramDesc(main_program);
-  }
-  void ProduceTasks() override;
-  virtual void SetStream(const gpuStream_t stream) { copy_stream_ = stream; }
-  virtual void SetEvent(const gpuEvent_t event) { event_ = event; }
-  virtual void TrainFilesWithProfiler() {}
-  void ResetStat();
-
- protected:
-  std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
-  void FillSparseValue(std::shared_ptr<HeterTask> task, size_t table_id);
-  void PushGradients();
-  void CollectLabelInfo(std::shared_ptr<HeterTask> task, size_t table_id);
-  void AdjustInsWeight(std::shared_ptr<HeterTask> task);
-  void DumpParam();
-  void CopySparseTable();
-  void CopyDenseTable();
-  void CopyDenseVars();
-
- private:
-  int mpi_rank_;
-  std::mutex mutex_;
-  std::vector<std::string> send_var_list_;
-  int worker_num_;
-  ProgramDesc program_;
-  HeterObjectPool<HeterTask> object_pool_;
-  bool need_dump_param_;
-  std::vector<std::string> dump_param_;
-  bool need_to_push_dense_;
-  bool need_dump_field_;
-  bool dump_slot_;
-  bool need_to_push_sparse_;
-  std::vector<std::string> dump_fields_;
-  ChannelWriter<std::string> writer_;
-  DownpourWorkerParameter param_;
-  float scale_datanorm_;
-  // just save the value in param_ for easy access
-  std::map<uint64_t, std::string> label_var_name_;
-  std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
-  std::map<uint64_t, std::vector<std::string>> sparse_value_names_;
-  std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
-  std::map<uint64_t, std::vector<std::string>> dense_value_names_;
-  std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
-  platform::Place root_place_;
-  // actually pushed feasign of each table
-  std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
-
-  // skipped ops
-  std::vector<std::string> skip_ops_;
-
-  std::vector<::std::future<int32_t>> push_sparse_status_;
-  std::vector<::std::future<int32_t>> push_dense_status_;
-
-  // adjust ins weight
-  AdjustInsWeightConfig adjust_ins_weight_config_;
-  std::vector<float> nid_show_;
-  // check nan and inf during training
-  std::vector<std::string> check_nan_var_names_;
-  // copy table
-  CopyTableConfig copy_table_config_;
-  std::map<uint64_t, uint64_t> table_dependency_;
-  std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
-  std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
-  std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
-  paddle::framework::Channel<std::shared_ptr<HeterTask>> pull_queue_;
-  paddle::framework::Channel<std::shared_ptr<HeterTask>> push_queue_;
-  gpuEvent_t event_;
-  gpuStream_t copy_stream_;
-  int batch_cnt_{0};
-  std::atomic<int> done_cnt_{0};
-
-  double total_time_;
-  double read_time_;
-  double pack_time_;
-  double pull_sparse_local_time_;
-  double op_all_time_;
-  double xpu_op_time_;
-  double xpu_wait_time_;
-  double cpu_op_time_;
-  double collect_label_time_;
-  double fill_sparse_time_;
-  double push_sparse_time_;
-  double gpu_2_cpu_time_;
-  double cpu_2_gpu_time_;
-  uint64_t total_inst_;
-};
-#endif
-
 #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
     (defined PADDLE_WITH_PSLIB)
 class PSGPUWorker : public HogwildWorker {
diff --git a/paddle/fluid/framework/heterbox_trainer.cc b/paddle/fluid/framework/heterbox_trainer.cc
deleted file mode 100644
index 1f6dc39ae851dfa5dc4790c4a3994a19981be3e0..0000000000000000000000000000000000000000
--- a/paddle/fluid/framework/heterbox_trainer.cc
+++ /dev/null
@@ -1,275 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include <cstdlib>
-#include <string>
-#include <vector>
-#include "io/fs.h"
-#include "paddle/fluid/framework/data_feed_factory.h"
-#include "paddle/fluid/framework/data_set.h"
-#include "paddle/fluid/framework/device_worker_factory.h"
-#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
-#include "paddle/fluid/framework/trainer.h"
-#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_HIP || \
-     defined PADDLE_WITH_XPU) && \
-    (defined PADDLE_WITH_PSLIB)
-#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-#include "paddle/fluid/platform/cuda_device_guard.h"
-#endif
-namespace paddle {
-namespace framework {
-
-void HeterBoxTrainer::Initialize(const TrainerDesc& trainer_desc,
-                                 Dataset* dataset) {
-  thread_num_ = trainer_desc.thread_num();
-  param_ = trainer_desc.downpour_param();
-  for (int i = 0; i < param_.dense_table_size(); ++i) {
-    uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
-    auto table = param_.dense_table(i);
-    dense_grad_names_[table_id].resize(table.dense_grad_name_size());
-    for (int j = 0; j < table.dense_grad_name_size(); ++j) {
-      dense_grad_names_[table_id][j] = table.dense_grad_name(j);
-    }
-  }
-  RegisterHeterCallback();
-  scale_datanorm_ = trainer_desc.scale_datanorm();
-  int place_num = trainer_desc.worker_places_size();
-  const std::vector<paddle::framework::DataFeed*> readers =
-      dataset->GetReaders();
-  for (int i = 0; i < place_num; ++i) {
-    int num = trainer_desc.worker_places(i);
-#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-    platform::CUDAPlace place = platform::CUDAPlace(num);
-    platform::CUDADeviceGuard guard(place.device);
-    gpuStream_t stream;
-#ifdef PADDLE_WITH_HIP
-    PADDLE_ENFORCE_CUDA_SUCCESS(hipStreamCreate(&stream));
-#else
-    PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&stream));
-#endif
-    copy_streams_.push_back(stream);
-    places_.push_back(place);
-    gpuEvent_t event;
-#ifdef PADDLE_WITH_HIP
-    PADDLE_ENFORCE_CUDA_SUCCESS(
-        hipEventCreateWithFlags(&event, hipEventDisableTiming));
-#else
-    PADDLE_ENFORCE_CUDA_SUCCESS(
-        cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
-#endif
-    events_.push_back(event);
-#endif
-#ifdef PADDLE_WITH_XPU
-    platform::XPUPlace place = platform::XPUPlace(num);
-    places_.push_back(place);
-#endif
-  }
-  for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
-       i++) {
-    need_merge_var_names_.push_back(
-        trainer_desc.downpour_param().stat_var_names(i));
-  }
-  VLOG(3) << "going to initialize pull dense worker";
-  pull_dense_worker_ = PullDenseWorker::GetInstance();
-  pull_dense_worker_->Initialize(trainer_desc);
-  VLOG(3) << "initialize pull dense worker";
-  SetDebug(trainer_desc.debug());
-  fleet_ptr_ = FleetWrapper::GetInstance();
-  trainer_desc_ = trainer_desc;
-  workers_.resize(place_num);
-  for (int i = 0; i < place_num; ++i) {
-    workers_[i] = DeviceWorkerFactory::CreateDeviceWorker(
-        trainer_desc.device_worker_name());
-    workers_[i]->SetDeviceIndex(i);
-    workers_[i]->SetDataFeed(readers[i]);
-    workers_[i]->Initialize(trainer_desc);
-    workers_[i]->SetWorkerNum(place_num);
-  }
-}
-
-void HeterBoxTrainer::DumpWork(int tid) {}
-
-void HeterBoxTrainer::RegisterHeterCallback() {
-  auto fleet_ptr = FleetWrapper::GetInstance();
-  fleet_ptr->RegisterHeterCallback([this](int worker, int taskid) {
-    // workers_[worker]->Schedule(taskid);
-  });
-}
-
-void HeterBoxTrainer::InitTrainerEnv(const ProgramDesc& main_program,
-                                     const platform::Place& place) {
-  for (size_t i = 0; i < places_.size(); ++i) {
-    workers_[i]->SetPlace(places_[i]);
-    workers_[i]->SetStream(copy_streams_[i]);
-    workers_[i]->SetEvent(events_[i]);
-    workers_[i]->SetReaderPlace(platform::CPUPlace());
-    workers_[i]->SetRootScope(root_scope_);
-    workers_[i]->CreateDeviceResource(main_program);  // Program
-    workers_[i]->BindingDataFeedMemory();
-#ifdef PADDLE_WITH_PSLIB
-    workers_[i]->CacheProgram(main_program);
-#endif
-  }
-  for (size_t num = 0; num < places_.size(); ++num) {
-    auto place = places_[num];
-    Scope* scope = workers_[num]->GetThreadScope();
-    auto stream = copy_streams_[num];
-    auto event = events_[num];
-    auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
-    platform::CUDADeviceGuard guard(dev_id);
-    auto& block = main_program.Block(0);
-    for (auto& var : block.AllVars()) {
-      if (var->Persistable()) {
-        auto name = var->Name();
-        Variable* root_var = root_scope_->FindVar(name);
-        if (!root_var) {
-          continue;
-        }
-        LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
-        auto* ptr = scope->Var(name);
-        InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
-        LoDTensor* thread_tensor = ptr->GetMutable<LoDTensor>();
-
-#define HeterMemcpyFunc(cpp_type, proto_type)                           \
-  do {                                                                  \
-    if (root_tensor->type() == proto_type) {                            \
-      HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place, stream); \
-    }                                                                   \
-  } while (0)
-        _ForEachDataType_(HeterMemcpyFunc);
-      }
-    }
-#ifdef PADDLE_WITH_HIP
-    PADDLE_ENFORCE_CUDA_SUCCESS(hipEventRecord(event, stream));
-    hipEventSynchronize(event);
-#else
-    PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
-    cudaEventSynchronize(event);
-#endif
-  }
-  place_ = place;
-}
-
-template <typename T>
-void HeterBoxTrainer::HeterMemCpy(LoDTensor* thread_tensor,
-                                  LoDTensor* root_tensor,
-                                  const paddle::platform::Place& thread_place,
-                                  gpuStream_t stream) {
-  T* thread_ptr =
-      thread_tensor->mutable_data<T>(root_tensor->dims(), thread_place);
-  T* root_ptr = root_tensor->data<T>();
-  if (platform::is_cpu_place(root_tensor->place())) {
-    memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
-                 platform::CPUPlace(), root_ptr,
-                 sizeof(T) * root_tensor->numel(), stream);
-  } else {
-    memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
-                 BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()),
-                 root_ptr, sizeof(T) * root_tensor->numel(), stream);
-  }
-}
-
-void HeterBoxTrainer::InitOtherEnv(const ProgramDesc& main_program) {
-  pull_dense_worker_->SetRootScope(root_scope_);
-  pull_dense_worker_->CreatePinVar();
-  for (size_t i = 0; i < places_.size(); ++i) {
-    pull_dense_worker_->AddThreadScope(workers_[i]->GetThreadScope());
-    pull_dense_worker_->AddPlace(places_[i]);
-#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-    pull_dense_worker_->AddStream(copy_streams_[i]);
-#endif
-  }
-  VLOG(3) << "init other env done.";
-}
-
-void HeterBoxTrainer::Run() {
-  int pull_thread_num = 3 * places_.size();
-  for (size_t thidx = 0; thidx < places_.size(); ++thidx) {
-    workers_[thidx]->device_reader_->Start();
-    std::dynamic_pointer_cast<paddle::framework::HeterBoxWorker>(
-        workers_[thidx])
-        ->ResetStat();
-  }
-  for (int i = 0; i < pull_thread_num; ++i) {
-    int worker_id = i % places_.size();
-    pull_threads_.push_back(
-        std::thread(&DeviceWorker::ProduceTasks, workers_[worker_id].get()));
-  }
-  for (size_t thidx = 0; thidx < places_.size(); ++thidx) {
-    threads_.push_back(
-        std::thread(&DeviceWorker::TrainFiles, workers_[thidx].get()));
-  }
-}
-
-template <typename T>
-void HeterBoxTrainer::MergeToRootScope(LoDTensor* root_tensor,
-                                       LoDTensor* tensor) {
-  LoDTensor tmp_root;
-  TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root);
-  T* tmp_root_data = tmp_root.data<T>();
-  LoDTensor tmp_tensor;
-  TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor);
-  T* data = tmp_tensor.data<T>();
-  for (int i = 0; i < tmp_tensor.numel(); i++) {
-    tmp_root_data[i] += data[i];
-  }
-  TensorCopy(tmp_root, platform::CPUPlace(), root_tensor);
-}
-
-Scope* HeterBoxTrainer::GetWorkerScope(int thread_id) { return nullptr; }
-
-void HeterBoxTrainer::Finalize() {
-  for (auto& th : pull_threads_) {
-    th.join();
-  }
-  for (auto& th : threads_) {
-    th.join();
-  }
-  for (size_t i = 0; i < need_merge_var_names_.size(); i++) {
-    Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]);
-    if (root_var == nullptr) {
-      continue;
-    }
-    LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
-
-    for (size_t j = 0; j < places_.size(); j++) {
-      Scope* cur_thread_scope = workers_[j]->GetThreadScope();
-      Variable* thread_var =
-          cur_thread_scope->FindVar(need_merge_var_names_[i]);
-      if (thread_var == nullptr) {
-        continue;
-      }
-      LoDTensor* thread_tensor = thread_var->GetMutable<LoDTensor>();
-#define MergeCallback(cpp_type, proto_type)                                    \
-  do {                                                                         \
-    if (root_tensor->type() == proto_type) {                                   \
-      if (thread_tensor->type() != proto_type) {                               \
-        VLOG(0) << "Error: thread id=" << j << ", need_merge_var_names_[" << i \
-                << "] " << need_merge_var_names_[i]                            \
-                << ", root tensor type=" << root_tensor->type()                \
-                << ", thread tensor type=" << thread_tensor->type();           \
-        exit(-1);                                                              \
-      }                                                                        \
-      MergeToRootScope<cpp_type>(root_tensor, thread_tensor);                  \
-    }                                                                          \
-  } while (0)
-      _ForEachDataType_(MergeCallback);
-    }
-  }
-  pull_dense_worker_->MergeDenseParam();
-  root_scope_->DropKids();
-}
-}  // namespace framework
-}  // namespace paddle
-#endif
diff --git a/paddle/fluid/framework/heterbox_worker.cc b/paddle/fluid/framework/heterbox_worker.cc
deleted file mode 100644
index b7df88218cbd4dd9018e49d709922cca3b287678..0000000000000000000000000000000000000000
--- a/paddle/fluid/framework/heterbox_worker.cc
+++ /dev/null
@@ -1,753 +0,0 @@
-/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include "paddle/fluid/framework/device_worker.h"
-#include "paddle/fluid/framework/device_worker_factory.h"
-#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
-#include "paddle/fluid/framework/heter_util.h"
-#include "paddle/fluid/platform/cpu_helper.h"
-#include "paddle/fluid/string/string_helper.h"
-
-#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
-    (defined PADDLE_WITH_PSLIB)
-#include "paddle/fluid/platform/cuda_device_guard.h"
-
-#if defined _WIN32 || defined __APPLE__
-#else
-#define _LINUX
-#endif
-
-namespace paddle {
-namespace framework {
-
-void HeterBoxWorker::Initialize(const TrainerDesc& desc) {
-  param_ = desc.downpour_param();
-  mpi_rank_ = desc.mpi_rank();
-  trainer_desc_ = desc;
-  for (int i = 0; i < trainer_desc_.xpu_recv_list_size(); ++i) {
-    send_var_list_.push_back(trainer_desc_.xpu_recv_list(i));
-  }
-  for (int i = 0; i < param_.sparse_table_size(); ++i) {
-    uint64_t table_id =
-        static_cast<uint64_t>(param_.sparse_table(i).table_id());
-    TableParameter table = param_.sparse_table(i);
-    sparse_key_names_[table_id].resize(table.sparse_key_name_size());
-    for (int j = 0; j < table.sparse_key_name_size(); ++j) {
-      sparse_key_names_[table_id][j] = table.sparse_key_name(j);
-    }
-    sparse_value_names_[table_id].resize(table.sparse_value_name_size());
-    for (int j = 0; j < table.sparse_value_name_size(); ++j) {
-      sparse_value_names_[table_id][j] = table.sparse_value_name(j);
-    }
-    sparse_grad_names_[table_id].resize(table.sparse_grad_name_size());
-    for (int j = 0; j < table.sparse_grad_name_size(); ++j) {
-      sparse_grad_names_[table_id][j] = table.sparse_grad_name(j);
-    }
-    label_var_name_[table_id] = table.label_var_name();
-    sparse_push_keys_[table_id] = std::vector<uint64_t>();
-  }
-
-  for (int i = 0; i < param_.dense_table_size(); ++i) {
-    uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
-    auto table = param_.dense_table(i);
-    dense_value_names_[table_id].resize(table.dense_value_name_size());
-    for (int j = 0; j < table.dense_value_name_size(); ++j) {
-      dense_value_names_[table_id][j] = table.dense_value_name(j);
-    }
-    dense_grad_names_[table_id].resize(table.dense_grad_name_size());
-    for (int j = 0; j < table.dense_grad_name_size(); ++j) {
-      dense_grad_names_[table_id][j] = table.dense_grad_name(j);
-    }
-  }
-
-  skip_ops_.resize(param_.skip_ops_size());
-  for (int i = 0; i < param_.skip_ops_size(); ++i) {
-    skip_ops_[i] = param_.skip_ops(i);
-  }
-  for (int i = 0; i < param_.stat_var_names_size(); ++i) {
-    stat_var_name_map_[param_.stat_var_names(i)] = 1;
-  }
-
-  need_to_push_sparse_ = param_.push_sparse();
-  need_to_push_dense_ = param_.push_dense();
-
-  fleet_ptr_ = FleetWrapper::GetInstance();
-  fetch_config_ = desc.fetch_config();
-  use_cvm_ = desc.use_cvm();
-  // for sparse value accessor, embedding only
-  no_cvm_ = desc.no_cvm();
-  scale_datanorm_ = desc.scale_datanorm();
-  dump_slot_ = desc.dump_slot();
-  dump_fields_.resize(desc.dump_fields_size());
-  for (int i = 0; i < desc.dump_fields_size(); ++i) {
-    dump_fields_[i] = desc.dump_fields(i);
-  }
-  adjust_ins_weight_config_ = desc.adjust_ins_weight_config();
-  need_dump_param_ = false;
-  dump_param_.resize(desc.dump_param_size());
-  for (int i = 0; i < desc.dump_param_size(); ++i) {
-    dump_param_[i] = desc.dump_param(i);
-  }
-  if (desc.dump_param_size() != 0) {
-    need_dump_param_ = true;
-  }
-  for (int i = 0; i < desc.check_nan_var_names_size(); ++i) {
-    check_nan_var_names_.push_back(desc.check_nan_var_names(i));
-  }
-  copy_table_config_ = desc.copy_table_config();
-  for (int i = 0; i < copy_table_config_.src_sparse_tables_size(); ++i) {
-    uint64_t src_table = copy_table_config_.src_sparse_tables(i);
-    uint64_t dest_table = copy_table_config_.dest_sparse_tables(i);
-    VLOG(3) << "copy_sparse_tables_ push back " << src_table << "->"
-            << dest_table;
-    copy_sparse_tables_.push_back(std::make_pair(src_table, dest_table));
-  }
-  for (int i = 0; i < copy_table_config_.src_dense_tables_size(); ++i) {
-    uint64_t src_table = copy_table_config_.src_dense_tables(i);
-    uint64_t dest_table = copy_table_config_.dest_dense_tables(i);
-    VLOG(3) << "copy_dense_tables_ push back " << src_table << "->"
-            << dest_table;
-    copy_dense_tables_.push_back(std::make_pair(src_table, dest_table));
-  }
-  for (auto& m : copy_table_config_.table_denpendency_map()) {
-    if (sparse_key_names_.find(m.key()) != sparse_key_names_.end()) {
-      // currently only support one dependency
-      for (auto& value : m.values()) {
-        table_dependency_[m.key()] = value;
-      }
-    }
-  }
-  pull_queue_ = paddle::framework::MakeChannel<std::shared_ptr<HeterTask>>();
-  push_queue_ = paddle::framework::MakeChannel<std::shared_ptr<HeterTask>>();
-}
-
-void HeterBoxWorker::SetChannelWriter(ChannelObject<std::string>* queue) {
-  writer_.Reset(queue);
-}
-
-void HeterBoxWorker::SetNeedDump(bool need_dump_field) {
-  need_dump_field_ = need_dump_field;
-}
-
-void HeterBoxWorker::DumpParam() {}
-
-void HeterBoxWorker::CollectLabelInfo(std::shared_ptr<HeterTask> task,
-                                      size_t table_idx) {
-  if (no_cvm_) {
-    return;
-  }
-  uint64_t table_id = static_cast<uint64_t>(
-      param_.program_config(0).pull_sparse_table_id(table_idx));
-
-  TableParameter table;
-  for (auto i : param_.sparse_table()) {
-    if (i.table_id() == table_id) {
-      table = i;
-      break;
-    }
-  }
-  auto& feature = (task->features_)[table_id];
-  auto& feature_label = (task->feature_labels_)[table_id];
-  Scope* scope = task->scope_;
-  feature_label.resize(feature.size());
-  Variable* var = scope->FindVar(label_var_name_[table_id]);
-  LoDTensor* tensor = var->GetMutable<LoDTensor>();
-  int64_t* label_ptr = tensor->data<int64_t>();
-
-  size_t global_index = 0;
-  for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
-    VLOG(3) << "sparse_key_names_[" << i
-            << "]: " << sparse_key_names_[table_id][i];
-    Variable* fea_var = scope->FindVar(sparse_key_names_[table_id][i]);
-    if (fea_var == nullptr) {
-      continue;
-    }
-    LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
-    CHECK(tensor != nullptr) << "tensor of var "
-                             << sparse_key_names_[table_id][i] << " is null";
-
-    // skip slots which do not have embedding
-    Variable* emb_var = scope->FindVar(sparse_value_names_[table_id][i]);
-    if (emb_var == nullptr) {
-      continue;
-    }
-    int64_t* ids = tensor->data<int64_t>();
-    size_t fea_idx = 0;
-    // tensor->lod()[0].size() == batch_size + 1
-    for (auto lod_idx = 1u; lod_idx < tensor->lod()[0].size(); ++lod_idx) {
-      for (; fea_idx < tensor->lod()[0][lod_idx]; ++fea_idx) {
-        // should be skipped feasign defined in protobuf
-        if (ids[fea_idx] == 0u) {
-          continue;
-        }
-        feature_label[global_index++] =
-            static_cast<float>(label_ptr[lod_idx - 1]);
-      }
-    }
-  }
-  CHECK(global_index == feature.size())
-      << "expect fea info size:" << feature.size() << " real:" << global_index;
-}
-
-void HeterBoxWorker::FillSparseValue(std::shared_ptr<HeterTask> task,
-                                     size_t table_idx) {
-  uint64_t table_id = static_cast<uint64_t>(
-      param_.program_config(0).pull_sparse_table_id(table_idx));
-
-  TableParameter table;
-  for (auto i : param_.sparse_table()) {
-    if (i.table_id() == table_id) {
-      table = i;
-      break;
-    }
-  }
-
-  auto& fea_value = (task->feature_values_)[table_id];
-  Scope* scope = task->scope_;
-  auto fea_idx = 0u;
-
-  std::vector<float> init_value(table.fea_dim());
-  for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
-    std::string slot_name = sparse_key_names_[table_id][i];
-    std::string emb_slot_name = sparse_value_names_[table_id][i];
-    Variable* var = scope->FindVar(slot_name);
-    if (var == nullptr) {
-      continue;
-    }
-    LoDTensor* tensor = var->GetMutable<LoDTensor>();
-    CHECK(tensor != nullptr) << "tensor of var " << slot_name << " is null";
-    int64_t* ids = tensor->data<int64_t>();
-    int len = tensor->numel();
-    Variable* var_emb = scope->FindVar(emb_slot_name);
-    if (var_emb == nullptr) {
-      continue;
-    }
-    LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
-    float* ptr = tensor_emb->mutable_data<float>({len, table.emb_dim()},
-                                                 platform::CPUPlace());
-    // memset(ptr, 0, sizeof(float) * len * table.emb_dim());
-    auto& tensor_lod = tensor->lod()[0];
-    LoD data_lod{tensor_lod};
-    tensor_emb->set_lod(data_lod);
-
-    bool is_nid = (adjust_ins_weight_config_.need_adjust() &&
-                   adjust_ins_weight_config_.nid_slot() == emb_slot_name);
-    if (is_nid) {
-      nid_show_.clear();
-    }
-    int nid_ins_index = 0;
-
-    for (int index = 0; index < len; ++index) {
-      if (use_cvm_ || no_cvm_) {
-        if (ids[index] == 0u) {
-          memcpy(ptr + table.emb_dim() * index, init_value.data(),
-                 sizeof(float) * table.emb_dim());
-          if (is_nid) {
-            nid_show_.push_back(-1);
-            ++nid_ins_index;
-          }
-          continue;
-        }
-        memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data(),
-               sizeof(float) * table.emb_dim());
-        if (is_nid &&
-            static_cast<size_t>(index) == tensor->lod()[0][nid_ins_index]) {
-          nid_show_.push_back(fea_value[fea_idx][0]);
-          ++nid_ins_index;
-        }
-        fea_idx++;
-      } else {
-        if (ids[index] == 0u) {
-          memcpy(ptr + table.emb_dim() * index, init_value.data() + 2,
-                 sizeof(float) * table.emb_dim());
-          if (is_nid) {
-            nid_show_.push_back(-1);
-            ++nid_ins_index;
-          }
-          continue;
-        }
-        memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data() + 2,
-               sizeof(float) * table.emb_dim());
-        if (is_nid &&
-            static_cast<size_t>(index) == tensor->lod()[0][nid_ins_index]) {
-          nid_show_.push_back(fea_value[fea_idx][0]);
-          ++nid_ins_index;
-        }
-        fea_idx++;
-      }
-    }
-  }
-}
-
-void HeterBoxWorker::AdjustInsWeight(std::shared_ptr<HeterTask> task) {
-#ifdef _LINUX
-  // check var and tensor not null
-  Scope* scope = task->scope_;
-  if (!adjust_ins_weight_config_.need_adjust()) {
-    VLOG(0) << "need_adjust=false, skip adjust ins weight";
-    return;
-  }
-  Variable* nid_var = scope->FindVar(adjust_ins_weight_config_.nid_slot());
-  if (nid_var == nullptr) {
-    VLOG(0) << "nid slot var " << adjust_ins_weight_config_.nid_slot()
-            << " is nullptr, skip adjust ins weight";
-    return;
-  }
-  LoDTensor* nid_tensor = nid_var->GetMutable<LoDTensor>();
-  if (nid_tensor == nullptr) {
-    VLOG(0) << "tensor of nid slot var " << adjust_ins_weight_config_.nid_slot()
-            << " is nullptr, skip adjust ins weight";
-    return;
-  }
-  Variable* ins_weight_var =
-      scope->FindVar(adjust_ins_weight_config_.ins_weight_slot());
-  if (ins_weight_var == nullptr) {
-    VLOG(0) << "ins weight var " << adjust_ins_weight_config_.ins_weight_slot()
-            << " is nullptr, skip adjust ins weight";
-    return;
-  }
-  LoDTensor* ins_weight_tensor = ins_weight_var->GetMutable<LoDTensor>();
-  if (ins_weight_tensor == nullptr) {
-    VLOG(0) << "tensor of ins weight tensor "
-            << adjust_ins_weight_config_.ins_weight_slot()
-            << " is nullptr, skip adjust ins weight";
-    return;
-  }
-
-  float* ins_weights = ins_weight_tensor->data<float>();
-  size_t len = ins_weight_tensor->numel();  // len = batch size
-  // here we assume nid_show slot only has one feasign in each instance
"ins_weight size should be equal to " - << "nid_show size, " << len << " vs " - << nid_show_.size(); - float nid_adjw_threshold = adjust_ins_weight_config_.nid_adjw_threshold(); - float nid_adjw_ratio = adjust_ins_weight_config_.nid_adjw_ratio(); - int64_t nid_adjw_num = 0; - double nid_adjw_weight = 0.0; - size_t ins_index = 0; - for (size_t i = 0; i < len; ++i) { - float nid_show = nid_show_[i]; - VLOG(3) << "nid_show " << nid_show; - if (nid_show < 0) { - VLOG(3) << "nid_show < 0, continue"; - continue; - } - float ins_weight = 1.0; - if (nid_show >= 0 && nid_show < nid_adjw_threshold) { - ins_weight = log(M_E + - (nid_adjw_threshold - nid_show) / nid_adjw_threshold * - nid_adjw_ratio); - // count nid adjw insnum and weight - ++nid_adjw_num; - nid_adjw_weight += ins_weight; - // choose large ins weight - VLOG(3) << "ins weight new " << ins_weight << ", ins weight origin " - << ins_weights[ins_index]; - if (ins_weight > ins_weights[ins_index]) { - VLOG(3) << "ins " << ins_index << " weight changes to " << ins_weight; - ins_weights[ins_index] = ins_weight; - } - ++ins_index; - } - } - VLOG(3) << "nid adjw info: total_adjw_num: " << nid_adjw_num - << ", avg_adjw_weight: " << nid_adjw_weight; -#endif -} - -void HeterBoxWorker::TrainFiles() { - VLOG(3) << "Begin to train files"; - platform::SetNumThreads(1); - need_to_push_dense_ = false; - while (1) { - VLOG(3) << "before heter task"; - std::shared_ptr task; - - if (!pull_queue_->Get(task)) { - VLOG(3) << "get task"; - break; - } - VLOG(3) << "get task done"; - Scope* scope = task->scope_->kids().front(); - VLOG(3) << "get kid done"; - // do computation here - task->timeline.Start(); - for (auto& op : ops_) { - if (op->HasAttr("op_device")) { - auto device = op->Attr("op_device"); - if (device != "gpu") { - continue; - } - } - bool need_skip = false; - for (auto t = 0u; t < skip_ops_.size(); ++t) { - if (op->Type().find(skip_ops_[t]) != std::string::npos) { - need_skip = true; - break; - } - } - if (!need_skip) { - op->Run(*(scope), place_); - } - } - platform::DeviceContextPool::Instance().Get(place_)->Wait(); - task->timeline.Pause(); - task->xpu_op_time += task->timeline.ElapsedSec(); - task->total_time += task->timeline.ElapsedSec(); - push_queue_->Put(task); - } -} - -void HeterTask::PackGpuTask(Scope* thread_scope, DataFeed* reader, - const ProgramDesc& program) { - auto& block = program.Block(0); - if (!scope_) { - scope_ = &(thread_scope->NewScope()); - for (auto& var : block.AllVars()) { - if (!var->Persistable()) { - auto* ptr = scope_->Var(var->Name()); - InitializeVariable(ptr, var->GetType()); - } - } - } - reader->AssignFeedVar(*scope_); - cur_batch_ = reader->Next(); -} - -void HeterBoxWorker::ResetStat() { - total_time_ = 0; - read_time_ = 0; - pack_time_ = 0; - pull_sparse_local_time_ = 0; - op_all_time_ = 0; - xpu_op_time_ = 0; - xpu_wait_time_ = 0; - cpu_op_time_ = 0; - collect_label_time_ = 0; - fill_sparse_time_ = 0; - push_sparse_time_ = 0; - gpu_2_cpu_time_ = 0; - cpu_2_gpu_time_ = 0; - total_inst_ = 0; -} - -void HeterBoxWorker::ProduceTasks() { - need_to_push_dense_ = false; - while (1) { - std::shared_ptr task; - task = object_pool_.Get(); - task->Reset(); - { - std::lock_guard lock(mutex_); - task->timeline.Start(); - task->PackGpuTask(thread_scope_, device_reader_, program_); - task->timeline.Pause(); - task->pack_time = task->timeline.ElapsedSec(); - task->total_time += task->pack_time; - if (task->cur_batch_ <= 0) { - if (!pull_queue_->Closed() && batch_cnt_ == done_cnt_) { - pull_queue_->Close(); - } - break; 
-      }
-      batch_cnt_ += 1;
-    }
-    for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
-         ++i) {
-      uint64_t tid = static_cast<uint64_t>(
-          param_.program_config(0).pull_sparse_table_id(i));
-      TableParameter table;
-      for (auto j : param_.sparse_table()) {
-        if (j.table_id() == tid) {
-          table = j;
-          break;
-        }
-      }
-      task->timeline.Start();
-      fleet_ptr_->HeterPullSparseVars(thread_id_, task, tid,
-                                      sparse_key_names_[tid], table.fea_dim(),
-                                      sparse_value_names_[tid]);
-      task->timeline.Pause();
-      task->pull_sparse_local_time += task->timeline.ElapsedSec();
-      task->total_time += task->timeline.ElapsedSec();
-
-      task->timeline.Start();
-      CollectLabelInfo(task, i);
-      task->timeline.Pause();
-      task->collect_label_time += task->timeline.ElapsedSec();
-      task->total_time += task->timeline.ElapsedSec();
-
-      task->timeline.Start();
-      FillSparseValue(task, i);
-      task->timeline.Pause();
-      task->fill_sparse_time += task->timeline.ElapsedSec();
-      task->total_time += task->timeline.ElapsedSec();
-
-      auto nid_iter = std::find(sparse_value_names_[tid].begin(),
-                                sparse_value_names_[tid].end(),
-                                adjust_ins_weight_config_.nid_slot());
-      if (nid_iter != sparse_value_names_[tid].end()) {
-        AdjustInsWeight(task);
-      }
-    }
-
-    task->timeline.Start();
-    size_t op_index = 0;
-    for (; op_index < ops_.size(); ++op_index) {
-      auto& op = ops_[op_index];
-      if (op->HasAttr("op_device")) {
-        auto device = op->Attr<std::string>("op_device");
-        if (device == "gpu") {
-          break;
-        }
-      }
-      bool need_skip = false;
-      for (auto t = 0u; t < skip_ops_.size(); ++t) {
-        if (op->Type().find(skip_ops_[t]) != std::string::npos) {
-          need_skip = true;
-          break;
-        }
-      }
-      if (!need_skip) {
-        op->Run(*(task->scope_), platform::CPUPlace());
-      }
-    }
-
-    task->timeline.Pause();
-    task->cpu_op_time += task->timeline.ElapsedSec();
-    task->total_time += task->timeline.ElapsedSec();
-
-    task->timeline.Start();
-    // prepare for gpu
-    Scope* cpu_scope = task->scope_;
-    Scope* gpu_scope = nullptr;
-    if (cpu_scope->kids().empty()) {
-      gpu_scope = &cpu_scope->NewScope();
-    } else {
-      gpu_scope = cpu_scope->kids().front();
-    }
-    for (const std::string& name : send_var_list_) {
-      const LoDTensor& cpu_tensor = cpu_scope->FindVar(name)->Get<LoDTensor>();
-      LoDTensor* gpu_tensor = gpu_scope->Var(name)->GetMutable<LoDTensor>();
-      gpu_tensor->set_lod(cpu_tensor.lod());
-      gpu_tensor->Resize(cpu_tensor.dims());
-      gpu_tensor->set_layout(cpu_tensor.layout());
-      void* gpu_ptr = gpu_tensor->mutable_data(place_, cpu_tensor.type());
-      const void* cpu_ptr = cpu_tensor.data();
-      memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr,
-                   platform::CPUPlace(), cpu_ptr,
-                   cpu_tensor.numel() * SizeOfType(cpu_tensor.type()),
-                   copy_stream_);
-    }
-    task->timeline.Pause();
-    task->cpu_2_gpu_time += task->timeline.ElapsedSec();
-    task->total_time += task->timeline.ElapsedSec();
-    pull_queue_->Put(task);
-    push_queue_->Get(task);
-
-    int need_copy_grad = 1;
-    task->timeline.Start();
-    for (; op_index < ops_.size(); ++op_index) {
-      auto& op = ops_[op_index];
-      if (op->HasAttr("op_device")) {
-        auto device = op->Attr<std::string>("op_device");
-        if (device == "gpu") {
-          continue;
-        }
-      }
-      bool need_skip = false;
-      for (auto t = 0u; t < skip_ops_.size(); ++t) {
-        if (op->Type().find(skip_ops_[t]) != std::string::npos) {
-          need_skip = true;
-          break;
-        }
-      }
-      if (!need_skip) {
-        need_copy_grad = 0;
-        op->Run(*(task->scope_), platform::CPUPlace());
-      }
-    }
-    task->timeline.Pause();
-    task->cpu_op_time += task->timeline.ElapsedSec();
-    task->total_time += task->timeline.ElapsedSec();
-
-    VLOG(3) << "fill sparse value for all sparse table done.";
-    for (std::string& var_name : check_nan_var_names_) {
-      Variable* var = (task->scope_)->FindVar(var_name);
-      if (var == nullptr) {
-        continue;
-      }
-      LoDTensor* tensor = var->GetMutable<LoDTensor>();
-      if (tensor == nullptr) {
-        continue;
-      }
-      PADDLE_ENFORCE_EQ(framework::TensorContainsInf(*tensor), false,
-                        platform::errors::InvalidArgument(
-                            "Tensor %s contains Inf.", var_name));
-      PADDLE_ENFORCE_EQ(framework::TensorContainsNAN(*tensor), false,
-                        platform::errors::InvalidArgument(
-                            "Tensor %s contains NAN.", var_name));
-    }
-
-    if (need_to_push_sparse_) {
-      // push gradients here
-      for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size();
-           ++i) {
-        uint64_t tid = static_cast<uint64_t>(
-            param_.program_config(0).push_sparse_table_id(i));
-        TableParameter table;
-        for (auto i : param_.sparse_table()) {
-          if (i.table_id() == tid) {
-            table = i;
-            break;
-          }
-        }
-        Scope* src_scope = task->scope_;
-        Scope* dest_scope = nullptr;
-        task->timeline.Start();
-        if (need_copy_grad) {
-          if (cpu_scope->kids().empty()) {
-            dest_scope = &src_scope->NewScope();
-          } else {
-            dest_scope = src_scope->kids().front();
-          }
-          auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
-          platform::CUDADeviceGuard guard(dev_id);
-
-          for (const std::string& name : sparse_grad_names_[tid]) {
-            const LoDTensor& src_tensor =
-                src_scope->FindVar(name)->Get<LoDTensor>();
-            LoDTensor* dest_tensor =
-                dest_scope->Var(name)->GetMutable<LoDTensor>();
-            dest_tensor->set_lod(src_tensor.lod());
-            dest_tensor->Resize(src_tensor.dims());
-            dest_tensor->set_layout(src_tensor.layout());
-            void* dest_ptr = dest_tensor->mutable_data(platform::CPUPlace(),
-                                                       src_tensor.type());
-            const void* src_ptr = src_tensor.data();
-            memory::Copy(platform::CPUPlace(), dest_ptr,
-                         BOOST_GET_CONST(platform::CUDAPlace, place_), src_ptr,
-                         src_tensor.numel() * SizeOfType(src_tensor.type()),
-                         copy_stream_);
-          }
-        } else {
-          dest_scope = task->scope_;
-        }
-        task->timeline.Pause();
-        task->gpu_2_cpu_time += task->timeline.ElapsedSec();
-        task->total_time += task->timeline.ElapsedSec();
-
-        task->timeline.Start();
-        fleet_ptr_->HeterPushSparseVars(
-            task, *(dest_scope), tid, sparse_key_names_[tid],
-            sparse_grad_names_[tid], table.emb_dim(), &push_sparse_status_,
-            use_cvm_, dump_slot_, no_cvm_);
-        task->timeline.Pause();
-        task->push_sparse_time += task->timeline.ElapsedSec();
-        task->total_time += task->timeline.ElapsedSec();
-      }
-    }
-
-    if (need_to_push_sparse_) {
-      VLOG(3) << "push sparse gradient done.";
-      int32_t tmp_push_sparse_wait_times = -1;
-      static uint32_t push_sparse_wait_times =
-          static_cast<uint32_t>(tmp_push_sparse_wait_times);
-      if (push_sparse_status_.size() >= push_sparse_wait_times) {
-        for (auto& t : push_sparse_status_) {
-          t.wait();
-        }
-        push_sparse_status_.resize(0);
-      }
-
-      if (tmp_push_sparse_wait_times == -1) {
-        push_sparse_status_.resize(0);
-      }
-    }
-    {
-      std::lock_guard<std::mutex> lock(mutex_);
-      total_time_ += task->total_time;
-      read_time_ += task->read_time;
-      pack_time_ += task->pack_time;
-      pull_sparse_local_time_ += task->pull_sparse_local_time;
-      op_all_time_ += task->op_all_time;
-      xpu_op_time_ += task->xpu_op_time;
-      xpu_wait_time_ += task->xpu_wait_time;
-      cpu_op_time_ += task->cpu_op_time;
-      collect_label_time_ += task->collect_label_time;
-      fill_sparse_time_ += task->fill_sparse_time;
-      push_sparse_time_ += task->push_sparse_time;
-      gpu_2_cpu_time_ += task->gpu_2_cpu_time;
-      cpu_2_gpu_time_ += task->cpu_2_gpu_time;
-      total_inst_ += task->cur_batch_;
-    }
-    done_cnt_.fetch_add(1, std::memory_order_relaxed);
-    if (thread_id_ == 0) {
-      // should be configured here
-      if (done_cnt_ > 0 && done_cnt_ % 100 == 0) {
-        fprintf(stderr, "cpu_2_gpu total time: %fs\n",
-                cpu_2_gpu_time_ / done_cnt_);
-        fprintf(stderr, "gpu_2_cpu run total time: %fs\n",
-                gpu_2_cpu_time_ / done_cnt_);
-        fprintf(stderr, "cpu op run total time: %fs\n",
-                cpu_op_time_ / done_cnt_);
-        fprintf(stderr, "xpu op run total time: %fs\n",
-                xpu_op_time_ / done_cnt_);
-        fprintf(stderr, "xpu wait total time: %fs\n",
-                xpu_wait_time_ / done_cnt_);
-        fprintf(stderr, "pack task time: %fs\n", pack_time_ / done_cnt_);
-        fprintf(stderr, "train total time: %fs\n", total_time_ / done_cnt_);
-        fprintf(stderr, "pull sparse local time: %fs\n",
-                pull_sparse_local_time_ / done_cnt_);
-        fprintf(stderr, "fill sparse time: %fs\n",
-                fill_sparse_time_ / done_cnt_);
-        fprintf(stderr, "push sparse time: %fs\n",
-                push_sparse_time_ / done_cnt_);
-        fprintf(stderr, "collect label time: %fs\n",
-                collect_label_time_ / done_cnt_);
-        fprintf(stderr, "mean read time: %fs\n", read_time_ / done_cnt_);
-        fprintf(stderr, "IO percent: %f\n", read_time_ / total_time_ * 100);
-        fprintf(stderr, "cpu_2_gpu run percent: %f\n",
-                cpu_2_gpu_time_ / total_time_ * 100);
-        fprintf(stderr, "gpu_2_cpu run percent: %f\n",
-                gpu_2_cpu_time_ / total_time_ * 100);
-        fprintf(stderr, "cpu op run percent: %f\n",
-                cpu_op_time_ / total_time_ * 100);
-        fprintf(stderr, "xpu op run percent: %f\n",
-                xpu_op_time_ / total_time_ * 100);
-        fprintf(stderr, "xpu wait percent: %f\n",
-                xpu_wait_time_ / total_time_ * 100);
-        fprintf(stderr, "pack task percent: %f\n",
-                pack_time_ / total_time_ * 100);
-        fprintf(stderr, "pull sparse local time percent: %f\n",
-                pull_sparse_local_time_ / total_time_ * 100);
-        fprintf(stderr, "collect label time percent: %f\n",
-                collect_label_time_ / total_time_ * 100);
-        fprintf(stderr, "fill sparse time percent: %f\n",
-                fill_sparse_time_ / total_time_ * 100);
-        fprintf(stderr, "push sparse time percent: %f\n",
-                push_sparse_time_ / total_time_ * 100);
-        fprintf(stderr, "%6.2f instances/s\n", total_inst_ / total_time_);
-      }
-    }
-
-    VLOG(3) << "done taskid = " << task->taskid_;
-    task->scope_->DropKids();
-    object_pool_.Push(task);
-  }
-}
-
-}  // end namespace framework
-}  // end namespace paddle
-#endif
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index 636760029fedc4e3a570f9a63db5d1f84795ab62..fc8fb9327d5bb2d2a3627f7fd463d48efb9a514f 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -243,55 +243,6 @@ class HeterXpuTrainer : public TrainerBase {
 #endif
 };
 
-class HeterBoxTrainer : public TrainerBase {
- public:
-  HeterBoxTrainer() {}
-  virtual ~HeterBoxTrainer() {}
-  virtual void Initialize(const TrainerDesc& trainer_desc, Dataset* data_set);
-  virtual void InitTrainerEnv(const ProgramDesc& main_program,
-                              const platform::Place& place);
-  virtual void InitOtherEnv(const ProgramDesc& main_program);
-  virtual void Run();
-  virtual void Finalize();
-  virtual void RegisterHeterCallback();
-  virtual void DumpWork(int tid);
-  virtual Scope* GetWorkerScope(int thread_id);
-  virtual void CacheProgram(const ProgramDesc& main_program) {
-    new (&program_) ProgramDesc(main_program);
-  }
-  virtual std::string GetDumpPath(int tid) { return ""; }
-  virtual void InitDumpEnv() {}
-  template <typename T>
-#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-  void HeterMemCpy(LoDTensor* tensor, LoDTensor* root_tensor,
-                   const paddle::platform::Place& thread_place,
-                   gpuStream_t stream);
-#endif
-  void CreateThreadParam(const ProgramDesc& program, int num);
-  template <typename T>
-  void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
-
- protected:
-  DownpourWorkerParameter param_;
-  std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
-  std::vector<std::string> need_merge_var_names_;
-  float scale_datanorm_;
-  paddle::platform::Place place_;
-  ProgramDesc program_;
-  std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
-  std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
-  std::vector<std::shared_ptr<DeviceWorker>> workers_;
-  std::vector<platform::Place> places_;
-  // ps-gpu
-  std::vector<std::thread> pull_threads_;
-  std::vector<std::thread> threads_;
-  int use_ps_gpu_;
-  int thread_num_;
-#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-  std::vector<gpuStream_t> copy_streams_;
-  std::vector<gpuEvent_t> events_;
-#endif
-};
 #endif
 
 #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
diff --git a/paddle/fluid/framework/trainer_factory.cc b/paddle/fluid/framework/trainer_factory.cc
index 15073b6f78c5b35209c0c38135d067cb660e487e..660511b1f268d910629199bd122561a2a24a1b0a 100644
--- a/paddle/fluid/framework/trainer_factory.cc
+++ b/paddle/fluid/framework/trainer_factory.cc
@@ -70,7 +70,6 @@ REGISTER_TRAINER_CLASS(DistMultiTrainer);
     defined PADDLE_WITH_XPU) && \
     (defined PADDLE_WITH_PSLIB)
 REGISTER_TRAINER_CLASS(HeterXpuTrainer);
-REGISTER_TRAINER_CLASS(HeterBoxTrainer);
 #endif
 #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
     (defined PADDLE_WITH_PSLIB)
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index f9e0e0ae047a25937d9c82d9077fe323bb2aab8d..fb1be483083a8c275d5da989ddf5d0271a76dd68 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -93,7 +93,7 @@ from .dygraph.varbase_patch_methods import monkey_patch_varbase
 from . import generator
 from .core import _cuda_synchronize
 from .generator import Generator
-from .trainer_desc import TrainerDesc, DistMultiTrainer, PipelineTrainer, MultiTrainer, HeterXpuTrainer, HeterBoxTrainer
+from .trainer_desc import TrainerDesc, DistMultiTrainer, PipelineTrainer, MultiTrainer, HeterXpuTrainer
 from .transpiler import HashName, RoundRobin
 from .backward import append_backward
diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py
index 92a900e6c371586eb23dbda06345cfe449912ea6..4eca3a494e25a43c162a041f8d014e1e85c01597 100644
--- a/python/paddle/fluid/trainer_desc.py
+++ b/python/paddle/fluid/trainer_desc.py
@@ -17,7 +17,7 @@ import sys
 import os
 __all__ = [
     'TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer',
-    'HeterXpuTrainer', 'HeterBoxTrainer'
+    'HeterXpuTrainer'
 ]
 
 
@@ -346,30 +346,6 @@ class HeterXpuTrainer(TrainerDesc):
         self._device_worker._gen_worker_desc(self.proto_desc)
 
 
-class HeterBoxTrainer(TrainerDesc):
-    """
-    Implement of HeterBoxTrainer.
-    It's for Distributed training.
-    """
-
-    def __init__(self):
-        super(HeterBoxTrainer, self).__init__()
-        pass
-
-    def _set_program(self, program):
-        super(HeterBoxTrainer, self)._set_program(program)
-        self._program = program
-
-    def _gen_trainer_desc(self):
-        super(HeterBoxTrainer, self)._gen_trainer_desc()
-        self.proto_desc.class_name = "HeterBoxTrainer"
-        if self._program == None:
-            raise RuntimeError("None Program")
-        self._device_worker._set_infer(self._infer)
-        self._device_worker._set_program(self._program)
-        self._device_worker._gen_worker_desc(self.proto_desc)
-
-
 class PSGPUTrainer(TrainerDesc):
     """
     Implement of PSGPUTrainer.
diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py
index 95379a34c22144b1a17fcced5556291de15eaaa5..7912ffca84ba419c4347f19700b873fc255b8082 100644
--- a/python/paddle/fluid/trainer_factory.py
+++ b/python/paddle/fluid/trainer_factory.py
@@ -22,7 +22,7 @@ from paddle.fluid.log_helper import get_logger
 local_logger = get_logger(
     __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
 
-from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer, HeterXpuTrainer, HeterBoxTrainer, PSGPUTrainer
+from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer, HeterXpuTrainer, PSGPUTrainer
 from .device_worker import Hogwild, DownpourSGD, Section, DownpourSGDOPT
 from .framework import Variable
 from multiprocessing import Process, Manager