/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <ThreadPool.h>
#include <stdint.h>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "gflags/gflags.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/communicator_common.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/large_scale_kv.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

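// Presumably selects whether merged gradients are summed (SGD-style) or
// averaged; see MergeVars below.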
DECLARE_bool(communicator_is_sgd_optimizer);

namespace paddle {
namespace operators {
namespace distributed {

using Scope = framework::Scope;
using Variable = framework::Variable;

template <typename T>
class BlockingQueue {
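  // A bounded, thread-safe FIFO used to hand variables (e.g. merged
  // gradients) from producer threads to the communicator's send thread.
  // Push blocks while the queue is full; Pop blocks while it is empty.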
 public:
  explicit BlockingQueue(size_t capacity) : capacity_(capacity) {
    PADDLE_ENFORCE_GT(capacity_, 0,
                      platform::errors::InvalidArgument(
                          "The capacity must be greater than 0."));
  }

  bool Push(const T &elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(
          queue_.size(), capacity_,
          platform::errors::OutOfRange("The queue size: %s out of capacity:%s",
                                       queue_.size(), capacity_));
      queue_.push_back(elem);
    }
    cv_.notify_one();
    return true;
  }

  bool Push(T &&elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(
          queue_.size(), capacity_,
          platform::errors::OutOfRange("The queue size: %s out of capacity:%s",
                                       queue_.size(), capacity_));
      queue_.emplace_back(std::move(elem));
    }
    cv_.notify_one();
    return true;
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [=] { return !queue_.empty(); });
    T rc(std::move(queue_.front()));
    queue_.pop_front();
    cv_.notify_one();
    return rc;
  }

  size_t Cap() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return capacity_;
  }

  size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  const size_t capacity_;
  std::deque<T> queue_;

  mutable std::mutex mutex_;
  std::condition_variable cv_;
};

template <typename T, int MajorType = Eigen::RowMajor,
          typename IndexType = Eigen::DenseIndex>
using EigenVector = framework::EigenVector<T, MajorType, IndexType>;

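// Merge the variables in `vars` into a single variable named `var_name` in
// `scope`. LoDTensor inputs are summed element-wise (averaged instead when
// merge_add is false); SelectedRows inputs are combined via MergeAdd /
// MergeAverage.
//
// A minimal usage sketch (names are illustrative; it assumes merge_add is
// driven by FLAGS_communicator_is_sgd_optimizer, as declared above):
//
//   MergeVars<float>("w@GRAD", grad_vars, send_scope_.get(),
//                    FLAGS_communicator_is_sgd_optimizer);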
template <typename T>
inline void MergeVars(const std::string &var_name,
                      const std::vector<std::shared_ptr<Variable>> &vars,
                      Scope *scope, bool merge_add = true) {
  PADDLE_ENFORCE_NE(vars.empty(), true, platform::errors::InvalidArgument(
                                            "vector vars are empty."));
  auto cpu_place = platform::CPUPlace();
  auto &var0 = vars[0];
  auto *out_var = scope->Var(var_name);
  if (var0->IsType<framework::LoDTensor>()) {
    auto dims = var0->Get<framework::LoDTensor>().dims();
    VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims
            << "; merge add: " << merge_add;
    // init output tensor
    auto *out_t = out_var->GetMutable<framework::LoDTensor>();
    out_t->mutable_data<T>(dims, cpu_place);
    // check the input dims
    for (auto &var : vars) {
      auto &var_t = var->Get<framework::LoDTensor>();
      PADDLE_ENFORCE_EQ(
          var_t.dims(), dims,
          platform::errors::InvalidArgument("vars should have the same dims"));
    }

    // set output tensor to 0.
    auto cpu_ctx = paddle::platform::CPUDeviceContext();
    math::SetConstant<paddle::platform::CPUDeviceContext, T> constant_functor;
    constant_functor(cpu_ctx, out_t, static_cast<T>(0));
    // sum all vars to out
    auto result = EigenVector<T>::Flatten(*out_t);
    for (auto &var : vars) {
      auto &in_t = var->Get<framework::LoDTensor>();
      auto in = EigenVector<T>::Flatten(in_t);
      result.device(*cpu_ctx.eigen_device()) = result + in;
    }
    if (!merge_add) {
      result.device(*cpu_ctx.eigen_device()) =
          result / static_cast<T>(vars.size());
    }
  } else if (var0->IsType<framework::SelectedRows>()) {
    auto &slr0 = var0->Get<framework::SelectedRows>();
    auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
    out_slr->mutable_rows()->clear();
    out_slr->mutable_value()->mutable_data<T>({{}}, cpu_place);
    std::vector<const paddle::framework::SelectedRows *> inputs;
    inputs.reserve(vars.size());
    for (auto &var : vars) {
      inputs.push_back(&var->Get<framework::SelectedRows>());
    }
    auto dev_ctx = paddle::platform::CPUDeviceContext();
    if (merge_add) {
      math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, T> merge_add;
      merge_add(dev_ctx, inputs, out_slr);
    } else {
      math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, T>
          merge_average;
      merge_average(dev_ctx, inputs, out_slr);
    }

    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
            << " dims: " << slr0.value().dims() << "; merge add: " << merge_add;
  } else {
    PADDLE_THROW(platform::errors::InvalidArgument("unsupported var type: %s!",
                                                   var0->Type()));
  }
}

using RpcCtxMap = std::unordered_map<std::string, CommContext>;
using SparseValue = std::unordered_map<int64_t, std::vector<float>>;

class Communicator {
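  // Base class of the parameter-server communicators. It is managed as a
  // process-wide singleton: InitInstance<T>() constructs the concrete
  // communicator exactly once, after which GetInstance() returns it.
  //
  // A minimal usage sketch (illustrative; assumes prepared RpcCtxMaps and the
  // trainer's global scope):
  //
  //   Communicator::InitInstance<AsyncCommunicator>(send_ctx, recv_ctx,
  //                                                 recv_scope, envs);
  //   auto *comm = Communicator::GetInstance();
  //   comm->Start();
  //   comm->Send(var_names, var_tables, scope);
  //   comm->Stop();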
 public:
  Communicator();

  explicit Communicator(const std::map<std::string, std::string> &envs_) {
    for (auto &iter : envs_) {
      envs[iter.first] = iter.second;
    }
  }

  virtual ~Communicator() {}

  virtual void Start() = 0;

  virtual void Stop() = 0;

  virtual bool IsRunning() { return running_; }

  virtual void Clean() {}

  virtual void Send(const std::vector<std::string> &var_names,
                    const std::vector<std::string> &var_tables,
                    const framework::Scope &scope) = 0;

  virtual void RecvNoBarrier() {}

  virtual void Barrier() {}

  virtual void BarrierTriggerDecrement() {}

  virtual void BarrierTriggerReset(int init_counter) {}

  virtual void InitEnvs() = 0;

  virtual void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                        const RpcCtxMap &recv_varname_to_ctx,
                        Scope *recv_scope) {}

  static Communicator *GetInstance() { return communicator_.get(); }

  static std::shared_ptr<Communicator> GetInstantcePtr() {
    return communicator_;
  }

  template <typename T>
  static Communicator *InitInstance(
      const RpcCtxMap &send_ctx, const RpcCtxMap &recv_ctx, Scope *recv_scope,
      const std::map<std::string, std::string> &envs) {
    std::call_once(init_flag_, &Communicator::InitWithRpcCtx<T>, send_ctx,
                   recv_ctx, recv_scope, std::ref(envs));
    return communicator_.get();
  }

  // Init is called by InitInstance.
  template <typename T>
  static void InitWithRpcCtx(const RpcCtxMap &send_ctx,
                             const RpcCtxMap &recv_ctx, Scope *recv_scope,
                             const std::map<std::string, std::string> &envs) {
    if (communicator_.get() == nullptr) {
      communicator_.reset(new T(std::ref(envs)));
      communicator_->InitEnvs();
      communicator_->InitImpl(send_ctx, recv_ctx, recv_scope);
    }
  }

 protected:
  bool running_ = false;
  bool waiting_ = true;
  static std::shared_ptr<Communicator> communicator_;
  static std::once_flag init_flag_;
  std::unordered_map<std::string, std::string> envs;
};

class AsyncCommunicator : public Communicator {
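  // Fully asynchronous mode: variables passed to Send() are buffered in
  // per-variable queues, and a background MainThread merges and exchanges
  // them with the parameter servers without synchronizing with other
  // trainers.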
 public:
  AsyncCommunicator() : Communicator() {}

  explicit AsyncCommunicator(const std::map<std::string, std::string> &envs)
      : Communicator(envs) {}

  ~AsyncCommunicator();

  void InitEnvs() {
    min_send_grad_num_before_recv_ =
        std::stoi(envs.at("communicator_min_send_grad_num_before_recv"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "AsyncCommunicator Initialized";
  }

  void Start() override;

  void Stop() override;

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitParams();

  virtual void MainThread();

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  virtual void SendByCommunicator(int batches);

  virtual void SendGlobalStep(int batches);

  virtual void RecvByCommunicator();

  virtual void RecvNoBarrier();

  virtual int BatchesCounter();

  virtual void BarrierSend() {}

  virtual void BarrierRecv() {}

  virtual void BarrierWeakUp() {}

 protected:
  int min_send_grad_num_before_recv_;
  int thread_pool_size_;
  int max_merge_var_num_;
  int send_wait_times_;
  int send_queue_size_;
  int trainer_id_ = 0;
  bool need_global_step_ = false;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
      send_varname_to_queue_;
  RpcCtxMap send_varname_to_ctx_;
  RpcCtxMap recv_varname_to_ctx_;
  std::unique_ptr<std::thread> main_thread_{nullptr};
  Scope *recv_scope_;                  // should be global scope
  std::unique_ptr<Scope> send_scope_;  // an independent scope
  std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
  std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
  std::atomic_uint grad_num_{0};  // number of gradients sent since last recv
};

class HalfAsyncCommunicator : public AsyncCommunicator {
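  // Half-async mode: sending is still asynchronous, but a lightweight barrier
  // (Barrier / BarrierTriggerDecrement / BarrierTriggerReset) lets the
  // trainers align their send/recv steps.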
 public:
  HalfAsyncCommunicator() {}

  explicit HalfAsyncCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "HalfAsyncCommunicator Initialized";
  }

  void Clean() override;

  void Barrier() override;

  void BarrierTriggerDecrement() override;

  void BarrierTriggerReset(int initial_val) override;

  int BatchesCounter();

  void BarrierWeakUp();

 protected:
  // mutex and condition variable used to wait on the barrier
  std::mutex barrier_mutex_;
  std::condition_variable barrier_cond_;
  std::atomic<int64_t> barrier_trigger_{0};
  std::atomic<int64_t> barrier_counter_{0};
};

class SyncCommunicator : public HalfAsyncCommunicator {
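  // Synchronous mode: in addition to the half-async barrier, BarrierSend /
  // BarrierRecv presumably issue barrier RPCs to pserver_endpoints_ so that
  // all trainers advance in lockstep.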
 public:
  SyncCommunicator() : HalfAsyncCommunicator() {}

  explicit SyncCommunicator(const std::map<std::string, std::string> &envs)
      : HalfAsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));

    trainer_id_ = std::stoi(envs.at("trainer_id"));
    auto pserver_strings = envs.at("pserver_endpoints");
    pserver_endpoints_ = paddle::string::Split(pserver_strings, ',');
    VLOG(0) << "SyncCommunicator Initialized";
  }

  void BarrierSend();

  void BarrierRecv();

 private:
  std::vector<std::string> pserver_endpoints_{};
};

class GeoCommunicator : public AsyncCommunicator {
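  // GEO-SGD mode: instead of raw gradients, trainers periodically send the
  // delta between their local parameters and the values last pulled from the
  // pservers (SendDense / SendSparse), then pull fresh parameters back
  // (RecvDense / RecvSparse). delta_scope_, old_scope_ and pserver_scope_
  // hold the intermediate copies.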
 public:
  GeoCommunicator() : AsyncCommunicator() {}

  explicit GeoCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;
  void MainThread() override;
  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));

    send_queue_size_ = max_merge_var_num_;
    trainers_ = std::stoi(envs.at("trainers"));
    sparse_attrs_ = envs.at("sparse_attrs");
    VLOG(0) << "GeoCommunicator Initialized";
  }

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  void SendByCommunicator(int batches) { return; }

  std::vector<int64_t> MergeSparseIds(const std::string &send_varname);

  void SendSparse(const std::string &varname, int ep_idx,
                  const std::vector<int64_t> &sparse_ids);

  void SendDense(const std::string &varname);

  void SendGlobalStep(int batches) override {}

  void RecvByCommunicator() override;

  void RecvSparse(const std::string &varname, int ep_idx);

  void RecvDense(const std::string &varname);

  void InitParams();

  void InitSparse();

  void InitDense(const std::string varname);

 private:
  int trainers_;
  std::string sparse_attrs_;

  // scope holding the parameter deltas that are computed and sent
  std::shared_ptr<Scope> delta_scope_;

  // scope storing the pserver parameters as of the last recv
  std::shared_ptr<Scope> old_scope_;

  // parameter on pserver
  std::shared_ptr<Scope> pserver_scope_;

  int send_var_nums_ = 0;

  std::unordered_map<std::string, std::shared_ptr<SparseValue>> old_sparses_;

  std::unordered_map<
      std::string,
      std::shared_ptr<BlockingQueue<std::shared_ptr<std::vector<int64_t>>>>>
      sparse_id_queues_;
};

}  // namespace distributed
}  // namespace operators
}  // namespace paddle