提交 0df032a6 编写于 作者: Q qijun

Merge remote-tracking branch 'baidu/develop' into feature/complete_howto_write_docs_cn

import gzip
import math import math
import paddle.v2 as paddle import paddle.v2 as paddle
dictsize = 1953
embsize = 32 embsize = 32
hiddensize = 256 hiddensize = 256
N = 5 N = 5
def wordemb(inlayer): def wordemb(inlayer):
wordemb = paddle.layer.table_projection( wordemb = paddle.layer.embedding(
input=inlayer, input=inlayer,
size=embsize, size=embsize,
param_attr=paddle.attr.Param( param_attr=paddle.attr.Param(
name="_proj", name="_proj",
initial_std=0.001, initial_std=0.001,
learning_rate=1, learning_rate=1,
l2_rate=0, )) l2_rate=0,
sparse_update=True))
return wordemb return wordemb
def main(): def main():
# for local training
cluster_train = False
if not cluster_train:
paddle.init(use_gpu=False, trainer_count=1) paddle.init(use_gpu=False, trainer_count=1)
else:
paddle.init(
use_gpu=False,
trainer_count=2,
port=7164,
ports_num=1,
ports_num_for_sparse=1,
num_gradient_servers=1)
word_dict = paddle.dataset.imikolov.build_dict() word_dict = paddle.dataset.imikolov.build_dict()
dict_size = len(word_dict) dict_size = len(word_dict)
firstword = paddle.layer.data( firstword = paddle.layer.data(
...@@ -57,6 +70,9 @@ def main(): ...@@ -57,6 +70,9 @@ def main():
def event_handler(event): def event_handler(event):
if isinstance(event, paddle.event.EndIteration): if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 100 == 0: if event.batch_id % 100 == 0:
with gzip.open("batch-" + str(event.batch_id) + ".tar.gz",
'w') as f:
trainer.save_parameter_to_tar(f)
result = trainer.test( result = trainer.test(
paddle.batch( paddle.batch(
paddle.dataset.imikolov.test(word_dict, N), 32)) paddle.dataset.imikolov.test(word_dict, N), 32))
...@@ -65,11 +81,15 @@ def main(): ...@@ -65,11 +81,15 @@ def main():
result.metrics) result.metrics)
cost = paddle.layer.classification_cost(input=predictword, label=nextword) cost = paddle.layer.classification_cost(input=predictword, label=nextword)
parameters = paddle.parameters.create(cost) parameters = paddle.parameters.create(cost)
adam_optimizer = paddle.optimizer.Adam( adagrad = paddle.optimizer.AdaGrad(
learning_rate=3e-3, learning_rate=3e-3,
regularization=paddle.optimizer.L2Regularization(8e-4)) regularization=paddle.optimizer.L2Regularization(8e-4))
trainer = paddle.trainer.SGD(cost, parameters, adam_optimizer) trainer = paddle.trainer.SGD(cost,
parameters,
adagrad,
is_local=not cluster_train)
trainer.train( trainer.train(
paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32), paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32),
num_passes=30, num_passes=30,
......
...@@ -19,6 +19,7 @@ limitations under the License. */ ...@@ -19,6 +19,7 @@ limitations under the License. */
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <vector> #include <vector>
#include "paddle/gserver/gradientmachines/GradientMachine.h"
#include "paddle/utils/Common.h" #include "paddle/utils/Common.h"
#include "paddle/utils/GlobalConstants.h" #include "paddle/utils/GlobalConstants.h"
...@@ -468,8 +469,10 @@ private: ...@@ -468,8 +469,10 @@ private:
}; };
enum GradientMatchineCreateMode { enum GradientMatchineCreateMode {
CREATE_MODE_NORMAL = 0, CREATE_MODE_NORMAL = paddle::GradientMachine::kNormal,
CREATE_MODE_TESTING = 4 CREATE_MODE_SGD_SPARSE_CPU_TRAINING =
paddle::GradientMachine::kSgdSparseCpuTraining,
CREATE_MODE_TESTING = paddle::GradientMachine::kTesting
}; };
struct ParameterConfigPrivate; struct ParameterConfigPrivate;
...@@ -817,7 +820,8 @@ private: ...@@ -817,7 +820,8 @@ private:
public: public:
static ParameterUpdater* createLocalUpdater(OptimizationConfig* config); static ParameterUpdater* createLocalUpdater(OptimizationConfig* config);
static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config, static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config,
int passCount); int passCount,
bool useSparseUpdater);
~ParameterUpdater(); ~ParameterUpdater();
/** /**
...@@ -855,6 +859,13 @@ public: ...@@ -855,6 +859,13 @@ public:
*/ */
void update(Parameter* param); void update(Parameter* param);
/**
* @breif only get required sparse rows by default.
* @param fullSize: get full matrix parameter if *fullSize* set
* @param apply: get PARAMETER_APPLY on pserver if *apply* set
*/
void getParametersRemote(bool fullSize = false, bool apply = false);
/** /**
* @brief restore the average parameter. * @brief restore the average parameter.
* @note It is only used in AverageOptimizer. Restore will get the current * @note It is only used in AverageOptimizer. Restore will get the current
......
...@@ -29,10 +29,22 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater( ...@@ -29,10 +29,22 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater(
} }
ParameterUpdater *ParameterUpdater::createRemoteUpdater( ParameterUpdater *ParameterUpdater::createRemoteUpdater(
OptimizationConfig *config, int passCount) { OptimizationConfig *config, int passCount, bool useSparseUpdater) {
auto updater = new ParameterUpdater(); auto updater = new ParameterUpdater();
updater->m->updater.reset(new paddle::RemoteParameterUpdater( auto remoteUpdater = new paddle::RemoteParameterUpdater(
config->m->getConfig(), passCount, nullptr)); config->m->getConfig(), passCount, nullptr);
if (useSparseUpdater) {
std::unique_ptr<paddle::ParameterUpdater> remoteUpdaterPtr(remoteUpdater);
auto sparseRemoteUpdater =
new paddle::SparseRemoteParameterUpdaterComposite(
config->m->getConfig(),
passCount,
false,
std::move(remoteUpdaterPtr));
updater->m->updater.reset(sparseRemoteUpdater);
} else {
updater->m->updater.reset(remoteUpdater);
}
return updater; return updater;
} }
...@@ -59,6 +71,10 @@ void ParameterUpdater::update(Parameter *param) { ...@@ -59,6 +71,10 @@ void ParameterUpdater::update(Parameter *param) {
m->updater->update(paddleParam); m->updater->update(paddleParam);
} }
void ParameterUpdater::getParametersRemote(bool fullSize, bool apply) {
m->updater->getParametersRemote(fullSize, apply);
}
void ParameterUpdater::restore() { m->updater->restore(); } void ParameterUpdater::restore() { m->updater->restore(); }
void ParameterUpdater::apply() { m->updater->apply(); } void ParameterUpdater::apply() { m->updater->apply(); }
......
...@@ -518,7 +518,7 @@ void TrainerThread::computeThread() { ...@@ -518,7 +518,7 @@ void TrainerThread::computeThread() {
backward(); backward();
break; break;
case MultiGradientMachine::TASK_COPY_IN_ARGS: case MultiGradientMachine::TASK_COPY_IN_ARGS:
copyInArgs(); batchSize_ = copyInArgs();
inArgsCopied_ = true; inArgsCopied_ = true;
multiMachine_->waitForCopyInArgs(); multiMachine_->waitForCopyInArgs();
break; break;
......
...@@ -5,6 +5,6 @@ last_first_seq test_expand_layer test_ntm_layers test_hsigmoid ...@@ -5,6 +5,6 @@ last_first_seq test_expand_layer test_ntm_layers test_hsigmoid
img_layers img_trans_layers util_layers simple_rnn_layers unused_layers test_cost_layers img_layers img_trans_layers util_layers simple_rnn_layers unused_layers test_cost_layers
test_rnn_group shared_fc shared_lstm shared_gru test_cost_layers_with_weight test_rnn_group shared_fc shared_lstm shared_gru test_cost_layers_with_weight
test_spp_layer test_bilinear_interp test_maxout test_bi_grumemory math_ops test_spp_layer test_bilinear_interp test_maxout test_bi_grumemory math_ops
test_seq_concat_reshape) test_seq_concat_reshape test_pad)
export whole_configs=(test_split_datasource) export whole_configs=(test_split_datasource)
...@@ -11,6 +11,9 @@ for conf in ${configs[*]} ...@@ -11,6 +11,9 @@ for conf in ${configs[*]}
do do
echo "Generating " $conf echo "Generating " $conf
$1 -m paddle.utils.dump_config $conf.py > $protostr/$conf.protostr.unittest $1 -m paddle.utils.dump_config $conf.py > $protostr/$conf.protostr.unittest
if [ ! -f "$protostr/$conf.protostr" ]; then
cp $protostr/$conf.protostr.unittest $protostr/$conf.protostr
fi
cat ${conf}.py |$1 test_config_parser_for_non_file_config.py > $protostr/$conf.protostr.non_file_config.unittest cat ${conf}.py |$1 test_config_parser_for_non_file_config.py > $protostr/$conf.protostr.non_file_config.unittest
done done
...@@ -18,5 +21,8 @@ for conf in ${whole_configs[*]} ...@@ -18,5 +21,8 @@ for conf in ${whole_configs[*]}
do do
echo "Generating " $conf echo "Generating " $conf
$1 -m paddle.utils.dump_config $conf.py "" --whole > $protostr/$conf.protostr.unittest $1 -m paddle.utils.dump_config $conf.py "" --whole > $protostr/$conf.protostr.unittest
if [ ! -f "$protostr/$conf.protostr" ]; then
cp $protostr/$conf.protostr.unittest $protostr/$conf.protostr
fi
cat ${conf}.py |$1 test_config_parser_for_non_file_config.py --whole > $protostr/$conf.protostr.non_file_config.unittest cat ${conf}.py |$1 test_config_parser_for_non_file_config.py --whole > $protostr/$conf.protostr.non_file_config.unittest
done done
type: "nn"
layers {
name: "data"
type: "data"
size: 2016
active_type: ""
height: 48
width: 42
}
layers {
name: "__conv_0__"
type: "exconv"
size: 32256
active_type: ""
inputs {
input_layer_name: "data"
input_parameter_name: "___conv_0__.w0"
conv_conf {
filter_size: 3
channels: 1
stride: 1
padding: 1
groups: 1
filter_channels: 1
output_x: 42
img_size: 42
caffe_mode: true
filter_size_y: 3
padding_y: 1
stride_y: 1
output_y: 48
img_size_y: 48
}
}
bias_parameter_name: "___conv_0__.wbias"
num_filters: 16
shared_biases: true
height: 48
width: 42
}
layers {
name: "__pool_0__"
type: "pool"
size: 8064
active_type: ""
inputs {
input_layer_name: "__conv_0__"
pool_conf {
pool_type: "max-projection"
channels: 16
size_x: 2
stride: 2
output_x: 21
img_size: 42
padding: 0
size_y: 2
stride_y: 2
output_y: 24
img_size_y: 48
padding_y: 0
}
}
height: 24
width: 21
}
layers {
name: "__pad_0__"
type: "pad"
size: 14175
active_type: ""
inputs {
input_layer_name: "__pool_0__"
pad_conf {
image_conf {
channels: 16
img_size: 21
img_size_y: 24
}
pad_c: 2
pad_c: 3
pad_h: 1
pad_h: 2
pad_w: 3
pad_w: 1
}
}
height: 27
width: 25
}
parameters {
name: "___conv_0__.w0"
size: 144
initial_mean: 0.0
initial_std: 0.471404520791
initial_strategy: 0
initial_smart: false
}
parameters {
name: "___conv_0__.wbias"
size: 16
initial_mean: 0.0
initial_std: 0.0
dims: 16
dims: 1
initial_strategy: 0
initial_smart: false
}
input_layer_names: "data"
output_layer_names: "__pad_0__"
sub_models {
name: "root"
layer_names: "data"
layer_names: "__conv_0__"
layer_names: "__pool_0__"
layer_names: "__pad_0__"
input_layer_names: "data"
output_layer_names: "__pad_0__"
is_recurrent_layer_group: false
}
...@@ -2,7 +2,7 @@ from paddle.trainer_config_helpers import * ...@@ -2,7 +2,7 @@ from paddle.trainer_config_helpers import *
settings(batch_size=1000, learning_rate=1e-5) settings(batch_size=1000, learning_rate=1e-5)
data = data_layer(name='data', size=2304, height=48, width=42) data = data_layer(name='data', size=2016, height=48, width=42)
conv = img_conv_layer( conv = img_conv_layer(
input=data, input=data,
...@@ -13,8 +13,7 @@ conv = img_conv_layer( ...@@ -13,8 +13,7 @@ conv = img_conv_layer(
act=LinearActivation(), act=LinearActivation(),
bias_attr=True) bias_attr=True)
pool = img_pool_layer( pool = img_pool_layer(input=conv, pool_size=2, stride=2, pool_type=MaxPooling())
input=conv, num_channels=8, pool_size=2, stride=2, pool_type=MaxPooling())
pad = pad_layer(input=pool, pad_c=[2, 3], pad_h=[1, 2], pad_w=[3, 1]) pad = pad_layer(input=pool, pad_c=[2, 3], pad_h=[1, 2], pad_w=[3, 1])
......
...@@ -38,12 +38,35 @@ class Optimizer(object): ...@@ -38,12 +38,35 @@ class Optimizer(object):
assert isinstance(tmp, swig_api.ParameterOptimizer) assert isinstance(tmp, swig_api.ParameterOptimizer)
return tmp.getParameterTypes() return tmp.getParameterTypes()
def create_local_updater(self): def __create_local_updater__(self):
return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__) return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__)
def create_remote_updater(self, pass_num): def __create_remote_updater__(self, pass_num, use_sparse_updater):
return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__, return swig_api.ParameterUpdater.createRemoteUpdater(
pass_num) self.__opt_conf__, pass_num, use_sparse_updater)
def create_updater(self, is_local, num_passes, use_sparse_updater):
"""
create proper parameter_updater by configuration.
:param is_local: create local or remote parameter updater
:param num_passes: remote parameter updater will use this to config
parameter server.
:param use_sparse_updater: when use remote updater, if some parameter is
sparse, updater should do some extra thing:
.. code-block:: python
if use_sparse_remote_updater:
gradient_machine.prefetch(in_args)
parameter_updater.getParametersRemote()
:return: parameter_updater
"""
if is_local:
parameter_updater = self.__create_local_updater__()
else:
parameter_updater = self.__create_remote_updater__(
num_passes, use_sparse_updater)
return parameter_updater
class Momentum(Optimizer): class Momentum(Optimizer):
......
...@@ -73,6 +73,18 @@ class Topology(object): ...@@ -73,6 +73,18 @@ class Topology(object):
assert isinstance(self.__model_config__, ModelConfig) assert isinstance(self.__model_config__, ModelConfig)
def use_sparse_updater(self):
"""
check if any parameter require to use sparse_update
:return:
"""
use_sparse = False
for parameter in self.__model_config__.parameters:
if parameter.sparse_update or parameter.sparse_remote_update:
use_sparse = True
break
return use_sparse
def proto(self): def proto(self):
return self.__model_config__ return self.__model_config__
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
Module Trainer Module Trainer
""" """
import collections import collections
import gzip
import os
import py_paddle.swig_paddle as api import py_paddle.swig_paddle as api
...@@ -42,7 +44,12 @@ class SGD(object): ...@@ -42,7 +44,12 @@ class SGD(object):
:type extra_layers: paddle.v2.config_base.Layer :type extra_layers: paddle.v2.config_base.Layer
""" """
def __init__(self, cost, parameters, update_equation, extra_layers=None): def __init__(self,
cost,
parameters,
update_equation,
extra_layers=None,
is_local=True):
if not isinstance(parameters, v2_parameters.Parameters): if not isinstance(parameters, v2_parameters.Parameters):
raise TypeError('parameters should be parameters') raise TypeError('parameters should be parameters')
...@@ -55,20 +62,48 @@ class SGD(object): ...@@ -55,20 +62,48 @@ class SGD(object):
self.__topology__ = topology self.__topology__ = topology
self.__parameters__ = parameters self.__parameters__ = parameters
self.__topology_in_proto__ = topology.proto() self.__topology_in_proto__ = topology.proto()
self.__is_local__ = is_local
# In local mode, disable sparse_remote_update. self.__use_sparse_updater__ = self.__topology__.use_sparse_updater()
# # In local mode, disable sparse_remote_update.
if is_local:
for param in self.__topology_in_proto__.parameters: for param in self.__topology_in_proto__.parameters:
if param.sparse_remote_update: if param.sparse_remote_update:
param.sparse_remote_update = False param.sparse_remote_update = False
self.__gm_create_mode__ = api.CREATE_MODE_NORMAL if not \
self.__use_sparse_updater__ else api.CREATE_MODE_SGD_SPARSE_CPU_TRAINING
self.__data_types__ = topology.data_type() self.__data_types__ = topology.data_type()
gm = api.GradientMachine.createFromConfigProto( gm = api.GradientMachine.createFromConfigProto(
self.__topology_in_proto__, api.CREATE_MODE_NORMAL, self.__topology_in_proto__, self.__gm_create_mode__,
self.__optimizer__.enable_types()) self.__optimizer__.enable_types())
assert isinstance(gm, api.GradientMachine) assert isinstance(gm, api.GradientMachine)
self.__gradient_machine__ = gm self.__gradient_machine__ = gm
self.__gradient_machine__.randParameters() self.__gradient_machine__.randParameters()
parameters.append_gradient_machine(gm) self.__parameters__.append_gradient_machine(gm)
self.__parameter_updater__ = None
def __use_remote_sparse_updater__(self):
return self.__use_sparse_updater__ and not self.__is_local__
def __prepare_parameter__(self, in_args):
"""
prepare parameter before forward backward.
1. When use remote sparse updater, parameters should be got
from ps according to input arguments.
:param in_args: input arguments of this batch.
:return:
"""
if self.__use_remote_sparse_updater__():
self.__gradient_machine__.prefetch(in_args)
self.__parameter_updater__.getParametersRemote()
def save_parameter_to_tar(self, f):
self.__parameter_updater__.catchUpWith()
self.__parameter_updater__.apply()
self.__parameter_updater__.getParametersRemote(True, True)
self.__parameters__.to_tar(f)
self.__parameter_updater__.restore()
def train(self, reader, num_passes=1, event_handler=None, feeding=None): def train(self, reader, num_passes=1, event_handler=None, feeding=None):
""" """
...@@ -90,8 +125,9 @@ class SGD(object): ...@@ -90,8 +125,9 @@ class SGD(object):
event_handler = default_event_handler event_handler = default_event_handler
__check_train_args__(**locals()) __check_train_args__(**locals())
updater = self.__optimizer__.create_local_updater() self.__parameter_updater__ = self.__optimizer__.create_updater(
updater.init(self.__gradient_machine__) self.__is_local__, num_passes, self.__use_sparse_updater__)
self.__parameter_updater__.init(self.__gradient_machine__)
self.__gradient_machine__.start() self.__gradient_machine__.start()
batch_evaluator = self.__gradient_machine__.makeEvaluator() batch_evaluator = self.__gradient_machine__.makeEvaluator()
...@@ -103,23 +139,26 @@ class SGD(object): ...@@ -103,23 +139,26 @@ class SGD(object):
for pass_id in xrange(num_passes): for pass_id in xrange(num_passes):
event_handler(v2_event.BeginPass(pass_id)) event_handler(v2_event.BeginPass(pass_id))
pass_evaluator.start() pass_evaluator.start()
updater.startPass() self.__parameter_updater__.startPass()
for batch_id, data_batch in enumerate(reader()): for batch_id, data_batch in enumerate(reader()):
batch_evaluator.start() batch_evaluator.start()
event_handler( event_handler(
v2_event.BeginIteration( v2_event.BeginIteration(
pass_id=pass_id, batch_id=batch_id)) pass_id=pass_id, batch_id=batch_id))
pass_type = updater.startBatch(len(data_batch)) pass_type = self.__parameter_updater__.startBatch(
self.__gradient_machine__.forwardBackward( len(data_batch))
feeder(data_batch), out_args, pass_type) in_args = feeder(data_batch)
self.__prepare_parameter__(in_args)
self.__gradient_machine__.forwardBackward(in_args, out_args,
pass_type)
self.__gradient_machine__.eval(pass_evaluator) self.__gradient_machine__.eval(pass_evaluator)
self.__gradient_machine__.eval(batch_evaluator) self.__gradient_machine__.eval(batch_evaluator)
for each_param in self.__gradient_machine__.getNonStaticParameters( for each_param in self.__gradient_machine__.getNonStaticParameters(
): ):
updater.update(each_param) self.__parameter_updater__.update(each_param)
cost_sum = out_args.sum() cost_sum = out_args.sum()
cost = cost_sum / len(data_batch) cost = cost_sum / len(data_batch)
updater.finishBatch(cost) self.__parameter_updater__.finishBatch(cost)
batch_evaluator.finish() batch_evaluator.finish()
event_handler( event_handler(
v2_event.EndIteration( v2_event.EndIteration(
...@@ -128,7 +167,7 @@ class SGD(object): ...@@ -128,7 +167,7 @@ class SGD(object):
cost=cost, cost=cost,
evaluator=batch_evaluator)) evaluator=batch_evaluator))
updater.finishPass() self.__parameter_updater__.finishPass()
pass_evaluator.finish() pass_evaluator.finish()
event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator)) event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator))
self.__gradient_machine__.finish() self.__gradient_machine__.finish()
...@@ -152,8 +191,9 @@ class SGD(object): ...@@ -152,8 +191,9 @@ class SGD(object):
num_samples = 0.0 num_samples = 0.0
for data_batch in reader(): for data_batch in reader():
num_samples += len(data_batch) num_samples += len(data_batch)
self.__gradient_machine__.forward( in_args = feeder(data_batch)
feeder(data_batch), out_args, api.PASS_TEST) self.__prepare_parameter__(in_args)
self.__gradient_machine__.forward(in_args, out_args, api.PASS_TEST)
total_cost += out_args.sum() total_cost += out_args.sum()
self.__gradient_machine__.eval(evaluator) self.__gradient_machine__.eval(evaluator)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册