// large_scale_kv.h (last committed by tangwei12)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <ThreadPool.h>
#include <gflags/gflags.h>
#include <functional>
#include <future>  // NOLINT
#include <memory>
#include <string>
#include <thread>  // NOLINT
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/rw_lock.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace distributed {

// Run mode: training or inference.
// NOTE(review): not referenced in this header; presumably consumed elsewhere
// to select training vs. inference behaviour — confirm.
enum Mode {
  training = 0,
  infer = 1,
};

// Feature-admission ("entry") predicate. The type of `threshold` selects the
// policy; only the explicit specializations are defined.
//   count     - how many times the feature has been seen.
//   threshold - policy parameter.
// Returns true when the feature should be admitted.
template <typename T>
inline bool entry(const int count, const T threshold);

// "none" policy: both arguments are ignored and every feature is admitted.
template <>
inline bool entry<std::string>(const int /*count*/,
                               const std::string /*threshold*/) {
  return true;
}

// Count-filter policy: admit once the feature has been seen at least
// `threshold` times.
template <>
inline bool entry<int>(const int count, const int threshold) {
  const bool below_threshold = count < threshold;
  return !below_threshold;
}

// Probability policy: draws one value from a UniformInitializer configured
// with seed 0, min 0 and max 1. The feature is admitted when that value is
// >= `threshold`. `count` is ignored.
//
// The attrs follow the same layout ValueBlock passes for "uniform_random&..."
// specs, where the first field is the initializer name. The previous
// {"0", "0", "1"} dropped that name slot, which shifted seed/min/max by one
// position.
// NOTE(review): assumes UniformInitializer reads attrs as
// <name>, <seed>, <min>, <max> — confirm against initializers.h.
// NOTE(review): a fresh initializer is built on every call; if seed "0" does
// not mean "use a shared/global engine", every call would draw the same
// value — confirm.
template <>
inline bool entry<float>(const int count, const float threshold) {
  UniformInitializer uniform({"uniform", "0", "0", "1"});
  return uniform.GetValue() >= threshold;
}

// One row of a sparse table: a flat float buffer plus per-row bookkeeping
// used by feature admission.
struct VALUE {
  // Allocates `length` floats, all value-initialized to 0.0f.
  explicit VALUE(size_t length) : length_(length), data_(length) {}

  // Number of floats in data_.
  size_t length_;
  // Row payload.
  std::vector<float> data_;
  // Access counter. Starts at 1 for the access that created the row and is
  // pre-incremented by ValueBlock::Update.
  int count_ = 1;
  // Reset to 0 by ValueBlock::Update.
  // NOTE(review): presumably incremented by a periodic shrink/age pass
  // elsewhere — confirm.
  int unseen_days_ = 0;
  // NOTE(review): presumably marks rows touched since the last save — confirm.
  bool seen_after_last_save_ = true;
  // Whether the row has passed the admission (entry) filter.
  bool is_entry_ = true;
};

class ValueBlock {
 public:
89 90 91 92 93 94 95 96 97 98 99 100
  explicit ValueBlock(const std::vector<std::string> &value_names,
                      const std::vector<int> &value_dims,
                      const std::vector<int> &value_offsets,
                      const std::unordered_map<std::string, int> &value_idx,
                      const std::vector<std::string> &init_attrs,
                      const std::string &entry_attr)
      : value_names_(value_names),
        value_dims_(value_dims),
        value_offsets_(value_offsets),
        value_idx_(value_idx) {
    for (int x = 0; x < value_dims.size(); ++x) {
      value_length_ += value_dims[x];
T
tangwei12 已提交
101 102
    }

T
tangwei12 已提交
103 104 105
    // for Entry
    {
      if (entry_attr == "none") {
106
        has_entry_ = false;
T
tangwei12 已提交
107 108 109
        entry_func_ =
            std::bind(entry<std::string>, std::placeholders::_1, "none");
      } else {
110
        has_entry_ = true;
T
tangwei12 已提交
111 112 113 114 115 116 117 118 119 120 121
        auto slices = string::split_string<std::string>(entry_attr, "&");
        if (slices[0] == "count_filter") {
          int threshold = std::stoi(slices[1]);
          entry_func_ = std::bind(entry<int>, std::placeholders::_1, threshold);
        } else if (slices[0] == "probability") {
          float threshold = std::stof(slices[1]);
          entry_func_ =
              std::bind(entry<float>, std::placeholders::_1, threshold);
        }
      }
    }
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

    // for Initializer
    {
      for (auto &attr : init_attrs) {
        auto slices = string::split_string<std::string>(attr, "&");

        if (slices[0] == "gaussian_random") {
          initializers_.emplace_back(
              std::make_shared<GaussianInitializer>(slices));
        } else if (slices[0] == "fill_constant") {
          initializers_.emplace_back(
              std::make_shared<FillConstantInitializer>(slices));
        } else if (slices[0] == "uniform_random") {
          initializers_.emplace_back(
              std::make_shared<UniformInitializer>(slices));
        } else {
          PADDLE_THROW(platform::errors::InvalidArgument(
              "%s can not be supported", attr));
        }
      }
    }
T
tangwei12 已提交
143 144 145 146
  }

  ~ValueBlock() {}

147 148 149 150 151
  float *Init(const uint64_t &id) {
    auto value = std::make_shared<VALUE>(value_length_);
    for (int x = 0; x < value_names_.size(); ++x) {
      initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
                                 value_dims_[x]);
T
tangwei12 已提交
152 153
    }
    values_[id] = value;
154
    return value->data_.data();
T
tangwei12 已提交
155 156
  }

157 158 159 160 161 162 163 164
  std::vector<float *> Get(const uint64_t &id,
                           const std::vector<std::string> &value_names) {
    auto pts = std::vector<float *>();
    pts.reserve(value_names.size());
    auto &values = values_.at(id);
    for (int i = 0; i < static_cast<int>(value_names.size()); i++) {
      pts.push_back(values->data_.data() +
                    value_offsets_.at(value_idx_.at(value_names[i])));
T
tangwei12 已提交
165
    }
166
    return pts;
T
tangwei12 已提交
167 168
  }

169 170 171
  float *Get(const uint64_t &id) {
    auto pts = std::vector<std::vector<float> *>();
    auto &values = values_.at(id);
T
tangwei12 已提交
172

173
    return values->data_.data();
T
tangwei12 已提交
174 175
  }

176
  float *InitFromInitializer(const uint64_t &id) {
T
tangwei12 已提交
177
    if (Has(id)) {
178
      if (has_entry_) {
T
tangwei12 已提交
179
        Update(id);
T
tangwei12 已提交
180
      }
181
      return Get(id);
T
tangwei12 已提交
182
    }
183
    return Init(id);
T
tangwei12 已提交
184 185 186 187
  }

  bool GetEntry(const uint64_t &id) {
    auto value = values_.at(id);
188
    return value->is_entry_;
T
tangwei12 已提交
189 190 191
  }

  void Update(const uint64_t id) {
192 193 194
    auto value = values_.at(id);
    value->unseen_days_ = 0;
    auto count = ++value->count_;
T
tangwei12 已提交
195

196 197
    if (!value->is_entry_) {
      value->is_entry_ = entry_func_(count);
T
tangwei12 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211
    }
  }

 private:
  bool Has(const uint64_t id) {
    auto got = values_.find(id);
    if (got == values_.end()) {
      return false;
    } else {
      return true;
    }
  }

 public:
212 213
  std::unordered_map<uint64_t, std::shared_ptr<VALUE>> values_;
  size_t value_length_ = 0;
T
tangwei12 已提交
214 215

 private:
216 217 218 219 220 221
  const std::vector<std::string> &value_names_;
  const std::vector<int> &value_dims_;
  const std::vector<int> &value_offsets_;
  const std::unordered_map<std::string, int> &value_idx_;

  bool has_entry_ = false;
T
tangwei12 已提交
222
  std::function<bool(uint64_t)> entry_func_;
223
  std::vector<std::shared_ptr<Initializer>> initializers_;
T
tangwei12 已提交
224 225 226 227
};

}  // namespace distributed
}  // namespace paddle