diff --git a/paddle/fluid/distributed/fleet.cc b/paddle/fluid/distributed/fleet.cc index b638af49730dd4800109729c9d91afa82efa80e4..9aafdd769ed4a0ae84fa84ed69d7b225e68b078b 100644 --- a/paddle/fluid/distributed/fleet.cc +++ b/paddle/fluid/distributed/fleet.cc @@ -146,41 +146,6 @@ void FleetWrapper::CreateClient2ClientConnection() { client2client_max_retry_); } -std::future FleetWrapper::PullSparseVarsAsync( - const Scope& scope, const uint64_t table_id, - const std::vector& var_names, std::vector* fea_keys, - std::vector>* fea_values, int fea_value_dim) { - fea_keys->clear(); - fea_keys->resize(0); - fea_keys->reserve(MAX_FEASIGN_NUM); - for (auto name : var_names) { - Variable* var = scope.FindVar(name); - if (var == nullptr) { - continue; - } - LoDTensor* tensor = var->GetMutable(); - CHECK(tensor != nullptr) << "tensor of var " << name << " is null"; - int64_t* ids = tensor->data(); - size_t len = tensor->numel(); - for (auto i = 0u; i < len; ++i) { - if (ids[i] == 0u) { - continue; - } - fea_keys->push_back(static_cast(ids[i])); - } - } - fea_values->resize(fea_keys->size() + 1); - for (auto& t : *fea_values) { - t.resize(fea_value_dim); - } - std::vector pull_result_ptr; - for (auto& t : *fea_values) { - pull_result_ptr.push_back(t.data()); - } - return pserver_ptr_->_worker_ptr->pull_sparse( - pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size()); -} - void FleetWrapper::PullSparseVarsSync( const Scope& scope, const uint64_t table_id, const std::vector& var_names, std::vector* fea_keys, @@ -224,8 +189,10 @@ void FleetWrapper::PullSparseVarsSync( for (auto& t : *fea_values) { pull_result_ptr.push_back(t.data()); } + bool training = true; auto status = pserver_ptr_->_worker_ptr->pull_sparse( - pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size()); + pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size(), + training); pull_sparse_status.push_back(std::move(status)); for (auto& t : pull_sparse_status) { t.wait(); @@ -238,9 +205,13 @@ void FleetWrapper::PullSparseVarsSync( } } +// is_training is true means training, false means inference, the behavior is +// different on pserver + void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim, uint64_t padding_id, platform::Place place, + bool is_training, std::vector* inputs, std::vector* outputs) { std::vector fea_keys; @@ -279,7 +250,8 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim, } auto* communicator = Communicator::GetInstance(); auto status = communicator->_worker_ptr->pull_sparse( - pull_result_ptr.data(), table_id, fea_keys.data(), fea_keys.size()); + pull_result_ptr.data(), table_id, fea_keys.data(), fea_keys.size(), + is_training); status.wait(); auto ret = status.get(); if (ret != 0) { diff --git a/paddle/fluid/distributed/fleet.h b/paddle/fluid/distributed/fleet.h index ac566606ddcb4024eeaf7b846c894f7f5cdafa82..863440180a808dcc005d39bbb7580368536b4510 100644 --- a/paddle/fluid/distributed/fleet.h +++ b/paddle/fluid/distributed/fleet.h @@ -84,19 +84,14 @@ class FleetWrapper { int fea_dim, const std::vector& var_emb_names); - // Pull sparse variables from server in async mode - // Param: scope, table_id, var_names, fea_keys, fea_dim - // Param: fea_values std::future - std::future PullSparseVarsAsync( - const Scope& scope, const uint64_t table_id, - const std::vector& var_names, - std::vector* fea_keys, - std::vector>* fea_values, int fea_dim); - // Pull sparse variables from server in sync mode // pull immediately to tensors + // is_training is true means training, false means inference, the behavior is + // different on pserver + void PullSparseToTensorSync(const uint64_t table_id, int fea_dim, uint64_t padding_id, platform::Place place, + bool is_training, std::vector* inputs, // NOLINT std::vector* outputs); // NOLINT diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index 5c226e6a0ddd05fdbc855f091db1f84fcfb3324d..b49a71ab0c13adb725b2dad4620554a15b98d5b5 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -768,8 +768,8 @@ std::future BrpcPsClient::push_global_step(int table_id, std::future BrpcPsClient::pull_sparse(float **select_values, size_t table_id, - const uint64_t *keys, - size_t num) { + const uint64_t *keys, size_t num, + bool is_training) { size_t request_call_num = _server_channels.size(); auto shard_sorted_kvs = std::make_shared< @@ -837,16 +837,27 @@ std::future BrpcPsClient::pull_sparse(float **select_values, uint32_t kv_request_count = 0; size_t sorted_kv_size = sorted_kvs.size(); auto &request_buffer = closure->cntl(i)->request_attachment(); + + request_buffer.append((void *)&is_training, sizeof(bool)); + std::vector keys_counter; + keys_counter.reserve(sorted_kv_size); + for (size_t kv_idx = 0; kv_idx < sorted_kv_size; ++kv_idx) { ++kv_request_count; + uint32_t keys = 1; last_key = sorted_kvs[kv_idx].first; request_buffer.append((void *)&last_key, sizeof(uint64_t)); while (kv_idx < sorted_kv_size - 1 && last_key == sorted_kvs[kv_idx + 1].first) { ++kv_idx; + ++keys; } + keys_counter.push_back(keys); } + request_buffer.append((void *)keys_counter.data(), + sizeof(uint32_t) * keys_counter.size()); + if (kv_request_count == 0) { closure->Run(); } else { @@ -956,7 +967,7 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id, } auto status = pull_sparse((float **)save_vec.data(), table_id, - save_key.data(), save_key.size()); + save_key.data(), save_key.size(), true); status.wait(); // create lod tensor diff --git a/paddle/fluid/distributed/service/brpc_ps_client.h b/paddle/fluid/distributed/service/brpc_ps_client.h index 84a31fdbd5df8e1e424fc5225fdc11cb86ba94ea..5192356e4b5e574de385478c57a7b7cedb49988a 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.h +++ b/paddle/fluid/distributed/service/brpc_ps_client.h @@ -148,7 +148,8 @@ class BrpcPsClient : public PSClient { virtual std::future pull_sparse(float **select_values, size_t table_id, - const uint64_t *keys, size_t num); + const uint64_t *keys, size_t num, + bool is_training); virtual std::future print_table_stat(uint32_t table_id); diff --git a/paddle/fluid/distributed/service/brpc_ps_server.cc b/paddle/fluid/distributed/service/brpc_ps_server.cc index d7ff0ecd95a3bdfd0191e7a6d65822c425e6bb3c..a9370561a540bea3416508b45d8cbf8cb997ed33 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.cc +++ b/paddle/fluid/distributed/service/brpc_ps_server.cc @@ -14,6 +14,7 @@ #include "paddle/fluid/distributed/service/brpc_ps_server.h" #include // NOLINT +#include "paddle/fluid/distributed/table/depends/sparse_utils.h" #include "paddle/fluid/distributed/table/table.h" #include "paddle/fluid/framework/archive.h" #include "paddle/fluid/platform/profiler.h" @@ -337,33 +338,39 @@ int32_t BrpcPsService::pull_sparse(Table *table, brpc::Controller *cntl) { platform::RecordEvent record_event("PsService->pull_sparse"); CHECK_TABLE_EXIST(table, request, response) - thread_local std::string push_sparse_request_buffer; + auto &req_io_buffer = cntl->request_attachment(); auto req_buffer_size = req_io_buffer.size(); + if (req_buffer_size < 1) { set_response_code(response, -1, "req attachment is empty"); return 0; } + if (request.params_size() < 1) { set_response_code(response, -1, "PsRequestMessage.params is requeired at " "least 1 for num of sparse_key"); return 0; } + uint32_t num = *(uint32_t *)(request.params(0).c_str()); - push_sparse_request_buffer.resize(0); - push_sparse_request_buffer.reserve(req_buffer_size); - const char *data = (const char *)cntl->request_attachment().fetch( - const_cast(push_sparse_request_buffer.data()), req_buffer_size); - /* - Attachment Content: - |---keysData---| - |---8*{num}B---| - */ - const uint64_t *keys = (const uint64_t *)data; + auto dim = table->value_accesor()->select_dim(); + + thread_local std::string req_buffer; + req_buffer.reserve(req_buffer_size); + + const void *data = cntl->request_attachment().fetch( + const_cast(req_buffer.data()), req_buffer_size); + + auto value = PullSparseValue(num, dim); + + value.DeserializeFromBytes(const_cast(data)); + std::vector res_data; - res_data.resize(num * table->value_accesor()->select_size() / sizeof(float)); - table->pull_sparse(res_data.data(), keys, num); + res_data.resize(num * dim); + table->pull_sparse(res_data.data(), value); + cntl->response_attachment().append((char *)res_data.data(), res_data.size() * sizeof(float)); return 0; diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc index 8699719e5cdcc8f40cf26fc90c17ad52849804d3..3d5ab8e16d90202d2365c14f764f5e0f53929b68 100644 --- a/paddle/fluid/distributed/service/communicator.cc +++ b/paddle/fluid/distributed/service/communicator.cc @@ -320,9 +320,11 @@ void Communicator::RpcRecvSparse(const std::string &varname, int table_id, push_g_vec.push_back(tensor->data() + i * dim); } + bool training = true; + auto status = _worker_ptr->pull_sparse( (float **)push_g_vec.data(), table_id, // NOLINT - sparse_push_keys.data(), sparse_push_keys.size()); + sparse_push_keys.data(), sparse_push_keys.size(), training); status.wait(); return; } diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index 7b698afa72645d9af4f20c82e361e4d0c90c33a5..3ff4b9d063f33a35418d6393edb010923caae838 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -112,10 +112,11 @@ class PSClient { // future结束前keys和values缓冲区不能再次使用 // 整合多个线程请求的keys,聚集并分散发送到server // 返回结果后,遍历buffer并对values赋值 + // is_training 用于区分请求是训练/预测,server端对于特征和准入会有不同的处理. virtual std::future pull_sparse(float **select_values, size_t table_id, - const uint64_t *keys, - size_t num) = 0; + const uint64_t *keys, size_t num, + bool is_training) = 0; virtual std::future print_table_stat(uint32_t table_id) = 0; diff --git a/paddle/fluid/distributed/table/common_graph_table.h b/paddle/fluid/distributed/table/common_graph_table.h index de3cac134cd51a095a39ea8a1cd0a8d662677615..ab28961846297457187b92346f304f68c6bd514c 100644 --- a/paddle/fluid/distributed/table/common_graph_table.h +++ b/paddle/fluid/distributed/table/common_graph_table.h @@ -103,13 +103,16 @@ class GraphTable : public SparseTable { Node *find_node(uint64_t id); - virtual int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) { + virtual int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) { return 0; } + virtual int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) { return 0; } + virtual void clear() {} virtual int32_t flush() { return 0; } virtual int32_t shrink(const std::string ¶m) { return 0; } @@ -140,5 +143,5 @@ class GraphTable : public SparseTable { std::vector> _shards_task_pool; }; -} -}; +} // namespace distributed +}; // namespace paddle diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc index ffedbea14a0290730b9d785464a84e3c4536a9e7..a25a90aa9a7c1fe323a9a2d71bcac57cf6a89f84 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.cc +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -254,7 +254,6 @@ int32_t CommonSparseTable::initialize_value() { } auto accessor = _config.accessor(); - std::vector feasigns; for (size_t x = 0; x < accessor.fea_dim(); ++x) { @@ -271,9 +270,14 @@ int32_t CommonSparseTable::initialize_value() { std::vector ids(bucket_feasigns); std::copy(feasigns.begin() + buckets[x], feasigns.begin() + buckets[x + 1], ids.begin()); + + std::vector fres; + fres.resize(ids.size(), 1); + + auto pull_value = PullSparseValue(ids, fres, param_dim_); std::vector pulls; pulls.resize(bucket_feasigns * param_dim_); - pull_sparse(pulls.data(), ids.data(), bucket_feasigns); + pull_sparse(pulls.data(), pull_value); } return 0; @@ -399,32 +403,36 @@ int32_t CommonSparseTable::pour() { return 0; } -int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys, - size_t num) { +int32_t CommonSparseTable::pull_sparse(float* pull_values, + const PullSparseValue& pull_value) { rwlock_->RDLock(); - std::vector> offset_bucket; - offset_bucket.resize(task_pool_size_); - - for (int x = 0; x < num; ++x) { - auto y = keys[x] % task_pool_size_; - offset_bucket[y].push_back(x); - } - - std::vector> tasks(task_pool_size_); + auto shard_num = task_pool_size_; + std::vector> tasks(shard_num); - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + for (int shard_id = 0; shard_id < shard_num; ++shard_id) { tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, &keys, &offset_bucket, &pull_values]() -> int { + [this, shard_id, shard_num, &pull_value, &pull_values]() -> int { auto& block = shard_values_[shard_id]; - auto& offsets = offset_bucket[shard_id]; - for (int i = 0; i < offsets.size(); ++i) { - auto offset = offsets[i]; - auto id = keys[offset]; - auto* value = block->Init(id); - std::copy_n(value + param_offset_, param_dim_, - pull_values + param_dim_ * offset); + std::vector offsets; + pull_value.Fission(shard_id, shard_num, &offsets); + + if (pull_value.is_training_) { + for (auto& offset : offsets) { + auto feasign = pull_value.feasigns_[offset]; + auto frequencie = pull_value.frequencies_[offset]; + auto* value = block->Init(feasign, true, frequencie); + std::copy_n(value + param_offset_, param_dim_, + pull_values + param_dim_ * offset); + } + } else { + for (auto& offset : offsets) { + auto feasign = pull_value.feasigns_[offset]; + auto* value = block->Init(feasign, false); + std::copy_n(value + param_offset_, param_dim_, + pull_values + param_dim_ * offset); + } } return 0; diff --git a/paddle/fluid/distributed/table/common_sparse_table.h b/paddle/fluid/distributed/table/common_sparse_table.h index 98cbf2b4a21057f64a5d510158907df1de393925..31f4dabcdfdd74c9a5837763696d8524dcca7766 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.h +++ b/paddle/fluid/distributed/table/common_sparse_table.h @@ -61,8 +61,7 @@ class CommonSparseTable : public SparseTable { int32_t save(const std::string& path, const std::string& param); virtual std::pair print_table_stat(); - virtual int32_t pull_sparse(float* pull_values, const uint64_t* keys, - size_t num); + virtual int32_t pull_sparse(float* values, const PullSparseValue& pull_value); virtual int32_t push_sparse(const uint64_t* keys, const float* values, size_t num); diff --git a/paddle/fluid/distributed/table/common_table.h b/paddle/fluid/distributed/table/common_table.h index dc3cfa75ff689863773e88ef2d077b80c1f0a5d5..bc7f17f5f245794cebf96a8a4bc69e0dce8ac997 100644 --- a/paddle/fluid/distributed/table/common_table.h +++ b/paddle/fluid/distributed/table/common_table.h @@ -98,8 +98,8 @@ class DenseTable : public Table { virtual ~DenseTable() {} virtual void *get_shard(size_t shard_idx) { return 0; } - int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) override { + int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) override { return 0; } int32_t push_sparse(const uint64_t *keys, const float *values, @@ -123,8 +123,8 @@ class BarrierTable : public Table { int32_t push_dense(const float *values, size_t num) override { return 0; } - int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) override { + int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) override { return 0; } int32_t push_sparse(const uint64_t *keys, const float *values, diff --git a/paddle/fluid/distributed/table/depends/large_scale_kv.h b/paddle/fluid/distributed/table/depends/large_scale_kv.h index ba79a381a6d881fdc153ad0e04e0ee436120b179..cb077033cad42d2dd0dd1e34c206b39c3a421430 100644 --- a/paddle/fluid/distributed/table/depends/large_scale_kv.h +++ b/paddle/fluid/distributed/table/depends/large_scale_kv.h @@ -155,7 +155,8 @@ class ValueBlock { } // pull - float *Init(const uint64_t &id, const bool with_update = true) { + float *Init(const uint64_t &id, const bool with_update = true, + const int counter = 1) { if (!Has(id)) { values_[id] = std::make_shared(value_length_); } @@ -163,16 +164,16 @@ class ValueBlock { auto &value = values_.at(id); if (with_update) { - AttrUpdate(value); + AttrUpdate(value, counter); } return value->data_.data(); } - void AttrUpdate(std::shared_ptr value) { + void AttrUpdate(std::shared_ptr value, const int counter) { // update state value->unseen_days_ = 0; - ++value->count_; + value->count_ += counter; if (!value->is_entry_) { value->is_entry_ = entry_func_(value); diff --git a/paddle/fluid/distributed/table/depends/sparse_utils.h b/paddle/fluid/distributed/table/depends/sparse_utils.h new file mode 100644 index 0000000000000000000000000000000000000000..c185dd17d792e4715ae884e66c412aa5f24f809f --- /dev/null +++ b/paddle/fluid/distributed/table/depends/sparse_utils.h @@ -0,0 +1,74 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace paddle { +namespace distributed { + +struct PullSparseValue { + explicit PullSparseValue(int numel, int dim) + : numel_(numel), + dim_(dim), + is_training_(true), + feasigns_(nullptr), + frequencies_(nullptr) {} + + explicit PullSparseValue(std::vector feasigns, + std::vector frequencies, int dim) { + numel_ = feasigns.size(); + dim_ = dim; + is_training_ = true; + feasigns_ = feasigns.data(); + frequencies_ = frequencies.data(); + } + + void DeserializeFromBytes(void* bytes) { + /* + |---isTraining--------------| + |---8*{num}B(keysData)------| + |---4*{num}B(Frequencies)---| + */ + auto* begin = reinterpret_cast(bytes); + is_training_ = reinterpret_cast(begin)[0]; + feasigns_ = reinterpret_cast(begin + sizeof(bool)); + frequencies_ = reinterpret_cast(begin + sizeof(bool) + + sizeof(uint64_t) * numel_); + } + + void Fission(const int shard_id, const int shard_num, + std::vector* offset_shard) const { + offset_shard->reserve(numel_ / shard_num + 1); + for (int x = 0; x < numel_; ++x) { + if (feasigns_[x] % shard_num == shard_id) { + offset_shard->push_back(x); + } + } + } + + int numel_; + int dim_; + bool is_training_; + uint64_t* feasigns_; + uint32_t* frequencies_; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/sparse_geo_table.cc b/paddle/fluid/distributed/table/sparse_geo_table.cc index 9b276e7de5c92d495f9d40535033b0a82186bc82..04cd1136382a4e24eb0a6d196ec01ad68ed56309 100644 --- a/paddle/fluid/distributed/table/sparse_geo_table.cc +++ b/paddle/fluid/distributed/table/sparse_geo_table.cc @@ -22,8 +22,17 @@ int32_t SparseGeoTable::pull_geo_param(const uint32_t trainer_id, std::vector* ids) { geo_recorder->GetAndClear(trainer_id, ids); auto dim = _config.common().dims()[0]; + + std::vector frequencies; + frequencies.resize(ids->size(), 1); + + auto pull_value = PullSparseValue(ids->size(), dim); + pull_value.is_training_ = true; + pull_value.feasigns_ = ids->data(); + pull_value.frequencies_ = frequencies.data(); + values->resize(ids->size() * dim); - CommonSparseTable::pull_sparse(values->data(), ids->data(), ids->size()); + CommonSparseTable::pull_sparse(values->data(), pull_value); return 0; } diff --git a/paddle/fluid/distributed/table/table.h b/paddle/fluid/distributed/table/table.h index d64e805af40e68f74e602e638ef2aa3540198e79..8f014ac98ba4bbcfe0b90d774733406b68d94ee6 100644 --- a/paddle/fluid/distributed/table/table.h +++ b/paddle/fluid/distributed/table/table.h @@ -21,6 +21,7 @@ #include #include #include "paddle/fluid/distributed/table/accessor.h" +#include "paddle/fluid/distributed/table/depends/sparse_utils.h" #include "paddle/fluid/distributed/table/graph_node.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" @@ -47,8 +48,8 @@ class Table { return 0; } - virtual int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) = 0; + virtual int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) = 0; virtual int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) = 0; virtual int32_t push_sparse_param(const uint64_t *keys, const float *values, diff --git a/paddle/fluid/distributed/table/tensor_table.h b/paddle/fluid/distributed/table/tensor_table.h index 1a8f1a9cd9adb841c3ed1fcf849a3a293c47cc52..080682d131420b5b57ce470b6b570fe24a1925b3 100644 --- a/paddle/fluid/distributed/table/tensor_table.h +++ b/paddle/fluid/distributed/table/tensor_table.h @@ -52,8 +52,8 @@ class TensorTable : public Table { int32_t push_dense(const float *values, size_t num) override { return 0; } - int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) override { + int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) override { return 0; } int32_t push_sparse(const uint64_t *keys, const float *values, @@ -102,8 +102,8 @@ class DenseTensorTable : public TensorTable { DenseTensorTable() {} virtual ~DenseTensorTable() {} - int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) override { + int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) override { return 0; } int32_t push_sparse(const uint64_t *keys, const float *values, @@ -158,8 +158,8 @@ class GlobalStepTable : public DenseTensorTable { GlobalStepTable() {} virtual ~GlobalStepTable() {} - int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) override { + int32_t pull_sparse(float *values, + const PullSparseValue &pull_value) override { return 0; } int32_t push_sparse(const uint64_t *keys, const float *values, diff --git a/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc b/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc index fbd236012f523715451e9c21d3f2028f88d573f3..8fb3434af6e281762b762bbc8d01b372e5c0ee34 100644 --- a/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc +++ b/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc @@ -212,8 +212,8 @@ void RunBrpcPushSparse() { /*-----------------------Test Server Init----------------------------------*/ LOG(INFO) << "Run pull_sparse_param"; - auto pull_status = worker_ptr_->pull_sparse(fea_value_ptr.data(), 0, - fea_keys.data(), fea_keys.size()); + auto pull_status = worker_ptr_->pull_sparse( + fea_value_ptr.data(), 0, fea_keys.data(), fea_keys.size(), true); pull_status.wait(); for (size_t idx = 0; idx < tensor->numel(); ++idx) { fea_values.data()[idx] *= 2.0; @@ -241,7 +241,7 @@ void RunBrpcPushSparse() { push_status.wait(); auto pull_param_status = worker_ptr_->pull_sparse( - fea_temp_value_ptr.data(), 0, fea_keys.data(), fea_keys.size()); + fea_temp_value_ptr.data(), 0, fea_keys.data(), fea_keys.size(), true); pull_param_status.wait(); for (size_t idx = 0; idx < tensor->numel(); ++idx) { @@ -275,7 +275,7 @@ void RunBrpcPushSparse() { push_grad_status.wait(); auto pull_update_status = worker_ptr_->pull_sparse( - fea_temp_value_ptr.data(), 0, fea_keys.data(), fea_keys.size()); + fea_temp_value_ptr.data(), 0, fea_keys.data(), fea_keys.size(), true); pull_update_status.wait(); for (size_t idx = 0; idx < tensor->numel(); ++idx) { diff --git a/paddle/fluid/distributed/test/geo_table_test.cc b/paddle/fluid/distributed/test/geo_table_test.cc index 22e11acf6584eefa3e41ccd950feb2dfb4bf3720..c9f15db3f788e13ca2f9a8279358358f1c50131b 100644 --- a/paddle/fluid/distributed/test/geo_table_test.cc +++ b/paddle/fluid/distributed/test/geo_table_test.cc @@ -23,6 +23,7 @@ limitations under the License. */ #include "paddle/fluid/distributed/ps.pb.h" #include "paddle/fluid/distributed/table/common_dense_table.h" #include "paddle/fluid/distributed/table/common_sparse_table.h" +#include "paddle/fluid/distributed/table/depends/sparse_utils.h" #include "paddle/fluid/distributed/table/sparse_geo_table.h" #include "paddle/fluid/distributed/table/table.h" @@ -53,14 +54,18 @@ TEST(SparseGeoTable, SSUM) { // test push_sparse_param, and create params std::vector init_keys = {0, 1, 2, 3, 4}; + std::vector init_fres = {1, 1, 1, 1, 1}; std::vector init_values; for (size_t i = 0; i < init_keys.size() * emb_dim; i++) { init_values.push_back(0.0); } table->push_sparse_param(init_keys.data(), init_values.data(), init_keys.size()); + std::vector pull_values(init_values.size()); - table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size()); + auto value = PullSparseValue(init_keys, init_fres, emb_dim); + table->pull_sparse(pull_values.data(), value); + for (size_t i = 0; i < init_keys.size() * emb_dim; i++) { ASSERT_TRUE(abs(pull_values[i] - init_values[i]) < 1e-5); } diff --git a/paddle/fluid/distributed/test/sparse_table_test.cc b/paddle/fluid/distributed/test/sparse_table_test.cc index 6db95c5fac211b94db726ee77c9122a8824c2351..26bede392d6fade06dd29cf5e5a28295bb1cbc43 100644 --- a/paddle/fluid/distributed/test/sparse_table_test.cc +++ b/paddle/fluid/distributed/test/sparse_table_test.cc @@ -55,9 +55,14 @@ TEST(CommonSparseTable, SGD) { // pull parameters for create and check std::vector init_keys = {0, 1, 2, 3, 4}; + std::vector init_fres = {1, 1, 1, 1, 1}; + std::vector init_values; init_values.resize(init_keys.size() * emb_dim); - table->pull_sparse(init_values.data(), init_keys.data(), init_keys.size()); + + std::vector pull_values(init_values.size()); + auto value = PullSparseValue(init_keys, init_fres, emb_dim); + table->pull_sparse(init_values.data(), value); // for check std::vector total_gradients; @@ -100,7 +105,8 @@ TEST(CommonSparseTable, SGD) { std::vector pull_values; pull_values.resize(init_keys.size() * emb_dim); - table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size()); + table->pull_sparse(init_values.data(), value); + for (size_t i = 0; i < init_values.size(); ++i) { auto update_val = init_values[i] - 1.0 * total_gradients[i]; ASSERT_TRUE(abs(update_val - pull_values[i]) < 1e-5); @@ -148,9 +154,13 @@ TEST(CommonSparseTable, Adam) { // pull parameters for create and check std::vector init_keys = {0, 1, 2, 3, 4}; + std::vector init_fres = {1, 1, 1, 1, 1}; + std::vector init_values; init_values.resize(init_keys.size() * emb_dim); - table->pull_sparse(init_values.data(), init_keys.data(), init_keys.size()); + + auto value = PullSparseValue(init_keys, init_fres, emb_dim); + table->pull_sparse(init_values.data(), value); // push gradient std::vector> trainer_keys; diff --git a/paddle/fluid/operators/pscore/distributed_lookup_table_op.cc b/paddle/fluid/operators/pscore/distributed_lookup_table_op.cc index 159bdcabd657b0cdacba5a8b846656e9038b54c6..277c93fad6aa83df21fa918013a03d8e91e5b29e 100644 --- a/paddle/fluid/operators/pscore/distributed_lookup_table_op.cc +++ b/paddle/fluid/operators/pscore/distributed_lookup_table_op.cc @@ -119,6 +119,11 @@ class DistributedLookupTableOpMaker : public framework::OpProtoAndCheckerMaker { "Output data type") .SetDefault(framework::proto::VarType::FP32); + AddAttr("is_test", + "(bool, default false) Set to true for inference only, false " + "for training.") + .SetDefault(false); + AddComment(R"DOC( Lookup Tablel Prefetch Operator. This operator is used to perform lookup on parameter W, diff --git a/paddle/fluid/operators/pscore/distributed_lookup_table_op.h b/paddle/fluid/operators/pscore/distributed_lookup_table_op.h index 0f1a096e207692e2bb901ec3235875ece4fa0eab..413b4ab358536c1efd9a5c875afeb05c231202a9 100644 --- a/paddle/fluid/operators/pscore/distributed_lookup_table_op.h +++ b/paddle/fluid/operators/pscore/distributed_lookup_table_op.h @@ -30,6 +30,7 @@ class DistributedLookupTableKernel : public framework::OpKernel { auto padding_idx = context.Attr("padding_idx"); auto table_id = context.Attr("table_id"); + bool is_test = context.Attr("is_test"); auto embedding_name = context.InputNames("W").front(); int64_t emb_dim = 0; @@ -55,7 +56,8 @@ class DistributedLookupTableKernel : public framework::OpKernel { if (platform::is_cpu_place(context.GetPlace())) { fleet->PullSparseToTensorSync(static_cast(table_id), emb_dim, static_cast(padding_idx), - context.GetPlace(), &inputs, &outputs); + context.GetPlace(), !is_test, &inputs, + &outputs); } else { auto inputs_variable = context.MultiInputVar("Ids"); auto outputs_variable = context.MultiOutputVar("Outputs"); @@ -93,7 +95,8 @@ class DistributedLookupTableKernel : public framework::OpKernel { // use fleet->PullSparse fleet->PullSparseToTensorSync(static_cast(table_id), emb_dim, static_cast(padding_idx), - cpu_place, &tmp_input_vec, &tmp_output_vec); + cpu_place, !is_test, &tmp_input_vec, + &tmp_output_vec); // cp temp to origin for (size_t idx = 0; idx < output_var_size; ++idx) { diff --git a/python/paddle/distributed/fleet/utils/ps_util.py b/python/paddle/distributed/fleet/utils/ps_util.py index a409d02c984cf222ad3c67b22d1fb2753e03b0a3..7bf7bec43de00887d43c1e7047e494a4941163c0 100644 --- a/python/paddle/distributed/fleet/utils/ps_util.py +++ b/python/paddle/distributed/fleet/utils/ps_util.py @@ -16,6 +16,7 @@ import numpy as np import os import paddle +import warnings class DistributedInfer: @@ -104,8 +105,6 @@ class DistributedInfer: vars=need_load_vars) def get_dist_infer_program(self): - import paddle.distributed.fleet as fleet - varname2tables = self._get_sparse_table_map() convert_program = self._convert_program(self.origin_main_program, varname2tables) @@ -185,6 +184,7 @@ class DistributedInfer: "is_distributed": is_distributed, "padding_idx": padding_idx, "table_id": table_id, + "is_test": True, "lookup_table_version": op_type }) else: @@ -193,6 +193,9 @@ class DistributedInfer: ) pull_sparse_ops = _get_pull_sparse_ops(program) + warnings.warn( + "lookup_table will be forced to test mode when use DistributedInfer" + ) _pull_sparse_fuse(program, pull_sparse_ops) return program