未验证 提交 66c7a076 编写于 作者: T Thunderbrook 提交者: GitHub

Remove HeterBox (#33718)

* remove heterbox

* remove heterbox
上级 5c514f5e
...@@ -261,7 +261,7 @@ if(WITH_DISTRIBUTE) ...@@ -261,7 +261,7 @@ if(WITH_DISTRIBUTE)
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto trainer_desc_proto glog fs shell device_context scope framework_proto trainer_desc_proto glog fs shell
fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer
...@@ -282,7 +282,7 @@ if(WITH_DISTRIBUTE) ...@@ -282,7 +282,7 @@ if(WITH_DISTRIBUTE)
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc
heterbox_worker.cc heterbox_trainer.cc downpour_worker.cc downpour_worker_opt.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
...@@ -296,7 +296,7 @@ if(WITH_DISTRIBUTE) ...@@ -296,7 +296,7 @@ if(WITH_DISTRIBUTE)
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
...@@ -316,7 +316,7 @@ elseif(WITH_PSLIB) ...@@ -316,7 +316,7 @@ elseif(WITH_PSLIB)
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
...@@ -326,7 +326,7 @@ else() ...@@ -326,7 +326,7 @@ else()
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc ps_gpu_worker.cc
heterbox_worker.cc heterbox_trainer.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc ps_gpu_trainer.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
......
...@@ -444,107 +444,6 @@ class HeterCpuWorker : public HogwildWorker { ...@@ -444,107 +444,6 @@ class HeterCpuWorker : public HogwildWorker {
}; };
#endif #endif
#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_HIP || \
defined PADDLE_WITH_XPU) && \
(defined PADDLE_WITH_PSLIB)
class HeterBoxWorker : public HogwildWorker {
public:
HeterBoxWorker() {}
virtual ~HeterBoxWorker() {}
virtual void Initialize(const TrainerDesc& desc);
virtual void TrainFiles();
virtual void SetNeedDump(bool need_dump_field);
virtual void SetChannelWriter(ChannelObject<std::string>* queue);
virtual void SetWorkerNum(int num) { worker_num_ = num; }
virtual void CacheProgram(const ProgramDesc& main_program) {
new (&program_) ProgramDesc(main_program);
}
void ProduceTasks() override;
virtual void SetStream(const gpuStream_t stream) { copy_stream_ = stream; }
virtual void SetEvent(const gpuEvent_t event) { event_ = event; }
virtual void TrainFilesWithProfiler() {}
void ResetStat();
protected:
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
void FillSparseValue(std::shared_ptr<HeterTask> task, size_t table_id);
void PushGradients();
void CollectLabelInfo(std::shared_ptr<HeterTask> task, size_t table_id);
void AdjustInsWeight(std::shared_ptr<HeterTask> task);
void DumpParam();
void CopySparseTable();
void CopyDenseTable();
void CopyDenseVars();
private:
int mpi_rank_;
std::mutex mutex_;
std::vector<std::string> send_var_list_;
int worker_num_;
ProgramDesc program_;
HeterObjectPool<HeterTask> object_pool_;
bool need_dump_param_;
std::vector<std::string> dump_param_;
bool need_to_push_dense_;
bool need_dump_field_;
bool dump_slot_;
bool need_to_push_sparse_;
std::vector<std::string> dump_fields_;
ChannelWriter<std::string> writer_;
DownpourWorkerParameter param_;
float scale_datanorm_;
// just save the value in param_ for easy access
std::map<uint64_t, std::string> label_var_name_;
std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
std::map<uint64_t, std::vector<std::string>> sparse_value_names_;
std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
platform::Place root_place_;
// actually pushed feasign of each table
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
// skipped ops
std::vector<std::string> skip_ops_;
std::vector<::std::future<int32_t>> push_sparse_status_;
std::vector<::std::future<int32_t>> push_dense_status_;
// adjust ins weight
AdjustInsWeightConfig adjust_ins_weight_config_;
std::vector<float> nid_show_;
// check nan and inf during training
std::vector<std::string> check_nan_var_names_;
// copy table
CopyTableConfig copy_table_config_;
std::map<uint64_t, uint64_t> table_dependency_;
std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
paddle::framework::Channel<std::shared_ptr<HeterTask>> pull_queue_;
paddle::framework::Channel<std::shared_ptr<HeterTask>> push_queue_;
gpuEvent_t event_;
gpuStream_t copy_stream_;
int batch_cnt_{0};
std::atomic<int> done_cnt_{0};
double total_time_;
double read_time_;
double pack_time_;
double pull_sparse_local_time_;
double op_all_time_;
double xpu_op_time_;
double xpu_wait_time_;
double cpu_op_time_;
double collect_label_time_;
double fill_sparse_time_;
double push_sparse_time_;
double gpu_2_cpu_time_;
double cpu_2_gpu_time_;
uint64_t total_inst_;
};
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB) (defined PADDLE_WITH_PSLIB)
class PSGPUWorker : public HogwildWorker { class PSGPUWorker : public HogwildWorker {
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <cstdlib>
#include <string>
#include <vector>
#include "io/fs.h"
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/trainer.h"
#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_HIP || \
defined PADDLE_WITH_XPU) && \
(defined PADDLE_WITH_PSLIB)
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
namespace paddle {
namespace framework {
void HeterBoxTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
thread_num_ = trainer_desc.thread_num();
param_ = trainer_desc.downpour_param();
for (int i = 0; i < param_.dense_table_size(); ++i) {
uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
auto table = param_.dense_table(i);
dense_grad_names_[table_id].resize(table.dense_grad_name_size());
for (int j = 0; j < table.dense_grad_name_size(); ++j) {
dense_grad_names_[table_id][j] = table.dense_grad_name(j);
}
}
RegisterHeterCallback();
scale_datanorm_ = trainer_desc.scale_datanorm();
int place_num = trainer_desc.worker_places_size();
const std::vector<paddle::framework::DataFeed*> readers =
dataset->GetReaders();
for (int i = 0; i < place_num; ++i) {
int num = trainer_desc.worker_places(i);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
platform::CUDAPlace place = platform::CUDAPlace(num);
platform::CUDADeviceGuard guard(place.device);
gpuStream_t stream;
#ifdef PADDLE_WITH_HIP
PADDLE_ENFORCE_CUDA_SUCCESS(hipStreamCreate(&stream));
#else
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&stream));
#endif
copy_streams_.push_back(stream);
places_.push_back(place);
gpuEvent_t event;
#ifdef PADDLE_WITH_HIP
PADDLE_ENFORCE_CUDA_SUCCESS(
hipEventCreateWithFlags(&event, hipEventDisableTiming));
#else
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
#endif
events_.push_back(event);
#endif
#ifdef PADDLE_WITH_XPU
platform::XPUPlace place = platform::XPUPlace(num);
places_.push_back(place);
#endif
}
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
i++) {
need_merge_var_names_.push_back(
trainer_desc.downpour_param().stat_var_names(i));
}
VLOG(3) << "going to initialize pull dense worker";
pull_dense_worker_ = PullDenseWorker::GetInstance();
pull_dense_worker_->Initialize(trainer_desc);
VLOG(3) << "initialize pull dense worker";
SetDebug(trainer_desc.debug());
fleet_ptr_ = FleetWrapper::GetInstance();
trainer_desc_ = trainer_desc;
workers_.resize(place_num);
for (int i = 0; i < place_num; ++i) {
workers_[i] = DeviceWorkerFactory::CreateDeviceWorker(
trainer_desc.device_worker_name());
workers_[i]->SetDeviceIndex(i);
workers_[i]->SetDataFeed(readers[i]);
workers_[i]->Initialize(trainer_desc);
workers_[i]->SetWorkerNum(place_num);
}
}
void HeterBoxTrainer::DumpWork(int tid) {}
void HeterBoxTrainer::RegisterHeterCallback() {
auto fleet_ptr = FleetWrapper::GetInstance();
fleet_ptr->RegisterHeterCallback([this](int worker, int taskid) {
// workers_[worker]->Schedule(taskid);
});
}
void HeterBoxTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
for (size_t i = 0; i < places_.size(); ++i) {
workers_[i]->SetPlace(places_[i]);
workers_[i]->SetStream(copy_streams_[i]);
workers_[i]->SetEvent(events_[i]);
workers_[i]->SetReaderPlace(platform::CPUPlace());
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
#ifdef PADDLE_WITH_PSLIB
workers_[i]->CacheProgram(main_program);
#endif
}
for (size_t num = 0; num < places_.size(); ++num) {
auto place = places_[num];
Scope* scope = workers_[num]->GetThreadScope();
auto stream = copy_streams_[num];
auto event = events_[num];
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
auto& block = main_program.Block(0);
for (auto& var : block.AllVars()) {
if (var->Persistable()) {
auto name = var->Name();
Variable* root_var = root_scope_->FindVar(name);
if (!root_var) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
auto* ptr = scope->Var(name);
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
LoDTensor* thread_tensor = ptr->GetMutable<LoDTensor>();
#define HeterMemcpyFunc(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place, stream); \
} \
} while (0)
_ForEachDataType_(HeterMemcpyFunc);
}
}
#ifdef PADDLE_WITH_HIP
PADDLE_ENFORCE_CUDA_SUCCESS(hipEventRecord(event, stream));
hipEventSynchronize(event);
#else
PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
cudaEventSynchronize(event);
#endif
}
place_ = place;
}
template <typename T>
void HeterBoxTrainer::HeterMemCpy(LoDTensor* thread_tensor,
LoDTensor* root_tensor,
const paddle::platform::Place& thread_place,
gpuStream_t stream) {
T* thread_ptr =
thread_tensor->mutable_data<T>(root_tensor->dims(), thread_place);
T* root_ptr = root_tensor->data<T>();
if (platform::is_cpu_place(root_tensor->place())) {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
platform::CPUPlace(), root_ptr,
sizeof(T) * root_tensor->numel(), stream);
} else {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()),
root_ptr, sizeof(T) * root_tensor->numel(), stream);
}
}
void HeterBoxTrainer::InitOtherEnv(const ProgramDesc& main_program) {
pull_dense_worker_->SetRootScope(root_scope_);
pull_dense_worker_->CreatePinVar();
for (size_t i = 0; i < places_.size(); ++i) {
pull_dense_worker_->AddThreadScope(workers_[i]->GetThreadScope());
pull_dense_worker_->AddPlace(places_[i]);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
pull_dense_worker_->AddStream(copy_streams_[i]);
#endif
}
VLOG(3) << "init other env done.";
}
void HeterBoxTrainer::Run() {
int pull_thread_num = 3 * places_.size();
for (size_t thidx = 0; thidx < places_.size(); ++thidx) {
workers_[thidx]->device_reader_->Start();
std::dynamic_pointer_cast<paddle::framework::HeterBoxWorker>(
workers_[thidx])
->ResetStat();
}
for (int i = 0; i < pull_thread_num; ++i) {
int worker_id = i % places_.size();
pull_threads_.push_back(
std::thread(&DeviceWorker::ProduceTasks, workers_[worker_id].get()));
}
for (size_t thidx = 0; thidx < places_.size(); ++thidx) {
threads_.push_back(
std::thread(&DeviceWorker::TrainFiles, workers_[thidx].get()));
}
}
template <typename T>
void HeterBoxTrainer::MergeToRootScope(LoDTensor* root_tensor,
LoDTensor* tensor) {
LoDTensor tmp_root;
TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root);
T* tmp_root_data = tmp_root.data<T>();
LoDTensor tmp_tensor;
TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor);
T* data = tmp_tensor.data<T>();
for (int i = 0; i < tmp_tensor.numel(); i++) {
tmp_root_data[i] += data[i];
}
TensorCopy(tmp_root, platform::CPUPlace(), root_tensor);
}
Scope* HeterBoxTrainer::GetWorkerScope(int thread_id) { return nullptr; }
void HeterBoxTrainer::Finalize() {
for (auto& th : pull_threads_) {
th.join();
}
for (auto& th : threads_) {
th.join();
}
for (size_t i = 0; i < need_merge_var_names_.size(); i++) {
Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]);
if (root_var == nullptr) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
for (size_t j = 0; j < places_.size(); j++) {
Scope* cur_thread_scope = workers_[j]->GetThreadScope();
Variable* thread_var =
cur_thread_scope->FindVar(need_merge_var_names_[i]);
if (thread_var == nullptr) {
continue;
}
LoDTensor* thread_tensor = thread_var->GetMutable<LoDTensor>();
#define MergeCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
if (thread_tensor->type() != proto_type) { \
VLOG(0) << "Error: thread id=" << j << ", need_merge_var_names_[" << i \
<< "] " << need_merge_var_names_[i] \
<< ", root tensor type=" << root_tensor->type() \
<< ", thread tensor type=" << thread_tensor->type(); \
exit(-1); \
} \
MergeToRootScope<cpp_type>(root_tensor, thread_tensor); \
} \
} while (0)
_ForEachDataType_(MergeCallback);
}
}
pull_dense_worker_->MergeDenseParam();
root_scope_->DropKids();
}
} // namespace framework
} // namespace paddle
#endif
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/heter_util.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/string/string_helper.h"
#if (defined PADDLE_WITH_CUDA || defined PADDLE_WITH_XPU) && \
(defined PADDLE_WITH_PSLIB)
#include "paddle/fluid/platform/cuda_device_guard.h"
#if defined _WIN32 || defined __APPLE__
#else
#define _LINUX
#endif
namespace paddle {
namespace framework {
void HeterBoxWorker::Initialize(const TrainerDesc& desc) {
param_ = desc.downpour_param();
mpi_rank_ = desc.mpi_rank();
trainer_desc_ = desc;
for (int i = 0; i < trainer_desc_.xpu_recv_list_size(); ++i) {
send_var_list_.push_back(trainer_desc_.xpu_recv_list(i));
}
for (int i = 0; i < param_.sparse_table_size(); ++i) {
uint64_t table_id =
static_cast<uint64_t>(param_.sparse_table(i).table_id());
TableParameter table = param_.sparse_table(i);
sparse_key_names_[table_id].resize(table.sparse_key_name_size());
for (int j = 0; j < table.sparse_key_name_size(); ++j) {
sparse_key_names_[table_id][j] = table.sparse_key_name(j);
}
sparse_value_names_[table_id].resize(table.sparse_value_name_size());
for (int j = 0; j < table.sparse_value_name_size(); ++j) {
sparse_value_names_[table_id][j] = table.sparse_value_name(j);
}
sparse_grad_names_[table_id].resize(table.sparse_grad_name_size());
for (int j = 0; j < table.sparse_grad_name_size(); ++j) {
sparse_grad_names_[table_id][j] = table.sparse_grad_name(j);
}
label_var_name_[table_id] = table.label_var_name();
sparse_push_keys_[table_id] = std::vector<uint64_t>();
}
for (int i = 0; i < param_.dense_table_size(); ++i) {
uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
auto table = param_.dense_table(i);
dense_value_names_[table_id].resize(table.dense_value_name_size());
for (int j = 0; j < table.dense_value_name_size(); ++j) {
dense_value_names_[table_id][j] = table.dense_value_name(j);
}
dense_grad_names_[table_id].resize(table.dense_grad_name_size());
for (int j = 0; j < table.dense_grad_name_size(); ++j) {
dense_grad_names_[table_id][j] = table.dense_grad_name(j);
}
}
skip_ops_.resize(param_.skip_ops_size());
for (int i = 0; i < param_.skip_ops_size(); ++i) {
skip_ops_[i] = param_.skip_ops(i);
}
for (int i = 0; i < param_.stat_var_names_size(); ++i) {
stat_var_name_map_[param_.stat_var_names(i)] = 1;
}
need_to_push_sparse_ = param_.push_sparse();
need_to_push_dense_ = param_.push_dense();
fleet_ptr_ = FleetWrapper::GetInstance();
fetch_config_ = desc.fetch_config();
use_cvm_ = desc.use_cvm();
// for sparse value accessor, embedding only
no_cvm_ = desc.no_cvm();
scale_datanorm_ = desc.scale_datanorm();
dump_slot_ = desc.dump_slot();
dump_fields_.resize(desc.dump_fields_size());
for (int i = 0; i < desc.dump_fields_size(); ++i) {
dump_fields_[i] = desc.dump_fields(i);
}
adjust_ins_weight_config_ = desc.adjust_ins_weight_config();
need_dump_param_ = false;
dump_param_.resize(desc.dump_param_size());
for (int i = 0; i < desc.dump_param_size(); ++i) {
dump_param_[i] = desc.dump_param(i);
}
if (desc.dump_param_size() != 0) {
need_dump_param_ = true;
}
for (int i = 0; i < desc.check_nan_var_names_size(); ++i) {
check_nan_var_names_.push_back(desc.check_nan_var_names(i));
}
copy_table_config_ = desc.copy_table_config();
for (int i = 0; i < copy_table_config_.src_sparse_tables_size(); ++i) {
uint64_t src_table = copy_table_config_.src_sparse_tables(i);
uint64_t dest_table = copy_table_config_.dest_sparse_tables(i);
VLOG(3) << "copy_sparse_tables_ push back " << src_table << "->"
<< dest_table;
copy_sparse_tables_.push_back(std::make_pair(src_table, dest_table));
}
for (int i = 0; i < copy_table_config_.src_dense_tables_size(); ++i) {
uint64_t src_table = copy_table_config_.src_dense_tables(i);
uint64_t dest_table = copy_table_config_.dest_dense_tables(i);
VLOG(3) << "copy_dense_tables_ push back " << src_table << "->"
<< dest_table;
copy_dense_tables_.push_back(std::make_pair(src_table, dest_table));
}
for (auto& m : copy_table_config_.table_denpendency_map()) {
if (sparse_key_names_.find(m.key()) != sparse_key_names_.end()) {
// currently only support one dependency
for (auto& value : m.values()) {
table_dependency_[m.key()] = value;
}
}
}
pull_queue_ = paddle::framework::MakeChannel<std::shared_ptr<HeterTask>>();
push_queue_ = paddle::framework::MakeChannel<std::shared_ptr<HeterTask>>();
}
void HeterBoxWorker::SetChannelWriter(ChannelObject<std::string>* queue) {
writer_.Reset(queue);
}
void HeterBoxWorker::SetNeedDump(bool need_dump_field) {
need_dump_field_ = need_dump_field;
}
void HeterBoxWorker::DumpParam() {}
void HeterBoxWorker::CollectLabelInfo(std::shared_ptr<HeterTask> task,
size_t table_idx) {
if (no_cvm_) {
return;
}
uint64_t table_id = static_cast<uint64_t>(
param_.program_config(0).pull_sparse_table_id(table_idx));
TableParameter table;
for (auto i : param_.sparse_table()) {
if (i.table_id() == table_id) {
table = i;
break;
}
}
auto& feature = (task->features_)[table_id];
auto& feature_label = (task->feature_labels_)[table_id];
Scope* scope = task->scope_;
feature_label.resize(feature.size());
Variable* var = scope->FindVar(label_var_name_[table_id]);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int64_t* label_ptr = tensor->data<int64_t>();
size_t global_index = 0;
for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
VLOG(3) << "sparse_key_names_[" << i
<< "]: " << sparse_key_names_[table_id][i];
Variable* fea_var = scope->FindVar(sparse_key_names_[table_id][i]);
if (fea_var == nullptr) {
continue;
}
LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
CHECK(tensor != nullptr) << "tensor of var "
<< sparse_key_names_[table_id][i] << " is null";
// skip slots which do not have embedding
Variable* emb_var = scope->FindVar(sparse_value_names_[table_id][i]);
if (emb_var == nullptr) {
continue;
}
int64_t* ids = tensor->data<int64_t>();
size_t fea_idx = 0;
// tensor->lod()[0].size() == batch_size + 1
for (auto lod_idx = 1u; lod_idx < tensor->lod()[0].size(); ++lod_idx) {
for (; fea_idx < tensor->lod()[0][lod_idx]; ++fea_idx) {
// should be skipped feasign defined in protobuf
if (ids[fea_idx] == 0u) {
continue;
}
feature_label[global_index++] =
static_cast<float>(label_ptr[lod_idx - 1]);
}
}
}
CHECK(global_index == feature.size())
<< "expect fea info size:" << feature.size() << " real:" << global_index;
}
void HeterBoxWorker::FillSparseValue(std::shared_ptr<HeterTask> task,
size_t table_idx) {
uint64_t table_id = static_cast<uint64_t>(
param_.program_config(0).pull_sparse_table_id(table_idx));
TableParameter table;
for (auto i : param_.sparse_table()) {
if (i.table_id() == table_id) {
table = i;
break;
}
}
auto& fea_value = (task->feature_values_)[table_id];
Scope* scope = task->scope_;
auto fea_idx = 0u;
std::vector<float> init_value(table.fea_dim());
for (size_t i = 0; i < sparse_key_names_[table_id].size(); ++i) {
std::string slot_name = sparse_key_names_[table_id][i];
std::string emb_slot_name = sparse_value_names_[table_id][i];
Variable* var = scope->FindVar(slot_name);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
CHECK(tensor != nullptr) << "tensor of var " << slot_name << " is null";
int64_t* ids = tensor->data<int64_t>();
int len = tensor->numel();
Variable* var_emb = scope->FindVar(emb_slot_name);
if (var_emb == nullptr) {
continue;
}
LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
float* ptr = tensor_emb->mutable_data<float>({len, table.emb_dim()},
platform::CPUPlace());
// memset(ptr, 0, sizeof(float) * len * table.emb_dim());
auto& tensor_lod = tensor->lod()[0];
LoD data_lod{tensor_lod};
tensor_emb->set_lod(data_lod);
bool is_nid = (adjust_ins_weight_config_.need_adjust() &&
adjust_ins_weight_config_.nid_slot() == emb_slot_name);
if (is_nid) {
nid_show_.clear();
}
int nid_ins_index = 0;
for (int index = 0; index < len; ++index) {
if (use_cvm_ || no_cvm_) {
if (ids[index] == 0u) {
memcpy(ptr + table.emb_dim() * index, init_value.data(),
sizeof(float) * table.emb_dim());
if (is_nid) {
nid_show_.push_back(-1);
++nid_ins_index;
}
continue;
}
memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data(),
sizeof(float) * table.emb_dim());
if (is_nid &&
static_cast<size_t>(index) == tensor->lod()[0][nid_ins_index]) {
nid_show_.push_back(fea_value[fea_idx][0]);
++nid_ins_index;
}
fea_idx++;
} else {
if (ids[index] == 0u) {
memcpy(ptr + table.emb_dim() * index, init_value.data() + 2,
sizeof(float) * table.emb_dim());
if (is_nid) {
nid_show_.push_back(-1);
++nid_ins_index;
}
continue;
}
memcpy(ptr + table.emb_dim() * index, fea_value[fea_idx].data() + 2,
sizeof(float) * table.emb_dim());
if (is_nid &&
static_cast<size_t>(index) == tensor->lod()[0][nid_ins_index]) {
nid_show_.push_back(fea_value[fea_idx][0]);
++nid_ins_index;
}
fea_idx++;
}
}
}
}
void HeterBoxWorker::AdjustInsWeight(std::shared_ptr<HeterTask> task) {
#ifdef _LINUX
// check var and tensor not null
Scope* scope = task->scope_;
if (!adjust_ins_weight_config_.need_adjust()) {
VLOG(0) << "need_adjust=false, skip adjust ins weight";
return;
}
Variable* nid_var = scope->FindVar(adjust_ins_weight_config_.nid_slot());
if (nid_var == nullptr) {
VLOG(0) << "nid slot var " << adjust_ins_weight_config_.nid_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
LoDTensor* nid_tensor = nid_var->GetMutable<LoDTensor>();
if (nid_tensor == nullptr) {
VLOG(0) << "tensor of nid slot var " << adjust_ins_weight_config_.nid_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
Variable* ins_weight_var =
scope->FindVar(adjust_ins_weight_config_.ins_weight_slot());
if (ins_weight_var == nullptr) {
VLOG(0) << "ins weight var " << adjust_ins_weight_config_.ins_weight_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
LoDTensor* ins_weight_tensor = ins_weight_var->GetMutable<LoDTensor>();
if (ins_weight_tensor == nullptr) {
VLOG(0) << "tensor of ins weight tensor "
<< adjust_ins_weight_config_.ins_weight_slot()
<< " is nullptr, skip adjust ins weight";
return;
}
float* ins_weights = ins_weight_tensor->data<float>();
size_t len = ins_weight_tensor->numel(); // len = batch size
// here we assume nid_show slot only has one feasign in each instance
CHECK(len == nid_show_.size()) << "ins_weight size should be equal to "
<< "nid_show size, " << len << " vs "
<< nid_show_.size();
float nid_adjw_threshold = adjust_ins_weight_config_.nid_adjw_threshold();
float nid_adjw_ratio = adjust_ins_weight_config_.nid_adjw_ratio();
int64_t nid_adjw_num = 0;
double nid_adjw_weight = 0.0;
size_t ins_index = 0;
for (size_t i = 0; i < len; ++i) {
float nid_show = nid_show_[i];
VLOG(3) << "nid_show " << nid_show;
if (nid_show < 0) {
VLOG(3) << "nid_show < 0, continue";
continue;
}
float ins_weight = 1.0;
if (nid_show >= 0 && nid_show < nid_adjw_threshold) {
ins_weight = log(M_E +
(nid_adjw_threshold - nid_show) / nid_adjw_threshold *
nid_adjw_ratio);
// count nid adjw insnum and weight
++nid_adjw_num;
nid_adjw_weight += ins_weight;
// choose large ins weight
VLOG(3) << "ins weight new " << ins_weight << ", ins weight origin "
<< ins_weights[ins_index];
if (ins_weight > ins_weights[ins_index]) {
VLOG(3) << "ins " << ins_index << " weight changes to " << ins_weight;
ins_weights[ins_index] = ins_weight;
}
++ins_index;
}
}
VLOG(3) << "nid adjw info: total_adjw_num: " << nid_adjw_num
<< ", avg_adjw_weight: " << nid_adjw_weight;
#endif
}
void HeterBoxWorker::TrainFiles() {
VLOG(3) << "Begin to train files";
platform::SetNumThreads(1);
need_to_push_dense_ = false;
while (1) {
VLOG(3) << "before heter task";
std::shared_ptr<HeterTask> task;
if (!pull_queue_->Get(task)) {
VLOG(3) << "get task";
break;
}
VLOG(3) << "get task done";
Scope* scope = task->scope_->kids().front();
VLOG(3) << "get kid done";
// do computation here
task->timeline.Start();
for (auto& op : ops_) {
if (op->HasAttr("op_device")) {
auto device = op->Attr<std::string>("op_device");
if (device != "gpu") {
continue;
}
}
bool need_skip = false;
for (auto t = 0u; t < skip_ops_.size(); ++t) {
if (op->Type().find(skip_ops_[t]) != std::string::npos) {
need_skip = true;
break;
}
}
if (!need_skip) {
op->Run(*(scope), place_);
}
}
platform::DeviceContextPool::Instance().Get(place_)->Wait();
task->timeline.Pause();
task->xpu_op_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
push_queue_->Put(task);
}
}
void HeterTask::PackGpuTask(Scope* thread_scope, DataFeed* reader,
const ProgramDesc& program) {
auto& block = program.Block(0);
if (!scope_) {
scope_ = &(thread_scope->NewScope());
for (auto& var : block.AllVars()) {
if (!var->Persistable()) {
auto* ptr = scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
}
}
}
reader->AssignFeedVar(*scope_);
cur_batch_ = reader->Next();
}
void HeterBoxWorker::ResetStat() {
total_time_ = 0;
read_time_ = 0;
pack_time_ = 0;
pull_sparse_local_time_ = 0;
op_all_time_ = 0;
xpu_op_time_ = 0;
xpu_wait_time_ = 0;
cpu_op_time_ = 0;
collect_label_time_ = 0;
fill_sparse_time_ = 0;
push_sparse_time_ = 0;
gpu_2_cpu_time_ = 0;
cpu_2_gpu_time_ = 0;
total_inst_ = 0;
}
void HeterBoxWorker::ProduceTasks() {
need_to_push_dense_ = false;
while (1) {
std::shared_ptr<HeterTask> task;
task = object_pool_.Get();
task->Reset();
{
std::lock_guard<std::mutex> lock(mutex_);
task->timeline.Start();
task->PackGpuTask(thread_scope_, device_reader_, program_);
task->timeline.Pause();
task->pack_time = task->timeline.ElapsedSec();
task->total_time += task->pack_time;
if (task->cur_batch_ <= 0) {
if (!pull_queue_->Closed() && batch_cnt_ == done_cnt_) {
pull_queue_->Close();
}
break;
}
batch_cnt_ += 1;
}
for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).pull_sparse_table_id(i));
TableParameter table;
for (auto j : param_.sparse_table()) {
if (j.table_id() == tid) {
table = j;
break;
}
}
task->timeline.Start();
fleet_ptr_->HeterPullSparseVars(thread_id_, task, tid,
sparse_key_names_[tid], table.fea_dim(),
sparse_value_names_[tid]);
task->timeline.Pause();
task->pull_sparse_local_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
task->timeline.Start();
CollectLabelInfo(task, i);
task->timeline.Pause();
task->collect_label_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
task->timeline.Start();
FillSparseValue(task, i);
task->timeline.Pause();
task->fill_sparse_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
auto nid_iter = std::find(sparse_value_names_[tid].begin(),
sparse_value_names_[tid].end(),
adjust_ins_weight_config_.nid_slot());
if (nid_iter != sparse_value_names_[tid].end()) {
AdjustInsWeight(task);
}
}
task->timeline.Start();
size_t op_index = 0;
for (; op_index < ops_.size(); ++op_index) {
auto& op = ops_[op_index];
if (op->HasAttr("op_device")) {
auto device = op->Attr<std::string>("op_device");
if (device == "gpu") {
break;
}
}
bool need_skip = false;
for (auto t = 0u; t < skip_ops_.size(); ++t) {
if (op->Type().find(skip_ops_[t]) != std::string::npos) {
need_skip = true;
break;
}
}
if (!need_skip) {
op->Run(*(task->scope_), platform::CPUPlace());
}
}
task->timeline.Pause();
task->cpu_op_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
task->timeline.Start();
// prepare for gpu
Scope* cpu_scope = task->scope_;
Scope* gpu_scope = nullptr;
if (cpu_scope->kids().empty()) {
gpu_scope = &cpu_scope->NewScope();
} else {
gpu_scope = cpu_scope->kids().front();
}
for (const std::string& name : send_var_list_) {
const LoDTensor& cpu_tensor = cpu_scope->FindVar(name)->Get<LoDTensor>();
LoDTensor* gpu_tensor = gpu_scope->Var(name)->GetMutable<LoDTensor>();
gpu_tensor->set_lod(cpu_tensor.lod());
gpu_tensor->Resize(cpu_tensor.dims());
gpu_tensor->set_layout(cpu_tensor.layout());
void* gpu_ptr = gpu_tensor->mutable_data(place_, cpu_tensor.type());
const void* cpu_ptr = cpu_tensor.data<void>();
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr,
platform::CPUPlace(), cpu_ptr,
cpu_tensor.numel() * SizeOfType(cpu_tensor.type()),
copy_stream_);
}
task->timeline.Pause();
task->cpu_2_gpu_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
pull_queue_->Put(task);
push_queue_->Get(task);
int need_copy_grad = 1;
task->timeline.Start();
for (; op_index < ops_.size(); ++op_index) {
auto& op = ops_[op_index];
if (op->HasAttr("op_device")) {
auto device = op->Attr<std::string>("op_device");
if (device == "gpu") {
continue;
}
}
bool need_skip = false;
for (auto t = 0u; t < skip_ops_.size(); ++t) {
if (op->Type().find(skip_ops_[t]) != std::string::npos) {
need_skip = true;
break;
}
}
if (!need_skip) {
need_copy_grad = 0;
op->Run(*(task->scope_), platform::CPUPlace());
}
}
task->timeline.Pause();
task->cpu_op_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
VLOG(3) << "fill sparse value for all sparse table done.";
for (std::string& var_name : check_nan_var_names_) {
Variable* var = (task->scope_)->FindVar(var_name);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr) {
continue;
}
PADDLE_ENFORCE_EQ(framework::TensorContainsInf(*tensor), false,
platform::errors::InvalidArgument(
"Tensor %s contains Inf.", var_name));
PADDLE_ENFORCE_EQ(framework::TensorContainsNAN(*tensor), false,
platform::errors::InvalidArgument(
"Tensor %s contains NAN.", var_name));
}
if (need_to_push_sparse_) {
// push gradients here
for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_sparse_table_id(i));
TableParameter table;
for (auto i : param_.sparse_table()) {
if (i.table_id() == tid) {
table = i;
break;
}
}
Scope* src_scope = task->scope_;
Scope* dest_scope = nullptr;
task->timeline.Start();
if (need_copy_grad) {
if (cpu_scope->kids().empty()) {
dest_scope = &src_scope->NewScope();
} else {
dest_scope = src_scope->kids().front();
}
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
platform::CUDADeviceGuard guard(dev_id);
for (const std::string& name : sparse_grad_names_[tid]) {
const LoDTensor& src_tensor =
src_scope->FindVar(name)->Get<LoDTensor>();
LoDTensor* dest_tensor =
dest_scope->Var(name)->GetMutable<LoDTensor>();
dest_tensor->set_lod(src_tensor.lod());
dest_tensor->Resize(src_tensor.dims());
dest_tensor->set_layout(src_tensor.layout());
void* dest_ptr = dest_tensor->mutable_data(platform::CPUPlace(),
src_tensor.type());
const void* src_ptr = src_tensor.data<void>();
memory::Copy(platform::CPUPlace(), dest_ptr,
BOOST_GET_CONST(platform::CUDAPlace, place_), src_ptr,
src_tensor.numel() * SizeOfType(src_tensor.type()),
copy_stream_);
}
} else {
dest_scope = task->scope_;
}
task->timeline.Pause();
task->gpu_2_cpu_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
task->timeline.Start();
fleet_ptr_->HeterPushSparseVars(
task, *(dest_scope), tid, sparse_key_names_[tid],
sparse_grad_names_[tid], table.emb_dim(), &push_sparse_status_,
use_cvm_, dump_slot_, no_cvm_);
task->timeline.Pause();
task->push_sparse_time += task->timeline.ElapsedSec();
task->total_time += task->timeline.ElapsedSec();
}
}
if (need_to_push_sparse_) {
VLOG(3) << "push sparse gradient done.";
int32_t tmp_push_sparse_wait_times = -1;
static uint32_t push_sparse_wait_times =
static_cast<uint32_t>(tmp_push_sparse_wait_times);
if (push_sparse_status_.size() >= push_sparse_wait_times) {
for (auto& t : push_sparse_status_) {
t.wait();
}
push_sparse_status_.resize(0);
}
if (tmp_push_sparse_wait_times == -1) {
push_sparse_status_.resize(0);
}
}
{
std::lock_guard<std::mutex> lock(mutex_);
total_time_ += task->total_time;
read_time_ += task->read_time;
pack_time_ += task->pack_time;
pull_sparse_local_time_ += task->pull_sparse_local_time;
op_all_time_ += task->op_all_time;
xpu_op_time_ += task->xpu_op_time;
xpu_wait_time_ += task->xpu_wait_time;
cpu_op_time_ += task->cpu_op_time;
collect_label_time_ += task->collect_label_time;
fill_sparse_time_ += task->fill_sparse_time;
push_sparse_time_ += task->push_sparse_time;
gpu_2_cpu_time_ += task->gpu_2_cpu_time;
cpu_2_gpu_time_ += task->cpu_2_gpu_time;
total_inst_ += task->cur_batch_;
}
done_cnt_.fetch_add(1, std::memory_order_relaxed);
if (thread_id_ == 0) {
// should be configured here
if (done_cnt_ > 0 && done_cnt_ % 100 == 0) {
fprintf(stderr, "cpu_2_gpu total time: %fs\n",
cpu_2_gpu_time_ / done_cnt_);
fprintf(stderr, "gpu_2_cpu run total time: %fs\n",
gpu_2_cpu_time_ / done_cnt_);
fprintf(stderr, "cpu op run total time: %fs\n",
cpu_op_time_ / done_cnt_);
fprintf(stderr, "xpu op run total time: %fs\n",
xpu_op_time_ / done_cnt_);
fprintf(stderr, "xpu wait total time: %fs\n",
xpu_wait_time_ / done_cnt_);
fprintf(stderr, "pack task time: %fs\n", pack_time_ / done_cnt_);
fprintf(stderr, "train total time: %fs\n", total_time_ / done_cnt_);
fprintf(stderr, "pull sparse local time: %fs\n",
pull_sparse_local_time_ / done_cnt_);
fprintf(stderr, "fill sparse time: %fs\n",
fill_sparse_time_ / done_cnt_);
fprintf(stderr, "push sparse time: %fs\n",
push_sparse_time_ / done_cnt_);
fprintf(stderr, "collect label time: %fs\n",
collect_label_time_ / done_cnt_);
fprintf(stderr, "mean read time: %fs\n", read_time_ / done_cnt_);
fprintf(stderr, "IO percent: %f\n", read_time_ / total_time_ * 100);
fprintf(stderr, "cpu_2_gpu run percent: %f\n",
cpu_2_gpu_time_ / total_time_ * 100);
fprintf(stderr, "gpu_2_cpu run percent: %f\n",
gpu_2_cpu_time_ / total_time_ * 100);
fprintf(stderr, "cpu op run percent: %f\n",
cpu_op_time_ / total_time_ * 100);
fprintf(stderr, "xpu op run percent: %f\n",
xpu_op_time_ / total_time_ * 100);
fprintf(stderr, "xpu wait percent: %f\n",
xpu_wait_time_ / total_time_ * 100);
fprintf(stderr, "pack task percent: %f\n",
pack_time_ / total_time_ * 100);
fprintf(stderr, "pull sparse local time percent: %f\n",
pull_sparse_local_time_ / total_time_ * 100);
fprintf(stderr, "collect label time percent: %f\n",
collect_label_time_ / total_time_ * 100);
fprintf(stderr, "fill sparse time percent: %f\n",
fill_sparse_time_ / total_time_ * 100);
fprintf(stderr, "push sparse time percent: %f\n",
push_sparse_time_ / total_time_ * 100);
fprintf(stderr, "%6.2f instances/s\n", total_inst_ / total_time_);
}
}
VLOG(3) << "done taskid = " << task->taskid_;
task->scope_->DropKids();
object_pool_.Push(task);
}
}
} // end namespace framework
} // end namespace paddle
#endif
...@@ -243,55 +243,6 @@ class HeterXpuTrainer : public TrainerBase { ...@@ -243,55 +243,6 @@ class HeterXpuTrainer : public TrainerBase {
#endif #endif
}; };
class HeterBoxTrainer : public TrainerBase {
public:
HeterBoxTrainer() {}
virtual ~HeterBoxTrainer() {}
virtual void Initialize(const TrainerDesc& trainer_desc, Dataset* data_set);
virtual void InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place);
virtual void InitOtherEnv(const ProgramDesc& main_program);
virtual void Run();
virtual void Finalize();
virtual void RegisterHeterCallback();
virtual void DumpWork(int tid);
virtual Scope* GetWorkerScope(int thread_id);
virtual void CacheProgram(const ProgramDesc& main_program) {
new (&program_) ProgramDesc(main_program);
}
virtual std::string GetDumpPath(int tid) { return ""; }
virtual void InitDumpEnv() {}
template <typename T>
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void HeterMemCpy(LoDTensor* tensor, LoDTensor* root_tensor,
const paddle::platform::Place& thread_place,
gpuStream_t stream);
#endif
void CreateThreadParam(const ProgramDesc& program, int num);
template <typename T>
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
protected:
DownpourWorkerParameter param_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
std::vector<std::string> need_merge_var_names_;
float scale_datanorm_;
paddle::platform::Place place_;
ProgramDesc program_;
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
std::vector<std::shared_ptr<DeviceWorker>> workers_;
std::vector<platform::Place> places_;
// ps-gpu
std::vector<std::thread> pull_threads_;
std::vector<std::thread> threads_;
int use_ps_gpu_;
int thread_num_;
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
std::vector<gpuStream_t> copy_streams_;
std::vector<gpuEvent_t> events_;
#endif
};
#endif #endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
......
...@@ -70,7 +70,6 @@ REGISTER_TRAINER_CLASS(DistMultiTrainer); ...@@ -70,7 +70,6 @@ REGISTER_TRAINER_CLASS(DistMultiTrainer);
defined PADDLE_WITH_XPU) && \ defined PADDLE_WITH_XPU) && \
(defined PADDLE_WITH_PSLIB) (defined PADDLE_WITH_PSLIB)
REGISTER_TRAINER_CLASS(HeterXpuTrainer); REGISTER_TRAINER_CLASS(HeterXpuTrainer);
REGISTER_TRAINER_CLASS(HeterBoxTrainer);
#endif #endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB) (defined PADDLE_WITH_PSLIB)
......
...@@ -93,7 +93,7 @@ from .dygraph.varbase_patch_methods import monkey_patch_varbase ...@@ -93,7 +93,7 @@ from .dygraph.varbase_patch_methods import monkey_patch_varbase
from . import generator from . import generator
from .core import _cuda_synchronize from .core import _cuda_synchronize
from .generator import Generator from .generator import Generator
from .trainer_desc import TrainerDesc, DistMultiTrainer, PipelineTrainer, MultiTrainer, HeterXpuTrainer, HeterBoxTrainer from .trainer_desc import TrainerDesc, DistMultiTrainer, PipelineTrainer, MultiTrainer, HeterXpuTrainer
from .transpiler import HashName, RoundRobin from .transpiler import HashName, RoundRobin
from .backward import append_backward from .backward import append_backward
......
...@@ -17,7 +17,7 @@ import sys ...@@ -17,7 +17,7 @@ import sys
import os import os
__all__ = [ __all__ = [
'TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer', 'TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer',
'HeterXpuTrainer', 'HeterBoxTrainer' 'HeterXpuTrainer'
] ]
...@@ -346,30 +346,6 @@ class HeterXpuTrainer(TrainerDesc): ...@@ -346,30 +346,6 @@ class HeterXpuTrainer(TrainerDesc):
self._device_worker._gen_worker_desc(self.proto_desc) self._device_worker._gen_worker_desc(self.proto_desc)
class HeterBoxTrainer(TrainerDesc):
"""
Implement of HeterBoxTrainer.
It's for Distributed training.
"""
def __init__(self):
super(HeterBoxTrainer, self).__init__()
pass
def _set_program(self, program):
super(HeterBoxTrainer, self)._set_program(program)
self._program = program
def _gen_trainer_desc(self):
super(HeterBoxTrainer, self)._gen_trainer_desc()
self.proto_desc.class_name = "HeterBoxTrainer"
if self._program == None:
raise RuntimeError("None Program")
self._device_worker._set_infer(self._infer)
self._device_worker._set_program(self._program)
self._device_worker._gen_worker_desc(self.proto_desc)
class PSGPUTrainer(TrainerDesc): class PSGPUTrainer(TrainerDesc):
""" """
Implement of PSGPUTrainer. Implement of PSGPUTrainer.
......
...@@ -22,7 +22,7 @@ from paddle.fluid.log_helper import get_logger ...@@ -22,7 +22,7 @@ from paddle.fluid.log_helper import get_logger
local_logger = get_logger( local_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer, HeterXpuTrainer, HeterBoxTrainer, PSGPUTrainer from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer, HeterXpuTrainer, PSGPUTrainer
from .device_worker import Hogwild, DownpourSGD, Section, DownpourSGDOPT from .device_worker import Hogwild, DownpourSGD, Section, DownpourSGDOPT
from .framework import Variable from .framework import Variable
from multiprocessing import Process, Manager from multiprocessing import Process, Manager
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册