/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <atomic>

#include "GradientMachine.h"

#include "paddle/utils/Queue.h"
#include "paddle/utils/Locks.h"
#include "hl_gpu.h"

namespace paddle {

// Forward declaration; the full definition appears later in this header.
class TrainerThread;

/// Queue of integer parameter ids.
using PidQueue = Queue<int>;
/// Uniquely-owning handle to a TrainerThread.
using TrainerThreadPtr = std::unique_ptr<TrainerThread>;

struct GradBuffer {
33
  /// GradBuffer is used for gathering gradient for GPU parameters
Z
zhangjinchao01 已提交
34 35
  int paramId;

36 37
  /// sem is used to notify that the local gradient merge of the current thread
  /// finished for the current thread.
Z
zhangjinchao01 已提交
38 39
  Semaphore sem;

40 41
  // bufs[mergeIndex]
  std::vector<VectorPtr> bufs;
Z
zhangjinchao01 已提交
42 43 44 45 46 47 48 49 50 51 52 53
};

/**
 *  A MultiGradientMachine is a synchronous GradientMachine which divides
 *  one data batch into several smaller batches and assigns each small batch
 *  to one computing thread for computation. After each thread finishes
 *  computation, it merges the results (including output Argument and gradient
 *  during backward()). It is basically the same as a single-thread gradient
 *  machine, except that it uses multiple threads to do the computation.
 *
 *  It handles GPU and CPU parameters differently. In GPU, one computing thread
 *  generally corresponds to one GPU device. Thus, each thread keeps a separate
 *  copy of the parameter in its own device's memory. In CPU, we only need to
 *  keep one copy of the parameters in the main memory. After each computing
 *  thread computes its own parameter gradient, the update process needs to
 *  accumulate the parameter gradients from all the computing threads, and
 *  apply the accumulated parameter gradient to the corresponding parameter
 *  value.
 *
 *  Each GPU parameter is assigned to a thread called its main thread. For each
 *  parameter, the accumulation of its gradients and the update of its value
 *  happen in its main thread. The main thread first gathers the parameter
 *  gradients from all the computing threads. Then, it performs the parameter
 *  update. After a parameter is updated by the main thread, it is scattered to
 *  all the computing threads so that the parameters in all the computing
 *  threads are synchronized. The scatter and gather processes are implemented
 *  by ring-style communication. Assume we have N computing threads; their
 *  thread ids will be
 *  0, 1, ..., N-1. For each parameter, the id of the main thread is specified
 *  in paraMainThread_[pid], where pid is the id of the parameter. Each thread
 *  i only sends data to its partner thread (i - 1) % N. For example, for a
 *  parameter gradient that is computed in thread 4 and whose main thread is 2,
 *  its traveling process would be 4, 5, ..., N-1, 0, 1, 2. In each step, the
 *  gradient buffer is added to the local gradient, and the local gradient is
 *  then copied to the gradient buffer of the next thread. At last, its main
 *  thread 2 will get the accumulated parameter gradient. For the same
 *  parameter, after its value is updated, the value's traveling process would
 *  be 2, 1, 0, N-1, ..., 3. At the end, all the computing threads would have
 *  the updated parameter value.
 *
 *  A computing thread (TrainerThread) uses 4 threads to do different jobs:
 *
 *  1. computeThread(): performing forward(), backward(), prefetch().
 *
 *  2. valueDispatchThread(): copying parameter values to partner thread.
 *
 *  3. copyGradToBufferThread(): copying parameter gradient to partner thread.
 *
 *  4. gradCollectThread(): merging the gradient from step 3 with local gradient
 *     and call the callback supplied by the user to update parameter value.
 *
 *  CPU parameter value has only one copy. And their gradients are merged at the
 *  end of backward().
 *
 *  * Handling of sparse update
 *  Currently, sparse update is only supported for CPU parameters.

 *  Sparse update refers to gradient calculation where the gradient is sparse.
 *  For example, if the input argument to a 'fc' layer is sparse, the gradient
 *  of the weight matrix of this layer will be sparse. It is usually more
 *  efficient to treat the gradient explicitly as a sparse vector during the
 *  parameter update.

 *  There are two types of sparse updates called local sparse update and remote
 *  sparse update.

 *  For both types of sparse updates, there is one copy of parameter value and
 *  gradient called main parameter value and gradient, and there is a copy of
 *  parameter value and gradient for each computing thread called slave
 *  parameter value and gradient. The slave parameter values are always shared
 *  with the corresponding main parameter value. The slave parameter grad is a
 *  sparse row matrix. The sparse patterns for slave parameter grads are
 *  different, because the small batches for each computing thread might have
 *  different sparsity patterns.

 *  1. Local sparse update
 *
 *     Main parameter value type is MAT_NORMAL. It is a dense matrix.
 *
 *     Main parameter grad type is MAT_SPARSE_ROW_IDS (SparseRowIdsCpuMatrix)
 *     It is also a dense matrix, but the updated values are specified by IDS.
 *
 *     Slave parameter value shares with main parameter value.
 *
 *     Slave parameter grad type is MAT_SPARSE_ROW_AUTO_GROW
 *     (SparseAutoGrowRowCpuMatrix). It is a sparse row matrix.
 *
 *     During backward() of each TrainerThread, SparseAutoGrowRowCpuMatrix will
 *     gather all the non-zero gradient. And after backward(), they will be
 *     merged into main parameter grad (SparseRowIdsCpuMatrix), with indices
 *     indicating which rows have nonzero gradient.
 *
 *  2. Remote sparse update
 *
 *     Main parameter value type is MAT_SPARSE_ROW_PREFETCH(_FULL_SIZE)
 *     (SparsePrefetchRowCpuMatrix). MAT_SPARSE_ROW_PREFETCH is a sparse matrix.
 *     MAT_SPARSE_ROW_PREFETCH_FULL_SIZE is a dense matrix. However, only the
 *     parameter values that are prefetched are up-to-date.
 *
 *     Main parameter grad type is MAT_SPARSE_ROW (SparseRowCpuMatrix).
 *     And it shares sparse pattern with value by sharing indexDictHandle_,
 *     which is an internal data structure used by SparseRowCpuMatrix to specify
 *     the sparsity pattern.
 *
 *     Slave parameter value shares with main parameter value.
 *
 *     Slave parameter grad type is MAT_SPARSE_ROW_AUTO_GROW
 *     (SparsePrefetchRowCpuMatrix). It is a sparse row matrix
 *
 *     During prefetch(), all the layers will indicate which rows of each
 *     parameter are needed. Then the framework will retrieve those rows from
 *     the parameter server.
 *
 *     During backward() of each TrainerThread, SparseAutoGrowRowCpuMatrix will
 *     gather all the non-zero gradient. And after backward(), they will be
 *     merged into main parameter grad (SparseRowCpuMatrix). And the framework
 *     will send the merged gradient to the parameter server.
 */
class MultiGradientMachine : public GradientMachine {
public:
  enum TaskType {
    TASK_FORWARD_BACKWARD = 0,
    TASK_FORWARD = 1,
    TASK_BACKWARD = 2,
    TASK_COPY_IN_ARGS = 3,
  };

  explicit MultiGradientMachine(const ModelConfig& config, bool useGpu);

  virtual void prefetch(const std::vector<Argument>& inArgs);

181 182 183
  virtual void forward(const std::vector<Argument>& inArgs,
                       std::vector<Argument>* outArgs,
                       PassType passType);
Z
zhangjinchao01 已提交
184 185 186

  virtual void backward(const UpdateCallback& callback = nullptr);

187 188 189 190
  void forwardBackward(const std::vector<Argument>& inArgs,
                       std::vector<Argument>* outArgs,
                       PassType passType,
                       const UpdateCallback& callback);
Z
zhangjinchao01 已提交
191 192 193 194 195 196 197 198 199

  virtual void onPassEnd();

  virtual void finish();

  virtual Evaluator* makeEvaluator();

  virtual void eval(Evaluator* evaluator);

200
  bool useGpu() const { return useGpu_; }
Z
zhangjinchao01 已提交
201

202
  /// @return whether to pass the gradients in outArgs_ to each threads.
Z
zhangjinchao01 已提交
203 204
  bool isPassGrad() { return isPassGrad_; }

205
  /// @brief set whether to pass the gradient in outArgs_ to each threads.
Z
zhangjinchao01 已提交
206 207
  void setPassGrad(bool isPass) { isPassGrad_ = isPass; }

208 209
  /// Set the gradients of the outputs.
  /// The gradietns will be copied to each thread in the computing threads.
Z
zhangjinchao01 已提交
210 211 212 213 214
  virtual void setOutputGrad(const std::vector<Argument>& args);

protected:
  friend class TrainerThread;

215
  std::vector<TrainerThreadPtr>& getAllThreads() { return threads_; }
216 217
  /// Calculate the real device id based on the logical device id and the
  /// thread id.
Z
zhangjinchao01 已提交
218 219 220 221 222 223 224 225
  int logicalDeviceId2RealDeviceId(int logicalId, int threadId = 0) const {
    if (logicalId == -1) {
      logicalId = 0;
    }
    return mod(logicalId + FLAGS_gpu_id + threadId * numLogicalDevices_,
               numDevices_);
  }

226 227
  /// Calculate the logical device id based on the real device id and the
  /// thread id.
Z
zhangjinchao01 已提交
228 229 230 231 232 233 234 235 236 237 238
  int realDeviceId2LogicalDeviceId(int realId, int threadId = 0) const {
    if (realId == -1) {
      return 0;
    } else {
      return mod(realId - FLAGS_gpu_id - threadId * numLogicalDevices_,
                 numDevices_);
    }
  }

  std::vector<const std::vector<ParameterPtr>*> getSlaveParameters();

239
  bool hasNonstaticCpuParamters() const { return hasNonstaticCpuParamters_; }
Z
zhangjinchao01 已提交
240

241
  /// Called TrainerThread to wait before merging CPU parameter gradients.
Z
zhangjinchao01 已提交
242 243
  void waitBeforeMerge() { trainerBarrier_.wait(); }

244 245
  /// called by MultiGradientMachine and TrainerThread to wait after merging
  /// CPU parameter graidents.
Z
zhangjinchao01 已提交
246 247
  void waitAfterMerge() { allBarrier_.wait(); }

248 249
  /// called by MultiGradientMachine and TrainerThread to wait for copyInArgs()
  /// finishing
Z
zhangjinchao01 已提交
250 251
  void waitForCopyInArgs() { allBarrier_.wait(); }

252
  TrainerThreadPtr& getThread(int threadId) { return threads_[threadId]; }
Z
zhangjinchao01 已提交
253 254 255 256 257

  std::vector<GradBuffer>& getGradBuf(int threadId) {
    return gradBufs_[threadId];
  }

258
  PassType getPassType() const { return passType_; }
Z
zhangjinchao01 已提交
259

260 261
  /// Called by TrainerThread to notify MultiGradientMachine that the gradient
  /// for paramId is ready
Z
zhangjinchao01 已提交
262 263
  void notifyGradientTransfer(int paramId);

264
  const std::vector<Argument>& getInArgs() { return inArgs_; }
Z
zhangjinchao01 已提交
265

266
  TaskType getTaskType() const { return taskType_; }
Z
zhangjinchao01 已提交
267 268 269 270 271

  const UpdateCallback& getBackwardCallback() const {
    return backwardCallback_;
  }

272
  int getNumDevices() const { return numDevices_; }
Z
zhangjinchao01 已提交
273

274
  int getNumLogicalDevices() const { return numLogicalDevices_; }
Z
zhangjinchao01 已提交
275

276
  int getNumThreads() const { return numThreads_; }
Z
zhangjinchao01 已提交
277

278
  int paraMainThread(int pid) const { return paraMainThread_[pid]; }
Z
zhangjinchao01 已提交
279 280

protected:
281 282 283 284
  virtual void forwardImp(const std::vector<Argument>& inArgs,
                          std::vector<Argument>* outArgs,
                          PassType passType,
                          TaskType taskType);
Z
zhangjinchao01 已提交
285

286
  virtual void backwardImp(const UpdateCallback& callback = NULL);
Z
zhangjinchao01 已提交
287

288
  /// update all parameters
Z
zhangjinchao01 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301
  void updateThreadParameters();

  void startTask(TaskType taskType);

  void getOutArgs(std::vector<Argument>* outArgs, PassType passType);

  void allocGradBufs();

protected:
  bool useGpu_;

  bool hasNonstaticCpuParamters_;

302
  /// store main parameter only
Z
zhangjinchao01 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316
  std::unique_ptr<GradientMachine> gradientMachine_;

  std::vector<TrainerThreadPtr> threads_;
  std::vector<int> paraMainThread_;
  std::vector<std::vector<GradBuffer>> gradBufs_;  // [threadId][deviceId]
  std::vector<size_t> bufferSizes_;

  PassType passType_;
  TaskType taskType_;
  PidQueue gradQueue_;
  std::vector<Argument> inArgs_;
  std::vector<Argument> outArgs_;
  hl_stream_t outArgStream_;

317
  /// ParameterType which needs to be merged from each GPU
Z
zhangjinchao01 已提交
318
  std::vector<ParameterType> mergeTypes_;
319
  int numDevices_;         /* number of gpu devices */
Z
zhangjinchao01 已提交
320
  int numLogicalDevices_;  // number of GPU used by one NN
321
  int numThreads_;         /* number of train threads */
Z
zhangjinchao01 已提交
322 323 324

  UpdateCallback backwardCallback_;

325
  /// barrrier for threads_
Z
zhangjinchao01 已提交
326 327
  ThreadBarrier trainerBarrier_;

328
  /// barrier for both MultiGradientMachine and threds_
Z
zhangjinchao01 已提交
329 330
  ThreadBarrier allBarrier_;

331
  /// indicate whether inArgs is copied before forward()
Z
zhangjinchao01 已提交
332 333
  bool inArgsCopied_;

334
  /// Whether to copy the gradient back from an external input.
Z
zhangjinchao01 已提交
335 336 337 338 339
  bool isPassGrad_;
};

class TrainerThread {
public:
340 341 342
  TrainerThread(const ModelConfig& config,
                int threadId,
                MultiGradientMachine* multiMachine);
Z
zhangjinchao01 已提交
343 344 345 346 347

  ~TrainerThread();

  void start();

348
  void onPassEnd() { gradientMachine_->onPassEnd(); }
Z
zhangjinchao01 已提交
349

350
  void waitOutArgsReady() { outArgsReadySem_.wait(); }
Z
zhangjinchao01 已提交
351

352
  void notifyTaskReady() { taskReadySem_.post(); }
Z
zhangjinchao01 已提交
353

354
  int getDeviceId() const { return deviceId_; }
Z
zhangjinchao01 已提交
355

356
  GradientMachine* getGradientMachine() { return gradientMachine_.get(); }
Z
zhangjinchao01 已提交
357

358
  const std::vector<ParameterPtr>& getParameters() { return parameters_; }
Z
zhangjinchao01 已提交
359 360 361 362 363 364 365 366 367

  void stop();

  void notifyValueReady(int paramId);

  const VectorPtr& getValueBuf(int paramId) {
    return parameters_[paramId]->getBuf(PARAMETER_VALUE);
  }

368
  const std::vector<Argument>& getOutArgs() { return outArgs_; }
Z
zhangjinchao01 已提交
369 370 371 372 373 374

  void incUpdateCounter(int n = 1) {
    updateCounter_ += n;
    parameterUpdated_ = true;
  }

375
  void notifyGradientCollect(int paramId) { gradQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
376

377
  void notifyCopyGradToBuffer(int paramId) { gradBufQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
378

379
  void notifyValueDispatch(int paramId) { valueReadyQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
380 381 382

  void prefetch();

383
  /// copy the output gradient from the main GradientMachine.
Z
zhangjinchao01 已提交
384 385 386 387 388 389
  void copyOutputGrad();

protected:
  void mergeCpuGradients();

  void mergeGradSparse(
390 391
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
392 393

  void mergeGradSparseRemote(
394 395
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
396 397

  void mergeGradDense(
398 399
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
400 401 402 403 404 405 406 407 408 409 410

  void computeThread();
  void valueDispatchThread();
  void copyGradToBufferThread();
  void gradCollectThread();

  void copyInArgs();
  void forward();
  void backward();
  void backwardCallback(Parameter* para);

411 412
  /// call the actuall callback supplied by the caller of
  /// GradientMachine::backward
Z
zhangjinchao01 已提交
413 414 415 416 417
  void doCallback(int pid);

protected:
  MultiGradientMachine* multiMachine_;
  ModelConfig config_;
418 419 420 421 422 423
  /// whether the thread should stop
  bool stopping_;
  /// the threads form which to collect gradient
  int partnerId_;
  /// from 0 to threads-1
  int threadId_;
Z
zhangjinchao01 已提交
424 425 426 427
  int deviceId_;
  std::unique_ptr<GradientMachine> gradientMachine_;
  std::vector<ParameterPtr> parameters_;

428
  /// ParameterType which needs to be merged from each GPU
Z
zhangjinchao01 已提交
429 430
  std::vector<ParameterType> mergeTypes_;

431 432
  /// compute thread
  std::unique_ptr<std::thread> computeThread_;
Z
zhangjinchao01 已提交
433 434 435 436 437
  std::vector<Argument> inArgs_;
  std::vector<Argument> outArgs_;
  Semaphore taskReadySem_;
  Semaphore outArgsReadySem_;

438 439 440 441
  /// copy thread
  std::unique_ptr<std::thread> copyThread_;
  /// queue of gradient needs to be copied to partner
  PidQueue gradBufQueue_;
Z
zhangjinchao01 已提交
442 443
  hl_stream_t gradStream_;

444 445 446 447
  /// grad merge thread
  std::unique_ptr<std::thread> gradCollectThread_;
  /// queue of gradient needs to be merged with gradient coopied by
  /// copyGradToBufferThread
Z
zhangjinchao01 已提交
448 449 450
  PidQueue gradQueue_;
  UpdateCallback backwardCallback_;

451 452 453
  /// value dispatch thread
  std::unique_ptr<std::thread> valueDispatchThread_;
  /// queue of the parameter whose the vale are ready for copy
Z
zhangjinchao01 已提交
454 455
  PidQueue valueReadyQueue_;

456
  /// used to notify all the parameter values are ready
Z
zhangjinchao01 已提交
457 458 459
  LockedCondition valueReadyCond_;

  hl_stream_t valueStream_;
460 461
  /// how many parameters are updated
  std::atomic<int> updateCounter_;
Z
zhangjinchao01 已提交
462 463
  bool parameterUpdated_;

464
  /// indicate whether inArgs is copied before forward()
Z
zhangjinchao01 已提交
465 466 467 468
  bool inArgsCopied_;
};

}  // namespace paddle