提交 da83d286 编写于 作者: Y Yu Yang 提交者: GitHub

Merge pull request #2271 from reyoung/feature/clean_parameter_functionalities

Remove not necessary functionalities in Parameter
......@@ -16,5 +16,6 @@ third_party/
*~
bazel-*
third_party/
# for clion
# clion workspace.
cmake-build-*
......@@ -758,7 +758,7 @@ public:
T p3); // decayRate
/// apply L1/L2 to *this*
void applyL1(T learningRate, T decayRate);
virtual void applyL1(T learningRate, T decayRate);
void applyL1(BaseMatrixT& lr, T learningRate, T decayRate);
void applyL2(T learningRate, T decayRate);
void applyL2(BaseMatrixT& lr, T learningRate, T decayRate);
......
......@@ -54,7 +54,7 @@ void SparseRowCpuMatrix::zeroMem() {
clearRows();
}
void SparseRowCpuMatrix::applyL1Decay(real learningRate, real decayRate) {
void SparseRowCpuMatrix::applyL1(real learningRate, real decayRate) {
apply([=](real* buf, size_t len) {
CpuVector value(0, nullptr);
value.subVecFrom(buf, 0, len);
......
......@@ -94,7 +94,7 @@ public:
/**
* apply L1 to all sparse rows, should be apply after indices ready.
*/
void applyL1Decay(real learningRate, real decayRate);
virtual void applyL1(real learningRate, real decayRate);
void clearIndices() { clearRows(); }
void zeroMemThread(size_t tid, size_t numThreads);
......
......@@ -20,6 +20,7 @@ limitations under the License. */
#include "OptimizerFunctions.h"
#include "OptimizerWithRegularizer.h"
#include "ParameterUpdateFunctions.h"
#include "ThreadLocalBuffer.h"
#include "hl_gpu.h"
#include "paddle/math/CpuSparseMatrix.h"
#include "paddle/math/MathUtils.h"
......@@ -262,15 +263,6 @@ void Parameter::setMat(ParameterType pType, int matType) {
}
}
SparsePrefetchRowCpuMatrix* Parameter::getPrefetchMatrix() {
MatrixPtr mat = mats_[PARAMETER_VALUE];
if (mat) {
return dynamic_cast<SparsePrefetchRowCpuMatrix*>(mat.get());
}
return nullptr;
}
void Parameter::incUpdate(const UpdateCallback& callback) {
// Static parameter is fixed, and does not need to be updated
if (isStatic()) {
......@@ -422,37 +414,4 @@ bool Parameter::load(std::istream& s) {
return true;
}
ThreadLocal<std::vector<VectorPtr>> Parameter::tlsTempBufs_;
VectorPtr* Parameter::getTlsTempBufs() {
std::vector<VectorPtr>& bufs = *tlsTempBufs_;
if (bufs.empty()) {
bufs.resize(NUM_PARAMETER_TYPES);
for (auto& vec : bufs) {
vec.reset(new CpuVector(0, nullptr));
}
}
return bufs.data();
}
void Parameter::exec(ExecFunc func) {
auto execFunc = [this, func](int tid, size_t numThreads) {
if (numThreads == 1) { // single thread
func(this->getBufs());
} else { // multi thread
VectorPtr* vecs = Parameter::getTlsTempBufs();
auto interval = calcSplitArrayInterval(
this->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
for (size_t i = 0; i < (size_t)NUM_PARAMETER_TYPES; ++i) {
if (bufs_[i]) {
vecs[i]->subVecFrom(*bufs_[i], interval);
}
}
func(vecs);
}
};
getBuf(PARAMETER_VALUE)->exec(execFunc);
}
} // namespace paddle
......@@ -40,17 +40,6 @@ class Parameter;
typedef std::function<void(Parameter* param)> UpdateCallback;
typedef std::function<void(int paramId, Parameter* param)> ParamInitCallback;
struct Segment {
int64_t beginDim;
int64_t endDim;
// We allow the possibility that the parameters are not stored at contiguous
// memory locations for speed reason (i.e. data alignemnt)
// This means that the dimenstion is not same as the position in the memroy
// buffer.
int64_t beginPos; // beginning position in the local value or grad buffer
};
class Parameter;
typedef std::shared_ptr<Parameter> ParameterPtr;
......@@ -167,13 +156,6 @@ public:
}
}
void enableSharedType(ParameterType type, VectorPtr vec, MatType matType) {
if (!bufs_[type]) {
bufs_[type] = vec;
setMat(type, matType);
}
}
/// for batchGradientMachine: blockNum is number of partitions of the matrix.
bool isGradShared(size_t* blockNum = NULL);
......@@ -203,20 +185,6 @@ public:
const MatrixPtr& getMat(ParameterType pType) const { return mats_[pType]; }
const IVectorPtr& getIntBuf(ParameterType pType) { return intBufs_[pType]; }
void setIntBuf(ParameterType pType, const IVectorPtr& iVec) {
intBufs_[pType] = iVec;
}
SparsePrefetchRowCpuMatrix* getPrefetchMatrix();
float getLearnRate() const { return config_.learning_rate(); }
float getInitMean() const { return config_.initial_mean(); }
float getInitStandardDeviation() const { return config_.initial_std(); }
void setValueUpdated() { updated_ = true; }
void clearValueUpdated() { updated_ = false; }
......@@ -243,8 +211,6 @@ public:
*/
bool load(std::istream& is);
std::vector<Segment>& getGradientSegments() { return gradSegments_; }
void incShared() { sharedCount_++; }
/**
......@@ -351,35 +317,21 @@ protected:
int sharedCount_;
int updateCounter_;
std::vector<Segment> gradSegments_; // segments of non-zero gradient
bool updated_;
SparseFormat format_;
static ThreadLocal<std::vector<VectorPtr>> tlsTempBufs_;
std::vector<std::shared_ptr<IParameterUpdaterHook>> updaterHooks_;
public:
void setSharedCount(int cnt) { sharedCount_ = cnt; }
int getSharedCount() { return sharedCount_; }
void singleUpdate(void* data);
bool isSparse() { return config_.is_sparse(); }
SparseFormat getFormat() { return format_; }
static const std::string kMissParameterFail;
static const std::string kMissParameterRand;
static const std::string kMissParameterZero;
static VectorPtr* getTlsTempBufs();
/**
* exec a func in single/multi thread.
* vecs is bufs_ of Parameter, as input of ExecFunc.
*/
typedef std::function<void(const VectorPtr vecs[])> ExecFunc;
void exec(ExecFunc func);
};
typedef std::map<std::string, ParameterPtr> ParameterMap;
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "ThreadLocalBuffer.h"
#include "Parameter.h"
namespace paddle {
namespace parameter {
static ThreadLocal<std::vector<VectorPtr>> tlsTempBufs_;
VectorPtr* getThreadLocalBuffer() {
std::vector<VectorPtr>& bufs = *tlsTempBufs_;
if (bufs.empty()) {
bufs.resize(NUM_PARAMETER_TYPES);
for (auto& vec : bufs) {
vec.reset(new CpuVector(0, nullptr));
}
}
return bufs.data();
}
} // namespace parameter
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "paddle/math/Vector.h"
namespace paddle {
namespace parameter {
extern VectorPtr* getThreadLocalBuffer();
} // namespace parameter
} // namespace paddle
......@@ -243,7 +243,8 @@ void ParameterClient2::prepareSendData(
CHECK_GE(blockSize, 1LU) << "blockSize should > 0 " << blockSize;
const auto paraSize = parameter->getSize();
if (sparseUpdate) {
const auto prefetchMat = parameter->getPrefetchMatrix();
auto prefetchMat = std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(
parameter->getMat(PARAMETER_VALUE));
CHECK(prefetchMat != nullptr) << "prefetchMat is nullptr";
auto sendMat = dynamic_cast<SparseRowCpuMatrix*>(
parameter->getMat(parameterType).get());
......
......@@ -18,7 +18,6 @@ limitations under the License. */
#include <fstream>
#include "paddle/math/SIMDFunctions.h"
#include "paddle/parameter/AverageOptimizer.h"
#include "paddle/parameter/FirstOrderOptimizer.h"
#include "paddle/parameter/OptimizerFunctions.h"
......@@ -26,6 +25,7 @@ limitations under the License. */
#include "paddle/parameter/ParameterOptimizer.h"
#include "paddle/parameter/ParameterUpdateFunctions.h"
#include "paddle/parameter/Regularizer.h"
#include "paddle/parameter/ThreadLocalBuffer.h"
#include "paddle/utils/Flags.h"
#include "paddle/utils/GlobalConstants.h"
#include "paddle/utils/Stat.h"
......@@ -618,7 +618,7 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request,
bool commitGradient = asyncGrdientCommitCheckAndStat(request);
VectorPtr* vecs = Parameter::getTlsTempBufs();
VectorPtr* vecs = parameter::getThreadLocalBuffer();
size_t bufferIndex = 0;
for (const auto& block : request.blocks()) {
int64_t offset = getBlockOffset(block);
......@@ -1051,15 +1051,15 @@ void ParameterServer2::clearUnusedSegments(CpuVector* vec) {
}
void ParameterServer2::parallelExecForEachBlock(ExecFunc func) {
SyncThreadPool::execHelper(syncThreadPool_.get(),
[&](int tid, size_t numThreads) {
int64_t numBlocks = blockIdMap_.size();
VectorPtr* vecs = Parameter::getTlsTempBufs();
for (int64_t blockId = tid; blockId < numBlocks;
blockId += numThreads) {
func(blockId, vecs);
}
});
SyncThreadPool::execHelper(
syncThreadPool_.get(), [&](int tid, size_t numThreads) {
int64_t numBlocks = blockIdMap_.size();
VectorPtr* vecs = parameter::getThreadLocalBuffer();
for (int64_t blockId = tid; blockId < numBlocks;
blockId += numThreads) {
func(blockId, vecs);
}
});
}
void ParameterServer2::blockTraverse(
......
......@@ -747,28 +747,32 @@ void SparseRemoteParameterUpdater::getParametersRemote(bool fullSize,
bool apply) {
ParameterType sendBackParameterType =
(useApplyInPserver_ && apply) ? PARAMETER_APPLY : PARAMETER_VALUE;
std::function<void()> getParams;
std::function<void(Parameter&, real)> applyL1;
if (fullSize) {
parameterClient_->getParameter(
/* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
if (config_.shrink_parameter_value() > 0) {
for (auto& para : parameters_) {
if (para->getConfig().decay_rate_l1() > 0) {
para->getBuf(PARAMETER_VALUE)
->applyL1(1.0f, // learningRate
config_.shrink_parameter_value()); // decayRate
}
}
}
getParams = [&] {
parameterClient_->getParameter(
/* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
};
applyL1 = [](Parameter& para, real decayRate) {
para.getBuf(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
};
} else {
REGISTER_TIMER("getParamSparse");
parameterClient_->getParameterSparse(
/* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
getParams = [&] {
parameterClient_->getParameterSparse(
/* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
};
applyL1 = [](Parameter& para, real decayRate) {
para.getMat(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
};
}
{
REGISTER_TIMER("getParamDenseAndSparse");
getParams();
if (config_.shrink_parameter_value() > 0) {
for (auto& para : parameters_) {
if (para->getConfig().decay_rate_l1() > 0) {
para->getPrefetchMatrix()->applyL1Decay(
1.0f, // learningRate
config_.shrink_parameter_value()); // decayRate
applyL1(*para, config_.shrink_parameter_value());
}
}
}
......
......@@ -17,6 +17,7 @@ limitations under the License. */
#include "paddle/utils/Logging.h"
#include "paddle/math/SparseRowMatrix.h"
#include "paddle/parameter/ThreadLocalBuffer.h"
#include "paddle/utils/Thread.h"
DECLARE_int32(trainer_count);
......@@ -98,7 +99,7 @@ void SgdThreadUpdater::threadTraverse(
int tid,
size_t numThreads,
Parameter* para) {
VectorPtr* vecs = Parameter::getTlsTempBufs();
VectorPtr* vecs = parameter::getThreadLocalBuffer();
if (para->isGradSparseUpdate()) {
size_t height = para->getConfig().dims(0);
size_t width = para->getConfig().dims(1);
......@@ -214,7 +215,7 @@ void SgdThreadUpdater::threadUpdateSparse(int tid,
Parameter* para) {
int pid = para->getID();
ParameterOptimizer* optimizer = optimizers_[pid].get();
VectorPtr* vecs = Parameter::getTlsTempBufs();
VectorPtr* vecs = parameter::getThreadLocalBuffer();
size_t height = para->getConfig().dims(0);
size_t width = para->getConfig().dims(1);
......@@ -286,7 +287,7 @@ void SgdThreadUpdater::threadUpdateDense(int tid,
Parameter* para) {
int pid = para->getID();
ParameterOptimizer* optimizer = optimizers_[pid].get();
VectorPtr* vecs = Parameter::getTlsTempBufs();
VectorPtr* vecs = parameter::getThreadLocalBuffer();
auto interval = calcSplitArrayInterval(
para->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册