RemoteParameterUpdater.h 14.0 KB
Newer Older
Z
zhangjinchao01 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */


#pragma once

#include <thread>
#include <functional>
#include "paddle/pserver/ParameterClient2.h"
#include "ParameterUpdater.h"
#include "paddle/utils/Util.h"
#include "paddle/utils/Queue.h"

namespace paddle {

/**
 * This module calls ParameterClient to exchange parameters among all
 * parameter servers.
 */

// TODO(yanfei):
// I think that the biggest feature of RDMA is its lossless packet control
// rather than high bandwidth, zero copy, or GPU-direct RDMA in theory.
// But the zero-copy and GPU-direct RDMA features can help to reduce the
// latency caused by the operating system.
// So, for some specific clusters, such as high-density GPU clusters,
// GPU-direct and zero copy could help to improve cluster communication
// performance.
//

/**
 * Normal remote parameter updater for dense parameters.
 *
 * It first packs all parameters for all pservers using the ParameterClient
 * module, then waits for the merged parameter data returned from all
 * pservers. The synchronization pattern specified by sync-sgd or async-sgd
 * is achieved by all pservers with the help of the controller within this
 * remote parameter updater.
 *
 * This module bridges the gradient machines and the parameter servers. It
 * helps to transfer the parameters from the acceleration device to the cpu
 * end for the network. It contains additional parameter copy buffers at the
 * cpu end for acceleration devices such as gpu; otherwise it directly uses
 * the original parameter data for sending.
 *
 * This remote parameter updater does not use a pipeline mechanism to hide
 * the copy latency from gpu to cpu buffer, and overlapping the backward
 * computation with communication is not supported.
 */
class RemoteParameterUpdater : public ParameterUpdater {
public:
  /**
   * @param config             optimization config for this updater.
   * @param expectedPassCount  number of passes expected to be finished
   *                           (see expectedPassCount_).
   * @param localUpdater       optional local updater (see localUpdater_);
   *                           defaults to nullptr.
   */
  RemoteParameterUpdater(
      const OptimizationConfig& config, int expectedPassCount,
      std::unique_ptr<ParameterUpdater>&& localUpdater = nullptr);

  /**
   * Wait for the controller thread to finish, if one was created.
   *
   * join() is only called on a joinable thread: std::thread::join() throws
   * std::system_error otherwise, and an exception escaping a destructor
   * terminates the process.
   */
  ~RemoteParameterUpdater() {
    if (controllerThread_ && controllerThread_->joinable()) {
      controllerThread_->join();
    }
  }

  /**
   * initialize the internal parameter client and itself.
   */
  virtual void init(std::vector<ParameterPtr>& parameters);

  /**
   * start batch
   *
   * one batch of training is stateful, which helps to do performance
   * tuning and sgd optimization if necessary. The call is forwarded to the
   * local updater when there is one, the batch size is recorded and the
   * batch status is set to BATCH_START.
   *
   * @param batchSize  the size of the mini-batch being started.
   * @return always PASS_TRAIN.
   */
  virtual PassType startBatch(int64_t batchSize) {
    if (localUpdater_) {
      localUpdater_->startBatch(batchSize);
    }
    batchSize_ = batchSize;
    batchStatus_ = BATCH_START;
    return PASS_TRAIN;
  }

  /**
   * send parameters to pservers and get returned parameters
   * from all pservers if necessary. it will implicitly
   * cooperate with the controller thread for sync-sgd.
   */
  virtual void finishBatch(real cost);
  /// pass-level hooks, implemented out of line
  virtual void startPass();
  virtual bool finishPass(real cost);

#ifndef PADDLE_DISABLE_TIMER
  /**
   * forward delta to the internal parameter client.
   *
   * NOTE(review): parameterClient_ is dereferenced unconditionally; since
   * init() is documented as initializing the parameter client, this
   * presumably must only be called after init() -- confirm.
   */
  virtual void setForwardbackwardTime(uint64_t delta) {
    parameterClient_->setForwardbackwardTime(delta);
  }
#endif

  virtual void apply();
  virtual void restore();

protected:
  /**
   * control all pservers with all trainers for sync-sgd
   */
  virtual void controller();

  /**
   * work that needs to be done after finishBatch
   */
  virtual void updateImpl(Parameter* para);

  /// start the controller (implemented out of line)
  void startController();

  /**
   * @brief copy parameters from cpu host to device, such as gpu.
   *
   * @note  returns when all data are transferred.
   */
  void copyParametersToDevice(ParameterType parameterType);

  /**
   * @brief copy parameters from device to cpu host
   *
   * @note  returns when all data are transferred
   */
  void copyParametersFromDevice(ParameterType parameterType);

protected:
  /// Optimization config used to guide initialization and finishBatch
  OptimizationConfig config_;
  /// internal parameter client object for exchanging data with pserver
  std::unique_ptr<ParameterClient2> parameterClient_;
  /// internal shadow buffer at cpu host end, use original parameters_
  /// if no acceleration devices are used.
  std::vector<ParameterPtr> cpuParameters_;
  /// local updater for aggregating multi-batches local delta
  std::unique_ptr<ParameterUpdater> localUpdater_;
  /// the size of mini-batch
  int64_t batchSize_;
  /// batches passed
  int64_t numBatches_;
  /// for stateful control
  BatchStatus batchStatus_;
  /// controller thread for sync-sgd
  std::unique_ptr<std::thread> controllerThread_;
  /// passes already finished
  int64_t passCount_;
  /// expected passes to be finished
  int64_t expectedPassCount_;
  /// use normal synchronization communication if True
  bool separateSendAndRecv_;
  /// true if it's first pass
  bool isFirstPass_;
  /// NOTE(review): presumably selects whether apply()/restore() are carried
  /// out at the pserver end -- confirm against the implementation.
  bool useApplyInPserver_;

  /// string constants defined out of line.
  /// NOTE(review): presumably averaging-strategy names -- confirm.
  static const std::string kAverage;
  static const std::string kElasticAverage;
};

// TODO(yanfei):
// do parameter-level synchronization optimization at the pserver end with
// ConcurrentRemoteParameterUpdater to get more parallelization, and finally
// to really hide pserver latency in backward computation.
//
/**
 * Remote updater that additionally overlaps the synchronization with the
 * pservers and the backward computation.
 *
 * A parameter can be sent to the pservers once its related backward stage
 * has finished. This concurrent updater copies data from the acceleration
 * device to host memory asynchronously; the internal parameter client then
 * reads the data in host memory and sends it to all pservers in the next
 * stage. The class thereby pipelines the device-to-host copy and the
 * host-to-network transfer to hide network latency in the backward stage.
 *
 * It holds a separate send thread and recv thread for the pipeline.
 */
class ConcurrentRemoteParameterUpdater : public RemoteParameterUpdater {
public:
  // NOTE(review): config is taken by value here, whereas the base class
  // constructor takes it by const reference -- confirm this is intended.
  ConcurrentRemoteParameterUpdater(
      OptimizationConfig config, int expectedPassCount,
      std::unique_ptr<ParameterUpdater>&& localUpdater);
  ~ConcurrentRemoteParameterUpdater();

  /**
   * @brief send parameters to all pservers
   *
   * @note  it only signals the end of the mini-batch to the internal
   *        parameter client so that the asynchronous send action can be
   *        finished. It also synchronizes all asynchronous host-to-device
   *        copies.
   */
  virtual void finishBatch(real cost);

protected:
  virtual void updateImpl(Parameter* para);

  /// internal function called in the send thread;
  /// para == NULL indicates the end of a mini-batch
  void send(Parameter* para);
  /// internal function called in the recv thread
  void recv(Parameter* para);

  /**
   * @brief send thread routine, relaying data from gradient to the
   *        parameter client
   *
   * @note  it just pipes data to the internal parameter client for the
   *        pipeline
   */
  void send();

  /**
   * @brief recv thread routine, relaying data from the internal parameter
   *        client to host memory
   *
   * @note  it contains the asynchronous data copy from host to device
   */
  void recv();

  /// copy the specified parameter from host to device
  void copySingleParaToDevice(Parameter* para, ParameterType parameterType);
  /// copy the specified parameter from device to host
  void copySingleParaFromDevice(Parameter* para, ParameterType parameterType);

  /// true when (numBatches_ + 1) is a multiple of the configured
  /// num_batches_per_send_parameter
  bool needToUpdateRemotely() {
    const auto sendInterval = config_.num_batches_per_send_parameter();
    const auto nextBatchCount = numBatches_ + 1;
    return nextBatchCount % sendInterval == 0;
  }

private:
  /// thread used to overlap sending
  std::unique_ptr<std::thread> sendThread_;
  /// thread used to overlap receiving
  std::unique_ptr<std::thread> recvThread_;
  /// buffer queue on the send side of the pipeline
  Queue<int> sendQueue_;
  /// buffer queue on the recv side of the pipeline
  Queue<int> recvQueue_;
  /// flag indicating that the updater is stopping
  bool stopping_;
  /// condition variable synchronizing the thread that calls finishBatch
  /// with the internal recv thread
  LockedCondition finishBatchCond_;
  bool oneBatchFinished_;
};

// TODO(yanfei):
// merge the sparse updater with the dense updater, which could help to
// reduce the synchronization between the sparse and dense updaters. It could
// also reduce the number of threads for managing all connections.
/**
 * This class is specialized for updating sparse parameters.
 *
 * It allows only part of a parameter to be exchanged with all pservers.
 * If sparse input is assigned, part of the gradients of the first hidden
 * layer can remain zero and need not be exchanged with the pservers. This
 * is the key optimization point of this updater.
 *
 * For updating sparse parameters, all latest parameters are stored in the
 * pservers instead of keeping a full copy at the trainer end, so the
 * parameter weight values which can be changed in the next batch need to be
 * prefetched before doing the next forwardbackward. Also, since the
 * parameters can be stored in the pservers instead of the trainer, we can
 * fetch specified parameters when necessary, and can support huge
 * parameters which are much larger than the RAM size of a single node.
 *
 * Internally, this updater directs the internal parameter client to
 * encapsulate sparse-specific messages for all pservers.
 */
class SparseRemoteParameterUpdater : public ParameterUpdater {
public:
  /**
   * @param config             optimization config for this updater.
   * @param expectedPassCount  number of passes expected to be finished
   *                           (see expectedPassCount_).
   * @param testing            testing flag (see testing_).
   */
  SparseRemoteParameterUpdater(const OptimizationConfig& config,
                               int expectedPassCount, bool testing);

  /**
   * Wait for the controller thread to finish, if one was created.
   *
   * join() is only called on a joinable thread: std::thread::join() throws
   * std::system_error otherwise, and an exception escaping a destructor
   * terminates the process.
   */
  ~SparseRemoteParameterUpdater() {
    if (controllerThread_ && controllerThread_->joinable()) {
      controllerThread_->join();
    }
  }

  /// initialization
  virtual void init(std::vector<ParameterPtr>& parameters);

  /// stateful batch control
  virtual PassType startBatch(int64_t batchSize);
  /// send all sparse related parameters to all pservers
  virtual void finishBatch(real cost);
  /// pass-level hooks, implemented out of line
  virtual void startPass();
  virtual bool finishPass(real cost);

  virtual void apply();
  virtual void restore();

  /// load parameters from pservers
  virtual void loadParametersRemote(const std::string& dirName);
  /// save parameters to pservers
  virtual void saveParametersRemote(const std::string& dirName);
  /**
   * @brief get the latest sparse parameter values from all pservers
   *
   * @note  call it before the next mini-batch
   */
  virtual void getParametersRemote(bool fullSize, bool apply);
  virtual void randParametersRemote();
#ifndef PADDLE_DISABLE_TIMER
  /**
   * forward delta to the internal parameter client.
   *
   * NOTE(review): parameterClient_ is dereferenced unconditionally, so it
   * presumably must already have been created (by init()?) -- confirm.
   */
  virtual void setForwardbackwardTime(uint64_t delta) {
    parameterClient_->setForwardbackwardTime(delta);
  }
#endif

protected:
  /// update implementation; intentionally a no-op in this class
  virtual void updateImpl(Parameter* para) {}

  /// internal controller routine for the controller thread
  virtual void controller();

  /// start the controller thread
  void startController();

protected:
  /// optimization config
  OptimizationConfig config_;
  /// internal parameter client
  std::unique_ptr<ParameterClient2> parameterClient_;
  /// NOTE(review): presumably the size of the current mini-batch -- confirm.
  int64_t batchSize_;
  /// thread joined by the destructor; see controller()/startController()
  std::unique_ptr<std::thread> controllerThread_;
  /// NOTE(review): presumably passes already finished / expected to be
  /// finished, mirroring the constructor argument -- confirm.
  int64_t passCount_;
  int64_t expectedPassCount_;
  /// NOTE(review): presumably stores the constructor's testing flag --
  /// confirm against the implementation.
  bool testing_;
  /// NOTE(review): presumably selects whether apply()/restore() are carried
  /// out at the pserver end -- confirm against the implementation.
  bool useApplyInPserver_;
};

/**
 * Composite updater supporting both the normal updater and the sparse
 * updater.
 *
 * Not all parts of one model are sparse, so there is a dense updater for
 * the normal layers while the sparse updater handles the sparse layers.
 *
 * It directly calls the internal dense and sparse updaters individually.
 */
class SparseRemoteParameterUpdaterComposite : public ParameterUpdaterComposite {
public:
  /// slots of updaters_
  enum {
    UPDATER_SPARSE_REMOTE = 0,  // execute in sync thread pool(tid:0)
    UPDATER_NORMAL = 1,         // execute in Owner thread(tid:1)
    NUMBER_UPDATERS = 2,
  };

  /**
   * @brief create one sparse updater and adopt the given dense updater
   *
   * @param config             passed to the sparse updater.
   * @param expectedPassCount  passed to the sparse updater.
   * @param testing            passed to the sparse updater.
   * @param normalUpdater      dense updater, moved into the
   *                           UPDATER_NORMAL slot.
   *
   * @note  use syncThreadPool to synchronize these two updaters
   */
  SparseRemoteParameterUpdaterComposite(
      const OptimizationConfig& config, int expectedPassCount, bool testing,
      std::unique_ptr<ParameterUpdater>&& normalUpdater) {
    updaters_.resize(NUMBER_UPDATERS);

    // Sparse slot: a freshly created sparse remote updater.
    auto& sparseSlot = updaters_[UPDATER_SPARSE_REMOTE];
    sparseSlot.reset(
        new SparseRemoteParameterUpdater(config, expectedPassCount, testing));

    // Normal slot: takes over the updater handed in by the caller.
    auto& normalSlot = updaters_[UPDATER_NORMAL];
    normalSlot = std::move(normalUpdater);

    // One worker fewer than the number of updaters.
    const auto numSyncWorkers = NUMBER_UPDATERS - 1;
    syncThreadPool_.reset(new SyncThreadPool(numSyncWorkers));
  }

  /// initialization of dense and sparse updaters
  virtual void init(std::vector<ParameterPtr>& parameters);
};

class ParameterUpdaterCreators {
public:
  /**
   * @brief add a creator to create custom ParameterUpdater while training.
   *        The creator is a function with type (alogrithm, optConfig, isLocal,
   *        numPasses) -> ParameterUpdater*. Trainer will use this
   *        ParameterUpdater if creator can create a no nullptr
   *        ParameterUpdater. Return nullptr will use trainer's default
   *        updaters.
   *
   * @param creator method which can create ParameterUpdater.
   */
  static void addCreator(
      const std::function<ParameterUpdater*(
          const std::string&,  // algo
          const OptimizationConfig&,  // optConfig
          bool,  // isLocal
          size_t  // numPasses
        )>& creator) {    // NOLINT  explicit move closing ) in this line
                          // for readability
    constructors_.push_back(creator);
  }

  /**
   * @brief Try to create an updater by given algo, optConfig, isLocal,
   *        numPasses. Return nullptr if cannot create anyone.
   * @param algo algorithm string.
   * @param optConfig optimization config.
   * @param isLocal is in local mode or not.
   * @param numPasses total passes that trainer will train.
   * @return nullptr if fail, not nullptr if we can create an updater.
   */
  static ParameterUpdater* tryCreateUpdater(const std::string& algo,
                                            const OptimizationConfig& optConfig,
                                            bool isLocal,
                                            size_t numPasses) {
    for (auto & c : constructors_) {
      if (auto updater = c(algo, optConfig, isLocal, numPasses)) {
        return updater;
      }
    }
    return nullptr;
  }

private:
  static std::vector<std::function<ParameterUpdater*(
      const std::string&, const OptimizationConfig&, bool, size_t)>>
  constructors_;
};

}  // namespace paddle