MultiGradientMachine.h 15.8 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Z
zhangjinchao01 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <atomic>

#include "GradientMachine.h"

#include "hl_gpu.h"
Y
Yu Yang 已提交
22 23
#include "paddle/utils/Locks.h"
#include "paddle/utils/Queue.h"
Z
zhangjinchao01 已提交
24 25 26 27 28 29 30 31 32

namespace paddle {

class TrainerThread;

typedef Queue<int> PidQueue;
typedef std::unique_ptr<TrainerThread> TrainerThreadPtr;

struct GradBuffer {
33
  /// GradBuffer is used for gathering gradient for GPU parameters
Z
zhangjinchao01 已提交
34 35
  int paramId;

36 37
  /// sem is used to notify that the local gradient merge of the current thread
  /// finished for the current thread.
Z
zhangjinchao01 已提交
38 39
  Semaphore sem;

40 41
  // bufs[mergeIndex]
  std::vector<VectorPtr> bufs;
Z
zhangjinchao01 已提交
42 43 44 45 46 47 48 49 50 51 52 53
};

/**
 *  A MultiGradientMachine is a synchronous GradientMachine which devides
 *  one data batch into several smaller batches and assign each one small batch
 *  to one computint thread for computation. After each thread finishes
 *  computation, it merges result (including output Argument and gradient during
 *  backward()). It basically is the same as single thread gradient machine,
 *  except that it uses multi-thread to do the computation.
 *
 *  It handles GPU and Cpu parameters differently.  In GPU, one computing thread
 *  generally corresponds to one GPU device. Thus, each thread keeps a separate
54 55
 *  copy of the parameter in its own device's memory. In CPU, we only need to
 keep
Z
zhangjinchao01 已提交
56 57 58 59 60 61 62 63 64 65 66 67 68
 *  one copy of the parameters in the main memory. After, each computing thread
 *  computes its own parameter gradient, the update process needs to accumulate
 *  the parameter gradients from all the computing threads, and update the
 *  accumulated parameter gradient to the corresponding parameter value.
 *
 *  Each GPU parameter is assigned to a thread called its main thread. For each
 *  parameter, the accumulation of its gradients and the update of its value
 *  happens in its main thread. The main thread first gather the parameter
 *  gradients from all the computing thread. Then, it performs parameter update.
 *  After a gradient is updated by the main thread, it is scattered to all the
 *  computing thread so that the parameters in all the computing threads are
 *  synchronized. The scatter and gather process are implemented by ring-style
 *  communication. Assume we have N computing threads, its thread ids will be
69 70 71 72
 *  0, 1, ..., N-1. For each parameter, the id of the main thread is specified
 in
 *  paraMainThread_[pid], where pid is the id of the parameter. Each thread i
 only
Z
zhangjinchao01 已提交
73 74
 *  sends data to its partner thread (i - 1) % N. For example, for a parameter
 *  gradient that is computed in thread 4, and its main thread is 2. Its
75 76
 *  traveling process would be 4, 5,..., N-1, 0, 1, 2. In each step, the
 gradient
Z
zhangjinchao01 已提交
77 78 79
 *  buffer is added to the local gradient, and the local gradient is then copied
 *  to the gradient buffer of the next thread. At last, its main thread 2 will
 *  get the accumulated parameter gradient. For the same parameter, after its
80 81 82 83
 *  value is updated, the value's traveling process would be 2, 1, 0, N-1, ...
 3.
 *  At the end, all the computing threads would have the updated parameter
 value.
Z
zhangjinchao01 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
 *
 *  A computing thread (TrainerThread) uses 4 threads to do different jobs:
 *
 *  1. computeThread(): performing forward(), backward(), prefetch().
 *
 *  2. valueDispatchThread(): copying parameter values to partner thread.
 *
 *  3. copyGradToBufferThread(): copying parameter gradient to partner thread.
 *
 *  4. gradCollectThread(): merging the gradient from step 3 with local gradient
 *     and call the callback supplied by the user to update parameter value.
 *
 *  CPU parameter value has only one copy. And their gradients are merged at the
 *  end of backward().
 *
 *  * Handling of sparse update
 *  Currently, sparse update is only supported for CPU parameters.

102 103 104 105
 *  Sparse updates refers to gradient caculation where the gradient is sparse.
 For
 *  example, if the input argument to a 'fc' layer is sparse, the gradient of
 the
Z
zhangjinchao01 已提交
106 107 108 109 110 111 112 113
 *  weight matrix of this layer will be sparse. It is usually more efficient to
 *  treat the gradient explicitly as sparse vector during the parameter update.

 *  There are two types of sparse updates called local sparse update and remote
 *  sparse update.

 *  For both types of sparse updates, there is one copy of parameter value and
 *  gradient called main parameter value and gradient, and there is a copy of
114 115
 *  parameter value and gradient for each computing thread called slave
 parameter
Z
zhangjinchao01 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
 *  value and gradient. The slave parameter values are always shared with the
 *  corresponding main parameter value. The slave parameter grad is a sparse row
 *  matrix. The sparse pattern for slave parameter grads are different, because
 *  the small batches for each computing thread might have different sparsity
 *  pattern.

 *  1. Local sparse update
 *
 *     Main parameter value type is MAT_NORMAL. It is a dense matrix.
 *
 *     Main parameter grad type is MAT_SPARSE_ROW_IDS (SparseRowIdsCpuMatrix)
 *     It is also a dense matrix, but the updated values are specified by IDS.
 *
 *     Slave parameter value shares with main parameter value.
 *
 *     Slave parameter grad type is MAT_SPARSE_ROW_AUTO_GROW
 *     (SparseAutoGrowRowCpuMatrix). It is a sparse row matrix.
 *
 *     During backward() of each TrainerThread, SparseAutoGrowRowCpuMatrix will
135 136
 *     gather all the non-zero gradient. And After backward(), they will be
 merged
Z
zhangjinchao01 已提交
137 138 139 140 141 142 143 144 145 146 147
 *     into main parameter grad (SparseRowIdsCpuMatrix), with indices indicating
 *     which rows have nonzero gradient.
 *
 *  2. Remote sparse update
 *
 *     Main parameter value type is MAT_SPARSE_ROW_PREFETCH(_FULL_SIZE)
 *     (SparsePrefetchRowCpuMatrix). MAT_SPARSE_ROW_PREFETCH is a sparse matrix.
 *     MAT_SPARSE_ROW_PREFETCH_FULL_SIZE is a dense matrix. However, only the
 *     parameter values that are prefetched is up-to-date.
 *
 *     Main parameter grad type is MAT_SPARSE_ROW (SparseRowCpuMatrix).
148 149
 *     And it shares sparse pattern with value by sharing indexDictHandle_,
 which
Z
zhangjinchao01 已提交
150
 *     is an internal data structure used by SparseRowCpuMatrixto specify the
151 152
 *     sparsity pattern of Slave parameter value shares with main parameter
 value.
Z
zhangjinchao01 已提交
153 154 155 156 157 158 159 160 161
 *
 *     Slave parameter grad type is MAT_SPARSE_ROW_AUTO_GROW
 *     (SparsePrefetchRowCpuMatrix). It is a sparse row matrix
 *
 *     During prefetch(), all the layers will indicates which rows of each
 *     parameter are needed. Then the framework will retrieve those rows from
 *     parameter server.
 *
 *     During backward() of each TrainerThread, SparseAutoGrowRowCpuMatrix will
162 163 164 165
 *     gather all the non-zero gradient. And After backward(), they will be
 merged
 *     into main parameter grad (SparseRowCpuMatrix). And the framework will
 send
Z
zhangjinchao01 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
 *     the merged gradient to parameter server.
 */
class MultiGradientMachine : public GradientMachine {
public:
  enum TaskType {
    TASK_FORWARD_BACKWARD = 0,
    TASK_FORWARD = 1,
    TASK_BACKWARD = 2,
    TASK_COPY_IN_ARGS = 3,
  };

  explicit MultiGradientMachine(const ModelConfig& config, bool useGpu);

  virtual void prefetch(const std::vector<Argument>& inArgs);

181 182 183
  virtual void forward(const std::vector<Argument>& inArgs,
                       std::vector<Argument>* outArgs,
                       PassType passType);
Z
zhangjinchao01 已提交
184 185 186

  virtual void backward(const UpdateCallback& callback = nullptr);

187 188 189 190
  void forwardBackward(const std::vector<Argument>& inArgs,
                       std::vector<Argument>* outArgs,
                       PassType passType,
                       const UpdateCallback& callback);
Z
zhangjinchao01 已提交
191

L
liaogang 已提交
192
  virtual Argument getLayerOutput(const std::string& layerName);
193

Z
zhangjinchao01 已提交
194 195 196 197
  virtual void onPassEnd();

  virtual void finish();

Y
Yu Yang 已提交
198
  virtual Evaluator* makeEvaluator() const;
Z
zhangjinchao01 已提交
199

Y
Yu Yang 已提交
200
  virtual void eval(Evaluator* evaluator) const;
Z
zhangjinchao01 已提交
201

202
  bool useGpu() const { return useGpu_; }
Z
zhangjinchao01 已提交
203

204
  /// @return whether to pass the gradients in outArgs_ to each threads.
Z
zhangjinchao01 已提交
205 206
  bool isPassGrad() { return isPassGrad_; }

207
  /// @brief set whether to pass the gradient in outArgs_ to each threads.
Z
zhangjinchao01 已提交
208 209
  void setPassGrad(bool isPass) { isPassGrad_ = isPass; }

210 211
  /// Set the gradients of the outputs.
  /// The gradietns will be copied to each thread in the computing threads.
Z
zhangjinchao01 已提交
212 213 214 215 216
  virtual void setOutputGrad(const std::vector<Argument>& args);

protected:
  friend class TrainerThread;

217
  std::vector<TrainerThreadPtr>& getAllThreads() { return threads_; }
218 219
  /// Calculate the real device id based on the logical device id and the
  /// thread id.
Z
zhangjinchao01 已提交
220 221 222 223 224 225 226 227
  int logicalDeviceId2RealDeviceId(int logicalId, int threadId = 0) const {
    if (logicalId == -1) {
      logicalId = 0;
    }
    return mod(logicalId + FLAGS_gpu_id + threadId * numLogicalDevices_,
               numDevices_);
  }

228 229
  /// Calculate the logical device id based on the real device id and the
  /// thread id.
Z
zhangjinchao01 已提交
230 231 232 233 234 235 236 237 238 239 240
  int realDeviceId2LogicalDeviceId(int realId, int threadId = 0) const {
    if (realId == -1) {
      return 0;
    } else {
      return mod(realId - FLAGS_gpu_id - threadId * numLogicalDevices_,
                 numDevices_);
    }
  }

  std::vector<const std::vector<ParameterPtr>*> getSlaveParameters();

241
  bool hasNonstaticCpuParamters() const { return hasNonstaticCpuParamters_; }
Z
zhangjinchao01 已提交
242

243
  /// Called TrainerThread to wait before merging CPU parameter gradients.
Z
zhangjinchao01 已提交
244 245
  void waitBeforeMerge() { trainerBarrier_.wait(); }

246 247
  /// called by MultiGradientMachine and TrainerThread to wait after merging
  /// CPU parameter graidents.
Z
zhangjinchao01 已提交
248 249
  void waitAfterMerge() { allBarrier_.wait(); }

250 251
  /// called by MultiGradientMachine and TrainerThread to wait for copyInArgs()
  /// finishing
Z
zhangjinchao01 已提交
252 253
  void waitForCopyInArgs() { allBarrier_.wait(); }

254
  TrainerThreadPtr& getThread(int threadId) { return threads_[threadId]; }
Z
zhangjinchao01 已提交
255 256 257 258 259

  std::vector<GradBuffer>& getGradBuf(int threadId) {
    return gradBufs_[threadId];
  }

260
  PassType getPassType() const { return passType_; }
Z
zhangjinchao01 已提交
261

262 263
  /// Called by TrainerThread to notify MultiGradientMachine that the gradient
  /// for paramId is ready
Z
zhangjinchao01 已提交
264 265
  void notifyGradientTransfer(int paramId);

266
  const std::vector<Argument>& getInArgs() { return inArgs_; }
Z
zhangjinchao01 已提交
267

268
  TaskType getTaskType() const { return taskType_; }
Z
zhangjinchao01 已提交
269 270 271 272 273

  const UpdateCallback& getBackwardCallback() const {
    return backwardCallback_;
  }

274
  int getNumDevices() const { return numDevices_; }
Z
zhangjinchao01 已提交
275

276
  int getNumLogicalDevices() const { return numLogicalDevices_; }
Z
zhangjinchao01 已提交
277

278
  int getNumThreads() const { return numThreads_; }
Z
zhangjinchao01 已提交
279

280
  int paraMainThread(int pid) const { return paraMainThread_[pid]; }
Z
zhangjinchao01 已提交
281 282

protected:
283 284 285 286
  virtual void forwardImp(const std::vector<Argument>& inArgs,
                          std::vector<Argument>* outArgs,
                          PassType passType,
                          TaskType taskType);
Z
zhangjinchao01 已提交
287

288
  virtual void backwardImp(const UpdateCallback& callback = NULL);
Z
zhangjinchao01 已提交
289

290
  /// update all parameters
Z
zhangjinchao01 已提交
291 292 293 294 295 296 297 298 299 300 301 302 303
  void updateThreadParameters();

  void startTask(TaskType taskType);

  void getOutArgs(std::vector<Argument>* outArgs, PassType passType);

  void allocGradBufs();

protected:
  bool useGpu_;

  bool hasNonstaticCpuParamters_;

304
  /// store main parameter only
Z
zhangjinchao01 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318
  std::unique_ptr<GradientMachine> gradientMachine_;

  std::vector<TrainerThreadPtr> threads_;
  std::vector<int> paraMainThread_;
  std::vector<std::vector<GradBuffer>> gradBufs_;  // [threadId][deviceId]
  std::vector<size_t> bufferSizes_;

  PassType passType_;
  TaskType taskType_;
  PidQueue gradQueue_;
  std::vector<Argument> inArgs_;
  std::vector<Argument> outArgs_;
  hl_stream_t outArgStream_;

L
liaogang 已提交
319 320
  Argument outLayerArgs_;

321
  /// ParameterType which needs to be merged from each GPU
Z
zhangjinchao01 已提交
322
  std::vector<ParameterType> mergeTypes_;
323
  int numDevices_;         /* number of gpu devices */
Z
zhangjinchao01 已提交
324
  int numLogicalDevices_;  // number of GPU used by one NN
325
  int numThreads_;         /* number of train threads */
Z
zhangjinchao01 已提交
326 327 328

  UpdateCallback backwardCallback_;

329
  /// barrrier for threads_
Z
zhangjinchao01 已提交
330 331
  ThreadBarrier trainerBarrier_;

332
  /// barrier for both MultiGradientMachine and threds_
Z
zhangjinchao01 已提交
333 334
  ThreadBarrier allBarrier_;

335
  /// indicate whether inArgs is copied before forward()
Z
zhangjinchao01 已提交
336 337
  bool inArgsCopied_;

338
  /// Whether to copy the gradient back from an external input.
Z
zhangjinchao01 已提交
339 340 341 342 343
  bool isPassGrad_;
};

class TrainerThread {
public:
344 345 346
  TrainerThread(const ModelConfig& config,
                int threadId,
                MultiGradientMachine* multiMachine);
Z
zhangjinchao01 已提交
347 348 349 350 351

  ~TrainerThread();

  void start();

352
  void onPassEnd() { gradientMachine_->onPassEnd(); }
Z
zhangjinchao01 已提交
353

354
  void waitOutArgsReady() { outArgsReadySem_.wait(); }
Z
zhangjinchao01 已提交
355

356
  void notifyTaskReady() { taskReadySem_.post(); }
Z
zhangjinchao01 已提交
357

358
  int getDeviceId() const { return deviceId_; }
Z
zhangjinchao01 已提交
359

360
  GradientMachine* getGradientMachine() { return gradientMachine_.get(); }
Z
zhangjinchao01 已提交
361

362
  const std::vector<ParameterPtr>& getParameters() { return parameters_; }
Z
zhangjinchao01 已提交
363 364 365 366 367 368 369 370 371

  void stop();

  void notifyValueReady(int paramId);

  const VectorPtr& getValueBuf(int paramId) {
    return parameters_[paramId]->getBuf(PARAMETER_VALUE);
  }

372
  const std::vector<Argument>& getOutArgs() { return outArgs_; }
Z
zhangjinchao01 已提交
373 374 375 376 377 378

  void incUpdateCounter(int n = 1) {
    updateCounter_ += n;
    parameterUpdated_ = true;
  }

379
  void notifyGradientCollect(int paramId) { gradQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
380

381
  void notifyCopyGradToBuffer(int paramId) { gradBufQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
382

383
  void notifyValueDispatch(int paramId) { valueReadyQueue_.enqueue(paramId); }
Z
zhangjinchao01 已提交
384 385 386

  void prefetch();

387
  /// copy the output gradient from the main GradientMachine.
Z
zhangjinchao01 已提交
388 389
  void copyOutputGrad();

H
hedaoyuan 已提交
390 391 392
  /// Whether the thread has input data.
  bool hasInputData() { return batchSize_ != 0; }

Z
zhangjinchao01 已提交
393 394 395 396
protected:
  void mergeCpuGradients();

  void mergeGradSparse(
397 398
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
399 400

  void mergeGradSparseRemote(
401 402
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
403 404

  void mergeGradDense(
405 406
      Parameter* para,
      std::vector<const std::vector<ParameterPtr>*>& slaveParameters);
Z
zhangjinchao01 已提交
407 408 409 410 411 412

  void computeThread();
  void valueDispatchThread();
  void copyGradToBufferThread();
  void gradCollectThread();

H
hedaoyuan 已提交
413
  int copyInArgs();
Z
zhangjinchao01 已提交
414 415 416 417
  void forward();
  void backward();
  void backwardCallback(Parameter* para);

418 419
  /// call the actuall callback supplied by the caller of
  /// GradientMachine::backward
Z
zhangjinchao01 已提交
420 421 422 423 424
  void doCallback(int pid);

protected:
  MultiGradientMachine* multiMachine_;
  ModelConfig config_;
425 426 427 428 429 430
  /// whether the thread should stop
  bool stopping_;
  /// the threads form which to collect gradient
  int partnerId_;
  /// from 0 to threads-1
  int threadId_;
Z
zhangjinchao01 已提交
431 432 433 434
  int deviceId_;
  std::unique_ptr<GradientMachine> gradientMachine_;
  std::vector<ParameterPtr> parameters_;

435
  /// ParameterType which needs to be merged from each GPU
Z
zhangjinchao01 已提交
436 437
  std::vector<ParameterType> mergeTypes_;

438 439
  /// compute thread
  std::unique_ptr<std::thread> computeThread_;
Z
zhangjinchao01 已提交
440 441 442 443 444
  std::vector<Argument> inArgs_;
  std::vector<Argument> outArgs_;
  Semaphore taskReadySem_;
  Semaphore outArgsReadySem_;

445 446 447 448
  /// copy thread
  std::unique_ptr<std::thread> copyThread_;
  /// queue of gradient needs to be copied to partner
  PidQueue gradBufQueue_;
Z
zhangjinchao01 已提交
449 450
  hl_stream_t gradStream_;

451 452 453 454
  /// grad merge thread
  std::unique_ptr<std::thread> gradCollectThread_;
  /// queue of gradient needs to be merged with gradient coopied by
  /// copyGradToBufferThread
Z
zhangjinchao01 已提交
455 456 457
  PidQueue gradQueue_;
  UpdateCallback backwardCallback_;

458 459 460
  /// value dispatch thread
  std::unique_ptr<std::thread> valueDispatchThread_;
  /// queue of the parameter whose the vale are ready for copy
Z
zhangjinchao01 已提交
461 462
  PidQueue valueReadyQueue_;

463
  /// used to notify all the parameter values are ready
Z
zhangjinchao01 已提交
464 465 466
  LockedCondition valueReadyCond_;

  hl_stream_t valueStream_;
467 468
  /// how many parameters are updated
  std::atomic<int> updateCounter_;
Z
zhangjinchao01 已提交
469 470
  bool parameterUpdated_;

471
  /// indicate whether inArgs is copied before forward()
Z
zhangjinchao01 已提交
472
  bool inArgsCopied_;
H
hedaoyuan 已提交
473
  int batchSize_;
Z
zhangjinchao01 已提交
474 475 476
};

}  // namespace paddle