From 74626f662d7fb475266b9b7052fdf1fbb49544db Mon Sep 17 00:00:00 2001 From: zhaocaibei123 <48509226+zhaocaibei123@users.noreply.github.com> Date: Mon, 11 Apr 2022 15:51:37 +0800 Subject: [PATCH] Unittest recover (#41431) (#41590) * update name * update name * fix test * fix fleet bind * update name * update name * fix test * fix gpups wrapper * remove Push/Pull/Load/Save with context in client and wrapper base class * fix * fix * remove some interface * fix * remove * code style * recover * fix * remove code unused * remove some unused table & accessor & CommonDenseTable => MemoryDenseTable * fix * fix * fix * recover * remove unused code * recover unittest * fix * remove * fix * remove code unuseful * remove * fix * recover * remove Co-authored-by: esythan Co-authored-by: esythan --- .../common/sparse_sharding_merge.h | 310 --------------- paddle/fluid/distributed/ps/README.md | 38 +- .../ps/table/common_sparse_table.h | 203 ---------- .../fluid/distributed/ps/table/ctr_accessor.h | 1 + .../ps/table/depends/large_scale_kv.h | 353 ------------------ paddle/fluid/framework/dist_multi_trainer.cc | 3 + paddle/fluid/pybind/fleet_py.cc | 8 - paddle/fluid/pybind/fleet_py.h | 1 - paddle/fluid/pybind/pybind.cc | 1 - python/paddle/distributed/ps/utils/public.py | 2 +- .../tests/unittests/test_dist_fleet_ctr.py | 15 +- .../tests/unittests/test_dist_fleet_ctr2.py | 10 +- 12 files changed, 52 insertions(+), 893 deletions(-) delete mode 100644 paddle/fluid/distributed/common/sparse_sharding_merge.h delete mode 100644 paddle/fluid/distributed/ps/table/common_sparse_table.h delete mode 100644 paddle/fluid/distributed/ps/table/depends/large_scale_kv.h diff --git a/paddle/fluid/distributed/common/sparse_sharding_merge.h b/paddle/fluid/distributed/common/sparse_sharding_merge.h deleted file mode 100644 index 147403d08e6..00000000000 --- a/paddle/fluid/distributed/common/sparse_sharding_merge.h +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#pragma once -#include - -#include -#include -#include -#include // NOLINT -#include - -#include -#include "glog/logging.h" -#include "paddle/fluid/distributed/common/utils.h" -#include "paddle/fluid/framework/blocking_queue.h" -#include "paddle/fluid/framework/framework.pb.h" -#include "paddle/fluid/framework/tensor.h" -#include "paddle/fluid/framework/tensor_util.h" -#include "paddle/fluid/string/split.h" -#include "paddle/phi/core/utils/dim.h" - -constexpr int FG = 256 * 1024 * 1024; -constexpr int Q_SIZE = 10000; -constexpr int BUCKET = 10; -constexpr char XEOF[] = "EOF"; - -inline double GetCurrentUS() { - struct timeval time; - gettimeofday(&time, NULL); - return 1e+6 * time.tv_sec + time.tv_usec; -} - -namespace paddle { -namespace distributed { - -class ShardingMerge { - public: - ShardingMerge() {} - ~ShardingMerge() {} - - void Merge(const std::vector &inputs, - const std::vector &feasigns, const std::string &output, - const int embedding_dim) { - pool_.reset(new ::ThreadPool(inputs.size())); - - std::vector> tasks(inputs.size()); - std::vector> rows; - rows.resize(inputs.size()); - - auto begin = GetCurrentUS(); - for (int x = 0; x < inputs.size(); ++x) { - tasks[x] = pool_->enqueue([this, x, &rows, &inputs, &feasigns]() -> int { - DeserializeRowsFromFile(inputs[x], feasigns[x], &rows[x]); - return 0; - }); - } - - for (size_t x = 0; x < tasks.size(); ++x) { - tasks[x].wait(); - } - - int64_t total_rows = 0; - for (auto x = 0; x < rows.size(); x++) { - total_rows += rows[x].size(); - } - - auto end = GetCurrentUS(); - - VLOG(0) << "got " << total_rows - << " feasigin ids from sparse embedding using " << end - begin; - - std::vector total_dims = {total_rows, - static_cast(embedding_dim)}; - - std::vector> batch_buckets; - batch_buckets.resize(inputs.size()); - - for (int x = 0; x < rows.size(); ++x) { - batch_buckets[x] = bucket(rows[x].size(), BUCKET); - } - - std::ofstream out(output, std::ios::binary); - - begin = GetCurrentUS(); - SerializeRowsToStream(out, rows, batch_buckets, total_rows); - end = GetCurrentUS(); - VLOG(0) << "write rows to oostrream using " << end - begin; - - begin = GetCurrentUS(); - SerializePreTensorToStream(out, total_dims); - end = GetCurrentUS(); - VLOG(0) << "write pretensor to oostrream using " << end - begin; - - begin = GetCurrentUS(); - SerializeValueToStream(out, inputs, batch_buckets, embedding_dim); - end = GetCurrentUS(); - VLOG(0) << "write values to oostrream using " << end - begin; - } - - private: - void SerializeRowsToStream(std::ostream &os, - const std::vector> &rows, - const std::vector> &batch_buckets, - int64_t total_rows) { - { // the 1st field, uint32_t version - constexpr uint32_t version = 0; - os.write(reinterpret_cast(&version), sizeof(version)); - } - - { - // the 2st field, rows information - os.write(reinterpret_cast(&total_rows), sizeof(total_rows)); - - for (int b = 0; b < BUCKET; ++b) { - for (int x = 0; x < batch_buckets.size(); ++x) { - auto begin = batch_buckets[x][b]; - auto end = batch_buckets[x][b + 1]; - - if (end - begin == 0) continue; - - os.write(reinterpret_cast(rows[x].data() + begin), - sizeof(int64_t) * (end - begin)); - } - } - - // the 3st field, the height of SelectedRows - int64_t height = total_rows; - os.write(reinterpret_cast(&height), sizeof(height)); - } - } - - void SerializePreTensorToStream(std::ostream &os, - const std::vector &dims) { - { // the 1st field, uint32_t version - constexpr uint32_t version = 0; - os.write(reinterpret_cast(&version), sizeof(version)); - } - { // the 2nd field, tensor description - // int32_t size - framework::proto::VarType::TensorDesc desc; - desc.set_data_type(framework::proto::VarType::FP32); - auto *pb_dims = desc.mutable_dims(); - pb_dims->Resize(static_cast(dims.size()), 0); - std::copy(dims.begin(), dims.end(), pb_dims->begin()); - int32_t size = desc.ByteSize(); - os.write(reinterpret_cast(&size), sizeof(size)); - auto out = desc.SerializeAsString(); - os.write(out.data(), size); - } - } - - void SerializeValueToVec(std::ifstream &in, const int batch, - const int embedding_dim, std::vector *out) { - auto queue = - std::make_shared>>(); - - auto read = [batch, &in, &queue]() { - std::string line; - std::vector columns; - std::vector values_str; - - int count = 0; - - while (std::getline(in, line)) { - ++count; - columns = string::Split(line, '\t'); - - if (columns.size() != 5) { - VLOG(0) << "unexpected line: " << line << ", skip it"; - continue; - } - - values_str = string::Split(columns[4], ','); - queue->Push(values_str); - - if (count >= batch) { - break; - } - } - queue->Push({}); - }; - - auto write = [embedding_dim, &out, &queue]() { - std::vector values_str; - std::string line; - - while (true) { - queue->Pop(&values_str); - - if (values_str.size() == 0) { - break; - } - - for (int x = 0; x < embedding_dim; ++x) { - float v = 0.0; - try { - v = std::stof(values_str[x]); - } catch (std::invalid_argument &e) { - VLOG(0) << " get unexpected line: " << line; - } catch (std::out_of_range &e) { - VLOG(0) << " get unexpected line: " << line; - } - out->push_back(v); - } - } - }; - - std::thread p_read(read); - std::thread p_write(write); - p_read.join(); - p_write.join(); - } - - void SerializeVecToStream(std::ostream &out, - const std::vector &value) { - out.write(reinterpret_cast(value.data()), - static_cast(sizeof(float) * value.size())); - } - - void SerializeValueToStream( - std::ostream &out, const std::vector &ins, - const std::vector> &batch_buckets, - const int embedding_dim) { - std::vector> in_streams; - - for (int x = 0; x < ins.size(); ++x) { - in_streams.emplace_back(std::make_shared(ins[x])); - } - - std::vector> tasks(ins.size()); - - for (int b = 0; b < BUCKET; ++b) { - std::vector> values; - values.resize(tasks.size()); - - auto begin = GetCurrentUS(); - - for (int x = 0; x < tasks.size(); ++x) { - auto batch = batch_buckets[x][b + 1] - batch_buckets[x][b]; - values[x].clear(); - values[x].reserve(batch * embedding_dim); - } - - for (int x = 0; x < tasks.size(); ++x) { - tasks[x] = - pool_->enqueue([this, b, x, &out, &in_streams, &batch_buckets, - &values, embedding_dim]() -> int { - auto batch = batch_buckets[x][b + 1] - batch_buckets[x][b]; - if (batch == 0) return 0; - SerializeValueToVec(*(in_streams[x].get()), batch, embedding_dim, - &values[x]); - return 0; - }); - } - - for (size_t x = 0; x < tasks.size(); ++x) { - tasks[x].wait(); - } - - auto end = GetCurrentUS(); - - auto begin1 = GetCurrentUS(); - for (size_t x = 0; x < tasks.size(); ++x) { - SerializeVecToStream(out, values[x]); - } - auto end1 = GetCurrentUS(); - - VLOG(0) << "serialize buckets " << b << " read using " << end - begin - << ", to oostream using " << end1 - begin1; - } - } - - void DeserializeRowsFromFile(const std::string &input_file, - const int64_t feasigns, - std::vector *rows) { - std::string line; - std::vector columns; - std::ifstream file(input_file); - - rows->reserve(feasigns); - - while (std::getline(file, line)) { - columns = string::Split(line, '\t'); - if (columns.size() != 5) { - VLOG(0) << "unexpected line: " << line << ", skip it"; - continue; - } - rows->push_back(std::stoull(columns[0])); - } - - VLOG(0) << "parse " << rows->size() << " embedding rows from " - << input_file; - } - - private: - std::unique_ptr<::ThreadPool> pool_; -}; -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/README.md b/paddle/fluid/distributed/ps/README.md index d287dcd1111..afa6d60a4e0 100755 --- a/paddle/fluid/distributed/ps/README.md +++ b/paddle/fluid/distributed/ps/README.md @@ -1,3 +1,39 @@ # 目录说明 -> 成型之后,上级目录的 table、thirdparty、table、service 目录可以删除,communicator_common.h 、fleet.cc、fleet.h 删除 +Table: for param storage and update +-----MemorySparseTable: table for sparse param, used in cpu async mode +-----MemoryDenseTable: table for dense param, used in cpu async/geo mode +-----MemorySparseGeoTable: table for sparse param, used in cpu async mode +-----CommonGraphTable: table used for graph learning +-----BarrierTable: table for barrier function, used in cpu sync mode +-----TensorTable: table which run program, used for learning rate decay only + +ValueAccessor: for pull param and push gradient +-----CtrCommonAccessor: pull/push value with show/click, float type +-----DownpourCtrDoubleAccessor: same as CtrCommonAccessor, other than show/click with double type +-----SparseAccessor: used for common embedding, pull value without show/click, push value with show/click +-----CommMergeAccessor: used for dense table only, for get param dim + +PsService(proto): for server to handle request +-----PsBaseService +----------BrpcPsService: for cpu dnn training task +----------GraphBrpcService: for graph learning +-----HeterService: for dnn training task with heterogeneous computing resources + +PSServer: recv request from trainer and handle it by service +-----BrpcPsServer: for cpu dnn training task +-----GraphBrpcServer: for graph learning +-----PsLocalServer: for GpuPS + +HeterServer: for HeterPS + +PSClient: pull param and push gradient for trainer +-----BrpcPsClient: for cpu dnn training task +----------GraphBrpcClient: for graph learning +-----PsLocalClient: for GpuPS + +HeterClient: for HeterPS + +PSCore: Wrapper for InitServer + +GraphPyService: for graph learning diff --git a/paddle/fluid/distributed/ps/table/common_sparse_table.h b/paddle/fluid/distributed/ps/table/common_sparse_table.h deleted file mode 100644 index 2673e8dfae3..00000000000 --- a/paddle/fluid/distributed/ps/table/common_sparse_table.h +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include // NOLINT -#include -#include -#include -#include -#include "Eigen/Dense" -#include "paddle/fluid/distributed/ps/table/accessor.h" -#include "paddle/fluid/distributed/ps/table/common_table.h" -#include "paddle/fluid/distributed/ps/table/depends/initializers.h" -#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h" -#include "paddle/fluid/distributed/ps/table/depends/sparse.h" -#include "paddle/fluid/string/string_helper.h" -#include "paddle/phi/core/utils/rw_lock.h" - -#define PSERVER_SAVE_SUFFIX ".shard" - -namespace paddle { -namespace distributed { - -class SparseOptimizer; - -enum SaveMode { all, base, delta }; - -struct Meta { - std::string param; - int shard_id; - std::vector names; - std::vector dims; - uint64_t count; - std::unordered_map dims_map; - - explicit Meta(const std::string& metapath) { - std::ifstream file(metapath); - std::string line; - int num_lines = 0; - while (std::getline(file, line)) { - if (StartWith(line, "#")) { - continue; - } - auto pairs = paddle::string::split_string(line, "="); - PADDLE_ENFORCE_EQ( - pairs.size(), 2, - paddle::platform::errors::InvalidArgument( - "info in %s except k=v, but got %s", metapath, line)); - - if (pairs[0] == "param") { - param = pairs[1]; - } - if (pairs[0] == "shard_id") { - shard_id = std::stoi(pairs[1]); - } - if (pairs[0] == "row_names") { - names = paddle::string::split_string(pairs[1], ","); - } - if (pairs[0] == "row_dims") { - auto dims_strs = - paddle::string::split_string(pairs[1], ","); - for (auto& str : dims_strs) { - dims.push_back(std::stoi(str)); - } - } - if (pairs[0] == "count") { - count = std::stoull(pairs[1]); - } - } - for (int x = 0; x < names.size(); ++x) { - dims_map[names[x]] = dims[x]; - } - } - - Meta(std::string param, int shard_id, std::vector row_names, - std::vector dims, uint64_t count) { - this->param = param; - this->shard_id = shard_id; - this->names = row_names; - this->dims = dims; - this->count = count; - } - - std::string ToString() { - std::stringstream ss; - ss << "param=" << param << "\n"; - ss << "shard_id=" << shard_id << "\n"; - ss << "row_names=" << paddle::string::join_strings(names, ',') << "\n"; - ss << "row_dims=" << paddle::string::join_strings(dims, ',') << "\n"; - ss << "count=" << count << "\n"; - return ss.str(); - } -}; - -class CommonSparseTable : public Table { - public: - CommonSparseTable() { rwlock_.reset(new phi::RWLock); } - virtual ~CommonSparseTable() {} - - // unused method begin - // virtual int32_t PullDense(float* pull_values, size_t num) { return 0; } - // virtual int32_t PushDenseParam(const float* values, size_t num) { return - // 0; } - // virtual int32_t PushDense(const float* values, size_t num) { return 0; } - // unused method end - - virtual int32_t Pull(TableContext& context); - virtual int32_t Push(TableContext& context); - - virtual int32_t Initialize(); - virtual int32_t InitializeShard() { return 0; } - virtual int32_t InitializeValue(); - virtual int32_t InitializeOptimizer(); - virtual int32_t InitializeRecorder(); - - virtual int32_t Load(const std::string& path, const std::string& param); - - virtual int32_t Save(const std::string& path, const std::string& param); - - void SaveMetaToText(std::ostream* os, const CommonAccessorParameter& common, - const size_t shard_idx, const int64_t total); - - int64_t SaveValueToText(std::ostream* os, std::shared_ptr block, - std::shared_ptr<::ThreadPool> pool, const int mode, - int shard_id); - - virtual void ProcessALine(const std::vector& columns, - const Meta& meta, const int64_t id, - std::vector>* values); - - virtual int64_t LoadFromText( - const std::string& valuepath, const std::string& metapath, - const int pserver_id, const int pserver_num, const int local_shard_num, - std::vector>* blocks); - - virtual std::pair PrintTableStat(); - virtual int32_t PullSparse(float* values, const PullSparseValue& pull_value); - - virtual int32_t PullSparsePtr(char** pull_values, const uint64_t* keys, - size_t num); - - virtual int32_t PushSparse(const uint64_t* keys, const float* values, - size_t num); - - virtual int32_t PushSparse(const uint64_t* keys, const float** values, - size_t num); - - // only for sparse geo table - virtual int32_t PushSparseParam(const uint64_t* keys, const float* values, - size_t num); - virtual int32_t SetGlobalLR(float* lr); - - virtual int32_t Pour(); - virtual int32_t Flush(); - virtual int32_t Shrink(const std::string& param); - virtual void Clear(); - - virtual void* GetShard(size_t shard_idx) { return 0; } - - protected: - virtual int32_t _PushSparse(const uint64_t* keys, const float* values, - size_t num); - virtual int32_t _PushSparse(const uint64_t* keys, const float** values, - size_t num); - - protected: - const int task_pool_size_ = 11; - std::vector> _shards_task_pool; - - bool sync = false; - int param_dim_ = 0; - int param_offset_ = 0; - - std::unordered_map value_idx_; - std::vector value_names_; - std::vector value_dims_; - std::vector value_offsets_; - std::vector initializer_attrs_; - - std::shared_ptr optimizer_; - std::vector> shard_values_; - std::unordered_map> pull_reservoir_; - std::unique_ptr rwlock_{nullptr}; -}; - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/ctr_accessor.h b/paddle/fluid/distributed/ps/table/ctr_accessor.h index b8895e74d1d..a599bfca7f6 100644 --- a/paddle/fluid/distributed/ps/table/ctr_accessor.h +++ b/paddle/fluid/distributed/ps/table/ctr_accessor.h @@ -186,6 +186,7 @@ class CtrCommonAccessor : public ValueAccessor { // CtrCommonFeatureValue common_feature_value; float _show_click_decay_rate; int32_t _ssd_unseenday_threshold; + bool _show_scale = false; public: // TODO(zhaocaibei123): it should be private, but we make it public // for unit test diff --git a/paddle/fluid/distributed/ps/table/depends/large_scale_kv.h b/paddle/fluid/distributed/ps/table/depends/large_scale_kv.h deleted file mode 100644 index 68c80ad737e..00000000000 --- a/paddle/fluid/distributed/ps/table/depends/large_scale_kv.h +++ /dev/null @@ -1,353 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include // NOLINT -#include -#include -#include // NOLINT -#include -#include -#include -#include -#include "gflags/gflags.h" - -#include "butil/object_pool.h" -#include "paddle/fluid/distributed/common/utils.h" -#include "paddle/fluid/distributed/ps/table/depends/initializers.h" -#include "paddle/fluid/distributed/ps/thirdparty/round_robin.h" -#include "paddle/fluid/framework/generator.h" -#include "paddle/fluid/framework/lod_tensor.h" -#include "paddle/fluid/framework/selected_rows_utils.h" -#include "paddle/fluid/framework/tensor.h" -#include "paddle/fluid/framework/threadpool.h" -#include "paddle/fluid/framework/variable.h" -#include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/enforce.h" -#include "paddle/fluid/platform/place.h" -#include "paddle/fluid/string/printf.h" -#include "paddle/fluid/string/string_helper.h" -#include "paddle/phi/backends/dynload/port.h" -#include "paddle/phi/core/utils/rw_lock.h" - -namespace paddle { -namespace distributed { - -enum Mode { training, infer }; - -static const int SPARSE_SHARD_BUCKET_NUM_BITS = 6; -static const size_t SPARSE_SHARD_BUCKET_NUM = (size_t)1 - << SPARSE_SHARD_BUCKET_NUM_BITS; - -struct VALUE { - explicit VALUE(size_t length) - : length_(length), - count_(0), - unseen_days_(0), - need_save_(false), - is_entry_(false) { - data_.resize(length); - memset(data_.data(), 0, sizeof(float) * length); - } - - size_t length_; - std::vector data_; - int count_; - int unseen_days_; // use to check knock-out - bool need_save_; // whether need to save - bool is_entry_; // whether knock-in -}; - -inline bool count_entry(VALUE *value, int threshold) { - return value->count_ >= threshold; -} - -inline bool probility_entry(VALUE *value, float threshold) { - UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"}); - return uniform.GetValue() >= threshold; -} - -class ValueBlock { - public: - typedef typename robin_hood::unordered_map map_type; - explicit ValueBlock(const std::vector &value_names, - const std::vector &value_dims, - const std::vector &value_offsets, - const std::unordered_map &value_idx, - const std::vector &init_attrs, - const std::string &entry_attr) - : value_names_(value_names), - value_dims_(value_dims), - value_offsets_(value_offsets), - value_idx_(value_idx) { - for (size_t x = 0; x < value_dims.size(); ++x) { - value_length_ += value_dims[x]; - } - - // for Entry - { - auto slices = string::split_string(entry_attr, ":"); - if (slices[0] == "none") { - entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0); - threshold_ = 0; - } else if (slices[0] == "count_filter_entry") { - threshold_ = std::stoi(slices[1]); - entry_func_ = - std::bind(&count_entry, std::placeholders::_1, threshold_); - } else if (slices[0] == "probability_entry") { - threshold_ = std::stof(slices[1]); - entry_func_ = - std::bind(&probility_entry, std::placeholders::_1, threshold_); - } else { - PADDLE_THROW(platform::errors::InvalidArgument( - "Not supported Entry Type : %s, Only support [CountFilterEntry, " - "ProbabilityEntry]", - slices[0])); - } - } - - // for Initializer - { - for (auto &attr : init_attrs) { - auto slices = string::split_string(attr, "&"); - - if (slices[0] == "gaussian_random") { - initializers_.emplace_back( - std::make_shared(slices)); - } else if (slices[0] == "fill_constant") { - initializers_.emplace_back( - std::make_shared(slices)); - } else if (slices[0] == "uniform_random") { - initializers_.emplace_back( - std::make_shared(slices)); - } else if (slices[0] == "truncated_gaussian_random") { - initializers_.emplace_back( - std::make_shared(slices)); - } else { - PADDLE_THROW(platform::errors::InvalidArgument( - "%s can not be supported", attr)); - } - } - } - } - - ~ValueBlock() {} - - std::vector Get(const uint64_t &id, - const std::vector &value_names, - const std::vector &value_dims) { - auto pts = std::vector(); - pts.reserve(value_names.size()); - auto values = GetValue(id); - for (int i = 0; i < static_cast(value_names.size()); i++) { - PADDLE_ENFORCE_EQ( - value_dims[i], value_dims_[i], - platform::errors::InvalidArgument("value dims is not match")); - pts.push_back(values->data_.data() + - value_offsets_.at(value_idx_.at(value_names[i]))); - } - return pts; - } - - // pull - float *Init(const uint64_t &id, const bool with_update = true, - const int counter = 1) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - - auto &table = values_[bucket]; - auto res = table.find(id); - - VALUE *value = nullptr; - if (res == table.end()) { - value = butil::get_object(value_length_); - - table[id] = value; - - } else { - value = res->second; - } - - if (with_update) { - AttrUpdate(value, counter); - } - return value->data_.data(); - } - - VALUE *InitGet(const uint64_t &id, const bool with_update = true, - const int counter = 1) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - - auto &table = values_[bucket]; - auto res = table.find(id); - - VALUE *value = nullptr; - if (res == table.end()) { - value = butil::get_object(value_length_); - // value = _alloc.acquire(value_length_); - table[id] = value; - } else { - value = (VALUE *)(void *)(res->second); // NOLINT - } - return value; - } - - void AttrUpdate(VALUE *value, const int counter) { - // update state - value->unseen_days_ = 0; - value->count_ += counter; - - if (!value->is_entry_) { - value->is_entry_ = entry_func_(value); - if (value->is_entry_) { - // initialize - for (size_t x = 0; x < value_names_.size(); ++x) { - initializers_[x]->GetValue(value->data_.data() + value_offsets_[x], - value_dims_[x]); - } - value->need_save_ = true; - } - } else { - value->need_save_ = true; - } - - return; - } - - // dont jude if (has(id)) - float *Get(const uint64_t &id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - auto &table = values_[bucket]; - - // auto &value = table.at(id); - // return value->data_.data(); - auto res = table.find(id); - VALUE *value = res->second; - return value->data_.data(); - } - - // for load, to reset count, unseen_days - VALUE *GetValue(const uint64_t &id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - - auto &table = values_[bucket]; - auto res = table.find(id); - return res->second; - } - - bool GetEntry(const uint64_t &id) { - auto value = GetValue(id); - return value->is_entry_; - } - - void SetEntry(const uint64_t &id, const bool state) { - auto value = GetValue(id); - value->is_entry_ = state; - } - - void erase(uint64_t feasign) { - size_t hash = _hasher(feasign); - size_t bucket = compute_bucket(hash); - auto &table = values_[bucket]; - - auto iter = table.find(feasign); - if (iter != table.end()) { - butil::return_object(iter->second); - iter = table.erase(iter); - } - } - - void Shrink(const int threshold) { - for (auto &table : values_) { - for (auto iter = table.begin(); iter != table.end();) { - // VALUE* value = (VALUE*)(void*)(iter->second); - VALUE *value = iter->second; - value->unseen_days_++; - if (value->unseen_days_ >= threshold) { - butil::return_object(iter->second); - // _alloc.release(iter->second); - // _alloc.release(value); - iter = table.erase(iter); - } else { - ++iter; - } - } - } - return; - } - - float GetThreshold() { return threshold_; } - size_t compute_bucket(size_t hash) { - if (SPARSE_SHARD_BUCKET_NUM == 1) { - return 0; - } else { - return hash >> (sizeof(size_t) * 8 - SPARSE_SHARD_BUCKET_NUM_BITS); - } - } - - map_type::iterator end() { - return values_[SPARSE_SHARD_BUCKET_NUM - 1].end(); - } - - map_type::iterator Find(uint64_t id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - auto &table = values_[bucket]; - - auto got = table.find(id); - if (got == table.end()) { - return end(); - } else { - return got; - } - } - - private: - bool Has(const uint64_t id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - auto &table = values_[bucket]; - - auto got = table.find(id); - if (got == table.end()) { - return false; - } else { - return true; - } - } - - public: - map_type values_[SPARSE_SHARD_BUCKET_NUM]; - size_t value_length_ = 0; - std::hash _hasher; - - private: - const std::vector &value_names_; - const std::vector &value_dims_; - const std::vector &value_offsets_; - const std::unordered_map &value_idx_; - - std::function entry_func_; - std::vector> initializers_; - float threshold_; -}; - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index d16469e265e..7f9aac4d3f1 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -117,6 +117,9 @@ void DistMultiTrainer::InitOtherEnv(const ProgramDesc &main_program) { InitDumpEnv(); } pull_dense_worker_->SetRootScope(root_scope_); +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_CUDA) + pull_dense_worker_->CreatePinVar(); +#endif pull_dense_worker_->Start(); #if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_PSCORE) for (int i = 0; i < thread_num_; ++i) { diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 330719762ae..8d830168952 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -28,7 +28,6 @@ limitations under the License. */ #include #include -#include "paddle/fluid/distributed/common/sparse_sharding_merge.h" #include "paddle/fluid/distributed/index_dataset/index_sampler.h" #include "paddle/fluid/distributed/index_dataset/index_wrapper.h" #include "paddle/fluid/distributed/ps/service/communicator/communicator.h" @@ -49,7 +48,6 @@ using paddle::distributed::GraphNode; using paddle::distributed::GraphPyServer; using paddle::distributed::GraphPyClient; using paddle::distributed::FeatureNode; -using paddle::distributed::ShardingMerge; namespace paddle { namespace pybind { @@ -93,12 +91,6 @@ void BindPSHost(py::module* m) { .def("to_string", &distributed::PSHost::ToString); } -void BindSparseShardingTools(py::module* m) { - py::class_(*m, "ShardingMerge") - .def(py::init<>()) - .def("merge", &ShardingMerge::Merge); -} - void BindCommunicatorContext(py::module* m) { py::class_(*m, "CommContext") .def( diff --git a/paddle/fluid/pybind/fleet_py.h b/paddle/fluid/pybind/fleet_py.h index 4dc0f002ad3..206a69f5a80 100644 --- a/paddle/fluid/pybind/fleet_py.h +++ b/paddle/fluid/pybind/fleet_py.h @@ -36,6 +36,5 @@ void BindIndexNode(py::module* m); void BindTreeIndex(py::module* m); void BindIndexWrapper(py::module* m); void BindIndexSampler(py::module* m); -void BindSparseShardingTools(py::module* m); } // namespace pybind } // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 96d86ee1a31..92985b01adb 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -4504,7 +4504,6 @@ All parameter, weight, gradient are variables in Paddle. BindTreeIndex(&m); BindIndexWrapper(&m); BindIndexSampler(&m); - BindSparseShardingTools(&m); #endif } } // namespace pybind diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py index b76484a3ebc..e7edc6fd859 100755 --- a/python/paddle/distributed/ps/utils/public.py +++ b/python/paddle/distributed/ps/utils/public.py @@ -58,7 +58,7 @@ DATA_NORM_GRAD_NAME = [x + "@GRAD" for x in DATA_NORM_NAME] def logger_config(log_path, logging_name): logger = logging.getLogger(logging_name) - logger.setLevel(level=logging.DEBUG) + logger.setLevel(level=logging.WARNING) handler = logging.FileHandler( log_path, mode='a', encoding='UTF-8', delay=True) handler.setLevel(logging.INFO) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py index 8ec3fecceb9..59d196fdf55 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py @@ -51,9 +51,8 @@ class TestDistMnistAsyncInMemoryDataset2x2(TestFleetBase): tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) def test_dist_train(self): - # self.check_with_place( - # "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) - print('recover later') + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) class TestDistMnistAsync2x2(TestFleetBase): @@ -86,9 +85,8 @@ class TestDistMnistAsync2x2(TestFleetBase): tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) def test_dist_train(self): - # self.check_with_place( - # "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) - print('recover later') + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) class TestDistCtrHalfAsync2x2(TestFleetBase): @@ -124,9 +122,8 @@ class TestDistCtrHalfAsync2x2(TestFleetBase): tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) def test_dist_train(self): - # self.check_with_place( - # "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) - print('recover later') + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr2.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr2.py index e5e486d7068..e73eff2acc9 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr2.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr2.py @@ -52,9 +52,8 @@ class TestDistMnistSync2x2(TestFleetBase): tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) def test_dist_train(self): - # self.check_with_place( - # "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) - print('recover later') + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) # @unittest.skip(reason="Skip unstable ut, reader need to be rewrite") @@ -92,9 +91,8 @@ class TestDistMnistAsyncDataset2x2(TestFleetBase): tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) def test_dist_train(self): - # self.check_with_place( - # "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) - print('recover later') + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) if __name__ == "__main__": -- GitLab