RemoteParameterUpdater.h 14.1 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Z
zhangjinchao01 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <functional>
Y
Yu Yang 已提交
18
#include <thread>
Z
zhangjinchao01 已提交
19
#include "ParameterUpdater.h"
Y
Yu Yang 已提交
20
#include "paddle/pserver/ParameterClient2.h"
Z
zhangjinchao01 已提交
21
#include "paddle/utils/Queue.h"
Y
Yu Yang 已提交
22
#include "paddle/utils/Util.h"
Z
zhangjinchao01 已提交
23 24 25

namespace paddle {

26
// TODO(yanfei):
Z
zhangjinchao01 已提交
27 28 29 30 31 32 33 34 35 36 37
// I think that the biggest feature of rdma is packet lossless control
// feature instead of high bandwiths, zero copy and gpu-direct rdma in
// theroy.
// But zero-copy and gpu-direct rdma features can help to reduce latency
// caused by os system.
// So, for some specified cluster, such as high density gpu cluster,
// gpu-direct and zero copy could help to improve cluster communication
// performance.
//

/**
38
 * Normal remote parameter updater for dense parameters.
Z
zhangjinchao01 已提交
39
 *
40 41 42
 * It first packs all parameters for all pservers using ParameterClient
 * module, then wait for merged parameters data from all pservers.
 * The synchronization pattern specified by sync-sgd or async-sgd is
Z
zhangjinchao01 已提交
43 44
 * achieved by all pservers with the help of the controller within this
 * remote parameter updater.
45 46 47
 * This module indeedly bridges the gradient machines and parameter servers.
 * It helps to transfer the parameters from acceleration device to cpu end
 * for network. It contains additional parameters copy buffers for
Z
zhangjinchao01 已提交
48
 * acceleration devices at cpu end, such as gpu, otherwise it will
49
 * directly use original parameters data to update pservers.
Z
zhangjinchao01 已提交
50
 *
51 52
 * This remote parameter updater does not use pipeline mechanism to hide
 * copy latency from gpu to cpu buffer. In addition the overlapped between
Z
zhangjinchao01 已提交
53 54 55 56 57
 * backward and communication is not supported.
 */
class RemoteParameterUpdater : public ParameterUpdater {
public:
  RemoteParameterUpdater(
58 59
      const OptimizationConfig& config,
      int expectedPpassCount,
Z
zhangjinchao01 已提交
60 61 62 63 64 65 66 67 68 69
      std::unique_ptr<ParameterUpdater>&& localUpdater = nullptr);
  ~RemoteParameterUpdater() {
    if (controllerThread_) {
      controllerThread_->join();
    }
  }

  /**
   * initialize the internal parameter client and itself.
   */
Y
Yu Yang 已提交
70
  virtual void init(const std::vector<ParameterPtr>& parameters);
Z
zhangjinchao01 已提交
71
  /**
72 73 74 75
   * @brief start batch
   *
   * @note  one batch training exhibits stateful feature to help
   *        to do performance tuning, sgd optimization if necessary.
Z
zhangjinchao01 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
   */
  virtual PassType startBatch(int64_t batchSize) {
    if (localUpdater_) {
      localUpdater_->startBatch(batchSize);
    }
    batchSize_ = batchSize;
    batchStatus_ = BATCH_START;
    return PASS_TRAIN;
  }

  /**
   * send parameters to pservers and get returned parameters
   * from all pservers if necessary. it will implictly
   * cooperate with controller thread for sync-sgd.
   */
  virtual void finishBatch(real cost);
  virtual void startPass();
  virtual bool finishPass(real cost);

#ifndef PADDLE_DISABLE_TIMER
  virtual void setForwardbackwardTime(uint64_t delta) {
    parameterClient_->setForwardbackwardTime(delta);
  }
#endif

  virtual void apply();
  virtual void restore();

protected:
  /**
   * control all pservers with all trainers for sync-sgd
   */
  virtual void controller();

  /**
   * work need to do after finishBatch
   */
  virtual void updateImpl(Parameter* para);

  void startController();

  /**
   * @brief copy parameters from cpu host to device, such as gpu.
   *
   * @note  return if all data are transfered.
   */
  void copyParametersToDevice(ParameterType parameterType);

  /**
   * @brief copy parameters from device to cpu host
   *
   * @note  return if all data are transfered
   */
  void copyParametersFromDevice(ParameterType parameterType);

protected:
  /// Optimization config used to guide initialization and finishBatch
  OptimizationConfig config_;
  /// internal parameter client object for exchanging data with pserver
  std::unique_ptr<ParameterClient2> parameterClient_;
  /// internal shadow buffer at cpu host end, use original parameters_
  /// if no acceleration devices are used.
  std::vector<ParameterPtr> cpuParameters_;
  /// local updater for aggregating multi-batches local delta
  std::unique_ptr<ParameterUpdater> localUpdater_;
  /// the size of mini-batch
  int64_t batchSize_;
  /// batches passed
  int64_t numBatches_;
  /// for stateful control
  BatchStatus batchStatus_;
  /// controller thread for sync-sgd
  std::unique_ptr<std::thread> controllerThread_;
  /// passed alread finished
  int64_t passCount_;
  /// expected passes to finished
  int64_t expectedPassCount_;
  /// use normal synchronization communication if True
  bool separateSendAndRecv_;
  /// true if it's first pass
  bool isFirstPass_;
  bool useApplyInPserver_;

  static const std::string kAverage;
  static const std::string kElasticAverage;
};

163
// TODO(yanfei):
Z
zhangjinchao01 已提交
164 165 166 167 168
// do parameters level synchronization Optimization at pserver end with
// ConcurrentRemoteParameterUpdater to get more parallelization, at last
// to really hide pserver latency in backward computation.
//
/**
169
 * This updater add additional optimization for overlapping synchronization
Z
zhangjinchao01 已提交
170 171
 * from pservers with backward computation.
 *
172 173
 * Parameter can be sent to pservers when related backward stage is finished.
 * This concurrent udpater does data copy from acceleration device to host
Z
zhangjinchao01 已提交
174 175 176 177
 * memory aynchronously. In addition internal parameter client reads data in
 * host memory and send them to all pservers in next stage. So this class
 * help to pipeline device-to-host copy and host-to-network to hide network
 * latency in backward stage.
178
 * It contains separate send and recv thread for pipeline usage.
Z
zhangjinchao01 已提交
179 180 181 182
 */
class ConcurrentRemoteParameterUpdater : public RemoteParameterUpdater {
public:
  ConcurrentRemoteParameterUpdater(
183 184
      OptimizationConfig config,
      int expectedPassCount,
Z
zhangjinchao01 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
      std::unique_ptr<ParameterUpdater>&& localUpdater);
  ~ConcurrentRemoteParameterUpdater();

  /**
   * @brief send paraemeters to all pservers
   *
   * @note  it just signal the end signal to internal parameter client
   *        to finished the aynchronous send action. In addition it also
   *        do synchronization for all asynchronous host-to-device copy.
   */
  virtual void finishBatch(real cost);

protected:
  virtual void updateImpl(Parameter* para);
  /// internal thread called in send thread
  void send(Parameter* para);  // para == NULL indicate end of a minibatch
  /// internal function called in recv thread
  void recv(Parameter* para);
  /**
   * @brief send thread for relaying data from gradient to parameter client
   *
   * @note  just pipe data to internal parameter client for pipeline
   */
  void send();
  /**
   * @brief recv thread for relaying data from internal parameter client to
   *        host memory
   *
   * @note  it contains the asynchronous data copy form host to device
   */
  void recv();
  /// copy specified parameter from host to device
  void copySingleParaToDevice(Parameter* para, ParameterType parameterType);
  /// copy specified parameter from device to host
  void copySingleParaFromDevice(Parameter* para, ParameterType parameterType);
  bool needToUpdateRemotely() {
    return (numBatches_ + 1) % config_.num_batches_per_send_parameter() == 0;
  }

private:
  /// send thread used for overlapping
  std::unique_ptr<std::thread> sendThread_;
  /// recv thread used for overlapping
  std::unique_ptr<std::thread> recvThread_;
  /// buffer queue for overlapping
  Queue<int> sendQueue_;
  /// buffer queue for overlapping
  Queue<int> recvQueue_;
  /// flags indicating to stop
  bool stopping_;
  /// conditional variable for threads synchronization between the
  /// thread calling finishBatch and internal recv thread
  LockedCondition finishBatchCond_;
  bool oneBatchFinished_;
};

241
// TODO(yanfei):
Z
zhangjinchao01 已提交
242 243 244 245
// merge sparse updater with dense updater, and could help to reduce
// the synchronization between sparse and dense udpater. it could also
// reduce the threads for managing all connections.
/**
246
 * This class is specified for updating sparse parameters.
Z
zhangjinchao01 已提交
247
 *
248 249
 * It allows part of parameter to be exchanged with all pservers.
 * If sparse input assigned, part gradients of first hidden layer
Z
zhangjinchao01 已提交
250
 * could remained zero which can not need to be exchanged within
251
 * all pservers. This is the key optimization point for this updater
Z
zhangjinchao01 已提交
252
 *
253 254
 * For updating sparse parameters, all latest parameters are stored
 * in pservers instead of keeping full copy at train end, so need to
Z
zhangjinchao01 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267
 * prefetch parameters weight value which can be changed in next-batch
 * before doing next forwardbackward. Also, with above fact that the
 * parameters can be stored in pserver instead of trainer, we can
 * fetch specified parmeters if necessary, and can support huge
 * parameters which is larger enough than  the RAM size in single
 * node.
 *
 * Internally, this updater will direct internal parameter client
 * to encapsulate sparse specified message for all pservers.
 */
class SparseRemoteParameterUpdater : public ParameterUpdater {
public:
  SparseRemoteParameterUpdater(const OptimizationConfig& config,
268 269
                               int expectedPassCount,
                               bool testing);
Z
zhangjinchao01 已提交
270 271 272 273 274 275 276
  ~SparseRemoteParameterUpdater() {
    if (controllerThread_) {
      controllerThread_->join();
    }
  }

  /// initialization
Y
Yu Yang 已提交
277
  virtual void init(const std::vector<ParameterPtr>& parameters);
Z
zhangjinchao01 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329

  /// stateful batch control
  virtual PassType startBatch(int64_t batchSize);
  /// send all sparse related parameters to all pservers
  virtual void finishBatch(real cost);
  virtual void startPass();
  virtual bool finishPass(real cost);

  virtual void apply();
  virtual void restore();

  /// load parameters from pservers
  virtual void loadParametersRemote(const std::string& dirName);
  /// save parameters to pservers
  virtual void saveParametersRemote(const std::string& dirName);
  /**
   * @brief get latest sparse parameters value from all pservers
   *
   * @note  call it before next mini-batch
   */
  virtual void getParametersRemote(bool fullSize, bool apply);
  virtual void randParametersRemote();
#ifndef PADDLE_DISABLE_TIMER
  virtual void setForwardbackwardTime(uint64_t delta) {
    parameterClient_->setForwardbackwardTime(delta);
  }
#endif

protected:
  /// update implimentation, not implemented
  virtual void updateImpl(Parameter* para) {}

  /// internal controller routine for controller thread
  virtual void controller();

  /// start controller thread
  void startController();

protected:
  /// optimization config
  OptimizationConfig config_;
  /// internal parameter client
  std::unique_ptr<ParameterClient2> parameterClient_;
  int64_t batchSize_;
  std::unique_ptr<std::thread> controllerThread_;
  int64_t passCount_;
  int64_t expectedPassCount_;
  bool testing_;
  bool useApplyInPserver_;
};

/**
330
 * Class for supporting normal updater and sparse updater
Z
zhangjinchao01 已提交
331
 *
332 333
 * Not all parts of one model are sparse, so it exists dense updater
 * for normal layers while sparse updater is for sparse layers.
Z
zhangjinchao01 已提交
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
 *
 * it directly call internal dense and sparse udpater individually.
 */
class SparseRemoteParameterUpdaterComposite : public ParameterUpdaterComposite {
public:
  enum {
    UPDATER_SPARSE_REMOTE = 0,  // execute in sync thread pool(tid:0)
    UPDATER_NORMAL = 1,         // execute in Owner thread(tid:1)
    NUMBER_UPDATERS = 2,
  };
  /**
   * @brief create one dense updater and one sparse updater
   *
   * @note  use syncThreadPool to synchronize these two updaters
   */
  SparseRemoteParameterUpdaterComposite(
350 351 352
      const OptimizationConfig& config,
      int expectedPassCount,
      bool testing,
Z
zhangjinchao01 已提交
353 354 355 356 357 358 359 360 361 362
      std::unique_ptr<ParameterUpdater>&& normalUpdater) {
    updaters_.resize(NUMBER_UPDATERS);
    updaters_[UPDATER_SPARSE_REMOTE].reset(
        new SparseRemoteParameterUpdater(config, expectedPassCount, testing));
    updaters_[UPDATER_NORMAL] = std::move(normalUpdater);

    syncThreadPool_.reset(new SyncThreadPool(NUMBER_UPDATERS - 1));
  }

  /// initialization of dense and sparse updaters
Y
Yu Yang 已提交
363
  virtual void init(const std::vector<ParameterPtr>& parameters);
Z
zhangjinchao01 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
};

class ParameterUpdaterCreators {
public:
  /**
   * @brief add a creator to create custom ParameterUpdater while training.
   *        The creator is a function with type (alogrithm, optConfig, isLocal,
   *        numPasses) -> ParameterUpdater*. Trainer will use this
   *        ParameterUpdater if creator can create a no nullptr
   *        ParameterUpdater. Return nullptr will use trainer's default
   *        updaters.
   *
   * @param creator method which can create ParameterUpdater.
   */
  static void addCreator(
      const std::function<ParameterUpdater*(
380
          const std::string&,         // algo
Z
zhangjinchao01 已提交
381
          const OptimizationConfig&,  // optConfig
382 383 384
          bool,                       // isLocal
          size_t                      // numPasses
          )>& creator) {  // NOLINT  explicit move closing ) in this line
Z
zhangjinchao01 已提交
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
                          // for readability
    constructors_.push_back(creator);
  }

  /**
   * @brief Try to create an updater by given algo, optConfig, isLocal,
   *        numPasses. Return nullptr if cannot create anyone.
   * @param algo algorithm string.
   * @param optConfig optimization config.
   * @param isLocal is in local mode or not.
   * @param numPasses total passes that trainer will train.
   * @return nullptr if fail, not nullptr if we can create an updater.
   */
  static ParameterUpdater* tryCreateUpdater(const std::string& algo,
                                            const OptimizationConfig& optConfig,
                                            bool isLocal,
                                            size_t numPasses) {
402
    for (auto& c : constructors_) {
Z
zhangjinchao01 已提交
403 404 405 406 407 408 409 410 411 412
      if (auto updater = c(algo, optConfig, isLocal, numPasses)) {
        return updater;
      }
    }
    return nullptr;
  }

private:
  static std::vector<std::function<ParameterUpdater*(
      const std::string&, const OptimizationConfig&, bool, size_t)>>
413
      constructors_;
Z
zhangjinchao01 已提交
414 415 416
};

}  // namespace paddle