// tensor_table.h
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <algorithm>
#include <condition_variable>  // NOLINT
#include <map>
#include <memory>
#include <mutex>  // NOLINT
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/ps/table/table.h"
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace framework {
class Executor;
class Scope;
struct ExecutorPrepareContext;
}  // namespace framework
}  // namespace paddle

DECLARE_double(eager_delete_tensor_gb);

namespace paddle {
namespace distributed {

#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@"
#define STEP_COUNTER "@PS_STEP_COUNTER@"

class TensorTable : public Table {
 public:
50
  TensorTable() {}
T
tangwei12 已提交
51 52
  virtual ~TensorTable() {}

Y
yaoxuefeng 已提交
53 54
  virtual int32_t Pull(TableContext &context) { return 0; }
  virtual int32_t Push(TableContext &context) { return 0; }
Z
zhaocaibei123 已提交
55
  int32_t PullDense(float *values, size_t num) override { return 0; }
T
tangwei12 已提交
56

Z
zhaocaibei123 已提交
57
  int32_t PushDense(const float *values, size_t num) override { return 0; }
T
tangwei12 已提交
58

Z
zhaocaibei123 已提交
59 60
  int32_t PullSparse(float *values,
                     const PullSparseValue &pull_value) override {
T
tangwei12 已提交
61
    return 0;
62
  }
Z
zhaocaibei123 已提交
63 64
  int32_t PushSparse(const uint64_t *keys, const float *values,
                     size_t num) override {
65 66
    return 0;
  }
Z
zhaocaibei123 已提交
67
  int32_t Shrink(const std::string &param) override { return 0; }
68

Z
zhaocaibei123 已提交
69
  virtual void *GetShard(size_t shard_idx) { return 0; }
T
tangwei12 已提交
70

Z
zhaocaibei123 已提交
71
  virtual int32_t InitializeShard() { return 0; }
T
tangwei12 已提交
72

Z
zhaocaibei123 已提交
73
  virtual int32_t Flush() { return 0; }
74

Z
zhaocaibei123 已提交
75
  virtual int32_t Load(const std::string &path, const std::string &param) {
T
tangwei12 已提交
76
    return 0;
77
  }
Z
zhaocaibei123 已提交
78
  virtual int32_t Save(const std::string &path, const std::string &param) {
79 80 81
    return 0;
  }

Z
zhaocaibei123 已提交
82
  virtual void Clear() {}
T
tangwei12 已提交
83

Z
zhaocaibei123 已提交
84
  int32_t Initialize() override { return 0; }
85

Z
zhaocaibei123 已提交
86
  int32_t PushDense(const int64_t *values, const int32_t trainer_id) override {
T
tangwei12 已提交
87
    return 0;
88
  }
T
tangwei12 已提交
89

Z
zhaocaibei123 已提交
90
  int32_t SetProgramEnv(
91
      framework::Scope *scope, platform::Place place,
92 93 94 95 96 97 98
      const std::vector<framework::ProgramDesc> *sub_program) override {
    scope_ = scope;
    place_ = place;
    executor_ = new framework::Executor(place_);
    sub_program_ = sub_program;
    return 0;
  }
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

 protected:
  framework::Executor *executor_;
  framework::Scope *scope_;
  platform::Place place_ = platform::CPUPlace();
  const std::vector<framework::ProgramDesc> *sub_program_;
  paddle::distributed::TensorAccessorParameter program_config_;
  std::shared_ptr<framework::ExecutorPrepareContext> exec_context_ = nullptr;
};

class DenseTensorTable : public TensorTable {
 public:
  DenseTensorTable() {}
  virtual ~DenseTensorTable() {}

Z
zhaocaibei123 已提交
114 115
  int32_t PullSparse(float *values,
                     const PullSparseValue &pull_value) override {
T
tangwei12 已提交
116 117
    return 0;
  }
Z
zhaocaibei123 已提交
118 119
  int32_t PushSparse(const uint64_t *keys, const float *values,
                     size_t num) override {
120 121
    return 0;
  }
Z
zhaocaibei123 已提交
122
  int32_t Shrink(const std::string &param) override { return 0; }
T
tangwei12 已提交
123

Z
zhaocaibei123 已提交
124
  virtual void *GetShard(size_t shard_idx) { return 0; }
T
tangwei12 已提交
125

Z
zhaocaibei123 已提交
126
  virtual int32_t InitializeShard() { return 0; }
T
tangwei12 已提交
127

Z
zhaocaibei123 已提交
128
  virtual int32_t Flush() { return 0; }
T
tangwei12 已提交
129

Z
zhaocaibei123 已提交
130
  virtual void Clear() {}
131 132

  // Todo: Support program Load & Save
Z
zhaocaibei123 已提交
133
  virtual int32_t Load(const std::string &path, const std::string &param) {
T
tangwei12 已提交
134 135
    return 0;
  }
Z
zhaocaibei123 已提交
136
  virtual int32_t Save(const std::string &path, const std::string &param) {
T
tangwei12 已提交
137 138 139
    return 0;
  }

140
  // Todo: Support pull dense
Z
zhaocaibei123 已提交
141
  int32_t PullDense(float *values, size_t num) override { return 0; }
142 143 144

  /*----------------------------------------------------------------------*/

Z
zhaocaibei123 已提交
145
  int32_t Initialize() override { return 0; }
146

Z
zhaocaibei123 已提交
147
  int32_t PushDense(const float *values, size_t num) override { return 0; }
T
tangwei12 已提交
148

Z
zhaocaibei123 已提交
149
  int32_t PushDense(const int64_t *values, const int32_t trainer_id) {
T
tangwei12 已提交
150 151 152
    return 0;
  }

153
 protected:
Z
zhaocaibei123 已提交
154 155
  virtual int32_t _RunProgram(const float *values, size_t num,
                              const uint32_t trainer_id) {
156 157
    return 0;
  }
T
tangwei12 已提交
158

159 160 161 162
  int startup_program_id_ = -1;
  int main_program_id_ = -1;
  std::string feed_var_name_ = "";
  std::string fetch_var_name_ = "";
T
tangwei12 已提交
163 164
};

class GlobalStepTable : public DenseTensorTable {
T
tangwei12 已提交
166
 public:
167 168
  GlobalStepTable() {}
  virtual ~GlobalStepTable() {}
T
tangwei12 已提交
169

Z
zhaocaibei123 已提交
170 171
  int32_t PullSparse(float *values,
                     const PullSparseValue &pull_value) override {
T
tangwei12 已提交
172 173
    return 0;
  }
Z
zhaocaibei123 已提交
174 175
  int32_t PushSparse(const uint64_t *keys, const float *values,
                     size_t num) override {
T
tangwei12 已提交
176 177
    return 0;
  }
Z
zhaocaibei123 已提交
178
  int32_t Shrink(const std::string &param) override { return 0; }
T
tangwei12 已提交
179

Z
zhaocaibei123 已提交
180
  virtual void *GetShard(size_t shard_idx) { return 0; }
181

Z
zhaocaibei123 已提交
182
  virtual int32_t InitializeShard() { return 0; }
T
tangwei12 已提交
183

Z
zhaocaibei123 已提交
184
  virtual int32_t Flush() { return 0; }
T
tangwei12 已提交
185

Z
zhaocaibei123 已提交
186
  virtual void Clear() {}
187

Z
zhaocaibei123 已提交
188
  virtual int32_t Load(const std::string &path, const std::string &param) {
T
tangwei12 已提交
189 190
    return 0;
  }
Z
zhaocaibei123 已提交
191
  virtual int32_t Save(const std::string &path, const std::string &param) {
T
tangwei12 已提交
192 193 194
    return 0;
  }

Z
zhaocaibei123 已提交
195
  int32_t PullDense(float *values, size_t num) override { return 0; }
T
tangwei12 已提交
196

197
  /*----------------------------------------------------------------------*/
T
tangwei12 已提交
198

Z
zhaocaibei123 已提交
199
  int32_t Initialize() override {
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
    auto _program_config = _config.tensor();
    auto trainers_ = _config.common().trainer_num();
    FLAGS_eager_delete_tensor_gb = -1;
    // Get Config
    if (_program_config.has_startup_program_id()) {
      startup_program_id_ = _program_config.startup_program_id();
    }
    if (_program_config.has_main_program_id()) {
      main_program_id_ = _program_config.main_program_id();
    }
    if (_program_config.has_feed_var_name()) {
      feed_var_name_ = _program_config.feed_var_name();
    }
    if (_program_config.has_fetch_var_name()) {
      fetch_var_name_ = _program_config.fetch_var_name();
    }

    // Run startup program
    if (startup_program_id_ != -1) {
      std::map<std::string, const framework::LoDTensor *> fake_feed;
      std::map<std::string, framework::FetchType *> fake_fetch;
      auto startup_program_desc = sub_program_->at(startup_program_id_);
      auto ctx = executor_->Prepare(startup_program_desc, 0);
      executor_->RunPreparedContext(ctx.get(), scope_, false);
    }

    if (main_program_id_ != -1) {
      // Run main porgram, if program is used for learning decay
      auto main_program_desc = sub_program_->at(main_program_id_);
      auto main_ctx = executor_->Prepare(main_program_desc, 0);
      exec_context_ = std::move(main_ctx);
      executor_->RunPreparedContext(exec_context_.get(), scope_, false);
      // init decay_counters
      decay_counters_.reserve(trainers_);
      for (int32_t i = 0; i < trainers_; ++i) {
        decay_counters_[i] = 0;
      }
    }
  }
239

Z
zhaocaibei123 已提交
240
  int32_t PushDense(const float *values, size_t num) override { return 0; }
241

Z
zhaocaibei123 已提交
242 243
  int32_t PushDense(const int64_t *values, const int32_t trainer_id) {
    return _RunProgram(values, trainer_id);
244
  }
245

Z
zhaocaibei123 已提交
246 247
  int32_t SetTableMap(std::unordered_map<uint32_t, std::shared_ptr<Table>>
                          *table_map) override {
248 249 250 251 252 253 254 255 256 257
    auto *lr_var = scope_->FindVar(fetch_var_name_);
    auto *lr_tensor = lr_var->GetMutable<framework::LoDTensor>();
    auto *lr_value = lr_tensor->mutable_data<float>(platform::CPUPlace());
    VLOG(3) << "GlobalStepTable::set_table_map set global lr: " << *lr_value;

    for (auto iter = table_map->begin(); iter != table_map->end(); iter++) {
      auto table_id = iter->first;
      if (table_id == _config.table_id()) {
        continue;
      }
Z
zhaocaibei123 已提交
258
      iter->second->SetGlobalLR(lr_value);
259 260 261
    }
    return 0;
  }
262 263

 private:
Z
zhaocaibei123 已提交
264 265
  virtual int32_t _RunProgram(const int64_t *values,
                              const uint32_t trainer_id) {
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
    FLAGS_eager_delete_tensor_gb = -1;
    auto counter = decay_counters_.at(trainer_id);
    counter += int(values[0]);
    decay_counters_.at(trainer_id) = counter;

    auto *global_step_var = scope_->FindVar(feed_var_name_);
    auto *tensor = global_step_var->GetMutable<framework::LoDTensor>();
    auto *value = tensor->mutable_data<int64_t>(platform::CPUPlace());

    auto global_counter = 0;
    for (auto &trainer_counter : decay_counters_) {
      global_counter += trainer_counter.second;
    }

    // Todo: hard code for increment op
    value[0] = global_counter - 1;
    VLOG(3) << "GlobalStepTable::_run_program global_counter " << value[0];

    executor_->RunPreparedContext(exec_context_.get(), scope_, false, false);
    auto *lr_var = scope_->FindVar(fetch_var_name_);
    auto *lr_tensor = lr_var->GetMutable<framework::LoDTensor>();
    auto *lr_value = lr_tensor->mutable_data<float>(platform::CPUPlace());
    VLOG(3) << "GlobalStepTable::LR value: " << lr_value[0];
    return 0;
  }
291 292 293 294

 private:
  std::unordered_map<int, int64_t> decay_counters_;
  int32_t trainers_;
T
tangwei12 已提交
295
};

}  // namespace distributed
}  // namespace paddle