Unverified commit 0cb60c70, authored by Thunderbrook, committed by GitHub

add heter ps mode (#25682)

* add heter ps mode

* code style
test=develop

* add with_pslib
test=develop

* unittest
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* code style
test=develop

* test monitor
test=develop

* prepare trainer
test=develop

* code style
test=develop
Parent c8d0d141
......@@ -27,6 +27,7 @@ add_subdirectory(fleet)
add_subdirectory(io)
#ddim lib
proto_library(framework_proto SRCS framework.proto)
proto_library(heter_service_proto SRCS heter_service.proto)
proto_library(data_feed_proto SRCS data_feed.proto)
proto_library(trainer_desc_proto SRCS trainer_desc.proto DEPS framework_proto
data_feed_proto)
......@@ -195,20 +196,37 @@ cc_library(executor_gc_helper SRCS executor_gc_helper.cc DEPS scope proto_desc o
if(WITH_DISTRIBUTE)
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc downpour_worker_opt.cc
heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto trainer_desc_proto glog fs shell fleet_wrapper box_wrapper lodtensor_printer
device_context scope framework_proto trainer_desc_proto glog fs shell
fleet_wrapper heter_wrapper box_wrapper lodtensor_printer
lod_rank_table feed_fetch_method sendrecvop_rpc communicator collective_helper ${GLOB_DISTRIBUTE_DEPS}
graph_to_program_pass variable_helper data_feed_proto timer monitor)
graph_to_program_pass variable_helper data_feed_proto timer monitor
heter_service_proto)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
elseif(WITH_PSLIB)
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor pslib_brpc )
# TODO: Fix these unittest failed on Windows
if(NOT WIN32)
cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op)
endif()
else()
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc downpour_worker_opt.cc
heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper box_wrapper lodtensor_printer feed_fetch_method
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
# TODO: Fix these unittest failed on Windows
if(NOT WIN32)
......
......@@ -27,6 +27,7 @@ limitations under the License. */
#include <vector>
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/program_desc.h"
......@@ -51,10 +52,23 @@ bool CheckValidOutput(LoDTensor* tensor, size_t batch_size);
class FleetWrapper;
#ifdef PADDLE_WITH_PSLIB
class HeterWrapper;
#endif
class PullDenseWorker {
public:
virtual ~PullDenseWorker() {}
virtual void Initialize(const TrainerDesc& param);
#ifdef PADDLE_WITH_CUDA
void AddStream(const cudaStream_t stream) { copy_streams_.push_back(stream); }
void AddPlace(const paddle::platform::Place place) {
places_.push_back(place);
}
void AddThreadScope(Scope* scope) { thread_scopes_.push_back(scope); }
#endif
int Start();
void Stop();
void SetRootScope(Scope* scope) { root_scope_ = scope; }
......@@ -62,6 +76,7 @@ class PullDenseWorker {
void ResetThreadVersion(uint64_t table_id);
void Wait(std::vector<::std::future<int32_t>>* status_vec);
void PullDense(bool force_update = false);
void CreatePinVar();
int GetThreadIdByScope(const Scope* scope);
void SetThreadIdByScope(const Scope* scope, int tid);
static std::shared_ptr<PullDenseWorker> GetInstance() {
......@@ -105,6 +120,12 @@ class PullDenseWorker {
std::mutex mutex_for_mean_scale_;
float total_batch_num_ = 0;
std::unordered_map<const Scope*, int> scope_to_thread_id_;
#ifdef PADDLE_WITH_CUDA
std::vector<cudaStream_t> copy_streams_;
std::vector<paddle::platform::Place> places_;
std::vector<Scope*> thread_scopes_;
#endif
};
// should incorporate different type of device
......@@ -126,6 +147,8 @@ class DeviceWorker {
virtual void BindingDataFeedMemory() = 0;
virtual void SetRootScope(Scope* root_scope);
virtual void SetDataFeed(DataFeed* data_feed);
virtual void SetWorkerNum(int num) {}
virtual void CacheProgram(const ProgramDesc& main_program) {}
virtual void SetNeedDumpField(bool need_dump_field) {
need_dump_field_ = need_dump_field;
}
......@@ -161,6 +184,7 @@ class DeviceWorker {
FetchConfig fetch_config_;
bool use_cvm_;
bool no_cvm_;
TrainerDesc trainer_desc_;
// dump params or grads for debug
bool need_dump_param_;
......@@ -306,6 +330,87 @@ class DownpourWorkerOpt : public DownpourWorker {
uint64_t async_tid_ = 0;
};
#ifdef PADDLE_WITH_PSLIB
class HeterCpuWorker : public HogwildWorker {
public:
HeterCpuWorker() {}
virtual ~HeterCpuWorker() {}
virtual void Initialize(const TrainerDesc& desc);
virtual void TrainFiles();
virtual void TrainFilesWithProfiler();
virtual void SetNeedDump(bool need_dump_field);
virtual void SetChannelWriter(ChannelObject<std::string>* queue);
virtual void SetWorkerNum(int num) { worker_num_ = num; }
virtual void Schedule(int taskid);
virtual void JumpContext(std::shared_ptr<HeterTask> task);
virtual void CacheProgram(const ProgramDesc& main_program) {
new (&program_) ProgramDesc(main_program);
}
virtual void GetXpuOpIndex();
protected:
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
std::shared_ptr<paddle::framework::HeterWrapper> heter_ptr_;
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
void FillSparseValue(std::shared_ptr<HeterTask> task, size_t table_id);
void PushGradients();
void CollectLabelInfo(std::shared_ptr<HeterTask> task, size_t table_id);
void AdjustInsWeight(std::shared_ptr<HeterTask> task);
void DumpParam();
void CopySparseTable();
void CopyDenseTable();
void CopyDenseVars();
private:
int mpi_rank_;
int worker_num_;
int xpu_begin_op_index_;
int xpu_end_op_index_;
ProgramDesc program_;
HeterObjectPool<HeterTask> object_pool_;
HeterList<int, std::shared_ptr<HeterTask>> run_queue_;
HeterList<int, std::shared_ptr<HeterTask>> wait_queue_;
bool need_dump_param_;
std::vector<std::string> dump_param_;
bool need_to_push_dense_;
bool need_dump_field_;
bool dump_slot_;
bool need_to_push_sparse_;
std::vector<std::string> dump_fields_;
ChannelWriter<std::string> writer_;
DownpourWorkerParameter param_;
float scale_datanorm_;
// just save the value in param_ for easy access
std::map<uint64_t, std::string> label_var_name_;
std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
std::map<uint64_t, std::vector<std::string>> sparse_value_names_;
std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
platform::Place root_place_;
// actually pushed feasign of each table
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
// skipped ops
std::vector<std::string> skip_ops_;
std::vector<::std::future<int32_t>> push_sparse_status_;
std::vector<::std::future<int32_t>> push_dense_status_;
// adjust ins weight
AdjustInsWeightConfig adjust_ins_weight_config_;
std::vector<float> nid_show_;
// check nan and inf during training
std::vector<std::string> check_nan_var_names_;
// copy table
CopyTableConfig copy_table_config_;
std::map<uint64_t, uint64_t> table_dependency_;
std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
};
#endif
#if defined(PADDLE_WITH_NCCL)
class SectionWorker : public DeviceWorker {
public:
......
......@@ -62,6 +62,9 @@ std::shared_ptr<DeviceWorker> DeviceWorkerFactory::CreateDeviceWorker(
REGISTER_DEVICE_WORKER_CLASS(HogwildWorker);
REGISTER_DEVICE_WORKER_CLASS(DownpourWorker);
REGISTER_DEVICE_WORKER_CLASS(DownpourWorkerOpt);
#ifdef PADDLE_WITH_PSLIB
REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker);
#endif
#if defined(PADDLE_WITH_NCCL)
REGISTER_DEVICE_WORKER_CLASS(SectionWorker);
#endif
......
......@@ -35,7 +35,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
dump_file_num_ = trainer_desc.dump_file_num();
const std::vector<paddle::framework::DataFeed *> readers =
dataset->GetReaders();
RegisterHeterCallback();
thread_num_ = readers.size();
workers_.resize(thread_num_);
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
......@@ -55,6 +55,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
workers_[i]->SetDumpParamVector(dump_param_);
workers_[i]->InitRandomDumpConfig(trainer_desc);
workers_[i]->Initialize(trainer_desc);
workers_[i]->SetWorkerNum(thread_num_);
}
VLOG(3) << "going to initialize pull dense worker";
......@@ -64,6 +65,13 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
SetDebug(trainer_desc.debug());
}
void DistMultiTrainer::RegisterHeterCallback() {
auto fleet_ptr = FleetWrapper::GetInstance();
fleet_ptr->RegisterHeterCallback([this](int worker, int taskid) {
// workers_[worker]->Schedule(taskid);
});
}
void DistMultiTrainer::InitDumpEnv() {
queue_ = paddle::framework::MakeChannel<std::string>();
for (int i = 0; i < thread_num_; ++i) {
......@@ -90,6 +98,9 @@ void DistMultiTrainer::InitTrainerEnv(const ProgramDesc &main_program,
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
#ifdef PADDLE_WITH_PSLIB
workers_[i]->CacheProgram(main_program);
#endif
}
// Scope* -> thread id, it will be used in push_dense op
for (int i = 0; i < thread_num_; ++i) {
......@@ -104,6 +115,11 @@ void DistMultiTrainer::InitOtherEnv(const ProgramDesc &main_program) {
}
pull_dense_worker_->SetRootScope(root_scope_);
pull_dense_worker_->Start();
#ifdef PADDLE_WITH_PSLIB
for (int i = 0; i < thread_num_; ++i) {
workers_[i]->GetXpuOpIndex();
}
#endif
VLOG(3) << "init other env done.";
}
......
......@@ -379,7 +379,7 @@ void DownpourWorker::CopyDenseTable() {
pull_dense_status.resize(0);
fleet_ptr_->PullDenseVarsAsync(*root_scope_, dest_table,
dense_value_names_[dest_table],
&pull_dense_status);
&pull_dense_status, true);
for (auto& t : pull_dense_status) {
t.wait();
auto status = t.get();
......
......@@ -19,4 +19,6 @@ else()
cc_library(gloo_wrapper SRCS gloo_wrapper.cc DEPS framework_proto variable_helper scope)
endif(WITH_GLOO)
cc_library(heter_wrapper SRCS heter_wrapper.cc DEPS framework_proto)
cc_test(test_fleet SRCS test_fleet.cc DEPS fleet_wrapper gloo_wrapper fs shell)
......@@ -154,6 +154,219 @@ void FleetWrapper::CreateClient2ClientConnection() {
#endif
}
#ifdef PADDLE_WITH_PSLIB
void FleetWrapper::HeterPullSparseVars(
int workerid, std::shared_ptr<HeterTask> task, const uint64_t table_id,
const std::vector<std::string>& var_names, int fea_value_dim,
const std::vector<std::string>& var_emb_names) {
std::vector<::std::future<int32_t>> pull_sparse_status;
pull_sparse_status.resize(0);
auto& scope = *(task->scope_);
auto& fea_keys = (task->features_)[table_id];
auto& fea_values = (task->feature_values_)[table_id];
fea_keys.clear();
for (size_t var_index = 0; var_index < var_names.size(); ++var_index) {
const std::string& name = var_names[var_index];
Variable* var = scope.FindVar(name);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();
// skip slots which do not have embedding
const std::string& emb_name = var_emb_names[var_index];
Variable* emb_var = scope.FindVar(emb_name);
if (emb_var == nullptr) {
continue;
}
for (auto i = 0u; i < len; ++i) {
if (ids[i] == 0u) {
continue;
}
fea_keys.push_back(static_cast<uint64_t>(ids[i]));
}
}
fea_values.resize(fea_keys.size() + 1);
for (auto& t : fea_values) {
t.resize(fea_value_dim);
}
std::vector<float*> pull_result_ptr;
for (auto& t : fea_values) {
pull_result_ptr.push_back(t.data());
}
auto status = pslib_ptr_->_worker_ptr->heter_pull_sparse(
workerid, pull_result_ptr.data(), table_id, fea_keys.data(),
fea_keys.size(), task->taskid_);
pull_sparse_status.push_back(std::move(status));
for (auto& t : pull_sparse_status) {
t.wait();
auto status = t.get();
if (status != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
}
}
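For reference (not part of the diff): HeterPullSparseVars above walks every slot tensor, skips zero ids (padding), casts the rest to uint64 feasign keys, then sizes the value buffers to fea_keys.size() + 1 rows of fea_value_dim floats before calling heter_pull_sparse. A minimal standalone sketch of the key-collection step, with plain std::vector standing in for LoDTensor and illustrative names that are not Paddle API:

#include <cstdint>
#include <iostream>
#include <vector>

// Collect non-zero ids from each slot as uint64 feasign keys,
// mirroring the inner loop of HeterPullSparseVars (0 is treated as padding).
std::vector<uint64_t> CollectFeasignKeys(
    const std::vector<std::vector<int64_t>>& slot_ids) {
  std::vector<uint64_t> fea_keys;
  for (const auto& ids : slot_ids) {
    for (int64_t id : ids) {
      if (id == 0) continue;  // skip padding ids
      fea_keys.push_back(static_cast<uint64_t>(id));
    }
  }
  return fea_keys;
}

int main() {
  std::vector<std::vector<int64_t>> slots = {{3, 0, 7}, {0, 0, 9}};
  auto keys = CollectFeasignKeys(slots);
  std::cout << "pulled keys: " << keys.size() << std::endl;  // 3
  // The real code then resizes fea_values to keys.size() + 1 rows of
  // fea_value_dim floats and hands raw pointers to the PSLib client.
  return 0;
}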
void FleetWrapper::HeterPushSparseVars(
std::shared_ptr<HeterTask> task, const uint64_t table_id,
const std::vector<std::string>& sparse_key_names,
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<::std::future<int32_t>>* push_sparse_status, const bool use_cvm,
const bool dump_slot, const bool no_cvm) {
auto& scope = *(task->scope_);
int batch_size = task->cur_batch_;
int offset = 2;
int slot_offset = 0;
int grad_dim = emb_dim;
int show_index = 0;
int click_index = 1;
auto& fea_keys = (task->features_)[table_id];
auto& fea_labels = (task->feature_labels_)[table_id];
auto& push_values = (task->feature_grads_)[table_id];
auto& sparse_push_keys = (task->sparse_push_keys_)[table_id];
if (use_cvm) {
offset = 0;
grad_dim = emb_dim - 2;
}
if (no_cvm) {
offset = 0;
grad_dim = emb_dim;
}
if (dump_slot) {
slot_offset = 1;
show_index = 1;
click_index = 2;
}
CHECK_GE(grad_dim, 0);
sparse_push_keys.clear();
sparse_push_keys.reserve(fea_keys.size() + 1);
push_values.resize(fea_keys.size() + 1);
for (auto& t : push_values) {
t.resize(emb_dim + offset + slot_offset);
}
uint64_t fea_idx = 0u;
for (size_t i = 0;
i < sparse_key_names.size() && i < sparse_grad_names.size(); ++i) {
Variable* var = scope.FindVar(sparse_key_names[i]);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
size_t len = tensor->numel();
int64_t* ids = tensor->data<int64_t>();
int slot = 0;
if (dump_slot) {
slot = boost::lexical_cast<int>(sparse_key_names[i]);
}
Variable* g_var = scope.FindVar(sparse_grad_names[i]);
if (g_var == nullptr) {
continue;
}
LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
if (g_tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
float* g = g_tensor->data<float>();
if (scale_sparse_gradient_with_batch_size_ && grad_dim > 0) {
int dim = emb_dim + offset;
Eigen::Map<
Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
g_mat(g, g_tensor->numel() / dim, dim);
g_mat.rightCols(grad_dim) *= batch_size;
}
for (auto id_idx = 0u; id_idx < len; ++id_idx) {
if (ids[id_idx] == 0) {
g += emb_dim;
continue;
}
sparse_push_keys.push_back(ids[id_idx]);
CHECK(fea_idx < push_values.size());
if (use_cvm || no_cvm) {
memcpy(push_values[fea_idx].data() + offset + slot_offset, g,
sizeof(float) * emb_dim);
} else {
CHECK(fea_idx < fea_labels.size());
memcpy(push_values[fea_idx].data() + offset + slot_offset, g,
sizeof(float) * emb_dim);
push_values[fea_idx][show_index] = 1.0f;
push_values[fea_idx][click_index] =
static_cast<float>(fea_labels[fea_idx]);
}
if (dump_slot) {
push_values[fea_idx][0] = static_cast<float>(slot);
}
g += emb_dim;
fea_idx++;
}
}
// slots whose embedding has been stop gradient or
// not involved in forward-backward
uint64_t no_grad_fea_num = 0u;
for (size_t i = sparse_grad_names.size(); i < sparse_key_names.size(); ++i) {
Variable* var = scope.FindVar(sparse_key_names[i]);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
size_t len = tensor->numel();
int64_t* ids = tensor->data<int64_t>();
for (auto id_idx = 0u; id_idx < len; ++id_idx) {
if (ids[id_idx] == 0) {
continue;
}
++no_grad_fea_num;
}
}
CHECK(fea_idx + no_grad_fea_num == fea_keys.size())
<< "fea_idx: " << fea_idx << " no_grad_fea_num: " << no_grad_fea_num
<< " features size: " << fea_keys.size();
CHECK(fea_idx == sparse_push_keys.size());
if (fea_idx == 0) {
return;
}
std::vector<float*> push_g_vec;
for (auto i = 0u; i < sparse_push_keys.size(); ++i) {
push_g_vec.push_back(push_values[i].data());
}
auto status = pslib_ptr_->_worker_ptr->push_sparse(
table_id, sparse_push_keys.data(), (const float**)push_g_vec.data(),
sparse_push_keys.size());
push_sparse_status->push_back(std::move(status));
}
#endif
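For reference (not part of the diff): the push buffer layout in HeterPushSparseVars depends on use_cvm/no_cvm/dump_slot. Each pushed row holds emb_dim + offset + slot_offset floats; dump_slot reserves slot 0 for the slot id and shifts the show/click indices. A minimal standalone sketch, under those assumptions, that reproduces only the offset bookkeeping:

#include <cassert>

// Mirrors the offset bookkeeping at the top of HeterPushSparseVars:
// 'offset' reserves show/click slots, 'slot_offset' reserves the slot id,
// and grad_dim is the number of embedding gradients actually pushed.
struct PushLayout {
  int offset;       // floats reserved for show/click
  int slot_offset;  // floats reserved for the slot id
  int show_index;
  int click_index;
  int grad_dim;
};

PushLayout MakePushLayout(int emb_dim, bool use_cvm, bool no_cvm,
                          bool dump_slot) {
  PushLayout p{2, 0, 0, 1, emb_dim};
  if (use_cvm) { p.offset = 0; p.grad_dim = emb_dim - 2; }
  if (no_cvm)  { p.offset = 0; p.grad_dim = emb_dim; }
  if (dump_slot) { p.slot_offset = 1; p.show_index = 1; p.click_index = 2; }
  return p;
}

int main() {
  // emb_dim = 11, plain mode with slot dumping:
  // row = [slot][show][click][11 grads] -> 14 floats per feasign.
  PushLayout p = MakePushLayout(11, false, false, true);
  assert(p.offset + p.slot_offset + 11 == 14);
  assert(p.show_index == 1 && p.click_index == 2);
  return 0;
}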
int FleetWrapper::RegisterHeterCallback(HeterCallBackFunc handler) {
#ifdef PADDLE_WITH_PSLIB
VLOG(3) << "calling FleetWrapper::RegisterHeterCallback";
VLOG(3) << "pslib_ptr_=" << pslib_ptr_;
VLOG(3) << "_worker_ptr=" << pslib_ptr_->_worker_ptr;
return pslib_ptr_->_worker_ptr->registe_heter_callback(handler);
#else
VLOG(0) << "FleetWrapper::RegisterHeterCallback"
<< " does nothing when no pslib";
#endif
return 0;
}
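For reference (not part of the diff): RegisterHeterCallback forwards a std::function<void(int, int)> to the PSLib worker so a finished remote task can re-schedule the originating CPU worker; DistMultiTrainer passes a lambda that would call workers_[worker]->Schedule(taskid). A minimal standalone sketch of the same callback-registration pattern, with a hypothetical ToyPsClient in place of the PSLib client:

#include <functional>
#include <iostream>
#include <vector>

// A toy stand-in for the PS client: it stores one (worker, taskid) callback
// and invokes it when a remote task reports completion.
class ToyPsClient {
 public:
  using HeterCallBackFunc = std::function<void(int, int)>;
  int RegisterHeterCallback(HeterCallBackFunc handler) {
    handler_ = std::move(handler);
    return 0;
  }
  void OnRemoteTaskDone(int worker, int taskid) {
    if (handler_) handler_(worker, taskid);
  }

 private:
  HeterCallBackFunc handler_;
};

int main() {
  ToyPsClient client;
  // Analogous to DistMultiTrainer::RegisterHeterCallback, whose lambda
  // captures `this` and would call workers_[worker]->Schedule(taskid).
  client.RegisterHeterCallback([](int worker, int taskid) {
    std::cout << "reschedule worker " << worker << " task " << taskid << "\n";
  });
  client.OnRemoteTaskDone(0, 42);
  return 0;
}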
void FleetWrapper::PullSparseToLocal(const uint64_t table_id,
int fea_value_dim) {
#ifdef PADDLE_WITH_PSLIB
......@@ -421,13 +634,17 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
void FleetWrapper::PullDenseVarsAsync(
const Scope& scope, const uint64_t tid,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* pull_dense_status) {
std::vector<::std::future<int32_t>>* pull_dense_status, bool in_cpu) {
#ifdef PADDLE_WITH_PSLIB
auto& regions = _regions[tid];
regions.clear();
regions.resize(var_names.size());
for (auto i = 0u; i < var_names.size(); ++i) {
Variable* var = scope.FindVar(var_names[i]);
std::string varname = var_names[i];
if (!in_cpu) {
varname = var_names[i] + "pin";
}
Variable* var = scope.FindVar(varname);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* w = tensor->data<float>();
paddle::ps::Region reg(w, tensor->numel());
......@@ -485,6 +702,57 @@ void FleetWrapper::PushDenseVarsSync(
Scope* scope, const uint64_t table_id,
const std::vector<std::string>& var_names) {}
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
void FleetWrapper::PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* push_sparse_status,
float scale_datanorm, int batch_size, const paddle::platform::Place& place,
cudaStream_t stream, cudaEvent_t event) {
std::vector<paddle::ps::Region> regions;
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int count = tensor->numel();
float* g_data = tensor->data<float>();
Variable* pin_var = scope.FindVar(t + "pin");
LoDTensor* pin_tensor = pin_var->GetMutable<LoDTensor>();
float* pin_g = pin_tensor->mutable_data<float>(tensor->dims(),
platform::CUDAPinnedPlace());
memory::Copy(platform::CUDAPinnedPlace(), pin_g,
BOOST_GET_CONST(platform::CUDAPlace, place), g_data,
sizeof(float) * count, stream);
PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
cudaEventSynchronize(event);
float* g = pin_g;
if (scale_datanorm >= 0) {
if (t.find(".batch_size@GRAD") != std::string::npos ||
t.find(".batch_sum@GRAD") != std::string::npos) {
Eigen::Map<Eigen::MatrixXf> mat(g, 1, count);
float scale = 1.0 / batch_size;
mat *= scale;
} else if (t.find(".batch_square_sum@GRAD") != std::string::npos) {
VLOG(3) << "epsilon: " << scale_datanorm;
for (int i = 0; i < count; ++i) {
g[i] = (g[i] - batch_size * scale_datanorm) / batch_size +
batch_size * scale_datanorm;
}
}
}
paddle::ps::Region reg(g, count);
regions.emplace_back(std::move(reg));
}
auto status = pslib_ptr_->_worker_ptr->push_dense(regions.data(),
regions.size(), table_id);
if (push_sparse_status) {
push_sparse_status->push_back(std::move(status));
}
}
#endif
void FleetWrapper::PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
......@@ -1085,8 +1353,8 @@ void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
push_status.wait();
auto status = push_status.get();
if (status != 0) {
PADDLE_THORW(platform::errors::Fatal(
"push shrink dense param failed, status is [%d].", status));
// PADDLE_THORW(platform::errors::Fatal(
// "push shrink dense param failed, status is [%d].", status));
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
......
......@@ -28,6 +28,7 @@ limitations under the License. */
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
......@@ -80,6 +81,24 @@ class FleetWrapper {
pull_local_thread_num_ = thread_num;
}
#ifdef PADDLE_WITH_PSLIB
void HeterPullSparseVars(int workerid, std::shared_ptr<HeterTask> task,
const uint64_t table_id,
const std::vector<std::string>& var_names,
int fea_dim,
const std::vector<std::string>& var_emb_names);
void HeterPushSparseVars(
std::shared_ptr<HeterTask> task, const uint64_t table_id,
const std::vector<std::string>& sparse_key_names,
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<::std::future<int32_t>>* push_sparse_status,
const bool use_cvm, const bool dump_slot, const bool no_cvm);
#endif
typedef std::function<void(int, int)> HeterCallBackFunc;
int RegisterHeterCallback(HeterCallBackFunc handler);
// Pull sparse variables from server in sync mode
// Param<in>: scope, table_id, var_names, fea_keys, fea_dim, var_emb_names
// Param<out>: fea_values
......@@ -118,15 +137,24 @@ class FleetWrapper {
void PullDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* pull_dense_status);
std::vector<::std::future<int32_t>>* pull_dense_status, bool in_cpu);
// push dense parameters(not gradients) to server in sync mode
void PushDenseParamSync(const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names);
// Push dense variables to server in async mode
// Param<in>: scope, table_id, var_names, scale_datanorm, batch_size
// Param<out>: push_sparse_status
// Push dense variables to server in async mode
// Param<in>: scope, table_id, var_names, scale_datanorm, batch_size
// Param<out>: push_sparse_status
#ifdef PADDLE_WITH_CUDA
void PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* push_sparse_status,
float scale_datanorm, int batch_size,
const paddle::platform::Place& place, cudaStream_t stream,
cudaEvent_t event);
#endif
void PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
......
......@@ -54,10 +54,10 @@ void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
paddle::framework::fs_remove(tmp);
if (i == retry_times_) {
VLOG(0) << "fs_open_write failed, retry times reaches limit";
PADDLE_THROW(platform::errors::PreconditionNotMet(
"fs_open_write failed, retry times reaches"
" limit ",
retry_times_));
// PADDLE_THROW(platform::errors::PreconditionNotMet(
// "fs_open_write failed, retry times reaches"
// " limit ",
// retry_times_));
}
} else {
break;
......@@ -143,9 +143,9 @@ void HdfsStore::wait(const std::vector<std::string>& keys,
break;
}
}
PADDLE_THROW(platform::errors::ExecutionTimeout(
"TIMEOUT self_rank = %d pair_rank = %d", self_rank_,
last_check_rank));
// PADDLE_THROW(platform::errors::ExecutionTimeout(
VLOG(0) << "TIMEOUT self_rank = " << self_rank_
<< " pair_rank = " << last_check_rank;
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
}
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include <algorithm>
#include <utility>
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/timer.h"
#ifdef PADDLE_WITH_PSLIB
namespace paddle {
namespace framework {
std::shared_ptr<HeterWrapper> HeterWrapper::s_instance_ = NULL;
bool HeterWrapper::is_initialized_ = false;
void HeterWrapper::CreateClient2XpuConnection() {
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connection_type = "single";
options.timeout_ms = 2000000;
xpu_channels_.resize(xpu_list_.size());
for (size_t i = 0; i < xpu_list_.size(); ++i) {
VLOG(3) << "channel init: " << xpu_list_[i];
xpu_channels_[i].reset(new brpc::Channel());
if (xpu_channels_[i]->Init(xpu_list_[i].c_str(), "", &options) != 0) {
VLOG(0) << "server channel init fail";
}
}
}
void HeterWrapper::RegisterServiceHandler(int cmd, HeterServiceHandler func) {
service_.RegisterServiceHandler(cmd, func);
}
void HeterWrapper::SetXpuList(const std::vector<std::string>& xpu_list) {
#ifdef PADDLE_WITH_PSLIB
VLOG(3) << "Going to set xpu list";
for (auto& x : xpu_list) {
xpu_list_.push_back(x);
VLOG(3) << "set xpu list: " << x << " size: " << xpu_list_.size();
}
#endif
}
void HeterWrapper::StartXpuService(const std::string& ip, uint32_t port) {
std::string ip_port = ip + ":" + std::to_string(port);
VLOG(3) << "xpu server starts at " << ip_port;
server_.AddService(&service_, brpc::SERVER_DOESNT_OWN_SERVICE);
brpc::ServerOptions options;
if (server_.Start(ip_port.c_str(), &options) != 0) {
VLOG(0) << "xpu server start fail";
}
}
// void HeterWrapper::SerializeToReq(const std::string& varname,
// Scope* scope, HeterRequest& request) {
// auto* req_var = request.mutable_vars();
void HeterWrapper::SerializeToReq(const std::string& varname, Scope* scope,
VariableMessage* req_var) {
Variable* var = scope->FindVar(varname);
if (var == nullptr) {
return;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
req_var->set_varname(varname);
req_var->set_type(LOD_TENSOR);
req_var->set_data_type(static_cast<VariableMessage::Type>(tensor->type()));
for (auto& dim : framework::vectorize(tensor->dims())) {
req_var->add_dims(dim);
}
const framework::LoD lod = tensor->lod();
if (lod.size() > 0) {
req_var->set_lod_level(lod.size());
for (auto& each : lod) {
VariableMessage::LodData* lod_inner = req_var->add_lod();
for (auto& d : each) {
lod_inner->add_lod_data(d);
}
}
}
auto* req_data = req_var->mutable_data();
req_data->clear();
req_data->resize(tensor->numel() * SizeOfType(tensor->type()));
char* data_ptr = const_cast<char*>(req_data->data());
if (platform::is_cpu_place(tensor->place())) {
memcpy(data_ptr, tensor->data<void>(),
tensor->numel() * SizeOfType(tensor->type()));
}
#ifdef PADDLE_WITH_CUDA
else {
memory::Copy(platform::CPUPlace(), data_ptr,
BOOST_GET_CONST(platform::CUDAPlace, tensor->place()),
tensor->data<void>(),
tensor->numel() * SizeOfType(tensor->type()), nullptr);
}
#endif
}
// void HeterWrapper::DeSerializeToTensor(Scope* scope,
// const HeterRequest* request) {
#ifdef PADDLE_WITH_CUDA
void HeterWrapper::DeSerializeToTensor(Scope* scope,
const VariableMessage& req_var,
platform::Place place,
cudaStream_t stream) {
#else
void HeterWrapper::DeSerializeToTensor(Scope* scope,
const VariableMessage& req_var,
platform::Place place) {
#endif
// const VariableMessage& req_var = request->vars();
auto* var = scope->FindVar(req_var.varname());
auto* tensor = var->GetMutable<LoDTensor>();
std::vector<int> vec_dim;
for (auto& x : req_var.dims()) {
vec_dim.push_back(x);
}
tensor->Resize(make_ddim(vec_dim));
LoD lod;
for (int i = 0; i < req_var.lod_level(); ++i) {
framework::Vector<size_t> v;
for (int j = 0; j < req_var.lod(i).lod_data_size(); ++j) {
v.push_back(req_var.lod(i).lod_data(j));
}
lod.push_back(v);
}
tensor->set_lod(lod);
void* tensor_data =
tensor->mutable_data(place, ToVarType(req_var.data_type()));
#ifdef PADDLE_WITH_CUDA
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
platform::CPUPlace(), req_var.data().data(),
tensor->numel() * SizeOfType(tensor->type()), stream);
#else
memcpy(tensor_data, req_var.data().data(),
tensor->numel() * SizeOfType(tensor->type()));
#endif
}
framework::proto::VarType::Type HeterWrapper::ToVarType(
VariableMessage::Type type) {
switch (type) {
case VariableMessage::FP32:
return framework::proto::VarType::FP32; // NOLINT
case VariableMessage::FP64:
return framework::proto::VarType::FP64; // NOLINT
case VariableMessage::INT32:
return framework::proto::VarType::INT32; // NOLINT
case VariableMessage::INT64:
return framework::proto::VarType::INT64; // NOLINT
case VariableMessage::BOOL:
return framework::proto::VarType::BOOL; // NOLINT
default:
VLOG(0) << "Not support type " << type;
}
}
void HeterWrapper::StopXpuService(int num) {
HeterRequest request;
HeterResponse response;
brpc::Controller cntl;
request.set_cmd(2);
// for (size_t i = 0; i < xpu_channels_.size(); ++i) {
HeterService_Stub stub(xpu_channels_[num].get());
stub.service(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
VLOG(0) << "call stop xpu service fail: " << cntl.ErrorText();
} else {
VLOG(3) << "call stop xpu service success";
}
// }
}
void HeterWrapper::EndPass(Scope* scope, int num) {
HeterRequest request;
HeterResponse response;
brpc::Controller cntl;
request.set_cmd(1);
// for (size_t i = 0; i < xpu_channels_.size(); ++i) {
HeterService_Stub stub(xpu_channels_[num].get());
stub.service(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
VLOG(0) << "call end pass fail: " << cntl.ErrorText();
} else {
VLOG(3) << "call end pass success";
for (int j = 0; j < response.vars_size(); ++j) {
DeSerializeToTensor(scope, response.vars(j), platform::CPUPlace());
}
}
// }
}
void HeterWrapper::CallRemoteXpu(std::shared_ptr<HeterTask> task,
HeterCpuWorker* worker, int mpi_rank,
std::vector<std::string>& send_vars) {
HeterRequest request;
request.set_cmd(0);
request.set_cur_batch(task->cur_batch_);
OnHeterRpcDone* done = new OnHeterRpcDone([this, task, worker](void* done) {
auto* closure = (OnHeterRpcDone*)done;
if (closure->cntl.Failed()) {
VLOG(0) << "call xpu fail: " << closure->cntl.ErrorText();
} else {
VLOG(3) << "call xpu success";
}
// DeSerializeToTensor(task->scope_,
// closure->response.vars(), platform::CPUPlace());
for (int i = 0; i < closure->response.vars_size(); ++i) {
DeSerializeToTensor(task->scope_, closure->response.vars(i),
platform::CPUPlace());
}
worker->Schedule(task->taskid_);
});
// std::vector<std::string> varnames = {"click", "12345"};
// //varnames.push_back(send_var);
// //if (send_var == "_generated_var_412") {
// varnames.push_back("filter_by_instag_0.tmp_0");
// varnames.push_back("filter_by_instag_2.tmp_0");
// varnames.push_back("filter_by_instag_0.tmp_1");
// varnames.push_back("concat_1.tmp_0");
// }
for (auto& varname : send_vars) {
auto* req_var = request.add_vars();
SerializeToReq(varname, task->scope_, req_var);
}
int num = mpi_rank % xpu_channels_.size();
HeterService_Stub stub(xpu_channels_[num].get());
// stub.service(&cntl, &request, &response,
// brpc::NewCallback(&HeterWrapper::RpcCallBack,
// response, cntl, worker, task));
stub.service(&done->cntl, &request, &done->response, done);
}
void HeterWrapper::CallRemoteXpuSync(std::shared_ptr<HeterTask> task,
HeterCpuWorker* worker, int mpi_rank,
std::vector<std::string>& send_vars) {
HeterRequest request;
HeterResponse response;
brpc::Controller cntl;
request.set_cmd(0);
request.set_cur_batch(task->cur_batch_);
// std::vector<std::string> varnames = {"concat_1.tmp_0", "click", "12345"};
for (auto& varname : send_vars) {
auto* req_var = request.add_vars();
SerializeToReq(varname, task->scope_, req_var);
}
HeterService_Stub stub(xpu_channels_[0].get());
stub.service(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
VLOG(0) << "call xpu fail: " << cntl.ErrorText();
} else {
VLOG(3) << "call xpu success";
for (int i = 0; i < response.vars_size(); ++i) {
DeSerializeToTensor(task->scope_, response.vars(i), platform::CPUPlace());
}
}
}
} // end namespace framework
} // end namespace paddle
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <atomic>
#include <ctime>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <unordered_map>
#include <vector>
#ifdef PADDLE_WITH_PSLIB
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
namespace paddle {
namespace framework {
class HeterCpuWorker;
typedef std::function<void(void*)> HeterRpcCallbackFunc;
class OnHeterRpcDone : public google::protobuf::Closure {
public:
OnHeterRpcDone(HeterRpcCallbackFunc func) : handler_(func) {}
virtual ~OnHeterRpcDone() {}
void Run() {
std::unique_ptr<OnHeterRpcDone> self_guard(this);
handler_(this);
}
HeterRpcCallbackFunc handler_;
HeterResponse response;
brpc::Controller cntl;
};
class HeterWrapper {
public:
virtual ~HeterWrapper() {
server_.Stop(1000);
server_.Join();
}
HeterWrapper() {}
static void HeterRpcCallBack(HeterResponse* response, brpc::Controller* cntl,
HeterCpuWorker* worker,
std::shared_ptr<HeterTask> task);
void CreateClient2XpuConnection();
void RegisterServiceHandler(int cmd, HeterServiceHandler func);
void StartXpuService(const std::string& ip, uint32_t port);
void CallRemoteXpu(std::shared_ptr<HeterTask> task, HeterCpuWorker* worker,
int mpi_rank, std::vector<std::string>& send_vars);
void CallRemoteXpuSync(std::shared_ptr<HeterTask> task,
HeterCpuWorker* worker, int mpi_rank,
std::vector<std::string>& send_vars);
void StopXpuService(int num);
void EndPass(Scope* scope, int num);
void SerializeToReq(const std::string& varname, Scope* scope,
VariableMessage* req_var);
framework::proto::VarType::Type ToVarType(VariableMessage::Type type);
#ifdef PADDLE_WITH_CUDA
void DeSerializeToTensor(Scope* scope, const VariableMessage& req_var,
platform::Place place,
cudaStream_t stream = nullptr);
#else
void DeSerializeToTensor(Scope* scope, const VariableMessage& req_var,
platform::Place place);
#endif
// HeterWrapper singleton
static std::shared_ptr<HeterWrapper> GetInstance() {
if (NULL == s_instance_) {
s_instance_.reset(new paddle::framework::HeterWrapper());
}
return s_instance_;
}
std::vector<std::string>& GetXpuList() { return xpu_list_; }
void SetXpuList(const std::vector<std::string>& xpu_list);
private:
static std::shared_ptr<HeterWrapper> s_instance_;
protected:
std::vector<std::shared_ptr<brpc::Channel>> xpu_channels_;
brpc::Server server_;
HeterXpuService service_;
static bool is_initialized_;
DISABLE_COPY_AND_ASSIGN(HeterWrapper);
std::vector<std::string> xpu_list_;
};
} // end namespace framework
} // end namespace paddle
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <fstream>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <unordered_map> // NOLINT
#include <unordered_set> // NOLINT
#include <vector>
#include "paddle/fluid/framework/heter_service.pb.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#ifdef PADDLE_WITH_PSLIB
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
namespace paddle {
namespace framework {
typedef std::function<int(const HeterRequest*, HeterResponse*)>
HeterServiceHandler;
class DataFeed;
class HeterXpuService : public HeterService {
public:
HeterXpuService() {}
virtual ~HeterXpuService() {}
void service(::google::protobuf::RpcController* controller,
const HeterRequest* request, HeterResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
int ret = 0;
int cmd = request->cmd();
auto itr = handler_map_.find(cmd);
if (itr == handler_map_.end()) {
} else {
ret = itr->second(request, response);
}
// response->set_err_code(0);
// response->set_err_msg("");
if (ret != 0) {
// response->set_err_code(-1);
// response->set_err_msg("xpu service error");
}
}
void RegisterServiceHandler(int cmd, HeterServiceHandler func) {
VLOG(0) << "register heter service";
handler_map_[cmd] = func;
}
private:
std::unordered_map<int, HeterServiceHandler> handler_map_;
};
enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
class HeterTask {
public:
void Update() {
if (state_ == PULL_SPARSE) {
state_ = OP_RUN;
} else if (state_ == OP_RUN) {
state_ = XPU;
// state_ = PUSH_GRAD;
// state_ = PUSH_GRAD;
} else if (state_ == XPU) {
state_ = OP_RUN_END;
} else if (state_ == OP_RUN_END) {
state_ = PUSH_GRAD;
} else if (state_ == PUSH_GRAD) {
state_ = DONE;
}
}
void Reset() {
total_time = 0;
read_time = 0;
pack_time = 0;
pull_sparse_local_time = 0;
op_all_time = 0;
xpu_op_time = 0;
xpu_wait_time = 0;
cpu_op_time = 0;
collect_label_time = 0;
fill_sparse_time = 0;
push_sparse_time = 0;
}
void Show() {
std::cout << "features size " << features_.size() << std::endl;
for (size_t i = 0; i < features_.size(); ++i) {
std::cout << "features[" << i << "] size " << features_[i].size()
<< std::endl;
}
}
void PackTask(Scope* scope, int taskid, DataFeed* reader, int cur_batch,
const ProgramDesc& program);
Scope* scope_{nullptr};
int taskid_;
int cur_batch_;
HeterTaskState state_;
// cache
std::map<uint64_t, std::vector<uint64_t>> features_;
std::map<uint64_t, std::vector<float>> feature_labels_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_values_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_grads_;
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
double total_time{0};
double read_time{0};
double pack_time{0};
double pull_sparse_local_time{0};
double op_all_time{0};
double xpu_op_time{0};
double xpu_wait_time{0};
double cpu_op_time{0};
double collect_label_time{0};
double fill_sparse_time{0};
double push_sparse_time{0};
};
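For reference (not part of the diff): HeterTask::Update() advances a fixed state machine, PULL_SPARSE -> OP_RUN -> XPU -> OP_RUN_END -> PUSH_GRAD -> DONE, which is how the CPU worker decides whether a batch should next run CPU ops, be shipped to the XPU service, or push gradients. A minimal standalone sketch of that progression (same enum values, no Paddle dependency):

#include <iostream>

enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };

// Same transition order as HeterTask::Update() above.
HeterTaskState Next(HeterTaskState s) {
  switch (s) {
    case PULL_SPARSE: return OP_RUN;      // sparse params fetched, run CPU ops
    case OP_RUN:      return XPU;         // hand the batch to the XPU service
    case XPU:         return OP_RUN_END;  // remote section finished
    case OP_RUN_END:  return PUSH_GRAD;   // trailing CPU ops done, push grads
    case PUSH_GRAD:   return DONE;
    default:          return DONE;
  }
}

int main() {
  HeterTaskState s = PULL_SPARSE;
  while (s != DONE) {
    s = Next(s);
    std::cout << "state -> " << s << "\n";
  }
  return 0;
}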
template <class T>
class HeterObjectPool {
public:
HeterObjectPool() {}
virtual ~HeterObjectPool(){};
std::shared_ptr<T> Get() {
std::lock_guard<std::mutex> lock(mutex_);
if (pool_.empty()) {
num_ += 1;
#ifdef PADDLE_WITH_CUDA
VLOG(0) << "pool construct size: " << num_;
#endif
return std::make_shared<T>();
} else {
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
}
void Push(std::shared_ptr<T> data) {
std::lock_guard<std::mutex> lock(mutex_);
pool_.push_back(std::move(data));
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return pool_.size();
}
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
std::mutex mutex_;
int num_{0};
};
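For reference (not part of the diff): HeterObjectPool<T> is a mutex-guarded free list; Get() pops a cached object (constructing a new one only when the pool is empty) and Push() returns it, so task and service-context objects are recycled across batches instead of reallocated. A trimmed standalone sketch of the same pattern (no VLOG/CUDA), with a hypothetical Task type:

#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

// Condensed copy of the HeterObjectPool pattern above: objects are handed
// out from a free list and recycled via Push().
template <class T>
class SimpleObjectPool {
 public:
  std::shared_ptr<T> Get() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (pool_.empty()) return std::make_shared<T>();  // grow on demand
    auto ret = pool_.back();
    pool_.pop_back();
    return ret;
  }
  void Push(std::shared_ptr<T> obj) {
    std::lock_guard<std::mutex> lock(mutex_);
    pool_.push_back(std::move(obj));
  }
  int Size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return static_cast<int>(pool_.size());
  }

 private:
  std::vector<std::shared_ptr<T>> pool_;
  std::mutex mutex_;
};

struct Task { int batch = 0; };  // stand-in for HeterTask

int main() {
  SimpleObjectPool<Task> pool;
  auto t = pool.Get();  // constructed fresh, pool was empty
  t->batch = 1;
  pool.Push(t);         // recycled for the next batch
  std::cout << "cached objects: " << pool.Size() << "\n";  // 1
  return 0;
}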
struct BthreadMutextGuard {
BthreadMutextGuard(bthread_mutex_t* rho) {
mutex_ = rho;
bthread_mutex_lock(mutex_);
}
~BthreadMutextGuard() { bthread_mutex_unlock(mutex_); }
bthread_mutex_t* mutex_;
};
template <class T>
class BtObjectPool {
public:
BtObjectPool() {
bthread_mutex_init(&mutex_, NULL);
bthread_cond_init(&cond_, NULL);
}
virtual ~BtObjectPool() {
bthread_cond_destroy(&cond_);
bthread_mutex_destroy(&mutex_);
};
std::shared_ptr<T> Get() {
BthreadMutextGuard guard(&mutex_);
while (pool_.empty()) {
bthread_cond_wait(&cond_, &mutex_);
}
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
void Push(std::shared_ptr<T> data) {
BthreadMutextGuard guard(&mutex_);
pool_.push_back(std::move(data));
bthread_cond_signal(&cond_);
}
int Size() { return pool_.size(); }
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
bthread_mutex_t mutex_;
bthread_cond_t cond_;
int num_{0};
};
template <class K, class T>
struct HeterNode {
K key;
T value;
HeterNode* prev;
HeterNode* next;
};
template <class K, class T>
class HeterList {
public:
HeterList() : head_(new HeterNode<K, T>), tail_(new HeterNode<K, T>) {
head_->prev = NULL;
head_->next = tail_;
tail_->prev = head_;
tail_->next = NULL;
size = 0;
cap_ = 1e9;
}
~HeterList() {
delete head_;
delete tail_;
}
void SetCap(int num) { cap_ = num; }
bool TryPut(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
if (task_map_.find(key) != task_map_.end()) {
// std::cout << "try put key=" << key << " false" << std::endl;
task_map_.erase(key);
return false;
} else {
HeterNode<K, T>* node = new HeterNode<K, T>;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
// std::cout << "try put key=" << key << " true" << std::endl;
return true;
}
}
bool Put(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
HeterNode<K, T>* node = new HeterNode<K, T>;
// std::cout << "put key=" << key << " true" << std::endl;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
return true;
}
T TryGet(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
// std::cout << "try get key=" << key << " true" << std::endl;
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
task_map_.insert(key);
// std::cout << "try get key=" << key << " false" << std::endl;
return nullptr;
}
T Get(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
// std::cout << "get key=" << key << " true" << std::endl;
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
// std::cout << "get key=" << key << " false" << std::endl;
return nullptr;
}
T Get() {
std::lock_guard<std::mutex> lock(mutex_);
HeterNode<K, T>* node = head_->next;
if (node == tail_) {
// std::cout << "get2 false" << std::endl;
return nullptr;
} else {
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(node->key);
// std::cout << "get2 key=" << node->key << " true" << std::endl;
delete node;
return ret;
}
}
bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
return head_->next == tail_;
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return size;
}
private:
void detach(HeterNode<K, T>* node) {
node->prev->next = node->next;
node->next->prev = node->prev;
size--;
}
void attach(HeterNode<K, T>* node) {
node->prev = head_;
node->next = head_->next;
head_->next->prev = node;
head_->next = node;
size++;
}
private:
HeterNode<K, T>* head_;
HeterNode<K, T>* tail_;
std::unordered_map<K, HeterNode<K, T>*> map_;
std::unordered_set<K> task_map_;
std::mutex mutex_;
std::condition_variable cond_;
int cap_;
int size;
};
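For reference (not part of the diff): HeterList<K, T> serves as the worker's run/wait queues. Put/TryPut block while size >= cap_, TryGet/Get either claim a task by key or pop the oldest node, and removal notifies a blocked producer. A condensed standalone sketch of that bounded, keyed hand-off pattern (it omits the task_map_ bookkeeping that rejects a later TryPut for an already-claimed key); names are illustrative, not Paddle API:

#include <condition_variable>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <unordered_map>

// A bounded, keyed FIFO: Get(key) claims a specific task, Get() pops the
// oldest one, and freeing a slot wakes a producer blocked in Put().
template <class K, class T>
class KeyedQueue {
 public:
  explicit KeyedQueue(size_t cap) : cap_(cap) {}
  void Put(const K& key, T value) {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return order_.size() < cap_; });
    order_.push_back(key);
    map_[key] = std::move(value);
  }
  T Get(const K& key) {  // claim a specific task, or nullptr if absent
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(key);
    if (it == map_.end()) return nullptr;
    T ret = std::move(it->second);
    map_.erase(it);
    order_.remove(key);
    cond_.notify_one();
    return ret;
  }
  T Get() {  // pop the oldest task, or nullptr if empty
    std::lock_guard<std::mutex> lock(mutex_);
    if (order_.empty()) return nullptr;
    K key = order_.front();
    order_.pop_front();
    T ret = std::move(map_[key]);
    map_.erase(key);
    cond_.notify_one();
    return ret;
  }

 private:
  size_t cap_;
  std::list<K> order_;
  std::unordered_map<K, T> map_;
  std::mutex mutex_;
  std::condition_variable cond_;
};

int main() {
  KeyedQueue<int, std::shared_ptr<int>> run_queue(8);
  run_queue.Put(7, std::make_shared<int>(70));   // taskid 7 becomes runnable
  run_queue.Put(9, std::make_shared<int>(90));
  auto t = run_queue.Get(9);                     // claim taskid 9 directly
  std::cout << (t ? *t : -1) << "\n";            // 90
  auto oldest = run_queue.Get();                 // pops taskid 7
  std::cout << (oldest ? *oldest : -1) << "\n";  // 70
  return 0;
}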
} // namespace framework
} // namespace paddle
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
syntax = "proto2";
package paddle.framework;
option cc_generic_services = true;
// It can be: LoDTensor, SelectedRows or NCCL_ID
enum VarType {
LOD_TENSOR = 0;
SELECTED_ROWS = 1;
NCCL_ID = 2;
}
// VariableMessage is serialized paddle variable message.
// NOTICE(gongwb): don't modify this proto if you are not
// familiar with how we serialize in sendrecvop_utils.h
// and deserialize it in variable_response.h.
message VariableMessage {
enum Type {
// Pod Types
BOOL = 0;
INT16 = 1;
INT32 = 2;
INT64 = 3;
FP16 = 4;
FP32 = 5;
FP64 = 6;
}
message LodData { repeated int64 lod_data = 1; }
optional string varname = 1;
// TODO(Yancey1989): reference framework::proto::VarDesc::VarType
optional VarType type = 2;
// bool persistable is not needed for sending.
// tensor info:
optional Type data_type = 3;
repeated int64 dims = 4;
// lod details:
optional int64 lod_level = 5;
repeated LodData lod = 6;
// selected_rows height, aka. original dim0
optional int64 slr_height = 7;
// tensor data
optional bytes data = 8;
}
message HeterRequest {
required int32 cmd = 1;
optional int32 cur_batch = 2;
repeated VariableMessage vars = 3;
};
message HeterResponse {
// optional VariableMessage vars = 1;
repeated VariableMessage vars = 1;
};
service HeterService { rpc service(HeterRequest) returns (HeterResponse); };
(This diff has been collapsed.)
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <cstdlib>
#include <ctime>
#include <string>
#include <vector>
#include "io/fs.h"
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/trainer.h"
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
#include "paddle/fluid/platform/cuda_device_guard.h"
namespace paddle {
namespace framework {
void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
srand((unsigned)time(NULL));
param_ = trainer_desc.downpour_param();
for (int i = 0; i < param_.dense_table_size(); ++i) {
uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
auto table = param_.dense_table(i);
dense_grad_names_[table_id].resize(table.dense_grad_name_size());
for (int j = 0; j < table.dense_grad_name_size(); ++j) {
dense_grad_names_[table_id][j] = table.dense_grad_name(j);
}
}
scale_datanorm_ = trainer_desc.scale_datanorm();
int place_num = trainer_desc.worker_places_size();
for (int i = 0; i < place_num; ++i) {
int num = trainer_desc.worker_places(i);
platform::CUDAPlace place = platform::CUDAPlace(num);
platform::CUDADeviceGuard guard(place.device);
cudaStream_t stream;
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&stream));
copy_streams_.push_back(stream);
places_.push_back(place);
cudaEvent_t event;
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
events_.push_back(event);
}
// thread_num_ = trainer_desc.thread_num();
// SetDataset(dataset);
// dump_fields_path_ = trainer_desc.dump_fields_path();
// dump_converter_ = trainer_desc.dump_converter();
// need_dump_field_ = false;
// if (trainer_desc.dump_fields_size() != 0 && dump_fields_path_ != "") {
// need_dump_field_ = true;
// }
// if (need_dump_field_) {
// auto &file_list = dataset->GetFileList();
// if (file_list.size() == 0) {
// need_dump_field_ = false;
// }
// }
// mpi_rank_ = trainer_desc.mpi_rank();
// mpi_size_ = trainer_desc.mpi_size();
// dump_file_num_ = trainer_desc.dump_file_num();
// const std::vector<paddle::framework::DataFeed *> readers =
// dataset->GetReaders();
// thread_num_ = readers.size();
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
i++) {
need_merge_var_names_.push_back(
trainer_desc.downpour_param().stat_var_names(i));
}
running_ = true;
VLOG(3) << "going to initialize pull dense worker";
pull_dense_worker_ = PullDenseWorker::GetInstance();
pull_dense_worker_->Initialize(trainer_desc);
VLOG(3) << "initialize pull dense worker";
SetDebug(trainer_desc.debug());
fleet_ptr_ = FleetWrapper::GetInstance();
heter_ptr_ = HeterWrapper::GetInstance();
RegisterServiceHandler();
// for (int i = 0; i < trainer_desc.worker_places_size(); ++i) {
// int num = trainer_desc.worker_places(i);
// platform::CUDAPlace place = platform::CUDAPlace(num);
// platform::CUDADeviceGuard guard(place.device);
// cudaStream_t stream;
// PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&stream));
// copy_streams_.push_back(stream);
// places_.push_back(place);
// }
trainer_desc_ = trainer_desc;
}
void HeterXpuTrainer::CreateThreadParam(const ProgramDesc& program, int num) {
auto place = places_[num];
Scope* scope = place_scopes_[num];
auto stream = copy_streams_[num];
auto event = events_[num];
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
auto& block = program.Block(0);
for (auto& var : block.AllVars()) {
if (var->Persistable()) {
auto name = var->Name();
Variable* root_var = root_scope_->FindVar(name);
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
auto* ptr = scope->Var(name);
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
LoDTensor* thread_tensor = ptr->GetMutable<LoDTensor>();
#define HeterMemcpyFunc(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place, stream); \
} \
} while (0)
_ForEachDataType_(HeterMemcpyFunc);
}
}
PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
cudaEventSynchronize(event);
}
template <typename T>
void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
LoDTensor* root_tensor,
const paddle::platform::Place& thread_place,
cudaStream_t stream) {
T* thread_ptr =
thread_tensor->mutable_data<T>(root_tensor->dims(), thread_place);
T* root_ptr = root_tensor->data<T>();
if (platform::is_cpu_place(root_tensor->place())) {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
platform::CPUPlace(), root_ptr,
sizeof(T) * root_tensor->numel(), stream);
} else {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()),
root_ptr, sizeof(T) * root_tensor->numel(), stream);
}
}
void HeterXpuTrainer::DumpWork(int tid) {}
void HeterXpuTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
CacheProgram(main_program);
place_ = place;
auto& profiler = paddle::ps::CostProfiler::instance();
profiler.register_profiler("xpu_service_run_task");
profiler.register_profiler("xpu_service_deserial");
profiler.register_profiler("xpu_service_launch_kernel");
profiler.register_profiler("xpu_service_wait");
}
void HeterXpuTrainer::InitOtherEnv(const ProgramDesc& main_program) {
auto& block = main_program.Block(0);
pull_dense_worker_->SetRootScope(root_scope_);
pull_dense_worker_->CreatePinVar();
for (size_t i = 0; i < places_.size(); ++i) {
Scope* scope = &(root_scope_->NewScope());
// for (auto &var : block.AllVars()) {
// if (var->Persistable()) {
// auto *ptr = scope->Var(var->Name());
// InitializeVariable(ptr, var->GetType());
// }
// }
place_scopes_.push_back(scope);
CreateThreadParam(main_program, i);
pull_dense_worker_->AddThreadScope(scope);
pull_dense_worker_->AddPlace(places_[i]);
pull_dense_worker_->AddStream(copy_streams_[i]);
}
pull_dense_worker_->Start();
for (auto& stream : copy_streams_) {
cudaStreamSynchronize(stream);
}
op_names_.clear();
for (auto& op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
op_names_.push_back(op_desc->Type());
OperatorBase* local_op_ptr = local_op.release();
ops_.push_back(local_op_ptr);
continue;
}
xpu_begin_op_index_ = xpu_end_op_index_ = -1;
xpu_begin_op_index_ = trainer_desc_.xpu_start_idx();
xpu_end_op_index_ = trainer_desc_.xpu_end_idx();
VLOG(0) << "xpu begin: " << xpu_begin_op_index_
<< " xpu end: " << xpu_end_op_index_;
// CHECK(xpu_begin_op_index_ == 0);
// CHECK(xpu_end_op_index_ = ops_.size() - 1);
//// init pool
for (size_t i = 0; i < 6; ++i) {
for (size_t j = 0; j < places_.size(); ++j) {
int num = j;
std::shared_ptr<HeterServiceContext> context =
std::make_shared<HeterServiceContext>();
context->place_num_ = num;
auto place = places_[num];
context->scope_ = &(place_scopes_[num]->NewScope());
auto& block = program_.Block(0);
for (auto& var : block.AllVars()) {
if (!var->Persistable()) {
auto* ptr = context->scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
}
}
for (auto& v : dense_grad_names_) {
for (auto& name : v.second) {
auto* ptr = context->scope_->Var(name + "pin");
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
}
}
for (auto& op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
OperatorBase* local_op_ptr = local_op.release();
(context->ops_).push_back(local_op_ptr);
}
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
object_pool_.Push(context);
}
}
VLOG(3) << "init other env done.";
}
void HeterXpuTrainer::Run() {}
int HeterXpuTrainer::EndPass(const HeterRequest* request,
HeterResponse* response) {
// int scope_num = object_pool_.Size();
for (size_t i = 0; i < need_merge_var_names_.size(); i++) {
Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]);
if (root_var == nullptr) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
for (size_t j = 0; j < place_scopes_.size(); j++) {
Scope* cur_thread_scope = place_scopes_[j];
Variable* thread_var =
cur_thread_scope->FindVar(need_merge_var_names_[i]);
if (thread_var == nullptr) {
continue;
}
LoDTensor* thread_tensor = thread_var->GetMutable<LoDTensor>();
// if (root_tensor->numel() != thread_tensor->numel()) {
// continue;
// }
#define MergeCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
if (thread_tensor->type() != proto_type) { \
VLOG(0) << "Error: thread id=" << j << ", need_merge_var_names_[" << i \
<< "] " << need_merge_var_names_[i] \
<< ", root tensor type=" << root_tensor->type() \
<< ", thread tensor type=" << thread_tensor->type(); \
exit(-1); \
} \
MergeToRootScope<cpp_type>(root_tensor, thread_tensor); \
} \
} while (0)
_ForEachDataType_(MergeCallback);
if (platform::is_gpu_place(thread_tensor->place())) {
auto dev_id =
BOOST_GET_CONST(platform::CUDAPlace, thread_tensor->place()).device;
platform::CUDADeviceGuard guard(dev_id);
cudaMemset(thread_tensor->data<void>(), 0,
thread_tensor->numel() * SizeOfType(thread_tensor->type()));
} else {
memset(thread_tensor->data<void>(), 0,
thread_tensor->numel() * SizeOfType(thread_tensor->type()));
}
}
auto* merge_var = response->add_vars();
heter_ptr_->SerializeToReq(need_merge_var_names_[i], root_scope_,
merge_var);
if (platform::is_gpu_place(root_tensor->place())) {
auto dev_id =
BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()).device;
platform::CUDADeviceGuard guard(dev_id);
cudaMemset(root_tensor->data<void>(), 0,
root_tensor->numel() * SizeOfType(root_tensor->type()));
} else {
memset(root_tensor->data<void>(), 0,
root_tensor->numel() * SizeOfType(root_tensor->type()));
}
}
return 0;
}
template <typename T>
void HeterXpuTrainer::MergeToRootScope(LoDTensor* root_tensor,
LoDTensor* tensor) {
LoDTensor tmp_root;
TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root);
T* tmp_root_data = tmp_root.data<T>();
LoDTensor tmp_tensor;
TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor);
T* data = tmp_tensor.data<T>();
for (int i = 0; i < tmp_tensor.numel(); i++) {
tmp_root_data[i] += data[i];
}
TensorCopy(tmp_root, root_tensor->place(), root_tensor);
}
int HeterXpuTrainer::StopService(const HeterRequest* request,
HeterResponse* response) {
std::unique_lock<std::mutex> lock(mutex_);
running_ = false;
cond_.notify_one();
return 0;
}
int HeterXpuTrainer::RunTask(const HeterRequest* request,
HeterResponse* response) {
auto timer = std::make_shared<paddle::ps::CostTimer>("xpu_service_run_task");
std::shared_ptr<HeterServiceContext> context = object_pool_.Get();
if (!context->scope_) {
int num = rand() % places_.size();
context->place_num_ = num;
auto place = places_[num];
context->scope_ = &(place_scopes_[num]->NewScope());
auto& block = program_.Block(0);
for (auto& var : block.AllVars()) {
if (!var->Persistable()) {
auto* ptr = context->scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
}
}
for (auto& v : dense_grad_names_) {
for (auto& name : v.second) {
auto* ptr = context->scope_->Var(name + "pin");
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
}
}
for (auto& op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
OperatorBase* local_op_ptr = local_op.release();
(context->ops_).push_back(local_op_ptr);
}
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
}
context->Reset();
auto place = places_[context->place_num_];
{
auto deserial_timer =
std::make_shared<paddle::ps::CostTimer>("xpu_service_deserial");
for (int i = 0; i < request->vars_size(); ++i) {
heter_ptr_->DeSerializeToTensor(context->scope_, request->vars(i), place,
copy_streams_[context->place_num_]);
}
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(context->event_, copy_streams_[context->place_num_]));
while (cudaEventQuery(context->event_) != cudaSuccess) {
VLOG(3) << "wait for kernel";
bthread_yield();
}
}
{
auto launch_timer =
std::make_shared<paddle::ps::CostTimer>("xpu_service_launch_kernel");
for (int i = xpu_begin_op_index_; i <= xpu_end_op_index_; ++i) {
auto& op = (context->ops_)[i];
op->Run(*(context->scope_), place);
}
}
auto* dev_ctx = static_cast<platform::CUDADeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(context->event_, dev_ctx->stream()));
// cudaEventSynchronize(context->event_);
{
auto wait_timer =
std::make_shared<paddle::ps::CostTimer>("xpu_service_wait");
while (cudaEventQuery(context->event_) != cudaSuccess) {
VLOG(3) << "wait for kernel";
bthread_yield();
}
}
for (int i = 0; i < trainer_desc_.xpu_send_list_size(); ++i) {
const std::string& varname = trainer_desc_.xpu_send_list(i);
// CHECK(varname == "concat_1.tmp_0@GRAD");
auto* res_var = response->add_vars();
heter_ptr_->SerializeToReq(varname, context->scope_, res_var);
}
// std::string varname = "concat_1.tmp_0@GRAD";
//
// auto* res_var = response->add_vars();
// heter_ptr_->SerializeToReq(varname, context->scope_, res_var);
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid =
static_cast<uint64_t>(param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
*(context->scope_), tid, dense_grad_names_[tid],
&(context->push_dense_status_), scale_datanorm_, request->cur_batch(),
places_[context->place_num_], copy_streams_[context->place_num_],
context->event_);
}
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid =
static_cast<uint64_t>(param_.program_config(0).push_dense_table_id(i));
pull_dense_worker_->IncreaseThreadVersion(0, tid);
}
VLOG(3) << "push dense gradient done.";
context->scope_->DropKids();
object_pool_.Push(context);
VLOG(0) << "pool size " << object_pool_.Size();
return 0;
}
void HeterXpuTrainer::RegisterServiceHandler() {
heter_ptr_->RegisterServiceHandler(
0, [this](const HeterRequest* request, HeterResponse* response) -> int {
return this->RunTask(request, response);
});
heter_ptr_->RegisterServiceHandler(
1, [this](const HeterRequest* request, HeterResponse* response) -> int {
return this->EndPass(request, response);
});
heter_ptr_->RegisterServiceHandler(
2, [this](const HeterRequest* request, HeterResponse* response) -> int {
return this->StopService(request, response);
});
}
Scope* HeterXpuTrainer::GetWorkerScope(int thread_id) { return nullptr; }
void HeterXpuTrainer::Finalize() {
// for (auto &th : threads_) {
// th.join();
// }
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return !running_; });
sleep(3);
pull_dense_worker_->Stop();
root_scope_->DropKids();
}
} // namespace framework
} // namespace paddle
#endif
......@@ -102,6 +102,7 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
workers_[i]->CacheProgram(main_program);
}
}
......
......@@ -56,6 +56,34 @@ void PullDenseWorker::Initialize(const TrainerDesc& param) {
current_version_[tid] = 0;
}
fleet_ptr_ = FleetWrapper::GetInstance();
#ifdef PADDLE_WITH_CUDA
copy_streams_.clear();
places_.clear();
thread_scopes_.clear();
#endif
}
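// Note on the pinned-variable path (descriptive comment, grounded in the code
// below): CreatePinVar mirrors every dense value "name" with a CUDA-pinned
// tensor named "name + pin" in the root scope; the pull presumably fills the
// pinned copies, and Wait() then issues async memory::Copy calls from the
// pinned tensors into each device's thread scope on the per-place copy stream.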
void PullDenseWorker::CreatePinVar() {
#ifdef PADDLE_WITH_CUDA
// for (auto& v : dense_value_names_) {
// for (auto& name : v.second) {
for (int i = 0; i < dwp_param_.program_config(0).pull_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
dwp_param_.program_config(0).pull_dense_table_id(i));
for (size_t j = 0; j < dense_value_names_[tid].size(); j++) {
auto& name = dense_value_names_[tid][j];
Variable* var = root_scope_->FindVar(name);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
auto* ptr = root_scope_->Var(name + "pin");
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
LoDTensor* pin_tensor = ptr->GetMutable<LoDTensor>();
pin_tensor->mutable_data<float>(tensor->dims(),
platform::CUDAPinnedPlace());
}
}
#endif
}
void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
......@@ -75,6 +103,31 @@ void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
exit(-1);
}
status_vec->resize(0);
#ifdef PADDLE_WITH_CUDA
for (size_t i = 0; i < places_.size(); ++i) {
// for (auto& v : dense_value_names_) {
// for (auto& name : v.second) {
for (int x = 0; x < dwp_param_.program_config(0).pull_dense_table_id_size();
++x) {
uint64_t tid = static_cast<uint64_t>(
dwp_param_.program_config(0).pull_dense_table_id(x));
for (size_t j = 0; j < dense_value_names_[tid].size(); j++) {
auto& name = dense_value_names_[tid][j];
Variable* pin_var = root_scope_->FindVar(name + "pin");
LoDTensor* pin_tensor = pin_var->GetMutable<LoDTensor>();
float* pin_w = pin_tensor->data<float>();
Variable* var = thread_scopes_[i]->FindVar(name);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* w = tensor->data<float>();
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, places_[i]), w,
platform::CUDAPinnedPlace(), pin_w,
sizeof(float) * tensor->numel(), copy_streams_[i]);
}
}
}
#endif
}
void PullDenseWorker::Stop() {
......@@ -91,8 +144,14 @@ void PullDenseWorker::PullDense(bool force_update) {
uint64_t tid = static_cast<uint64_t>(
dwp_param_.program_config(0).pull_dense_table_id(i));
if (force_update || CheckUpdateParam(tid)) {
#ifdef PADDLE_WITH_CUDA
VLOG(3) << "pull dense " << force_update << " " << tid;
fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid],
&pull_dense_status_);
&pull_dense_status_, false);
#else
fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid],
&pull_dense_status_, true);
#endif
ResetThreadVersion(tid);
}
}
......
......@@ -21,9 +21,12 @@ limitations under the License. */
#include <thread> // NOLINT
#include <vector>
#include <ctime>
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/reader.h"
......@@ -62,6 +65,7 @@ class TrainerBase {
Scope* root_scope_;
bool debug_;
Dataset* dataset_ptr_;
TrainerDesc trainer_desc_;
// For dump param or field
bool need_dump_field_ = false;
......@@ -118,10 +122,86 @@ class DistMultiTrainer : public MultiTrainer {
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
virtual void InitDumpEnv();
virtual Scope* GetWorkerScope(int thread_id);
virtual void RegisterHeterCallback();
protected:
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
};
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
class HeterServiceContext {
public:
HeterServiceContext() {}
virtual ~HeterServiceContext() {
for (OperatorBase* op : ops_) {
delete op;
}
std::vector<OperatorBase*>().swap(ops_);
}
void Reset() { push_dense_status_.clear(); }
int place_num_;
Scope* scope_{nullptr};
cudaEvent_t event_;
std::vector<OperatorBase*> ops_;
std::vector<::std::future<int32_t>> push_dense_status_;
};
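// HeterXpuTrainer (implemented in heterxpu_trainer.cc) serves cpu workers:
// RunTask deserializes the request variables into a pooled per-request scope,
// runs the ops in [xpu_start_idx, xpu_end_idx], serializes the variables in
// xpu_send_list into the response and pushes dense gradients asynchronously;
// EndPass merges per-place results back into the root scope, and StopService
// ends the serving loop.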
class HeterXpuTrainer : public TrainerBase {
public:
HeterXpuTrainer() {}
virtual ~HeterXpuTrainer() {
for (OperatorBase* op : ops_) {
delete op;
}
std::vector<OperatorBase*>().swap(ops_);
}
virtual void Initialize(const TrainerDesc& trainer_desc, Dataset* data_set);
virtual void InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place);
virtual void InitOtherEnv(const ProgramDesc& main_program);
virtual void Run();
virtual void Finalize();
virtual void DumpWork(int tid);
virtual void RegisterServiceHandler();
virtual int RunTask(const HeterRequest* request, HeterResponse* response);
virtual Scope* GetWorkerScope(int thread_id);
virtual void CacheProgram(const ProgramDesc& main_program) {
new (&program_) ProgramDesc(main_program);
}
template <typename T>
void HeterMemCpy(LoDTensor* tensor, LoDTensor* root_tensor,
const paddle::platform::Place& thread_place,
cudaStream_t stream);
void CreateThreadParam(const ProgramDesc& program, int num);
template <typename T>
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
int EndPass(const HeterRequest* request, HeterResponse* response);
int StopService(const HeterRequest* request, HeterResponse* response);
protected:
DownpourWorkerParameter param_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
std::vector<std::string> need_merge_var_names_;
float scale_datanorm_;
int xpu_begin_op_index_;
int xpu_end_op_index_;
bool running_;
paddle::platform::Place place_;
std::mutex mutex_;
ProgramDesc program_;
std::condition_variable cond_;
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
std::shared_ptr<paddle::framework::HeterWrapper> heter_ptr_;
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
std::vector<OperatorBase*> ops_;
std::vector<std::string> op_names_;
std::vector<Scope*> place_scopes_;
BtObjectPool<HeterServiceContext> object_pool_;
std::vector<cudaStream_t> copy_streams_;
std::vector<platform::Place> places_;
std::vector<cudaEvent_t> events_;
};
#endif
#if defined(PADDLE_WITH_NCCL)
class PipelineTrainer : public TrainerBase {
......
......@@ -52,6 +52,12 @@ message TrainerDesc {
optional bool enable_random_dump = 24 [ default = false ];
optional bool random_with_lineid = 25 [ default = false ];
optional int32 dump_interval = 26 [ default = 10000 ];
repeated int32 worker_places = 27;
repeated string xpu_send_list = 28;
repeated string xpu_recv_list = 29;
optional int32 xpu_start_idx = 30;
optional int32 xpu_end_idx = 31;
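// The fields above configure the heter section: xpu_start_idx / xpu_end_idx
// are the op index range executed by HeterXpuTrainer, and xpu_send_list /
// xpu_recv_list are the variable names exchanged between the cpu and heter parts.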
// device worker parameters
optional HogwildWorkerParameter hogwild_param = 101;
......
......@@ -63,6 +63,9 @@ std::shared_ptr<TrainerBase> TrainerFactory::CreateTrainer(
REGISTER_TRAINER_CLASS(MultiTrainer);
REGISTER_TRAINER_CLASS(DistMultiTrainer);
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
REGISTER_TRAINER_CLASS(HeterXpuTrainer);
#endif
#if defined(PADDLE_WITH_NCCL)
REGISTER_TRAINER_CLASS(PipelineTrainer);
#endif
......
set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapper prune
feed_fetch_method pass_builder parallel_executor profiler layer tracer engine scope_pool
analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context
gloo_wrapper infer_io_utils)
gloo_wrapper infer_io_utils heter_wrapper)
if (WITH_NCCL)
set(PYBIND_DEPS ${PYBIND_DEPS} nccl_wrapper)
......@@ -31,6 +31,7 @@ set(PYBIND_SRCS
global_value_getter_setter.cc
reader_py.cc
fleet_wrapper_py.cc
heter_wrapper_py.cc
gloo_wrapper_py.cc
box_helper_py.cc
data_set_py.cc
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <fcntl.h>
#ifdef _POSIX_C_SOURCE
#undef _POSIX_C_SOURCE
#endif
#ifdef _XOPEN_SOURCE
#undef _XOPEN_SOURCE
#endif
#include <string>
#include <vector>
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/pybind/heter_wrapper_py.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
#ifdef PADDLE_WITH_PSLIB
void BindHeterWrapper(py::module* m) {
py::class_<framework::HeterWrapper, std::shared_ptr<framework::HeterWrapper>>(
*m, "Heter")
.def(py::init([]() { return framework::HeterWrapper::GetInstance(); }))
.def("create_client2xpu_connection",
&framework::HeterWrapper::CreateClient2XpuConnection)
.def("set_xpu_list", &framework::HeterWrapper::SetXpuList)
.def("start_xpu_service", &framework::HeterWrapper::StartXpuService)
.def("end_pass", &framework::HeterWrapper::EndPass)
.def("stop_xpu_service", &framework::HeterWrapper::StopXpuService);
} // end HeterWrapper
#endif
} // end namespace pybind
} // end namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
#ifdef PADDLE_WITH_PSLIB
void BindHeterWrapper(py::module* m);
#endif
} // namespace pybind
} // namespace paddle
......@@ -66,6 +66,7 @@ limitations under the License. */
#include "paddle/fluid/pybind/fleet_wrapper_py.h"
#include "paddle/fluid/pybind/global_value_getter_setter.h"
#include "paddle/fluid/pybind/gloo_wrapper_py.h"
#include "paddle/fluid/pybind/heter_wrapper_py.h"
#include "paddle/fluid/pybind/imperative.h"
#include "paddle/fluid/pybind/inference_api.h"
#include "paddle/fluid/pybind/ir.h"
......@@ -2479,6 +2480,9 @@ All parameter, weight, gradient are variables in Paddle.
.def("device_count", &ParallelExecutor::DeviceCount);
BindFleetWrapper(&m);
#ifdef PADDLE_WITH_PSLIB
BindHeterWrapper(&m);
#endif
BindGlooWrapper(&m);
BindBoxHelper(&m);
#ifdef PADDLE_WITH_BOX_PS
......
......@@ -223,7 +223,8 @@ class DownpourSGD(DeviceWorker):
dense_table_set.add(i)
break
trainer_desc.device_worker_name = "DownpourWorker"
trainer_desc.device_worker_name = opt_info.get("worker_class",
"DownpourWorker")
pull_thread = trainer_desc.pull_dense_param
pull_thread.device_num = trainer_desc.thread_num
if opt_info.get("program_id_to_worker") is None:
......
......@@ -1300,6 +1300,12 @@ class Executor(object):
fetch_list=None,
fetch_info=None,
print_period=100):
is_heter = 0
if program._fleet_opt is not None:
if program._fleet_opt.get("worker_class", "") == "HeterCpuWorker":
is_heter = 1
if program._fleet_opt.get("trainer", "") == "HeterXpuTrainer":
is_heter = 1
if scope is None:
scope = global_scope()
if fetch_list is None:
......@@ -1308,6 +1314,11 @@ class Executor(object):
fetch_info = []
assert len(fetch_list) == len(fetch_info)
compiled = isinstance(program, compiler.CompiledProgram)
if is_heter:
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fu = FleetUtil()
ret = fu.split_program_by_device(program)
if not compiled:
# TODO: Need a better way to distinguish and specify different execution mode
if program._pipeline_opt:
......@@ -1317,6 +1328,8 @@ class Executor(object):
trainer = TrainerFactory()._create_trainer(program._fleet_opt)
trainer._set_thread_barrier(program._is_distributed)
trainer._set_program(program)
if is_heter:
trainer._set_heter_info(ret)
else:
if program._pipeline_opt:
trainer = TrainerFactory()._create_trainer(
......@@ -1476,6 +1489,60 @@ class Executor(object):
debug, fetch_list, fetch_info,
print_period, fetch_handler)
def start_heter_trainer(self,
program=None,
scope=None,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
return self._start_heter_trainer(program, scope, False, debug,
fetch_list, fetch_info, print_period,
fetch_handler)
def _start_heter_trainer(self,
program=None,
scope=None,
is_infer=False,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
scope, trainer = self._prepare_trainer(
program=program,
dataset=None,
scope=scope,
thread=1,
debug=debug,
fetch_list=fetch_list,
fetch_info=fetch_info,
print_period=print_period)
trainer._set_infer(is_infer)
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
trainer_instance = self._default_executor.init_for_dataset(
program.desc, trainer._desc(), scope, None)
#if fetch_handler is not None:
# scope0 = trainer_instance.get_worker_scope(0)
# fetch_monitor = FetchHandlerMonitor(scope0, fetch_handler)
# fetch_monitor.start()
# self._default_executor.run_from_dataset(trainer_instance)
# fetch_monitor.stop()
# self._default_executor.release_trainer(trainer_instance)
#else:
self._default_executor.run_from_dataset(trainer_instance)
#self._default_executor.release_trainer(trainer_instance)
return trainer_instance
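# Usage sketch (illustrative only, names are hypothetical): on a heter (xpu)
# node the service is started with
#     trainer_instance = exe.start_heter_trainer(program=main_program)
# while cpu workers keep calling exe.train_from_dataset(...); the caller
# releases the instance afterwards (see fleet.start_heter_trainer in pslib.py).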
def train_from_dataset(self,
program=None,
dataset=None,
......
......@@ -149,6 +149,16 @@ class Fleet(object):
"""
return self._role_maker.is_server()
def is_xpu(self):
"""
Check whether the node is an instance of xpu (heter device) node.
Returns:
bool: True if this is an xpu node,
False if not.
"""
return self._role_maker.is_xpu()
def split_files(self, files):
"""
split files before distributed training,
......
......@@ -28,6 +28,7 @@ __all__ = [
class Role:
WORKER = 1
SERVER = 2
XPU = 3
class MockBarrier(object):
......@@ -988,6 +989,147 @@ class GeneralRoleMaker(RoleMakerBase):
http_server.stop()
class HeterRoleMaker(GeneralRoleMaker):
"""
This role maker is for general use, you can set os.environ to customize:
PADDLE_PSERVERS_IP_PORT_LIST : all pservers' ip:port, separated by ','
PADDLE_TRAINER_ENDPOINTS : all trainers' ip:port, separated by ','
TRAINING_ROLE : TRAINER or PSERVER
PADDLE_TRAINER_ID : current trainer id (only for trainer),
it is index in PADDLE_TRAINER_ENDPOINTS
PADDLE_PSERVER_ID : current pserver id (only for pserver)
it is index in PADDLE_PSERVERS_IP_PORT_LIST
"""
def generate_role(self):
"""
generate role for general role maker
"""
if not self._role_is_generated:
eplist = os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split(",")
training_role = os.environ["TRAINING_ROLE"]
worker_endpoints = os.environ["PADDLE_TRAINER_ENDPOINTS"].split(",")
trainers_num = len(worker_endpoints)
xpu_endpoints = os.environ["PADDLE_XPU_ENDPOINTS"].split(",")
xpu_num = len(xpu_endpoints)
if training_role not in ["TRAINER", "PSERVER", "XPU"]:
raise ValueError(
"TRAINING_ROLE must be PSERVER or TRAINER or XPU")
if training_role == "TRAINER":
role = Role.WORKER
current_id = int(os.environ["PADDLE_TRAINER_ID"])
self._node_type = 1
self._cur_endpoint = worker_endpoints[current_id]
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(worker_endpoints),
self._hdfs_path.rstrip("/") + "/trainer",
self._hdfs_name, self._hdfs_ugi, self._iface,
self._prefix)
self._node_type_comm = gloo
elif training_role == "XPU":
role = Role.XPU
current_id = int(os.environ["PADDLE_XPU_ID"])
self._node_type = 2
self._cur_endpoint = xpu_endpoints[current_id]
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(xpu_endpoints),
self._hdfs_path.rstrip("/") + "/xpu", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
self._node_type_comm = gloo
elif training_role == "PSERVER":
role = Role.SERVER
if os.environ.get("PADDLE_PSERVER_ID") is not None:
current_id = int(os.environ["PADDLE_PSERVER_ID"])
cur_endpoint = eplist[current_id]
else:
# this is for compatible with paddlecloud
cur_ip = os.environ["POD_IP"]
cur_port = os.environ["PADDLE_PORT"]
cur_endpoint = ":".join([cur_ip, cur_port])
current_id = eplist.index(cur_endpoint)
self._node_type = 0
self._cur_endpoint = cur_endpoint
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(eplist),
self._hdfs_path.rstrip("/") + "/pserver",
self._hdfs_name, self._hdfs_ugi, self._iface,
self._prefix)
self._node_type_comm = gloo
if training_role == "TRAINER" or training_role == "XPU":
gloo = fluid.core.Gloo()
heter_list = worker_endpoints + xpu_endpoints
gloo.init(
heter_list.index(self._cur_endpoint),
len(heter_list),
self._hdfs_path.rstrip("/") + "/heter", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
self._heter_comm = gloo
gloo = fluid.core.Gloo()
all_list = worker_endpoints + eplist + xpu_endpoints
gloo.init(
all_list.index(self._cur_endpoint),
len(all_list),
self._hdfs_path.rstrip("/") + "/all", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
self._all_comm = gloo
self._trainers_num = trainers_num
self._server_endpoints = eplist
self._role = role
self._current_id = current_id
self._rank = all_list.index(self._cur_endpoint)
self._size = len(all_list)
self._worker_endpoints = worker_endpoints
self._xpu_endpoints = xpu_endpoints
self._role_is_generated = True
def is_xpu(self):
"""
whether current process is an xpu node
"""
if not self._role_is_generated:
self.generate_role()
return self._role == Role.XPU
def is_first_xpu(self):
"""
whether current process is the xpu node of rank 0
"""
if not self._role_is_generated:
self.generate_role()
return self._role == Role.XPU and self._current_id == 0
def _barrier_xpu(self):
"""
barrier all xpu nodes in current distributed job
"""
if not self._role_is_generated:
self.generate_role()
if self.is_xpu():
self._node_type_comm.barrier()
def _barrier_heter(self):
"""
barrier all trainers and xpu nodes in current distributed job
"""
if not self._role_is_generated:
self.generate_role()
if self.is_xpu() or self.is_worker():
self._heter_comm.barrier()
def xpu_num(self):
"""
"""
if not self._role_is_generated:
self.generate_role()
return len(self._xpu_endpoints)
class UserDefinedRoleMaker(RoleMakerBase):
"""
UserDefinedRoleMaker is designed for worker and server assignment
......
......@@ -23,6 +23,7 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import HeterRoleMaker
class PSLib(Fleet):
......@@ -44,6 +45,9 @@ class PSLib(Fleet):
role_maker = MPISymetricRoleMaker()
super(PSLib, self).init(role_maker)
self._fleet_ptr = fluid.core.Fleet()
self._heter_ptr = None
if isinstance(role_maker, HeterRoleMaker):
self._heter_ptr = fluid.core.Heter()
def _set_client_communication_config(self, request_timeout_ms,
connect_timeout_ms, max_retry):
......@@ -77,15 +81,27 @@ class PSLib(Fleet):
raise Exception(
"You should run DistributedOptimizer.minimize() first")
# barrier_all for init_server, wait for server starts
if isinstance(self._role_maker, HeterRoleMaker):
if self._role_maker.is_xpu():
local_endpoint = self._role_maker.get_local_endpoint()
local_endpoint = local_endpoint.split(":")
self._heter_ptr.start_xpu_service(
str(local_endpoint[0]), int(local_endpoint[1]))
self._role_maker._barrier_all()
self.all_ips_ = self._role_maker._all_gather(self._local_ip)
# worker_index * 2 is for compatible with older versions of pslib
self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
self._role_maker._get_size(),
self._role_maker.worker_index() * 2)
if isinstance(self._role_maker, HeterRoleMaker):
if self._role_maker.is_worker():
self._heter_ptr.set_xpu_list(
self._role_maker._xpu_endpoints)
self._heter_ptr.create_client2xpu_connection()
# barrier_all for init_worker
self._role_maker._barrier_all()
# prepare for client to client communication
if self._role_maker.is_worker():
info = self._fleet_ptr.get_clients_info()
all_info = self._role_maker._worker_gather(info[0])
self._fleet_ptr.gather_clients(all_info)
......@@ -144,6 +160,12 @@ class PSLib(Fleet):
>>> fleet.init_server("/you/path/to/model", mode = 0)
"""
mode = kwargs.get("mode", 0)
if isinstance(self._role_maker, HeterRoleMaker):
self._role_maker._barrier_xpu()
if self._role_maker.is_first_xpu():
self._fleet_ptr.load_model(model_dir, mode)
self._role_maker._barrier_xpu()
else:
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.load_model(model_dir, mode)
......@@ -185,6 +207,54 @@ class PSLib(Fleet):
raise Exception(
"You should run DistributedOptimizer.minimize() first")
def end_pass(self, scope):
if self._role_maker.worker_index() < self._role_maker.xpu_num():
self._heter_ptr.end_pass(scope, self._role_maker.worker_index())
self._heter_ptr.stop_xpu_service(self._role_maker.worker_index())
def train_from_dataset(self,
executor,
program=None,
dataset=None,
scope=None,
thread=0,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
"""
"""
if self._role_maker.is_worker():
self._role_maker._barrier_heter()
executor.train_from_dataset(program, dataset, scope, thread, debug,
fetch_list, fetch_info, print_period,
fetch_handler)
def start_heter_trainer(self,
executor,
program=None,
scope=None,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
"""
"""
trainer_instance = executor.start_heter_trainer(
program, scope, debug, fetch_list, fetch_info, print_period,
fetch_handler)
if self._role_maker.is_xpu():
print("barrier heter")
self._role_maker._barrier_heter()
print("barrier heter")
executor._default_executor.release_trainer(trainer_instance)
def stop_worker(self):
"""
stop(): will be called after a user finishes his/her training task. Fleet instance will be
......@@ -197,6 +267,7 @@ class PSLib(Fleet):
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.stop_server()
self._heter_ptr.stop_xpu_service()
self._role_maker._barrier_worker()
self._role_maker._barrier_all()
self._role_maker._finalize()
......
......@@ -509,13 +509,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info = {}
opt_info["program_id_to_worker"] = prog_id_to_worker
opt_info["program_configs"] = program_configs
opt_info["trainer"] = "DistMultiTrainer"
opt_info["trainer"] = strategy.get("trainer", "DistMultiTrainer")
opt_info["device_worker"] = strategy.get("device_worker", "DownpourSGD")
opt_info["optimizer"] = "DownpourSGD"
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
opt_info["use_cvm"] = strategy.get("use_cvm", False)
opt_info["no_cvm"] = strategy.get("no_cvm", False)
opt_info["worker_class"] = strategy.get("worker_class",
"DownpourWorker")
opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
opt_info["local_tables"] = strategy.get("local_tables", [])
opt_info["async_tables"] = strategy.get("async_tables", [])
......@@ -529,6 +531,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["dump_file_num"] = strategy.get("dump_file_num", 16)
opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "")
opt_info["dump_param"] = strategy.get("dump_param", [])
opt_info["worker_places"] = strategy.get("worker_places", [])
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class in [
"DownpourCtrAccessor", "DownpourCtrDoubleAccessor",
......
......@@ -14,6 +14,7 @@
"""Fleet Utils."""
import collections
import copy
import json
import logging
import math
......@@ -1615,3 +1616,123 @@ class FleetUtil(object):
"""
program = utils.load_program(prog_path, is_text)
utils.parse_program(program, output_dir)
def split_program_by_device(self, program):
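"""
Split the ops of program's global block into cpu and heter regions based on
each op's "op_device" attribute. Returns
[start_op_idx, end_op_idx, send_var_list, recv_var_list, heter_program]
for the single heter region, or None if the program contains no heter op.
"""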
ops_list = []
type_list = []
pre = None
type_cpu = "cpu"
for op in program.global_block().ops:
if op.has_attr("op_device"):
if pre is None or pre != op.attr("op_device"):
ops_list.append([])
type_list.append(
op.attr("op_device")
if op.attr("op_device") != "" else type_cpu)
ops_list[-1].append(op)
pre = op.attr("op_device")
l = len(type_list)
i = 0
type_heter = None
while i < l:
while i < l and type_list[i] == type_cpu:
i += 1
if i == l:
break
type_heter = type_list[i]
i += 1
start = i
valid = True
while i < l and type_list[i] != type_heter:
if type_list[i] != type_cpu:
valid = False
break
i += 1
if i == l:
break
elif not valid:
continue
for j in range(start, i):
for op in ops_list[j]:
op._set_attr("op_device", type_heter)
type_list[j] = type_heter
j += 1
pre = None
merged_ops_list = []
merged_type_list = []
for i in range(l):
if pre is None or pre != type_list[i]:
merged_ops_list.append([])
merged_type_list.append(type_list[i])
merged_ops_list[-1].extend(ops_list[i])
pre = type_list[i]
data_vars = set()
for k in program.global_block().vars:
var = program.global_block().var(k)
if not var.persistable:
data_vars.add(var.name)
l = len(merged_ops_list)
inputs_pre = set()
outputs_pre = set()
in_from_pre = [[] for i in range(l)]
for i in range(l):
inputs = set()
outputs = set()
for op in merged_ops_list[i]:
for input in op.input_names:
for tmp in op.input(input):
if tmp not in outputs:
inputs.add(tmp)
for output in op.output_names:
for tmp in op.output(output):
outputs.add(tmp)
if i == 0:
in_from_pre[i] = []
elif i == 1:
in_from_pre[i] = (outputs_pre | data_vars) & inputs
else:
in_from_pre[i] = outputs_pre & inputs
inputs_pre = copy.deepcopy(inputs)
outputs_pre = copy.deepcopy(outputs)
l = len(in_from_pre)
start_list = []
end_list = []
send_list = [[] for i in range(l)]
sum = 0
program_list = []
for i in range(l):
start_list.append(sum)
end_list.append(sum + len(merged_ops_list[i]) - 1)
sum += len(merged_ops_list[i])
if i < l - 1:
send_list[i].extend(list(in_from_pre[i + 1]))
prog = program.clone()
if merged_type_list[i] != type_cpu:
prog = prog._prune_with_input(
list(in_from_pre[i]), list(send_list[i]))
program_list.append(prog)
else:
program_list.append(prog)
recv_list = [list(i) for i in in_from_pre]
found = False
heter_index = None
for i in range(len(merged_type_list)):
t = merged_type_list[i]
if t != type_cpu:
if found:
print("only one region of program can be heter")
found = True
heter_index = i
if heter_index is None:
print("warning: non heter program")
return None
else:
return [start_list[heter_index], end_list[heter_index], send_list[heter_index], \
recv_list[heter_index], program_list[heter_index]]
......@@ -15,7 +15,10 @@
import sys
import os
__all__ = ['TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer']
__all__ = [
'TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer',
'HeterXpuTrainer'
]
class TrainerDesc(object):
......@@ -48,6 +51,43 @@ class TrainerDesc(object):
self._program = None
self._infer = False
def _set_heter_info(self, ret):
#ret = = fu.split_program_by_device(program)
#start_list, end_list, send_list, recv_list, program_list = fu.split_program_by_device(program)
#if len(start_list) != 3:
# print("start_list len=", len(start_list), " will not set heter info")
# return
#for i in start_list[0]:
# self.proto_desc.op_run_start_idx.append(i)
#for i in end_list[0]:
# self.proto_desc.op_run_end_idx.append(i)
#for i in send_list[0]:
# self.proto_desc.op_run_send_list.append(i)
#for i in recv_list[0]:
# self.proto_desc.op_run_recv_list.append(i)
if ret is None:
return
#for i in ret[0]: # start_list[1]:
# self.proto_desc.xpu_start_idx.append(i)
self.proto_desc.xpu_start_idx = ret[0]
#for i in ret[1]: #end_list[1]:
# self.proto_desc.o_end_idx.append(i)
self.proto_desc.xpu_end_idx = ret[1]
for i in ret[2]: # send_list[1]:
self.proto_desc.xpu_send_list.append(i)
for i in ret[3]: # recv_list[1]:
self.proto_desc.xpu_recv_list.append(i)
#for i in start_list[2]:
# self.proto_desc.op_run_end_start_idx.append(i)
#for i in end_list[2]:
# self.proto_desc.op_run_end_idx.append(i)
#for i in send_list[2]:
# self.proto_desc.op_run_end_send_list.append(i)
#for i in recv_list[2]:
# self.proto_desc.op_run_end_recv_list.append(i)
def _set_fetch_var_and_info(self, fetch_vars, fetch_info, print_period):
# convert fetch_info to list
fetch_info = list(fetch_info)
......@@ -122,6 +162,10 @@ class TrainerDesc(object):
for param in dump_param:
self.proto_desc.dump_param.append(param)
def _set_worker_places(self, worker_places):
for place in worker_places:
self.proto_desc.worker_places.append(place)
def _set_thread_barrier(self, thread_barrier):
self.proto_desc.thread_barrier = thread_barrier
......@@ -272,6 +316,30 @@ class DistMultiTrainer(TrainerDesc):
self._device_worker._gen_worker_desc(self.proto_desc)
class HeterXpuTrainer(TrainerDesc):
"""
Implement of HeterXpuTrainer.
It's for Distributed training.
"""
def __init__(self):
super(HeterXpuTrainer, self).__init__()
pass
def _set_program(self, program):
super(HeterXpuTrainer, self)._set_program(program)
self._program = program
def _gen_trainer_desc(self):
super(HeterXpuTrainer, self)._gen_trainer_desc()
self.proto_desc.class_name = "HeterXpuTrainer"
if self._program is None:
raise RuntimeError("None Program")
self._device_worker._set_infer(self._infer)
self._device_worker._set_program(self._program)
self._device_worker._gen_worker_desc(self.proto_desc)
class PipelineTrainer(TrainerDesc):
"""
Implement of PipelineTrainer.
......
......@@ -22,7 +22,7 @@ from paddle.fluid.log_helper import get_logger
local_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer
from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer, HeterXpuTrainer
from .device_worker import Hogwild, DownpourSGD, Section, DownpourSGDOPT
from .framework import Variable
from multiprocessing import Process, Manager
......@@ -75,6 +75,8 @@ class TrainerFactory(object):
if opt_info.get("dump_param") is not None and len(
opt_info.get("dump_param")) != 0:
trainer._set_dump_param(opt_info["dump_param"])
if opt_info.get("worker_places") is not None:
trainer._set_worker_places(opt_info["worker_places"])
if opt_info.get("enable_random_dump") is not None:
trainer._set_enable_random_dump(opt_info[
"enable_random_dump"])
......