/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <ThreadPool.h>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "gflags/gflags.h"

#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/communicator_common.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/large_scale_kv.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

DECLARE_bool(communicator_is_sgd_optimizer);

namespace paddle {
namespace operators {
namespace distributed {

using Scope = framework::Scope;
using Variable = framework::Variable;

template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(size_t capacity) : capacity_(capacity) {
    PADDLE_ENFORCE_GT(capacity_, 0, "The capacity must be greater than 0.");
  }

  bool Push(const T &elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
      queue_.push_back(elem);
    }
    cv_.notify_one();
    return true;
  }

  bool Push(T &&elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
      queue_.emplace_back(std::move(elem));
    }
    cv_.notify_one();
    return true;
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [=] { return !queue_.empty(); });
    T rc(std::move(queue_.front()));
    queue_.pop_front();
    cv_.notify_one();
    return rc;
  }

  size_t Cap() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return capacity_;
  }

  size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  const size_t capacity_;
  std::deque<T> queue_;

  mutable std::mutex mutex_;
  std::condition_variable cv_;
};
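
// Illustrative sketch (not part of the original header): one way a producer
// thread and a consumer thread could share this queue; the variable names
// below are hypothetical.
//
//   BlockingQueue<std::shared_ptr<Variable>> grad_queue(8);
//   std::thread producer([&] { grad_queue.Push(std::make_shared<Variable>()); });
//   auto grad = grad_queue.Pop();  // blocks until the producer has pushed
//   producer.join();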

template <typename T, int MajorType = Eigen::RowMajor,
          typename IndexType = Eigen::DenseIndex>
using EigenVector = framework::EigenVector<T, MajorType, IndexType>;

template <typename T>
inline void MergeVars(const std::string &var_name,
                      const std::vector<std::shared_ptr<Variable>> &vars,
                      Scope *scope, bool merge_add = true) {
  PADDLE_ENFORCE(!vars.empty(), "should have value to merge!");
  auto cpu_place = platform::CPUPlace();
  auto &var0 = vars[0];
  auto *out_var = scope->Var(var_name);
  if (var0->IsType<framework::LoDTensor>()) {
    auto dims = var0->Get<framework::LoDTensor>().dims();
    VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims
            << "; merge add: " << merge_add;
    // init output tensor
    auto *out_t = out_var->GetMutable<framework::LoDTensor>();
    out_t->mutable_data<T>(dims, cpu_place);
    // check the input dims
    for (auto &var : vars) {
      auto &var_t = var->Get<framework::LoDTensor>();
      PADDLE_ENFORCE_EQ(var_t.dims(), dims, "should have the same dims");
    }

    // set output tensor to 0.
    auto cpu_ctx = paddle::platform::CPUDeviceContext();
    math::SetConstant<paddle::platform::CPUDeviceContext, T> constant_functor;
    constant_functor(cpu_ctx, out_t, static_cast<T>(0));
    // sum all vars to out
    auto result = EigenVector<T>::Flatten(*out_t);
    for (auto &var : vars) {
      auto &in_t = var->Get<framework::LoDTensor>();
      auto in = EigenVector<T>::Flatten(in_t);
      result.device(*cpu_ctx.eigen_device()) = result + in;
    }
    if (!merge_add) {
      result.device(*cpu_ctx.eigen_device()) =
          result / static_cast<T>(vars.size());
    }
  } else if (var0->IsType<framework::SelectedRows>()) {
    auto &slr0 = var0->Get<framework::SelectedRows>();
    auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
    out_slr->mutable_rows()->clear();
    out_slr->mutable_value()->mutable_data<T>({{}}, cpu_place);
    std::vector<const paddle::framework::SelectedRows *> inputs;
    inputs.reserve(vars.size());
    for (auto &var : vars) {
      inputs.push_back(&var->Get<framework::SelectedRows>());
    }
    auto dev_ctx = paddle::platform::CPUDeviceContext();
    if (merge_add) {
      math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, T> merge_add;
      merge_add(dev_ctx, inputs, out_slr);
    } else {
      math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, T>
          merge_average;
      merge_average(dev_ctx, inputs, out_slr);
    }

    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
            << " dims: " << slr0.value().dims() << "; merge add: " << merge_add;
  } else {
    PADDLE_THROW("unsupported var type!");
  }
}
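
// Illustrative sketch (an assumption, not code from this header): merging two
// dense gradient copies of the same variable into a temporary scope; the name
// "w@GRAD" and the grad0/grad1 pointers are hypothetical.
//
//   std::vector<std::shared_ptr<Variable>> grads = {grad0, grad1};
//   Scope merge_scope;
//   MergeVars<float>("w@GRAD", grads, &merge_scope, /*merge_add=*/false);
//   // merge_scope now holds "w@GRAD" as the element-wise mean of grad0/grad1.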

using RpcCtxMap = std::unordered_map<std::string, CommContext>;
using SparseValue = std::unordered_map<int64_t, std::vector<float>>;

class Communicator {
 public:
  Communicator();

  explicit Communicator(const std::map<std::string, std::string> &envs_) {
    for (auto &iter : envs_) {
      envs[iter.first] = iter.second;
    }
  }

  virtual ~Communicator() {}

  virtual void Start() = 0;

  virtual void Stop() = 0;

  virtual bool IsRunning() { return running_; }

  virtual void Clean() {}

  virtual void Send(const std::vector<std::string> &var_names,
                    const std::vector<std::string> &var_tables,
                    const framework::Scope &scope) = 0;

  virtual void RecvNoBarrier() {}

  virtual void Barrier() {}

  virtual void BarrierTriggerDecrement() {}

  virtual void BarrierTriggerReset(int init_counter) {}

  virtual void InitEnvs() = 0;

  virtual void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                        const RpcCtxMap &recv_varname_to_ctx,
                        Scope *recv_scope) {}

  static Communicator *GetInstance() { return communicator_.get(); }

  static std::shared_ptr<Communicator> GetInstantcePtr() {
    return communicator_;
  }

  template <typename T>
  static Communicator *InitInstance(
      const RpcCtxMap &send_ctx, const RpcCtxMap &recv_ctx, Scope *recv_scope,
      const std::map<std::string, std::string> &envs) {
    std::call_once(init_flag_, &Communicator::InitWithRpcCtx<T>, send_ctx,
                   recv_ctx, recv_scope, std::ref(envs));
    return communicator_.get();
  }

  // Init is called by InitInstance.
  template <typename T>
  static void InitWithRpcCtx(const RpcCtxMap &send_ctx,
                             const RpcCtxMap &recv_ctx, Scope *recv_scope,
                             const std::map<std::string, std::string> &envs) {
    if (communicator_.get() == nullptr) {
      communicator_.reset(new T(std::ref(envs)));
      communicator_->InitEnvs();
      communicator_->InitImpl(send_ctx, recv_ctx, recv_scope);
    }
  }

 protected:
  bool running_ = false;
  bool waiting_ = true;
  static std::shared_ptr<Communicator> communicator_;
  static std::once_flag init_flag_;
  std::unordered_map<std::string, std::string> envs;
};
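
// Illustrative sketch (an assumption, not part of the original header): the
// intended call sequence for the singleton; send_ctx, recv_ctx, recv_scope and
// envs are assumed to be prepared by the caller.
//
//   Communicator::InitInstance<AsyncCommunicator>(send_ctx, recv_ctx,
//                                                 recv_scope, envs);
//   Communicator::GetInstance()->Start();
//   // ... training loop calls Send(...) on the instance ...
//   Communicator::GetInstance()->Stop();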

class AsyncCommunicator : public Communicator {
 public:
  AsyncCommunicator() : Communicator() {}

  explicit AsyncCommunicator(const std::map<std::string, std::string> &envs)
      : Communicator(envs) {}

  ~AsyncCommunicator();

  void InitEnvs() {
    min_send_grad_num_before_recv_ =
        std::stoi(envs.at("communicator_min_send_grad_num_before_recv"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "AsyncCommunicator Initialized";
  }

  void Start() override;

  void Stop() override;

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitParams();

  void MainThread();

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  virtual void SendByCommunicator(int batches);

  virtual void SendGlobalStep(int batches);

  virtual void RecvByCommunicator();

  virtual void RecvNoBarrier();

  virtual int BatchesCounter();

  virtual void BarrierSend() {}

  virtual void BarrierRecv() {}

  virtual void BarrierWeakUp() {}

 protected:
  int min_send_grad_num_before_recv_;
  int thread_pool_size_;
  int max_merge_var_num_;
  int send_wait_times_;
  int send_queue_size_;
  int trainer_id_ = 0;
  bool need_global_step_ = false;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
      send_varname_to_queue_;
  RpcCtxMap send_varname_to_ctx_;
  RpcCtxMap recv_varname_to_ctx_;
  std::unique_ptr<std::thread> main_thread_{nullptr};
  Scope *recv_scope_;                  // should be global scope
  std::unique_ptr<Scope> send_scope_;  // an independent scope
  std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
  std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
  std::atomic_uint grad_num_{0};  // the num of gradient sent since last recv
};
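
// Illustrative sketch (an assumption): an env map carrying the keys that
// AsyncCommunicator::InitEnvs() reads above; the values are example settings
// only.
//
//   std::map<std::string, std::string> envs = {
//       {"communicator_min_send_grad_num_before_recv", "20"},
//       {"communicator_thread_pool_size", "10"},
//       {"communicator_max_merge_var_num", "20"},
//       {"communicator_send_wait_times", "5"},
//       {"communicator_send_queue_size", "20"},
//       {"need_global_step", "0"}};
//   Communicator::InitInstance<AsyncCommunicator>(send_ctx, recv_ctx,
//                                                 recv_scope, envs);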

class HalfAsyncCommunicator : public AsyncCommunicator {
 public:
  HalfAsyncCommunicator() {}

  explicit HalfAsyncCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "HalfAsyncCommunicator Initialized";
  }

  void Clean() override;

  void Barrier() override;

  void BarrierTriggerDecrement() override;

  void BarrierTriggerReset(int initial_val) override;

  int BatchesCounter();

  void BarrierWeakUp();

 protected:
  // mutex and condition variable used to wait for the barrier
  std::mutex barrier_mutex_;
  std::condition_variable barrier_cond_;
  std::atomic<int64_t> barrier_trigger_{0};
  std::atomic<int64_t> barrier_counter_{0};
};

class SyncCommunicator : public HalfAsyncCommunicator {
 public:
  SyncCommunicator() : HalfAsyncCommunicator() {}

  explicit SyncCommunicator(const std::map<std::string, std::string> &envs)
      : HalfAsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));

    trainer_id_ = std::stoi(envs.at("trainer_id"));
    auto pserver_strings = envs.at("pserver_endpoints");
    pserver_endpoints_ = paddle::string::Split(pserver_strings, ',');
    VLOG(0) << "SyncCommunicator Initialized";
  }

  void BarrierSend();

  void BarrierRecv();

 private:
  std::vector<std::string> pserver_endpoints_{};
};

class GeoCommunicator : public AsyncCommunicator {
 public:
  GeoCommunicator() : AsyncCommunicator() {}

  explicit GeoCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));

    send_queue_size_ = max_merge_var_num_;
    trainers_ = std::stoi(envs.at("trainers"));
    sparse_attrs_ = envs.at("sparse_attrs");
    VLOG(0) << "GeoCommunicator Initialized";
  }

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  void SendByCommunicator(int batches) override;

  void SendSparse(const std::string &varname, int batches);

  void SendDense(const std::string &varname);

  void SendGlobalStep(int batches) override {}

  void RecvByCommunicator() override;

  void RecvSparse(const std::string &varname);

  void RecvDense(const std::string &varname);

  void InitParams();

  void InitSparse();

  void InitDense(const std::string varname);

 private:
  int trainers_;
  std::string sparse_attrs_;

  // parameters used for delta calculation and send
  std::shared_ptr<Scope> delta_scope_;

  // parameters holding the pserver values from the last recv
  std::shared_ptr<Scope> old_scope_;

  // parameters on the pserver
  std::shared_ptr<Scope> pserver_scope_;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::vector<int64_t>>>>
      send_ids_to_queue_;

  std::unordered_map<std::string, std::shared_ptr<SparseValue>> old_sparses_;
};

}  // namespace distributed
}  // namespace operators
}  // namespace paddle