/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <atomic>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "GradientMachine.h"

#include "hl_gpu.h"
#include "paddle/utils/Locks.h"
#include "paddle/utils/Queue.h"

namespace paddle {

class TrainerThread;

/// Queue of parameter ids (pids).
using PidQueue = Queue<int>;

/// Owning handle to a TrainerThread.
using TrainerThreadPtr = std::unique_ptr<TrainerThread>;

struct GradBuffer {
33
  /// GradBuffer is used for gathering gradient for GPU parameters
Z
zhangjinchao01 已提交
34 35
  int paramId;

36 37
  /// sem is used to notify that the local gradient merge of the current thread
  /// finished for the current thread.
Z
zhangjinchao01 已提交
38 39
  Semaphore sem;

40 41
  // bufs[mergeIndex]
  std::vector<VectorPtr> bufs;
Z
zhangjinchao01 已提交
42 43 44 45 46 47 48 49 50 51 52 53
};

/**
 *  A MultiGradientMachine is a synchronous GradientMachine which divides
 *  one data batch into several smaller batches and assigns each small batch
 *  to one computing thread for computation. After each thread finishes
 *  computation, it merges the results (including the output Argument and the
 *  gradient during backward()). It is basically the same as a single-thread
 *  gradient machine, except that it uses multiple threads to do the
 *  computation.
 *
 *  It handles GPU and CPU parameters differently. On GPU, one computing thread
 *  generally corresponds to one GPU device. Thus, each thread keeps a separate
 *  copy of the parameters in its own device's memory. On CPU, we only need to
 *  keep one copy of the parameters in the main memory. After each computing
 *  thread computes its own parameter gradient, the update process needs to
 *  accumulate the parameter gradients from all the computing threads, and
 *  apply the accumulated gradient to the corresponding parameter value.
 *
 *  Each GPU parameter is assigned to a thread called its main thread. For each
 *  parameter, the accumulation of its gradients and the update of its value
 *  happen in its main thread. The main thread first gathers the parameter
 *  gradients from all the computing threads. Then, it performs the parameter
 *  update. After the parameter value is updated by the main thread, it is
 *  scattered to all the computing threads so that the parameters in all the
 *  computing threads are synchronized. The scatter and gather processes are
 *  implemented by ring-style communication. Assume there are N computing
 *  threads with ids 0, 1, ..., N-1. For each parameter, the id of its main
 *  thread is specified in paraMainThread_[pid], where pid is the id of the
 *  parameter. Each thread i only sends data to its partner thread (i - 1) % N.
 *  For example, consider a parameter gradient that is computed in thread 4
 *  and whose main thread is 2. Its traveling process would be
 *  4, 5, ..., N-1, 0, 1, 2. In each step, the gradient buffer is added to the
 *  local gradient, and the local gradient is then copied to the gradient
 *  buffer of the next thread. At last, its main thread 2 will get the
 *  accumulated parameter gradient. For the same parameter, after its value is
 *  updated, the value's traveling process would be 2, 1, 0, N-1, ..., 3. At
 *  the end, all the computing threads would have the updated parameter value.
 *  NOTE(review): the gradient traversal order in this example (ascending
 *  thread ids) does not match "sends to (i - 1) % N"; confirm the actual
 *  direction against partnerId_ in the implementation.
 *
 *  A computing thread (TrainerThread) uses 4 threads to do different jobs:
 *
 *  1. computeThread(): performing forward(), backward(), prefetch().
 *
 *  2. valueDispatchThread(): copying parameter values to partner thread.
 *
 *  3. copyGradToBufferThread(): copying parameter gradient to partner thread.
 *
 *  4. gradCollectThread(): merging the gradient from step 3 with local gradient
 *     and call the callback supplied by the user to update parameter value.
 *
 *  CPU parameter value has only one copy. And their gradients are merged at the
 *  end of backward().
 *
 *  * Handling of sparse update
 *  Currently, sparse update is only supported for CPU parameters.
 *
 *  Sparse update refers to gradient calculation where the gradient is sparse.
 *  For example, if the input argument to a 'fc' layer is sparse, the gradient
 *  of the weight matrix of this layer will be sparse. It is usually more
 *  efficient to treat the gradient explicitly as a sparse vector during the
 *  parameter update.
 *
 *  There are two types of sparse updates: local sparse update and remote
 *  sparse update.
 *
 *  For both types of sparse updates, there is one copy of the parameter value
 *  and gradient, called the main parameter value and gradient, and there is a
 *  copy of the parameter value and gradient for each computing thread, called
 *  the slave parameter value and gradient. The slave parameter values are
 *  always shared with the corresponding main parameter value. The slave
 *  parameter grad is a sparse row matrix. The sparsity patterns of the slave
 *  parameter grads differ from each other, because the small batches for each
 *  computing thread might have different sparsity patterns.
 *
 *  1. Local sparse update
 *
 *     Main parameter value type is MAT_NORMAL. It is a dense matrix.
 *
 *     Main parameter grad type is MAT_SPARSE_ROW_IDS (SparseRowIdsCpuMatrix).
 *     It is also a dense matrix, but the updated values are specified by IDS.
 *
 *     Slave parameter value is shared with the main parameter value.
 *
 *     Slave parameter grad type is MAT_SPARSE_ROW_AUTO_GROW
 *     (SparseAutoGrowRowCpuMatrix). It is a sparse row matrix.
 *
 *     During backward() of each TrainerThread, SparseAutoGrowRowCpuMatrix will
 *     gather all the non-zero gradients. After backward(), they will be merged
 *     into the main parameter grad (SparseRowIdsCpuMatrix), with indices
 *     indicating which rows have nonzero gradients.
 *
 *  2. Remote sparse update
 *
 *     Main parameter value type is MAT_SPARSE_ROW_PREFETCH(_FULL_SIZE)
 *     (SparsePrefetchRowCpuMatrix). MAT_SPARSE_ROW_PREFETCH is a sparse matrix.
 *     MAT_SPARSE_ROW_PREFETCH_FULL_SIZE is a dense matrix. However, only the
 *     parameter values that are prefetched are up-to-date.
 *
 *     Main parameter grad type is MAT_SPARSE_ROW (SparseRowCpuMatrix).
 *     It shares its sparsity pattern with the value by sharing
 *     indexDictHandle_, which is an internal data structure used by
 *     SparseRowCpuMatrix to specify the sparsity pattern.
 *
 *     Slave parameter value is shared with the main parameter value.
 *
 *     Slave parameter grad type is MAT_SPARSE_ROW_AUTO_GROW
 *     (SparseAutoGrowRowCpuMatrix). It is a sparse row matrix.
 *
 *     During prefetch(), all the layers indicate which rows of each
 *     parameter are needed. Then the framework retrieves those rows from
 *     the parameter server.
 *
 *     During backward() of each TrainerThread, SparseAutoGrowRowCpuMatrix will
 *     gather all the non-zero gradients. After backward(), they will be merged
 *     into the main parameter grad (SparseRowCpuMatrix), and the framework will
 *     send the merged gradient to the parameter server.
 */
class MultiGradientMachine : public GradientMachine {
public:
  enum TaskType {
    TASK_FORWARD_BACKWARD = 0,
    TASK_FORWARD = 1,
    TASK_BACKWARD = 2,
    TASK_COPY_IN_ARGS = 3,
  };

  explicit MultiGradientMachine(const ModelConfig& config, bool useGpu);

179 180 181
  virtual void start();

  virtual void finish();
182

Z
zhangjinchao01 已提交
183 184
  virtual void prefetch(const std::vector<Argument>& inArgs);

185 186 187
  virtual void forward(const std::vector<Argument>& inArgs,
                       std::vector<Argument>* outArgs,
                       PassType passType);
Z
zhangjinchao01 已提交
188 189 190

  virtual void backward(const UpdateCallback& callback = nullptr);

191 192 193 194
  void forwardBackward(const std::vector<Argument>& inArgs,
                       std::vector<Argument>* outArgs,
                       PassType passType,
                       const UpdateCallback& callback);
Z
zhangjinchao01 已提交
195

L
liaogang 已提交
196
  virtual Argument getLayerOutput(const std::string& layerName);
197

Z
zhangjinchao01 已提交
198 199
  virtual void onPassEnd();

Y
Yu Yang 已提交
200
  virtual Evaluator* makeEvaluator() const;
Z
zhangjinchao01 已提交
201

Y
Yu Yang 已提交
202
  virtual void eval(Evaluator* evaluator) const;
Z
zhangjinchao01 已提交
203

204
  bool useGpu() const { return useGpu_; }
Z
zhangjinchao01 已提交
205

206
  /// @return whether to pass the gradients in outArgs_ to each threads.
Z
zhangjinchao01 已提交
207 208
  bool isPassGrad() { return isPassGrad_; }

209
  /// @brief set whether to pass the gradient in outArgs_ to each threads.
Z
zhangjinchao01 已提交
210 211
  void setPassGrad(bool isPass) { isPassGrad_ = isPass; }

212 213
  /// Set the gradients of the outputs.
  /// The gradietns will be copied to each thread in the computing threads.
Z
zhangjinchao01 已提交
214 215 216 217 218
  virtual void setOutputGrad(const std::vector<Argument>& args);

protected:
  friend class TrainerThread;

219
  std::vector<TrainerThreadPtr>& getAllThreads() { return threads_; }
220 221
  /// Calculate the real device id based on the logical device id and the
  /// thread id.
Z
zhangjinchao01 已提交
222 223 224 225 226 227 228 229
  int logicalDeviceId2RealDeviceId(int logicalId, int threadId = 0) const {
    if (logicalId == -1) {
      logicalId = 0;
    }
    return mod(logicalId + FLAGS_gpu_id + threadId * numLogicalDevices_,
               numDevices_);
  }

230 231
  /// Calculate the logical device id based on the real device id and the
  /// thread id.
Z
zhangjinchao01 已提交
232 233 234 235 236 237 238 239 240 241 242
  int realDeviceId2LogicalDeviceId(int realId, int threadId = 0) const {
    if (realId == -1) {
      return 0;
    } else {
      return mod(realId - FLAGS_gpu_id - threadId * numLogicalDevices_,
                 numDevices_);
    }
  }

  std::vector<const std::vector<ParameterPtr>*> getSlaveParameters();

243
  bool hasNonstaticCpuParamters() const { return hasNonstaticCpuParamters_; }
Z
zhangjinchao01 已提交
244

245
  /// Called TrainerThread to wait before merging CPU parameter gradients.
Z
zhangjinchao01 已提交
246 247
  void waitBeforeMerge() { trainerBarrier_.wait(); }

248 249
  /// called by MultiGradientMachine and TrainerThread to wait after merging
  /// CPU parameter graidents.
Z
zhangjinchao01 已提交
250 251
  void waitAfterMerge() { allBarrier_.wait(); }

252 253
  /// called by MultiGradientMachine and TrainerThread to wait for copyInArgs()
  /// finishing
Z
zhangjinchao01 已提交
254 255
  void waitForCopyInArgs() { allBarrier_.wait(); }

256
  TrainerThreadPtr& getThread(int threadId) { return threads_[threadId]; }
Z
zhangjinchao01 已提交
257 258 259 260 261

  std::vector<GradBuffer>& getGradBuf(int threadId) {
    return gradBufs_[threadId];
  }

262
  PassType getPassType() const { return passType_; }
Z
zhangjinchao01 已提交
263

264 265
  /// Called by TrainerThread to notify MultiGradientMachine that the gradient
  /// for paramId is ready
Z
zhangjinchao01 已提交
266 267
  void notifyGradientTransfer(int paramId);

268
  const std::vector<Argument>& getInArgs() { return inArgs_; }
Z
zhangjinchao01 已提交
269

270
  TaskType getTaskType() const { return taskType_; }
Z
zhangjinchao01 已提交
271 272 273 274 275

  const UpdateCallback& getBackwardCallback() const {
    return backwardCallback_;
  }

276
  int getNumDevices() const { return numDevices_; }
Z
zhangjinchao01 已提交
277

278
  int getNumLogicalDevices() const { return numLogicalDevices_; }
Z
zhangjinchao01 已提交
279

280
  int getNumThreads() const { return numThreads_; }
Z
zhangjinchao01 已提交
281

282
  int paraMainThread(int pid) const { return paraMainThread_[pid]; }
Z
zhangjinchao01 已提交
283 284

protected:
285 286 287 288
  virtual void forwardImp(const std::vector<Argument>& inArgs,
                          std::vector<Argument>* outArgs,
                          PassType passType,
                          TaskType taskType);
Z
zhangjinchao01 已提交
289

290
  virtual void backwardImp(const UpdateCallback& callback = NULL);
Z
zhangjinchao01 已提交
291

292
  /// update all parameters
Z
zhangjinchao01 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305
  void updateThreadParameters();

  void startTask(TaskType taskType);

  void getOutArgs(std::vector<Argument>* outArgs, PassType passType);

  void allocGradBufs();

protected:
  bool useGpu_;

  bool hasNonstaticCpuParamters_;

306
  /// store main parameter only
Z
zhangjinchao01 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319 320
  std::unique_ptr<GradientMachine> gradientMachine_;

  std::vector<TrainerThreadPtr> threads_;
  std::vector<int> paraMainThread_;
  std::vector<std::vector<GradBuffer>> gradBufs_;  // [threadId][deviceId]
  std::vector<size_t> bufferSizes_;

  PassType passType_;
  TaskType taskType_;
  PidQueue gradQueue_;
  std::vector<Argument> inArgs_;
  std::vector<Argument> outArgs_;
  hl_stream_t outArgStream_;

L
liaogang 已提交
321 322
  Argument outLayerArgs_;

323
  /// ParameterType which needs to be merged from each GPU
Z
zhangjinchao01 已提交
324
  std::vector<ParameterType> mergeTypes_;
325
  int numDevices_;         /* number of gpu devices */
Z
zhangjinchao01 已提交
326
  int numLogicalDevices_;  // number of GPU used by one NN
327
  int numThreads_;         /* number of train threads */
Z
zhangjinchao01 已提交
328 329 330

  UpdateCallback backwardCallback_;

331
  /// barrrier for threads_
Z
zhangjinchao01 已提交
332 333
  ThreadBarrier trainerBarrier_;

334
  /// barrier for both MultiGradientMachine and threds_
Z
zhangjinchao01 已提交
335 336
  ThreadBarrier allBarrier_;

337
  /// indicate whether inArgs is copied before forward()
Z
zhangjinchao01 已提交
338 339
  bool inArgsCopied_;

340
  /// Whether to copy the gradient back from an external input.
Z
zhangjinchao01 已提交
341 342 343 344 345
  bool isPassGrad_;
};

class TrainerThread {
public:
346 347 348
  TrainerThread(const ModelConfig& config,
                int threadId,
                MultiGradientMachine* multiMachine);
Z
zhangjinchao01 已提交
349 350 351 352 353

  ~TrainerThread();

  void start();

354
  void onPassEnd() { gradientMachine_->onPassEnd(); }
Z
zhangjinchao01 已提交
355

356
  void waitOutArgsReady() { outArgsReadySem_.wait(); }
Z
zhangjinchao01 已提交
357

358
  void notifyTaskReady() { taskReadySem_.post(); }
Z
zhangjinchao01 已提交
359

360
  int getDeviceId() const { return deviceId_; }
Z
zhangjinchao01 已提交
361

362
  GradientMachine* getGradientMachine() { return gradientMachine_.get(); }
Z
zhangjinchao01 已提交
363

364
  const std::vector<ParameterPtr>& getParameters() { return parameters_; }
Z
zhangjinchao01 已提交
365 366 367 368 369 370 371 372 373

  void stop();

  void notifyValueReady(int paramId);

  const VectorPtr& getValueBuf(int paramId) {
    return parameters_[paramId]->getBuf(PARAMETER_VALUE);
  }

374
  const std::vector<Argument>& getOutArgs() { return outArgs_; }
Z
zhangjinchao01 已提交
375 376 377 378 379 380

  void incUpdateCounter(int n = 1) {
    updateCounter_ += n;
    parameterUpdated_ = true;
  }

381
  void notifyGradientCollect(int paramId) { gradQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
382

383
  void notifyCopyGradToBuffer(int paramId) { gradBufQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
384

385
  void notifyValueDispatch(int paramId) { valueReadyQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
386 387 388

  void prefetch();

389
  /// copy the output gradient from the main GradientMachine.
Z
zhangjinchao01 已提交
390 391
  void copyOutputGrad();

H
hedaoyuan 已提交
392 393 394
  /// Whether the thread has input data.
  bool hasInputData() { return batchSize_ != 0; }

Z
zhangjinchao01 已提交
395 396 397 398
protected:
  void mergeCpuGradients();

  void mergeGradSparse(
399 400
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
401 402

  void mergeGradSparseRemote(
403 404
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
405 406

  void mergeGradDense(
407 408
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
409 410 411 412 413 414

  void computeThread();
  void valueDispatchThread();
  void copyGradToBufferThread();
  void gradCollectThread();

H
hedaoyuan 已提交
415
  int copyInArgs();
Z
zhangjinchao01 已提交
416 417 418 419
  void forward();
  void backward();
  void backwardCallback(Parameter* para);

420 421
  /// call the actuall callback supplied by the caller of
  /// GradientMachine::backward
Z
zhangjinchao01 已提交
422 423 424 425 426
  void doCallback(int pid);

protected:
  MultiGradientMachine* multiMachine_;
  ModelConfig config_;
427 428 429 430 431 432
  /// whether the thread should stop
  bool stopping_;
  /// the threads form which to collect gradient
  int partnerId_;
  /// from 0 to threads-1
  int threadId_;
Z
zhangjinchao01 已提交
433 434 435 436
  int deviceId_;
  std::unique_ptr<GradientMachine> gradientMachine_;
  std::vector<ParameterPtr> parameters_;

437
  /// ParameterType which needs to be merged from each GPU
Z
zhangjinchao01 已提交
438 439
  std::vector<ParameterType> mergeTypes_;

440 441
  /// compute thread
  std::unique_ptr<std::thread> computeThread_;
Z
zhangjinchao01 已提交
442 443 444 445 446
  std::vector<Argument> inArgs_;
  std::vector<Argument> outArgs_;
  Semaphore taskReadySem_;
  Semaphore outArgsReadySem_;

447 448 449 450
  /// copy thread
  std::unique_ptr<std::thread> copyThread_;
  /// queue of gradient needs to be copied to partner
  PidQueue gradBufQueue_;
Z
zhangjinchao01 已提交
451 452
  hl_stream_t gradStream_;

453 454 455 456
  /// grad merge thread
  std::unique_ptr<std::thread> gradCollectThread_;
  /// queue of gradient needs to be merged with gradient coopied by
  /// copyGradToBufferThread
Z
zhangjinchao01 已提交
457 458 459
  PidQueue gradQueue_;
  UpdateCallback backwardCallback_;

460 461 462
  /// value dispatch thread
  std::unique_ptr<std::thread> valueDispatchThread_;
  /// queue of the parameter whose the vale are ready for copy
Z
zhangjinchao01 已提交
463 464
  PidQueue valueReadyQueue_;

465
  /// used to notify all the parameter values are ready
Z
zhangjinchao01 已提交
466 467 468
  LockedCondition valueReadyCond_;

  hl_stream_t valueStream_;
469 470
  /// how many parameters are updated
  std::atomic<int> updateCounter_;
Z
zhangjinchao01 已提交
471 472
  bool parameterUpdated_;

473
  /// indicate whether inArgs is copied before forward()
Z
zhangjinchao01 已提交
474
  bool inArgsCopied_;
H
hedaoyuan 已提交
475
  int batchSize_;
Z
zhangjinchao01 已提交
476 477 478
};

}  // namespace paddle