/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <ThreadPool.h>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "gflags/gflags.h"

#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/distributed/communicator_common.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/large_scale_kv.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed_ops/send_recv_util.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

// When true, gradients queued for the same variable are merged by summation
// (SGD-style); otherwise they are averaged (see MergeVars below).
DECLARE_bool(communicator_is_sgd_optimizer);

namespace paddle {
namespace operators {
namespace distributed {

using Scope = framework::Scope;
using Variable = framework::Variable;

template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(size_t capacity) : capacity_(capacity) {
    PADDLE_ENFORCE_GT(capacity_, 0, "The capacity must be greater than 0.");
  }

  bool Push(const T &elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
      queue_.push_back(elem);
    }
    cv_.notify_one();
    return true;
  }

  bool Push(T &&elem) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [&] { return queue_.size() < capacity_; });
      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
      queue_.emplace_back(std::move(elem));
    }
    cv_.notify_one();
    return true;
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [=] { return !queue_.empty(); });
    T rc(std::move(queue_.front()));
    queue_.pop_front();
    cv_.notify_one();
    return rc;
  }

  size_t Cap() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return capacity_;
  }

  size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  const size_t capacity_;
  std::deque<T> queue_;

  mutable std::mutex mutex_;
  std::condition_variable cv_;
};
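
// Illustrative usage sketch (not part of this header's API; names are made
// up): one thread pushes merged variables while another pops them. Push()
// blocks while the queue is full and Pop() blocks while it is empty.
//
//   BlockingQueue<std::shared_ptr<Variable>> queue(4);
//   std::thread producer([&] { queue.Push(std::make_shared<Variable>()); });
//   std::thread consumer([&] { auto var = queue.Pop(); });
//   producer.join();
//   consumer.join();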

template <typename T, int MajorType = Eigen::RowMajor,
          typename IndexType = Eigen::DenseIndex>
using EigenVector = framework::EigenVector<T, MajorType, IndexType>;

template <typename T>
inline void MergeVars(const std::string &var_name,
                      const std::vector<std::shared_ptr<Variable>> &vars,
                      Scope *scope, bool merge_add = true) {
  PADDLE_ENFORCE(!vars.empty(), "should have value to merge!");
  auto cpu_place = platform::CPUPlace();
  auto &var0 = vars[0];
  auto *out_var = scope->Var(var_name);
  if (var0->IsType<framework::LoDTensor>()) {
    auto dims = var0->Get<framework::LoDTensor>().dims();
    VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims
            << "; merge add: " << merge_add;
    // init output tensor
    auto *out_t = out_var->GetMutable<framework::LoDTensor>();
    out_t->mutable_data<T>(dims, cpu_place);
    // check the input dims
    for (auto &var : vars) {
      auto &var_t = var->Get<framework::LoDTensor>();
      PADDLE_ENFORCE_EQ(var_t.dims(), dims, "should have the same dims");
    }

    // set output tensor to 0.
    auto cpu_ctx = paddle::platform::CPUDeviceContext();
    math::SetConstant<paddle::platform::CPUDeviceContext, T> constant_functor;
    constant_functor(cpu_ctx, out_t, static_cast<T>(0));
    // sum all vars to out
    auto result = EigenVector<T>::Flatten(*out_t);
    for (auto &var : vars) {
      auto &in_t = var->Get<framework::LoDTensor>();
      auto in = EigenVector<T>::Flatten(in_t);
      result.device(*cpu_ctx.eigen_device()) = result + in;
    }
    if (!merge_add) {
      result.device(*cpu_ctx.eigen_device()) =
          result / static_cast<T>(vars.size());
    }
  } else if (var0->IsType<framework::SelectedRows>()) {
    auto &slr0 = var0->Get<framework::SelectedRows>();
    auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
    out_slr->mutable_rows()->clear();
    out_slr->mutable_value()->mutable_data<T>({{}}, cpu_place);
    std::vector<const paddle::framework::SelectedRows *> inputs;
    inputs.reserve(vars.size());
    for (auto &var : vars) {
      inputs.push_back(&var->Get<framework::SelectedRows>());
    }
    auto dev_ctx = paddle::platform::CPUDeviceContext();
    if (merge_add) {
      math::scatter::MergeAdd<paddle::platform::CPUDeviceContext, T> merge_add;
      merge_add(dev_ctx, inputs, out_slr);
    } else {
      math::scatter::MergeAverage<paddle::platform::CPUDeviceContext, T>
          merge_average;
      merge_average(dev_ctx, inputs, out_slr);
    }

    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
            << " dims: " << slr0.value().dims() << "; merge add: " << merge_add;
  } else {
    PADDLE_THROW("unsupported var type!");
  }
}
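
// Illustrative usage sketch (the variable name and scope are made up): queued
// copies of a gradient are summed (merge_add = true) or averaged
// (merge_add = false) into `send_scope` under the same name before sending.
//
//   std::vector<std::shared_ptr<Variable>> grads;  // copies taken from a queue
//   Scope send_scope;
//   MergeVars<float>("w@GRAD", grads, &send_scope, /*merge_add=*/true);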

using RpcCtxMap = std::unordered_map<std::string, CommContext>;
using SparseValue = std::unordered_map<int64_t, std::vector<float>>;

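// Communicator is the process-wide singleton that shuttles parameters and
// gradients between a trainer and the parameter servers; the concrete modes
// (async, half-async, sync, GEO) are implemented by the subclasses below.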
class Communicator {
 public:
  Communicator();

  explicit Communicator(const std::map<std::string, std::string> &envs_) {
    for (auto &iter : envs_) {
      envs[iter.first] = iter.second;
    }
  }

  virtual ~Communicator() {}

  virtual void Start() = 0;

  virtual void Stop() = 0;

  virtual bool IsRunning() { return running_; }

  virtual void Clean() {}

  virtual void Send(const std::vector<std::string> &var_names,
                    const std::vector<std::string> &var_tables,
                    const framework::Scope &scope) = 0;

  virtual void RecvNoBarrier() {}

  virtual void Barrier() {}

  virtual void BarrierTriggerDecrement() {}

  virtual void BarrierTriggerReset(int init_counter) {}

  virtual void InitEnvs() = 0;

  virtual void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                        const RpcCtxMap &recv_varname_to_ctx,
                        Scope *recv_scope) {}

  static Communicator *GetInstance() { return communicator_.get(); }

  static std::shared_ptr<Communicator> GetInstantcePtr() {
    return communicator_;
  }

  template <typename T>
  static Communicator *InitInstance(
      const RpcCtxMap &send_ctx, const RpcCtxMap &recv_ctx, Scope *recv_scope,
      const std::map<std::string, std::string> &envs) {
    std::call_once(init_flag_, &Communicator::InitWithRpcCtx<T>, send_ctx,
                   recv_ctx, recv_scope, std::ref(envs));
    return communicator_.get();
  }

  // Init is called by InitInstance.
  template <typename T>
  static void InitWithRpcCtx(const RpcCtxMap &send_ctx,
                             const RpcCtxMap &recv_ctx, Scope *recv_scope,
                             const std::map<std::string, std::string> &envs) {
    if (communicator_.get() == nullptr) {
      communicator_.reset(new T(std::ref(envs)));
      communicator_->InitEnvs();
      communicator_->InitImpl(send_ctx, recv_ctx, recv_scope);
    }
  }

 protected:
  bool running_ = false;
  bool waiting_ = true;
  static std::shared_ptr<Communicator> communicator_;
  static std::once_flag init_flag_;
  std::unordered_map<std::string, std::string> envs;
};
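
// Illustrative usage sketch (context maps, scope, and env settings come from
// the caller): the singleton is created once via InitInstance and fetched
// afterwards with GetInstance.
//
//   Communicator::InitInstance<AsyncCommunicator>(send_ctx, recv_ctx,
//                                                 recv_scope, envs);
//   auto *comm = Communicator::GetInstance();
//   comm->Start();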

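// Fully asynchronous mode: a background MainThread drains the per-variable
// send queues, merges up to max_merge_var_num_ gradients with MergeVars, and
// exchanges them with the pservers without any cross-trainer barrier.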
class AsyncCommunicator : public Communicator {
 public:
  AsyncCommunicator() : Communicator() {}

  explicit AsyncCommunicator(const std::map<std::string, std::string> &envs)
      : Communicator(envs) {}

  ~AsyncCommunicator();

  void InitEnvs() {
    min_send_grad_num_before_recv_ =
        std::stoi(envs.at("communicator_min_send_grad_num_before_recv"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "AsyncCommunicator Initialized";
  }

  void Start() override;

  void Stop() override;

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void MainThread();

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  virtual void SendByCommunicator(int batches);

  virtual void SendGlobalStep(int batches);

  virtual void RecvByCommunicator();

  virtual void RecvNoBarrier();

  virtual int Meet();

  virtual void BarrierSend() {}

  virtual void BarrierRecv() {}

  virtual void BarrierWeakUp() {}

 protected:
  int min_send_grad_num_before_recv_;
  int thread_pool_size_;
  int max_merge_var_num_;
  int send_wait_times_;
  int send_queue_size_;
  int trainer_id_ = 0;
  bool need_global_step_ = false;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
      send_varname_to_queue_;
  RpcCtxMap send_varname_to_ctx_;
  RpcCtxMap recv_varname_to_ctx_;
  std::unique_ptr<std::thread> main_thread_{nullptr};
  Scope *recv_scope_;                  // should be global scope
  std::unique_ptr<Scope> send_scope_;  // an independent scope
  std::unique_ptr<::ThreadPool> send_threadpool_{nullptr};
  std::unique_ptr<::ThreadPool> recv_threadpool_{nullptr};
  std::atomic_uint grad_num_{0};  // the num of gradient sent since last recv
};

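// Half-asynchronous mode: same send/recv path as AsyncCommunicator, plus a
// trainer-side barrier (Barrier / BarrierTriggerDecrement / BarrierWeakUp)
// that lets trainers periodically line up with each other.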
class HalfAsyncCommunicator : public AsyncCommunicator {
 public:
  HalfAsyncCommunicator() {}

  explicit HalfAsyncCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));
    VLOG(0) << "HalfAsyncCommunicator Initialized";
  }

  void Clean() override;

  void Barrier() override;

  void BarrierTriggerDecrement() override;

  void BarrierTriggerReset(int initial_val) override;

  int Meet();

  void BarrierWeakUp();

 protected:
  // mutex for Wait for barrier
  std::mutex barrier_mutex_;
  std::condition_variable barrier_cond_;
  std::atomic<int64_t> barrier_trigger_{0};
  std::atomic<int64_t> barrier_counter_{0};
};

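// Synchronous mode: on top of the half-async barrier, BarrierSend and
// BarrierRecv issue explicit barrier RPCs to every configured pserver
// endpoint.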
class SyncCommunicator : public HalfAsyncCommunicator {
 public:
  SyncCommunicator() : HalfAsyncCommunicator() {}

  explicit SyncCommunicator(const std::map<std::string, std::string> &envs)
      : HalfAsyncCommunicator(envs) {}

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));
    send_queue_size_ = std::stoi(envs.at("communicator_send_queue_size"));
    need_global_step_ =
        static_cast<bool>(std::stoi(envs.at("need_global_step")));

    trainer_id_ = std::stoi(envs.at("trainer_id"));
    auto pserver_strings = envs.at("pserver_endpoints");
    pserver_endpoints_ = paddle::string::Split(pserver_strings, ',');
    VLOG(0) << "SyncCommunicator Initialized";
  }

  void BarrierSend();

  void BarrierRecv();

 private:
  std::vector<std::string> pserver_endpoints_{};
};

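// GEO mode: instead of raw gradients, each trainer sends the delta between
// its current parameters and the snapshot kept since the last communication
// (SendSparse / SendDense), and pulls the servers' updates back with
// RecvSparse / RecvDense.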
class GeoCommunicator : public AsyncCommunicator {
 public:
  GeoCommunicator() : AsyncCommunicator() {}

  explicit GeoCommunicator(const std::map<std::string, std::string> &envs)
      : AsyncCommunicator(envs) {}

  void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                const RpcCtxMap &recv_varname_to_ctx,
                Scope *recv_scope) override;

  void InitEnvs() {
    min_send_grad_num_before_recv_ = 0;

    max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
    send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
    thread_pool_size_ = std::stoi(envs.at("communicator_thread_pool_size"));

    send_queue_size_ = max_merge_var_num_;
    trainers_ = std::stoi(envs.at("trainers"));
    sparse_attrs_ = envs.at("sparse_attrs");
    VLOG(0) << "GeoCommunicator Initialized";
  }

  void Send(const std::vector<std::string> &var_names,
            const std::vector<std::string> &var_tables,
            const framework::Scope &scope) override;

  void SendByCommunicator(int batches) override;

  void SendSparse(const std::string &varname, int batches);

  void SendDense(const std::string &varname);

  void SendGlobalStep(int batches) override {}

  void RecvByCommunicator() override;

  void RecvSparse(const std::string &varname);

  void RecvDense(const std::string &varname);

  void Init();

  void InitSparse();

  void InitDense(const std::string varname);

 private:
  int trainers_;
  std::string sparse_attrs_;

  // parameters used to compute and send deltas
  std::shared_ptr<Scope> delta_scope_;

  // stores the pserver parameters as of the last recv
  std::shared_ptr<Scope> old_scope_;

  // parameter on pserver
  std::shared_ptr<Scope> pserver_scope_;

  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::vector<int64_t>>>>
      send_ids_to_queue_;

  std::unordered_map<std::string, std::shared_ptr<SparseValue>> old_sparses_;
};

}  // namespace distributed
}  // namespace operators
}  // namespace paddle