diff --git a/demo/word2vec/train_v2.py b/demo/word2vec/api_train_v2.py
similarity index 76%
rename from demo/word2vec/train_v2.py
rename to demo/word2vec/api_train_v2.py
index 7d952b446f9db432062fc3305a6b65b0ad66dd47..c0940f0e56eafa22f8aeb7052c0ddc79d8862917 100644
--- a/demo/word2vec/train_v2.py
+++ b/demo/word2vec/api_train_v2.py
@@ -1,27 +1,40 @@
+import gzip
 import math
 
 import paddle.v2 as paddle
 
-dictsize = 1953
 embsize = 32
 hiddensize = 256
 N = 5
 
 
 def wordemb(inlayer):
-    wordemb = paddle.layer.table_projection(
+    wordemb = paddle.layer.embedding(
         input=inlayer,
         size=embsize,
         param_attr=paddle.attr.Param(
             name="_proj",
             initial_std=0.001,
             learning_rate=1,
-            l2_rate=0, ))
+            l2_rate=0,
+            sparse_update=True))
     return wordemb
 
 
 def main():
-    paddle.init(use_gpu=False, trainer_count=1)
+    # for local training
+    cluster_train = False
+
+    if not cluster_train:
+        paddle.init(use_gpu=False, trainer_count=1)
+    else:
+        paddle.init(
+            use_gpu=False,
+            trainer_count=2,
+            port=7164,
+            ports_num=1,
+            ports_num_for_sparse=1,
+            num_gradient_servers=1)
     word_dict = paddle.dataset.imikolov.build_dict()
     dict_size = len(word_dict)
     firstword = paddle.layer.data(
@@ -57,6 +70,9 @@ def main():
     def event_handler(event):
         if isinstance(event, paddle.event.EndIteration):
             if event.batch_id % 100 == 0:
+                with gzip.open("batch-" + str(event.batch_id) + ".tar.gz",
+                               'w') as f:
+                    trainer.save_parameter_to_tar(f)
                 result = trainer.test(
                     paddle.batch(
                         paddle.dataset.imikolov.test(word_dict, N), 32))
@@ -65,11 +81,15 @@ def main():
                     result.metrics)
 
     cost = paddle.layer.classification_cost(input=predictword, label=nextword)
+
     parameters = paddle.parameters.create(cost)
-    adam_optimizer = paddle.optimizer.Adam(
+    adagrad = paddle.optimizer.AdaGrad(
         learning_rate=3e-3,
         regularization=paddle.optimizer.L2Regularization(8e-4))
-    trainer = paddle.trainer.SGD(cost, parameters, adam_optimizer)
+    trainer = paddle.trainer.SGD(cost,
+                                 parameters,
+                                 adagrad,
+                                 is_local=not cluster_train)
     trainer.train(
         paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32),
         num_passes=30,
diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h
index c4f5dca26cc6a5e9fdd23ee27b594ced29a25c7a..d51204012171c9887acd5f578f913143182efe36 100644
--- a/paddle/api/PaddleAPI.h
+++ b/paddle/api/PaddleAPI.h
@@ -19,6 +19,7 @@ limitations under the License. */
 #include
 #include
 #include
+#include "paddle/gserver/gradientmachines/GradientMachine.h"
 #include "paddle/utils/Common.h"
 #include "paddle/utils/GlobalConstants.h"
 
@@ -468,8 +469,10 @@ private:
 };
 
 enum GradientMatchineCreateMode {
-  CREATE_MODE_NORMAL = 0,
-  CREATE_MODE_TESTING = 4
+  CREATE_MODE_NORMAL = paddle::GradientMachine::kNormal,
+  CREATE_MODE_SGD_SPARSE_CPU_TRAINING =
+      paddle::GradientMachine::kSgdSparseCpuTraining,
+  CREATE_MODE_TESTING = paddle::GradientMachine::kTesting
 };
 
 struct ParameterConfigPrivate;
@@ -817,7 +820,8 @@ private:
 public:
   static ParameterUpdater* createLocalUpdater(OptimizationConfig* config);
   static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config,
-                                               int passCount);
+                                               int passCount,
+                                               bool useSparseUpdater);
   ~ParameterUpdater();
 
   /**
@@ -855,6 +859,13 @@ public:
    */
   void update(Parameter* param);
 
+  /**
+   * @brief only get required sparse rows by default.
+   * @param fullSize: get full matrix parameter if *fullSize* set
+   * @param apply: get PARAMETER_APPLY on pserver if *apply* set
+   */
+  void getParametersRemote(bool fullSize = false, bool apply = false);
+
   /**
   * @brief restore the average parameter.
   * @note It is only used in AverageOptimizer. Restore will get the current
diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp
index 75b0ae7cb6cc8c9ad0f8fe69963b7439a44bf55e..79921ea6e787f3c0ebecaad6a9a54bac92211320 100644
--- a/paddle/api/ParameterUpdater.cpp
+++ b/paddle/api/ParameterUpdater.cpp
@@ -29,10 +29,22 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater(
 }
 
 ParameterUpdater *ParameterUpdater::createRemoteUpdater(
-    OptimizationConfig *config, int passCount) {
+    OptimizationConfig *config, int passCount, bool useSparseUpdater) {
   auto updater = new ParameterUpdater();
-  updater->m->updater.reset(new paddle::RemoteParameterUpdater(
-      config->m->getConfig(), passCount, nullptr));
+  auto remoteUpdater = new paddle::RemoteParameterUpdater(
+      config->m->getConfig(), passCount, nullptr);
+  if (useSparseUpdater) {
+    std::unique_ptr<paddle::ParameterUpdater> remoteUpdaterPtr(remoteUpdater);
+    auto sparseRemoteUpdater =
+        new paddle::SparseRemoteParameterUpdaterComposite(
+            config->m->getConfig(),
+            passCount,
+            false,
+            std::move(remoteUpdaterPtr));
+    updater->m->updater.reset(sparseRemoteUpdater);
+  } else {
+    updater->m->updater.reset(remoteUpdater);
+  }
   return updater;
 }
 
@@ -59,6 +71,10 @@ void ParameterUpdater::update(Parameter *param) {
   m->updater->update(paddleParam);
 }
 
+void ParameterUpdater::getParametersRemote(bool fullSize, bool apply) {
+  m->updater->getParametersRemote(fullSize, apply);
+}
+
 void ParameterUpdater::restore() { m->updater->restore(); }
 
 void ParameterUpdater::apply() { m->updater->apply(); }
diff --git a/paddle/gserver/gradientmachines/MultiGradientMachine.cpp b/paddle/gserver/gradientmachines/MultiGradientMachine.cpp
index 6ae60102b3e431727c0954e8b8073bfe0534f8ee..3159026e6b92355ba7480b09535388c969a504e2 100644
--- a/paddle/gserver/gradientmachines/MultiGradientMachine.cpp
+++ b/paddle/gserver/gradientmachines/MultiGradientMachine.cpp
@@ -518,7 +518,7 @@ void TrainerThread::computeThread() {
         backward();
         break;
       case MultiGradientMachine::TASK_COPY_IN_ARGS:
-        copyInArgs();
+        batchSize_ = copyInArgs();
         inArgsCopied_ = true;
         multiMachine_->waitForCopyInArgs();
         break;
diff --git a/python/paddle/trainer_config_helpers/tests/configs/file_list.sh b/python/paddle/trainer_config_helpers/tests/configs/file_list.sh
index c9178e3c6a46a2d663ec368569e529e780b76a6f..164d365c15b8ae7627eeb4798b6df89e198d3eb0 100755
--- a/python/paddle/trainer_config_helpers/tests/configs/file_list.sh
+++ b/python/paddle/trainer_config_helpers/tests/configs/file_list.sh
@@ -5,6 +5,6 @@ last_first_seq test_expand_layer test_ntm_layers test_hsigmoid
 img_layers img_trans_layers util_layers simple_rnn_layers unused_layers
 test_cost_layers test_rnn_group shared_fc shared_lstm shared_gru
 test_cost_layers_with_weight test_spp_layer test_bilinear_interp test_maxout test_bi_grumemory math_ops
-test_seq_concat_reshape)
+test_seq_concat_reshape test_pad)
 
 export whole_configs=(test_split_datasource)
diff --git a/python/paddle/trainer_config_helpers/tests/configs/generate_protostr.sh b/python/paddle/trainer_config_helpers/tests/configs/generate_protostr.sh
index ee5961af75ebb33af52f9add645f793015288f4e..8a318879630cd491573afcaf798dda2ca75e335d 100755
--- a/python/paddle/trainer_config_helpers/tests/configs/generate_protostr.sh
+++ b/python/paddle/trainer_config_helpers/tests/configs/generate_protostr.sh
@@ -11,6 +11,9 @@ for conf in ${configs[*]}
 do
     echo "Generating " $conf
     $1 -m paddle.utils.dump_config $conf.py > $protostr/$conf.protostr.unittest
+    if [ ! -f "$protostr/$conf.protostr" ]; then
+        cp $protostr/$conf.protostr.unittest $protostr/$conf.protostr
+    fi
     cat ${conf}.py |$1 test_config_parser_for_non_file_config.py > $protostr/$conf.protostr.non_file_config.unittest
 done
 
@@ -18,5 +21,8 @@ for conf in ${whole_configs[*]}
 do
     echo "Generating " $conf
     $1 -m paddle.utils.dump_config $conf.py "" --whole > $protostr/$conf.protostr.unittest
+    if [ ! -f "$protostr/$conf.protostr" ]; then
+        cp $protostr/$conf.protostr.unittest $protostr/$conf.protostr
+    fi
     cat ${conf}.py |$1 test_config_parser_for_non_file_config.py --whole > $protostr/$conf.protostr.non_file_config.unittest
 done
diff --git a/python/paddle/trainer_config_helpers/tests/configs/protostr/test_pad.protostr b/python/paddle/trainer_config_helpers/tests/configs/protostr/test_pad.protostr
new file mode 100644
index 0000000000000000000000000000000000000000..15c6ab4dc8e61dedc10acaa49db7d8ae136d4952
--- /dev/null
+++ b/python/paddle/trainer_config_helpers/tests/configs/protostr/test_pad.protostr
@@ -0,0 +1,120 @@
+type: "nn"
+layers {
+  name: "data"
+  type: "data"
+  size: 2016
+  active_type: ""
+  height: 48
+  width: 42
+}
+layers {
+  name: "__conv_0__"
+  type: "exconv"
+  size: 32256
+  active_type: ""
+  inputs {
+    input_layer_name: "data"
+    input_parameter_name: "___conv_0__.w0"
+    conv_conf {
+      filter_size: 3
+      channels: 1
+      stride: 1
+      padding: 1
+      groups: 1
+      filter_channels: 1
+      output_x: 42
+      img_size: 42
+      caffe_mode: true
+      filter_size_y: 3
+      padding_y: 1
+      stride_y: 1
+      output_y: 48
+      img_size_y: 48
+    }
+  }
+  bias_parameter_name: "___conv_0__.wbias"
+  num_filters: 16
+  shared_biases: true
+  height: 48
+  width: 42
+}
+layers {
+  name: "__pool_0__"
+  type: "pool"
+  size: 8064
+  active_type: ""
+  inputs {
+    input_layer_name: "__conv_0__"
+    pool_conf {
+      pool_type: "max-projection"
+      channels: 16
+      size_x: 2
+      stride: 2
+      output_x: 21
+      img_size: 42
+      padding: 0
+      size_y: 2
+      stride_y: 2
+      output_y: 24
+      img_size_y: 48
+      padding_y: 0
+    }
+  }
+  height: 24
+  width: 21
+}
+layers {
+  name: "__pad_0__"
+  type: "pad"
+  size: 14175
+  active_type: ""
+  inputs {
+    input_layer_name: "__pool_0__"
+    pad_conf {
+      image_conf {
+        channels: 16
+        img_size: 21
+        img_size_y: 24
+      }
+      pad_c: 2
+      pad_c: 3
+      pad_h: 1
+      pad_h: 2
+      pad_w: 3
+      pad_w: 1
+    }
+  }
+  height: 27
+  width: 25
+}
+parameters {
+  name: "___conv_0__.w0"
+  size: 144
+  initial_mean: 0.0
+  initial_std: 0.471404520791
+  initial_strategy: 0
+  initial_smart: false
+}
+parameters {
+  name: "___conv_0__.wbias"
+  size: 16
+  initial_mean: 0.0
+  initial_std: 0.0
+  dims: 16
+  dims: 1
+  initial_strategy: 0
+  initial_smart: false
+}
+input_layer_names: "data"
+output_layer_names: "__pad_0__"
+sub_models {
+  name: "root"
+  layer_names: "data"
+  layer_names: "__conv_0__"
+  layer_names: "__pool_0__"
+  layer_names: "__pad_0__"
+  input_layer_names: "data"
+  output_layer_names: "__pad_0__"
+  is_recurrent_layer_group: false
+}
+
diff --git a/python/paddle/trainer_config_helpers/tests/configs/test_pad.py b/python/paddle/trainer_config_helpers/tests/configs/test_pad.py
index bb5f13410dbbbaeea9e28c271d33a15fb3000dcf..491e8c8caab38eb7c24e5461107ab5a9d63b12ef 100644
--- a/python/paddle/trainer_config_helpers/tests/configs/test_pad.py
+++ b/python/paddle/trainer_config_helpers/tests/configs/test_pad.py
@@ -2,7 +2,7 @@ from paddle.trainer_config_helpers import *
 
 settings(batch_size=1000, learning_rate=1e-5)
 
-data = data_layer(name='data', size=2304, height=48, width=42)
+data = data_layer(name='data', size=2016, height=48, width=42)
 
 conv = img_conv_layer(
     input=data,
@@ -13,8 +13,7 @@ conv = img_conv_layer(
     act=LinearActivation(),
     bias_attr=True)
 
-pool = img_pool_layer(
-    input=conv, num_channels=8, pool_size=2, stride=2, pool_type=MaxPooling())
+pool = img_pool_layer(input=conv, pool_size=2, stride=2, pool_type=MaxPooling())
 
 pad = pad_layer(input=pool, pad_c=[2, 3], pad_h=[1, 2], pad_w=[3, 1])
 
diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py
index feefd7d758ba09f5d8f818ca1b12b00c5f0e9797..5e99d4a241b7fe2b0f9ff4ba191db4b341c4d30e 100644
--- a/python/paddle/v2/optimizer.py
+++ b/python/paddle/v2/optimizer.py
@@ -38,12 +38,35 @@ class Optimizer(object):
         assert isinstance(tmp, swig_api.ParameterOptimizer)
         return tmp.getParameterTypes()
 
-    def create_local_updater(self):
+    def __create_local_updater__(self):
         return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__)
 
-    def create_remote_updater(self, pass_num):
-        return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__,
-                                                             pass_num)
+    def __create_remote_updater__(self, pass_num, use_sparse_updater):
+        return swig_api.ParameterUpdater.createRemoteUpdater(
+            self.__opt_conf__, pass_num, use_sparse_updater)
+
+    def create_updater(self, is_local, num_passes, use_sparse_updater):
+        """
+        Create the proper parameter_updater from the configuration.
+        :param is_local: create a local or a remote parameter updater.
+        :param num_passes: the remote parameter updater will use this to
+            configure the parameter server.
+        :param use_sparse_updater: when using a remote updater, if some
+            parameter is sparse, the updater should do some extra work:
+
+        .. code-block:: python
+
+            if use_sparse_remote_updater:
+                gradient_machine.prefetch(in_args)
+                parameter_updater.getParametersRemote()
+        :return: parameter_updater
+        """
+        if is_local:
+            parameter_updater = self.__create_local_updater__()
+        else:
+            parameter_updater = self.__create_remote_updater__(
+                num_passes, use_sparse_updater)
+        return parameter_updater
 
 
 class Momentum(Optimizer):
diff --git a/python/paddle/v2/topology.py b/python/paddle/v2/topology.py
index 737b6bf1e2eb60281d4d6e92667d9fe91e243704..ff28c85c53dc8255b6ad5e3975b07f72a9a64e4b 100644
--- a/python/paddle/v2/topology.py
+++ b/python/paddle/v2/topology.py
@@ -73,6 +73,18 @@ class Topology(object):
 
         assert isinstance(self.__model_config__, ModelConfig)
 
+    def use_sparse_updater(self):
+        """
+        Check whether any parameter requires a sparse update.
+        :return: True if any parameter uses sparse_update or sparse_remote_update.
+        """
+        use_sparse = False
+        for parameter in self.__model_config__.parameters:
+            if parameter.sparse_update or parameter.sparse_remote_update:
+                use_sparse = True
+                break
+        return use_sparse
+
     def proto(self):
         return self.__model_config__
 
diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py
index 68b4967cc031dfa2dd164d822aff97585f923e48..ec9fcfb749f1a858713d3d6672118e521fbdcb32 100644
--- a/python/paddle/v2/trainer.py
+++ b/python/paddle/v2/trainer.py
@@ -2,6 +2,8 @@
 Module Trainer
 """
 import collections
+import gzip
+import os
 
 import py_paddle.swig_paddle as api
 
@@ -42,7 +44,12 @@ class SGD(object):
     :type extra_layers: paddle.v2.config_base.Layer
     """
 
-    def __init__(self, cost, parameters, update_equation, extra_layers=None):
+    def __init__(self,
+                 cost,
+                 parameters,
+                 update_equation,
+                 extra_layers=None,
+                 is_local=True):
         if not isinstance(parameters, v2_parameters.Parameters):
             raise TypeError('parameters should be parameters')
 
@@ -55,20 +62,48 @@ class SGD(object):
         self.__topology__ = topology
        self.__parameters__ = parameters
         self.__topology_in_proto__ = topology.proto()
+        self.__is_local__ = is_local
 
-        # In local mode, disable sparse_remote_update.
-        for param in self.__topology_in_proto__.parameters:
-            if param.sparse_remote_update:
-                param.sparse_remote_update = False
+        self.__use_sparse_updater__ = self.__topology__.use_sparse_updater()
+        # In local mode, disable sparse_remote_update.
+        if is_local:
+            for param in self.__topology_in_proto__.parameters:
+                if param.sparse_remote_update:
+                    param.sparse_remote_update = False
+
+        self.__gm_create_mode__ = api.CREATE_MODE_NORMAL if not \
+            self.__use_sparse_updater__ else api.CREATE_MODE_SGD_SPARSE_CPU_TRAINING
         self.__data_types__ = topology.data_type()
         gm = api.GradientMachine.createFromConfigProto(
-            self.__topology_in_proto__, api.CREATE_MODE_NORMAL,
+            self.__topology_in_proto__, self.__gm_create_mode__,
             self.__optimizer__.enable_types())
         assert isinstance(gm, api.GradientMachine)
         self.__gradient_machine__ = gm
         self.__gradient_machine__.randParameters()
-        parameters.append_gradient_machine(gm)
+        self.__parameters__.append_gradient_machine(gm)
+        self.__parameter_updater__ = None
+
+    def __use_remote_sparse_updater__(self):
+        return self.__use_sparse_updater__ and not self.__is_local__
+
+    def __prepare_parameter__(self, in_args):
+        """
+        Prepare parameters before forward/backward.
+        1. When using the remote sparse updater, parameters should be fetched
+           from the parameter server according to the input arguments.
+        :param in_args: input arguments of this batch.
+        :return:
+        """
+        if self.__use_remote_sparse_updater__():
+            self.__gradient_machine__.prefetch(in_args)
+            self.__parameter_updater__.getParametersRemote()
+
+    def save_parameter_to_tar(self, f):
+        self.__parameter_updater__.catchUpWith()
+        self.__parameter_updater__.apply()
+        self.__parameter_updater__.getParametersRemote(True, True)
+        self.__parameters__.to_tar(f)
+        self.__parameter_updater__.restore()
 
     def train(self, reader, num_passes=1, event_handler=None, feeding=None):
         """
@@ -90,8 +125,9 @@ class SGD(object):
             event_handler = default_event_handler
         __check_train_args__(**locals())
 
-        updater = self.__optimizer__.create_local_updater()
-        updater.init(self.__gradient_machine__)
+        self.__parameter_updater__ = self.__optimizer__.create_updater(
+            self.__is_local__, num_passes, self.__use_sparse_updater__)
+        self.__parameter_updater__.init(self.__gradient_machine__)
 
         self.__gradient_machine__.start()
         batch_evaluator = self.__gradient_machine__.makeEvaluator()
@@ -103,23 +139,26 @@ class SGD(object):
         for pass_id in xrange(num_passes):
             event_handler(v2_event.BeginPass(pass_id))
             pass_evaluator.start()
-            updater.startPass()
+            self.__parameter_updater__.startPass()
             for batch_id, data_batch in enumerate(reader()):
                 batch_evaluator.start()
                 event_handler(
                     v2_event.BeginIteration(
                         pass_id=pass_id, batch_id=batch_id))
-                pass_type = updater.startBatch(len(data_batch))
-                self.__gradient_machine__.forwardBackward(
-                    feeder(data_batch), out_args, pass_type)
+                pass_type = self.__parameter_updater__.startBatch(
+                    len(data_batch))
+                in_args = feeder(data_batch)
+                self.__prepare_parameter__(in_args)
+                self.__gradient_machine__.forwardBackward(in_args, out_args,
+                                                          pass_type)
                 self.__gradient_machine__.eval(pass_evaluator)
                 self.__gradient_machine__.eval(batch_evaluator)
                 for each_param in self.__gradient_machine__.getNonStaticParameters(
                 ):
-                    updater.update(each_param)
+                    self.__parameter_updater__.update(each_param)
                 cost_sum = out_args.sum()
                 cost = cost_sum / len(data_batch)
-                updater.finishBatch(cost)
+                self.__parameter_updater__.finishBatch(cost)
                 batch_evaluator.finish()
                 event_handler(
                     v2_event.EndIteration(
                         pass_id=pass_id,
                         batch_id=batch_id,
@@ -128,7 +167,7 @@ class SGD(object):
                         cost=cost,
                         evaluator=batch_evaluator))
 
-            updater.finishPass()
+            self.__parameter_updater__.finishPass()
             pass_evaluator.finish()
             event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator))
         self.__gradient_machine__.finish()
@@ -152,8 +191,9 @@ class SGD(object):
         num_samples = 0.0
         for data_batch in reader():
             num_samples += len(data_batch)
-            self.__gradient_machine__.forward(
-                feeder(data_batch), out_args, api.PASS_TEST)
+            in_args = feeder(data_batch)
+            self.__prepare_parameter__(in_args)
+            self.__gradient_machine__.forward(in_args, out_args, api.PASS_TEST)
             total_cost += out_args.sum()
             self.__gradient_machine__.eval(evaluator)
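
Note (not part of the patch): a minimal sketch of how a checkpoint written by the new trainer.save_parameter_to_tar() call in the demo's event handler could be loaded back, assuming paddle.v2.parameters.Parameters.from_tar is available; the checkpoint file name is illustrative only.

import gzip

import paddle.v2 as paddle

paddle.init(use_gpu=False, trainer_count=1)

# Load a checkpoint written by trainer.save_parameter_to_tar() in the event handler.
# "batch-800.tar.gz" is an illustrative name; use whichever checkpoint exists on disk.
with gzip.open("batch-800.tar.gz") as f:
    parameters = paddle.parameters.Parameters.from_tar(f)

# The loaded Parameters object can then be passed to a new paddle.trainer.SGD
# (together with the same cost topology) to resume training, or used for inference.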