large_scale_kv.h 10.3 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <ThreadPool.h>
#include <functional>
#include <future>  // NOLINT
#include <memory>
#include <string>
#include <thread>  // NOLINT
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
27
#include "gflags/gflags.h"
T
tangwei12 已提交
28

T
Thunderbrook 已提交
29
#include "butil/object_pool.h"
T
tangwei12 已提交
30 31
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
32
#include "paddle/fluid/distributed/thirdparty/round_robin.h"
T
tangwei12 已提交
33 34 35
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/rw_lock.h"
36
#include "paddle/fluid/framework/selected_rows_utils.h"
T
tangwei12 已提交
37 38 39 40 41 42 43 44
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"
45
#include "paddle/pten/backends/dynload/port.h"
T
tangwei12 已提交
46 47 48 49 50 51

namespace paddle {
namespace distributed {

// Mode enumerators; not referenced elsewhere within this header.
enum Mode {
  training,
  infer,
};

// ValueBlock::compute_bucket() selects a shard from the top
// SPARSE_SHARD_BUCKET_NUM_BITS bits of a hash, so there are
// 2^SPARSE_SHARD_BUCKET_NUM_BITS shard buckets in total.
static constexpr int SPARSE_SHARD_BUCKET_NUM_BITS = 6;
static constexpr size_t SPARSE_SHARD_BUCKET_NUM =
    static_cast<size_t>(1) << SPARSE_SHARD_BUCKET_NUM_BITS;

// Per-feasign record stored by ValueBlock: one flat float buffer plus the
// bookkeeping read and written by the entry (admission) and Shrink logic.
struct VALUE {
  // Creates a buffer of |length| floats, all 0.0f; counters are zero and
  // both flags start cleared.
  explicit VALUE(size_t length) : length_(length), data_(length, 0.0f) {}

  size_t length_;            // number of floats held in data_
  std::vector<float> data_;  // slots addressed by ValueBlock offsets
  int count_ = 0;            // accumulated by ValueBlock::AttrUpdate
  int unseen_days_ = 0;      // use to check knock-out
  bool need_save_ = false;   // whether need to save
  bool is_entry_ = false;    // whether knock-in
};

// Count-based admission: true once the value's accumulated count_ has
// reached |threshold| (count_ is bumped by ValueBlock::AttrUpdate).
inline bool count_entry(VALUE *value, int threshold) {
  const int accumulated = value->count_;
  return !(accumulated < threshold);
}

// Probability-based admission: draws one sample from a UniformInitializer
// built with attrs {"uniform", "0", "0", "1"} (presumably seed 0 and range
// [0, 1] — confirm against initializers.h) and admits when the sample is
// >= |threshold|. |value| itself is not inspected.
inline bool probility_entry(VALUE *value, float threshold) {
  UniformInitializer sampler({"uniform", "0", "0", "1"});
  const float draw = sampler.GetValue();
  return draw >= threshold;
}

class ValueBlock {
 public:
T
Thunderbrook 已提交
86
  typedef typename robin_hood::unordered_map<uint64_t, VALUE *> map_type;
T
tangwei12 已提交
87 88 89 90 91 92 93 94 95 96
  explicit ValueBlock(const std::vector<std::string> &value_names,
                      const std::vector<int> &value_dims,
                      const std::vector<int> &value_offsets,
                      const std::unordered_map<std::string, int> &value_idx,
                      const std::vector<std::string> &init_attrs,
                      const std::string &entry_attr)
      : value_names_(value_names),
        value_dims_(value_dims),
        value_offsets_(value_offsets),
        value_idx_(value_idx) {
T
Thunderbrook 已提交
97
    for (size_t x = 0; x < value_dims.size(); ++x) {
T
tangwei12 已提交
98
      value_length_ += value_dims[x];
T
tangwei12 已提交
99 100
    }

T
tangwei12 已提交
101 102
    // for Entry
    {
T
tangwei12 已提交
103
      auto slices = string::split_string<std::string>(entry_attr, ":");
104 105
      if (slices[0] == "none") {
        entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0);
T
Thunderbrook 已提交
106
        threshold_ = 0;
T
tangwei12 已提交
107
      } else if (slices[0] == "count_filter_entry") {
T
Thunderbrook 已提交
108 109 110
        threshold_ = std::stoi(slices[1]);
        entry_func_ =
            std::bind(&count_entry, std::placeholders::_1, threshold_);
T
tangwei12 已提交
111
      } else if (slices[0] == "probability_entry") {
T
Thunderbrook 已提交
112
        threshold_ = std::stof(slices[1]);
T
tangwei12 已提交
113
        entry_func_ =
T
Thunderbrook 已提交
114
            std::bind(&probility_entry, std::placeholders::_1, threshold_);
T
tangwei12 已提交
115
      } else {
116
        PADDLE_THROW(platform::errors::InvalidArgument(
T
tangwei12 已提交
117 118
            "Not supported Entry Type : %s, Only support [CountFilterEntry, "
            "ProbabilityEntry]",
119
            slices[0]));
T
tangwei12 已提交
120 121
      }
    }
T
tangwei12 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136

    // for Initializer
    {
      for (auto &attr : init_attrs) {
        auto slices = string::split_string<std::string>(attr, "&");

        if (slices[0] == "gaussian_random") {
          initializers_.emplace_back(
              std::make_shared<GaussianInitializer>(slices));
        } else if (slices[0] == "fill_constant") {
          initializers_.emplace_back(
              std::make_shared<FillConstantInitializer>(slices));
        } else if (slices[0] == "uniform_random") {
          initializers_.emplace_back(
              std::make_shared<UniformInitializer>(slices));
C
Chengmo 已提交
137 138 139
        } else if (slices[0] == "truncated_gaussian_random") {
          initializers_.emplace_back(
              std::make_shared<TruncatedGaussianInitializer>(slices));
T
tangwei12 已提交
140 141 142 143 144 145
        } else {
          PADDLE_THROW(platform::errors::InvalidArgument(
              "%s can not be supported", attr));
        }
      }
    }
T
tangwei12 已提交
146 147 148 149
  }

  ~ValueBlock() {}

T
tangwei12 已提交
150
  std::vector<float *> Get(const uint64_t &id,
151 152
                           const std::vector<std::string> &value_names,
                           const std::vector<int> &value_dims) {
T
tangwei12 已提交
153 154
    auto pts = std::vector<float *>();
    pts.reserve(value_names.size());
T
Thunderbrook 已提交
155
    auto values = GetValue(id);
T
tangwei12 已提交
156
    for (int i = 0; i < static_cast<int>(value_names.size()); i++) {
157 158 159
      PADDLE_ENFORCE_EQ(
          value_dims[i], value_dims_[i],
          platform::errors::InvalidArgument("value dims is not match"));
T
Thunderbrook 已提交
160
      pts.push_back(values->data_.data() +
T
tangwei12 已提交
161
                    value_offsets_.at(value_idx_.at(value_names[i])));
T
tangwei12 已提交
162
    }
T
tangwei12 已提交
163
    return pts;
T
tangwei12 已提交
164 165
  }

166
  // pull
167 168
  float *Init(const uint64_t &id, const bool with_update = true,
              const int counter = 1) {
T
Thunderbrook 已提交
169 170
    size_t hash = _hasher(id);
    size_t bucket = compute_bucket(hash);
171

T
Thunderbrook 已提交
172 173
    auto &table = values_[bucket];
    auto res = table.find(id);
174

T
Thunderbrook 已提交
175 176 177 178 179 180 181 182
    VALUE *value = nullptr;
    if (res == table.end()) {
      value = butil::get_object<VALUE>(value_length_);

      table[id] = value;

    } else {
      value = res->second;
183
    }
184

T
Thunderbrook 已提交
185 186 187 188
    if (with_update) {
      AttrUpdate(value, counter);
    }
    return value->data_.data();
T
tangwei12 已提交
189 190
  }

T
Thunderbrook 已提交
191 192
  VALUE *InitGet(const uint64_t &id, const bool with_update = true,
                 const int counter = 1) {
T
Thunderbrook 已提交
193 194
    size_t hash = _hasher(id);
    size_t bucket = compute_bucket(hash);
T
Thunderbrook 已提交
195

T
Thunderbrook 已提交
196 197
    auto &table = values_[bucket];
    auto res = table.find(id);
T
Thunderbrook 已提交
198

T
Thunderbrook 已提交
199 200 201 202 203 204
    VALUE *value = nullptr;
    if (res == table.end()) {
      value = butil::get_object<VALUE>(value_length_);
      // value = _alloc.acquire(value_length_);
      table[id] = value;
    } else {
205
      value = (VALUE *)(void *)(res->second);  // NOLINT
T
Thunderbrook 已提交
206
    }
T
Thunderbrook 已提交
207
    return value;
T
Thunderbrook 已提交
208 209
  }

210
  void AttrUpdate(VALUE *value, const int counter) {
211 212
    // update state
    value->unseen_days_ = 0;
213
    value->count_ += counter;
214 215 216 217 218

    if (!value->is_entry_) {
      value->is_entry_ = entry_func_(value);
      if (value->is_entry_) {
        // initialize
T
Thunderbrook 已提交
219
        for (size_t x = 0; x < value_names_.size(); ++x) {
T
Thunderbrook 已提交
220
          initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
221 222
                                     value_dims_[x]);
        }
T
tangwei12 已提交
223
        value->need_save_ = true;
T
tangwei12 已提交
224
      }
T
tangwei12 已提交
225 226
    } else {
      value->need_save_ = true;
T
tangwei12 已提交
227
    }
228 229

    return;
T
tangwei12 已提交
230 231
  }

232 233
  // dont jude if (has(id))
  float *Get(const uint64_t &id) {
T
Thunderbrook 已提交
234 235 236 237 238 239 240 241 242
    size_t hash = _hasher(id);
    size_t bucket = compute_bucket(hash);
    auto &table = values_[bucket];

    // auto &value = table.at(id);
    // return value->data_.data();
    auto res = table.find(id);
    VALUE *value = res->second;
    return value->data_.data();
243 244 245
  }

  // for load, to reset count, unseen_days
T
Thunderbrook 已提交
246 247 248 249 250 251 252 253
  VALUE *GetValue(const uint64_t &id) {
    size_t hash = _hasher(id);
    size_t bucket = compute_bucket(hash);

    auto &table = values_[bucket];
    auto res = table.find(id);
    return res->second;
  }
254

T
tangwei12 已提交
255
  bool GetEntry(const uint64_t &id) {
T
Thunderbrook 已提交
256 257
    auto value = GetValue(id);
    return value->is_entry_;
T
tangwei12 已提交
258 259
  }

260
  void SetEntry(const uint64_t &id, const bool state) {
T
Thunderbrook 已提交
261 262
    auto value = GetValue(id);
    value->is_entry_ = state;
263
  }
T
tangwei12 已提交
264

T
Thunderbrook 已提交
265 266 267 268 269 270 271 272 273 274 275 276
  void erase(uint64_t feasign) {
    size_t hash = _hasher(feasign);
    size_t bucket = compute_bucket(hash);
    auto &table = values_[bucket];

    auto iter = table.find(feasign);
    if (iter != table.end()) {
      butil::return_object(iter->second);
      iter = table.erase(iter);
    }
  }

277
  void Shrink(const int threshold) {
T
Thunderbrook 已提交
278 279 280 281 282 283 284
    for (auto &table : values_) {
      for (auto iter = table.begin(); iter != table.end();) {
        // VALUE* value = (VALUE*)(void*)(iter->second);
        VALUE *value = iter->second;
        value->unseen_days_++;
        if (value->unseen_days_ >= threshold) {
          butil::return_object(iter->second);
285 286
          // _alloc.release(iter->second);
          // _alloc.release(value);
T
Thunderbrook 已提交
287 288 289 290
          iter = table.erase(iter);
        } else {
          ++iter;
        }
291
      }
T
tangwei12 已提交
292
    }
293
    return;
T
tangwei12 已提交
294 295
  }

T
Thunderbrook 已提交
296
  float GetThreshold() { return threshold_; }
T
Thunderbrook 已提交
297 298 299 300 301 302 303
  size_t compute_bucket(size_t hash) {
    if (SPARSE_SHARD_BUCKET_NUM == 1) {
      return 0;
    } else {
      return hash >> (sizeof(size_t) * 8 - SPARSE_SHARD_BUCKET_NUM_BITS);
    }
  }
T
Thunderbrook 已提交
304

T
Thunderbrook 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
  map_type::iterator end() {
    return values_[SPARSE_SHARD_BUCKET_NUM - 1].end();
  }

  map_type::iterator Find(uint64_t id) {
    size_t hash = _hasher(id);
    size_t bucket = compute_bucket(hash);
    auto &table = values_[bucket];

    auto got = table.find(id);
    if (got == table.end()) {
      return end();
    } else {
      return got;
    }
  }

T
tangwei12 已提交
322 323
 private:
  bool Has(const uint64_t id) {
T
Thunderbrook 已提交
324 325 326 327 328 329
    size_t hash = _hasher(id);
    size_t bucket = compute_bucket(hash);
    auto &table = values_[bucket];

    auto got = table.find(id);
    if (got == table.end()) {
T
tangwei12 已提交
330 331 332 333 334 335 336
      return false;
    } else {
      return true;
    }
  }

 public:
T
Thunderbrook 已提交
337
  map_type values_[SPARSE_SHARD_BUCKET_NUM];
T
tangwei12 已提交
338
  size_t value_length_ = 0;
T
Thunderbrook 已提交
339
  std::hash<uint64_t> _hasher;
T
tangwei12 已提交
340 341

 private:
T
tangwei12 已提交
342 343 344 345 346
  const std::vector<std::string> &value_names_;
  const std::vector<int> &value_dims_;
  const std::vector<int> &value_offsets_;
  const std::unordered_map<std::string, int> &value_idx_;

347
  std::function<bool(VALUE *)> entry_func_;
T
tangwei12 已提交
348
  std::vector<std::shared_ptr<Initializer>> initializers_;
T
Thunderbrook 已提交
349
  float threshold_;
T
tangwei12 已提交
350 351 352 353
};

}  // namespace distributed
}  // namespace paddle