diff --git a/paddle/gserver/layers/Layer.h b/paddle/gserver/layers/Layer.h
index 7c4bea072157aac17787afab184b51c09ff656f2..47182c9ecc695f4d79089d06d6a1a61b878ce409 100644
--- a/paddle/gserver/layers/Layer.h
+++ b/paddle/gserver/layers/Layer.h
@@ -14,20 +14,18 @@ limitations under the License. */
 
 #pragma once
-#include
 #include
 #include
 #include "ModelConfig.pb.h"
 #include "paddle/function/Function.h"
+#include "paddle/gserver/activations/ActivationFunction.h"
 #include "paddle/math/CpuSparseMatrix.h"
+#include "paddle/parameter/Argument.h"
 #include "paddle/parameter/Parameter.h"
+#include "paddle/parameter/Weight.h"
 #include "paddle/utils/ClassRegistrar.h"
 #include "paddle/utils/Util.h"
-#include
-#include
-#include "paddle/gserver/activations/ActivationFunction.h"
-
 /// Macro for registering a layer type.
 /// Example: REGISTER_LAYER(crf_error, CRFDecodingErrorLayer);
 #define REGISTER_LAYER(__type_name, __class_name) \
diff --git a/paddle/gserver/tests/test_RecurrentGradientMachine.cpp b/paddle/gserver/tests/test_RecurrentGradientMachine.cpp
index 150850da4d49a2320acc70ed370cf8728d5c9def..4a846397e6cf3100f948af46874b0739e32bf4a5 100644
--- a/paddle/gserver/tests/test_RecurrentGradientMachine.cpp
+++ b/paddle/gserver/tests/test_RecurrentGradientMachine.cpp
@@ -14,6 +14,7 @@ limitations under the License. */
 
 #include
 #include
+#include
 #include
 #include
 #include
diff --git a/paddle/parameter/Argument.cpp b/paddle/parameter/Argument.cpp
index 645bf737990638df042723ed827d0823cb201e72..6d9365af2d14673146d9e427138bf6dd5f5b41b6 100644
--- a/paddle/parameter/Argument.cpp
+++ b/paddle/parameter/Argument.cpp
@@ -570,7 +570,7 @@ void Argument::poolSequenceWithStride(const Argument& input,
   CHECK(input.sequenceStartPositions);
   CHECK_EQ(input.hasSubseq(), 0UL);
-  CHECK_GT(stride, 0) << "stride must larger than 0";
+  CHECK_GT(stride, 0UL) << "stride must larger than 0";
   size_t numSequences = input.getNumSequences();
   ICpuGpuVector::resizeOrCreate(
       sequenceStartPositions, numSequences + 1, false);
diff --git a/paddle/parameter/ParallelParameter.cpp b/paddle/parameter/ParallelParameter.cpp
deleted file mode 100644
index cea77e5b1787c25ecb9ccd42e948bf90973fd4cb..0000000000000000000000000000000000000000
--- a/paddle/parameter/ParallelParameter.cpp
+++ /dev/null
@@ -1,209 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include
-#include "paddle/utils/Logging.h"
-
-#include "ParallelParameter.h"
-
-namespace paddle {
-
-UpdateFunction paramUpdateFunctions[UPDATE_TYPE_NUM] = {
-    nullptr,  // &ParallelParameter::singleUpdate, /* single thread */
-    nullptr,  // &ParallelParameter::controlUpdate, /* controller thread */
-    &ParallelParameter::majorUpdate, /* major thread */
-    &ParallelParameter::minorUpdate, /* minor thread */
-
-    nullptr, /* master */
-    &ParallelParameter::slaveUpdate, /* slave */
-};
-ParallelParameterPtr ParallelParameter::create(TrainerRole role,
-                                               ParameterPtr localParam,
-                                               int asyncCount) {
-  ParallelParameterPtr ptr = nullptr;
-  switch (role) {
-    case TRAINER_ROLE_CONTROL:
-    case TRAINER_ROLE_MAJOR:
-    case TRAINER_ROLE_MINOR:
-      ptr = std::make_shared<SyncParameter>(role, localParam);
-      break;
-    case TRAINER_ROLE_MASTER:
-    case TRAINER_ROLE_SLAVE:
-      ptr = std::make_shared<AsyncParameter>(role, asyncCount, localParam);
-      break;
-    default:
-      LOG(FATAL) << "unknown role " << role << "\n";
-  }
-  return ptr;
-}
-void ParallelParameter::syncUpdate(TrainerRole role, real learnRate) {
-  if (paramUpdateFunctions[role]) {
-    (this->*paramUpdateFunctions[role])(learnRate);
-  }
-}
-
-void SyncParameter::attachControlParam(ParallelParameterPtr controler) {
-  controlParam_ = controler;
-}
-
-void SyncParameter::attachMajorParam(ParallelParameterPtr partner) {
-  majorPartners_.push_back(partner);
-  if (role_ == TRAINER_ROLE_CONTROL) {
-    localParam_->setSharedCount(majorPartners_.size());
-  }
-  // partnerParam_ = partner;
-}
-
-void SyncParameter::attachMinorParam(ParallelParameterPtr partner,
-                                     int deviceId) {
-  minorPartners_.push_back(partner);
-  minorDeviceIds_.push_back(deviceId);
-  // partnerParam_ = partner;
-}
-
-void SyncParameter::waitAllMajorGradReady() {
-  for (size_t i = 0; i < majorPartners_.size(); i++) {
-    majorPartners_[i]->waitGradReady();
-    partnerParam_ = majorPartners_[i]->getLocalParameter();
-    VectorPtr localGrad = localParam_->getBuf(PARAMETER_GRADIENT);
-    VectorPtr patnrGrad = partnerParam_->getBuf(PARAMETER_GRADIENT);
-    if (FLAGS_use_gpu) hl_set_device(minorDeviceIds_[i]);
-    localGrad->add(*patnrGrad);
-  }
-}
-
-void SyncParameter::synchronizeParamter() {
-  valueSem_->wait();
-  if (role_ == TRAINER_ROLE_MINOR) {
-    /* copy the value from controller */
-    VectorPtr cntrlVec =
-        (controlParam_->getLocalParameter())->getBuf(PARAMETER_VALUE);
-    VectorPtr localVec = localParam_->getBuf(PARAMETER_VALUE);
-    localVec->copyFrom(*cntrlVec);
-
-    /* dispatch the value to major */
-    for (size_t i = 0; i < majorPartners_.size(); i++) {
-      VectorPtr majorVec =
-          (majorPartners_[i]->getLocalParameter())->getBuf(PARAMETER_VALUE);
-      majorVec->copyFrom(*localVec);
-      majorPartners_[i]->postValueReady();
-    }
-  }
-}
-
-void SyncParameter::singleUpdate(real learnRate) {
-  CHECK(role_ == TRAINER_ROLE_SINGLE);
-  localParam_->updateWithGradient(learnRate);
-}
-
-void SyncParameter::controlUpdate(const UpdateCallback &callBack) {
-  CHECK(role_ == TRAINER_ROLE_CONTROL);
-  CHECK(gradSem_ != NULL && valueSem_ != NULL);
-  CHECK(majorPartners_.size());
-
-  /* update */
-  if (callBack) {
-    callBack(localParam_.get());
-    localParam_->clearGradient();
-  }
-
-  for (size_t i = 0; i < minorPartners_.size(); i++) {
-    minorPartners_[i]->postValueReady();
-  }
-}
-
-void SyncParameter::majorUpdate(real learnRate) {
-  (void)learnRate;
-  CHECK(role_ == TRAINER_ROLE_MAJOR);
-  CHECK(gradSem_ != NULL && valueSem_ != NULL);
-  CHECK(minorPartners_.size() && controlParam_);
-
-  /* wait the minor-Gradient is ready */
-  for (size_t i = 0; i < minorPartners_.size(); i++) {
-    minorPartners_[i]->waitGradReady();
-    partnerParam_ = minorPartners_[i]->getLocalParameter();
-    VectorPtr localGrad = localParam_->getBuf(PARAMETER_GRADIENT);
-    VectorPtr minorGrad = partnerParam_->getBuf(PARAMETER_GRADIENT);
-    localGrad->add(*minorGrad);
-  }
-
-  /* notice the controller that the gradient is ready */
-  gradSem_->post();
-}
-
-void SyncParameter::minorUpdate(real learnRate) {
-  (void)learnRate;
-  CHECK(role_ == TRAINER_ROLE_MINOR);
-  CHECK(gradSem_ != NULL && valueSem_ != NULL);
-
-  // notice the major that the gradient is ready
-  gradSem_->post();
-}
-
-AsyncParameter::AsyncParameter(TrainerRole role,
-                               int asyncCount,
-                               ParameterPtr localParam)
-    : ParallelParameter(role, localParam) {
-  asyncCount_ = asyncCount;
-  accumCounter_ = 0;
-  gradientAccum_ = Vector::create(localParam->getSize(), localParam->useGpu());
-  gradientAccum_->zeroMem();
-}
-
-void AsyncParameter::slaveUpdate(real learnRate) {
-  /* increase the accumCounter_ */
-  accumCounter_++;
-
-  /* accumulate the gradient to the buffer */
-  VectorPtr grad = localParam_->getBuf(PARAMETER_GRADIENT);
-  gradientAccum_->add(*grad);
-
-  /* if need to be synchronized with the master */
-  if (accumCounter_ == asyncCount_) {
-    gradSem_->post();
-    // accumCounter_ = 0; NOTICE: the upper-function need to reset the counter
-  } else {  // self update
-    localParam_->updateWithGradient(learnRate);
-  }
-  localParam_->clearGradient();
-}
-
-bool AsyncParameter::masterUpdate(ParallelParameterPtr slaveParam,
-                                  const UpdateCallback &callback) {
-  CHECK(slaveParam && callback);
-
-  /* wait the slave is ready */
-  if (!slaveParam->timeWaitGradReady(5)) {
-    return false;
-  }
-
-  AsyncParameter *asyncParam = dynamic_cast<AsyncParameter *>(slaveParam.get());
-
-  /* get the accum-gradient to update local parameter */
-  VectorPtr slaveVec = asyncParam->getAccum();
-  localParam_->getBuf(PARAMETER_GRADIENT)->copyFrom(*slaveVec);
-  callback(localParam_.get());
-  // slaveVec->zeroMem();
-
-  /* copy the newest parameter-value to the slave */
-  slaveVec = (slaveParam->getLocalParameter())->getBuf(PARAMETER_VALUE);
-  slaveVec->copyFrom(*(localParam_->getBuf(PARAMETER_VALUE)));
-
-  /* release the semphore */
-  slaveParam->postValueReady();
-
-  return true;
-}
-
-} // namespace paddle
diff --git a/paddle/parameter/ParallelParameter.h b/paddle/parameter/ParallelParameter.h
deleted file mode 100644
index 2e7c18b8084dc25b9f2f7630390bb4553ac703c9..0000000000000000000000000000000000000000
--- a/paddle/parameter/ParallelParameter.h
+++ /dev/null
@@ -1,244 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#pragma once
-
-#include
-
-#include
-#include
-#include
-#include
-#include
-
-#include "hl_gpu.h"
-#include "paddle/math/Vector.h"
-#include "paddle/parameter/Parameter.h"
-#include "paddle/parameter/ParameterUpdateFunctions.h"
-#include "paddle/utils/Common.h"
-#include "paddle/utils/Flags.h"
-#include "paddle/utils/Locks.h"
-
-#include "ParameterConfig.pb.h"
-
-namespace paddle {
-
-class ParallelParameter;
-class SyncParameter;
-class AsyncParameter;
-
-typedef std::shared_ptr<ParallelParameter> ParallelParameterPtr;
-
-const int UPDATE_TYPE_NUM = 32;
-
-/**
- * TrainRole denotes the role of current training, different roles have
- * different jobs.
- *
- * control, major, minor are three kinds of role to support mutiple GPUs
- * parallel SGD training. SM on GPU card has two groups, each group
- * consist of a major and a minor.
- *
- * @param single  single GPU card single thread training.
- *
- *
- * @param control current parameter updates via control role,
- *                not participate in real training. control role is
- *                responsible for merging all major's gradient and
- *                update parameter value.
- *
- * @param major   major role paticipates in real training, when local
- *                gradient is ready, merge its corresponding minor's
- *                gradient and notify controller: this group's gradient
- *                is already ready.
- *
- * @param minor   minor role participates in real training, when local
- *                gradient is ready, only notify its corresponding major.
- *                In order to maximum apportion jobs, after controller
- *                updates the paramemter value, each group's minior
- *                reponses to dispatch the latest model into local and
- *                major.
- */
-enum TrainerRole {
-  TRAINER_ROLE_SINGLE,
-  TRAINER_ROLE_CONTROL,
-  TRAINER_ROLE_MAJOR,
-  TRAINER_ROLE_MINOR,
-  TRAINER_ROLE_MASTER,
-  TRAINER_ROLE_SLAVE
-};
-typedef void (ParallelParameter::*UpdateFunction)(real learnRate);
-
-class ParallelParameter {
-public:
-  static ParallelParameterPtr create(TrainerRole role,
-                                     ParameterPtr localParam,
-                                     int asyncCount = 1);
-
-  ParallelParameter(TrainerRole role, ParameterPtr localParam) {
-    role_ = role;
-    gradSem_.reset(new Semaphore(0));
-    valueSem_.reset(new Semaphore(0));
-    localParam_ = localParam;
-  }
-
-  virtual ~ParallelParameter() {}
-
-  ParameterPtr getLocalParameter() { return localParam_; }
-  bool timeWaitGradReady(int sec) {
-    struct timespec ts;
-    ts.tv_nsec = 0;
-    ts.tv_sec = time(NULL) + sec;
-    return gradSem_->timeWait(&ts);
-  }
-  void waitGradReady() { gradSem_->wait(); }
-  void postValueReady() { valueSem_->post(); }
-
-  void syncUpdate(TrainerRole role, real learnRate);
-
-  virtual void synchronizeParamter() = 0;
-
-  /**
-   * for synchronous
-   */
-  virtual void singleUpdate(real learnRate) { (void)learnRate; }
-
-  virtual void controlUpdate(const UpdateCallback& callback) { (void)callback; }
-
-  virtual void majorUpdate(real learnRate) { (void)learnRate; }
-
-  virtual void minorUpdate(real learnRate) { (void)learnRate; }
-
-  /**
-   * for asynchronous
-   */
-  virtual void slaveUpdate(real learnRate) { (void)learnRate; }
-
-protected:
-  TrainerRole role_;
-  ParameterPtr localParam_;
-  std::unique_ptr<Semaphore>
-      gradSem_; /// wether the local parameter-gradient is ready
-  std::unique_ptr<Semaphore>
-      valueSem_; /// wether the local parameter-value is updated
-};
-
-/**
- * this class is designed for multi-threading training.
- * - * "Synchronous" means multiple GPUs calculate 1/4 mini-Batch, - * but will get only one gradient - */ -class SyncParameter : public ParallelParameter { -public: - SyncParameter(TrainerRole role, ParameterPtr localParam) - : ParallelParameter(role, localParam) { - controlParam_ = nullptr; - majorPartners_.clear(); - minorPartners_.clear(); - } - ~SyncParameter() { - majorPartners_.clear(); - minorPartners_.clear(); - } - void attachControlParam(ParallelParameterPtr controler); - - void attachMajorParam(ParallelParameterPtr partner); - - void attachMinorParam(ParallelParameterPtr partner, int deviceId); - - void waitAllMajorGradReady(); - - void synchronizeParamter(); - - void singleUpdate(real learnRate); - - void controlUpdate(const UpdateCallback& callback); - - void majorUpdate(real learnRate); - - void minorUpdate(real learnRate); - - std::vector& getMajorPartners() { - return majorPartners_; - } - - std::vector& getMinorPartners() { - return minorPartners_; - } - -private: - // The following variables are used in a multithreaded training situation - // partnerParam_ is local-parameter's partner - // controlParam_ is the controller-thread 's parameter - ParameterPtr partnerParam_; - std::vector majorPartners_; - std::vector minorPartners_; - std::vector minorDeviceIds_; - ParallelParameterPtr controlParam_; -}; - -class AsyncParameter : public ParallelParameter { -public: - AsyncParameter(TrainerRole role, int asyncCount, ParameterPtr localParam); - - void clearCounter() { accumCounter_ = 0; } - - VectorPtr getAccum() { return gradientAccum_; } - - void synchronizeParamter() { - if (accumCounter_ == asyncCount_) { - valueSem_->wait(); - clearCounter(); - gradientAccum_->zeroMem(); - } - } - - /** - * When asynchronous training, update strategy including slave and master. - * - * slave: If in range asyncCount, adopting self-update method. - * If beyond asyncCount, waiting for master to update. - */ - void slaveUpdate(real learnRate); - - /** - * When asynchronous training, update strategy including slave and master. - * - * master: it only polls slaves, do not training data. - * If slave's gradient is ready, fetch it. - * Update master's parameter, then copy it into - * corresponding slave. - */ - bool masterUpdate(ParallelParameterPtr slaveParam, - const UpdateCallback& callback); - -private: - /** - * When asynchronous training, every aysnc trainer needs to - * accumulate a number of batch gradient. - * - * gradientAccum_ is used to save the sum of gradients. - */ - VectorPtr gradientAccum_; - - /// Asynchronous count. - int asyncCount_; - /// Accumulate counter of current gradients. 
-  int accumCounter_;
-};
-
-typedef std::map<std::string, ParallelParameterPtr> ParallelParameterMap;
-
-} // namespace paddle
diff --git a/paddle/parameter/Parameter.cpp b/paddle/parameter/Parameter.cpp
index 1ccded818796798105a889df978618688b56ed36..b8efabbe2a0b54edec64f6cee62b44c76ca7bf10 100644
--- a/paddle/parameter/Parameter.cpp
+++ b/paddle/parameter/Parameter.cpp
@@ -271,55 +271,6 @@ SparsePrefetchRowCpuMatrix* Parameter::getPrefetchMatrix() {
   return nullptr;
 }
 
-void Parameter::updateWithGradient(real learningRate) {
-  sgdUpdate(learningRate * config_.learning_rate(),
-            config_.momentum(),
-            config_.decay_rate(),
-            bufs_[PARAMETER_VALUE].get(),
-            bufs_[PARAMETER_GRADIENT].get(),
-            bufs_[PARAMETER_MOMENTUM].get());
-}
-
-void Parameter::updateWithGradient(real learningRate,
-                                   MatrixPtr gradMat,
-                                   IVectorPtr t0,
-                                   int currentTime,
-                                   bool fini) {
-  SparseRowCpuMatrix* sparseMat =
-      dynamic_cast<SparseRowCpuMatrix*>(gradMat.get());
-  CHECK(sparseMat);
-  CHECK_EQ(config_.momentum(), 0.0f)
-      << "not support momentum in sparse input sgd";
-  bool useL1 = (config_.decay_rate_l1() != 0.0f);
-  sparseMat->sgdUpdate(*bufs_[PARAMETER_VALUE],
-                       *t0,
-                       learningRate * config_.learning_rate(),
-                       currentTime,
-                       useL1 ? config_.decay_rate_l1() : config_.decay_rate(),
-                       useL1,
-                       fini);
-}
-
-void Parameter::updateWithGradient(real learningRate,
-                                   VectorPtr gradVec,
-                                   bool normalUpdate) {
-  if (normalUpdate) {
-    sgdUpdate(learningRate * config_.learning_rate(),
-              config_.momentum(),
-              config_.decay_rate(),
-              bufs_[PARAMETER_VALUE].get(),
-              gradVec.get(),
-              bufs_[PARAMETER_MOMENTUM].get());
-  } else {
-    size_t size = gradVec->getSize();
-    real* mom = bufs_[PARAMETER_MOMENTUM]->getData();
-    real* grad = gradVec->getData();
-    real* value = bufs_[PARAMETER_VALUE]->getData();
-    hl_matrix_add(mom, grad, mom, 1, size, 1.0f, learningRate);
-    hl_matrix_add(value, grad, value, 1, size, 1.0f, learningRate);
-  }
-}
-
 void Parameter::incUpdate(const UpdateCallback& callback) {
   // Static parameter is fixed, and does not need to be updated
   if (isStatic()) {
diff --git a/paddle/parameter/Parameter.h b/paddle/parameter/Parameter.h
index 72c8336799133ad3f5855b0c1aa06639179ff70a..36d2b65f3bd1056a4ac6a1029000fe4cce6420ce 100644
--- a/paddle/parameter/Parameter.h
+++ b/paddle/parameter/Parameter.h
@@ -223,29 +223,6 @@ public:
   bool isValueUpdated() const { return updated_; }
 
-  /**
-   * Update bufs_[PARAMETER_VALUE] using bufs_[PARAMETER_GRADIENT]
-   */
-  void updateWithGradient(real learningRate);
-
-  /**
-   * Update bufs_[PARAMETER_VALUE] using sparse row grad matrix.
-   *
-   * @see SparseRowCpuMatrix::sgdUpdate for more information.
-   */
-  void updateWithGradient(real learningRate,
-                          MatrixPtr gradMat,
-                          IVectorPtr t0,
-                          int currentTime,
-                          bool fini = false);
-
-  /**
-   * This function is used to calculate multiple gpus, but only as a candidate
-   */
-  void updateWithGradient(real learningRate,
-                          VectorPtr grad,
-                          bool normalUpdate = true);
-
   /**
    * Save parameter value to a file
    */