From 2a9c993ea75ec3ddb21ae5ede1c733f82b990749 Mon Sep 17 00:00:00 2001 From: yaoxuefeng Date: Thu, 20 Jan 2022 15:30:25 +0800 Subject: [PATCH] mod communicator (#39064) --- .../fluid/distributed/service/communicator.cc | 177 ++++++++++++++++++ .../fluid/distributed/service/communicator.h | 13 ++ .../pscore/distributed_lookup_table_op.h | 21 ++- .../pscore/distributed_push_sparse_op.h | 33 +++- 4 files changed, 225 insertions(+), 19 deletions(-) diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc index eefafafcdf..e2b81ace29 100644 --- a/paddle/fluid/distributed/service/communicator.cc +++ b/paddle/fluid/distributed/service/communicator.cc @@ -30,6 +30,8 @@ namespace distributed { using framework::LoDTensor; using framework::SelectedRows; +const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100; + inline double GetCurrentUS() { struct timeval time; gettimeofday(&time, NULL); @@ -576,6 +578,181 @@ void AsyncCommunicator::MainThread() { VLOG(1) << "communicator stopped, send thread exit"; } +void AsyncCommunicator::PullSparseToTensorSync( + const uint64_t table_id, int fea_dim, uint64_t padding_id, + platform::Place place, bool is_training, + std::vector *inputs, std::vector *outputs) { + std::vector fea_keys; + std::vector pull_result_ptr; + fea_keys.reserve(MAX_FEASIGN_NUM / 100); + pull_result_ptr.reserve(MAX_FEASIGN_NUM / 100); + std::vector init_value(fea_dim, 0); + framework::LoDTensor *output = nullptr; + float *output_data = nullptr; + size_t output_index = -1; + size_t output_len = 0; + for (size_t index = 0; index < inputs->size(); ++index) { + const framework::LoDTensor *tensor = inputs->at(index); + const int64_t *ids = tensor->data(); + size_t len = tensor->numel(); + for (size_t i = 0; i < len; ++i, output_len += fea_dim) { + if (!output || output_len == size_t(output->numel())) { + ++output_index; + CHECK(output_index < outputs->size()); // NOLINT + output = outputs->at(output_index); + output->set_lod(tensor->lod()); + output_data = output->mutable_data(place); + output_len = 0; + CHECK(output->numel() % fea_dim == 0); // NOLINT + CHECK(output_data != nullptr); // NOLINT + } + uint64_t real_id = static_cast(ids[i]); + if (real_id == padding_id) { + memcpy(output_data + output_len, init_value.data(), + sizeof(float) * fea_dim); + continue; + } + fea_keys.push_back(real_id); + pull_result_ptr.push_back(output_data + output_len); + } + } + auto status = + _worker_ptr->pull_sparse(pull_result_ptr.data(), table_id, + fea_keys.data(), fea_keys.size(), is_training); + status.wait(); + auto ret = status.get(); + if (ret != 0) { + LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]"; + sleep(sleep_seconds_before_fail_exit_); + } +} + +void AsyncCommunicator::PushSparseFromTensorAsync( + const uint64_t table_id, int fea_dim, uint64_t padding_id, + platform::Place place, std::vector *inputs, + const framework::LoDTensor *shows, const framework::LoDTensor *clks, + std::vector *outputs) { + int batch_size = -1; + bool batch_size_consist = true; + for (auto *input : *inputs) { + int cur_batch_size = + input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0]; + if (batch_size == -1) { + batch_size = cur_batch_size; + } else { + // CHECK(batch_size == cur_batch_size); // NOLINT + batch_size_consist = false; + break; + } + } + CHECK(batch_size > 0); // NOLINT + + int show_size = + shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0]; + CHECK(show_size == batch_size || show_size == 1); + int clk_size = + clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0]; + CHECK(clk_size == batch_size || clk_size == 1); + + CHECK(outputs->size() == inputs->size()); + std::vector push_keys; + push_keys.reserve(MAX_FEASIGN_NUM / 100); + std::vector> push_values; + push_values.reserve(MAX_FEASIGN_NUM / 100); + size_t output_len = 0; + size_t input_idx = 0; + + VLOG(2) << "fleet.cc::emb_dim: " << fea_dim; + + // TODO(zhaocaibei123): check type of show/clk is int? float? uint64? + // const long int* show_tensor = shows->data(); + // const long int* clk_tensor = clks->data(); + const int64_t *show_tensor = shows->data(); + const int64_t *clk_tensor = clks->data(); + + for (size_t index = 0; index < inputs->size(); ++index) { + framework::LoDTensor *g_tensor = outputs->at(index); + float *g = g_tensor->data(); + // no cvm + if (batch_size_consist) { // TODO(zhaocaibei123): add config + // scale_sparse_gradient_with_batch_size_ + Eigen::Map< + Eigen::Matrix> + g_mat(g, g_tensor->numel() / fea_dim, fea_dim); + g_mat.rightCols(fea_dim) *= batch_size; + } + + const framework::LoDTensor *tensor = inputs->at(index); + const int64_t *ids = tensor->data(); + size_t len = tensor->numel(); + output_len = 0; + + if (tensor->lod().size() > 0) { + for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) { + for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1]; + ++j, output_len += fea_dim) { + uint64_t real_id = static_cast(ids[j]); + if (real_id == padding_id) { + continue; + } + push_keys.emplace_back(real_id); + push_values.emplace_back(fea_dim + 3); + // slot show clk grad... consistent with CtrCommonPushValue defined in + // ctr_accessor.h + push_values.back()[0] = 2; // TODO(zhaocaibei123): slot + push_values.back()[1] = + (i >= show_size ? 1 : static_cast(show_tensor[i])); + push_values.back()[2] = + (i >= clk_size ? 0 : static_cast(clk_tensor[i])); + + float *data = push_values.back().data() + 3; + + memcpy(data, g + output_len, sizeof(float) * fea_dim); + + ++input_idx; + } + } + } else { + for (size_t i = 0; i < len; ++i, output_len += fea_dim) { + uint64_t real_id = static_cast(ids[i]); + if (real_id == padding_id) { + continue; + } + push_keys.emplace_back(real_id); + push_values.emplace_back(fea_dim + 3); + // slot show clk grad... consistent with CtrCommonPushValue defined in + // ctr_accessor.h + push_values.back()[0] = 2; // TODO(zhaocaibei123): slot + push_values.back()[1] = + (i >= show_size ? 1 : static_cast(show_tensor[i])); + push_values.back()[2] = + (i >= clk_size ? 0 : static_cast(clk_tensor[i])); + + float *data = push_values.back().data() + 3; + + memcpy(data, g + output_len, sizeof(float) * fea_dim); + + ++input_idx; + } + } + CHECK(output_len == g_tensor->numel()); + } + + std::vector push_g_vec(input_idx, nullptr); + + for (auto i = 0u; i < push_keys.size(); ++i) { + push_g_vec[i] = push_values.at(i).data(); + } + + PADDLE_ENFORCE_EQ( + this->Check(table_id), true, + platform::errors::InvalidArgument( + "can not find table: %s, please check your config", table_id)); + auto status = _worker_ptr->push_sparse(table_id, push_keys.data(), + (const float **)push_g_vec.data(), + push_keys.size()); +} + void HalfAsyncCommunicator::MainThread() { VLOG(3) << "HalfAsyncCommunicator MainThread start and wait"; diff --git a/paddle/fluid/distributed/service/communicator.h b/paddle/fluid/distributed/service/communicator.h index 9ea44310f3..7056c9aba6 100644 --- a/paddle/fluid/distributed/service/communicator.h +++ b/paddle/fluid/distributed/service/communicator.h @@ -453,6 +453,18 @@ class AsyncCommunicator : public Communicator { void PushDensePostProcessing(); + void PullSparseToTensorSync( + const uint64_t table_id, int fea_dim, uint64_t padding_id, + platform::Place place, bool is_training, + std::vector *inputs, // NOLINT + std::vector *outputs); // NOLINT + + void PushSparseFromTensorAsync( + const uint64_t table_id, int fea_dim, uint64_t padding_id, + platform::Place place, std::vector *inputs, + const framework::LoDTensor *shows, const framework::LoDTensor *clicks, + std::vector *outputs); + protected: std::unordered_map>>> @@ -467,6 +479,7 @@ class AsyncCommunicator : public Communicator { bool need_global_step_ = false; bool independent_recv_ = true; int parallel_task_nums_ = 0; + int32_t sleep_seconds_before_fail_exit_; std::unique_ptr main_thread_{nullptr}; std::unique_ptr recv_thread_{nullptr}; diff --git a/paddle/fluid/operators/pscore/distributed_lookup_table_op.h b/paddle/fluid/operators/pscore/distributed_lookup_table_op.h index 413b4ab358..292db60079 100644 --- a/paddle/fluid/operators/pscore/distributed_lookup_table_op.h +++ b/paddle/fluid/operators/pscore/distributed_lookup_table_op.h @@ -14,6 +14,7 @@ #include #include #include "paddle/fluid/distributed/fleet.h" +#include "paddle/fluid/distributed/service/communicator.h" #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/tensor_util.h" @@ -51,13 +52,15 @@ class DistributedLookupTableKernel : public framework::OpKernel { auto inputs = context.MultiInput("Ids"); auto outputs = context.MultiOutput("Outputs"); - auto fleet = distributed::FleetWrapper::GetInstance(); + // auto fleet = distributed::FleetWrapper::GetInstance(); + auto *communicator = (distributed::AsyncCommunicator *) + distributed::Communicator::GetInstance(); if (platform::is_cpu_place(context.GetPlace())) { - fleet->PullSparseToTensorSync(static_cast(table_id), emb_dim, - static_cast(padding_idx), - context.GetPlace(), !is_test, &inputs, - &outputs); + communicator->PullSparseToTensorSync( + static_cast(table_id), emb_dim, + static_cast(padding_idx), context.GetPlace(), !is_test, + &inputs, &outputs); } else { auto inputs_variable = context.MultiInputVar("Ids"); auto outputs_variable = context.MultiOutputVar("Outputs"); @@ -93,10 +96,10 @@ class DistributedLookupTableKernel : public framework::OpKernel { } // use fleet->PullSparse - fleet->PullSparseToTensorSync(static_cast(table_id), emb_dim, - static_cast(padding_idx), - cpu_place, !is_test, &tmp_input_vec, - &tmp_output_vec); + communicator->PullSparseToTensorSync( + static_cast(table_id), emb_dim, + static_cast(padding_idx), cpu_place, !is_test, + &tmp_input_vec, &tmp_output_vec); // cp temp to origin for (size_t idx = 0; idx < output_var_size; ++idx) { diff --git a/paddle/fluid/operators/pscore/distributed_push_sparse_op.h b/paddle/fluid/operators/pscore/distributed_push_sparse_op.h index 1e27411ad6..a232d52dec 100644 --- a/paddle/fluid/operators/pscore/distributed_push_sparse_op.h +++ b/paddle/fluid/operators/pscore/distributed_push_sparse_op.h @@ -14,6 +14,7 @@ #include #include #include "paddle/fluid/distributed/fleet.h" +#include "paddle/fluid/distributed/service/communicator.h" #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/tensor_util.h" @@ -32,20 +33,21 @@ class DistributedPushSparseKernel : public framework::OpKernel { auto table_id = context.Attr("table_id"); auto emb_dim = context.Attr("size"); VLOG(1) << "push_sparse.h::emb_dim: " << emb_dim; - bool is_test = context.Attr("is_test"); auto inputs = context.MultiInput("Ids"); auto shows = context.Input("Shows"); auto clks = context.Input("Clicks"); auto outputs = context.MultiOutput("Outputs"); - auto fleet = distributed::FleetWrapper::GetInstance(); + // auto fleet = distributed::FleetWrapper::GetInstance(); + auto *communicator = (distributed::AsyncCommunicator *) + distributed::Communicator::GetInstance(); if (platform::is_cpu_place(context.GetPlace())) { - fleet->PushSparseFromTensorAsync(static_cast(table_id), emb_dim, - static_cast(padding_idx), - context.GetPlace(), &inputs, shows, clks, - &outputs); + communicator->PushSparseFromTensorAsync( + static_cast(table_id), emb_dim, + static_cast(padding_idx), context.GetPlace(), &inputs, + shows, clks, &outputs); } else { auto inputs_variable = context.MultiInputVar("Ids"); auto outputs_variable = context.MultiOutputVar("Outputs"); @@ -71,6 +73,17 @@ class DistributedPushSparseKernel : public framework::OpKernel { tmp_input_vec.push_back(tmp_input_tensor); } + framework::Variable *tmp_shows_var = tmp_scope->Var("Shows"); + framework::LoDTensor *tmp_shows_tensor = + tmp_shows_var->GetMutable(); + framework::Variable *tmp_clicks_var = tmp_scope->Var("Clicks"); + framework::LoDTensor *tmp_clicks_tensor = + tmp_clicks_var->GetMutable(); + framework::TensorCopy(*shows, cpu_place, context.device_context(), + tmp_shows_tensor); + framework::TensorCopy(*clks, cpu_place, context.device_context(), + tmp_clicks_tensor); + // create temp output for (size_t idx = 0; idx < output_var_size; ++idx) { framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]); @@ -81,10 +94,10 @@ class DistributedPushSparseKernel : public framework::OpKernel { } // use fleet->PullSparse - fleet->PullSparseToTensorSync(static_cast(table_id), emb_dim, - static_cast(padding_idx), - cpu_place, !is_test, &tmp_input_vec, - &tmp_output_vec); + communicator->PushSparseFromTensorAsync( + static_cast(table_id), emb_dim, + static_cast(padding_idx), context.GetPlace(), + &tmp_input_vec, tmp_shows_tensor, tmp_clicks_tensor, &tmp_output_vec); // cp temp to origin for (size_t idx = 0; idx < output_var_size; ++idx) { -- GitLab