diff --git a/cmake/external/libmct.cmake b/cmake/external/libmct.cmake
index c002def29c7a2f4f279463b300ed6c7f53c38a00..92c3165fbaa904aa6e7cf08653a5919240628c4e 100644
--- a/cmake/external/libmct.cmake
+++ b/cmake/external/libmct.cmake
@@ -19,7 +19,7 @@ IF((NOT DEFINED LIBMCT_VER) OR (NOT DEFINED LIBMCT_URL))
   MESSAGE(STATUS "use pre defined download url")
   SET(LIBMCT_VER "0.1.0" CACHE STRING "" FORCE)
   SET(LIBMCT_NAME "libmct" CACHE STRING "" FORCE)
-  SET(LIBMCT_URL "https://pslib.bj.bcebos.com/libmct.tar.gz" CACHE STRING "" FORCE)
+  SET(LIBMCT_URL "https://pslib.bj.bcebos.com/libmct/libmct.tar.gz" CACHE STRING "" FORCE)
 ENDIF()
 MESSAGE(STATUS "LIBMCT_NAME: ${LIBMCT_NAME}, LIBMCT_URL: ${LIBMCT_URL}")
 SET(LIBMCT_PREFIX_DIR "${THIRD_PARTY_PATH}/libmct")
diff --git a/paddle/fluid/distributed/common/chunk_allocator.h b/paddle/fluid/distributed/common/chunk_allocator.h
new file mode 100644
index 0000000000000000000000000000000000000000..17f7bb14224d3507efc68a6fba24ce69ed79b08e
--- /dev/null
+++ b/paddle/fluid/distributed/common/chunk_allocator.h
@@ -0,0 +1,95 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+#include <glog/logging.h>
+
+namespace paddle {
+namespace distributed {
+
+// Fast allocation and deallocation of objects by allocating them in chunks.
+template <class T>
+class ChunkAllocator {
+ public:
+  explicit ChunkAllocator(size_t chunk_size = 64) {
+    CHECK(sizeof(Node) == std::max(sizeof(void*), sizeof(T)));
+    _chunk_size = chunk_size;
+    _chunks = NULL;
+    _free_nodes = NULL;
+    _counter = 0;
+  }
+  ChunkAllocator(const ChunkAllocator&) = delete;
+  ~ChunkAllocator() {
+    while (_chunks != NULL) {
+      Chunk* x = _chunks;
+      _chunks = _chunks->next;
+      free(x);
+    }
+  }
+  template <class... ARGS>
+  T* acquire(ARGS&&... args) {
+    if (_free_nodes == NULL) {
+      create_new_chunk();
+    }
+
+    T* x = (T*)(void*)_free_nodes;  // NOLINT
+    _free_nodes = _free_nodes->next;
+    new (x) T(std::forward<ARGS>(args)...);
+    _counter++;
+    return x;
+  }
+  void release(T* x) {
+    x->~T();
+    Node* node = (Node*)(void*)x;  // NOLINT
+    node->next = _free_nodes;
+    _free_nodes = node;
+    _counter--;
+  }
+  size_t size() const { return _counter; }
+
+ private:
+  struct alignas(T) Node {
+    union {
+      Node* next;
+      char data[sizeof(T)];
+    };
+  };
+  struct Chunk {
+    Chunk* next;
+    Node nodes[];
+  };
+
+  size_t _chunk_size;  // how many elements in one chunk
+  Chunk* _chunks;      // a list
+  Node* _free_nodes;   // a list
+  size_t _counter;     // how many elements are acquired
+
+  void create_new_chunk() {
+    Chunk* chunk;
+    posix_memalign(reinterpret_cast<void**>(&chunk),
+                   std::max(sizeof(void*), alignof(Chunk)),
+                   sizeof(Chunk) + sizeof(Node) * _chunk_size);
+    chunk->next = _chunks;
+    _chunks = chunk;
+
+    for (size_t i = 0; i < _chunk_size; i++) {
+      Node* node = &chunk->nodes[i];
+      node->next = _free_nodes;
+      _free_nodes = node;
+    }
+  }
+};
+
+}  // namespace distributed
+}  // namespace paddle
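For reference, a minimal usage sketch of the new allocator (not part of the patch; Payload is a stand-in type). The constructor's CHECK effectively requires sizeof(T) >= sizeof(void*), because a free-list node overlays the object's storage:

#include "paddle/fluid/distributed/common/chunk_allocator.h"

struct Payload {
  double sum;
  int count;
  Payload(double s, int c) : sum(s), count(c) {}
};

int main() {
  paddle::distributed::ChunkAllocator<Payload> alloc(/*chunk_size=*/64);
  Payload* p = alloc.acquire(1.5, 2);  // placement-new on a node from the free list
  alloc.release(p);  // runs ~Payload(), returns the node to the free list
  return 0;
}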
g.size(): " << g.size(); - CHECK(output_len == g.size()); std::vector push_g_vec(input_idx, nullptr); diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index f6b544d22b22ddeb8d4f3f029d7cb240435d9b5c..a0a09b14dba72cd3174280104edc6403d49a69f4 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -210,6 +210,23 @@ int32_t BrpcPsClient::initialize() { } } + auto &profiler = CostProfiler::instance(); + profiler.register_profiler("pserver_client_pull_dense"); + profiler.register_profiler("pserver_client_pull_sparse"); + profiler.register_profiler("pserver_client_pull_sparse_local"); + profiler.register_profiler("pserver_client_push_sparse"); + profiler.register_profiler("pserver_client_push_sparse_parse"); + profiler.register_profiler("client_push_sparse_put"); + profiler.register_profiler("pserver_client_push_sparse"); + profiler.register_profiler("pserver_client_push_sparse_merge"); + profiler.register_profiler("pserver_client_push_sparse_rpc"); + profiler.register_profiler("pserver_client_push_dense"); + profiler.register_profiler("pserver_client_push_dense_parse"); + profiler.register_profiler("push_dense_put"); + profiler.register_profiler("pserver_client_push_dense_merge"); + profiler.register_profiler("pserver_client_push_dense_rpc"); + profiler.register_profiler("pserver_client_push_dense_send"); + _running = true; _flushing = false; // 启动异步push线程 @@ -588,6 +605,7 @@ std::future BrpcPsClient::push_sparse_param( std::future BrpcPsClient::pull_dense(Region *regions, size_t region_num, size_t table_id) { + auto timer = std::make_shared("pserver_client_pull_dense"); auto *accessor = table_accessor(table_id); size_t request_call_num = _server_channels.size(); uint32_t num_per_shard = @@ -643,6 +661,7 @@ std::future BrpcPsClient::pull_dense(Region *regions, } closure->set_promise_value(ret); }); + closure->add_timer(timer); auto promise = std::make_shared>(); closure->add_promise(promise); std::future fut = promise->get_future(); @@ -865,6 +884,9 @@ std::future BrpcPsClient::pull_sparse(float **select_values, size_t table_id, const uint64_t *keys, size_t num, bool is_training) { + auto timer = std::make_shared("pserver_client_pull_sparse"); + auto local_timer = + std::make_shared("pserver_client_pull_sparse_local"); size_t request_call_num = _server_channels.size(); auto shard_sorted_kvs = std::make_shared< @@ -925,7 +947,7 @@ std::future BrpcPsClient::pull_sparse(float **select_values, } closure->set_promise_value(ret); }); - + closure->add_timer(timer); auto promise = std::make_shared>(); closure->add_promise(promise); std::future fut = promise->get_future(); @@ -1110,8 +1132,8 @@ std::future BrpcPsClient::push_sparse(size_t table_id, const uint64_t *keys, const float **update_values, size_t num) { - auto push_timer = - std::make_shared("pserver_client_push_sparse_parse"); + auto push_timer = std::make_shared("pserver_client_push_sparse"); + CostTimer parse_timer("pserver_client_push_sparse_parse"); int push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size(); while (push_sparse_async_num > FLAGS_pserver_max_async_call_num) { // LOG(INFO) << "push_sparse Waiting for async_call_num comsume, task_num:" @@ -1121,6 +1143,7 @@ std::future BrpcPsClient::push_sparse(size_t table_id, // push_sparse_async_num = _push_sparse_task_queue_map[table_id]->size(); push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size(); } + auto put_timer = 
diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc
index f6b544d22b22ddeb8d4f3f029d7cb240435d9b5c..a0a09b14dba72cd3174280104edc6403d49a69f4 100644
--- a/paddle/fluid/distributed/service/brpc_ps_client.cc
+++ b/paddle/fluid/distributed/service/brpc_ps_client.cc
@@ -210,6 +210,23 @@ int32_t BrpcPsClient::initialize() {
     }
   }
 
+  auto &profiler = CostProfiler::instance();
+  profiler.register_profiler("pserver_client_pull_dense");
+  profiler.register_profiler("pserver_client_pull_sparse");
+  profiler.register_profiler("pserver_client_pull_sparse_local");
+  profiler.register_profiler("pserver_client_push_sparse");
+  profiler.register_profiler("pserver_client_push_sparse_parse");
+  profiler.register_profiler("client_push_sparse_put");
+  profiler.register_profiler("pserver_client_push_sparse");
+  profiler.register_profiler("pserver_client_push_sparse_merge");
+  profiler.register_profiler("pserver_client_push_sparse_rpc");
+  profiler.register_profiler("pserver_client_push_dense");
+  profiler.register_profiler("pserver_client_push_dense_parse");
+  profiler.register_profiler("push_dense_put");
+  profiler.register_profiler("pserver_client_push_dense_merge");
+  profiler.register_profiler("pserver_client_push_dense_rpc");
+  profiler.register_profiler("pserver_client_push_dense_send");
+
   _running = true;
   _flushing = false;
   // start the async push threads
@@ -588,6 +605,7 @@ std::future<int32_t> BrpcPsClient::push_sparse_param(
 
 std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
                                               size_t region_num,
                                               size_t table_id) {
+  auto timer = std::make_shared<CostTimer>("pserver_client_pull_dense");
   auto *accessor = table_accessor(table_id);
   size_t request_call_num = _server_channels.size();
   uint32_t num_per_shard =
@@ -643,6 +661,7 @@ std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
         }
         closure->set_promise_value(ret);
       });
+  closure->add_timer(timer);
   auto promise = std::make_shared<std::promise<int32_t>>();
   closure->add_promise(promise);
   std::future<int> fut = promise->get_future();
@@ -865,6 +884,9 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
                                                size_t table_id,
                                                const uint64_t *keys,
                                                size_t num, bool is_training) {
+  auto timer = std::make_shared<CostTimer>("pserver_client_pull_sparse");
+  auto local_timer =
+      std::make_shared<CostTimer>("pserver_client_pull_sparse_local");
   size_t request_call_num = _server_channels.size();
 
   auto shard_sorted_kvs = std::make_shared<
@@ -925,7 +947,7 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
         }
         closure->set_promise_value(ret);
       });
-
+  closure->add_timer(timer);
   auto promise = std::make_shared<std::promise<int32_t>>();
   closure->add_promise(promise);
   std::future<int> fut = promise->get_future();
@@ -1110,8 +1132,8 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
                                                const uint64_t *keys,
                                                const float **update_values,
                                                size_t num) {
-  auto push_timer =
-      std::make_shared<CostTimer>("pserver_client_push_sparse_parse");
+  auto push_timer = std::make_shared<CostTimer>("pserver_client_push_sparse");
+  CostTimer parse_timer("pserver_client_push_sparse_parse");
   int push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size();
   while (push_sparse_async_num > FLAGS_pserver_max_async_call_num) {
     // LOG(INFO) << "push_sparse Waiting for async_call_num comsume, task_num:"
@@ -1121,6 +1143,7 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
     // push_sparse_async_num = _push_sparse_task_queue_map[table_id]->size();
     push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size();
   }
+  auto put_timer = std::make_shared<CostTimer>("client_push_sparse_put");
   thread_local std::vector<std::vector<std::pair<uint64_t, const float *>>>
       shard_sorted_kv_list;
   auto *accessor = table_accessor(table_id);
@@ -1250,14 +1273,14 @@ void BrpcPsClient::push_sparse_task_consume() {
       for_each(task_list.begin() + 1, task_list.end(),
                [&request_kv_num, request_call_num,
                 closure](std::shared_ptr<SparseAsyncTask> &task) {
-                 // closure->add_timer(task->timer());
+                 closure->add_timer(task->timer());
                  closure->add_promise(task->promise());
                });
 
-      // CostTimer merge_timer("pserver_client_push_sparse_merge");
-      // auto rpc_timer =
-      // std::make_shared<CostTimer>("pserver_client_push_sparse_rpc");
-      // closure->add_timer(rpc_timer);
+      CostTimer merge_timer("pserver_client_push_sparse_merge");
+      auto rpc_timer =
+          std::make_shared<CostTimer>("pserver_client_push_sparse_rpc");
+      closure->add_timer(rpc_timer);
 
       std::vector<std::future<int>> merge_status(request_call_num);
       for (int shard_idx = 0; shard_idx < request_call_num; ++shard_idx) {
@@ -1295,6 +1318,7 @@ void BrpcPsClient::push_sparse_task_consume() {
         std::vector<std::future<int>>().swap(merge_status);
       }
     }
+    timeline.Pause();
    auto wait_ms = FLAGS_pserver_async_push_sparse_interval_ms -
                   (timeline.ElapsedMS());
    if (wait_ms > 0) {
@@ -1464,10 +1488,12 @@ std::future<int32_t> BrpcPsClient::push_dense(const Region *regions,
     usleep(5000);  // 5ms
     push_dense_async_num = _push_dense_task_queue_map[table_id]->Size();
   }
+  auto push_dense_timer = std::make_shared<CostTimer>("push_dense_put");
   // auto dense_data = _dense_matrix_obj_pool.get();
   auto dense_data = std::make_shared<std::vector<float>>();
   auto async_task = new DenseAsyncTask(dense_data, table_id, push_timer);
   size_t request_call_num = _server_channels.size();
+
   uint32_t num_per_shard =
       dense_dim_per_shard(accessor->fea_dim(), request_call_num);
@@ -1567,6 +1593,7 @@ void BrpcPsClient::push_dense_task_consume() {
               << total_send_data[total_send_data_size - 2] << total_send_data[0]
               << " total_send_data[-1]"
               << total_send_data[total_send_data_size - 1];
+
       if (scale_gradient && merge_count > 1) {
         Eigen::Map<Eigen::MatrixXf> mat(total_send_data, 1,
                                         total_send_data_size);
@@ -1585,6 +1612,7 @@ void BrpcPsClient::push_dense_task_consume() {
       push_dense_raw_gradient(task_ptr, total_send_data, total_send_data_size,
                               closure);
     }
+    timeline.Pause();
     auto wait_ms = FLAGS_pserver_async_push_dense_interval_ms -
                    (timeline.ElapsedMS());
     if (wait_ms > 0) {
@@ -1603,6 +1631,8 @@ void BrpcPsClient::push_dense_raw_gradient(
   closure->add_timer(timer);
   uint32_t num_per_shard =
       dense_dim_per_shard(accessor->fea_dim(), request_call_num);
+  auto send_timer =
+      std::make_shared<CostTimer>("pserver_client_push_dense_send");
   for (size_t i = 0; i < request_call_num; ++i) {
     closure->request(i)->set_cmd_id(PS_PUSH_DENSE_TABLE);
     closure->request(i)->set_table_id(task->table_id());
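A sketch of the instrumentation convention these hunks rely on, inferred from the diff (the CostProfiler/CostTimer API lives in paddle/fluid/distributed/common/cost_timer.h): a label is registered once at initialization, a CostTimer then measures from construction to destruction, and for async RPCs the shared timer is handed to the closure so the measurement ends when the callback completes:

#include "paddle/fluid/distributed/common/cost_timer.h"

void TimedSection() {
  // registered once, e.g. in BrpcPsClient::initialize()
  paddle::distributed::CostProfiler::instance().register_profiler("demo_cost");

  paddle::distributed::CostTimer timer("demo_cost");  // timing starts here
  // ... work to be profiled ...
}  // ~CostTimer reports the elapsed time under the "demo_cost" label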
 _service_handler_map[PS_PUSH_GLOBAL_STEP] = &BrpcPsService::push_global_step;
+  auto &profiler = CostProfiler::instance();
+  profiler.register_profiler("pserver_server_pull_dense");
+  profiler.register_profiler("pserver_server_push_dense");
+  profiler.register_profiler("pserver_server_pull_sparse");
+  profiler.register_profiler("pserver_server_push_sparse");
 
   // shard initialization: the shard info of server_list can only be read from
   // the env after the server has started
   initialize_shard_info();
@@ -190,6 +196,7 @@ int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request,
                       "PsRequestMessage.datas is requeired at least 1 for num of dense");
     return 0;
   }
+  CostTimer timer("pserver_server_pull_dense");
   uint32_t num = *(const uint32_t *)request.params(0).c_str();
   if (num < 0) {
     set_response_code(response, -1,
@@ -246,6 +253,7 @@ int32_t BrpcPsService::push_dense(Table *table, const PsRequestMessage &request,
     return 0;
   }
 
+  CostTimer timer("pserver_server_push_dense");
   /*
   Push Content:
   |--num--|---valuesData---|
@@ -356,6 +364,7 @@ int32_t BrpcPsService::pull_sparse(Table *table,
     return 0;
   }
 
+  CostTimer timer("pserver_server_pull_sparse");
   uint32_t num = *(uint32_t *)(request.params(0).c_str());
   auto dim = table->value_accesor()->select_dim();
 
@@ -396,6 +405,7 @@ int32_t BrpcPsService::push_sparse(Table *table,
                       "least 1 for num of sparse_key");
     return 0;
   }
+  CostTimer timer("pserver_server_push_sparse");
   uint32_t num = *(uint32_t *)(request.params(0).c_str());
   /*
   Push Content:
diff --git a/paddle/fluid/distributed/table/CMakeLists.txt b/paddle/fluid/distributed/table/CMakeLists.txt
index 0201b627801cbdc1276affe2546ca24870ec1564..b0a553f210044172c0582b3061f57ecb7cf43e12 100644
--- a/paddle/fluid/distributed/table/CMakeLists.txt
+++ b/paddle/fluid/distributed/table/CMakeLists.txt
@@ -16,6 +16,11 @@ set_source_files_properties(common_graph_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
 
 get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)
 
+set(PADDLE_LIB_THIRD_PARTY_PATH "${PADDLE_LIB}/third_party/")
+include_directories(${PADDLE_LIB_THIRD_PARTY_PATH}libmct/src/extern_libmct/libmct/include)
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp")
+
 set(EXTERN_DEP "")
 if(WITH_HETERPS)
   set(TABLE_SRC common_sparse_table.cc ssd_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc common_graph_table.cc)
@@ -43,3 +48,5 @@ cc_library(ctr_accessor SRCS ctr_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto)
 cc_library(memory_sparse_table SRCS memory_sparse_table.cc DEPS ps_framework_proto ${TABLE_DEPS} fs afs_wrapper ctr_accessor common_table)
 
 cc_library(table SRCS table.cc DEPS memory_sparse_table common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost)
+
+target_link_libraries(table -fopenmp)
diff --git a/paddle/fluid/distributed/table/common_dense_table.h b/paddle/fluid/distributed/table/common_dense_table.h
index 1fa0226decd56eab8ab1e1b44a622378c950029a..c8813dc33053f0c8a42a1090b262c7fde79f5ed5 100644
--- a/paddle/fluid/distributed/table/common_dense_table.h
+++ b/paddle/fluid/distributed/table/common_dense_table.h
@@ -57,7 +57,7 @@ class CommonDenseTable : public DenseTable {
   int32_t _push_dense(const float* values, size_t num);
 
  private:
-  const int task_pool_size_ = 1;
+  const int task_pool_size_ = 10;
   bool sync = true;
   std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
   int param_dim_ = 0;
diff --git a/paddle/fluid/distributed/table/depends/dense.h b/paddle/fluid/distributed/table/depends/dense.h
index 8e507842bc3300c4d0afbf4eb11faddd65d00c7b..d2042b7a718e6de50f89052b43432e9afc03ef61 100644
--- a/paddle/fluid/distributed/table/depends/dense.h
+++ b/paddle/fluid/distributed/table/depends/dense.h
@@ -99,6 +99,7 @@ class DSGD : public DenseOptimizer {
 };
 
 // adam optimizer for dense tensor
+// TODO(zhaocaibei123): add CHECK(common_dense_table.task_pool_size_) == 1
 class DAdam : public DenseOptimizer {
  public:
   explicit DAdam(const CommonAccessorParameter& accessor,
@@ -131,6 +132,8 @@ class DAdam : public DenseOptimizer {
     epsilon = 1.0e-8;
   }
 
+  // make sure common_dense_table.task_pool_size_ == 1;
+  // otherwise, task_pool_size_ times beta1_pow/beta2_pow multiplication
   void update(const float* update_values, size_t num, int begin,
               int end) override {
     auto update_numel = end - begin;
@@ -221,45 +224,35 @@ class DAdamD2Sum : public DenseOptimizer {
   void update(const float* update_values, size_t num, int begin,
               int end) override {
     auto update_numel = end - begin;
-    std::vector<float> grad, grad2, scale;
-    grad.resize(update_numel);
-    grad2.resize(update_numel);
-    scale.resize(update_numel);
-
-    auto blas = GetBlas<float>();
-    // copy grad
-    blas.VCOPY(update_numel, update_values + begin, grad.data());
-    blas.VCOPY(update_numel, update_values + begin, grad2.data());
-
-    // d2sum
-    blas.SCAL(update_numel, ada_decay_rate[0], ada_d2sum + begin);
-    ADD<float>(update_numel, ada_d2sum + begin, 1, ada_d2sum + begin);
-
-    // g2sum
-    blas.SCAL(update_numel, ada_decay_rate[0], ada_g2sum + begin);
-    blas.VSQUARE(update_numel, grad2.data(), grad2.data());
-    blas.VADD(update_numel, ada_g2sum + begin, grad2.data(), ada_g2sum + begin);
-
-    // mom
-    blas.SCAL(update_numel, mom_decay_rate[0], mom_velocity + begin);
-    blas.SCAL(update_numel, 1 - mom_decay_rate[0], grad.data());
-    blas.VADD(update_numel, mom_velocity + begin, grad.data(),
-              mom_velocity + begin);
-
-    // scale
-    float* scale_ = scale.data();
-    blas.VDIV(update_numel, ada_g2sum + begin, ada_d2sum + begin, scale_);
-    ADD<float>(update_numel, scale_, ada_epsilon[0], scale_);
-    DIV<float>(update_numel, 1 + ada_epsilon[0], scale_, scale_);
-    SQRT<float>(update_numel, scale_, scale_);
-
-    blas.SCAL(update_numel, learning_rate[0], scale_);
-
-    // TODO(zhaocaibei123): check if there exists elementwise_multiply in blas
-    // TODO(zhaocaibei123): blas.VMUL
-    ELE_MUL<float>(update_numel, scale_, mom_velocity + begin, scale_);
-
-    blas.VSUB(update_numel, param + begin, scale_, param + begin);
+    Eigen::Map<Eigen::MatrixXf> mat_ada_g2sum(ada_g2sum + begin, 1,
+                                              update_numel);
+
+    Eigen::Map<Eigen::MatrixXf> mat_ada_d2sum(ada_d2sum + begin, 1,
+                                              update_numel);
+    Eigen::Map<Eigen::MatrixXf> mat_mom_velocity(mom_velocity + begin, 1,
+                                                 update_numel);
+    Eigen::Map<Eigen::MatrixXf> mat_w(param + begin, 1, update_numel);
+
+    Eigen::Map<const Eigen::MatrixXf> mat_grad(update_values + begin, 1,
+                                               update_numel);
+
+    mat_ada_d2sum = (mat_ada_d2sum * ada_decay_rate[0]).array() + 1;
+    mat_ada_g2sum =
+        (mat_ada_g2sum * ada_decay_rate[0]) + mat_grad.cwiseProduct(mat_grad);
+
+    thread_local std::vector<float> scale_vec;
+    scale_vec.resize(update_numel);
+    Eigen::Map<Eigen::MatrixXf> scale(scale_vec.data(), 1, update_numel);
+    memcpy(scale_vec.data(), mat_ada_d2sum.data(),
+           sizeof(float) * update_numel);
+
+    scale = scale.array() * ada_epsilon[0];
+    scale = (mat_ada_d2sum + scale).cwiseQuotient(mat_ada_g2sum + scale);
+    scale = scale.cwiseSqrt();
+    mat_mom_velocity =
+        (mat_mom_velocity - mat_grad) * mom_decay_rate[0] + mat_grad;
+
+    mat_w -= learning_rate[0] * mat_mom_velocity.cwiseProduct(scale);
   }
 
   float* learning_rate;
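Read off the new Eigen code (and matching the reference loop in dense_table_test.cc below), the per-element adam_d2sum update, with lambda = ada_decay_rate[0], mu = mom_decay_rate[0], epsilon = ada_epsilon[0], and eta = learning_rate[0], is:

\begin{aligned}
d_t &= \lambda\, d_{t-1} + 1 \\
G_t &= \lambda\, G_{t-1} + g_t^2 \\
s_t &= \sqrt{\frac{d_t + \varepsilon\, d_t}{G_t + \varepsilon\, d_t}} \\
m_t &= \mu\,(m_{t-1} - g_t) + g_t = \mu\, m_{t-1} + (1-\mu)\, g_t \\
w_t &= w_{t-1} - \eta\, s_t\, m_t
\end{aligned}

so the old BLAS sequence and the new Eigen rewrite compute the same update; only the vector-math backend changed.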
diff --git a/paddle/fluid/distributed/table/depends/feature_value.h b/paddle/fluid/distributed/table/depends/feature_value.h
index ad037a86bce80ce9fd728c1fae1ee8648b13d9bd..7a83fdec1d7ebf408c494b1a9230a515ea71a4c1 100644
--- a/paddle/fluid/distributed/table/depends/feature_value.h
+++ b/paddle/fluid/distributed/table/depends/feature_value.h
@@ -14,35 +14,11 @@
 
 #pragma once
 
-#include <ThreadPool.h>
-#include <functional>
-#include <future>  // NOLINT
-#include <memory>
-#include <string>
-#include <thread>  // NOLINT
-#include <unordered_map>
-#include <unordered_set>
-#include <utility>
 #include <vector>
 #include "gflags/gflags.h"
 
-#include "butil/object_pool.h"
-#include "paddle/fluid/distributed/common/utils.h"
-#include "paddle/fluid/distributed/table/depends/initializers.h"
-#include "paddle/fluid/distributed/thirdparty/round_robin.h"
-#include "paddle/fluid/framework/generator.h"
-#include "paddle/fluid/framework/lod_tensor.h"
-#include "paddle/fluid/framework/rw_lock.h"
-#include "paddle/fluid/framework/selected_rows.h"
-#include "paddle/fluid/framework/tensor.h"
-#include "paddle/fluid/framework/threadpool.h"
-#include "paddle/fluid/framework/variable.h"
-#include "paddle/fluid/platform/device_context.h"
-#include "paddle/fluid/platform/enforce.h"
-#include "paddle/fluid/platform/place.h"
-#include "paddle/fluid/platform/port.h"
-#include "paddle/fluid/string/printf.h"
-#include "paddle/fluid/string/string_helper.h"
+#include <mct/hash-map.hpp>
+#include "paddle/fluid/distributed/common/chunk_allocator.h"
 
 namespace paddle {
 namespace distributed {
 
@@ -55,112 +31,169 @@ class FixedFeatureValue {
  public:
   FixedFeatureValue() {}
   ~FixedFeatureValue() {}
-  float *data() { return data_.data(); }
-  size_t size() { return data_.size(); }
-  void resize(size_t size) { data_.resize(size); }
-  void shrink_to_fit() { data_.shrink_to_fit(); }
+  float* data() { return _data.data(); }
+  size_t size() { return _data.size(); }
+  void resize(size_t size) { _data.resize(size); }
+  void shrink_to_fit() { _data.shrink_to_fit(); }
 
  private:
-  std::vector<float> data_;
+  std::vector<float> _data;
 };
 
-class SparseTableShard {
+template <class KEY, class VALUE>
+struct alignas(64) SparseTableShard {
  public:
-  typedef typename robin_hood::unordered_map<uint64_t, FixedFeatureValue *>
+  typedef typename mct::closed_hash_map<KEY, mct::Pointer, std::hash<KEY>>
       map_type;
-  SparseTableShard() {}
-  ~SparseTableShard() {}
+  struct iterator {
+    typename map_type::iterator it;
+    size_t bucket;
+    map_type* buckets;
+    friend bool operator==(const iterator& a, const iterator& b) {
+      return a.it == b.it;
+    }
+    friend bool operator!=(const iterator& a, const iterator& b) {
+      return a.it != b.it;
+    }
+    const KEY& key() const { return it->first; }
+    VALUE& value() const { return *(VALUE*)(void*)it->second; }  // NOLINT
+    iterator& operator++() {
+      ++it;
 
-  FixedFeatureValue *Init(const uint64_t &id) {
-    size_t hash = hasher_(id);
-    size_t bucket = compute_bucket(hash);
-    auto &table = values_[bucket];
+      while (it == buckets[bucket].end() &&
+             bucket + 1 < CTR_SPARSE_SHARD_BUCKET_NUM) {
+        it = buckets[++bucket].begin();
+      }
 
-    FixedFeatureValue *value = nullptr;
-    value = butil::get_object<FixedFeatureValue>();
-    table[id] = value;
-    return value;
+      return *this;
+    }
+    iterator operator++(int) {
+      iterator ret = *this;
+      ++*this;
+      return ret;
+    }
+  };
+  struct local_iterator {
+    typename map_type::iterator it;
+    friend bool operator==(const local_iterator& a, const local_iterator& b) {
+      return a.it == b.it;
+    }
+    friend bool operator!=(const local_iterator& a, const local_iterator& b) {
+      return a.it != b.it;
+    }
+    const KEY& key() const { return it->first; }
+    VALUE& value() const { return *(VALUE*)(void*)it->second; }  // NOLINT
+    local_iterator& operator++() {
+      ++it;
+      return *this;
+    }
+    local_iterator operator++(int) { return {it++}; }
+  };
+
+  ~SparseTableShard() { clear(); }
+  bool empty() { return _alloc.size() == 0; }
+  size_t size() { return _alloc.size(); }
+  void set_max_load_factor(float x) {
+    for (size_t bucket = 0; bucket < CTR_SPARSE_SHARD_BUCKET_NUM; bucket++) {
+      _buckets[bucket].max_load_factor(x);
+    }
   }
-
-  // dont judge if (has(id))
-  float *Get(const uint64_t &id) {
-    size_t hash = hasher_(id);
-    size_t bucket = compute_bucket(hash);
-    auto &table = values_[bucket];
-
-    // auto &value = table.at(id);
-    // return value->data_.data();
-    auto res = table.find(id);
-    FixedFeatureValue *value = res->second;
-    return value->data();
+  size_t bucket_count() { return CTR_SPARSE_SHARD_BUCKET_NUM; }
+  size_t bucket_size(size_t bucket) { return _buckets[bucket].size(); }
+  void clear() {
+    for (size_t bucket = 0; bucket < CTR_SPARSE_SHARD_BUCKET_NUM; bucket++) {
+      map_type& data = _buckets[bucket];
+      for (auto it = data.begin(); it != data.end(); ++it) {
+        _alloc.release((VALUE*)(void*)it->second);  // NOLINT
+      }
+      data.clear();
+    }
   }
-
-  // for load, to reset count, unseen_days
-  FixedFeatureValue *GetValue(const uint64_t &id) {
-    size_t hash = hasher_(id);
-    size_t bucket = compute_bucket(hash);
-
-    auto &table = values_[bucket];
-    auto res = table.find(id);
-    return res->second;
+  iterator begin() {
+    auto it = _buckets[0].begin();
+    size_t bucket = 0;
+    while (it == _buckets[bucket].end() &&
+           bucket + 1 < CTR_SPARSE_SHARD_BUCKET_NUM) {
+      it = _buckets[++bucket].begin();
+    }
+    return {it, bucket, _buckets};
   }
-
-  void erase(uint64_t feasign) {
-    size_t hash = hasher_(feasign);
+  iterator end() {
+    return {_buckets[CTR_SPARSE_SHARD_BUCKET_NUM - 1].end(),
+            CTR_SPARSE_SHARD_BUCKET_NUM - 1, _buckets};
+  }
+  local_iterator begin(size_t bucket) { return {_buckets[bucket].begin()}; }
+  local_iterator end(size_t bucket) { return {_buckets[bucket].end()}; }
+  iterator find(const KEY& key) {
+    size_t hash = _hasher(key);
     size_t bucket = compute_bucket(hash);
-    auto &table = values_[bucket];
-
-    auto iter = table.find(feasign);
-    if (iter != table.end()) {
-      butil::return_object(iter->second);
-      iter = table.erase(iter);
+    auto it = _buckets[bucket].find_with_hash(key, hash);
+    if (it == _buckets[bucket].end()) {
+      return end();
     }
+    return {it, bucket, _buckets};
+  }
+  VALUE& operator[](const KEY& key) { return emplace(key).first.value(); }
+  std::pair<iterator, bool> insert(const KEY& key, const VALUE& val) {
+    return emplace(key, val);
   }
+  std::pair<iterator, bool> insert(const KEY& key, VALUE&& val) {
+    return emplace(key, std::move(val));
+  }
+  template <class... ARGS>
+  std::pair<iterator, bool> emplace(const KEY& key, ARGS&&... args) {
+    size_t hash = _hasher(key);
+    size_t bucket = compute_bucket(hash);
+    auto res = _buckets[bucket].insert_with_hash({key, NULL}, hash);
 
-  void clear() {}
+    if (res.second) {
+      res.first->second = _alloc.acquire(std::forward<ARGS>(args)...);
+    }
 
-  size_t compute_bucket(size_t hash) {
-    if (CTR_SPARSE_SHARD_BUCKET_NUM == 1) {
-      return 0;
-    } else {
-      return hash >> (sizeof(size_t) * 8 - CTR_SPARSE_SHARD_BUCKET_NUM_BITS);
+    return {{res.first, bucket, _buckets}, res.second};
+  }
+  iterator erase(iterator it) {
+    _alloc.release((VALUE*)(void*)it.it->second);  // NOLINT
+    size_t bucket = it.bucket;
+    auto it2 = _buckets[bucket].erase(it.it);
+    while (it2 == _buckets[bucket].end() &&
+           bucket + 1 < CTR_SPARSE_SHARD_BUCKET_NUM) {
+      it2 = _buckets[++bucket].begin();
     }
+    return {it2, bucket, _buckets};
   }
-
-  map_type::iterator end() {
-    return values_[CTR_SPARSE_SHARD_BUCKET_NUM - 1].end();
+  void quick_erase(iterator it) {
+    _alloc.release((VALUE*)(void*)it.it->second);  // NOLINT
+    _buckets[it.bucket].quick_erase(it.it);
   }
-
-  map_type::iterator Find(uint64_t id) {
-    size_t hash = hasher_(id);
-    size_t bucket = compute_bucket(hash);
-    auto &table = values_[bucket];
-
-    auto got = table.find(id);
-    if (got == table.end()) {
-      return end();
-    } else {
-      return got;
+  local_iterator erase(size_t bucket, local_iterator it) {
+    _alloc.release((VALUE*)(void*)it.it->second);  // NOLINT
+    return {_buckets[bucket].erase(it.it)};
+  }
+  void quick_erase(size_t bucket, local_iterator it) {
+    _alloc.release((VALUE*)(void*)it.it->second);  // NOLINT
+    _buckets[bucket].quick_erase(it.it);
+  }
+  size_t erase(const KEY& key) {
+    auto it = find(key);
+    if (it == end()) {
+      return 0;
     }
+    quick_erase(it);
+    return 1;
   }
-
- private:
-  bool Has(const uint64_t id) {
-    size_t hash = hasher_(id);
-    size_t bucket = compute_bucket(hash);
-    auto &table = values_[bucket];
-
-    auto got = table.find(id);
-    if (got == table.end()) {
-      return false;
+  size_t compute_bucket(size_t hash) {
+    if (CTR_SPARSE_SHARD_BUCKET_NUM == 1) {
+      return 0;
    } else {
-      return true;
+      return hash >> (sizeof(size_t) * 8 - CTR_SPARSE_SHARD_BUCKET_NUM_BITS);
     }
   }
 
- public:
-  map_type values_[CTR_SPARSE_SHARD_BUCKET_NUM];
-  std::hash<uint64_t> hasher_;
+ private:
+  map_type _buckets[CTR_SPARSE_SHARD_BUCKET_NUM];
+  ChunkAllocator<VALUE> _alloc;
+  std::hash<KEY> _hasher;
 };
 
 }  // namespace distributed
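A hypothetical illustration (not part of the commit): the rewritten shard behaves like a bucketed hash map whose values live in the ChunkAllocator, and its iterator exposes it.key()/it.value() rather than it->first/it->second:

#include "paddle/fluid/distributed/table/depends/feature_value.h"

using paddle::distributed::FixedFeatureValue;
using paddle::distributed::SparseTableShard;

void Demo() {
  SparseTableShard<uint64_t, FixedFeatureValue> shard;

  FixedFeatureValue& value = shard[42];  // operator[] inserts on miss via emplace()
  value.resize(8);

  auto it = shard.find(42);
  if (it != shard.end()) {
    float* data = it.value().data();  // note: it.value(), not it->second
    (void)data;
  }
  shard.erase(42);  // releases the value back into the allocator
}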
diff --git a/paddle/fluid/distributed/table/memory_sparse_table.cc b/paddle/fluid/distributed/table/memory_sparse_table.cc
index da5c51dfd560a1ba7b0d7b3bca6d6d31230dcfd0..7501207abe09b960db326a5b86684217d53fb061 100644
--- a/paddle/fluid/distributed/table/memory_sparse_table.cc
+++ b/paddle/fluid/distributed/table/memory_sparse_table.cc
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <omp.h>
 #include <sstream>
 
+#include "paddle/fluid/distributed/common/cost_timer.h"
 #include "paddle/fluid/distributed/table/memory_sparse_table.h"
 #include "paddle/fluid/framework/io/fs.h"
@@ -25,41 +27,40 @@ namespace paddle {
 namespace distributed {
 
 // TODO(zhaocaibei123): configure
-bool FLAGS_pslib_create_value_when_push = false;
-int FLAGS_pslib_table_save_max_retry = 3;
-bool FLAGS_pslib_enable_create_feasign_randomly = false;
+bool FLAGS_pserver_create_value_when_push = false;
+int FLAGS_pserver_table_save_max_retry = 3;
+bool FLAGS_pserver_enable_create_feasign_randomly = false;
 
 int32_t MemorySparseTable::initialize() {
-  shards_task_pool_.resize(task_pool_size_);
-  for (int i = 0; i < shards_task_pool_.size(); ++i) {
-    shards_task_pool_[i].reset(new ::ThreadPool(1));
+  _shards_task_pool.resize(_task_pool_size);
+  for (int i = 0; i < _shards_task_pool.size(); ++i) {
+    _shards_task_pool[i].reset(new ::ThreadPool(1));
   }
+  auto& profiler = CostProfiler::instance();
+  profiler.register_profiler("pserver_sparse_update_all");
+  profiler.register_profiler("pserver_sparse_select_all");
   initialize_value();
   VLOG(0) << "initalize MemorySparseTable succ";
   return 0;
 }
 
 int32_t MemorySparseTable::initialize_value() {
-  sparse_table_shard_num_ = static_cast<int>(_config.shard_num());
-  avg_local_shard_num_ =
-      SparseTable::sparse_local_shard_num(sparse_table_shard_num_, _shard_num);
-  real_local_shard_num_ = avg_local_shard_num_;
-  if (real_local_shard_num_ * (_shard_idx + 1) > sparse_table_shard_num_) {
-    real_local_shard_num_ =
-        sparse_table_shard_num_ - real_local_shard_num_ * _shard_idx;
-    real_local_shard_num_ =
-        real_local_shard_num_ < 0 ? 0 : real_local_shard_num_;
+  _sparse_table_shard_num = static_cast<int>(_config.shard_num());
+  _avg_local_shard_num =
+      SparseTable::sparse_local_shard_num(_sparse_table_shard_num, _shard_num);
+  _real_local_shard_num = _avg_local_shard_num;
+  if (_real_local_shard_num * (_shard_idx + 1) > _sparse_table_shard_num) {
+    _real_local_shard_num =
+        _sparse_table_shard_num - _real_local_shard_num * _shard_idx;
+    _real_local_shard_num =
+        _real_local_shard_num < 0 ? 0 : _real_local_shard_num;
   }
-  VLOG(1) << "memory sparse table avg_local_shard_num_: "
-          << avg_local_shard_num_
-          << " real_local_shard_num_: " << real_local_shard_num_;
+  VLOG(1) << "memory sparse table _avg_local_shard_num: "
+          << _avg_local_shard_num
+          << " _real_local_shard_num: " << _real_local_shard_num;
 
-  shard_values_.reserve(real_local_shard_num_);
+  _local_shards.reset(new shard_type[_real_local_shard_num]);
 
-  for (int x = 0; x < real_local_shard_num_; ++x) {
-    auto shard = std::make_shared<SparseTableShard>();
-    shard_values_.emplace_back(shard);
-  }
   return 0;
 }
 
@@ -74,7 +75,7 @@ int32_t MemorySparseTable::load(const std::string& path,
   }
 
   int load_param = atoi(param.c_str());
-  auto expect_shard_num = sparse_table_shard_num_;
+  auto expect_shard_num = _sparse_table_shard_num;
   if (file_list.size() != expect_shard_num) {
     LOG(WARNING) << "MemorySparseTable file_size:" << file_list.size()
                  << " not equal to expect_shard_num:" << expect_shard_num;
@@ -85,14 +86,14 @@ int32_t MemorySparseTable::load(const std::string& path,
     return -1;
   }
 
-  size_t file_start_idx = _shard_idx * avg_local_shard_num_;
+  size_t file_start_idx = _shard_idx * _avg_local_shard_num;
 
   size_t feature_value_size = _value_accesor->size() / sizeof(float);
-  // TODO(zhaocaibei123): multi-thread
-  // int thread_num = shard_values_.size() < 15 ? shard_values_.size() : 15;
-  // omp_set_num_threads(thread_num);
-  // #pragma omp parallel for schedule(dynamic)
-  for (size_t i = 0; i < real_local_shard_num_; ++i) {
+
+  int thread_num = _real_local_shard_num < 15 ? _real_local_shard_num : 15;
+  omp_set_num_threads(thread_num);
+#pragma omp parallel for schedule(dynamic)
+  for (size_t i = 0; i < _real_local_shard_num; ++i) {
     FsChannelConfig channel_config;
     channel_config.path = file_list[file_start_idx + i];
     VLOG(1) << "MemorySparseTable::load begin load " << channel_config.path
@@ -110,21 +111,21 @@ int32_t MemorySparseTable::load(const std::string& path,
       std::string line_data;
       auto read_channel = _afs_client.open_r(channel_config, 0, &err_no);
       char* end = NULL;
-      auto& shard = shard_values_[i];
+      auto& shard = _local_shards[i];
       try {
         while (read_channel->read_line(line_data) == 0 &&
                line_data.size() > 1) {
           uint64_t key = std::strtoul(line_data.data(), &end, 10);
-          auto* value = shard->Init(key);
-          value->resize(feature_value_size);
+          auto& value = shard[key];
+          value.resize(feature_value_size);
           int parse_size =
-              _value_accesor->parse_from_string(++end, value->data());
-          value->resize(parse_size);
+              _value_accesor->parse_from_string(++end, value.data());
+          value.resize(parse_size);
 
           // for debug
           for (int ii = 0; ii < parse_size; ++ii) {
             VLOG(2) << "MemorySparseTable::load key: " << key << " value " << ii
-                    << ": " << value->data()[ii] << " local_shard: " << i;
+                    << ": " << value.data()[ii] << " local_shard: " << i;
           }
         }
         read_channel->close();
@@ -141,7 +142,7 @@ int32_t MemorySparseTable::load(const std::string& path,
         LOG(ERROR) << "MemorySparseTable load failed, retry it! path:"
                    << channel_config.path << " , retry_num=" << retry_num;
       }
-      if (retry_num > paddle::distributed::FLAGS_pslib_table_save_max_retry) {
+      if (retry_num > paddle::distributed::FLAGS_pserver_table_save_max_retry) {
        LOG(ERROR) << "MemorySparseTable load failed reach max limit!";
        exit(-1);
      }
@@ -149,7 +150,7 @@ int32_t MemorySparseTable::load(const std::string& path,
   }
   LOG(INFO) << "MemorySparseTable load success, path from "
             << file_list[file_start_idx] << " to "
-            << file_list[file_start_idx + real_local_shard_num_ - 1];
+            << file_list[file_start_idx + _real_local_shard_num - 1];
   return 0;
 }
 
@@ -159,7 +160,7 @@ int32_t MemorySparseTable::load_local_fs(const std::string& path,
   auto file_list = paddle::framework::localfs_list(table_path);
 
   int load_param = atoi(param.c_str());
-  auto expect_shard_num = sparse_table_shard_num_;
+  auto expect_shard_num = _sparse_table_shard_num;
   if (file_list.size() != expect_shard_num) {
     LOG(WARNING) << "MemorySparseTable file_size:" << file_list.size()
                  << " not equal to expect_shard_num:" << expect_shard_num;
@@ -170,14 +171,14 @@ int32_t MemorySparseTable::load_local_fs(const std::string& path,
     return -1;
   }
 
-  size_t file_start_idx = _shard_idx * avg_local_shard_num_;
+  size_t file_start_idx = _shard_idx * _avg_local_shard_num;
 
   size_t feature_value_size = _value_accesor->size() / sizeof(float);
 
-  // int thread_num = shard_values_.size() < 15 ? shard_values_.size() : 15;
-  // omp_set_num_threads(thread_num);
-  // #pragma omp parallel for schedule(dynamic)
-  for (size_t i = 0; i < real_local_shard_num_; ++i) {
+  int thread_num = _real_local_shard_num < 15 ? _real_local_shard_num : 15;
+  omp_set_num_threads(thread_num);
+#pragma omp parallel for schedule(dynamic)
+  for (size_t i = 0; i < _real_local_shard_num; ++i) {
     bool is_read_failed = false;
     int retry_num = 0;
     int err_no = 0;
@@ -187,16 +188,15 @@ int32_t MemorySparseTable::load_local_fs(const std::string& path,
       std::string line_data;
       std::ifstream file(file_list[file_start_idx + i]);
       char* end = NULL;
-      auto& shard = shard_values_[i];
+      auto& shard = _local_shards[i];
       try {
         while (std::getline(file, line_data) && line_data.size() > 1) {
           uint64_t key = std::strtoul(line_data.data(), &end, 10);
-          auto* value = shard->Init(key);
-          value->resize(feature_value_size);
+          auto& value = shard[key];
+          value.resize(feature_value_size);
           int parse_size =
-              _value_accesor->parse_from_string(++end, value->data());
-          value->resize(parse_size);
-          // value->shrink_to_fit();
+              _value_accesor->parse_from_string(++end, value.data());
+          value.resize(parse_size);
         }
         file.close();
         if (err_no == -1) {
@@ -213,7 +213,7 @@ int32_t MemorySparseTable::load_local_fs(const std::string& path,
                    << file_list[file_start_idx + i]
                    << " , retry_num=" << retry_num;
       }
-      if (retry_num > paddle::distributed::FLAGS_pslib_table_save_max_retry) {
+      if (retry_num > paddle::distributed::FLAGS_pserver_table_save_max_retry) {
        LOG(ERROR) << "MemorySparseTable load failed reach max limit!";
        exit(-1);
      }
@@ -221,7 +221,7 @@ int32_t MemorySparseTable::load_local_fs(const std::string& path,
   }
   LOG(INFO) << "MemorySparseTable load success, path from "
             << file_list[file_start_idx] << " to "
-            << file_list[file_start_idx + real_local_shard_num_ - 1];
+            << file_list[file_start_idx + _real_local_shard_num - 1];
   return 0;
 }
 
@@ -233,15 +233,14 @@ int32_t MemorySparseTable::save(const std::string& dirname,
   std::string table_path = table_dir(dirname);
   _afs_client.remove(paddle::string::format_string(
       "%s/part-%03d-*", table_path.c_str(), _shard_idx));
-  // int thread_num = shard_values_.size() < 20 ? shard_values_.size() : 20;
 
   std::atomic<uint32_t> feasign_size_all{0};
 
-  size_t file_start_idx = avg_local_shard_num_ * _shard_idx;
+  size_t file_start_idx = _avg_local_shard_num * _shard_idx;
 
-  // TODO(zhaocaibei123): openmp
-  // omp_set_num_threads(thread_num);
-  // #pragma omp parallel for schedule(dynamic)
-  for (size_t i = 0; i < real_local_shard_num_; ++i) {
+  int thread_num = _real_local_shard_num < 20 ? _real_local_shard_num : 20;
+  omp_set_num_threads(thread_num);
+#pragma omp parallel for schedule(dynamic)
+  for (size_t i = 0; i < _real_local_shard_num; ++i) {
     FsChannelConfig channel_config;
     if (_config.compress_in_save() && (save_param == 0 || save_param == 3)) {
       channel_config.path = paddle::string::format_string(
@@ -259,30 +258,28 @@ int32_t MemorySparseTable::save(const std::string& dirname,
     int feasign_size = 0;
     int retry_num = 0;
     int err_no = 0;
-    auto& shard = shard_values_[i];
+    auto& shard = _local_shards[i];
     do {
       err_no = 0;
       feasign_size = 0;
       is_write_failed = false;
       auto write_channel =
          _afs_client.open_w(channel_config, 1024 * 1024 * 40, &err_no);
-      for (auto& table : shard->values_) {
-        for (auto& value : table) {
-          if (_value_accesor->save(value.second->data(), save_param)) {
-            std::string format_value = _value_accesor->parse_to_string(
-                value.second->data(), value.second->size());
-            if (0 !=
-                write_channel->write_line(paddle::string::format_string(
-                    "%lu %s", value.first, format_value.c_str()))) {
-              ++retry_num;
-              is_write_failed = true;
-              LOG(ERROR)
-                  << "MemorySparseTable save prefix failed, retry it! path:"
-                  << channel_config.path << " , retry_num=" << retry_num;
-              break;
-            }
-            ++feasign_size;
+      for (auto it = shard.begin(); it != shard.end(); ++it) {
+        if (_value_accesor->save(it.value().data(), save_param)) {
+          std::string format_value = _value_accesor->parse_to_string(
+              it.value().data(), it.value().size());
+          if (0 !=
+              write_channel->write_line(paddle::string::format_string(
+                  "%lu %s", it.key(), format_value.c_str()))) {
+            ++retry_num;
+            is_write_failed = true;
+            LOG(ERROR)
+                << "MemorySparseTable save prefix failed, retry it! path:"
+                << channel_config.path << " , retry_num=" << retry_num;
+            break;
           }
+          ++feasign_size;
         }
       }
       write_channel->close();
@@ -296,17 +293,14 @@ int32_t MemorySparseTable::save(const std::string& dirname,
       if (is_write_failed) {
        _afs_client.remove(channel_config.path);
      }
-      if (retry_num > paddle::distributed::FLAGS_pslib_table_save_max_retry) {
+      if (retry_num > paddle::distributed::FLAGS_pserver_table_save_max_retry) {
        LOG(ERROR) << "MemorySparseTable save prefix failed reach max limit!";
        exit(-1);
      }
    } while (is_write_failed);
    feasign_size_all += feasign_size;
-    for (auto& table : shard->values_) {
-      for (auto& value : table) {
-        _value_accesor->update_stat_after_save(value.second->data(),
-                                               save_param);
-      }
+    for (auto it = shard.begin(); it != shard.end(); ++it) {
+      _value_accesor->update_stat_after_save(it.value().data(), save_param);
    }
    LOG(INFO) << "MemorySparseTable save prefix success, path: "
              << channel_config.path;
@@ -322,26 +316,30 @@ int32_t MemorySparseTable::save_local_fs(const std::string& dirname,
       atoi(param.c_str());  // checkpoint:0  xbox delta:1  xbox base:2
   std::string table_path = table_dir(dirname);
   int feasign_cnt = 0;
-  size_t file_start_idx = avg_local_shard_num_ * _shard_idx;
-  for (size_t i = 0; i < real_local_shard_num_; ++i) {
+  size_t file_start_idx = _avg_local_shard_num * _shard_idx;
+
+  int thread_num = _real_local_shard_num < 20 ? _real_local_shard_num : 20;
+  std::atomic<uint32_t> feasign_size_all{0};
+
+  omp_set_num_threads(thread_num);
+#pragma omp parallel for schedule(dynamic)
+  for (size_t i = 0; i < _real_local_shard_num; ++i) {
     feasign_cnt = 0;
-    auto& shard = shard_values_[i];
+    auto& shard = _local_shards[i];
     std::string file_name = paddle::string::format_string(
         "%s/part-%s-%03d-%05d", table_path.c_str(), prefix.c_str(), _shard_idx,
         file_start_idx + i);
     std::ofstream os;
     os.open(file_name);
-    for (auto& table : shard->values_) {
-      for (auto& value : table) {
-        if (_value_accesor->save(value.second->data(), save_param)) {
-          std::string format_value = _value_accesor->parse_to_string(
-              value.second->data(), value.second->size());
-          std::string out_line = paddle::string::format_string(
-              "%lu %s\n", value.first, format_value.c_str());
-          // VLOG(2) << out_line.c_str();
-          os.write(out_line.c_str(), sizeof(char) * out_line.size());
-          ++feasign_cnt;
-        }
+    for (auto it = shard.begin(); it != shard.end(); ++it) {
+      if (_value_accesor->save(it.value().data(), save_param)) {
+        std::string format_value = _value_accesor->parse_to_string(
+            it.value().data(), it.value().size());
+        std::string out_line = paddle::string::format_string(
+            "%lu %s\n", it.key(), format_value.c_str());
+        // VLOG(2) << out_line.c_str();
+        os.write(out_line.c_str(), sizeof(char) * out_line.size());
+        ++feasign_cnt;
       }
     }
     os.close();
@@ -351,22 +349,51 @@ int32_t MemorySparseTable::save_local_fs(const std::string& dirname,
   return 0;
 }
 
-std::pair<int64_t, int64_t> MemorySparseTable::print_table_stat() {
-  int64_t feasign_size = 0;
-  int64_t mf_size = 0;
+int64_t MemorySparseTable::local_size() {
+  int64_t local_size = 0;
+  for (size_t i = 0; i < _real_local_shard_num; ++i) {
+    local_size += _local_shards[i].size();
+  }
+  return local_size;
+}
 
-  for (auto& shard : shard_values_) {
-    for (auto& table : shard->values_) {
-      feasign_size += table.size();
-    }
+int64_t MemorySparseTable::local_mf_size() {
+  std::vector<int64_t> size_arr(_real_local_shard_num, 0);
+  std::vector<std::future<int>> tasks(_real_local_shard_num);
+  int64_t ret_size = 0;
+  for (size_t shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) {
+    tasks[shard_id] =
+        _shards_task_pool[shard_id % _shards_task_pool.size()]->enqueue(
+            [this, shard_id, &size_arr]() -> int {
+              auto& local_shard = _local_shards[shard_id];
+              for (auto it = local_shard.begin(); it != local_shard.end();
+                   ++it) {
+                if (_value_accesor->has_mf(it.value().size())) {
+                  size_arr[shard_id] += 1;
+                }
+              }
+              return 0;
+            });
+  }
+  for (size_t i = 0; i < _real_local_shard_num; ++i) {
+    tasks[i].wait();
+  }
+  for (auto x : size_arr) {
+    ret_size += x;
   }
+  return ret_size;
+}
 
+std::pair<int64_t, int64_t> MemorySparseTable::print_table_stat() {
+  int64_t feasign_size = local_size();
+  int64_t mf_size = local_mf_size();
   return {feasign_size, mf_size};
 }
 
 int32_t MemorySparseTable::pull_sparse(float* pull_values,
                                        const PullSparseValue& pull_value) {
-  std::vector<std::future<int>> tasks(real_local_shard_num_);
+  CostTimer timer("pserver_sparse_select_all");
+  std::vector<std::future<int>> tasks(_real_local_shard_num);
 
   const size_t value_size = _value_accesor->size() / sizeof(float);
   size_t mf_value_size = _value_accesor->mf_size() / sizeof(float);
@@ -374,42 +401,42 @@ int32_t MemorySparseTable::pull_sparse(float* pull_values,
 
   // std::atomic<uint32_t> missed_keys{0};
   std::vector<std::vector<std::pair<uint64_t, int>>> task_keys(
-      real_local_shard_num_);
+      _real_local_shard_num);
   size_t num = pull_value.numel_;
   for (size_t i = 0; i < num; ++i) {
-    int shard_id = (pull_value.feasigns_[i] % sparse_table_shard_num_) %
-                   avg_local_shard_num_;
+    int shard_id = (pull_value.feasigns_[i] % _sparse_table_shard_num) %
+                   _avg_local_shard_num;
     task_keys[shard_id].push_back({pull_value.feasigns_[i], i});
   }
-  for (int shard_id = 0; shard_id < real_local_shard_num_; ++shard_id) {
+  for (int shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) {
     tasks[shard_id] =
-        shards_task_pool_[shard_id % shards_task_pool_.size()]->enqueue(
+        _shards_task_pool[shard_id % _shards_task_pool.size()]->enqueue(
             [this, shard_id, &task_keys, value_size, pull_values, mf_value_size,
              select_value_size]() -> int {
-              auto& local_shard = shard_values_[shard_id];
+              auto& local_shard = _local_shards[shard_id];
              float data_buffer[value_size];  // NOLINT
              float* data_buffer_ptr = data_buffer;
 
              auto& keys = task_keys[shard_id];
              for (size_t i = 0; i < keys.size(); i++) {
                uint64_t key = keys[i].first;
-                auto itr = local_shard->Find(key);
+                auto itr = local_shard.find(key);
                size_t data_size = value_size - mf_value_size;
-                if (itr == local_shard->end()) {
+                if (itr == local_shard.end()) {
                  // ++missed_keys;
-                  if (FLAGS_pslib_create_value_when_push) {
+                  if (FLAGS_pserver_create_value_when_push) {
                    memset(data_buffer, 0, sizeof(float) * data_size);
                  } else {
-                    auto* feature_value = local_shard->Init(key);
-                    feature_value->resize(data_size);
-                    float* data_ptr = feature_value->data();
+                    auto& feature_value = local_shard[key];
+                    feature_value.resize(data_size);
+                    float* data_ptr = feature_value.data();
                    _value_accesor->create(&data_buffer_ptr, 1);
                    memcpy(data_ptr, data_buffer_ptr,
                           data_size * sizeof(float));
                  }
                } else {
-                  data_size = itr->second->size();
-                  memcpy(data_buffer_ptr, itr->second->data(),
+                  data_size = itr.value().size();
+                  memcpy(data_buffer_ptr, itr.value().data(),
                         data_size * sizeof(float));
                }
                for (int mf_idx = data_size; mf_idx < value_size; ++mf_idx) {
@@ -439,11 +466,12 @@ int32_t MemorySparseTable::pull_sparse_ptr(char** pull_values,
 
 int32_t MemorySparseTable::push_sparse(const uint64_t* keys,
                                        const float* values, size_t num) {
-  std::vector<std::future<int>> tasks(real_local_shard_num_);
+  CostTimer timer("pserver_sparse_update_all");
+  std::vector<std::future<int>> tasks(_real_local_shard_num);
   std::vector<std::vector<std::pair<uint64_t, int>>> task_keys(
-      real_local_shard_num_);
+      _real_local_shard_num);
   for (size_t i = 0; i < num; ++i) {
-    int shard_id = (keys[i] % sparse_table_shard_num_) % avg_local_shard_num_;
+    int shard_id = (keys[i] % _sparse_table_shard_num) % _avg_local_shard_num;
     task_keys[shard_id].push_back({keys[i], i});
   }
 
@@ -451,41 +479,38 @@ int32_t MemorySparseTable::push_sparse(const uint64_t* keys,
   size_t mf_value_col = _value_accesor->mf_size() / sizeof(float);
   size_t update_value_col = _value_accesor->update_size() / sizeof(float);
 
-  for (size_t shard_id = 0; shard_id < real_local_shard_num_; ++shard_id) {
-    tasks[shard_id] = shards_task_pool_[shard_id % task_pool_size_]->enqueue(
+  for (size_t shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) {
+    tasks[shard_id] = _shards_task_pool[shard_id % _task_pool_size]->enqueue(
         [this, shard_id, value_col, mf_value_col, update_value_col, values,
          &task_keys]() -> int {
           auto& keys = task_keys[shard_id];
-          auto& local_shard = shard_values_[shard_id];
+          auto& local_shard = _local_shards[shard_id];
          float data_buffer[value_col];  // NOLINT
          float* data_buffer_ptr = data_buffer;
-
          for (int i = 0; i < keys.size(); ++i) {
            uint64_t key = keys[i].first;
            uint64_t push_data_idx = keys[i].second;
            const float* update_data =
                values + push_data_idx * update_value_col;
-            auto itr = local_shard->Find(key);
-            if (itr == local_shard->end()) {
+            auto itr = local_shard.find(key);
+            if (itr == local_shard.end()) {
              VLOG(0) << "sparse table push_sparse: " << key << "not found!";
-              if (FLAGS_pslib_enable_create_feasign_randomly &&
+              if (FLAGS_pserver_enable_create_feasign_randomly &&
                  !_value_accesor->create_value(1, update_data)) {
                continue;
              }
              auto value_size = value_col - mf_value_col;
-              auto* feature_value = local_shard->Init(key);
-              feature_value->resize(value_size);
+              auto& feature_value = local_shard[key];
+              feature_value.resize(value_size);
              _value_accesor->create(&data_buffer_ptr, 1);
-              memcpy(feature_value->data(), data_buffer_ptr,
+              memcpy(feature_value.data(), data_buffer_ptr,
                     value_size * sizeof(float));
-              itr = local_shard->Find(key);
-            } else {
-              VLOG(2) << "sparse table debug push_sparse: " << key << " found!";
+              itr = local_shard.find(key);
            }
-            auto* feature_value = itr->second;
-            float* value_data = feature_value->data();
-            size_t value_size = feature_value->size();
+            auto& feature_value = itr.value();
+            float* value_data = feature_value.data();
+            size_t value_size = feature_value.size();
            if (value_size == value_col) {  // already at max size: update in place
              _value_accesor->update(&value_data, &update_data, 1);
@@ -495,8 +520,8 @@ int32_t MemorySparseTable::push_sparse(const uint64_t* keys,
              _value_accesor->update(&data_buffer_ptr, &update_data, 1);
 
              if (_value_accesor->need_extend_mf(data_buffer)) {
-                feature_value->resize(value_col);
-                value_data = feature_value->data();
+                feature_value.resize(value_col);
+                value_data = feature_value.data();
                _value_accesor->create(&value_data, 1);
              }
              memcpy(value_data, data_buffer_ptr, value_size * sizeof(float));
@@ -520,11 +545,11 @@ int32_t MemorySparseTable::push_sparse(const uint64_t* keys,
 
 int32_t MemorySparseTable::_push_sparse(const uint64_t* keys,
                                         const float** values, size_t num) {
-  std::vector<std::future<int>> tasks(real_local_shard_num_);
+  std::vector<std::future<int>> tasks(_real_local_shard_num);
   std::vector<std::vector<std::pair<uint64_t, int>>> task_keys(
-      real_local_shard_num_);
+      _real_local_shard_num);
   for (size_t i = 0; i < num; ++i) {
-    int shard_id = (keys[i] % sparse_table_shard_num_) % avg_local_shard_num_;
+    int shard_id = (keys[i] % _sparse_table_shard_num) % _avg_local_shard_num;
     task_keys[shard_id].push_back({keys[i], i});
   }
 
@@ -532,36 +557,35 @@ int32_t MemorySparseTable::_push_sparse(const uint64_t* keys,
   size_t mf_value_col = _value_accesor->mf_size() / sizeof(float);
   size_t update_value_col = _value_accesor->update_size() / sizeof(float);
 
-  for (int shard_id = 0; shard_id < real_local_shard_num_; ++shard_id) {
-    tasks[shard_id] = shards_task_pool_[shard_id % task_pool_size_]->enqueue(
+  for (int shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) {
+    tasks[shard_id] = _shards_task_pool[shard_id % _task_pool_size]->enqueue(
        [this, shard_id, value_col, mf_value_col, update_value_col, values,
         &task_keys]() -> int {
          auto& keys = task_keys[shard_id];
-          auto& local_shard = shard_values_[shard_id];
+          auto& local_shard = _local_shards[shard_id];
          float data_buffer[value_col];  // NOLINT
          float* data_buffer_ptr = data_buffer;
-
          for (int i = 0; i < keys.size(); ++i) {
            uint64_t key = keys[i].first;
            uint64_t push_data_idx = keys[i].second;
            const float* update_data = values[push_data_idx];
-            auto itr = local_shard->Find(key);
-            if (itr == local_shard->end()) {
-              if (FLAGS_pslib_enable_create_feasign_randomly &&
+            auto itr = local_shard.find(key);
+            if (itr == local_shard.end()) {
+              if (FLAGS_pserver_enable_create_feasign_randomly &&
                  !_value_accesor->create_value(1, update_data)) {
                continue;
              }
              auto value_size = value_col - mf_value_col;
-              auto* feature_value = local_shard->Init(key);
-              feature_value->resize(value_size);
+              auto& feature_value = local_shard[key];
+              feature_value.resize(value_size);
              _value_accesor->create(&data_buffer_ptr, 1);
-              memcpy(feature_value->data(), data_buffer_ptr,
+              memcpy(feature_value.data(), data_buffer_ptr,
                     value_size * sizeof(float));
-              itr = local_shard->Find(key);
+              itr = local_shard.find(key);
            }
-            auto* feature_value = itr->second;
-            float* value_data = feature_value->data();
-            size_t value_size = feature_value->size();
+            auto& feature_value = itr.value();
+            float* value_data = feature_value.data();
+            size_t value_size = feature_value.size();
            if (value_size == value_col) {  // already at max size: update in place
              _value_accesor->update(&value_data, &update_data, 1);
            } else {
@@ -569,8 +593,8 @@ int32_t MemorySparseTable::_push_sparse(const uint64_t* keys,
              memcpy(data_buffer_ptr, value_data, value_size * sizeof(float));
              _value_accesor->update(&data_buffer_ptr, &update_data, 1);
              if (_value_accesor->need_extend_mf(data_buffer)) {
-                feature_value->resize(value_col);
-                value_data = feature_value->data();
+                feature_value.resize(value_col);
+                value_data = feature_value.data();
                _value_accesor->create(&value_data, 1);
              }
              memcpy(value_data, data_buffer_ptr, value_size * sizeof(float));
@@ -591,18 +615,14 @@ int32_t MemorySparseTable::flush() { return 0; }
 int32_t MemorySparseTable::shrink(const std::string& param) {
   VLOG(0) << "MemorySparseTable::shrink";
   // TODO(zhaocaibei123): implement with multi-thread
-  for (int shard_id = 0; shard_id < real_local_shard_num_; ++shard_id) {
+  for (int shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) {
     // shrink
-    auto& shard = shard_values_[shard_id];
-    for (auto& table : shard->values_) {
-      for (auto iter = table.begin(); iter != table.end();) {
-        if (_value_accesor->shrink(iter->second->data())) {
-          butil::return_object(iter->second);
-          iter = table.erase(iter);
-          VLOG(1) << "shrink erase key: " << iter->first;
-        } else {
-          ++iter;
-        }
+    auto& shard = _local_shards[shard_id];
+    for (auto it = shard.begin(); it != shard.end();) {
+      if (_value_accesor->shrink(it.value().data())) {
+        it = shard.erase(it);
+      } else {
+        ++it;
       }
     }
   }
diff --git a/paddle/fluid/distributed/table/memory_sparse_table.h b/paddle/fluid/distributed/table/memory_sparse_table.h
index 409757ebec22a155857b6c15a57a00bfe2c72d6a..cb552beab13717c270c4a8495a6794c9dc912b08 100644
--- a/paddle/fluid/distributed/table/memory_sparse_table.h
+++ b/paddle/fluid/distributed/table/memory_sparse_table.h
@@ -36,6 +36,7 @@ namespace distributed {
 
 class MemorySparseTable : public SparseTable {
  public:
+  typedef SparseTableShard<uint64_t, FixedFeatureValue> shard_type;
   MemorySparseTable() {}
   virtual ~MemorySparseTable() {}
 
@@ -59,6 +60,9 @@ class MemorySparseTable : public SparseTable {
   int32_t save_local_fs(const std::string& path, const std::string& param,
                         const std::string& prefix);
 
+  int64_t local_size();
+  int64_t local_mf_size();
+
   virtual std::pair<int64_t, int64_t> print_table_stat();
   virtual int32_t pull_sparse(float* values, const PullSparseValue& pull_value);
 
@@ -80,12 +84,12 @@ class MemorySparseTable : public SparseTable {
                        size_t num);
 
  protected:
-  const int task_pool_size_ = 24;
-  size_t avg_local_shard_num_;
-  size_t real_local_shard_num_;
-  size_t sparse_table_shard_num_;
-  std::vector<std::shared_ptr<::ThreadPool>> shards_task_pool_;
-  std::vector<std::shared_ptr<SparseTableShard>> shard_values_;
+  const int _task_pool_size = 24;
+  size_t _avg_local_shard_num;
+  size_t _real_local_shard_num;
+  size_t _sparse_table_shard_num;
+  std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
+  std::unique_ptr<shard_type[]> _local_shards;
 };
 
 }  // namespace distributed
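A sketch (not from the patch) of the shard-parallel pattern the load/save paths now follow: cap the thread count by the shard count, then let OpenMP hand shards to threads dynamically; each shard is touched by exactly one thread, so no locking is needed inside the loop:

#include <omp.h>

void ForEachShard(int real_local_shard_num) {
  int thread_num = real_local_shard_num < 15 ? real_local_shard_num : 15;
  omp_set_num_threads(thread_num);
#pragma omp parallel for schedule(dynamic)
  for (int i = 0; i < real_local_shard_num; ++i) {
    // load or save shard i
  }
}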
diff --git a/paddle/fluid/distributed/test/dense_table_test.cc b/paddle/fluid/distributed/test/dense_table_test.cc
index f2f1e098faae2c1f046bd97465dfd4938dd729a7..2e48b791dc8db510749aec7eed2184b8ef232381 100644
--- a/paddle/fluid/distributed/test/dense_table_test.cc
+++ b/paddle/fluid/distributed/test/dense_table_test.cc
@@ -27,9 +27,6 @@ class Table;
 TEST(CommonDenseTable, Adam) {
   int fea_dim = 10;
   int trainers = 2;
-  float beta1 = 0.9;
-  float beta2 = 0.999;
-  float epsilon = 1.0e-8;
 
   TableParameter table_config;
   table_config.set_table_class("CommonDenseTable");
@@ -39,27 +36,33 @@ TEST(CommonDenseTable, Adam) {
   accessor_config->set_accessor_class("CommMergeAccessor");
   CommonAccessorParameter *common_config = table_config.mutable_common();
   // set adam optimize config
-  common_config->set_name("adam");
+  common_config->set_name("adam_d2sum");
   common_config->set_table_name("adam_test_table");
   common_config->set_trainer_num(trainers);
   common_config->add_params("Param");
   common_config->add_dims(fea_dim);
   common_config->add_initializers("gaussian_random&0&0.0&1.0");
-  common_config->add_params("LearningRate");
-  common_config->add_dims(1);
-  common_config->add_initializers("fill_constant&1.0");
-  common_config->add_params("Moment1");
+  common_config->add_params("D2Sum");
+  common_config->add_dims(fea_dim);
+  common_config->add_initializers("fill_constant&0.0");
+  common_config->add_params("G2Sum");
   common_config->add_dims(fea_dim);
   common_config->add_initializers("fill_constant&0.0");
-  common_config->add_params("Moment2");
+  common_config->add_params("Moment");
   common_config->add_dims(fea_dim);
   common_config->add_initializers("fill_constant&0.0");
-  common_config->add_params("Beta1Pow");
+  common_config->add_params("MomentDecayRate");
   common_config->add_dims(1);
-  common_config->add_initializers("fill_constant&1.0");
-  common_config->add_params("Beta2Pow");
+  common_config->add_initializers("fill_constant&0.99");
+  common_config->add_params("AdaDecayRate");
   common_config->add_dims(1);
-  common_config->add_initializers("fill_constant&1.0");
+  common_config->add_initializers("fill_constant&0.9999");
+  common_config->add_params("AdaEpsilon");
+  common_config->add_dims(1);
+  common_config->add_initializers("fill_constant&1.0e-8");
+  common_config->add_params("LearningRate");
+  common_config->add_dims(1);
+  common_config->add_initializers("fill_constant&5e-6");
 
   auto ret = table->initialize(table_config, fs_config);
   ASSERT_EQ(ret, 0);
@@ -89,29 +92,30 @@ TEST(CommonDenseTable, Adam) {
   pull_values.resize(fea_dim);
   table->pull_dense(pull_values.data(), fea_dim);
 
-  std::vector<float> beta1_pow, beta2_pow, lr, mom1, mom2, param;
-  beta1_pow.push_back(beta1);
-  beta2_pow.push_back(beta2);
-  lr.push_back(1.0);
+  float mom_rate = 0.99;
+  float decay_rate = 0.9999;
+  float epsilon = 1.0e-8;
+  float lr = 5e-6;
+  std::vector<float> d2sum, g2sum, mom, param;
   for (int i = 0; i < fea_dim; i++) {
-    mom1.push_back(0.0);
-    mom2.push_back(0.0);
+    mom.push_back(0.0);
+    d2sum.push_back(0.0);
+    g2sum.push_back(0.0);
     param.push_back(init_values[i]);
   }
 
   for (int i = 0; i < trainers; i++) {
-    auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
     for (int j = 0; j < fea_dim; j++) {
-      mom1[j] = beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][j];
-      mom2[j] = beta2 * mom2[j] +
-                (1 - beta2) * trainer_gradient_values[i][j] *
-                    trainer_gradient_values[i][j];
-      param[j] =
-          param[j] -
-          lr_ * (mom1[j] / (sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0])));
+      d2sum[j] = d2sum[j] * decay_rate + 1;
+      g2sum[j] = g2sum[j] * decay_rate +
+                 trainer_gradient_values[i][j] * trainer_gradient_values[i][j];
+      float scale = d2sum[j] * epsilon;
+      scale = (scale + d2sum[j]) / (scale + g2sum[j]);
+      scale = sqrt(scale);
+      mom[j] = (mom[j] - trainer_gradient_values[i][j]) * mom_rate +
+               trainer_gradient_values[i][j];
+      param[j] = param[j] - lr * scale * mom[j];
     }
-    beta1_pow[0] *= beta1;
-    beta2_pow[0] *= beta2;
   }
 
   for (int j = 0; j < fea_dim; j++) {
     ASSERT_TRUE(abs(param[j] - pull_values[j]) < 1e-5);
diff --git a/paddle/fluid/distributed/test/feature_value_test.cc b/paddle/fluid/distributed/test/feature_value_test.cc
index 9c9f0ffcac321dd54e34241e9221200fdb125bbe..9bd00dcc56fc2da43135d0ffc9fc36821fb59941 100644
--- a/paddle/fluid/distributed/test/feature_value_test.cc
+++ b/paddle/fluid/distributed/test/feature_value_test.cc
@@ -12,38 +12,31 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#include <unistd.h>
-
-#include <cmath>
-#include <iostream>
-#include <thread>  // NOLINT
+#include "paddle/fluid/distributed/table/depends/feature_value.h"
 #include <vector>
-
-#include "google/protobuf/text_format.h"
 #include "gtest/gtest.h"
-#include "paddle/fluid/distributed/table/depends/feature_value.h"
 
 namespace paddle {
 namespace distributed {
 
 TEST(BENCHMARK, LargeScaleKV) {
-  std::shared_ptr<SparseTableShard> shard =
-      std::make_shared<SparseTableShard>();
+  typedef SparseTableShard<uint64_t, FixedFeatureValue> shard_type;
+  shard_type shard;
   uint64_t key = 1;
-  auto itr = shard->Find(key);
-  ASSERT_TRUE(itr == shard->end());
+  auto itr = shard.find(key);
+  ASSERT_TRUE(itr == shard.end());
 
   std::vector<float> vec = {0.0, 0.1, 0.2, 0.3};
 
-  auto* feature_value = shard->Init(key);
-  feature_value->resize(vec.size());
-  memcpy(feature_value->data(), vec.data(), vec.size() * sizeof(float));
+  auto& feature_value = shard[key];
+  feature_value.resize(vec.size());
+  memcpy(feature_value.data(), vec.data(), vec.size() * sizeof(float));
 
-  itr = shard->Find(key);
-  ASSERT_TRUE(itr != shard->end());
+  itr = shard.find(key);
+  ASSERT_TRUE(itr != shard.end());
 
-  feature_value = itr->second;
-  float* value_data = feature_value->data();
+  feature_value = itr.value();
+  float* value_data = feature_value.data();
 
   ASSERT_FLOAT_EQ(value_data[0], 0.0);
   ASSERT_FLOAT_EQ(value_data[1], 0.1);