diff --git a/paddle/math/BaseMatrix.h b/paddle/math/BaseMatrix.h
index 6ed48c8d88ee698689de6f7a7f470b97a094ea5b..120d69f718b954925438fbd2119d69f0be13b3e9 100644
--- a/paddle/math/BaseMatrix.h
+++ b/paddle/math/BaseMatrix.h
@@ -758,7 +758,7 @@ public:
                  T p3);  // decayRate
 
   /// apply L1/L2 to *this*
-  void applyL1(T learningRate, T decayRate);
+  virtual void applyL1(T learningRate, T decayRate);
   void applyL1(BaseMatrixT& lr, T learningRate, T decayRate);
   void applyL2(T learningRate, T decayRate);
   void applyL2(BaseMatrixT& lr, T learningRate, T decayRate);
diff --git a/paddle/math/SparseRowMatrix.cpp b/paddle/math/SparseRowMatrix.cpp
index b8c781ca1fd46c9840817abe26a20eec005c37e9..b086433fe535225ad05453b7d13c3846f5ce3c2b 100644
--- a/paddle/math/SparseRowMatrix.cpp
+++ b/paddle/math/SparseRowMatrix.cpp
@@ -54,7 +54,7 @@ void SparseRowCpuMatrix::zeroMem() {
   clearRows();
 }
 
-void SparseRowCpuMatrix::applyL1Decay(real learningRate, real decayRate) {
+void SparseRowCpuMatrix::applyL1(real learningRate, real decayRate) {
   apply([=](real* buf, size_t len) {
     CpuVector value(0, nullptr);
     value.subVecFrom(buf, 0, len);
diff --git a/paddle/math/SparseRowMatrix.h b/paddle/math/SparseRowMatrix.h
index 1ccbf97b25922ae52377d7048da3a07012d21003..8704eb038d5d42ca834d232c0a651e9ffb2b40f3 100644
--- a/paddle/math/SparseRowMatrix.h
+++ b/paddle/math/SparseRowMatrix.h
@@ -94,7 +94,7 @@ public:
   /**
    * apply L1 to all sparse rows, should be apply after indices ready.
   */
-  void applyL1Decay(real learningRate, real decayRate);
+  virtual void applyL1(real learningRate, real decayRate);
 
   void clearIndices() { clearRows(); }
   void zeroMemThread(size_t tid, size_t numThreads);
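The three math/ changes above work together: applyL1 becomes virtual in BaseMatrixT, and the sparse-row implementation is renamed from applyL1Decay to applyL1 so that it actually overrides the base method. Callers that only hold a base-class matrix pointer (as the updater code below does) then reach the sparse implementation automatically. The following is a minimal standalone sketch of that dispatch; the two structs are simplified stand-ins for paddle's BaseMatrixT<real> and SparseRowCpuMatrix, not the real classes.

#include <iostream>
#include <memory>

// Mock stand-ins for paddle's BaseMatrixT<real> and SparseRowCpuMatrix.
struct BaseMatrix {
  // Dense L1 decay; virtual so sparse subclasses can override it.
  virtual void applyL1(float learningRate, float decayRate) {
    std::cout << "dense applyL1 over the full value buffer\n";
  }
  virtual ~BaseMatrix() = default;
};

struct SparseRowMatrix : BaseMatrix {
  // Overrides the base method, as SparseRowCpuMatrix::applyL1 now does;
  // the real override only decays the rows cached locally.
  void applyL1(float learningRate, float decayRate) override {
    std::cout << "sparse applyL1 over the cached rows only\n";
  }
};

int main() {
  std::shared_ptr<BaseMatrix> mat = std::make_shared<SparseRowMatrix>();
  // Before the rename the sparse path needed the concrete type
  // (getPrefetchMatrix()->applyL1Decay(...)); now a base-class call
  // dispatches to the sparse override.
  mat->applyL1(/*learningRate=*/1.0f, /*decayRate=*/0.01f);
}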
*/ #include "OptimizerFunctions.h" #include "OptimizerWithRegularizer.h" #include "ParameterUpdateFunctions.h" +#include "ThreadLocalBuffer.h" #include "hl_gpu.h" #include "paddle/math/CpuSparseMatrix.h" #include "paddle/math/MathUtils.h" @@ -262,15 +263,6 @@ void Parameter::setMat(ParameterType pType, int matType) { } } -SparsePrefetchRowCpuMatrix* Parameter::getPrefetchMatrix() { - MatrixPtr mat = mats_[PARAMETER_VALUE]; - if (mat) { - return dynamic_cast(mat.get()); - } - - return nullptr; -} - void Parameter::incUpdate(const UpdateCallback& callback) { // Static parameter is fixed, and does not need to be updated if (isStatic()) { @@ -422,37 +414,4 @@ bool Parameter::load(std::istream& s) { return true; } -ThreadLocal> Parameter::tlsTempBufs_; - -VectorPtr* Parameter::getTlsTempBufs() { - std::vector& bufs = *tlsTempBufs_; - if (bufs.empty()) { - bufs.resize(NUM_PARAMETER_TYPES); - for (auto& vec : bufs) { - vec.reset(new CpuVector(0, nullptr)); - } - } - return bufs.data(); -} - -void Parameter::exec(ExecFunc func) { - auto execFunc = [this, func](int tid, size_t numThreads) { - if (numThreads == 1) { // single thread - func(this->getBufs()); - } else { // multi thread - VectorPtr* vecs = Parameter::getTlsTempBufs(); - auto interval = calcSplitArrayInterval( - this->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/); - for (size_t i = 0; i < (size_t)NUM_PARAMETER_TYPES; ++i) { - if (bufs_[i]) { - vecs[i]->subVecFrom(*bufs_[i], interval); - } - } - func(vecs); - } - }; - - getBuf(PARAMETER_VALUE)->exec(execFunc); -} - } // namespace paddle diff --git a/paddle/parameter/Parameter.h b/paddle/parameter/Parameter.h index 36d2b65f3bd1056a4ac6a1029000fe4cce6420ce..eb88c862034bfc144d52299347e82026094d4306 100644 --- a/paddle/parameter/Parameter.h +++ b/paddle/parameter/Parameter.h @@ -209,14 +209,6 @@ public: intBufs_[pType] = iVec; } - SparsePrefetchRowCpuMatrix* getPrefetchMatrix(); - - float getLearnRate() const { return config_.learning_rate(); } - - float getInitMean() const { return config_.initial_mean(); } - - float getInitStandardDeviation() const { return config_.initial_std(); } - void setValueUpdated() { updated_ = true; } void clearValueUpdated() { updated_ = false; } @@ -356,8 +348,6 @@ protected: bool updated_; SparseFormat format_; - static ThreadLocal> tlsTempBufs_; - std::vector> updaterHooks_; public: @@ -371,15 +361,6 @@ public: static const std::string kMissParameterFail; static const std::string kMissParameterRand; static const std::string kMissParameterZero; - - static VectorPtr* getTlsTempBufs(); - - /** - * exec a func in single/multi thread. - * vecs is bufs_ of Parameter, as input of ExecFunc. - */ - typedef std::function ExecFunc; - void exec(ExecFunc func); }; typedef std::map ParameterMap; diff --git a/paddle/parameter/ThreadLocalBuffer.cpp b/paddle/parameter/ThreadLocalBuffer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b21dd15245cf7c3d0418d37e6e8925c9e906f482 --- /dev/null +++ b/paddle/parameter/ThreadLocalBuffer.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
diff --git a/paddle/parameter/ThreadLocalBuffer.h b/paddle/parameter/ThreadLocalBuffer.h
new file mode 100644
index 0000000000000000000000000000000000000000..c916519c974a5bdeea407dcc1bc6d196756874ee
--- /dev/null
+++ b/paddle/parameter/ThreadLocalBuffer.h
@@ -0,0 +1,22 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+#include "paddle/math/Vector.h"
+
+namespace paddle {
+namespace parameter {
+extern VectorPtr* getThreadLocalBuffer();
+}  // namespace parameter
+}  // namespace paddle
diff --git a/paddle/pserver/ParameterClient2.cpp b/paddle/pserver/ParameterClient2.cpp
index a97859f83fe6495b298e920346c964ef2a9b146c..f7e391f76324a09c203dfbbb449feb050caa8fb4 100644
--- a/paddle/pserver/ParameterClient2.cpp
+++ b/paddle/pserver/ParameterClient2.cpp
@@ -243,7 +243,8 @@ void ParameterClient2::prepareSendData(
   CHECK_GE(blockSize, 1LU) << "blockSize should > 0 " << blockSize;
   const auto paraSize = parameter->getSize();
   if (sparseUpdate) {
-    const auto prefetchMat = parameter->getPrefetchMatrix();
+    auto prefetchMat = std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(
+        parameter->getMat(PARAMETER_VALUE));
     CHECK(prefetchMat != nullptr) << "prefetchMat is nullptr";
     auto sendMat = dynamic_cast<SparseRowCpuMatrix*>(
         parameter->getMat(parameterType).get());
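With Parameter::getPrefetchMatrix() gone, ParameterClient2 downcasts the shared_ptr it already owns. std::dynamic_pointer_cast is the shared_ptr-preserving form of dynamic_cast: it keeps shared ownership on success and yields an empty pointer on a runtime type mismatch, which is exactly what the CHECK guards against. A standalone sketch of those semantics, with mock matrix classes rather than paddle's:

#include <cassert>
#include <memory>

// Mock matrix types; paddle's Matrix/SparsePrefetchRowCpuMatrix are richer.
struct Matrix {
  virtual ~Matrix() = default;
};
struct SparsePrefetchRowCpuMatrix : Matrix {};

int main() {
  std::shared_ptr<Matrix> value = std::make_shared<SparsePrefetchRowCpuMatrix>();

  // Success: ownership is shared between both pointers.
  auto prefetchMat =
      std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(value);
  assert(prefetchMat != nullptr);
  assert(prefetchMat.use_count() == 2);

  // Mismatch: the cast yields an empty shared_ptr instead of throwing,
  // which is why prepareSendData CHECKs the result.
  std::shared_ptr<Matrix> dense = std::make_shared<Matrix>();
  assert(std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(dense) ==
         nullptr);
  return 0;
}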
*/ #include "paddle/parameter/ParameterOptimizer.h" #include "paddle/parameter/ParameterUpdateFunctions.h" #include "paddle/parameter/Regularizer.h" +#include "paddle/parameter/ThreadLocalBuffer.h" #include "paddle/utils/Flags.h" #include "paddle/utils/GlobalConstants.h" #include "paddle/utils/Stat.h" @@ -618,7 +618,7 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request, bool commitGradient = asyncGrdientCommitCheckAndStat(request); - VectorPtr* vecs = Parameter::getTlsTempBufs(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); size_t bufferIndex = 0; for (const auto& block : request.blocks()) { int64_t offset = getBlockOffset(block); @@ -1051,15 +1051,15 @@ void ParameterServer2::clearUnusedSegments(CpuVector* vec) { } void ParameterServer2::parallelExecForEachBlock(ExecFunc func) { - SyncThreadPool::execHelper(syncThreadPool_.get(), - [&](int tid, size_t numThreads) { - int64_t numBlocks = blockIdMap_.size(); - VectorPtr* vecs = Parameter::getTlsTempBufs(); - for (int64_t blockId = tid; blockId < numBlocks; - blockId += numThreads) { - func(blockId, vecs); - } - }); + SyncThreadPool::execHelper( + syncThreadPool_.get(), [&](int tid, size_t numThreads) { + int64_t numBlocks = blockIdMap_.size(); + VectorPtr* vecs = parameter::getThreadLocalBuffer(); + for (int64_t blockId = tid; blockId < numBlocks; + blockId += numThreads) { + func(blockId, vecs); + } + }); } void ParameterServer2::blockTraverse( diff --git a/paddle/trainer/RemoteParameterUpdater.cpp b/paddle/trainer/RemoteParameterUpdater.cpp index 6939738203f41e0c1f7204d54834e34b2cd90682..7314266cb24da9b9e9f0f1cbe61ed363247f51fe 100644 --- a/paddle/trainer/RemoteParameterUpdater.cpp +++ b/paddle/trainer/RemoteParameterUpdater.cpp @@ -747,28 +747,32 @@ void SparseRemoteParameterUpdater::getParametersRemote(bool fullSize, bool apply) { ParameterType sendBackParameterType = (useApplyInPserver_ && apply) ? 
diff --git a/paddle/trainer/RemoteParameterUpdater.cpp b/paddle/trainer/RemoteParameterUpdater.cpp
index 6939738203f41e0c1f7204d54834e34b2cd90682..7314266cb24da9b9e9f0f1cbe61ed363247f51fe 100644
--- a/paddle/trainer/RemoteParameterUpdater.cpp
+++ b/paddle/trainer/RemoteParameterUpdater.cpp
@@ -747,28 +747,32 @@ void SparseRemoteParameterUpdater::getParametersRemote(bool fullSize,
                                                        bool apply) {
   ParameterType sendBackParameterType =
       (useApplyInPserver_ && apply) ? PARAMETER_APPLY : PARAMETER_VALUE;
+  std::function<void()> getParams;
+  std::function<void(Parameter&, real)> applyL1;
   if (fullSize) {
-    parameterClient_->getParameter(
-        /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
-    if (config_.shrink_parameter_value() > 0) {
-      for (auto& para : parameters_) {
-        if (para->getConfig().decay_rate_l1() > 0) {
-          para->getBuf(PARAMETER_VALUE)
-              ->applyL1(1.0f,                               // learningRate
-                        config_.shrink_parameter_value());  // decayRate
-        }
-      }
-    }
+    getParams = [&] {
+      parameterClient_->getParameter(
+          /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    };
+    applyL1 = [](Parameter& para, real decayRate) {
+      para.getBuf(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
+    };
   } else {
-    REGISTER_TIMER("getParamSparse");
-    parameterClient_->getParameterSparse(
-        /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    getParams = [&] {
+      parameterClient_->getParameterSparse(
+          /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    };
+    applyL1 = [](Parameter& para, real decayRate) {
+      para.getMat(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
+    };
+  }
+  {
+    REGISTER_TIMER("getParamDenseAndSparse");
+    getParams();
     if (config_.shrink_parameter_value() > 0) {
       for (auto& para : parameters_) {
         if (para->getConfig().decay_rate_l1() > 0) {
-          para->getPrefetchMatrix()->applyL1Decay(
-              1.0f,                               // learningRate
-              config_.shrink_parameter_value());  // decayRate
+          applyL1(*para, config_.shrink_parameter_value());
         }
       }
     }
diff --git a/paddle/trainer/ThreadParameterUpdater.cpp b/paddle/trainer/ThreadParameterUpdater.cpp
index 870d4a4b0246fe244bbd3796ec14449eb181aad2..3c85c3aaac68fc29da90c24d1208887a17009d5f 100644
--- a/paddle/trainer/ThreadParameterUpdater.cpp
+++ b/paddle/trainer/ThreadParameterUpdater.cpp
@@ -17,6 +17,7 @@ limitations under the License. */
 #include "paddle/utils/Logging.h"
 
 #include "paddle/math/SparseRowMatrix.h"
+#include "paddle/parameter/ThreadLocalBuffer.h"
 #include "paddle/utils/Thread.h"
 
 DECLARE_int32(trainer_count);
@@ -98,7 +99,7 @@ void SgdThreadUpdater::threadTraverse(
     int tid,
     size_t numThreads,
     Parameter* para) {
-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
   if (para->isGradSparseUpdate()) {
     size_t height = para->getConfig().dims(0);
     size_t width = para->getConfig().dims(1);
@@ -214,7 +215,7 @@ void SgdThreadUpdater::threadUpdateSparse(int tid,
                                           Parameter* para) {
   int pid = para->getID();
   ParameterOptimizer* optimizer = optimizers_[pid].get();
-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
 
   size_t height = para->getConfig().dims(0);
   size_t width = para->getConfig().dims(1);
@@ -286,7 +287,7 @@ void SgdThreadUpdater::threadUpdateDense(int tid,
                                          Parameter* para) {
   int pid = para->getID();
   ParameterOptimizer* optimizer = optimizers_[pid].get();
-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
 
   auto interval = calcSplitArrayInterval(
       para->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
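The RemoteParameterUpdater hunk removes the duplicated shrink/decay epilogue by binding the branch-specific work (which fetch RPC to issue, and whether L1 lands on the dense value buffer or the row matrix) into two std::function slots, then running one shared tail under a single timer. A sketch of the pattern, with placeholder bodies instead of the real parameterClient_ calls:

#include <functional>
#include <iostream>

// Placeholder bodies; the real code issues parameterClient_ RPCs and calls
// applyL1 on either the value buffer (dense) or the row matrix (sparse).
void getParametersRemote(bool fullSize, float shrinkValue) {
  std::function<void()> getParams;
  std::function<void(float)> applyL1;

  if (fullSize) {  // dense: fetch full buffers, decay the value buffer
    getParams = [] { std::cout << "getParameter (dense fetch)\n"; };
    applyL1 = [](float decayRate) {
      std::cout << "dense applyL1, decayRate=" << decayRate << "\n";
    };
  } else {  // sparse: fetch prefetched rows, decay the row matrix
    getParams = [] { std::cout << "getParameterSparse\n"; };
    applyL1 = [](float decayRate) {
      std::cout << "sparse applyL1, decayRate=" << decayRate << "\n";
    };
  }

  // The tail is written once for both paths (one timer, one decay loop).
  getParams();
  if (shrinkValue > 0) {
    applyL1(shrinkValue);
  }
}

int main() {
  getParametersRemote(true, 0.01f);
  getParametersRemote(false, 0.01f);
}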