From 630f5b891792effb60d1d6af0f64dbdd18629b18 Mon Sep 17 00:00:00 2001 From: wangguanqun Date: Mon, 28 Mar 2022 20:36:15 +0800 Subject: [PATCH] delete commonsparsetable and communicator from gpups (#40973) * trainer and worker * delete commonsparsetable from gpups * delete vlog * codestyle * delete communicator from gpups --- .../distributed/ps/service/ps_local_client.cc | 2 +- .../ps/table/depends/feature_value.h | 3 +- .../ps/table/memory_sparse_table.cc | 46 +++++++++++++++ paddle/fluid/distributed/ps/wrapper/fleet.cc | 16 +++-- paddle/fluid/framework/fleet/heter_context.h | 8 +-- .../framework/fleet/heter_ps/hashtable.h | 2 +- .../framework/fleet/heter_ps/hashtable_inl.h | 22 +++++-- .../fluid/framework/fleet/ps_gpu_wrapper.cc | 27 +++++---- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 6 +- python/paddle/distributed/ps/the_one_ps.py | 6 +- .../ps/utils/ps_program_builder.py | 59 +++++++++++-------- 11 files changed, 136 insertions(+), 61 deletions(-) mode change 100644 => 100755 paddle/fluid/distributed/ps/service/ps_local_client.cc mode change 100644 => 100755 paddle/fluid/framework/fleet/heter_context.h mode change 100644 => 100755 paddle/fluid/framework/fleet/heter_ps/hashtable.h mode change 100644 => 100755 paddle/fluid/framework/fleet/ps_gpu_wrapper.cc mode change 100644 => 100755 paddle/fluid/framework/fleet/ps_gpu_wrapper.h diff --git a/paddle/fluid/distributed/ps/service/ps_local_client.cc b/paddle/fluid/distributed/ps/service/ps_local_client.cc old mode 100644 new mode 100755 index 9e364b6d3e..55519b4f62 --- a/paddle/fluid/distributed/ps/service/ps_local_client.cc +++ b/paddle/fluid/distributed/ps/service/ps_local_client.cc @@ -25,9 +25,9 @@ int32_t PsLocalClient::initialize() { for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) { auto* table = CREATE_PSCORE_CLASS( Table, downpour_param.downpour_table_param(i).table_class()); + table->set_shard(0, 1); table->initialize(downpour_param.downpour_table_param(i), _config.fs_client_param()); - table->set_shard(0, 1); _table_map[downpour_param.downpour_table_param(i).table_id()].reset(table); } return 0; diff --git a/paddle/fluid/distributed/ps/table/depends/feature_value.h b/paddle/fluid/distributed/ps/table/depends/feature_value.h index 7a83fdec1d..36dc34808b 100644 --- a/paddle/fluid/distributed/ps/table/depends/feature_value.h +++ b/paddle/fluid/distributed/ps/table/depends/feature_value.h @@ -56,7 +56,8 @@ struct alignas(64) SparseTableShard { return a.it != b.it; } const KEY& key() const { return it->first; } - VALUE& value() const { return *(VALUE*)(void*)it->second; } // NOLINT + VALUE& value() const { return *(VALUE*)(void*)it->second; } // NOLINT + VALUE* value_ptr() const { return (VALUE*)(void*)it->second; } // NOLINT iterator& operator++() { ++it; diff --git a/paddle/fluid/distributed/ps/table/memory_sparse_table.cc b/paddle/fluid/distributed/ps/table/memory_sparse_table.cc index 98454ca747..ea61ca444f 100644 --- a/paddle/fluid/distributed/ps/table/memory_sparse_table.cc +++ b/paddle/fluid/distributed/ps/table/memory_sparse_table.cc @@ -481,6 +481,52 @@ int32_t MemorySparseTable::pull_sparse(float* pull_values, int32_t MemorySparseTable::pull_sparse_ptr(char** pull_values, const uint64_t* keys, size_t num) { + CostTimer timer("pscore_sparse_select_all"); + size_t value_size = _value_accesor->size() / sizeof(float); + size_t mf_value_size = _value_accesor->mf_size() / sizeof(float); + + std::vector> tasks(_real_local_shard_num); + std::vector>> task_keys( + _real_local_shard_num); + for (size_t i = 0; i < num; ++i) { + int shard_id = (keys[i] % _sparse_table_shard_num) % _avg_local_shard_num; + task_keys[shard_id].push_back({keys[i], i}); + } + // std::atomic missed_keys{0}; + for (size_t shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) { + tasks[shard_id] = + _shards_task_pool[shard_id % _shards_task_pool.size()]->enqueue( + [this, shard_id, &task_keys, pull_values, value_size, + mf_value_size]() -> int { + auto& keys = task_keys[shard_id]; + auto& local_shard = _local_shards[shard_id]; + float data_buffer[value_size]; + float* data_buffer_ptr = data_buffer; + for (int i = 0; i < keys.size(); ++i) { + uint64_t key = keys[i].first; + auto itr = local_shard.find(key); + size_t data_size = value_size - mf_value_size; + FixedFeatureValue* ret = NULL; + if (itr == local_shard.end()) { + // ++missed_keys; + auto& feature_value = local_shard[key]; + feature_value.resize(data_size); + float* data_ptr = feature_value.data(); + _value_accesor->create(&data_buffer_ptr, 1); + memcpy(data_ptr, data_buffer_ptr, data_size * sizeof(float)); + ret = &feature_value; + } else { + ret = itr.value_ptr(); + } + int pull_data_idx = keys[i].second; + pull_values[pull_data_idx] = (char*)ret; + } + return 0; + }); + } + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } return 0; } diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc index 22c8495c5e..8a129c6cc5 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.cc +++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc @@ -379,9 +379,11 @@ void FleetWrapper::PullDenseVarsSync( for (auto& t : var_names) { Variable* var = scope.FindVar(t); LoDTensor* tensor = var->GetMutable(); - float* w = tensor->data(); - paddle::distributed::Region reg(w, tensor->numel()); - regions.emplace_back(std::move(reg)); + if (!platform::is_gpu_place(tensor->place())) { + float* w = tensor->data(); + paddle::distributed::Region reg(w, tensor->numel()); + regions.emplace_back(std::move(reg)); + } } auto status = worker_ptr_->pull_dense(regions.data(), regions.size(), tid); status.wait(); @@ -396,9 +398,11 @@ void FleetWrapper::PushDenseParamSync( Variable* var = scope.FindVar(t); CHECK(var != nullptr) << "var[" << t << "] not found"; LoDTensor* tensor = var->GetMutable(); - float* g = tensor->mutable_data(place); - paddle::distributed::Region reg(g, tensor->numel()); - regions.emplace_back(std::move(reg)); + if (!platform::is_gpu_place(tensor->place())) { + float* g = tensor->mutable_data(place); + paddle::distributed::Region reg(g, tensor->numel()); + regions.emplace_back(std::move(reg)); + } } auto push_status = worker_ptr_->push_dense_param(regions.data(), regions.size(), table_id); diff --git a/paddle/fluid/framework/fleet/heter_context.h b/paddle/fluid/framework/fleet/heter_context.h old mode 100644 new mode 100755 index a88ffbe3d9..8e51f0e240 --- a/paddle/fluid/framework/fleet/heter_context.h +++ b/paddle/fluid/framework/fleet/heter_context.h @@ -26,7 +26,7 @@ limitations under the License. */ #endif #ifdef PADDLE_WITH_PSCORE -#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h" +#include "paddle/fluid/distributed/ps/table/depends/feature_value.h" #endif #include "paddle/fluid/distributed/ps/thirdparty/round_robin.h" @@ -65,10 +65,10 @@ class HeterContext { device_dim_ptr_; #endif #ifdef PADDLE_WITH_PSCORE - std::vector> value_ptr_; - std::vector>> + std::vector> value_ptr_; + std::vector>> value_dim_ptr_; - std::vector>> + std::vector>> device_dim_ptr_; #endif std::vector> device_values_; diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable.h b/paddle/fluid/framework/fleet/heter_ps/hashtable.h old mode 100644 new mode 100755 index fd0396c18a..e8eb91f6f6 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable.h +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable.h @@ -21,7 +21,7 @@ limitations under the License. */ #include "common_value.h" // NOLINT #endif #ifdef PADDLE_WITH_PSCORE -#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h" +#include "paddle/fluid/distributed/ps/table/depends/feature_value.h" #endif #include "paddle/phi/core/utils/rw_lock.h" #include "thrust/pair.h" diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h index 59220fc9cd..0297e71c35 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h @@ -224,10 +224,24 @@ void HashTable::dump_to_cpu(int devid, cudaStream_t stream) { } #endif #ifdef PADDLE_WITH_PSCORE - auto* downpour_value = (paddle::distributed::VALUE*)(gpu_val.cpu_ptr); - downpour_value->count_ = gpu_val.show; - for (int x = 0; x < gpu_val.mf_size; x++) { - downpour_value->data_[x] = gpu_val.mf[x]; + auto* downpour_value = + (paddle::distributed::FixedFeatureValue*)(gpu_val.cpu_ptr); + int downpour_value_size = downpour_value->size(); + if (gpu_val.mf_size > 0 && downpour_value_size == 7) { + downpour_value->resize(gpu_val.mf_size + downpour_value_size); + } + float* cpu_val = downpour_value->data(); + // cpu_val[0] = 0; + cpu_val[2] = gpu_val.delta_score; + cpu_val[3] = gpu_val.show; + cpu_val[4] = gpu_val.clk; + cpu_val[5] = gpu_val.lr; + cpu_val[6] = gpu_val.lr_g2sum; + cpu_val[0] = gpu_val.slot; + if (gpu_val.mf_size > 0) { + for (int x = 0; x < gpu_val.mf_size; x++) { + cpu_val[x + 7] = gpu_val.mf[x]; + } } #endif } diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc old mode 100644 new mode 100755 index baf8144131..72f998a772 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -286,7 +286,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { auto fleet_ptr = FleetWrapper::GetInstance(); #endif #ifdef PADDLE_WITH_PSCORE - auto fleet_ptr = paddle::distributed::Communicator::GetInstance(); + auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance(); #endif #if (defined PADDLE_WITH_PSLIB) && (defined PADDLE_WITH_HETERPS) @@ -343,7 +343,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { #ifdef PADDLE_WITH_PSCORE int32_t cnt = 0; while (true) { - auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr( + auto tt = fleet_ptr->worker_ptr_->pull_sparse_ptr( reinterpret_cast(local_ptr[i].data()), this->table_id_, local_keys[i].data(), key_size); bool flag = true; @@ -506,7 +506,8 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { #endif #ifdef PADDLE_WITH_PSCORE - std::vector> task_ptrs(device_num); + std::vector> task_ptrs( + device_num); #endif for (size_t j = 0; j < local_keys[i].size(); j++) { @@ -569,21 +570,21 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { #ifdef PADDLE_WITH_PSCORE for (int j = 0; j < len; ++j) { device_keys[dev][cur + j] = task_keys[dev][j]; - distributed::VALUE* ptr_val = task_ptrs[dev][j]; + float* ptr_val = task_ptrs[dev][j]->data(); FeatureValue& val = device_vals[dev][cur + j]; - bool has_mf = 1; - val.delta_score = 0; - val.show = ptr_val->count_; - val.clk = 0; - val.slot = 0; - val.lr = 0; - val.lr_g2sum = 0; + size_t dim = task_ptrs[dev][j]->size(); + val.delta_score = ptr_val[2]; + val.show = ptr_val[3]; + val.clk = ptr_val[4]; + val.slot = ptr_val[0]; + val.lr = ptr_val[5]; + val.lr_g2sum = ptr_val[6]; val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); - if (has_mf) { + if (dim > 7) { val.mf_size = MF_DIM + 1; for (int x = 0; x < val.mf_size; x++) { - val.mf[x] = ptr_val->data_[x]; + val.mf[x] = ptr_val[x + 7]; } } else { val.mf_size = 0; diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h old mode 100644 new mode 100755 index eb7bd6da1e..d9d29cc072 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -43,7 +43,7 @@ limitations under the License. */ #include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN #include "paddle/fluid/platform/place.h" #ifdef PADDLE_WITH_PSCORE -#include "paddle/fluid/distributed/ps/service/communicator/communicator.h" +#include "paddle/fluid/distributed/ps/wrapper/fleet.h" #endif #ifdef PADDLE_WITH_PSLIB #include "afs_api.h" @@ -177,10 +177,8 @@ class PSGPUWrapper { current_task_ = nullptr; gpu_free_channel_->Put(current_task_); - table_id_ = 1; -#ifdef PADDLE_WITH_PSLIB table_id_ = 0; -#endif + // start build cpu&gpu ps thread start_build_thread(); } diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index b9bd4c3074..123fa77307 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -134,7 +134,10 @@ class Accessor: if not accessor_proto.HasField("accessor_class"): # DownpourSparseValueAccessor - accessor_proto.accessor_class = "SparseAccessor" + if context['use_ps_gpu']: + accessor_proto.accessor_class = "CtrCommonAccessor" + else: + accessor_proto.accessor_class = "SparseAccessor" if not accessor_proto.HasField("fea_dim"): if accessor_proto.accessor_class == "SparseAccessor": accessor_proto.fea_dim = embedding_dim + 2 @@ -1010,6 +1013,7 @@ class TheOnePSRuntime(RuntimeBase): self._init_params(scopes, send_ctx, dense_map) fleet.util.barrier() + self._pull_all_dense(scopes, send_ctx, dense_map) fleet.util.barrier() diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py index b81c80bbce..f1d6a1f04a 100755 --- a/python/paddle/distributed/ps/utils/ps_program_builder.py +++ b/python/paddle/distributed/ps/utils/ps_program_builder.py @@ -40,32 +40,8 @@ class PsProgramBuilder(object): def _build_trainer_desc(self): opt_info = self.loss.block.program._fleet_opt opt_info = {} if opt_info is None else opt_info - opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer") - opt_info["device_worker"] = opt_info.get("device_worker", - "DownpourLite") - pid = str(id(self.cloned_main)) - program_configs = { - pid: { - 'pull_dense': [], - 'push_dense': [], - 'pull_sparse': [], - 'push_sparse': [] - } - } - dense_table_config = {} - send_ctx = get_the_one_send_context(self.attrs) - recv_ctx = get_the_one_recv_context(self.attrs) - for name, ctx in send_ctx.items(): - if ctx.program_id() != id(self.loss.block.program): - continue - if ctx.is_sparse(): - continue - if not ctx.is_tensor_table(): - program_configs[pid]['pull_dense'].append(ctx.table_id()) - program_configs[pid]['push_dense'].append(ctx.table_id()) - dense_table_config[ctx.table_id()] = recv_ctx[ctx.table_id()] - opt_info['program_configs'] = program_configs - opt_info['dense_table_config'] = dense_table_config + opt_info["trainer"] = opt_info.get("trainer", "MultiTrainer") + opt_info["device_worker"] = opt_info.get("device_worker", "Hogwild") self.cloned_main._fleet_opt = opt_info def _optimize_programs(self): @@ -188,6 +164,37 @@ class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder): logger.info("start building cpu-async-ps program") super(CpuAsyncPsProgramBuilder, self).__init__(pass_ctx) + def _build_trainer_desc(self): + opt_info = self.loss.block.program._fleet_opt + opt_info = {} if opt_info is None else opt_info + opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer") + opt_info["device_worker"] = opt_info.get("device_worker", + "DownpourLite") + pid = str(id(self.cloned_main)) + program_configs = { + pid: { + 'pull_dense': [], + 'push_dense': [], + 'pull_sparse': [], + 'push_sparse': [] + } + } + dense_table_config = {} + send_ctx = get_the_one_send_context(self.attrs) + recv_ctx = get_the_one_recv_context(self.attrs) + for name, ctx in send_ctx.items(): + if ctx.program_id() != id(self.loss.block.program): + continue + if ctx.is_sparse(): + continue + if not ctx.is_tensor_table(): + program_configs[pid]['pull_dense'].append(ctx.table_id()) + program_configs[pid]['push_dense'].append(ctx.table_id()) + dense_table_config[ctx.table_id()] = recv_ctx[ctx.table_id()] + opt_info['program_configs'] = program_configs + opt_info['dense_table_config'] = dense_table_config + self.cloned_main._fleet_opt = opt_info + class GpuPsProgramBuilder(PsProgramBuilder): def __init__(self, pass_ctx): -- GitLab