communicator.h
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <ThreadPool.h>
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "gflags/gflags.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/communicator_common.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/large_scale_kv.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

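// Declared here, defined in the corresponding .cc. Given MergeVars' merge_add
// parameter below, this flag presumably selects whether merged gradients are
// summed (SGD-style) or averaged.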
DECLARE_bool(communicator_is_sgd_optimizer);

namespace paddle {
namespace operators {
namespace distributed {

using Scope = framework::Scope;
using Variable = framework::Variable;

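// A bounded, thread-safe FIFO queue. Push() blocks while the queue is full and
// Pop() blocks while it is empty; each operation wakes one waiter after it
// mutates the queue. It is used below to buffer per-variable gradients between
// the send op and the communication thread.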
template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(size_t capacity) : capacity_(capacity) {
    PADDLE_ENFORCE_GT(capacity_, 0,
                      platform::errors::InvalidArgument(
                          "The capacity must be greater than 0."));
  }

  bool Push(const T &elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(
          queue_.size(), capacity_,
          platform::errors::OutOfRange("The queue size: %s out of capacity:%s",
                                       queue_.size(), capacity_));
      queue_.push_back(elem);
    }
    cv_.notify_one();
    return true;
  }

  bool Push(T &&elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(
          queue_.size(), capacity_,
          platform::errors::OutOfRange("The queue size: %s out of capacity:%s",
                                       queue_.size(), capacity_));
      queue_.emplace_back(std::move(elem));
    }
    cv_.notify_one();
    return true;
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [=] { return !queue_.empty(); });
    T rc(std::move(queue_.front()));
    queue_.pop_front();
    cv_.notify_one();
    return rc;
  }

  size_t Cap() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return capacity_;
  }

  size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  const size_t capacity_;
  std::deque<T> queue_;

  mutable std::mutex mutex_;
  std::condition_variable cv_;
};

template <typename T, int MajorType = Eigen::RowMajor,
          typename IndexType = Eigen::DenseIndex>
using EigenVector = framework::EigenVector<T, MajorType, IndexType>;

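// Merges several copies of the same variable (LoDTensor or SelectedRows) into
// `var_name` inside `scope`, on CPU. With merge_add == true the copies are
// summed; otherwise the result is averaged over vars.size().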
template <typename T>
inline void MergeVars(const std::string &var_name,
                      const std::vector<std::shared_ptr<Variable>> &vars,
                      Scope *scope, bool merge_add = true) {
  PADDLE_ENFORCE_NE(vars.empty(), true, platform::errors::InvalidArgument(
                                            "vector vars are empty."));
  auto cpu_place = platform::CPUPlace();
  auto &var0 = vars[0];
  auto *out_var = scope->Var(var_name);
  if (var0->IsType<framework::LoDTensor>()) {
    auto dims = var0->Get<framework::LoDTensor>().dims();
    VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims
            << "; merge add: " << merge_add;
    // Initialize the output tensor.
    auto *out_t = out_var->GetMutable<framework::LoDTensor>();
    out_t->mutable_data<T>(dims, cpu_place);
    // Check that all inputs have the same dims.
    for (auto &var : vars) {
      auto &var_t = var->Get<framework::LoDTensor>();
      PADDLE_ENFORCE_EQ(
          var_t.dims(), dims,
          platform::errors::InvalidArgument("vars should have the same dims"));
    }

    // Set the output tensor to 0.
    auto cpu_ctx = paddle::platform::CPUDeviceContext();
    math::SetConstant<paddle::platform::CPUDeviceContext, T> constant_functor;
    constant_functor(cpu_ctx, out_t, static_cast<T>(0));
    // Sum all vars into out, then divide by vars.size() if averaging.
    auto result = EigenVector<T>::Flatten(*out_t);
    for (auto &var : vars) {
      auto &in_t = var->Get<framework::LoDTensor>();
      auto in = EigenVector<T>::Flatten(in_t);
      result.device(*cpu_ctx.eigen_device()) = result + in;
    }
    if (!merge_add) {
      result.device(*cpu_ctx.eigen_device()) =
          result / static_cast<T>(vars.size());
    }
  } else if (var0->IsType<framework::SelectedRows>()) {
    auto &slr0 = var0->Get<framework::SelectedRows>();
    auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
    out_slr->mutable_rows()->clear();
    out_slr->mutable_value()->mutable_data<T>({{}}, cpu_place);
    std::vector<const paddle::framework::SelectedRows *> inputs;
    inputs.reserve(vars.size());
    for (auto &var : vars) {
      inputs.push_back(&var->Get<framework::SelectedRows>());
    }
    auto dev_ctx = paddle::platform::CPUDeviceContext();
    if (merge_add) {
      math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, T> merge_add;
      merge_add(dev_ctx, inputs, out_slr);
    } else {
      math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, T>
          merge_average;
      merge_average(dev_ctx, inputs, out_slr);
    }

    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
            << " dims: " << slr0.value().dims() << "; merge add: " << merge_add;
  } else {
    PADDLE_THROW(platform::errors::InvalidArgument("unsupported var type: %s!",
                                                   var0->Type()));
  }
}

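// RpcCtxMap: variable name -> its CommContext (how/where the variable is
// communicated). SparseValue: sparse feature id -> its float values (e.g. an
// embedding row).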
using RpcCtxMap = std::unordered_map<std::string, CommContext>;
using SparseValue = std::unordered_map<int64_t, std::vector<float>>;

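// Base class of the trainer-side communicator. A process-wide singleton is
// created through InitInstance<T>() and retrieved with GetInstance(); concrete
// subclasses implement the Start/Stop/Send lifecycle. The string map `envs`
// carries the configuration consumed by InitEnvs().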
class Communicator {
 public:
  Communicator();

  explicit Communicator(const std::map<std::string, std::string> &envs_) {
    for (auto &iter : envs_) {
      envs[iter.first] = iter.second;
    }
  }

  virtual ~Communicator() {}

  virtual void Start() = 0;

  virtual void Stop() = 0;

  virtual bool IsRunning() { return running_; }

  virtual void Clean() {}

  virtual void Send(const std::vector<std::string> &var_names,
                    const std::vector<std::string> &var_tables,
                    const framework::Scope &scope) = 0;

  virtual void RecvNoBarrier() {}

  virtual void Barrier() {}

  virtual void BarrierTriggerDecrement() {}

  virtual void BarrierTriggerReset(int init_counter) {}

  virtual void InitEnvs() = 0;

  virtual void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                        const RpcCtxMap &recv_varname_to_ctx,
                        Scope *recv_scope) {}

  static Communicator *GetInstance() { return communicator_.get(); }

  static std::shared_ptr<Communicator> GetInstantcePtr() {
    return communicator_;
  }

  template <typename T>
  static Communicator *InitInstance(
      const RpcCtxMap &send_ctx, const RpcCtxMap &recv_ctx, Scope *recv_scope,
      const std::map<std::string, std::string> &envs) {
    std::call_once(init_flag_, &Communicator::InitWithRpcCtx<T>, send_ctx,
                   recv_ctx, recv_scope, std::ref(envs));
    return communicator_.get();
  }

  // Init is called by InitInstance.
  template <typename T>
  static void InitWithRpcCtx(const RpcCtxMap &send_ctx,
                             const RpcCtxMap &recv_ctx, Scope *recv_scope,
                             const std::map<std::string, std::string> &envs) {
    if (communicator_.get() == nullptr) {
      communicator_.reset(new T(std::ref(envs)));
      communicator_->InitEnvs();
      communicator_->InitImpl(send_ctx, recv_ctx, recv_scope);
    }
  }

 protected:
  bool running_ = false;
  bool waiting_ = true;
  static std::shared_ptr<Communicator> communicator_;
  static std::once_flag init_flag_;
  std::unordered_map<std::string, std::string> envs;
};

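// Fully asynchronous communication: Send() pushes gradients into per-variable
// blocking queues, and a background MainThread() merges up to
// max_merge_var_num_ copies (see MergeVars) before sending them to the
// parameter servers and receiving parameters back without a barrier.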
class AsyncCommunicator : public Communicator {
 public:
  AsyncCommunicator() : Communicator() {}

  explicit AsyncCommunicator(const std::map<std::string, std::string> &envs)
      : Communicator(envs) {}

  ~AsyncCommunicator();

  void InitEnvs() {
    min_send_grad_num_before_recv_ =
        std::stoi(envs.at("communicator_min_send_grad_num_before_recv"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "AsyncCommunicator Initialized";
  }

  void Start() override;

  void Stop() override;

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitParams();

  virtual void MainThread();

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  virtual void SendByCommunicator();
  virtual void SendGlobalStep(int batches);

  virtual void RecvByCommunicator();

  virtual void RecvNoBarrier();

  virtual void BarrierSend() {}

  virtual void BarrierRecv() {}

  virtual void BarrierWeakUp() {}

 protected:
  int min_send_grad_num_before_recv_;
  int thread_pool_size_;
  int max_merge_var_num_;
  int send_wait_times_;
  int send_queue_size_;
  int trainer_id_ = 0;
  bool need_global_step_ = false;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
      send_varname_to_queue_;
  RpcCtxMap send_varname_to_ctx_;
  RpcCtxMap recv_varname_to_ctx_;
  std::unique_ptr<std::thread> main_thread_{nullptr};
  Scope *recv_scope_;                  // should be the global scope
  std::unique_ptr<Scope> send_scope_;  // an independent scope
  std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
  std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
  std::atomic_uint grad_num_{0};  // number of gradients sent since the last recv
};

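// Half-async mode: gradients are still merged and sent by the background
// thread, but trainers additionally coordinate through a barrier (Barrier /
// BarrierTriggerDecrement / BarrierWeakUp), with progress counted by
// BatchesCounter().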
class HalfAsyncCommunicator : public AsyncCommunicator {
 public:
  HalfAsyncCommunicator() {}

  explicit HalfAsyncCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "HalfAsyncCommunicator Initialized";
  }

  void MainThread() override;

  void SendByCommunicator() override;

  void Clean() override;

  void Barrier() override;

  void BarrierTriggerDecrement() override;

  void BarrierTriggerReset(int initial_val) override;

  int BatchesCounter();

  void BarrierWeakUp();

 protected:
  // mutex for waiting on the barrier
  std::mutex barrier_mutex_;
  std::condition_variable barrier_cond_;
  std::atomic<int64_t> barrier_trigger_{0};
  std::atomic<int64_t> barrier_counter_{0};
};

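// Synchronous mode: like HalfAsyncCommunicator, but also issues explicit send
// and recv barriers (BarrierSend / BarrierRecv) against the parameter-server
// endpoints parsed from `pserver_endpoints`.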
class SyncCommunicator : public HalfAsyncCommunicator {
 public:
  SyncCommunicator() : HalfAsyncCommunicator() {}

  explicit SyncCommunicator(const std::map<std::string, std::string> &envs)
      : HalfAsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));

    trainer_id_ = std::stoi(envs.at("trainer_id"));
    auto pserver_strings = envs.at("pserver_endpoints");
    pserver_endpoints_ = paddle::string::Split(pserver_strings, ',');
    VLOG(0) << "SyncCommunicator Initialized";
  }

  void BarrierSend();

  void BarrierRecv();

 private:
  std::vector<std::string> pserver_endpoints_{};
};

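// GEO (delta-based) mode: the trainer keeps snapshots of the parameters
// (old_scope_ / pserver_scope_) and periodically sends only dense deltas and
// the touched sparse ids (SendDense / SendSparse), then pulls the servers'
// state back (RecvDense / RecvSparse). Configured by `trainers` and
// `sparse_attrs`.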
class GeoCommunicator : public AsyncCommunicator {
 public:
  GeoCommunicator() : AsyncCommunicator() {}

  explicit GeoCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void MainThread() override;

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));

    send_queue_size_ = max_merge_var_num_;
    trainers_ = std::stoi(envs.at("trainers"));
    sparse_attrs_ = envs.at("sparse_attrs");
    VLOG(0) << "GeoCommunicator Initialized";
  }

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  void SendByCommunicator() { return; }

  std::vector<int64_t> MergeSparseIds(const std::string &send_varname);

  void SendSparse(const std::string &varname, int ep_idx,
                  const std::vector<int64_t> &sparse_ids);

  void SendDense(const std::string &varname);

  void SendGlobalStep(int batches) override {}

  void RecvByCommunicator() override;

  void RecvSparse(const std::string &varname, int ep_idx);

  void RecvDense(const std::string &varname);

  void InitParams();

  void InitSparse();

  void InitDense(const std::string varname);

 private:
  int trainers_;
  std::string sparse_attrs_;

  // parameters used for delta calculation and send
  std::shared_ptr<Scope> delta_scope_;

  // copy of the parameters as of the last recv from the pservers
  std::shared_ptr<Scope> old_scope_;

  // parameters as stored on the pservers
  std::shared_ptr<Scope> pserver_scope_;

  int send_var_nums_ = 0;

  std::unordered_map<std::string, std::shared_ptr<SparseValue>> old_sparses_;

  std::unordered_map<
      std::string,
      std::shared_ptr<BlockingQueue<std::shared_ptr<std::vector<int64_t>>>>>
      sparse_id_queues_;
};

}  // namespace distributed
}  // namespace operators
}  // namespace paddle
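
// Illustrative usage (a sketch, not part of this header): how a trainer might
// bring up a communicator through the singleton interface declared above.
// `send_ctx`, `recv_ctx` and `recv_scope` are assumed to be prepared by the
// caller, and the env values below are placeholders, not recommended defaults;
// the required keys are the ones read in AsyncCommunicator::InitEnvs().
//
//   std::map<std::string, std::string> envs = {
//       {"communicator_min_send_grad_num_before_recv", "20"},
//       {"communicator_thread_pool_size", "5"},
//       {"communicator_max_merge_var_num", "20"},
//       {"communicator_send_wait_times", "5"},
//       {"communicator_send_queue_size", "20"},
//       {"need_global_step", "0"}};
//
//   auto *comm = paddle::operators::distributed::Communicator::InitInstance<
//       paddle::operators::distributed::AsyncCommunicator>(
//       send_ctx, recv_ctx, recv_scope, envs);
//   comm->Start();
//   // ... training runs; the send op calls comm->Send(var_names, var_tables,
//   // scope) for each gradient produced ...
//   comm->Stop();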