large_scale_kv.h 8.7 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <ThreadPool.h>
#include <functional>
#include <future>  // NOLINT
#include <memory>
#include <string>
#include <thread>  // NOLINT
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
27
#include "gflags/gflags.h"
T
tangwei12 已提交
28 29 30

#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
31
#include "paddle/fluid/distributed/thirdparty/round_robin.h"
T
tangwei12 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/rw_lock.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace distributed {

enum Mode { training, infer };

struct VALUE {
T
tangwei12 已提交
52 53
  explicit VALUE(size_t length)
      : length_(length),
54
        count_(0),
T
tangwei12 已提交
55
        unseen_days_(0),
56 57
        need_save_(false),
        is_entry_(false) {
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
    data_ = new float[length];
    memset(data_, 0, sizeof(float) * length);
  }

  VALUE(const VALUE &value) {
    length_ = value.length_;
    count_ = value.count_;
    unseen_days_ = value.unseen_days_;
    need_save_ = value.need_save_;
    is_entry_ = value.is_entry_;
    data_ = new float[length_];
    memcpy(data_, value.data_, sizeof(float) * length_);
  }

  VALUE &operator=(const VALUE &value) {
    if (this != &value) {
      delete[] data_;
      length_ = value.length_;
      count_ = value.count_;
      unseen_days_ = value.unseen_days_;
      need_save_ = value.need_save_;
      is_entry_ = value.is_entry_;

      data_ = new float[length_];
      memcpy(data_, value.data_, sizeof(float) * length_);
    }
    return *this;
  }

  ~VALUE() {
    delete[] data_;
    data_ = nullptr;
T
tangwei12 已提交
90 91
  }

T
tangwei12 已提交
92
  size_t length_;
T
tangwei12 已提交
93
  int count_;
94 95 96
  int unseen_days_;  // use to check knock-out
  bool need_save_;   // whether need to save
  bool is_entry_;    // whether knock-in
97
  float *data_;
T
tangwei12 已提交
98 99
};

100
inline bool count_entry(VALUE *value, int threshold) {
101 102 103
  return value->count_ >= threshold;
}

104
inline bool probility_entry(VALUE *value, float threshold) {
T
tangwei12 已提交
105
  UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"});
106 107 108
  return uniform.GetValue() >= threshold;
}

T
tangwei12 已提交
109 110
class ValueBlock {
 public:
T
tangwei12 已提交
111 112 113 114 115 116 117 118 119 120
  explicit ValueBlock(const std::vector<std::string> &value_names,
                      const std::vector<int> &value_dims,
                      const std::vector<int> &value_offsets,
                      const std::unordered_map<std::string, int> &value_idx,
                      const std::vector<std::string> &init_attrs,
                      const std::string &entry_attr)
      : value_names_(value_names),
        value_dims_(value_dims),
        value_offsets_(value_offsets),
        value_idx_(value_idx) {
T
Thunderbrook 已提交
121
    for (size_t x = 0; x < value_dims.size(); ++x) {
T
tangwei12 已提交
122
      value_length_ += value_dims[x];
T
tangwei12 已提交
123 124
    }

T
tangwei12 已提交
125 126
    // for Entry
    {
T
tangwei12 已提交
127
      auto slices = string::split_string<std::string>(entry_attr, ":");
128 129
      if (slices[0] == "none") {
        entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0);
T
Thunderbrook 已提交
130
        threshold_ = 0;
T
tangwei12 已提交
131
      } else if (slices[0] == "count_filter_entry") {
T
Thunderbrook 已提交
132 133 134
        threshold_ = std::stoi(slices[1]);
        entry_func_ =
            std::bind(&count_entry, std::placeholders::_1, threshold_);
T
tangwei12 已提交
135
      } else if (slices[0] == "probability_entry") {
T
Thunderbrook 已提交
136
        threshold_ = std::stof(slices[1]);
T
tangwei12 已提交
137
        entry_func_ =
T
Thunderbrook 已提交
138
            std::bind(&probility_entry, std::placeholders::_1, threshold_);
T
tangwei12 已提交
139
      } else {
140
        PADDLE_THROW(platform::errors::InvalidArgument(
T
tangwei12 已提交
141 142
            "Not supported Entry Type : %s, Only support [CountFilterEntry, "
            "ProbabilityEntry]",
143
            slices[0]));
T
tangwei12 已提交
144 145
      }
    }
T
tangwei12 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160

    // for Initializer
    {
      for (auto &attr : init_attrs) {
        auto slices = string::split_string<std::string>(attr, "&");

        if (slices[0] == "gaussian_random") {
          initializers_.emplace_back(
              std::make_shared<GaussianInitializer>(slices));
        } else if (slices[0] == "fill_constant") {
          initializers_.emplace_back(
              std::make_shared<FillConstantInitializer>(slices));
        } else if (slices[0] == "uniform_random") {
          initializers_.emplace_back(
              std::make_shared<UniformInitializer>(slices));
C
Chengmo 已提交
161 162 163
        } else if (slices[0] == "truncated_gaussian_random") {
          initializers_.emplace_back(
              std::make_shared<TruncatedGaussianInitializer>(slices));
T
tangwei12 已提交
164 165 166 167 168 169
        } else {
          PADDLE_THROW(platform::errors::InvalidArgument(
              "%s can not be supported", attr));
        }
      }
    }
T
tangwei12 已提交
170 171 172 173
  }

  ~ValueBlock() {}

T
tangwei12 已提交
174
  std::vector<float *> Get(const uint64_t &id,
175 176
                           const std::vector<std::string> &value_names,
                           const std::vector<int> &value_dims) {
T
tangwei12 已提交
177 178 179 180
    auto pts = std::vector<float *>();
    pts.reserve(value_names.size());
    auto &values = values_.at(id);
    for (int i = 0; i < static_cast<int>(value_names.size()); i++) {
181 182 183
      PADDLE_ENFORCE_EQ(
          value_dims[i], value_dims_[i],
          platform::errors::InvalidArgument("value dims is not match"));
184
      pts.push_back(values.data_ +
T
tangwei12 已提交
185
                    value_offsets_.at(value_idx_.at(value_names[i])));
T
tangwei12 已提交
186
    }
T
tangwei12 已提交
187
    return pts;
T
tangwei12 已提交
188 189
  }

190
  // pull
191 192
  float *Init(const uint64_t &id, const bool with_update = true,
              const int counter = 1) {
193
    if (!Has(id)) {
194
      values_.emplace(std::make_pair(id, VALUE(value_length_)));
195 196 197
    }

    auto &value = values_.at(id);
T
tangwei12 已提交
198

199
    if (with_update) {
200
      AttrUpdate(&value, counter);
201 202
    }

203
    return value.data_;
T
tangwei12 已提交
204 205
  }

206

T
Thunderbrook 已提交
207 208 209
  VALUE *InitGet(const uint64_t &id, const bool with_update = true,
                 const int counter = 1) {
    if (!Has(id)) {
210
      values_.emplace(std::make_pair(id, VALUE(value_length_)));
T
Thunderbrook 已提交
211 212 213 214 215
    }

    auto &value = values_.at(id);

    if (with_update) {
216
      AttrUpdate(&value, counter);
T
Thunderbrook 已提交
217 218
    }

219
    return &value;
T
Thunderbrook 已提交
220 221
  }

222
  void AttrUpdate(VALUE *value, const int counter) {
223 224
    // update state
    value->unseen_days_ = 0;
225
    value->count_ += counter;
226 227 228 229 230

    if (!value->is_entry_) {
      value->is_entry_ = entry_func_(value);
      if (value->is_entry_) {
        // initialize
T
Thunderbrook 已提交
231
        for (size_t x = 0; x < value_names_.size(); ++x) {
232
          initializers_[x]->GetValue(value->data_ + value_offsets_[x],
233 234
                                     value_dims_[x]);
        }
T
tangwei12 已提交
235
        value->need_save_ = true;
T
tangwei12 已提交
236
      }
T
tangwei12 已提交
237 238
    } else {
      value->need_save_ = true;
T
tangwei12 已提交
239
    }
240 241

    return;
T
tangwei12 已提交
242 243
  }

244 245 246
  // dont jude if (has(id))
  float *Get(const uint64_t &id) {
    auto &value = values_.at(id);
247
    return value.data_;
248 249 250
  }

  // for load, to reset count, unseen_days
251
  VALUE *GetValue(const uint64_t &id) { return &values_.at(id); }
252

T
tangwei12 已提交
253
  bool GetEntry(const uint64_t &id) {
254
    auto &value = values_.at(id);
255
    return value.is_entry_;
T
tangwei12 已提交
256 257
  }

258 259
  void SetEntry(const uint64_t &id, const bool state) {
    auto &value = values_.at(id);
260
    value.is_entry_ = state;
261
  }
T
tangwei12 已提交
262

263 264 265
  void Shrink(const int threshold) {
    for (auto iter = values_.begin(); iter != values_.end();) {
      auto &value = iter->second;
266 267
      value.unseen_days_++;
      if (value.unseen_days_ >= threshold) {
268 269 270 271
        iter = values_.erase(iter);
      } else {
        ++iter;
      }
T
tangwei12 已提交
272
    }
273
    return;
T
tangwei12 已提交
274 275
  }

T
Thunderbrook 已提交
276 277
  float GetThreshold() { return threshold_; }

T
tangwei12 已提交
278 279 280 281 282 283 284 285 286 287 288
 private:
  bool Has(const uint64_t id) {
    auto got = values_.find(id);
    if (got == values_.end()) {
      return false;
    } else {
      return true;
    }
  }

 public:
289
  robin_hood::unordered_map<uint64_t, VALUE> values_;
T
tangwei12 已提交
290
  size_t value_length_ = 0;
T
tangwei12 已提交
291 292

 private:
T
tangwei12 已提交
293 294 295 296 297
  const std::vector<std::string> &value_names_;
  const std::vector<int> &value_dims_;
  const std::vector<int> &value_offsets_;
  const std::unordered_map<std::string, int> &value_idx_;

298
  std::function<bool(VALUE *)> entry_func_;
T
tangwei12 已提交
299
  std::vector<std::shared_ptr<Initializer>> initializers_;
T
Thunderbrook 已提交
300
  float threshold_;
T
tangwei12 已提交
301 302 303 304
};

}  // namespace distributed
}  // namespace paddle
305