/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <ThreadPool.h>
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "gflags/gflags.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/communicator_common.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/large_scale_kv.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

DECLARE_bool(communicator_is_sgd_optimizer);

namespace paddle {
namespace operators {
namespace distributed {

using Scope = framework::Scope;
using Variable = framework::Variable;

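// A bounded, thread-safe FIFO queue built on std::deque. Push() blocks while
// the queue is full, Pop() blocks while it is empty, and both sides signal
// the condition variable so producers and consumers can make progress.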
template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(size_t capacity) : capacity_(capacity) {
    PADDLE_ENFORCE_GT(capacity_, 0, "The capacity must be greater than 0.");
  }

  bool Push(const T &elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
      queue_.push_back(elem);
    }
    cv_.notify_one();
    return true;
  }

  bool Push(T &&elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
      queue_.emplace_back(std::move(elem));
    }
    cv_.notify_one();
    return true;
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [=] { return !queue_.empty(); });
    T rc(std::move(queue_.front()));
    queue_.pop_front();
    cv_.notify_one();
    return rc;
  }

  size_t Cap() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return capacity_;
  }

  size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  const size_t capacity_;
  std::deque<T> queue_;

  mutable std::mutex mutex_;
  std::condition_variable cv_;
};

template <typename T, int MajorType = Eigen::RowMajor,
          typename IndexType = Eigen::DenseIndex>
using EigenVector = framework::EigenVector<T, MajorType, IndexType>;

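// Merges all copies of a variable in `vars` into one output variable named
// `var_name` in `scope`. Dense LoDTensors are summed element-wise and, when
// merge_add is false, divided by vars.size(); SelectedRows are combined with
// MergeAdd or MergeAverage.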
template <typename T>
inline void MergeVars(const std::string &var_name,
                      const std::vector<std::shared_ptr<Variable>> &vars,
                      Scope *scope, bool merge_add = true) {
  PADDLE_ENFORCE(!vars.empty(), "should have value to merge!");
  auto cpu_place = platform::CPUPlace();
  auto &var0 = vars[0];
  auto *out_var = scope->Var(var_name);
  if (var0->IsType<framework::LoDTensor>()) {
    auto dims = var0->Get<framework::LoDTensor>().dims();
    VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims
            << "; merge add: " << merge_add;
    // init output tensor
    auto *out_t = out_var->GetMutable<framework::LoDTensor>();
    out_t->mutable_data<T>(dims, cpu_place);
    // check the input dims
    for (auto &var : vars) {
      auto &var_t = var->Get<framework::LoDTensor>();
      PADDLE_ENFORCE_EQ(var_t.dims(), dims, "should have the same dims");
    }

    // set output tensor to 0.
    auto cpu_ctx = paddle::platform::CPUDeviceContext();
    math::SetConstant<paddle::platform::CPUDeviceContext, T> constant_functor;
    constant_functor(cpu_ctx, out_t, static_cast<T>(0));
    // sum all vars to out
    auto result = EigenVector<T>::Flatten(*out_t);
    for (auto &var : vars) {
      auto &in_t = var->Get<framework::LoDTensor>();
      auto in = EigenVector<T>::Flatten(in_t);
      result.device(*cpu_ctx.eigen_device()) = result + in;
    }
    if (!merge_add) {
      result.device(*cpu_ctx.eigen_device()) =
          result / static_cast<T>(vars.size());
    }
  } else if (var0->IsType<framework::SelectedRows>()) {
    auto &slr0 = var0->Get<framework::SelectedRows>();
    auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
    out_slr->mutable_rows()->clear();
    out_slr->mutable_value()->mutable_data<T>({{}}, cpu_place);
    std::vector<const paddle::framework::SelectedRows *> inputs;
    inputs.reserve(vars.size());
    for (auto &var : vars) {
      inputs.push_back(&var->Get<framework::SelectedRows>());
    }
    auto dev_ctx = paddle::platform::CPUDeviceContext();
    if (merge_add) {
      math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, T> merge_add;
      merge_add(dev_ctx, inputs, out_slr);
    } else {
      math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, T>
          merge_average;
      merge_average(dev_ctx, inputs, out_slr);
    }

    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
            << " dims: " << slr0.value().dims() << "; merge add: " << merge_add;
  } else {
    PADDLE_THROW("unsupported var type!");
  }
}

using RpcCtxMap = std::unordered_map<std::string, CommContext>;
using SparseValue = std::unordered_map<int64_t, std::vector<float>>;

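// Base class of the parameter-server communicators. It manages the
// process-wide singleton (created once via InitInstance<T>) and declares the
// interface trainers use to push gradients (Send) and pull parameters
// (RecvNoBarrier), plus the barrier hooks used by the synchronous variants.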
class Communicator {
 public:
  Communicator();

  explicit Communicator(const std::map<std::string, std::string> &envs_) {
    for (auto &iter : envs_) {
      envs[iter.first] = iter.second;
    }
  }

  virtual ~Communicator() {}

  virtual void Start() = 0;

  virtual void Stop() = 0;

  virtual bool IsRunning() { return running_; }

  virtual void Clean() {}

  virtual void Send(const std::vector<std::string> &var_names,
                    const std::vector<std::string> &var_tables,
                    const framework::Scope &scope) = 0;

  virtual void RecvNoBarrier() {}

  virtual void Barrier() {}

  virtual void BarrierTriggerDecrement() {}

  virtual void BarrierTriggerReset(int init_counter) {}

  virtual void InitEnvs() = 0;

  virtual void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                        const RpcCtxMap &recv_varname_to_ctx,
                        Scope *recv_scope) {}

  static Communicator *GetInstance() { return communicator_.get(); }

  static std::shared_ptr<Communicator> GetInstantcePtr() {
    return communicator_;
  }

  template <typename T>
  static Communicator *InitInstance(
      const RpcCtxMap &send_ctx, const RpcCtxMap &recv_ctx, Scope *recv_scope,
      const std::map<std::string, std::string> &envs) {
    std::call_once(init_flag_, &Communicator::InitWithRpcCtx<T>, send_ctx,
                   recv_ctx, recv_scope, std::ref(envs));
    return communicator_.get();
  }

  // Init is called by InitInstance.
  template <typename T>
  static void InitWithRpcCtx(const RpcCtxMap &send_ctx,
                             const RpcCtxMap &recv_ctx, Scope *recv_scope,
                             const std::map<std::string, std::string> &envs) {
    if (communicator_.get() == nullptr) {
      communicator_.reset(new T(std::ref(envs)));
      communicator_->InitEnvs();
      communicator_->InitImpl(send_ctx, recv_ctx, recv_scope);
    }
  }

 protected:
  bool running_ = false;
  bool waiting_ = true;
  static std::shared_ptr<Communicator> communicator_;
  static std::once_flag init_flag_;
  std::unordered_map<std::string, std::string> envs;
};
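
// A rough usage sketch (trainer side; `send_ctx`, `recv_ctx` and the global
// `recv_scope` are assumed to have been built from the program already):
//
//   Communicator::InitInstance<AsyncCommunicator>(send_ctx, recv_ctx,
//                                                 recv_scope, envs);
//   Communicator::GetInstance()->Start();
//   ...  // run training; send ops call Communicator::GetInstance()->Send()
//   Communicator::GetInstance()->Stop();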

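// Fully asynchronous communicator: Send() enqueues gradient copies into
// per-variable blocking queues; a background MainThread (see communicator.cc)
// merges up to max_merge_var_num_ copies with MergeVars and ships them to the
// pservers, while parameters are pulled back without any barrier.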
class AsyncCommunicator : public Communicator {
 public:
  AsyncCommunicator() : Communicator() {}

  explicit AsyncCommunicator(const std::map<std::string, std::string> &envs)
      : Communicator(envs) {}

  ~AsyncCommunicator();

  void InitEnvs() {
    min_send_grad_num_before_recv_ =
        std::stoi(envs.at("communicator_min_send_grad_num_before_recv"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "AsyncCommunicator Initialized";
  }

  void Start() override;

  void Stop() override;

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitParams();

  void MainThread();

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  virtual void SendByCommunicator(int batches);

  virtual void SendGlobalStep(int batches);

  virtual void RecvByCommunicator();

  virtual void RecvNoBarrier();

  virtual int BatchesCounter();

  virtual void BarrierSend() {}

  virtual void BarrierRecv() {}

  virtual void BarrierWeakUp() {}

 protected:
  int min_send_grad_num_before_recv_;
  int thread_pool_size_;
  int max_merge_var_num_;
  int send_wait_times_;
  int send_queue_size_;
  int trainer_id_ = 0;
  bool need_global_step_ = false;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
      send_varname_to_queue_;
  RpcCtxMap send_varname_to_ctx_;
  RpcCtxMap recv_varname_to_ctx_;
  std::unique_ptr<std::thread> main_thread_{nullptr};
  Scope *recv_scope_;                  // should be global scope
  std::unique_ptr<Scope> send_scope_;  // an independent scope
  std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
  std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
  std::atomic_uint grad_num_{0};  // the num of gradient sent since last recv
};

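// Half-asynchronous variant: communication still runs in the background, but
// a barrier counter (barrier_trigger_ / barrier_counter_, driven through
// BarrierTriggerDecrement and released by BarrierWeakUp) lets the framework
// line sends and recvs up with the batch barriers.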
class HalfAsyncCommunicator : public AsyncCommunicator {
 public:
  HalfAsyncCommunicator() {}

  explicit HalfAsyncCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "HalfAsyncCommunicator Initialized";
  }

  void Clean() override;

  void Barrier() override;

  void BarrierTriggerDecrement() override;

  void BarrierTriggerReset(int initial_val) override;

  int BatchesCounter();

  void BarrierWeakUp();

 protected:
  // mutex for Wait for barrier
  std::mutex barrier_mutex_;
  std::condition_variable barrier_cond_;
  std::atomic<int64_t> barrier_trigger_{0};
  std::atomic<int64_t> barrier_counter_{0};
};

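// Synchronous variant: in addition to the half-async barrier logic it sends
// explicit BarrierSend / BarrierRecv requests to every configured pserver
// endpoint, keeping all trainers in lock-step.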
class SyncCommunicator : public HalfAsyncCommunicator {
 public:
  SyncCommunicator() : HalfAsyncCommunicator() {}

  explicit SyncCommunicator(const std::map<std::string, std::string> &envs)
      : HalfAsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));

    trainer_id_ = std::stoi(envs.at("trainer_id"));
    auto pserver_strings = envs.at("pserver_endpoints");
    pserver_endpoints_ = paddle::string::Split(pserver_strings, ',');
    VLOG(0) << "SyncCommunicator Initialized";
  }

  void BarrierSend();

  void BarrierRecv();

 private:
  std::vector<std::string> pserver_endpoints_{};
};

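// GEO-SGD style communicator: each trainer keeps local snapshots of the
// parameters (delta_scope_, old_scope_, pserver_scope_) and periodically
// exchanges parameter deltas with the pservers instead of raw gradients;
// touched sparse ids are collected per table in send_ids_to_queue_.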
class GeoCommunicator : public AsyncCommunicator {
 public:
  GeoCommunicator() : AsyncCommunicator() {}

  explicit GeoCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));

    send_queue_size_ = max_merge_var_num_;
    trainers_ = std::stoi(envs.at("trainers"));
    sparse_attrs_ = envs.at("sparse_attrs");
    VLOG(0) << "GeoCommunicator Initialized";
  }

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  void SendByCommunicator(int batches) override;

  void SendSparse(const std::string &varname, int batches);

  void SendDense(const std::string &varname);

  void SendGlobalStep(int batches) override {}

  void RecvByCommunicator() override;

  void RecvSparse(const std::string &varname);

  void RecvDense(const std::string &varname);

  void InitParams();

  void InitSparse();

  void InitDense(const std::string varname);

 private:
  int trainers_;
  std::string sparse_attrs_;

  // parameter for delta calc and send
  std::shared_ptr<Scope> delta_scope_;

  // parameter for storing the pserver param after last recv
  std::shared_ptr<Scope> old_scope_;

  // parameter on pserver
  std::shared_ptr<Scope> pserver_scope_;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::vector<int64_t>>>>
      send_ids_to_queue_;

  std::unordered_map<std::string, std::shared_ptr<SparseValue>> old_sparses_;
};

}  // namespace distributed
}  // namespace operators
}  // namespace paddle