Commit 862dde5e, authored by zlsh80826

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into trt_stack_op

......@@ -118,7 +118,7 @@ function(op_library TARGET)
"tensor_array_read_write_op" "tensorrt_engine_op" "conv_fusion_op"
"fusion_transpose_flatten_concat_op" "fusion_conv_inception_op"
"sync_batch_norm_op" "dgc_op" "fused_fc_elementwise_layernorm_op"
"multihead_matmul_op" "fusion_group_op" "fused_bn_activation_op" "fused_embedding_eltwise_layernorm_op")
"multihead_matmul_op" "fusion_group_op" "fused_bn_activation_op" "fused_embedding_eltwise_layernorm_op" "fusion_gru_op")
if ("${TARGET}" STREQUAL "${manual_pybind_op}")
set(pybind_flag 1)
endif()
......
......@@ -27,6 +27,7 @@ add_subdirectory(fleet)
add_subdirectory(io)
#ddim lib
proto_library(framework_proto SRCS framework.proto)
proto_library(heter_service_proto SRCS heter_service.proto)
proto_library(data_feed_proto SRCS data_feed.proto)
proto_library(trainer_desc_proto SRCS trainer_desc.proto DEPS framework_proto
data_feed_proto)
......@@ -195,20 +196,37 @@ cc_library(executor_gc_helper SRCS executor_gc_helper.cc DEPS scope proto_desc o
if(WITH_DISTRIBUTE)
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc downpour_worker_opt.cc
heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto trainer_desc_proto glog fs shell fleet_wrapper box_wrapper lodtensor_printer
device_context scope framework_proto trainer_desc_proto glog fs shell
fleet_wrapper heter_wrapper box_wrapper lodtensor_printer
lod_rank_table feed_fetch_method sendrecvop_rpc communicator collective_helper ${GLOB_DISTRIBUTE_DEPS}
graph_to_program_pass variable_helper data_feed_proto timer monitor)
graph_to_program_pass variable_helper data_feed_proto timer monitor
heter_service_proto)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
elseif(WITH_PSLIB)
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor pslib_brpc )
# TODO: Fix these unittests that fail on Windows
if(NOT WIN32)
cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op)
endif()
else()
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc downpour_worker_opt.cc
heterxpu_trainer.cc
data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc downpour_worker.cc downpour_worker_opt.cc
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper box_wrapper lodtensor_printer feed_fetch_method
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
# TODO: Fix these unittests that fail on Windows
if(NOT WIN32)
......
......@@ -27,6 +27,7 @@ limitations under the License. */
#include <vector>
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/program_desc.h"
......@@ -51,10 +52,23 @@ bool CheckValidOutput(LoDTensor* tensor, size_t batch_size);
class FleetWrapper;
#ifdef PADDLE_WITH_PSLIB
class HeterWrapper;
#endif
class PullDenseWorker {
public:
virtual ~PullDenseWorker() {}
virtual void Initialize(const TrainerDesc& param);
#ifdef PADDLE_WITH_CUDA
void AddStream(const cudaStream_t stream) { copy_streams_.push_back(stream); }
void AddPlace(const paddle::platform::Place place) {
places_.push_back(place);
}
void AddThreadScope(Scope* scope) { thread_scopes_.push_back(scope); }
#endif
int Start();
void Stop();
void SetRootScope(Scope* scope) { root_scope_ = scope; }
......@@ -62,6 +76,7 @@ class PullDenseWorker {
void ResetThreadVersion(uint64_t table_id);
void Wait(std::vector<::std::future<int32_t>>* status_vec);
void PullDense(bool force_update = false);
void CreatePinVar();
int GetThreadIdByScope(const Scope* scope);
void SetThreadIdByScope(const Scope* scope, int tid);
static std::shared_ptr<PullDenseWorker> GetInstance() {
......@@ -105,6 +120,12 @@ class PullDenseWorker {
std::mutex mutex_for_mean_scale_;
float total_batch_num_ = 0;
std::unordered_map<const Scope*, int> scope_to_thread_id_;
#ifdef PADDLE_WITH_CUDA
std::vector<cudaStream_t> copy_streams_;
std::vector<paddle::platform::Place> places_;
std::vector<Scope*> thread_scopes_;
#endif
};
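For context, these new CUDA hooks are driven by the trainer that owns the worker; a minimal sketch of the expected wiring after Initialize(trainer_desc), mirroring HeterXpuTrainer::InitOtherEnv later in this diff (root_scope, scope, place, and stream are assumed to exist already):
auto pull_dense_worker = PullDenseWorker::GetInstance();
pull_dense_worker->SetRootScope(root_scope);  // scope holding the dense parameters
pull_dense_worker->CreatePinVar();            // create the pinned "pin" staging vars
pull_dense_worker->AddThreadScope(scope);     // one scope per device thread
pull_dense_worker->AddPlace(place);           // the CUDAPlace for that device
pull_dense_worker->AddStream(stream);         // copy stream used when pulling to GPU
pull_dense_worker->Start();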
// should incorporate different types of devices
......@@ -126,6 +147,8 @@ class DeviceWorker {
virtual void BindingDataFeedMemory() = 0;
virtual void SetRootScope(Scope* root_scope);
virtual void SetDataFeed(DataFeed* data_feed);
virtual void SetWorkerNum(int num) {}
virtual void CacheProgram(const ProgramDesc& main_program) {}
virtual void SetNeedDumpField(bool need_dump_field) {
need_dump_field_ = need_dump_field;
}
......@@ -161,6 +184,7 @@ class DeviceWorker {
FetchConfig fetch_config_;
bool use_cvm_;
bool no_cvm_;
TrainerDesc trainer_desc_;
// dump params or grads for debug
bool need_dump_param_;
......@@ -306,6 +330,87 @@ class DownpourWorkerOpt : public DownpourWorker {
uint64_t async_tid_ = 0;
};
#ifdef PADDLE_WITH_PSLIB
class HeterCpuWorker : public HogwildWorker {
public:
HeterCpuWorker() {}
virtual ~HeterCpuWorker() {}
virtual void Initialize(const TrainerDesc& desc);
virtual void TrainFiles();
virtual void TrainFilesWithProfiler();
virtual void SetNeedDump(bool need_dump_field);
virtual void SetChannelWriter(ChannelObject<std::string>* queue);
virtual void SetWorkerNum(int num) { worker_num_ = num; }
virtual void Schedule(int taskid);
virtual void JumpContext(std::shared_ptr<HeterTask> task);
virtual void CacheProgram(const ProgramDesc& main_program) {
new (&program_) ProgramDesc(main_program);
}
virtual void GetXpuOpIndex();
protected:
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
std::shared_ptr<paddle::framework::HeterWrapper> heter_ptr_;
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
void FillSparseValue(std::shared_ptr<HeterTask> task, size_t table_id);
void PushGradients();
void CollectLabelInfo(std::shared_ptr<HeterTask> task, size_t table_id);
void AdjustInsWeight(std::shared_ptr<HeterTask> task);
void DumpParam();
void CopySparseTable();
void CopyDenseTable();
void CopyDenseVars();
private:
int mpi_rank_;
int worker_num_;
int xpu_begin_op_index_;
int xpu_end_op_index_;
ProgramDesc program_;
HeterObjectPool<HeterTask> object_pool_;
HeterList<int, std::shared_ptr<HeterTask>> run_queue_;
HeterList<int, std::shared_ptr<HeterTask>> wait_queue_;
bool need_dump_param_;
std::vector<std::string> dump_param_;
bool need_to_push_dense_;
bool need_dump_field_;
bool dump_slot_;
bool need_to_push_sparse_;
std::vector<std::string> dump_fields_;
ChannelWriter<std::string> writer_;
DownpourWorkerParameter param_;
float scale_datanorm_;
// just save the value in param_ for easy access
std::map<uint64_t, std::string> label_var_name_;
std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
std::map<uint64_t, std::vector<std::string>> sparse_value_names_;
std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
platform::Place root_place_;
// actually pushed feasign of each table
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
// skipped ops
std::vector<std::string> skip_ops_;
std::vector<::std::future<int32_t>> push_sparse_status_;
std::vector<::std::future<int32_t>> push_dense_status_;
// adjust ins weight
AdjustInsWeightConfig adjust_ins_weight_config_;
std::vector<float> nid_show_;
// check nan and inf during training
std::vector<std::string> check_nan_var_names_;
// copy table
CopyTableConfig copy_table_config_;
std::map<uint64_t, uint64_t> table_dependency_;
std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
};
#endif
#if defined(PADDLE_WITH_NCCL)
class SectionWorker : public DeviceWorker {
public:
......
......@@ -62,6 +62,9 @@ std::shared_ptr<DeviceWorker> DeviceWorkerFactory::CreateDeviceWorker(
REGISTER_DEVICE_WORKER_CLASS(HogwildWorker);
REGISTER_DEVICE_WORKER_CLASS(DownpourWorker);
REGISTER_DEVICE_WORKER_CLASS(DownpourWorkerOpt);
#ifdef PADDLE_WITH_PSLIB
REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker);
#endif
#if defined(PADDLE_WITH_NCCL)
REGISTER_DEVICE_WORKER_CLASS(SectionWorker);
#endif
......
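The registration above makes each worker constructible by name through the factory; a minimal sketch, assuming a PADDLE_WITH_PSLIB build and an already-populated TrainerDesc:
std::shared_ptr<DeviceWorker> worker =
    DeviceWorkerFactory::CreateDeviceWorker("HeterCpuWorker");
worker->Initialize(trainer_desc);
worker->SetWorkerNum(thread_num);  // new hook added in this commit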
......@@ -35,7 +35,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
dump_file_num_ = trainer_desc.dump_file_num();
const std::vector<paddle::framework::DataFeed *> readers =
dataset->GetReaders();
RegisterHeterCallback();
thread_num_ = readers.size();
workers_.resize(thread_num_);
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
......@@ -55,6 +55,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
workers_[i]->SetDumpParamVector(dump_param_);
workers_[i]->InitRandomDumpConfig(trainer_desc);
workers_[i]->Initialize(trainer_desc);
workers_[i]->SetWorkerNum(thread_num_);
}
VLOG(3) << "going to initialize pull dense worker";
......@@ -64,6 +65,13 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
SetDebug(trainer_desc.debug());
}
void DistMultiTrainer::RegisterHeterCallback() {
auto fleet_ptr = FleetWrapper::GetInstance();
fleet_ptr->RegisterHeterCallback([this](int worker, int taskid) {
// workers_[worker]->Schedule(taskid);
});
}
void DistMultiTrainer::InitDumpEnv() {
queue_ = paddle::framework::MakeChannel<std::string>();
for (int i = 0; i < thread_num_; ++i) {
......@@ -90,6 +98,9 @@ void DistMultiTrainer::InitTrainerEnv(const ProgramDesc &main_program,
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
#ifdef PADDLE_WITH_PSLIB
workers_[i]->CacheProgram(main_program);
#endif
}
// Scope* -> thread id, it will be used in push_dense op
for (int i = 0; i < thread_num_; ++i) {
......@@ -104,6 +115,11 @@ void DistMultiTrainer::InitOtherEnv(const ProgramDesc &main_program) {
}
pull_dense_worker_->SetRootScope(root_scope_);
pull_dense_worker_->Start();
#ifdef PADDLE_WITH_PSLIB
for (int i = 0; i < thread_num_; ++i) {
workers_[i]->GetXpuOpIndex();
}
#endif
VLOG(3) << "init other env done.";
}
......
......@@ -379,7 +379,7 @@ void DownpourWorker::CopyDenseTable() {
pull_dense_status.resize(0);
fleet_ptr_->PullDenseVarsAsync(*root_scope_, dest_table,
dense_value_names_[dest_table],
&pull_dense_status);
&pull_dense_status, true);
for (auto& t : pull_dense_status) {
t.wait();
auto status = t.get();
......
......@@ -19,4 +19,6 @@ else()
cc_library(gloo_wrapper SRCS gloo_wrapper.cc DEPS framework_proto variable_helper scope)
endif(WITH_GLOO)
cc_library(heter_wrapper SRCS heter_wrapper.cc DEPS framework_proto)
cc_test(test_fleet SRCS test_fleet.cc DEPS fleet_wrapper gloo_wrapper fs shell)
......@@ -154,6 +154,219 @@ void FleetWrapper::CreateClient2ClientConnection() {
#endif
}
#ifdef PADDLE_WITH_PSLIB
void FleetWrapper::HeterPullSparseVars(
int workerid, std::shared_ptr<HeterTask> task, const uint64_t table_id,
const std::vector<std::string>& var_names, int fea_value_dim,
const std::vector<std::string>& var_emb_names) {
std::vector<::std::future<int32_t>> pull_sparse_status;
pull_sparse_status.resize(0);
auto& scope = *(task->scope_);
auto& fea_keys = (task->features_)[table_id];
auto& fea_values = (task->feature_values_)[table_id];
fea_keys.clear();
for (size_t var_index = 0; var_index < var_names.size(); ++var_index) {
const std::string& name = var_names[var_index];
Variable* var = scope.FindVar(name);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();
// skip slots which do not have embedding
const std::string& emb_name = var_emb_names[var_index];
Variable* emb_var = scope.FindVar(emb_name);
if (emb_var == nullptr) {
continue;
}
for (auto i = 0u; i < len; ++i) {
if (ids[i] == 0u) {
continue;
}
fea_keys.push_back(static_cast<uint64_t>(ids[i]));
}
}
fea_values.resize(fea_keys.size() + 1);
for (auto& t : fea_values) {
t.resize(fea_value_dim);
}
std::vector<float*> pull_result_ptr;
for (auto& t : fea_values) {
pull_result_ptr.push_back(t.data());
}
auto status = pslib_ptr_->_worker_ptr->heter_pull_sparse(
workerid, pull_result_ptr.data(), table_id, fea_keys.data(),
fea_keys.size(), task->taskid_);
pull_sparse_status.push_back(std::move(status));
for (auto& t : pull_sparse_status) {
t.wait();
auto status = t.get();
if (status != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
}
}
void FleetWrapper::HeterPushSparseVars(
std::shared_ptr<HeterTask> task, const uint64_t table_id,
const std::vector<std::string>& sparse_key_names,
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<::std::future<int32_t>>* push_sparse_status, const bool use_cvm,
const bool dump_slot, const bool no_cvm) {
auto& scope = *(task->scope_);
int batch_size = task->cur_batch_;
int offset = 2;
int slot_offset = 0;
int grad_dim = emb_dim;
int show_index = 0;
int click_index = 1;
auto& fea_keys = (task->features_)[table_id];
auto& fea_labels = (task->feature_labels_)[table_id];
auto& push_values = (task->feature_grads_)[table_id];
auto& sparse_push_keys = (task->sparse_push_keys_)[table_id];
if (use_cvm) {
offset = 0;
grad_dim = emb_dim - 2;
}
if (no_cvm) {
offset = 0;
grad_dim = emb_dim;
}
if (dump_slot) {
slot_offset = 1;
show_index = 1;
click_index = 2;
}
CHECK_GE(grad_dim, 0);
sparse_push_keys.clear();
sparse_push_keys.reserve(fea_keys.size() + 1);
push_values.resize(fea_keys.size() + 1);
for (auto& t : push_values) {
t.resize(emb_dim + offset + slot_offset);
}
uint64_t fea_idx = 0u;
for (size_t i = 0;
i < sparse_key_names.size() && i < sparse_grad_names.size(); ++i) {
Variable* var = scope.FindVar(sparse_key_names[i]);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
size_t len = tensor->numel();
int64_t* ids = tensor->data<int64_t>();
int slot = 0;
if (dump_slot) {
slot = boost::lexical_cast<int>(sparse_key_names[i]);
}
Variable* g_var = scope.FindVar(sparse_grad_names[i]);
if (g_var == nullptr) {
continue;
}
LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
if (g_tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
float* g = g_tensor->data<float>();
if (scale_sparse_gradient_with_batch_size_ && grad_dim > 0) {
int dim = emb_dim + offset;
Eigen::Map<
Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
g_mat(g, g_tensor->numel() / dim, dim);
g_mat.rightCols(grad_dim) *= batch_size;
}
for (auto id_idx = 0u; id_idx < len; ++id_idx) {
if (ids[id_idx] == 0) {
g += emb_dim;
continue;
}
sparse_push_keys.push_back(ids[id_idx]);
CHECK(fea_idx < push_values.size());
if (use_cvm || no_cvm) {
memcpy(push_values[fea_idx].data() + offset + slot_offset, g,
sizeof(float) * emb_dim);
} else {
CHECK(fea_idx < fea_labels.size());
memcpy(push_values[fea_idx].data() + offset + slot_offset, g,
sizeof(float) * emb_dim);
push_values[fea_idx][show_index] = 1.0f;
push_values[fea_idx][click_index] =
static_cast<float>(fea_labels[fea_idx]);
}
if (dump_slot) {
push_values[fea_idx][0] = static_cast<float>(slot);
}
g += emb_dim;
fea_idx++;
}
}
// slots whose embeddings have stop_gradient set or are
// not involved in the forward-backward pass
uint64_t no_grad_fea_num = 0u;
for (size_t i = sparse_grad_names.size(); i < sparse_key_names.size(); ++i) {
Variable* var = scope.FindVar(sparse_key_names[i]);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
size_t len = tensor->numel();
int64_t* ids = tensor->data<int64_t>();
for (auto id_idx = 0u; id_idx < len; ++id_idx) {
if (ids[id_idx] == 0) {
continue;
}
++no_grad_fea_num;
}
}
CHECK(fea_idx + no_grad_fea_num == fea_keys.size())
<< "fea_idx: " << fea_idx << " no_grad_fea_num: " << no_grad_fea_num
<< " features size: " << fea_keys.size();
CHECK(fea_idx == sparse_push_keys.size());
if (fea_idx == 0) {
return;
}
std::vector<float*> push_g_vec;
for (auto i = 0u; i < sparse_push_keys.size(); ++i) {
push_g_vec.push_back(push_values[i].data());
}
auto status = pslib_ptr_->_worker_ptr->push_sparse(
table_id, sparse_push_keys.data(), (const float**)push_g_vec.data(),
sparse_push_keys.size());
push_sparse_status->push_back(std::move(status));
}
#endif
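To make the offset bookkeeping in HeterPushSparseVars concrete, here is the per-feature row layout of push_values on the non-CVM dump_slot path (use_cvm == false, no_cvm == false, dump_slot == true, so offset = 2, slot_offset = 1, show_index = 1, click_index = 2):
// push_values[fea_idx] has size emb_dim + offset + slot_offset = emb_dim + 3:
// [0]                 slot id, written because dump_slot is set
// [1]                 show  = 1.0f                  (show_index)
// [2]                 click = fea_labels[fea_idx]   (click_index)
// [3 .. emb_dim + 2]  gradient, memcpy'd from g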
int FleetWrapper::RegisterHeterCallback(HeterCallBackFunc handler) {
#ifdef PADDLE_WITH_PSLIB
VLOG(3) << "calling FleetWrapper::RegisterHeterCallback";
VLOG(3) << "pslib_ptr_=" << pslib_ptr_;
VLOG(3) << "_worker_ptr=" << pslib_ptr_->_worker_ptr;
return pslib_ptr_->_worker_ptr->registe_heter_callback(handler);
#else
VLOG(0) << "FleetWrapper::RegisterHeterCallback"
<< " does nothing when no pslib";
#endif
return 0;
}
void FleetWrapper::PullSparseToLocal(const uint64_t table_id,
int fea_value_dim) {
#ifdef PADDLE_WITH_PSLIB
......@@ -421,13 +634,17 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
void FleetWrapper::PullDenseVarsAsync(
const Scope& scope, const uint64_t tid,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* pull_dense_status) {
std::vector<::std::future<int32_t>>* pull_dense_status, bool in_cpu) {
#ifdef PADDLE_WITH_PSLIB
auto& regions = _regions[tid];
regions.clear();
regions.resize(var_names.size());
for (auto i = 0u; i < var_names.size(); ++i) {
Variable* var = scope.FindVar(var_names[i]);
std::string varname = var_names[i];
if (!in_cpu) {
varname = var_names[i] + "pin";
}
Variable* var = scope.FindVar(varname);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* w = tensor->data<float>();
paddle::ps::Region reg(w, tensor->numel());
......@@ -485,6 +702,57 @@ void FleetWrapper::PushDenseVarsSync(
Scope* scope, const uint64_t table_id,
const std::vector<std::string>& var_names) {}
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
void FleetWrapper::PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* push_sparse_status,
float scale_datanorm, int batch_size, const paddle::platform::Place& place,
cudaStream_t stream, cudaEvent_t event) {
std::vector<paddle::ps::Region> regions;
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int count = tensor->numel();
float* g_data = tensor->data<float>();
Variable* pin_var = scope.FindVar(t + "pin");
LoDTensor* pin_tensor = pin_var->GetMutable<LoDTensor>();
float* pin_g = pin_tensor->mutable_data<float>(tensor->dims(),
platform::CUDAPinnedPlace());
memory::Copy(platform::CUDAPinnedPlace(), pin_g,
BOOST_GET_CONST(platform::CUDAPlace, place), g_data,
sizeof(float) * count, stream);
PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
cudaEventSynchronize(event);
float* g = pin_g;
if (scale_datanorm >= 0) {
if (t.find(".batch_size@GRAD") != std::string::npos ||
t.find(".batch_sum@GRAD") != std::string::npos) {
Eigen::Map<Eigen::MatrixXf> mat(g, 1, count);
float scale = 1.0 / batch_size;
mat *= scale;
} else if (t.find(".batch_square_sum@GRAD") != std::string::npos) {
VLOG(3) << "epsilon: " << scale_datanorm;
for (int i = 0; i < count; ++i) {
g[i] = (g[i] - batch_size * scale_datanorm) / batch_size +
batch_size * scale_datanorm;
}
}
}
paddle::ps::Region reg(g, count);
regions.emplace_back(std::move(reg));
}
auto status = pslib_ptr_->_worker_ptr->push_dense(regions.data(),
regions.size(), table_id);
if (push_sparse_status) {
push_sparse_status->push_back(std::move(status));
}
}
#endif
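The scale_datanorm branch above rescales gradients of the data-norm statistics before pushing; a worked instance, assuming batch_size = 32 and scale_datanorm = 1e-4:
// .batch_size@GRAD and .batch_sum@GRAD rows are simply scaled: g[i] *= 1.0 / 32
// .batch_square_sum@GRAD rows are shifted and scaled per element:
//   g[i] = (g[i] - 32 * 1e-4) / 32 + 32 * 1e-4
//        = g[i] / 32 + 31e-4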
void FleetWrapper::PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
......@@ -1085,8 +1353,8 @@ void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
push_status.wait();
auto status = push_status.get();
if (status != 0) {
PADDLE_THROW(platform::errors::Fatal(
"push shrink dense param failed, status is [%d].", status));
// PADDLE_THROW(platform::errors::Fatal(
// "push shrink dense param failed, status is [%d].", status));
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
......
......@@ -28,6 +28,7 @@ limitations under the License. */
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
......@@ -80,6 +81,24 @@ class FleetWrapper {
pull_local_thread_num_ = thread_num;
}
#ifdef PADDLE_WITH_PSLIB
void HeterPullSparseVars(int workerid, std::shared_ptr<HeterTask> task,
const uint64_t table_id,
const std::vector<std::string>& var_names,
int fea_dim,
const std::vector<std::string>& var_emb_names);
void HeterPushSparseVars(
std::shared_ptr<HeterTask> task, const uint64_t table_id,
const std::vector<std::string>& sparse_key_names,
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<::std::future<int32_t>>* push_sparse_status,
const bool use_cvm, const bool dump_slot, const bool no_cvm);
#endif
typedef std::function<void(int, int)> HeterCallBackFunc;
int RegisterHeterCallback(HeterCallBackFunc handler);
// Pull sparse variables from server in sync mode
// Param<in>: scope, table_id, var_names, fea_keys, fea_dim, var_emb_names
// Param<out>: fea_values
......@@ -118,15 +137,24 @@ class FleetWrapper {
void PullDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* pull_dense_status);
std::vector<::std::future<int32_t>>* pull_dense_status, bool in_cpu);
// push dense parameters(not gradients) to server in sync mode
void PushDenseParamSync(const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names);
// Push dense variables to server in async mode
// Param<in>: scope, table_id, var_names, scale_datanorm, batch_size
// Param<out>: push_sparse_status
#ifdef PADDLE_WITH_CUDA
void PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* push_sparse_status,
float scale_datanorm, int batch_size,
const paddle::platform::Place& place, cudaStream_t stream,
cudaEvent_t event);
#endif
void PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
......
......@@ -54,10 +54,10 @@ void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
paddle::framework::fs_remove(tmp);
if (i == retry_times_) {
VLOG(0) << "fs_open_write failed, retry times reaches limit";
PADDLE_THROW(platform::errors::PreconditionNotMet(
"fs_open_write failed, retry times reaches"
" limit ",
retry_times_));
// PADDLE_THROW(platform::errors::PreconditionNotMet(
// "fs_open_write failed, retry times reaches"
// " limit ",
// retry_times_));
}
} else {
break;
......@@ -143,9 +143,9 @@ void HdfsStore::wait(const std::vector<std::string>& keys,
break;
}
}
PADDLE_THROW(platform::errors::ExecutionTimeout(
"TIMEOUT self_rank = %d pair_rank = %d", self_rank_,
last_check_rank));
// PADDLE_THROW(platform::errors::ExecutionTimeout(
VLOG(0) << "TIMEOUT self_rank = " << self_rank_
<< " pair_rank = " << last_check_rank;
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
}
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include <algorithm>
#include <utility>
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/timer.h"
#ifdef PADDLE_WITH_PSLIB
namespace paddle {
namespace framework {
std::shared_ptr<HeterWrapper> HeterWrapper::s_instance_ = NULL;
bool HeterWrapper::is_initialized_ = false;
void HeterWrapper::CreateClient2XpuConnection() {
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connection_type = "single";
options.timeout_ms = 2000000;
xpu_channels_.resize(xpu_list_.size());
for (size_t i = 0; i < xpu_list_.size(); ++i) {
VLOG(3) << "channel init: " << xpu_list_[i];
xpu_channels_[i].reset(new brpc::Channel());
if (xpu_channels_[i]->Init(xpu_list_[i].c_str(), "", &options) != 0) {
VLOG(0) << "server channel init fail";
}
}
}
void HeterWrapper::RegisterServiceHandler(int cmd, HeterServiceHandler func) {
service_.RegisterServiceHandler(cmd, func);
}
void HeterWrapper::SetXpuList(const std::vector<std::string>& xpu_list) {
#ifdef PADDLE_WITH_PSLIB
VLOG(3) << "Going to set xpu list";
for (auto& x : xpu_list) {
xpu_list_.push_back(x);
VLOG(3) << "set xpu list: " << x << " size: " << xpu_list_.size();
}
#endif
}
void HeterWrapper::StartXpuService(const std::string& ip, uint32_t port) {
std::string ip_port = ip + ":" + std::to_string(port);
VLOG(3) << "xpu server starts at " << ip_port;
server_.AddService(&service_, brpc::SERVER_DOESNT_OWN_SERVICE);
brpc::ServerOptions options;
if (server_.Start(ip_port.c_str(), &options) != 0) {
VLOG(0) << "xpu server start fail";
}
}
// void HeterWrapper::SerializeToReq(const std::string& varname,
// Scope* scope, HeterRequest& request) {
// auto* req_var = request.mutable_vars();
void HeterWrapper::SerializeToReq(const std::string& varname, Scope* scope,
VariableMessage* req_var) {
Variable* var = scope->FindVar(varname);
if (var == nullptr) {
return;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
req_var->set_varname(varname);
req_var->set_type(LOD_TENSOR);
req_var->set_data_type(static_cast<VariableMessage::Type>(tensor->type()));
for (auto& dim : framework::vectorize(tensor->dims())) {
req_var->add_dims(dim);
}
const framework::LoD lod = tensor->lod();
if (lod.size() > 0) {
req_var->set_lod_level(lod.size());
for (auto& each : lod) {
VariableMessage::LodData* lod_inner = req_var->add_lod();
for (auto& d : each) {
lod_inner->add_lod_data(d);
}
}
}
auto* req_data = req_var->mutable_data();
req_data->clear();
req_data->resize(tensor->numel() * SizeOfType(tensor->type()));
char* data_ptr = const_cast<char*>(req_data->data());
if (platform::is_cpu_place(tensor->place())) {
memcpy(data_ptr, tensor->data<void>(),
tensor->numel() * SizeOfType(tensor->type()));
}
#ifdef PADDLE_WITH_CUDA
else {
memory::Copy(platform::CPUPlace(), data_ptr,
BOOST_GET_CONST(platform::CUDAPlace, tensor->place()),
tensor->data<void>(),
tensor->numel() * SizeOfType(tensor->type()), nullptr);
}
#endif
}
// void HeterWrapper::DeSerializeToTensor(Scope* scope,
// const HeterRequest* request) {
#ifdef PADDLE_WITH_CUDA
void HeterWrapper::DeSerializeToTensor(Scope* scope,
const VariableMessage& req_var,
platform::Place place,
cudaStream_t stream) {
#else
void HeterWrapper::DeSerializeToTensor(Scope* scope,
const VariableMessage& req_var,
platform::Place place) {
#endif
// const VariableMessage& req_var = request->vars();
auto* var = scope->FindVar(req_var.varname());
auto* tensor = var->GetMutable<LoDTensor>();
std::vector<int> vec_dim;
for (auto& x : req_var.dims()) {
vec_dim.push_back(x);
}
tensor->Resize(make_ddim(vec_dim));
LoD lod;
for (int i = 0; i < req_var.lod_level(); ++i) {
framework::Vector<size_t> v;
for (int j = 0; j < req_var.lod(i).lod_data_size(); ++j) {
v.push_back(req_var.lod(i).lod_data(j));
}
lod.push_back(v);
}
tensor->set_lod(lod);
void* tensor_data =
tensor->mutable_data(place, ToVarType(req_var.data_type()));
#ifdef PADDLE_WITH_CUDA
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
platform::CPUPlace(), req_var.data().data(),
tensor->numel() * SizeOfType(tensor->type()), stream);
#else
memcpy(tensor_data, req_var.data().data(),
tensor->numel() * SizeOfType(tensor->type()));
#endif
}
framework::proto::VarType::Type HeterWrapper::ToVarType(
VariableMessage::Type type) {
switch (type) {
case VariableMessage::FP32:
return framework::proto::VarType::FP32; // NOLINT
case VariableMessage::FP64:
return framework::proto::VarType::FP64; // NOLINT
case VariableMessage::INT32:
return framework::proto::VarType::INT32; // NOLINT
case VariableMessage::INT64:
return framework::proto::VarType::INT64; // NOLINT
case VariableMessage::BOOL:
return framework::proto::VarType::BOOL; // NOLINT
default:
VLOG(0) << "Not support type " << type;
}
}
void HeterWrapper::StopXpuService(int num) {
HeterRequest request;
HeterResponse response;
brpc::Controller cntl;
request.set_cmd(2);
// for (size_t i = 0; i < xpu_channels_.size(); ++i) {
HeterService_Stub stub(xpu_channels_[num].get());
stub.service(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
VLOG(0) << "call stop xpu service fail: " << cntl.ErrorText();
} else {
VLOG(3) << "call stop xpu service success";
}
// }
}
void HeterWrapper::EndPass(Scope* scope, int num) {
HeterRequest request;
HeterResponse response;
brpc::Controller cntl;
request.set_cmd(1);
// for (size_t i = 0; i < xpu_channels_.size(); ++i) {
HeterService_Stub stub(xpu_channels_[num].get());
stub.service(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
VLOG(0) << "call end pass fail: " << cntl.ErrorText();
} else {
VLOG(3) << "call end pass success";
for (int j = 0; j < response.vars_size(); ++j) {
DeSerializeToTensor(scope, response.vars(j), platform::CPUPlace());
}
}
// }
}
void HeterWrapper::CallRemoteXpu(std::shared_ptr<HeterTask> task,
HeterCpuWorker* worker, int mpi_rank,
std::vector<std::string>& send_vars) {
HeterRequest request;
request.set_cmd(0);
request.set_cur_batch(task->cur_batch_);
OnHeterRpcDone* done = new OnHeterRpcDone([this, task, worker](void* done) {
auto* closure = (OnHeterRpcDone*)done;
if (closure->cntl.Failed()) {
VLOG(0) << "call xpu fail: " << closure->cntl.ErrorText();
} else {
VLOG(3) << "call xpu success";
}
// DeSerializeToTensor(task->scope_,
// closure->response.vars(), platform::CPUPlace());
for (int i = 0; i < closure->response.vars_size(); ++i) {
DeSerializeToTensor(task->scope_, closure->response.vars(i),
platform::CPUPlace());
}
worker->Schedule(task->taskid_);
});
// std::vector<std::string> varnames = {"click", "12345"};
// //varnames.push_back(send_var);
// //if (send_var == "_generated_var_412") {
// varnames.push_back("filter_by_instag_0.tmp_0");
// varnames.push_back("filter_by_instag_2.tmp_0");
// varnames.push_back("filter_by_instag_0.tmp_1");
// varnames.push_back("concat_1.tmp_0");
// }
for (auto& varname : send_vars) {
auto* req_var = request.add_vars();
SerializeToReq(varname, task->scope_, req_var);
}
int num = mpi_rank % xpu_channels_.size();
HeterService_Stub stub(xpu_channels_[num].get());
// stub.service(&cntl, &request, &response,
// brpc::NewCallback(&HeterWrapper::RpcCallBack,
// response, cntl, worker, task));
stub.service(&done->cntl, &request, &done->response, done);
}
void HeterWrapper::CallRemoteXpuSync(std::shared_ptr<HeterTask> task,
HeterCpuWorker* worker, int mpi_rank,
std::vector<std::string>& send_vars) {
HeterRequest request;
HeterResponse response;
brpc::Controller cntl;
request.set_cmd(0);
request.set_cur_batch(task->cur_batch_);
// std::vector<std::string> varnames = {"concat_1.tmp_0", "click", "12345"};
for (auto& varname : send_vars) {
auto* req_var = request.add_vars();
SerializeToReq(varname, task->scope_, req_var);
}
HeterService_Stub stub(xpu_channels_[0].get());
stub.service(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
VLOG(0) << "call xpu fail: " << cntl.ErrorText();
} else {
VLOG(3) << "call xpu success";
for (int i = 0; i < response.vars_size(); ++i) {
DeSerializeToTensor(task->scope_, response.vars(i), platform::CPUPlace());
}
}
}
} // end namespace framework
} // end namespace paddle
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <atomic>
#include <ctime>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <unordered_map>
#include <vector>
#ifdef PADDLE_WITH_PSLIB
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
namespace paddle {
namespace framework {
class HeterCpuWorker;
typedef std::function<void(void*)> HeterRpcCallbackFunc;
class OnHeterRpcDone : public google::protobuf::Closure {
public:
OnHeterRpcDone(HeterRpcCallbackFunc func) : handler_(func) {}
virtual ~OnHeterRpcDone() {}
void Run() {
std::unique_ptr<OnHeterRpcDone> self_guard(this);
handler_(this);
}
HeterRpcCallbackFunc handler_;
HeterResponse response;
brpc::Controller cntl;
};
class HeterWrapper {
public:
virtual ~HeterWrapper() {
server_.Stop(1000);
server_.Join();
}
HeterWrapper() {}
static void HeterRpcCallBack(HeterResponse* response, brpc::Controller* cntl,
HeterCpuWorker* worker,
std::shared_ptr<HeterTask> task);
void CreateClient2XpuConnection();
void RegisterServiceHandler(int cmd, HeterServiceHandler func);
void StartXpuService(const std::string& ip, uint32_t port);
void CallRemoteXpu(std::shared_ptr<HeterTask> task, HeterCpuWorker* worker,
int mpi_rank, std::vector<std::string>& send_vars);
void CallRemoteXpuSync(std::shared_ptr<HeterTask> task,
HeterCpuWorker* worker, int mpi_rank,
std::vector<std::string>& send_vars);
void StopXpuService(int num);
void EndPass(Scope* scope, int num);
void SerializeToReq(const std::string& varname, Scope* scope,
VariableMessage* req_var);
framework::proto::VarType::Type ToVarType(VariableMessage::Type type);
#ifdef PADDLE_WITH_CUDA
void DeSerializeToTensor(Scope* scope, const VariableMessage& req_var,
platform::Place place,
cudaStream_t stream = nullptr);
#else
void DeSerializeToTensor(Scope* scope, const VariableMessage& req_var,
platform::Place place);
#endif
// HeterWrapper singleton
static std::shared_ptr<HeterWrapper> GetInstance() {
if (NULL == s_instance_) {
s_instance_.reset(new paddle::framework::HeterWrapper());
}
return s_instance_;
}
std::vector<std::string>& GetXpuList() { return xpu_list_; }
void SetXpuList(const std::vector<std::string>& xpu_list);
private:
static std::shared_ptr<HeterWrapper> s_instance_;
protected:
std::vector<std::shared_ptr<brpc::Channel>> xpu_channels_;
brpc::Server server_;
HeterXpuService service_;
static bool is_initialized_;
DISABLE_COPY_AND_ASSIGN(HeterWrapper);
std::vector<std::string> xpu_list_;
};
} // end namespace framework
} // end namespace paddle
#endif
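Putting the interface above together, a minimal sketch of how the CPU side is expected to bring the wrapper up (the endpoint value is a hypothetical example):
auto heter_ptr = paddle::framework::HeterWrapper::GetInstance();
std::vector<std::string> xpu_endpoints = {"127.0.0.1:8500"};  // hypothetical endpoint
heter_ptr->SetXpuList(xpu_endpoints);
heter_ptr->CreateClient2XpuConnection();
// per task: heter_ptr->CallRemoteXpu(task, worker, mpi_rank, send_vars);
heter_ptr->StopXpuService(0);  // ask endpoint 0 to shut down its service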
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <fstream>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <unordered_map> // NOLINT
#include <unordered_set> // NOLINT
#include <vector>
#include "paddle/fluid/framework/heter_service.pb.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#ifdef PADDLE_WITH_PSLIB
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
namespace paddle {
namespace framework {
typedef std::function<int(const HeterRequest*, HeterResponse*)>
HeterServiceHandler;
class DataFeed;
class HeterXpuService : public HeterService {
public:
HeterXpuService() {}
virtual ~HeterXpuService() {}
void service(::google::protobuf::RpcController* controller,
const HeterRequest* request, HeterResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
int ret = 0;
int cmd = request->cmd();
auto itr = handler_map_.find(cmd);
if (itr == handler_map_.end()) {
} else {
ret = itr->second(request, response);
}
// response->set_err_code(0);
// response->set_err_msg("");
if (ret != 0) {
// response->set_err_code(-1);
// response->set_err_msg("xpu service error");
}
}
void RegisterServiceHandler(int cmd, HeterServiceHandler func) {
VLOG(0) << "register heter service";
handler_map_[cmd] = func;
}
private:
std::unordered_map<int, HeterServiceHandler> handler_map_;
};
enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
class HeterTask {
public:
void Update() {
if (state_ == PULL_SPARSE) {
state_ = OP_RUN;
} else if (state_ == OP_RUN) {
state_ = XPU;
// state_ = PUSH_GRAD;
} else if (state_ == XPU) {
state_ = OP_RUN_END;
} else if (state_ == OP_RUN_END) {
state_ = PUSH_GRAD;
} else if (state_ == PUSH_GRAD) {
state_ = DONE;
}
}
void Reset() {
total_time = 0;
read_time = 0;
pack_time = 0;
pull_sparse_local_time = 0;
op_all_time = 0;
xpu_op_time = 0;
xpu_wait_time = 0;
cpu_op_time = 0;
collect_label_time = 0;
fill_sparse_time = 0;
push_sparse_time = 0;
}
void Show() {
std::cout << "features size " << features_.size() << std::endl;
for (size_t i = 0; i < features_.size(); ++i) {
std::cout << "features[" << i << "] size " << features_[i].size()
<< std::endl;
}
}
void PackTask(Scope* scope, int taskid, DataFeed* reader, int cur_batch,
const ProgramDesc& program);
Scope* scope_{nullptr};
int taskid_;
int cur_batch_;
HeterTaskState state_;
// cache
std::map<uint64_t, std::vector<uint64_t>> features_;
std::map<uint64_t, std::vector<float>> feature_labels_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_values_;
std::map<uint64_t, std::vector<std::vector<float>>> feature_grads_;
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
double total_time{0};
double read_time{0};
double pack_time{0};
double pull_sparse_local_time{0};
double op_all_time{0};
double xpu_op_time{0};
double xpu_wait_time{0};
double cpu_op_time{0};
double collect_label_time{0};
double fill_sparse_time{0};
double push_sparse_time{0};
};
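Update() walks each task through a fixed pipeline; a compact trace, assuming a task freshly set to PULL_SPARSE (the step annotations reflect how the states are used elsewhere in this diff):
HeterTask task;
task.state_ = PULL_SPARSE;
task.Update();  // -> OP_RUN      CPU ops up to xpu_begin_op_index_
task.Update();  // -> XPU         remote segment via CallRemoteXpu
task.Update();  // -> OP_RUN_END  CPU ops after xpu_end_op_index_
task.Update();  // -> PUSH_GRAD
task.Update();  // -> DONE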
template <class T>
class HeterObjectPool {
public:
HeterObjectPool() {}
virtual ~HeterObjectPool() {}
std::shared_ptr<T> Get() {
std::lock_guard<std::mutex> lock(mutex_);
if (pool_.empty()) {
num_ += 1;
#ifdef PADDLE_WITH_CUDA
VLOG(0) << "pool construct size: " << num_;
#endif
return std::make_shared<T>();
} else {
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
}
void Push(std::shared_ptr<T> data) {
std::lock_guard<std::mutex> lock(mutex_);
pool_.push_back(std::move(data));
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return pool_.size();
}
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
std::mutex mutex_;
int num_{0};
};
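HeterObjectPool is a mutex-guarded free list that lazily constructs on demand; a usage sketch:
HeterObjectPool<HeterTask> pool;
std::shared_ptr<HeterTask> task = pool.Get();  // constructs a new HeterTask if empty
// ... run the task through the pipeline ...
pool.Push(task);                               // recycle it for the next batch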
struct BthreadMutexGuard {
BthreadMutexGuard(bthread_mutex_t* rho) {
mutex_ = rho;
bthread_mutex_lock(mutex_);
}
~BthreadMutexGuard() { bthread_mutex_unlock(mutex_); }
bthread_mutex_t* mutex_;
};
template <class T>
class BtObjectPool {
public:
BtObjectPool() {
bthread_mutex_init(&mutex_, NULL);
bthread_cond_init(&cond_, NULL);
}
virtual ~BtObjectPool() {
bthread_cond_destroy(&cond_);
bthread_mutex_destroy(&mutex_);
}
std::shared_ptr<T> Get() {
BthreadMutexGuard guard(&mutex_);
while (pool_.empty()) {
bthread_cond_wait(&cond_, &mutex_);
}
auto ret = pool_.back();
pool_.pop_back();
return ret;
}
void Push(std::shared_ptr<T> data) {
BthreadMutexGuard guard(&mutex_);
pool_.push_back(std::move(data));
bthread_cond_signal(&cond_);
}
int Size() { return pool_.size(); }
std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
private:
std::vector<std::shared_ptr<T>> pool_;
bthread_mutex_t mutex_;
bthread_cond_t cond_;
int num_{0};
};
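BtObjectPool differs from HeterObjectPool in that Get() blocks on a bthread condition variable and never constructs objects itself, so the pool has to be pre-seeded:
BtObjectPool<HeterTask> pool;
pool.Push(std::make_shared<HeterTask>());  // pre-seed; otherwise Get() blocks forever
auto task = pool.Get();                    // blocks until an object is available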
template <class K, class T>
struct HeterNode {
K key;
T value;
HeterNode* prev;
HeterNode* next;
};
template <class K, class T>
class HeterList {
public:
HeterList() : head_(new HeterNode<K, T>), tail_(new HeterNode<K, T>) {
head_->prev = NULL;
head_->next = tail_;
tail_->prev = head_;
tail_->next = NULL;
size = 0;
cap_ = 1e9;
}
~HeterList() {
delete head_;
delete tail_;
}
void SetCap(int num) { cap_ = num; }
bool TryPut(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
if (task_map_.find(key) != task_map_.end()) {
// std::cout << "try put key=" << key << " false" << std::endl;
task_map_.erase(key);
return false;
} else {
HeterNode<K, T>* node = new HeterNode<K, T>;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
// std::cout << "try put key=" << key << " true" << std::endl;
return true;
}
}
bool Put(K& key, T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return size < cap_; });
HeterNode<K, T>* node = new HeterNode<K, T>;
// std::cout << "put key=" << key << " true" << std::endl;
node->key = key;
node->value = value;
map_[node->key] = node;
attach(node);
return true;
}
T TryGet(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
// std::cout << "try get key=" << key << " true" << std::endl;
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
task_map_.insert(key);
// std::cout << "try get key=" << key << " false" << std::endl;
return nullptr;
}
T Get(const K& key) {
std::lock_guard<std::mutex> lock(mutex_);
auto iter = map_.find(key);
if (iter != map_.end()) {
// std::cout << "get key=" << key << " true" << std::endl;
HeterNode<K, T>* node = iter->second;
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(key);
delete node;
return ret;
}
// std::cout << "get key=" << key << " false" << std::endl;
return nullptr;
}
T Get() {
std::lock_guard<std::mutex> lock(mutex_);
HeterNode<K, T>* node = head_->next;
if (node == tail_) {
// std::cout << "get2 false" << std::endl;
return nullptr;
} else {
detach(node);
cond_.notify_one();
T ret = std::move(node->value);
map_.erase(node->key);
// std::cout << "get2 key=" << node->key << " true" << std::endl;
delete node;
return ret;
}
}
bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
return head_->next == tail_;
}
int Size() {
std::lock_guard<std::mutex> lock(mutex_);
return size;
}
private:
void detach(HeterNode<K, T>* node) {
node->prev->next = node->next;
node->next->prev = node->prev;
size--;
}
void attach(HeterNode<K, T>* node) {
node->prev = head_;
node->next = head_->next;
head_->next->prev = node;
head_->next = node;
size++;
}
private:
HeterNode<K, T>* head_;
HeterNode<K, T>* tail_;
std::unordered_map<K, HeterNode<K, T>*> map_;
std::unordered_set<K> task_map_;
std::mutex mutex_;
std::condition_variable cond_;
int cap_;
int size;
};
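HeterList is the keyed rendezvous queue behind run_queue_/wait_queue_ in HeterCpuWorker: a TryGet() on a missing key records the key in task_map_, and the next TryPut() for that key returns false instead of queueing, telling the producer a consumer is already waiting. A sketch:
HeterList<int, std::shared_ptr<HeterTask>> list;
int key = 7;
std::shared_ptr<HeterTask> task = std::make_shared<HeterTask>();
auto hit = list.TryGet(key);           // nullptr; key 7 is now remembered in task_map_
bool queued = list.TryPut(key, task);  // false: a consumer already asked for this key
// on a false return the producer hands the task to the consumer directly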
} // namespace framework
} // namespace paddle
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
syntax = "proto2";
package paddle.framework;
option cc_generic_services = true;
// It can be: LoDTensor, SelectedRows or NCCL_ID
enum VarType {
LOD_TENSOR = 0;
SELECTED_ROWS = 1;
NCCL_ID = 2;
}
// VariableMessage is serialized paddle variable message.
// NOTICE(gongwb): don't modify this proto if you are not
// familiar with how we serialize it in sendrecvop_utils.h
// and deserialize it in variable_response.h.
message VariableMessage {
enum Type {
// Pod Types
BOOL = 0;
INT16 = 1;
INT32 = 2;
INT64 = 3;
FP16 = 4;
FP32 = 5;
FP64 = 6;
}
message LodData { repeated int64 lod_data = 1; }
optional string varname = 1;
// TODO(Yancey1989): reference framework::proto::VarDesc::VarType
optional VarType type = 2;
// bool persistable is not needed for sending.
// tensor info:
optional Type data_type = 3;
repeated int64 dims = 4;
// lod details:
optional int64 lod_level = 5;
repeated LodData lod = 6;
// selected_rows height, aka. original dim0
optional int64 slr_height = 7;
// tensor data
optional bytes data = 8;
}
message HeterRequest {
required int32 cmd = 1;
optional int32 cur_batch = 2;
repeated VariableMessage vars = 3;
};
message HeterResponse {
// optional VariableMessage vars = 1;
repeated VariableMessage vars = 1;
};
service HeterService { rpc service(HeterRequest) returns (HeterResponse); };
This diff is collapsed.
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <cstdlib>
#include <ctime>
#include <string>
#include <vector>
#include "io/fs.h"
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/trainer.h"
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
#include "paddle/fluid/platform/cuda_device_guard.h"
namespace paddle {
namespace framework {
void HeterXpuTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
srand((unsigned)time(NULL));
param_ = trainer_desc.downpour_param();
for (int i = 0; i < param_.dense_table_size(); ++i) {
uint64_t table_id = static_cast<uint64_t>(param_.dense_table(i).table_id());
auto table = param_.dense_table(i);
dense_grad_names_[table_id].resize(table.dense_grad_name_size());
for (int j = 0; j < table.dense_grad_name_size(); ++j) {
dense_grad_names_[table_id][j] = table.dense_grad_name(j);
}
}
scale_datanorm_ = trainer_desc.scale_datanorm();
int place_num = trainer_desc.worker_places_size();
for (int i = 0; i < place_num; ++i) {
int num = trainer_desc.worker_places(i);
platform::CUDAPlace place = platform::CUDAPlace(num);
platform::CUDADeviceGuard guard(place.device);
cudaStream_t stream;
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&stream));
copy_streams_.push_back(stream);
places_.push_back(place);
cudaEvent_t event;
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
events_.push_back(event);
}
// thread_num_ = trainer_desc.thread_num();
// SetDataset(dataset);
// dump_fields_path_ = trainer_desc.dump_fields_path();
// dump_converter_ = trainer_desc.dump_converter();
// need_dump_field_ = false;
// if (trainer_desc.dump_fields_size() != 0 && dump_fields_path_ != "") {
// need_dump_field_ = true;
// }
// if (need_dump_field_) {
// auto &file_list = dataset->GetFileList();
// if (file_list.size() == 0) {
// need_dump_field_ = false;
// }
// }
// mpi_rank_ = trainer_desc.mpi_rank();
// mpi_size_ = trainer_desc.mpi_size();
// dump_file_num_ = trainer_desc.dump_file_num();
// const std::vector<paddle::framework::DataFeed *> readers =
// dataset->GetReaders();
// thread_num_ = readers.size();
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
i++) {
need_merge_var_names_.push_back(
trainer_desc.downpour_param().stat_var_names(i));
}
running_ = true;
VLOG(3) << "going to initialize pull dense worker";
pull_dense_worker_ = PullDenseWorker::GetInstance();
pull_dense_worker_->Initialize(trainer_desc);
VLOG(3) << "initialize pull dense worker";
SetDebug(trainer_desc.debug());
fleet_ptr_ = FleetWrapper::GetInstance();
heter_ptr_ = HeterWrapper::GetInstance();
RegisterServiceHandler();
// for (int i = 0; i < trainer_desc.worker_places_size(); ++i) {
// int num = trainer_desc.worker_places(i);
// platform::CUDAPlace place = platform::CUDAPlace(num);
// platform::CUDADeviceGuard guard(place.device);
// cudaStream_t stream;
// PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&stream));
// copy_streams_.push_back(stream);
// places_.push_back(place);
// }
trainer_desc_ = trainer_desc;
}
void HeterXpuTrainer::CreateThreadParam(const ProgramDesc& program, int num) {
auto place = places_[num];
Scope* scope = place_scopes_[num];
auto stream = copy_streams_[num];
auto event = events_[num];
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
auto& block = program.Block(0);
for (auto& var : block.AllVars()) {
if (var->Persistable()) {
auto name = var->Name();
Variable* root_var = root_scope_->FindVar(name);
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
auto* ptr = scope->Var(name);
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
LoDTensor* thread_tensor = ptr->GetMutable<LoDTensor>();
#define HeterMemcpyFunc(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
HeterMemCpy<cpp_type>(thread_tensor, root_tensor, place, stream); \
} \
} while (0)
_ForEachDataType_(HeterMemcpyFunc);
}
}
PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
cudaEventSynchronize(event);
}
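HeterMemcpyFunc above is the usual Paddle type-dispatch idiom: _ForEachDataType_ expands the macro once per (cpp_type, proto_type) pair, and only the branch whose proto type matches root_tensor->type() performs the copy. One expansion, written out for float:
// HeterMemcpyFunc(float, proto::VarType::FP32) expands to:
do {
if (root_tensor->type() == proto::VarType::FP32) {
HeterMemCpy<float>(thread_tensor, root_tensor, place, stream);
}
} while (0);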
template <typename T>
void HeterXpuTrainer::HeterMemCpy(LoDTensor* thread_tensor,
LoDTensor* root_tensor,
const paddle::platform::Place& thread_place,
cudaStream_t stream) {
T* thread_ptr =
thread_tensor->mutable_data<T>(root_tensor->dims(), thread_place);
T* root_ptr = root_tensor->data<T>();
if (platform::is_cpu_place(root_tensor->place())) {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
platform::CPUPlace(), root_ptr,
sizeof(T) * root_tensor->numel(), stream);
} else {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, thread_place), thread_ptr,
BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()),
root_ptr, sizeof(T) * root_tensor->numel(), stream);
}
}
void HeterXpuTrainer::DumpWork(int tid) {}
void HeterXpuTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
CacheProgram(main_program);
place_ = place;
auto& profiler = paddle::ps::CostProfiler::instance();
profiler.register_profiler("xpu_service_run_task");
profiler.register_profiler("xpu_service_deserial");
profiler.register_profiler("xpu_service_launch_kernel");
profiler.register_profiler("xpu_service_wait");
}
void HeterXpuTrainer::InitOtherEnv(const ProgramDesc& main_program) {
auto& block = main_program.Block(0);
pull_dense_worker_->SetRootScope(root_scope_);
pull_dense_worker_->CreatePinVar();
for (size_t i = 0; i < places_.size(); ++i) {
Scope* scope = &(root_scope_->NewScope());
// for (auto &var : block.AllVars()) {
// if (var->Persistable()) {
// auto *ptr = scope->Var(var->Name());
// InitializeVariable(ptr, var->GetType());
// }
// }
place_scopes_.push_back(scope);
CreateThreadParam(main_program, i);
pull_dense_worker_->AddThreadScope(scope);
pull_dense_worker_->AddPlace(places_[i]);
pull_dense_worker_->AddStream(copy_streams_[i]);
}
pull_dense_worker_->Start();
for (auto& stream : copy_streams_) {
cudaStreamSynchronize(stream);
}
op_names_.clear();
for (auto& op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
op_names_.push_back(op_desc->Type());
OperatorBase* local_op_ptr = local_op.release();
ops_.push_back(local_op_ptr);
continue;
}
xpu_begin_op_index_ = xpu_end_op_index_ = -1;
xpu_begin_op_index_ = trainer_desc_.xpu_start_idx();
xpu_end_op_index_ = trainer_desc_.xpu_end_idx();
VLOG(0) << "xpu begin: " << xpu_begin_op_index_
<< " xpu end: " << xpu_end_op_index_;
// CHECK(xpu_begin_op_index_ == 0);
// CHECK(xpu_end_op_index_ = ops_.size() - 1);
//// init pool
for (size_t i = 0; i < 6; ++i) {
for (size_t j = 0; j < places_.size(); ++j) {
int num = j;
std::shared_ptr<HeterServiceContext> context =
std::make_shared<HeterServiceContext>();
context->place_num_ = num;
auto place = places_[num];
context->scope_ = &(place_scopes_[num]->NewScope());
auto& block = program_.Block(0);
for (auto& var : block.AllVars()) {
if (!var->Persistable()) {
auto* ptr = context->scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
}
}
for (auto& v : dense_grad_names_) {
for (auto& name : v.second) {
auto* ptr = context->scope_->Var(name + "pin");
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
}
}
for (auto& op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
OperatorBase* local_op_ptr = local_op.release();
(context->ops_).push_back(local_op_ptr);
}
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
object_pool_.Push(context);
}
}
VLOG(3) << "init other env done.";
}
void HeterXpuTrainer::Run() {}
int HeterXpuTrainer::EndPass(const HeterRequest* request,
HeterResponse* response) {
// int scope_num = object_pool_.Size();
for (size_t i = 0; i < need_merge_var_names_.size(); i++) {
Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]);
if (root_var == nullptr) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
for (size_t j = 0; j < place_scopes_.size(); j++) {
Scope* cur_thread_scope = place_scopes_[j];
Variable* thread_var =
cur_thread_scope->FindVar(need_merge_var_names_[i]);
if (thread_var == nullptr) {
continue;
}
LoDTensor* thread_tensor = thread_var->GetMutable<LoDTensor>();
// if (root_tensor->numel() != thread_tensor->numel()) {
// continue;
// }
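// Type-dispatch helper: for the proto type that matches the root tensor's
// dtype, accumulate the thread-local tensor into the root tensor; a dtype
// mismatch between scopes is treated as fatal.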
#define MergeCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
if (thread_tensor->type() != proto_type) { \
VLOG(0) << "Error: thread id=" << j << ", need_merge_var_names_[" << i \
<< "] " << need_merge_var_names_[i] \
<< ", root tensor type=" << root_tensor->type() \
<< ", thread tensor type=" << thread_tensor->type(); \
exit(-1); \
} \
MergeToRootScope<cpp_type>(root_tensor, thread_tensor); \
} \
} while (0)
_ForEachDataType_(MergeCallback);
if (platform::is_gpu_place(thread_tensor->place())) {
auto dev_id =
BOOST_GET_CONST(platform::CUDAPlace, thread_tensor->place()).device;
platform::CUDADeviceGuard guard(dev_id);
cudaMemset(thread_tensor->data<void>(), 0,
thread_tensor->numel() * SizeOfType(thread_tensor->type()));
} else {
memset(thread_tensor->data<void>(), 0,
thread_tensor->numel() * SizeOfType(thread_tensor->type()));
}
}
auto* merge_var = response->add_vars();
heter_ptr_->SerializeToReq(need_merge_var_names_[i], root_scope_,
merge_var);
if (platform::is_gpu_place(root_tensor->place())) {
auto dev_id =
BOOST_GET_CONST(platform::CUDAPlace, root_tensor->place()).device;
platform::CUDADeviceGuard guard(dev_id);
cudaMemset(root_tensor->data<void>(), 0,
root_tensor->numel() * SizeOfType(root_tensor->type()));
} else {
memset(root_tensor->data<void>(), 0,
root_tensor->numel() * SizeOfType(root_tensor->type()));
}
}
return 0;
}
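// Element-wise accumulation of a thread-local tensor into the root tensor.
// Both tensors are staged through CPU memory first, so the addition works
// regardless of which device each tensor currently lives on.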
template <typename T>
void HeterXpuTrainer::MergeToRootScope(LoDTensor* root_tensor,
LoDTensor* tensor) {
LoDTensor tmp_root;
TensorCopy(*root_tensor, platform::CPUPlace(), &tmp_root);
T* tmp_root_data = tmp_root.data<T>();
LoDTensor tmp_tensor;
TensorCopy(*tensor, platform::CPUPlace(), &tmp_tensor);
T* data = tmp_tensor.data<T>();
for (int i = 0; i < tmp_tensor.numel(); i++) {
tmp_root_data[i] += data[i];
}
TensorCopy(tmp_root, root_tensor->place(), root_tensor);
}
int HeterXpuTrainer::StopService(const HeterRequest* request,
HeterResponse* response) {
std::unique_lock<std::mutex> lock(mutex_);
running_ = false;
cond_.notify_one();
return 0;
}
int HeterXpuTrainer::RunTask(const HeterRequest* request,
HeterResponse* response) {
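  // Rough request pipeline, as implemented below: (1) take a cached context
  // from the pool (lazily building its scope and ops on first use), (2)
  // deserialize the request vars onto this place's copy stream, (3) run the
  // ops in [xpu_begin_op_index_, xpu_end_op_index_], (4) wait on a CUDA event
  // while yielding the bthread, (5) serialize the xpu_send_list vars into the
  // response and push dense gradients asynchronously.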
auto timer = std::make_shared<paddle::ps::CostTimer>("xpu_service_run_task");
std::shared_ptr<HeterServiceContext> context = object_pool_.Get();
if (!context->scope_) {
int num = rand() % places_.size();
context->place_num_ = num;
auto place = places_[num];
context->scope_ = &(place_scopes_[num]->NewScope());
auto& block = program_.Block(0);
for (auto& var : block.AllVars()) {
if (!var->Persistable()) {
auto* ptr = context->scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
}
}
for (auto& v : dense_grad_names_) {
for (auto& name : v.second) {
auto* ptr = context->scope_->Var(name + "pin");
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
}
}
for (auto& op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
OperatorBase* local_op_ptr = local_op.release();
(context->ops_).push_back(local_op_ptr);
}
auto dev_id = BOOST_GET_CONST(platform::CUDAPlace, place).device;
platform::CUDADeviceGuard guard(dev_id);
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventCreateWithFlags(&context->event_, cudaEventDisableTiming));
}
context->Reset();
auto place = places_[context->place_num_];
{
auto deserial_timer =
std::make_shared<paddle::ps::CostTimer>("xpu_service_deserial");
for (int i = 0; i < request->vars_size(); ++i) {
heter_ptr_->DeSerializeToTensor(context->scope_, request->vars(i), place,
copy_streams_[context->place_num_]);
}
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(context->event_, copy_streams_[context->place_num_]));
while (cudaEventQuery(context->event_) != cudaSuccess) {
VLOG(3) << "wait for kernel";
bthread_yield();
}
}
{
auto launch_timer =
std::make_shared<paddle::ps::CostTimer>("xpu_service_launch_kernel");
for (int i = xpu_begin_op_index_; i <= xpu_end_op_index_; ++i) {
auto& op = (context->ops_)[i];
op->Run(*(context->scope_), place);
}
}
auto* dev_ctx = static_cast<platform::CUDADeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(context->event_, dev_ctx->stream()));
// cudaEventSynchronize(context->event_);
{
auto wait_timer =
std::make_shared<paddle::ps::CostTimer>("xpu_service_wait");
while (cudaEventQuery(context->event_) != cudaSuccess) {
VLOG(3) << "wait for kernel";
bthread_yield();
}
}
for (int i = 0; i < trainer_desc_.xpu_send_list_size(); ++i) {
const std::string& varname = trainer_desc_.xpu_send_list(i);
// CHECK(varname == "concat_1.tmp_0@GRAD");
auto* res_var = response->add_vars();
heter_ptr_->SerializeToReq(varname, context->scope_, res_var);
}
// std::string varname = "concat_1.tmp_0@GRAD";
//
// auto* res_var = response->add_vars();
// heter_ptr_->SerializeToReq(varname, context->scope_, res_var);
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid =
static_cast<uint64_t>(param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
*(context->scope_), tid, dense_grad_names_[tid],
&(context->push_dense_status_), scale_datanorm_, request->cur_batch(),
places_[context->place_num_], copy_streams_[context->place_num_],
context->event_);
}
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid =
static_cast<uint64_t>(param_.program_config(0).push_dense_table_id(i));
pull_dense_worker_->IncreaseThreadVersion(0, tid);
}
VLOG(3) << "push dense gradient done.";
context->scope_->DropKids();
object_pool_.Push(context);
VLOG(0) << "pool size " << object_pool_.Size();
return 0;
}
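// Service handler ids exposed through the heter service (presumably invoked
// remotely by the CPU-side trainers): 0 -> RunTask, 1 -> EndPass,
// 2 -> StopService.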
void HeterXpuTrainer::RegisterServiceHandler() {
heter_ptr_->RegisterServiceHandler(
0, [this](const HeterRequest* request, HeterResponse* response) -> int {
return this->RunTask(request, response);
});
heter_ptr_->RegisterServiceHandler(
1, [this](const HeterRequest* request, HeterResponse* response) -> int {
return this->EndPass(request, response);
});
heter_ptr_->RegisterServiceHandler(
2, [this](const HeterRequest* request, HeterResponse* response) -> int {
return this->StopService(request, response);
});
}
Scope* HeterXpuTrainer::GetWorkerScope(int thread_id) { return nullptr; }
void HeterXpuTrainer::Finalize() {
// for (auto &th : threads_) {
// th.join();
// }
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return !running_; });
sleep(3);
pull_dense_worker_->Stop();
root_scope_->DropKids();
}
} // namespace framework
} // namespace paddle
#endif
......@@ -102,6 +102,7 @@ void MultiTrainer::InitTrainerEnv(const ProgramDesc& main_program,
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
workers_[i]->CacheProgram(main_program);
}
}
......
......@@ -56,6 +56,34 @@ void PullDenseWorker::Initialize(const TrainerDesc& param) {
current_version_[tid] = 0;
}
fleet_ptr_ = FleetWrapper::GetInstance();
#ifdef PADDLE_WITH_CUDA
copy_streams_.clear();
places_.clear();
thread_scopes_.clear();
#endif
}
void PullDenseWorker::CreatePinVar() {
#ifdef PADDLE_WITH_CUDA
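  // For every dense-table parameter, create a CUDA-pinned host mirror named
  // "<name>pin" in the root scope; Wait() later copies these pinned buffers
  // onto each device asynchronously via the per-place copy streams.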
// for (auto& v : dense_value_names_) {
// for (auto& name : v.second) {
for (int i = 0; i < dwp_param_.program_config(0).pull_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
dwp_param_.program_config(0).pull_dense_table_id(i));
for (size_t j = 0; j < dense_value_names_[tid].size(); j++) {
auto& name = dense_value_names_[tid][j];
Variable* var = root_scope_->FindVar(name);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
auto* ptr = root_scope_->Var(name + "pin");
InitializeVariable(ptr, proto::VarType::LOD_TENSOR);
LoDTensor* pin_tensor = ptr->GetMutable<LoDTensor>();
pin_tensor->mutable_data<float>(tensor->dims(),
platform::CUDAPinnedPlace());
}
}
#endif
}
void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
......@@ -75,6 +103,31 @@ void PullDenseWorker::Wait(std::vector<::std::future<int32_t>>* status_vec) {
exit(-1);
}
status_vec->resize(0);
#ifdef PADDLE_WITH_CUDA
for (size_t i = 0; i < places_.size(); ++i) {
// for (auto& v : dense_value_names_) {
// for (auto& name : v.second) {
for (int x = 0; x < dwp_param_.program_config(0).pull_dense_table_id_size();
++x) {
uint64_t tid = static_cast<uint64_t>(
dwp_param_.program_config(0).pull_dense_table_id(x));
for (size_t j = 0; j < dense_value_names_[tid].size(); j++) {
auto& name = dense_value_names_[tid][j];
Variable* pin_var = root_scope_->FindVar(name + "pin");
LoDTensor* pin_tensor = pin_var->GetMutable<LoDTensor>();
float* pin_w = pin_tensor->data<float>();
Variable* var = thread_scopes_[i]->FindVar(name);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* w = tensor->data<float>();
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, places_[i]), w,
platform::CUDAPinnedPlace(), pin_w,
sizeof(float) * tensor->numel(), copy_streams_[i]);
}
}
}
#endif
}
void PullDenseWorker::Stop() {
......@@ -91,8 +144,14 @@ void PullDenseWorker::PullDense(bool force_update) {
uint64_t tid = static_cast<uint64_t>(
dwp_param_.program_config(0).pull_dense_table_id(i));
if (force_update || CheckUpdateParam(tid)) {
#ifdef PADDLE_WITH_CUDA
VLOG(3) << "pull dense " << force_update << " " << tid;
fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid],
&pull_dense_status_);
&pull_dense_status_, false);
#else
fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid],
&pull_dense_status_, true);
#endif
ResetThreadVersion(tid);
}
}
......
......@@ -21,9 +21,12 @@ limitations under the License. */
#include <thread> // NOLINT
#include <vector>
#include <ctime>
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/framework/heter_service.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/reader.h"
......@@ -62,6 +65,7 @@ class TrainerBase {
Scope* root_scope_;
bool debug_;
Dataset* dataset_ptr_;
TrainerDesc trainer_desc_;
// For dump param or field
bool need_dump_field_ = false;
......@@ -118,10 +122,86 @@ class DistMultiTrainer : public MultiTrainer {
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
virtual void InitDumpEnv();
virtual Scope* GetWorkerScope(int thread_id);
virtual void RegisterHeterCallback();
protected:
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
};
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
class HeterServiceContext {
public:
HeterServiceContext() {}
virtual ~HeterServiceContext() {
for (OperatorBase* op : ops_) {
delete op;
}
std::vector<OperatorBase*>().swap(ops_);
}
void Reset() { push_dense_status_.clear(); }
int place_num_;
Scope* scope_{nullptr};
cudaEvent_t event_;
std::vector<OperatorBase*> ops_;
std::vector<::std::future<int32_t>> push_dense_status_;
};
class HeterXpuTrainer : public TrainerBase {
public:
HeterXpuTrainer() {}
virtual ~HeterXpuTrainer() {
for (OperatorBase* op : ops_) {
delete op;
}
std::vector<OperatorBase*>().swap(ops_);
}
virtual void Initialize(const TrainerDesc& trainer_desc, Dataset* data_set);
virtual void InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place);
virtual void InitOtherEnv(const ProgramDesc& main_program);
virtual void Run();
virtual void Finalize();
virtual void DumpWork(int tid);
virtual void RegisterServiceHandler();
virtual int RunTask(const HeterRequest* request, HeterResponse* response);
virtual Scope* GetWorkerScope(int thread_id);
virtual void CacheProgram(const ProgramDesc& main_program) {
new (&program_) ProgramDesc(main_program);
}
template <typename T>
void HeterMemCpy(LoDTensor* tensor, LoDTensor* root_tensor,
const paddle::platform::Place& thread_place,
cudaStream_t stream);
void CreateThreadParam(const ProgramDesc& program, int num);
template <typename T>
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
int EndPass(const HeterRequest* request, HeterResponse* response);
int StopService(const HeterRequest* request, HeterResponse* response);
protected:
DownpourWorkerParameter param_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
std::vector<std::string> need_merge_var_names_;
float scale_datanorm_;
int xpu_begin_op_index_;
int xpu_end_op_index_;
bool running_;
paddle::platform::Place place_;
std::mutex mutex_;
ProgramDesc program_;
std::condition_variable cond_;
std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
std::shared_ptr<paddle::framework::HeterWrapper> heter_ptr_;
std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
std::vector<OperatorBase*> ops_;
std::vector<std::string> op_names_;
std::vector<Scope*> place_scopes_;
BtObjectPool<HeterServiceContext> object_pool_;
std::vector<cudaStream_t> copy_streams_;
std::vector<platform::Place> places_;
std::vector<cudaEvent_t> events_;
};
#endif
#if defined(PADDLE_WITH_NCCL)
class PipelineTrainer : public TrainerBase {
......
......@@ -52,6 +52,12 @@ message TrainerDesc {
optional bool enable_random_dump = 24 [ default = false ];
optional bool random_with_lineid = 25 [ default = false ];
optional int32 dump_interval = 26 [ default = 10000 ];
repeated int32 worker_places = 27;
repeated string xpu_send_list = 28;
repeated string xpu_recv_list = 29;
optional int32 xpu_start_idx = 30;
optional int32 xpu_end_idx = 31;
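// [xpu_start_idx, xpu_end_idx] is the inclusive range of ops that
// HeterXpuTrainer executes on the xpu side of the heterogeneous job.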
// device worker parameters
optional HogwildWorkerParameter hogwild_param = 101;
......
......@@ -63,6 +63,9 @@ std::shared_ptr<TrainerBase> TrainerFactory::CreateTrainer(
REGISTER_TRAINER_CLASS(MultiTrainer);
REGISTER_TRAINER_CLASS(DistMultiTrainer);
#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
REGISTER_TRAINER_CLASS(HeterXpuTrainer);
#endif
#if defined(PADDLE_WITH_NCCL)
REGISTER_TRAINER_CLASS(PipelineTrainer);
#endif
......
......@@ -241,6 +241,156 @@ class Flatten2GradOp : public framework::OperatorWithKernel {
}
};
class FlattenContiguousRangeOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
OP_INOUT_CHECK(ctx->HasInput("X"), "Input", "X", "FlattenContiguousRange");
OP_INOUT_CHECK(ctx->HasOutput("Out"), "Output", "Out",
"FlattenContiguousRange");
const auto &start_axis = ctx->Attrs().Get<int>("start_axis");
const auto &stop_axis = ctx->Attrs().Get<int>("stop_axis");
const auto &in_dims = ctx->GetInputDim("X");
int in_dims_size = in_dims.size();
int real_start_axis = start_axis, real_stop_axis = stop_axis;
if (start_axis < 0) {
real_start_axis = start_axis + in_dims_size;
}
if (stop_axis < 0) {
real_stop_axis = stop_axis + in_dims_size;
}
PADDLE_ENFORCE_GE(
real_stop_axis, real_start_axis,
platform::errors::InvalidArgument("The stop_axis should be greater"
"than or equal to start_axis."));
const auto &out_dims =
GetOutputShape(real_start_axis, real_stop_axis, in_dims);
ctx->SetOutputDim("Out", framework::make_ddim(out_dims));
if (in_dims[0] == out_dims[0]) {
// Only pass LoD when the first dimension of output and Input(X)
// are the same.
ctx->ShareLoD("X", "Out");
}
OP_INOUT_CHECK(ctx->HasOutput("XShape"), "Output", "XShape", "Flatten2");
std::vector<int64_t> xshape_dims(in_dims.size() + 1);
xshape_dims[0] = 0;
for (int i = 0; i < in_dims.size(); ++i) {
xshape_dims[i + 1] = in_dims[i];
}
ctx->SetOutputDim("XShape", framework::make_ddim(xshape_dims));
ctx->ShareLoD("X", "XShape");
}
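  // Worked example: in_dims = [3, 100, 100, 4] with start_axis = 2 and
  // stop_axis = 3 collapses axes 2..3 into 100 * 4, giving [3, 100, 400].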
static std::vector<int32_t> GetOutputShape(const int start_axis,
const int stop_axis,
const framework::DDim &in_dims) {
int64_t outer = 1;
std::vector<int32_t> out_shape;
int in_dims_size = in_dims.size();
out_shape.reserve(in_dims_size - stop_axis + start_axis);
for (int i = 0; i < start_axis; ++i) {
out_shape.push_back(in_dims[i]);
}
for (int i = start_axis; i <= stop_axis; i++) {
outer *= in_dims[i];
}
out_shape.push_back(outer);
for (int i = stop_axis + 1; i < in_dims_size; i++) {
out_shape.push_back(in_dims[i]);
}
return out_shape;
}
};
class FlattenContiguousRangeOpMaker : public FlattenOpMaker {
public:
void Make() override {
AddInput("X", "(Tensor) A tensor of rank >= axis.");
AddOutput("Out",
"A 2D tensor is reshaped input tensor. The input dimensions"
"up to axis are flattened to the outer dimension of the output"
"and the remaining input dimensions are flattened into the inner"
"dimension of the output.");
AddAttr<int>("start_axis",
"(int)"
"Indicate the input start dimension (exclusive) to flatten")
.SetDefault(1);
AddAttr<int>("stop_axis",
"(int)"
"Indicate the input stop dimension (exclusive) to flatten")
.SetDefault(1);
AddComment(R"DOC(
Flatten Operator
Flattens a contiguous range of axes of the input tensor, given by start_axis and stop_axis, into a single dimension.
Examples:
Case 1:
Given
X.shape = (3, 100, 100, 4)
and
start_axis = 2, stop_axis = -1
We get:
Out.shape = (3, 100, 400)
Case 2:
Given
X.shape = (3, 100, 100, 4)
and
start_axis = 0, stop_axis = -1
We get:
Out.shape = (3 * 100 * 100 * 4, )
)DOC");
AddOutput("XShape",
"XShape is just used to store the shape and lod of X, which will "
"be used in FlattenGradOp.")
.AsIntermediate();
}
};
template <typename T>
class FlattenContiguousRangeGradOpMaker
: public framework::SingleGradOpMaker<T> {
public:
using framework::SingleGradOpMaker<T>::SingleGradOpMaker;
void Apply(GradOpPtr<T> grad_op) const override {
grad_op->SetType("flatten_contiguous_range_grad");
grad_op->SetInput("XShape", this->Output("XShape"));
grad_op->SetInput(framework::GradVarName("Out"), this->OutputGrad("Out"));
grad_op->SetOutput(framework::GradVarName("X"), this->InputGrad("X"));
grad_op->SetAttrMap(this->Attrs());
}
};
class FlattenContiguousRangeGradOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *context) const override {
OP_INOUT_CHECK(context->HasInput("XShape"), "Input", "XShape",
"FlattenContiguousRangeGrad");
OP_INOUT_CHECK(context->HasInput(framework::GradVarName("Out")), "Input",
framework::GradVarName("Out"), "FlattenContiguousRangeGrad");
auto xshape_dims = context->GetInputDim("XShape");
auto x_dims = framework::slice_ddim(xshape_dims, 1, xshape_dims.size());
context->SetOutputDim(framework::GradVarName("X"), x_dims);
context->ShareLoD("XShape", framework::GradVarName("X"));
}
protected:
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext &ctx) const override {
return framework::OpKernelType(OperatorWithKernel::IndicateVarDataType(
ctx, framework::GradVarName("Out")),
ctx.device_context());
}
};
DECLARE_INPLACE_OP_INFERER(FlattenOpInplaceInferer, {"X", "Out"});
DECLARE_INPLACE_OP_INFERER(FlattenGradInplaceInferer,
{framework::GradVarName("Out"),
......@@ -266,6 +416,16 @@ REGISTER_OPERATOR(flatten2, ops::Flatten2Op, ops::Flatten2OpMaker,
REGISTER_OPERATOR(flatten2_grad, ops::Flatten2GradOp,
ops::FlattenGradInplaceInferer);
REGISTER_OPERATOR(
flatten_contiguous_range, ops::FlattenContiguousRangeOp,
ops::FlattenContiguousRangeOpMaker,
ops::FlattenContiguousRangeGradOpMaker<paddle::framework::OpDesc>,
ops::FlattenContiguousRangeGradOpMaker<paddle::imperative::OpBase>,
ops::FlattenOpInplaceInferer);
REGISTER_OPERATOR(flatten_contiguous_range_grad,
ops::FlattenContiguousRangeGradOp,
ops::FlattenGradInplaceInferer);
REGISTER_OP_CPU_KERNEL(
flatten, ops::FlattenKernel<paddle::platform::CPUDeviceContext, float>,
ops::FlattenKernel<paddle::platform::CPUDeviceContext, double>,
......@@ -292,3 +452,26 @@ REGISTER_OP_CPU_KERNEL(
ops::Flatten2GradKernel<paddle::platform::CPUDeviceContext, int>,
ops::Flatten2GradKernel<paddle::platform::CPUDeviceContext, int8_t>,
ops::Flatten2GradKernel<paddle::platform::CPUDeviceContext, int64_t>);
REGISTER_OP_CPU_KERNEL(
flatten_contiguous_range,
ops::FlattenContiguousRangeKernel<paddle::platform::CPUDeviceContext,
float>,
ops::FlattenContiguousRangeKernel<paddle::platform::CPUDeviceContext,
double>,
ops::FlattenContiguousRangeKernel<paddle::platform::CPUDeviceContext, int>,
ops::FlattenContiguousRangeKernel<paddle::platform::CPUDeviceContext,
int8_t>,
ops::FlattenContiguousRangeKernel<paddle::platform::CPUDeviceContext,
int64_t>);
REGISTER_OP_CPU_KERNEL(
flatten_contiguous_range_grad,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CPUDeviceContext,
float>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CPUDeviceContext,
double>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CPUDeviceContext,
int>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CPUDeviceContext,
int8_t>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CPUDeviceContext,
int64_t>);
......@@ -42,3 +42,26 @@ REGISTER_OP_CUDA_KERNEL(
ops::Flatten2GradKernel<paddle::platform::CUDADeviceContext, int>,
ops::Flatten2GradKernel<paddle::platform::CUDADeviceContext, int8_t>,
ops::Flatten2GradKernel<paddle::platform::CUDADeviceContext, int64_t>);
REGISTER_OP_CUDA_KERNEL(
flatten_contiguous_range,
ops::FlattenContiguousRangeKernel<paddle::platform::CUDADeviceContext,
float>,
ops::FlattenContiguousRangeKernel<paddle::platform::CUDADeviceContext,
double>,
ops::FlattenContiguousRangeKernel<paddle::platform::CUDADeviceContext, int>,
ops::FlattenContiguousRangeKernel<paddle::platform::CUDADeviceContext,
int8_t>,
ops::FlattenContiguousRangeKernel<paddle::platform::CUDADeviceContext,
int64_t>);
REGISTER_OP_CUDA_KERNEL(
flatten_contiguous_range_grad,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CUDADeviceContext,
float>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CUDADeviceContext,
double>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CUDADeviceContext,
int>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CUDADeviceContext,
int8_t>,
ops::FlattenContiguousRangeGradKernel<paddle::platform::CUDADeviceContext,
int64_t>);
......@@ -112,5 +112,73 @@ class Flatten2GradKernel : public framework::OpKernel<T> {
}
};
template <typename DeviceContext, typename T>
class FlattenContiguousRangeKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &context) const override {
auto &start_axis = context.Attr<int>("start_axis");
auto &stop_axis = context.Attr<int>("stop_axis");
auto *in = context.Input<framework::LoDTensor>("X");
auto x_dims = in->dims();
int in_dims_size = x_dims.size();
int real_start_axis = start_axis, real_stop_axis = stop_axis;
if (start_axis < 0) {
real_start_axis = start_axis + in_dims_size;
}
if (stop_axis < 0) {
real_stop_axis = stop_axis + in_dims_size;
}
auto *out = context.Output<framework::LoDTensor>("Out");
auto out_dims = framework::make_ddim(
GetOutputShape(real_start_axis, real_stop_axis, x_dims));
out->mutable_data(context.GetPlace(), in->type());
framework::TensorCopy(
*in, context.GetPlace(),
context.template device_context<platform::DeviceContext>(), out);
out->Resize(out_dims);
}
static std::vector<int32_t> GetOutputShape(const int start_axis,
const int stop_axis,
const framework::DDim &in_dims) {
int64_t outer = 1;
std::vector<int32_t> out_shape;
int in_dims_size = in_dims.size();
out_shape.reserve(in_dims_size - stop_axis + start_axis);
for (int i = 0; i < start_axis; ++i) {
out_shape.push_back(in_dims[i]);
}
for (int i = start_axis; i <= stop_axis; i++) {
outer *= in_dims[i];
}
out_shape.push_back(outer);
for (int i = stop_axis + 1; i < in_dims_size; i++) {
out_shape.push_back(in_dims[i]);
}
return out_shape;
}
};
template <typename DeviceContext, typename T>
class FlattenContiguousRangeGradKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
auto *d_x = ctx.Output<framework::LoDTensor>(framework::GradVarName("X"));
auto *d_out =
ctx.Input<framework::LoDTensor>(framework::GradVarName("Out"));
auto xshape_dims = ctx.Input<framework::LoDTensor>("XShape")->dims();
auto x_dims = framework::slice_ddim(xshape_dims, 1, xshape_dims.size());
d_x->mutable_data(ctx.GetPlace(), d_out->type());
framework::TensorCopySync(*d_out, ctx.GetPlace(), d_x);
d_x->Resize(x_dims);
}
};
} // namespace operators
} // namespace paddle
......@@ -7,7 +7,12 @@ register_operators(EXCLUDES
fused_fc_elementwise_layernorm_op
multihead_matmul_op
fused_embedding_eltwise_layernorm_op
fusion_group_op)
fusion_group_op
fusion_gru_op)
# fusion_gru_op does not have CUDA kernel
op_library(fusion_gru_op)
file(APPEND ${pybind_file} "USE_CPU_ONLY_OP(fusion_gru);\n")
if (WITH_GPU)
# fused_bn_activation_op needs cudnn 7.4.1 above
......
......@@ -19,6 +19,9 @@ limitations under the License. */
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/operators/math/fc.h"
#include "paddle/fluid/operators/math/sequence2batch.h"
#ifdef PADDLE_WITH_MKLDNN
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif
namespace paddle {
namespace operators {
......@@ -122,8 +125,17 @@ void FusionGRUOp::InferShape(framework::InferShapeContext* ctx) const {
framework::OpKernelType FusionGRUOp::GetExpectedKernelType(
const framework::ExecutionContext& ctx) const {
framework::LibraryType library = framework::LibraryType::kPlain;
framework::DataLayout layout = framework::DataLayout::kAnyLayout;
#ifdef PADDLE_WITH_MKLDNN
if (platform::CanMKLDNNBeUsed(ctx)) {
library = framework::LibraryType::kMKLDNN;
layout = framework::DataLayout::kMKLDNN;
}
#endif
return framework::OpKernelType(
OperatorWithKernel::IndicateVarDataType(ctx, "X"), ctx.device_context());
OperatorWithKernel::IndicateVarDataType(ctx, "X"), ctx.GetPlace(), layout,
library);
}
void FusionGRUOpMaker::Make() {
......@@ -187,6 +199,9 @@ void FusionGRUOpMaker::Make() {
"bool"
"use origin mode in article https://arxiv.org/abs/1412.3555")
.SetDefault(false);
AddAttr<bool>("use_mkldnn",
"(bool, default false) Only used in mkldnn kernel")
.SetDefault(false);
AddComment(R"DOC(
The Fusion complete GRU Operator.
This operator fuse the fully-connected operator into GRU,
......
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/fused/fusion_gru_op.h"
#include "paddle/fluid/platform/mkldnn_reuse.h"
namespace paddle {
namespace operators {
using paddle::framework::LoDTensor;
using paddle::framework::Tensor;
using paddle::platform::CPUDeviceContext;
using paddle::platform::MKLDNNGetDataType;
using paddle::platform::MKLDNNMemDesc;
using platform::to_void_cast;
template <typename T>
class GRUMKLDNNHandler : public platform::MKLDNNHandlerT<T, dnnl::gru_forward> {
public:
GRUMKLDNNHandler(const paddle::framework::ExecutionContext& ctx,
const platform::MKLDNNDeviceContext& dev_ctx,
const mkldnn::engine mkldnn_engine,
platform::Place cpu_place, const LoDTensor* input,
const Tensor* weight_h, const Tensor* h0,
const bool is_reverse, const int64_t N, const int64_t Ti,
const int64_t IC, const int64_t OC,
const std::string& unique_name)
: platform::MKLDNNHandlerT<T, dnnl::gru_forward>(
dev_ctx, dev_ctx.GetEngine(), cpu_place,
platform::CreateKey(unique_name, Ti)),
N(N),
Ti(Ti),
IC(IC),
OC(OC) {
// Create memory key without Ti because weights, bias and h0 memories
// do not depend on Ti size but primitive and input/output memory do
if (platform::MKLDNNDeviceContext::tls().get_cur_mkldnn_session_id() !=
platform::MKLDNNDeviceContextThreadLocals::kMKLDNNSessionID_Default) {
memory_key_ = unique_name;
} else {
memory_key_ = unique_name + "-t:" + platform::ThreadIDasStr();
}
if (!this->isCached()) {
// oneDNN kernel has hardcoded activation functions
PADDLE_ENFORCE_EQ(
ctx.Attr<std::string>("gate_activation"), "sigmoid",
platform::errors::Unimplemented(
"oneDNN fusion_gru supports only sigmoid as a gate activation."));
PADDLE_ENFORCE_EQ(
ctx.Attr<std::string>("activation"), "tanh",
platform::errors::Unimplemented(
"oneDNN fusion_gru supports only tanh as an activation."));
// oneDNN RNN dimensions
const int64_t D = 1; // Directions
const int64_t L = 1; // Layers (PP supports only 1 stacked layer)
const int64_t G = 3; // Number of Gates, 3 for GRU
// Create memory descriptors
auto input_md = MKLDNNMemDesc({Ti, N, IC}, MKLDNNGetDataType<T>(),
MKLDNNMemoryFormat::any);
auto weight_x_md = MKLDNNMemDesc(
{L, D, IC, G, OC}, MKLDNNGetDataType<T>(), MKLDNNMemoryFormat::any);
auto weight_h_md = MKLDNNMemDesc(
{L, D, OC, G, OC}, MKLDNNGetDataType<T>(), MKLDNNMemoryFormat::any);
auto bias_md = MKLDNNMemDesc({L, D, G, OC}, MKLDNNGetDataType<float>(),
MKLDNNMemoryFormat::ldgo);
auto hidden_md = MKLDNNMemDesc({Ti, N, OC}, MKLDNNGetDataType<T>(),
MKLDNNMemoryFormat::any);
auto h0_md = dnnl::memory::desc();
if (h0) {
h0_md = MKLDNNMemDesc({L, D, N, OC}, MKLDNNGetDataType<T>(),
MKLDNNMemoryFormat::ldnc);
}
// Create GRU oneDNN primitive
const auto direction =
is_reverse ? dnnl::rnn_direction::unidirectional_right2left
: dnnl::rnn_direction::unidirectional_left2right;
this->AcquireForwardPrimitiveDescriptor(
dnnl::prop_kind::forward_inference, direction, input_md, h0_md,
weight_x_md, weight_h_md, bias_md, hidden_md, dnnl::memory::desc());
}
}
bool is_NTC() {
return (platform::GetMKLDNNFormat(this->fwd_pd_->dst_desc()) ==
dnnl::memory::format_tag::ntc);
}
void reorderRNNdata(const T* input_data, T* output_data,
std::vector<size_t> lod, const bool is_reverse,
platform::RNNReorderType reorder_type) {
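    // Example for PP_NTC with lod = {0, 2, 5}, Ti = 3, IC = 1: sentence 0
    // (2 words) and sentence 1 (3 words) are copied into the rows of a dense
    // [N, Ti, IC] buffer; shorter sentences stay zero-padded at the end
    // (or at the front when is_reverse is true).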
switch (reorder_type) {
// Reorder input memory [WORDS, C] + LoD -> [N, T, C]
case platform::RNNReorderType::PP_NTC: {
auto* input_data_iter = input_data;
for (int n = 0; n < N; ++n) {
const auto num_elements = (lod[n + 1] - lod[n]) * IC;
const auto offset = is_reverse ? (Ti * IC - num_elements) : 0;
memcpy(output_data + n * Ti * IC + offset, input_data_iter,
sizeof(T) * num_elements);
input_data_iter += num_elements;
}
} break;
// Reorder input memory [WORDS, C] + LoD -> [T, N, C]
case platform::RNNReorderType::PP_TNC: {
auto* input_data_iter = input_data;
for (int n = 0; n < N; ++n) {
const auto num_elements = (lod[n + 1] - lod[n]);
const auto offset = is_reverse ? (Ti - num_elements) : 0;
for (size_t t = 0; t < num_elements; ++t) {
memcpy(output_data + (t + offset) * N * IC + n * IC,
input_data_iter, sizeof(T) * IC);
input_data_iter += IC;
}
}
} break;
// Reorder output values to PP format [N, T, C] -> [WORDS, C]
case platform::RNNReorderType::NTC_PP: {
auto* output_data_iter = output_data;
for (int n = 0; n < N; ++n) {
const auto num_elements = (lod[n + 1] - lod[n]) * OC;
const auto offset = is_reverse ? (Ti * OC - num_elements) : 0;
memcpy(output_data_iter, input_data + n * Ti * OC + offset,
sizeof(T) * num_elements);
output_data_iter += num_elements;
}
} break;
// Reorder output values to PP format [T, N, C] -> [WORDS, C]
case platform::RNNReorderType::TNC_PP: {
auto* output_data_iter = output_data;
for (int n = 0; n < N; ++n) {
const auto num_elements = lod[n + 1] - lod[n];
const auto offset = is_reverse ? (Ti - num_elements) : 0;
for (size_t t = 0; t < num_elements; ++t) {
memcpy(output_data_iter,
input_data + (t + offset) * N * OC + n * OC, sizeof(T) * OC);
output_data_iter += OC;
}
}
} break;
}
}
std::shared_ptr<dnnl::memory> AcquireInputMemoryWithReorder(
const LoDTensor* input, const bool is_reverse) {
const auto name = this->key_ + "@input_mem";
auto memory_p =
std::static_pointer_cast<dnnl::memory>(this->dev_ctx_.GetBlob(name));
if (!memory_p) {
memory_p = std::make_shared<dnnl::memory>(this->fwd_pd_->src_desc(),
this->engine_);
this->dev_ctx_.SetBlob(name, memory_p);
}
const auto& input_lod = input->lod()[0];
auto* x_data = input->data<T>();
auto* x_onednn_data = reinterpret_cast<T*>(memory_p->get_data_handle());
memset(x_onednn_data, 0, sizeof(T) * N * Ti * IC);
if (platform::GetMKLDNNFormat(this->fwd_pd_->src_desc()) ==
dnnl::memory::format_tag::ntc) {
reorderRNNdata(x_data, x_onednn_data, input_lod, is_reverse,
platform::RNNReorderType::PP_NTC);
} else {
reorderRNNdata(x_data, x_onednn_data, input_lod, is_reverse,
platform::RNNReorderType::PP_TNC);
}
return memory_p;
}
std::shared_ptr<dnnl::memory> AcquireOutputMemory() {
const auto name = this->key_ + "@output_mem";
auto memory_p =
std::static_pointer_cast<dnnl::memory>(this->dev_ctx_.GetBlob(name));
if (!memory_p) {
memory_p = std::make_shared<dnnl::memory>(this->fwd_pd_->dst_desc(),
this->engine_);
this->dev_ctx_.SetBlob(name, memory_p);
}
return memory_p;
}
std::shared_ptr<dnnl::memory> AcquireH0Memory(const Tensor* h0) {
const std::string h0_key = memory_key_ + "@h0";
auto memory_p =
std::static_pointer_cast<dnnl::memory>(this->dev_ctx_.GetBlob(h0_key));
auto* h0_data = to_void_cast(h0->data<T>());
if (!memory_p) {
memory_p = std::make_shared<dnnl::memory>(
          this->fwd_pd_->src_iter_desc(), this->engine_, h0_data);
this->dev_ctx_.SetBlob(h0_key, memory_p);
} else {
memory_p->set_data_handle(h0_data);
}
return memory_p;
}
std::shared_ptr<dnnl::memory> AcquireWeightXMemory(const Tensor* weight_x,
const bool origin_mode) {
const std::string wx_key = memory_key_ + "@weight_x";
auto memory_p =
std::static_pointer_cast<dnnl::memory>(this->dev_ctx_.GetBlob(wx_key));
if (!memory_p) {
auto user_md =
MKLDNNMemDesc({1, 1, IC, 3, OC}, MKLDNNGetDataType<float>(),
MKLDNNMemoryFormat::ldigo);
auto user_memory = dnnl::memory(user_md, this->engine_);
auto* weight_x_data =
reinterpret_cast<float*>(user_memory.get_data_handle());
memcpy(weight_x_data, weight_x->data<float>(),
sizeof(float) * IC * 3 * OC);
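      // The oneDNN GRU primitive hardcodes one update-gate convention
      // (corresponding to origin_mode == true here). Since sigmoid(-x) ==
      // 1 - sigmoid(x), negating the update gate's pre-activation (the first
      // OC columns of every 3*OC row, plus the matching bias entries) swaps
      // u and 1 - u, mapping origin_mode == false onto that convention.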
if (origin_mode == false) {
for (int64_t i = 0; i < IC; ++i) {
for (int64_t j = 0; j < OC; ++j) {
weight_x_data[j] *= -1;
}
weight_x_data += 3 * OC;
}
}
memory_p = std::make_shared<dnnl::memory>(
this->fwd_pd_->weights_layer_desc(), this->engine_);
dnnl::stream astream(this->engine_);
dnnl::reorder(user_memory, *memory_p)
.execute(astream, user_memory, *memory_p);
this->dev_ctx_.SetBlob(wx_key, memory_p);
}
return memory_p;
}
std::shared_ptr<dnnl::memory> AcquireWeightHMemory(const Tensor* weight_h,
const bool origin_mode) {
const std::string wh_key = memory_key_ + "@weight_h";
auto memory_p =
std::static_pointer_cast<dnnl::memory>(this->dev_ctx_.GetBlob(wh_key));
if (!memory_p) {
auto user_md =
MKLDNNMemDesc({1, 1, OC, 3, OC}, MKLDNNGetDataType<float>(),
MKLDNNMemoryFormat::ldigo);
auto user_memory = dnnl::memory(user_md, this->engine_);
// Reorder weights_h from PP format [OC, 2OC] + [OC, OC] to
// oneDNN format [OC, 3OC]
auto* weight_h_data =
reinterpret_cast<float*>(user_memory.get_data_handle());
auto* user_weight_h_data = weight_h->data<float>();
auto src1_iter = user_weight_h_data;
auto src2_iter = user_weight_h_data + 2 * OC * OC;
for (int64_t c = 0; c < OC; ++c) {
memcpy(weight_h_data, src1_iter, 2 * OC * sizeof(float));
memcpy(weight_h_data + 2 * OC, src2_iter, OC * sizeof(float));
src1_iter += 2 * OC;
src2_iter += OC;
weight_h_data += 3 * OC;
}
weight_h_data = reinterpret_cast<float*>(user_memory.get_data_handle());
if (origin_mode == false) {
for (int64_t i = 0; i < OC; ++i) {
for (int64_t j = 0; j < OC; ++j) {
weight_h_data[j] *= -1;
}
weight_h_data += 3 * OC;
}
}
memory_p = std::make_shared<dnnl::memory>(
this->fwd_pd_->weights_iter_desc(), this->engine_);
dnnl::stream astream(this->engine_);
dnnl::reorder(user_memory, *memory_p)
.execute(astream, user_memory, *memory_p);
this->dev_ctx_.SetBlob(wh_key, memory_p);
}
return memory_p;
}
std::shared_ptr<dnnl::memory> AcquireBiasMemory(const Tensor* bias,
const bool origin_mode) {
const std::string bias_key = memory_key_ + "@bias";
auto memory_p = std::static_pointer_cast<dnnl::memory>(
this->dev_ctx_.GetBlob(bias_key));
if (!memory_p) {
memory_p = std::make_shared<dnnl::memory>(this->fwd_pd_->bias_desc(),
this->engine_);
auto* bias_data = reinterpret_cast<float*>(memory_p->get_data_handle());
if (bias) {
const float* user_bias_data =
bias->data<float>(); // Bias in oneDNN is always float
memcpy(bias_data, user_bias_data, sizeof(float) * 3 * OC);
} else {
        // oneDNN always needs bias memory; if it's not provided in PP, let
        // oneDNN allocate memory and set it to 0
memset(bias_data, 0, sizeof(float) * 3 * OC);
}
if (origin_mode == false && bias) {
for (int64_t i = 0; i < OC; ++i) {
bias_data[i] *= -1;
}
}
this->dev_ctx_.SetBlob(bias_key, memory_p);
}
return memory_p;
}
private:
// RNN dimensions
// N - Batch Size
// Ti - Max sentence length
// IC - Input Channels
// OC - Output Channels
const int64_t N, Ti, IC, OC;
// Memory size of weights, bias and h0 does not depend
// on Ti size, thus we need another key to cache them
std::string memory_key_;
};
template <typename T>
class FusionGRUMKLDNNKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto& dev_ctx =
ctx.template device_context<platform::MKLDNNDeviceContext>();
const auto& mkldnn_engine = dev_ctx.GetEngine();
// Get Tensors
const auto* input = ctx.Input<LoDTensor>("X");
const auto* h0 = ctx.Input<Tensor>("H0");
const auto* weight_x = ctx.Input<Tensor>("WeightX");
const auto* weight_h = ctx.Input<Tensor>("WeightH");
const auto* bias = ctx.Input<Tensor>("Bias");
auto* hidden = ctx.Output<LoDTensor>("Hidden");
// Get attributes
const bool is_reverse = ctx.Attr<bool>("is_reverse");
const bool origin_mode = ctx.Attr<bool>("origin_mode");
// Get tensor dimensions
const auto x_dims = framework::vectorize(input->dims());
const auto weight_h_dims = framework::vectorize(weight_h->dims());
const auto& input_lod = input->lod()[0];
// Calculate RNN dimensions
const int64_t N = input_lod.size() - 1; // Number of sentences (batches)
const int64_t Ti = // Max length of the sentence in a batch
[&input_lod]() {
size_t res = 0;
for (size_t i = 0; i < (input_lod.size() - 1); ++i) {
res = std::max(res, input_lod[i + 1] - input_lod[i]);
}
return res;
}();
const int64_t IC = x_dims[1]; // Input channels
const int64_t OC = weight_h_dims[0]; // Output channels
GRUMKLDNNHandler<T> handler(ctx, dev_ctx, mkldnn_engine, ctx.GetPlace(),
input, weight_h, h0, is_reverse, N, Ti, IC, OC,
ctx.InputName("X") + ctx.InputName("WeightH"));
auto input_memory_p =
handler.AcquireInputMemoryWithReorder(input, is_reverse);
auto weight_x_memory_p =
handler.AcquireWeightXMemory(weight_x, origin_mode);
auto weight_h_memory_p =
handler.AcquireWeightHMemory(weight_h, origin_mode);
auto bias_memory_p = handler.AcquireBiasMemory(bias, origin_mode);
auto hidden_onednn_memory_p = handler.AcquireOutputMemory();
std::unordered_map<int, dnnl::memory> gru_args = {
{DNNL_ARG_SRC_LAYER, *input_memory_p},
{DNNL_ARG_WEIGHTS_LAYER, *weight_x_memory_p},
{DNNL_ARG_WEIGHTS_ITER, *weight_h_memory_p},
{DNNL_ARG_BIAS, *bias_memory_p},
{DNNL_ARG_DST_LAYER, *hidden_onednn_memory_p}};
if (h0) {
auto h0_memory_p = handler.AcquireH0Memory(h0);
gru_args.insert({DNNL_ARG_SRC_ITER, *h0_memory_p});
}
auto gru_forward_p = handler.AcquireForwardPrimitive();
dnnl::stream astream(mkldnn_engine);
gru_forward_p->execute(astream, gru_args);
astream.wait();
auto* hidden_onednn_data =
reinterpret_cast<T*>(hidden_onednn_memory_p->get_data_handle());
auto* hidden_data = hidden->mutable_data<T>(ctx.GetPlace());
if (handler.is_NTC()) {
handler.reorderRNNdata(hidden_onednn_data, hidden_data, input_lod,
is_reverse, platform::RNNReorderType::NTC_PP);
} else {
handler.reorderRNNdata(hidden_onednn_data, hidden_data, input_lod,
is_reverse, platform::RNNReorderType::TNC_PP);
}
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP_KERNEL(fusion_gru, MKLDNN, paddle::platform::CPUPlace,
ops::FusionGRUMKLDNNKernel<float>);
......@@ -181,6 +181,8 @@ inline mkldnn::memory::format_tag GetMKLDNNFormat(
if (inner_nblks == 0) {
if (strides[0] >= strides[1] && strides[1] >= strides[2]) {
return mkldnn::memory::format_tag::ncw;
} else if (strides[1] >= strides[0] && strides[0] >= strides[2]) {
return mkldnn::memory::format_tag::ntc;
} else {
return mkldnn::memory::format_tag::nwc;
}
......@@ -420,5 +422,7 @@ inline std::vector<std::vector<int64_t>> ToMkldnnPadding(
}
}
enum class RNNReorderType { PP_NTC, PP_TNC, NTC_PP, TNC_PP };
} // namespace platform
} // namespace paddle
set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapper prune
feed_fetch_method pass_builder parallel_executor profiler layer tracer engine scope_pool
analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context
gloo_wrapper infer_io_utils)
gloo_wrapper infer_io_utils heter_wrapper)
if (WITH_NCCL)
set(PYBIND_DEPS ${PYBIND_DEPS} nccl_wrapper)
......@@ -31,6 +31,7 @@ set(PYBIND_SRCS
global_value_getter_setter.cc
reader_py.cc
fleet_wrapper_py.cc
heter_wrapper_py.cc
gloo_wrapper_py.cc
box_helper_py.cc
data_set_py.cc
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <fcntl.h>
#ifdef _POSIX_C_SOURCE
#undef _POSIX_C_SOURCE
#endif
#ifdef _XOPEN_SOURCE
#undef _XOPEN_SOURCE
#endif
#include <string>
#include <vector>
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
#include "paddle/fluid/framework/fleet/heter_wrapper.h"
#include "paddle/fluid/pybind/heter_wrapper_py.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
#ifdef PADDLE_WITH_PSLIB
void BindHeterWrapper(py::module* m) {
py::class_<framework::HeterWrapper, std::shared_ptr<framework::HeterWrapper>>(
*m, "Heter")
.def(py::init([]() { return framework::HeterWrapper::GetInstance(); }))
.def("create_client2xpu_connection",
&framework::HeterWrapper::CreateClient2XpuConnection)
.def("set_xpu_list", &framework::HeterWrapper::SetXpuList)
.def("start_xpu_service", &framework::HeterWrapper::StartXpuService)
.def("end_pass", &framework::HeterWrapper::EndPass)
.def("stop_xpu_service", &framework::HeterWrapper::StopXpuService);
} // end HeterWrapper
#endif
} // end namespace pybind
} // end namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"
namespace py = pybind11;
namespace paddle {
namespace pybind {
#ifdef PADDLE_WITH_PSLIB
void BindHeterWrapper(py::module* m);
#endif
} // namespace pybind
} // namespace paddle
......@@ -66,6 +66,7 @@ limitations under the License. */
#include "paddle/fluid/pybind/fleet_wrapper_py.h"
#include "paddle/fluid/pybind/global_value_getter_setter.h"
#include "paddle/fluid/pybind/gloo_wrapper_py.h"
#include "paddle/fluid/pybind/heter_wrapper_py.h"
#include "paddle/fluid/pybind/imperative.h"
#include "paddle/fluid/pybind/inference_api.h"
#include "paddle/fluid/pybind/ir.h"
......@@ -2479,6 +2480,9 @@ All parameter, weight, gradient are variables in Paddle.
.def("device_count", &ParallelExecutor::DeviceCount);
BindFleetWrapper(&m);
#ifdef PADDLE_WITH_PSLIB
BindHeterWrapper(&m);
#endif
BindGlooWrapper(&m);
BindBoxHelper(&m);
#ifdef PADDLE_WITH_BOX_PS
......
......@@ -16,10 +16,13 @@
from .base.distributed_strategy import DistributedStrategy
from .base.fleet_base import Fleet
from .base.util_factory import UtilBase
from .dataset import *
#from .base.role_maker import PaddleCloudRoleMaker
__all__ = ["DistributedStrategy", "UtilBase"]
__all__ = [
"DistributedStrategy", "UtilBase", "DatasetFactory", "DatasetBase",
"InMemoryDataset", "QueueDataset"
]
fleet = Fleet()
init = fleet.init
......
......@@ -10,3 +10,5 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from .dataset import *
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import functools
import sys
__all__ = ['deprecated']
def deprecated(since, instead, extra_message=""):
def decorator(func):
err_msg = "API {0} is deprecated since {1}. Please use {2} instead.".format(
func.__name__, since, instead)
if len(extra_message) != 0:
err_msg += "\n"
err_msg += extra_message
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(err_msg, file=sys.stderr)
return func(*args, **kwargs)
wrapper.__doc__ += "\n "
wrapper.__doc__ += err_msg
return wrapper
return decorator
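# A hypothetical usage sketch (the decorated API name below is made up):
#
#     @deprecated(since="1.8", instead="paddle.nn.Flatten")
#     def old_flatten(x):
#         """Old flatten API."""
#         ...
#
# Each call to old_flatten() then prints the deprecation message to stderr
# before running the original function, and the message is also appended to
# the function's docstring.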
......@@ -223,7 +223,8 @@ class DownpourSGD(DeviceWorker):
dense_table_set.add(i)
break
trainer_desc.device_worker_name = "DownpourWorker"
trainer_desc.device_worker_name = opt_info.get("worker_class",
"DownpourWorker")
pull_thread = trainer_desc.pull_dense_param
pull_thread.device_num = trainer_desc.thread_num
if opt_info.get("program_id_to_worker") is None:
......
......@@ -36,7 +36,6 @@ from paddle.fluid.wrapped_decorator import signature_safe_contextmanager
from paddle.fluid.dygraph.base import param_guard
from paddle.fluid.data_feeder import check_type
from paddle.fluid.dygraph.dygraph_to_static.partial_program import partial_program_from
from paddle.fluid.annotations import deprecated
__all__ = ['ProgramTranslator', 'convert_to_static']
......
......@@ -35,7 +35,7 @@ __all__ = [
'Conv2D', 'Conv3D', 'Pool2D', 'Linear', 'BatchNorm', 'Dropout', 'Embedding',
'GRUUnit', 'InstanceNorm', 'LayerNorm', 'NCE', 'PRelu',
'BilinearTensorProduct', 'Conv2DTranspose', 'Conv3DTranspose', 'GroupNorm',
'SpectralNorm', 'TreeConv'
'SpectralNorm', 'TreeConv', 'Flatten'
]
......@@ -3182,3 +3182,62 @@ class TreeConv(layers.Layer):
else:
pre_activation = out
return self._helper.append_activation(pre_activation, act=self._act)
class Flatten(layers.Layer):
"""
:alias_main: paddle.nn.Flatten
:alias: paddle.nn.Flatten,paddle.nn.layer.Flatten,paddle.nn.layer.common.Flatten
This interface is used to construct a callable object of the ``Flatten`` class.
For more details, refer to code examples.
It flattens a contiguous range of axes in the input tensor into a single dimension.
Parameters:
start_axis(int): first axis to flatten (default = 1).
stop_axis(int): last axis to flatten (default = -1).
Returns:
None
Examples:
.. code-block:: python
import paddle
from paddle.imperative import to_variable
import numpy as np
inp_np = np.ones([5, 2, 3, 4]).astype('float32')
paddle.enable_imperative()
inp_np = to_variable(inp_np)
flatten = paddle.nn.Flatten(start_axis=1, stop_axis=2)
flatten_res = flatten(inp_np)
"""
def __init__(self, start_axis=1, stop_axis=-1):
super(Flatten, self).__init__()
self.start_axis = start_axis
self.stop_axis = stop_axis
def forward(self, input):
out = self._helper.create_variable_for_type_inference(input.dtype)
x_shape = self._helper.create_variable_for_type_inference(input.dtype)
if in_dygraph_mode():
dy_out, _ = core.ops.flatten_contiguous_range(
input, 'start_axis', self.start_axis, 'stop_axis',
self.stop_axis)
return dy_out
self._helper.append_op(
type="flatten_contiguous_range",
inputs={"X": input},
outputs={"Out": out,
"XShape": x_shape},
attrs={"start_axis": self.start_axis,
"stop_axis": self.stop_axis})
return out
......@@ -1300,6 +1300,12 @@ class Executor(object):
fetch_list=None,
fetch_info=None,
print_period=100):
is_heter = 0
if program._fleet_opt is not None:
    if program._fleet_opt.get("worker_class", "") == "HeterCpuWorker":
        is_heter = 1
    if program._fleet_opt.get("trainer", "") == "HeterXpuTrainer":
        is_heter = 1
if scope is None:
scope = global_scope()
if fetch_list is None:
......@@ -1308,6 +1314,11 @@ class Executor(object):
fetch_info = []
assert len(fetch_list) == len(fetch_info)
compiled = isinstance(program, compiler.CompiledProgram)
if is_heter:
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fu = FleetUtil()
ret = fu.split_program_by_device(program)
if not compiled:
# TODO: Need a better way to distinguish and specify different execution mode
if program._pipeline_opt:
......@@ -1317,6 +1328,8 @@ class Executor(object):
trainer = TrainerFactory()._create_trainer(program._fleet_opt)
trainer._set_thread_barrier(program._is_distributed)
trainer._set_program(program)
if is_heter:
trainer._set_heter_info(ret)
else:
if program._pipeline_opt:
trainer = TrainerFactory()._create_trainer(
......@@ -1476,6 +1489,60 @@ class Executor(object):
debug, fetch_list, fetch_info,
print_period, fetch_handler)
def start_heter_trainer(self,
program=None,
scope=None,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
return self._start_heter_trainer(program, scope, False, debug,
fetch_list, fetch_info, print_period,
fetch_handler)
def _start_heter_trainer(self,
program=None,
scope=None,
is_infer=False,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
scope, trainer = self._prepare_trainer(
program=program,
dataset=None,
scope=scope,
thread=1,
debug=debug,
fetch_list=fetch_list,
fetch_info=fetch_info,
print_period=print_period)
trainer._set_infer(is_infer)
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
trainer_instance = self._default_executor.init_for_dataset(
program.desc, trainer._desc(), scope, None)
#if fetch_handler is not None:
# scope0 = trainer_instance.get_worker_scope(0)
# fetch_monitor = FetchHandlerMonitor(scope0, fetch_handler)
# fetch_monitor.start()
# self._default_executor.run_from_dataset(trainer_instance)
# fetch_monitor.stop()
# self._default_executor.release_trainer(trainer_instance)
#else:
self._default_executor.run_from_dataset(trainer_instance)
#self._default_executor.release_trainer(trainer_instance)
return trainer_instance
def train_from_dataset(self,
program=None,
dataset=None,
......
......@@ -149,6 +149,16 @@ class Fleet(object):
"""
return self._role_maker.is_server()
def is_xpu(self):
"""
Check whether the node is an xpu node of heterogeneous training.
Returns:
bool: True if this is an xpu node,
False if not.
"""
return self._role_maker.is_xpu()
def split_files(self, files):
"""
split files before distributed training,
......
......@@ -28,6 +28,7 @@ __all__ = [
class Role:
WORKER = 1
SERVER = 2
XPU = 3
class MockBarrier(object):
......@@ -988,6 +989,147 @@ class GeneralRoleMaker(RoleMakerBase):
http_server.stop()
class HeterRoleMaker(GeneralRoleMaker):
"""
This role maker is for heterogeneous training, where CPU trainers and XPU
nodes cooperate; you can set os.environ to customize:
PADDLE_PSERVERS_IP_PORT_LIST : all pservers' ip:port, separated by ','
PADDLE_TRAINER_ENDPOINTS : all trainers' ip:port, separated by ','
PADDLE_XPU_ENDPOINTS : all xpu nodes' ip:port, separated by ','
TRAINING_ROLE : TRAINER, PSERVER or XPU
PADDLE_TRAINER_ID : current trainer id (only for trainer),
it is index in PADDLE_TRAINER_ENDPOINTS
PADDLE_PSERVER_ID : current pserver id (only for pserver),
it is index in PADDLE_PSERVERS_IP_PORT_LIST
PADDLE_XPU_ID : current xpu id (only for xpu),
it is index in PADDLE_XPU_ENDPOINTS
"""
def generate_role(self):
"""
generate role for general role maker
"""
if not self._role_is_generated:
eplist = os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split(",")
training_role = os.environ["TRAINING_ROLE"]
worker_endpoints = os.environ["PADDLE_TRAINER_ENDPOINTS"].split(",")
trainers_num = len(worker_endpoints)
xpu_endpoints = os.environ["PADDLE_XPU_ENDPOINTS"].split(",")
xpu_num = len(xpu_endpoints)
if training_role not in ["TRAINER", "PSERVER", "XPU"]:
raise ValueError(
"TRAINING_ROLE must be PSERVER or TRAINER or XPU")
if training_role == "TRAINER":
role = Role.WORKER
current_id = int(os.environ["PADDLE_TRAINER_ID"])
self._node_type = 1
self._cur_endpoint = worker_endpoints[current_id]
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(worker_endpoints),
self._hdfs_path.rstrip("/") + "/trainer",
self._hdfs_name, self._hdfs_ugi, self._iface,
self._prefix)
self._node_type_comm = gloo
elif training_role == "XPU":
role = Role.XPU
current_id = int(os.environ["PADDLE_XPU_ID"])
self._node_type = 2
self._cur_endpoint = xpu_endpoints[current_id]
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(xpu_endpoints),
self._hdfs_path.rstrip("/") + "/xpu", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
self._node_type_comm = gloo
elif training_role == "PSERVER":
role = Role.SERVER
if os.environ.get("PADDLE_PSERVER_ID") is not None:
current_id = int(os.environ["PADDLE_PSERVER_ID"])
cur_endpoint = eplist[current_id]
else:
# this is for compatible with paddlecloud
cur_ip = os.environ["POD_IP"]
cur_port = os.environ["PADDLE_PORT"]
cur_endpoint = ":".join([cur_ip, cur_port])
current_id = eplist.index(cur_endpoint)
self._node_type = 0
self._cur_endpoint = cur_endpoint
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(eplist),
self._hdfs_path.rstrip("/") + "/pserver",
self._hdfs_name, self._hdfs_ugi, self._iface,
self._prefix)
self._node_type_comm = gloo
if training_role == "TRAINER" or training_role == "XPU":
gloo = fluid.core.Gloo()
heter_list = worker_endpoints + xpu_endpoints
gloo.init(
heter_list.index(self._cur_endpoint),
len(heter_list),
self._hdfs_path.rstrip("/") + "/heter", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
self._heter_comm = gloo
gloo = fluid.core.Gloo()
all_list = worker_endpoints + eplist + xpu_endpoints
gloo.init(
all_list.index(self._cur_endpoint),
len(all_list),
self._hdfs_path.rstrip("/") + "/all", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
self._all_comm = gloo
self._trainers_num = trainers_num
self._server_endpoints = eplist
self._role = role
self._current_id = current_id
self._rank = all_list.index(self._cur_endpoint)
self._size = len(all_list)
self._worker_endpoints = worker_endpoints
self._xpu_endpoints = xpu_endpoints
self._role_is_generated = True
def is_xpu(self):
"""
whether current process is an xpu node
"""
if not self._role_is_generated:
self.generate_role()
return self._role == Role.XPU
def is_first_xpu(self):
"""
whether current process is the xpu node of rank 0
"""
if not self._role_is_generated:
self.generate_role()
return self._role == Role.XPU and self._current_id == 0
def _barrier_xpu(self):
"""
barrier all xpu nodes in current distributed job
"""
if not self._role_is_generated:
self.generate_role()
if self.is_xpu():
self._node_type_comm.barrier()
def _barrier_heter(self):
"""
barrier all trainers and xpu nodes in current distributed job
"""
if not self._role_is_generated:
self.generate_role()
if self.is_xpu() or self.is_worker():
self._heter_comm.barrier()
def xpu_num(self):
"""
"""
if not self._role_is_generated:
self.generate_role()
return len(self._xpu_endpoints)
class UserDefinedRoleMaker(RoleMakerBase):
"""
UserDefinedRoleMaker is designed for worker and server assignment
......
......@@ -23,6 +23,7 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import HeterRoleMaker
class PSLib(Fleet):
......@@ -44,6 +45,9 @@ class PSLib(Fleet):
role_maker = MPISymetricRoleMaker()
super(PSLib, self).init(role_maker)
self._fleet_ptr = fluid.core.Fleet()
self._heter_ptr = None
if isinstance(role_maker, HeterRoleMaker):
self._heter_ptr = fluid.core.Heter()
def _set_client_communication_config(self, request_timeout_ms,
connect_timeout_ms, max_retry):
......@@ -77,15 +81,27 @@ class PSLib(Fleet):
raise Exception(
"You should run DistributedOptimizer.minimize() first")
# barrier_all for init_server, wait for server starts
if isinstance(self._role_maker, HeterRoleMaker):
if self._role_maker.is_xpu():
local_endpoint = self._role_maker.get_local_endpoint()
local_endpoint = local_endpoint.split(":")
self._heter_ptr.start_xpu_service(
str(local_endpoint[0]), int(local_endpoint[1]))
self._role_maker._barrier_all()
self.all_ips_ = self._role_maker._all_gather(self._local_ip)
        # worker_index * 2 is for compatibility with older versions of pslib
self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
self._role_maker._get_size(),
self._role_maker.worker_index() * 2)
if isinstance(self._role_maker, HeterRoleMaker):
if self._role_maker.is_worker():
self._heter_ptr.set_xpu_list(
self._role_maker._xpu_endpoints)
self._heter_ptr.create_client2xpu_connection()
# barrier_all for init_worker
self._role_maker._barrier_all()
# prepare for client to client communication
if self._role_maker.is_worker():
info = self._fleet_ptr.get_clients_info()
all_info = self._role_maker._worker_gather(info[0])
self._fleet_ptr.gather_clients(all_info)
......@@ -144,6 +160,12 @@ class PSLib(Fleet):
        >>> fleet.init_server("/your/path/to/model", mode=0)
"""
mode = kwargs.get("mode", 0)
if isinstance(self._role_maker, HeterRoleMaker):
self._role_maker._barrier_xpu()
if self._role_maker.is_first_xpu():
self._fleet_ptr.load_model(model_dir, mode)
self._role_maker._barrier_xpu()
else:
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.load_model(model_dir, mode)
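The load path above is an instance of a common rendezvous idiom: barrier, let only the first node of the group touch shared state, barrier again so the rest proceed only once the load is done. A generic sketch of the same idiom with illustrative names:

def load_model_once(role_maker, fleet_ptr, model_dir, mode=0):
    # Barrier / first-node-loads / barrier idiom, mirroring init_server()
    # above; argument names are illustrative, not part of this diff.
    role_maker._barrier_worker()        # wait until every worker arrives
    if role_maker.is_first_worker():    # only rank 0 reads the model
        fleet_ptr.load_model(model_dir, mode)
    role_maker._barrier_worker()        # others resume after the load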
......@@ -185,6 +207,54 @@ class PSLib(Fleet):
raise Exception(
"You should run DistributedOptimizer.minimize() first")
def end_pass(self, scope):
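        """
        finish the current pass on the corresponding xpu node and stop
        its xpu service; only the first xpu_num() workers do this
        """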
if self._role_maker.worker_index() < self._role_maker.xpu_num():
self._heter_ptr.end_pass(scope, self._role_maker.worker_index())
self._heter_ptr.stop_xpu_service(self._role_maker.worker_index())
def train_from_dataset(self,
executor,
program=None,
dataset=None,
scope=None,
thread=0,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
"""
"""
if self._role_maker.is_worker():
self._role_maker._barrier_heter()
executor.train_from_dataset(program, dataset, scope, thread, debug,
fetch_list, fetch_info, print_period,
fetch_handler)
def start_heter_trainer(self,
executor,
program=None,
scope=None,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
"""
"""
trainer_instance = executor.start_heter_trainer(
program, scope, debug, fetch_list, fetch_info, print_period,
fetch_handler)
if self._role_maker.is_xpu():
print("barrier heter")
self._role_maker._barrier_heter()
print("barrier heter")
executor._default_executor.release_trainer(trainer_instance)
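train_from_dataset() (run on cpu workers) and start_heter_trainer() (run on xpu nodes) are the two sides of one rendezvous: the xpu side starts its trainer and then waits at the heter barrier, which the cpu workers hit inside train_from_dataset() before handing off to the executor. A condensed sketch of the two call sites, assuming executor, program and dataset are built elsewhere:

def run_heter_node(fleet, role_maker, exe, main_program, dataset):
    # Sketch only: every argument is assumed to be constructed by the
    # caller; the thread count is illustrative.
    if role_maker.is_xpu():
        # xpu side: spin up the heter trainer, then wait at the barrier
        fleet.start_heter_trainer(executor=exe, program=main_program)
    else:
        # cpu worker side: the barrier inside train_from_dataset()
        # releases the waiting xpu nodes before training begins
        fleet.train_from_dataset(
            executor=exe, program=main_program, dataset=dataset, thread=8)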
def stop_worker(self):
"""
        stop(): will be called after a user finishes their training task. Fleet instance will be
......@@ -197,6 +267,7 @@ class PSLib(Fleet):
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.stop_server()
self._heter_ptr.stop_xpu_service()
self._role_maker._barrier_worker()
self._role_maker._barrier_all()
self._role_maker._finalize()
......
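Put together, the methods touched in this file slot into the usual PSLib fleet lifecycle. A hedged end-to-end sketch (the singleton import follows this module's convention; is_server()/run_server() come from the Fleet base class, and optimizer, loss, exe, main_program, dataset and role_maker are assumed to exist):

from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

fleet.init(role_maker)                # e.g. a HeterRoleMaker for heter jobs
optimizer = fleet.distributed_optimizer(adam, strategy)   # adam: assumed
optimizer.minimize(loss)              # loss: assumed

if fleet.is_server():
    fleet.init_server()               # optionally load_model, as above
    fleet.run_server()
else:
    fleet.init_worker()               # starts xpu service / connections
    fleet.train_from_dataset(exe, main_program, dataset)
    fleet.stop_worker()               # stops server and xpu service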
......@@ -509,13 +509,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info = {}
opt_info["program_id_to_worker"] = prog_id_to_worker
opt_info["program_configs"] = program_configs
opt_info["trainer"] = "DistMultiTrainer"
opt_info["trainer"] = strategy.get("trainer", "DistMultiTrainer")
opt_info["device_worker"] = strategy.get("device_worker", "DownpourSGD")
opt_info["optimizer"] = "DownpourSGD"
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
opt_info["use_cvm"] = strategy.get("use_cvm", False)
opt_info["no_cvm"] = strategy.get("no_cvm", False)
opt_info["worker_class"] = strategy.get("worker_class",
"DownpourWorker")
opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
opt_info["local_tables"] = strategy.get("local_tables", [])
opt_info["async_tables"] = strategy.get("async_tables", [])
......@@ -529,6 +531,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["dump_file_num"] = strategy.get("dump_file_num", 16)
opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "")
opt_info["dump_param"] = strategy.get("dump_param", [])
opt_info["worker_places"] = strategy.get("worker_places", [])
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class in [
"DownpourCtrAccessor", "DownpourCtrDoubleAccessor",
......
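All of these opt_info entries come from the user-supplied strategy dict, so the newly added trainer, device_worker, worker_class and worker_places keys are configurable per job. An illustrative strategy for a heter run (the key names are exactly the ones read above; the trainer and device-worker values are assumptions based on the heterxpu_trainer/hetercpu_worker sources added in this commit):

strategy = {
    "trainer": "HeterXpuTrainer",      # default: "DistMultiTrainer"
    "device_worker": "HeterCpu",       # default: "DownpourSGD"
    "worker_class": "DownpourWorker",  # default shown above
    "worker_places": [0, 1, 2, 3],     # illustrative device placement
    "use_cvm": True,
}
optimizer = fleet.distributed_optimizer(adam, strategy)   # adam: assumed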
......@@ -14,6 +14,7 @@
"""Fleet Utils."""
import collections
import copy
import json
import logging
import math
......@@ -1615,3 +1616,123 @@ class FleetUtil(object):
"""
program = utils.load_program(prog_path, is_text)
utils.parse_program(program, output_dir)
def split_program_by_device(self, program):
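        """
        Group the ops of program's global block into consecutive sections
        by their op_device attribute (empty means cpu), then return
        [start op index, end op index, send vars, recv vars, pruned program]
        for the single heter (non-cpu) section, or None if there is none.
        """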
ops_list = []
type_list = []
pre = None
type_cpu = "cpu"
for op in program.global_block().ops:
if op.has_attr("op_device"):
if pre is None or pre != op.attr("op_device"):
ops_list.append([])
type_list.append(
op.attr("op_device")
if op.attr("op_device") != "" else type_cpu)
ops_list[-1].append(op)
pre = op.attr("op_device")
l = len(type_list)
i = 0
type_heter = None
while i < l:
while i < l and type_list[i] == type_cpu:
i += 1
if i == l:
break
type_heter = type_list[i]
i += 1
start = i
valid = True
while i < l and type_list[i] != type_heter:
if type_list[i] != type_cpu:
valid = False
break
i += 1
if i == l:
break
elif not valid:
continue
            for j in range(start, i):
                for op in ops_list[j]:
                    op._set_attr("op_device", type_heter)
                type_list[j] = type_heter
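        # merge adjacent sections that now share the same device type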
pre = None
merged_ops_list = []
merged_type_list = []
for i in range(l):
if pre is None or pre != type_list[i]:
merged_ops_list.append([])
merged_type_list.append(type_list[i])
merged_ops_list[-1].extend(ops_list[i])
pre = type_list[i]
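        # non-persistable block vars, i.e. data available to the first section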
data_vars = set()
for k in program.global_block().vars:
var = program.global_block().var(k)
if not var.persistable:
data_vars.add(var.name)
l = len(merged_ops_list)
inputs_pre = set()
outputs_pre = set()
in_from_pre = [[] for i in range(l)]
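        # for each section, collect the inputs it must receive from upstream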
for i in range(l):
inputs = set()
outputs = set()
for op in merged_ops_list[i]:
                for input_name in op.input_names:
                    for tmp in op.input(input_name):
                        if tmp not in outputs:
                            inputs.add(tmp)
                for output_name in op.output_names:
                    for tmp in op.output(output_name):
                        outputs.add(tmp)
if i == 0:
in_from_pre[i] = []
elif i == 1:
in_from_pre[i] = (outputs_pre | data_vars) & inputs
else:
in_from_pre[i] = outputs_pre & inputs
inputs_pre = copy.deepcopy(inputs)
outputs_pre = copy.deepcopy(outputs)
l = len(in_from_pre)
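        # per-section op index ranges, plus the vars each section sends onward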
start_list = []
end_list = []
send_list = [[] for i in range(l)]
        op_count = 0
        program_list = []
        for i in range(l):
            start_list.append(op_count)
            end_list.append(op_count + len(merged_ops_list[i]) - 1)
            op_count += len(merged_ops_list[i])
if i < l - 1:
send_list[i].extend(list(in_from_pre[i + 1]))
            prog = program.clone()
            if merged_type_list[i] != type_cpu:
                prog = prog._prune_with_input(
                    list(in_from_pre[i]), list(send_list[i]))
            program_list.append(prog)
recv_list = [list(i) for i in in_from_pre]
found = False
heter_index = None
for i in range(len(merged_type_list)):
t = merged_type_list[i]
if t != type_cpu:
if found:
print("only one region of program can be heter")
found = True
heter_index = i
if heter_index is None:
print("warning: non heter program")
return None
else:
return [start_list[heter_index], end_list[heter_index], send_list[heter_index], \
recv_list[heter_index], program_list[heter_index]]
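The packed return value is easiest to read at the call site. A sketch of how a caller might consume it (FleetUtil comes from this module; main_program is assumed to exist):

util = FleetUtil()
result = util.split_program_by_device(main_program)   # main_program: assumed
if result is not None:
    start_idx, end_idx, send_vars, recv_vars, heter_prog = result
    # recv_vars: consumed by the heter section from the preceding section;
    # send_vars: produced in the heter section for the following section.
    print("heter ops [%d, %d], %d send vars, %d recv vars"
          % (start_idx, end_idx, len(send_vars), len(recv_vars)))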
......@@ -20,12 +20,12 @@ from __future__ import print_function
from .layer_function_generator import autodoc
from ..framework import unique_name
from ..layer_helper import LayerHelper
from ..annotations import deprecated
from paddle.utils import deprecated
__all__ = []
@deprecated(since='0.15.0', instead="ParallelExecutor")
@deprecated(since='0.15.0', update_to="paddle.fluid.ParallelExecutor")
@autodoc()
def get_places(device_count=None, device_type=None):
helper = LayerHelper('get_places', **locals())
......
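The decorator now comes from paddle.utils and takes update_to in place of instead. Applying it to any other helper follows the same shape; a sketch with hypothetical names:

from paddle.utils import deprecated

@deprecated(since='0.15.0', update_to="paddle.new_api")   # names hypothetical
def my_old_api():
    # calling this emits a deprecation warning pointing at paddle.new_api
    pass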
This diff is collapsed.
This diff is collapsed.
......@@ -210,7 +210,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
filelist.append(train_file_path)
# config dataset
dataset = fluid.DatasetFactory().create_dataset()
dataset = paddle.fleet.DatasetFactory().create_dataset()
dataset.set_batch_size(batch_size)
dataset.set_use_var(self.feeds)
pipe_command = 'python ctr_dataset_reader.py'
......
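The test now builds its dataset through paddle.fleet instead of fluid. A condensed sketch of the new-style configuration, reusing the setters already present in this test (feeds and filelist are assumed from the surrounding test code):

dataset = paddle.fleet.DatasetFactory().create_dataset()
dataset.set_batch_size(4)                    # illustrative batch size
dataset.set_use_var(feeds)                   # model input variables
dataset.set_pipe_command('python ctr_dataset_reader.py')
dataset.set_filelist(filelist)               # files prepared above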
......@@ -17,6 +17,7 @@ no_check_set_white_list = [
'fake_quantize_range_abs_max',
'coalesce_tensor',
'flatten2',
'flatten_contiguous_range',
'lrn',
'squeeze2',
'reshape2',
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.