未验证 提交 2a9c993e 编写于 作者: Y yaoxuefeng 提交者: GitHub

mod communicator (#39064)

上级 6b0c57cf
......@@ -30,6 +30,8 @@ namespace distributed {
using framework::LoDTensor;
using framework::SelectedRows;
const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100;
inline double GetCurrentUS() {
struct timeval time;
gettimeofday(&time, NULL);
......@@ -576,6 +578,181 @@ void AsyncCommunicator::MainThread() {
VLOG(1) << "communicator stopped, send thread exit";
}
void AsyncCommunicator::PullSparseToTensorSync(
const uint64_t table_id, int fea_dim, uint64_t padding_id,
platform::Place place, bool is_training,
std::vector<const LoDTensor *> *inputs, std::vector<LoDTensor *> *outputs) {
std::vector<uint64_t> fea_keys;
std::vector<float *> pull_result_ptr;
fea_keys.reserve(MAX_FEASIGN_NUM / 100);
pull_result_ptr.reserve(MAX_FEASIGN_NUM / 100);
std::vector<float> init_value(fea_dim, 0);
framework::LoDTensor *output = nullptr;
float *output_data = nullptr;
size_t output_index = -1;
size_t output_len = 0;
for (size_t index = 0; index < inputs->size(); ++index) {
const framework::LoDTensor *tensor = inputs->at(index);
const int64_t *ids = tensor->data<int64_t>();
size_t len = tensor->numel();
for (size_t i = 0; i < len; ++i, output_len += fea_dim) {
if (!output || output_len == size_t(output->numel())) {
++output_index;
CHECK(output_index < outputs->size()); // NOLINT
output = outputs->at(output_index);
output->set_lod(tensor->lod());
output_data = output->mutable_data<float>(place);
output_len = 0;
CHECK(output->numel() % fea_dim == 0); // NOLINT
CHECK(output_data != nullptr); // NOLINT
}
uint64_t real_id = static_cast<uint64_t>(ids[i]);
if (real_id == padding_id) {
memcpy(output_data + output_len, init_value.data(),
sizeof(float) * fea_dim);
continue;
}
fea_keys.push_back(real_id);
pull_result_ptr.push_back(output_data + output_len);
}
}
auto status =
_worker_ptr->pull_sparse(pull_result_ptr.data(), table_id,
fea_keys.data(), fea_keys.size(), is_training);
status.wait();
auto ret = status.get();
if (ret != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]";
sleep(sleep_seconds_before_fail_exit_);
}
}
void AsyncCommunicator::PushSparseFromTensorAsync(
const uint64_t table_id, int fea_dim, uint64_t padding_id,
platform::Place place, std::vector<const framework::LoDTensor *> *inputs,
const framework::LoDTensor *shows, const framework::LoDTensor *clks,
std::vector<framework::LoDTensor *> *outputs) {
int batch_size = -1;
bool batch_size_consist = true;
for (auto *input : *inputs) {
int cur_batch_size =
input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
if (batch_size == -1) {
batch_size = cur_batch_size;
} else {
// CHECK(batch_size == cur_batch_size); // NOLINT
batch_size_consist = false;
break;
}
}
CHECK(batch_size > 0); // NOLINT
int show_size =
shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
CHECK(show_size == batch_size || show_size == 1);
int clk_size =
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);
CHECK(outputs->size() == inputs->size());
std::vector<uint64_t> push_keys;
push_keys.reserve(MAX_FEASIGN_NUM / 100);
std::vector<std::vector<float>> push_values;
push_values.reserve(MAX_FEASIGN_NUM / 100);
size_t output_len = 0;
size_t input_idx = 0;
VLOG(2) << "fleet.cc::emb_dim: " << fea_dim;
// TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
// const long int* show_tensor = shows->data<int64_t>();
// const long int* clk_tensor = clks->data<int64_t>();
const int64_t *show_tensor = shows->data<int64_t>();
const int64_t *clk_tensor = clks->data<int64_t>();
for (size_t index = 0; index < inputs->size(); ++index) {
framework::LoDTensor *g_tensor = outputs->at(index);
float *g = g_tensor->data<float>();
// no cvm
if (batch_size_consist) { // TODO(zhaocaibei123): add config
// scale_sparse_gradient_with_batch_size_
Eigen::Map<
Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
g_mat(g, g_tensor->numel() / fea_dim, fea_dim);
g_mat.rightCols(fea_dim) *= batch_size;
}
const framework::LoDTensor *tensor = inputs->at(index);
const int64_t *ids = tensor->data<int64_t>();
size_t len = tensor->numel();
output_len = 0;
if (tensor->lod().size() > 0) {
for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) {
for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1];
++j, output_len += fea_dim) {
uint64_t real_id = static_cast<uint64_t>(ids[j]);
if (real_id == padding_id) {
continue;
}
push_keys.emplace_back(real_id);
push_values.emplace_back(fea_dim + 3);
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
float *data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
++input_idx;
}
}
} else {
for (size_t i = 0; i < len; ++i, output_len += fea_dim) {
uint64_t real_id = static_cast<uint64_t>(ids[i]);
if (real_id == padding_id) {
continue;
}
push_keys.emplace_back(real_id);
push_values.emplace_back(fea_dim + 3);
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
float *data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
++input_idx;
}
}
CHECK(output_len == g_tensor->numel());
}
std::vector<float *> push_g_vec(input_idx, nullptr);
for (auto i = 0u; i < push_keys.size(); ++i) {
push_g_vec[i] = push_values.at(i).data();
}
PADDLE_ENFORCE_EQ(
this->Check(table_id), true,
platform::errors::InvalidArgument(
"can not find table: %s, please check your config", table_id));
auto status = _worker_ptr->push_sparse(table_id, push_keys.data(),
(const float **)push_g_vec.data(),
push_keys.size());
}
void HalfAsyncCommunicator::MainThread() {
VLOG(3) << "HalfAsyncCommunicator MainThread start and wait";
......
......@@ -453,6 +453,18 @@ class AsyncCommunicator : public Communicator {
void PushDensePostProcessing();
void PullSparseToTensorSync(
const uint64_t table_id, int fea_dim, uint64_t padding_id,
platform::Place place, bool is_training,
std::vector<const framework::LoDTensor *> *inputs, // NOLINT
std::vector<framework::LoDTensor *> *outputs); // NOLINT
void PushSparseFromTensorAsync(
const uint64_t table_id, int fea_dim, uint64_t padding_id,
platform::Place place, std::vector<const framework::LoDTensor *> *inputs,
const framework::LoDTensor *shows, const framework::LoDTensor *clicks,
std::vector<framework::LoDTensor *> *outputs);
protected:
std::unordered_map<std::string,
std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
......@@ -467,6 +479,7 @@ class AsyncCommunicator : public Communicator {
bool need_global_step_ = false;
bool independent_recv_ = true;
int parallel_task_nums_ = 0;
int32_t sleep_seconds_before_fail_exit_;
std::unique_ptr<std::thread> main_thread_{nullptr};
std::unique_ptr<std::thread> recv_thread_{nullptr};
......
......@@ -14,6 +14,7 @@
#include <string>
#include <vector>
#include "paddle/fluid/distributed/fleet.h"
#include "paddle/fluid/distributed/service/communicator.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/tensor_util.h"
......@@ -51,13 +52,15 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
auto outputs = context.MultiOutput<framework::LoDTensor>("Outputs");
auto fleet = distributed::FleetWrapper::GetInstance();
// auto fleet = distributed::FleetWrapper::GetInstance();
auto *communicator = (distributed::AsyncCommunicator *)
distributed::Communicator::GetInstance();
if (platform::is_cpu_place(context.GetPlace())) {
fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx),
context.GetPlace(), !is_test, &inputs,
&outputs);
communicator->PullSparseToTensorSync(
static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx), context.GetPlace(), !is_test,
&inputs, &outputs);
} else {
auto inputs_variable = context.MultiInputVar("Ids");
auto outputs_variable = context.MultiOutputVar("Outputs");
......@@ -93,10 +96,10 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
}
// use fleet->PullSparse
fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx),
cpu_place, !is_test, &tmp_input_vec,
&tmp_output_vec);
communicator->PullSparseToTensorSync(
static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx), cpu_place, !is_test,
&tmp_input_vec, &tmp_output_vec);
// cp temp to origin
for (size_t idx = 0; idx < output_var_size; ++idx) {
......
......@@ -14,6 +14,7 @@
#include <string>
#include <vector>
#include "paddle/fluid/distributed/fleet.h"
#include "paddle/fluid/distributed/service/communicator.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/tensor_util.h"
......@@ -32,20 +33,21 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
auto table_id = context.Attr<int>("table_id");
auto emb_dim = context.Attr<int>("size");
VLOG(1) << "push_sparse.h::emb_dim: " << emb_dim;
bool is_test = context.Attr<bool>("is_test");
auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
auto shows = context.Input<framework::LoDTensor>("Shows");
auto clks = context.Input<framework::LoDTensor>("Clicks");
auto outputs = context.MultiOutput<framework::LoDTensor>("Outputs");
auto fleet = distributed::FleetWrapper::GetInstance();
// auto fleet = distributed::FleetWrapper::GetInstance();
auto *communicator = (distributed::AsyncCommunicator *)
distributed::Communicator::GetInstance();
if (platform::is_cpu_place(context.GetPlace())) {
fleet->PushSparseFromTensorAsync(static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx),
context.GetPlace(), &inputs, shows, clks,
&outputs);
communicator->PushSparseFromTensorAsync(
static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx), context.GetPlace(), &inputs,
shows, clks, &outputs);
} else {
auto inputs_variable = context.MultiInputVar("Ids");
auto outputs_variable = context.MultiOutputVar("Outputs");
......@@ -71,6 +73,17 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
tmp_input_vec.push_back(tmp_input_tensor);
}
framework::Variable *tmp_shows_var = tmp_scope->Var("Shows");
framework::LoDTensor *tmp_shows_tensor =
tmp_shows_var->GetMutable<framework::LoDTensor>();
framework::Variable *tmp_clicks_var = tmp_scope->Var("Clicks");
framework::LoDTensor *tmp_clicks_tensor =
tmp_clicks_var->GetMutable<framework::LoDTensor>();
framework::TensorCopy(*shows, cpu_place, context.device_context(),
tmp_shows_tensor);
framework::TensorCopy(*clks, cpu_place, context.device_context(),
tmp_clicks_tensor);
// create temp output
for (size_t idx = 0; idx < output_var_size; ++idx) {
framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]);
......@@ -81,10 +94,10 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
}
// use fleet->PullSparse
fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx),
cpu_place, !is_test, &tmp_input_vec,
&tmp_output_vec);
communicator->PushSparseFromTensorAsync(
static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx), context.GetPlace(),
&tmp_input_vec, tmp_shows_tensor, tmp_clicks_tensor, &tmp_output_vec);
// cp temp to origin
for (size_t idx = 0; idx < output_var_size; ++idx) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册