diff --git a/demo/mnist/api_train_v2.py b/demo/mnist/api_train_v2.py index 612b0d218fc0be68b668ffb8f1bfb0ba92c4d741..a59b30ccdb2eddea6680d6ad5c790c857b9c5141 100644 --- a/demo/mnist/api_train_v2.py +++ b/demo/mnist/api_train_v2.py @@ -20,26 +20,29 @@ def main(): adam_optimizer = paddle.optimizer.Adam(learning_rate=0.01) + trainer = paddle.trainer.SGD(cost=cost, + parameters=parameters, + update_equation=adam_optimizer) + def event_handler(event): if isinstance(event, paddle.event.EndIteration): - if event.batch_id % 100 == 0: - print "Pass %d, Batch %d, Cost %f, %s" % ( - event.pass_id, event.batch_id, event.cost, event.metrics) + if event.batch_id % 1000 == 0: + result = trainer.test(reader=paddle.reader.batched( + paddle.dataset.mnist.test(), batch_size=256)) + + print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % ( + event.pass_id, event.batch_id, event.cost, event.metrics, + result.metrics) + else: pass - trainer = paddle.trainer.SGD(update_equation=adam_optimizer) - trainer.train( reader=paddle.reader.batched( paddle.reader.shuffle( paddle.dataset.mnist.train(), buf_size=8192), batch_size=32), - cost=cost, - parameters=parameters, - event_handler=event_handler, - reader_dict={images.name: 0, - label.name: 1}) + event_handler=event_handler) if __name__ == '__main__': diff --git a/doc/howto/usage/cluster/cluster_train_cn.md b/doc/howto/usage/cluster/cluster_train_cn.md index acdcfa1c0047ced85c0a9c53d691edc0b4489336..274452fbf0c595ad7b4dbeffe85ad9038f12b458 100644 --- a/doc/howto/usage/cluster/cluster_train_cn.md +++ b/doc/howto/usage/cluster/cluster_train_cn.md @@ -6,7 +6,7 @@ 在本文中,我们将阐释如何在集群上运行分布式 Paddle 训练作业。我们将以[推荐系统](https://github.com/baidu/Paddle/tree/develop/demo/recommendation)为例创建分布式的单进程训练。 -在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s) )的用户参考。 +在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s) )的用户参考。 ## 前提条件 diff --git a/doc/howto/usage/cluster/cluster_train_en.md b/doc/howto/usage/cluster/cluster_train_en.md index 30963dcd927250651f3ed0b39949f541cc28ed4a..c60876721cbf5565d6e48c8061811aacada748cd 100644 --- a/doc/howto/usage/cluster/cluster_train_en.md +++ b/doc/howto/usage/cluster/cluster_train_en.md @@ -2,7 +2,7 @@ In this article, we explain how to run distributed Paddle training jobs on clusters. We will create the distributed version of the single-process training example, [recommendation](https://github.com/baidu/Paddle/tree/develop/demo/recommendation). -[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s). +[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s). 
## Prerequisite diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index a5ffe25a116e9be039bdebaaaad435685e23d372..fcf4437ffaf329f52cc5bc997eff45dee200873c 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -32,3 +32,10 @@ def download(url, module_name, md5sum): shutil.copyfileobj(r.raw, f) return filename + + +def dict_add(a_dict, ele): + if ele in a_dict: + a_dict[ele] += 1 + else: + a_dict[ele] = 1 diff --git a/python/paddle/v2/dataset/imdb.py b/python/paddle/v2/dataset/imdb.py new file mode 100644 index 0000000000000000000000000000000000000000..433e37380f840f5b7ff619a5f64b99d2ad724b17 --- /dev/null +++ b/python/paddle/v2/dataset/imdb.py @@ -0,0 +1,120 @@ +# /usr/bin/env python +# -*- coding:utf-8 -*- + +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +IMDB dataset: http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz +""" +import paddle.v2.dataset.common +import tarfile +import Queue +import re +import string +import threading + +__all__ = ['build_dict', 'train', 'test'] + +URL = 'http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz' +MD5 = '7c2ac02c03563afcf9b574c7e56c153a' + + +# Read files that match pattern. Tokenize and yield each file. +def tokenize(pattern): + with tarfile.open(paddle.v2.dataset.common.download(URL, 'imdb', + MD5)) as tarf: + # Note that we should use tarfile.next(), which does + # sequential access of member files, other than + # tarfile.extractfile, which does random access and might + # destroy hard disks. + tf = tarf.next() + while tf != None: + if bool(pattern.match(tf.name)): + # newline and punctuations removal and ad-hoc tokenization. + yield tarf.extractfile(tf).read().rstrip("\n\r").translate( + None, string.punctuation).lower().split() + tf = tarf.next() + + +def build_dict(pattern, cutoff): + word_freq = {} + for doc in tokenize(pattern): + for word in doc: + paddle.v2.dataset.common.dict_add(word_freq, word) + + # Not sure if we should prune less-frequent words here. + word_freq = filter(lambda x: x[1] > cutoff, word_freq.items()) + + dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0])) + words, _ = list(zip(*dictionary)) + word_idx = dict(zip(words, xrange(len(words)))) + word_idx[''] = len(words) + return word_idx + + +def reader_creator(pos_pattern, neg_pattern, word_idx, buffer_size): + UNK = word_idx[''] + + qs = [Queue.Queue(maxsize=buffer_size), Queue.Queue(maxsize=buffer_size)] + + def load(pattern, queue): + for doc in tokenize(pattern): + queue.put(doc) + queue.put(None) + + def reader(): + # Creates two threads that loads positive and negative samples + # into qs. + t0 = threading.Thread( + target=load, args=( + pos_pattern, + qs[0], )) + t0.daemon = True + t0.start() + + t1 = threading.Thread( + target=load, args=( + neg_pattern, + qs[1], )) + t1.daemon = True + t1.start() + + # Read alternatively from qs[0] and qs[1]. 
+ i = 0 + doc = qs[i].get() + while doc != None: + yield [word_idx.get(w, UNK) for w in doc], i % 2 + i += 1 + doc = qs[i % 2].get() + + # If any queue is empty, reads from the other queue. + i += 1 + doc = qs[i % 2].get() + while doc != None: + yield [word_idx.get(w, UNK) for w in doc], i % 2 + doc = qs[i % 2].get() + + return reader() + + +def train(word_idx): + return reader_creator( + re.compile("aclImdb/train/pos/.*\.txt$"), + re.compile("aclImdb/train/neg/.*\.txt$"), word_idx, 1000) + + +def test(word_idx): + return reader_creator( + re.compile("aclImdb/test/pos/.*\.txt$"), + re.compile("aclImdb/test/neg/.*\.txt$"), word_idx, 1000) diff --git a/python/paddle/v2/dataset/imikolov.py b/python/paddle/v2/dataset/imikolov.py new file mode 100644 index 0000000000000000000000000000000000000000..b3791ddad66e588356338150fccadbcc8fa113ca --- /dev/null +++ b/python/paddle/v2/dataset/imikolov.py @@ -0,0 +1,79 @@ +""" +imikolov's simple dataset: http://www.fit.vutbr.cz/~imikolov/rnnlm/ +""" +import paddle.v2.dataset.common +import tarfile + +__all__ = ['train', 'test'] + +URL = 'http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz' +MD5 = '30177ea32e27c525793142b6bf2c8e2d' + + +def word_count(f, word_freq=None): + add = paddle.v2.dataset.common.dict_add + if word_freq == None: + word_freq = {} + + for l in f: + for w in l.strip().split(): + add(word_freq, w) + add(word_freq, '') + add(word_freq, '') + + return word_freq + + +def build_dict(train_filename, test_filename): + with tarfile.open( + paddle.v2.dataset.common.download( + paddle.v2.dataset.imikolov.URL, 'imikolov', + paddle.v2.dataset.imikolov.MD5)) as tf: + trainf = tf.extractfile(train_filename) + testf = tf.extractfile(test_filename) + word_freq = word_count(testf, word_count(trainf)) + + TYPO_FREQ = 50 + word_freq = filter(lambda x: x[1] > TYPO_FREQ, word_freq.items()) + + dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0])) + words, _ = list(zip(*dictionary)) + word_idx = dict(zip(words, xrange(len(words)))) + word_idx[''] = len(words) + + return word_idx + + +word_idx = {} + + +def reader_creator(filename, n): + global word_idx + if len(word_idx) == 0: + word_idx = build_dict('./simple-examples/data/ptb.train.txt', + './simple-examples/data/ptb.valid.txt') + + def reader(): + with tarfile.open( + paddle.v2.dataset.common.download( + paddle.v2.dataset.imikolov.URL, 'imikolov', + paddle.v2.dataset.imikolov.MD5)) as tf: + f = tf.extractfile(filename) + + UNK = word_idx[''] + for l in f: + l = [''] + l.strip().split() + [''] + if len(l) >= n: + l = [word_idx.get(w, UNK) for w in l] + for i in range(n, len(l) + 1): + yield tuple(l[i - n:i]) + + return reader + + +def train(n): + return reader_creator('./simple-examples/data/ptb.train.txt', n) + + +def test(n): + return reader_creator('./simple-examples/data/ptb.valid.txt', n) diff --git a/python/paddle/v2/dataset/mnist.py b/python/paddle/v2/dataset/mnist.py index f1315b35cd55c5387295f1f883b997cd6dd71bd1..1512a3c3189de4e54f8502cfadf450b0710a246e 100644 --- a/python/paddle/v2/dataset/mnist.py +++ b/python/paddle/v2/dataset/mnist.py @@ -9,9 +9,9 @@ __all__ = ['train', 'test'] URL_PREFIX = 'http://yann.lecun.com/exdb/mnist/' TEST_IMAGE_URL = URL_PREFIX + 't10k-images-idx3-ubyte.gz' -TEST_IMAGE_MD5 = '25e3cc63507ef6e98d5dc541e8672bb6' +TEST_IMAGE_MD5 = '9fb629c4189551a2d022fa330f9573f3' TEST_LABEL_URL = URL_PREFIX + 't10k-labels-idx1-ubyte.gz' -TEST_LABEL_MD5 = '4e9511fe019b2189026bd0421ba7b688' +TEST_LABEL_MD5 = 'ec29112dd5afa0611ce80d1b7f02629c' TRAIN_IMAGE_URL = URL_PREFIX + 
'train-images-idx3-ubyte.gz' TRAIN_IMAGE_MD5 = 'f68b3c2dcbeaaa9fbdd348bbdeb94873' TRAIN_LABEL_URL = URL_PREFIX + 'train-labels-idx1-ubyte.gz' diff --git a/python/paddle/v2/dataset/tests/imdb_test.py b/python/paddle/v2/dataset/tests/imdb_test.py new file mode 100644 index 0000000000000000000000000000000000000000..e887af16634d2db04b8cf5fa0269a69991d8baac --- /dev/null +++ b/python/paddle/v2/dataset/tests/imdb_test.py @@ -0,0 +1,43 @@ +import paddle.v2.dataset.imdb +import unittest +import re + +TRAIN_POS_PATTERN = re.compile("aclImdb/train/pos/.*\.txt$") +TRAIN_NEG_PATTERN = re.compile("aclImdb/train/neg/.*\.txt$") +TRAIN_PATTERN = re.compile("aclImdb/train/.*\.txt$") + +TEST_POS_PATTERN = re.compile("aclImdb/test/pos/.*\.txt$") +TEST_NEG_PATTERN = re.compile("aclImdb/test/neg/.*\.txt$") +TEST_PATTERN = re.compile("aclImdb/test/.*\.txt$") + + +class TestIMDB(unittest.TestCase): + word_idx = None + + def test_build_dict(self): + if self.word_idx == None: + self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN, + 150) + + self.assertEqual(len(self.word_idx), 7036) + + def check_dataset(self, dataset, expected_size): + if self.word_idx == None: + self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN, + 150) + + sum = 0 + for l in dataset(self.word_idx): + self.assertEqual(l[1], sum % 2) + sum += 1 + self.assertEqual(sum, expected_size) + + def test_train(self): + self.check_dataset(paddle.v2.dataset.imdb.train, 25000) + + def test_test(self): + self.check_dataset(paddle.v2.dataset.imdb.test, 25000) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/v2/dataset/tests/imikolov_test.py b/python/paddle/v2/dataset/tests/imikolov_test.py new file mode 100644 index 0000000000000000000000000000000000000000..9b1748eaaa7f913a6b94f2087a8089fb998570aa --- /dev/null +++ b/python/paddle/v2/dataset/tests/imikolov_test.py @@ -0,0 +1,20 @@ +import paddle.v2.dataset.imikolov +import unittest + + +class TestMikolov(unittest.TestCase): + def check_reader(self, reader, n): + for l in reader(): + self.assertEqual(len(l), n) + + def test_train(self): + n = 5 + self.check_reader(paddle.v2.dataset.imikolov.train(n), n) + + def test_test(self): + n = 5 + self.check_reader(paddle.v2.dataset.imikolov.test(n), n) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/v2/event.py b/python/paddle/v2/event.py index 835e28e6218df22e1cad7f7bb31c3c9941657252..a78bcf076cc65e0dfdfc5760e099900418162f35 100644 --- a/python/paddle/v2/event.py +++ b/python/paddle/v2/event.py @@ -11,7 +11,10 @@ There are: TODO(yuyang18): Complete it! """ import py_paddle.swig_paddle as api -__all__ = ['EndIteration', 'BeginIteration', 'BeginPass', 'EndPass'] + +__all__ = [ + 'EndIteration', 'BeginIteration', 'BeginPass', 'EndPass', 'TestResult' +] class WithMetric(object): @@ -30,6 +33,11 @@ class WithMetric(object): return retv +class TestResult(WithMetric): + def __init__(self, evaluator): + super(TestResult, self).__init__(evaluator) + + class BeginPass(object): """ Event On One Pass Training Start. diff --git a/python/paddle/v2/topology.py b/python/paddle/v2/topology.py index e2709e39de9d9d1588825e35cdc603da40e52f8c..4c211254319bbdf46b02a2cee56b6a98b01819a2 100644 --- a/python/paddle/v2/topology.py +++ b/python/paddle/v2/topology.py @@ -21,6 +21,28 @@ import layer as v2_layer __all__ = ['Topology'] +def __flatten__(lis): + """ + Given a list, possibly nested to any level, return it flattened. 
+ """ + new_lis = [] + for item in lis: + if isinstance(item, collections.Sequence): + new_lis.extend(__flatten__(item)) + else: + new_lis.append(item) + return new_lis + + +def __bfs_travel__(callback, *layers): + layers = __flatten__(layers) + for each_layer in layers: + __break__ = callback(each_layer) + if __break__: + return + __bfs_travel__(callback, *each_layer.__parent_layers__.values()) + + class Topology(object): """ Topology is used to store the information about all layers @@ -46,21 +68,17 @@ class Topology(object): :param name: :return: """ - result_layer = [] - - def find_layer_by_name(layer, layer_name): - if len(result_layer) == 1: - return - elif layer.name == layer_name: - result_layer.append(layer) - else: - for parent_layer in layer.__parent_layers__.values(): - find_layer_by_name(parent_layer, layer_name) + result_layer = [None] - for layer in self.layers: - find_layer_by_name(layer, name) + def __impl__(l): + if l.name == name: + result_layer[0] = l + return True # break + return False - assert len(result_layer) == 1 + __bfs_travel__(__impl__, *self.layers) + if result_layer[0] is None: + raise ValueError("No such layer %s" % name) return result_layer[0] def data_layers(self): @@ -70,18 +88,11 @@ class Topology(object): """ data_layers = dict() - def find_data_layer(layer): - if isinstance(layer, v2_layer.DataLayerV2): - data_layers[layer.name] = layer - if not isinstance(layer, collections.Sequence): - layer = [layer] - for each_l in layer: - for parent_layer in each_l.__parent_layers__.values(): - find_data_layer(parent_layer) - - for layer in self.layers: - find_data_layer(layer) + def __impl__(l): + if isinstance(l, v2_layer.DataLayerV2): + data_layers[l.name] = l + __bfs_travel__(__impl__, *self.layers) return data_layers def data_type(self): @@ -89,12 +100,9 @@ class Topology(object): get data_type from proto, such as: [('image', dense_vector(768)), ('label', integer_value(10))] """ - - data_types_lists = [] data_layers = self.data_layers() - for each in self.__model_config__.input_layer_names: - data_types_lists.append((each, data_layers[each].type)) - return data_types_lists + return [(nm, data_layers[nm].type) + for nm in self.proto().input_layer_names] def __check_layer_type__(layer): diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index bf8b181e42064f01a78b92313805b5fed3a3ceac..5003f55f3e0d15149d28d1478e0487d6873d6e0a 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -42,25 +42,35 @@ class ITrainer(object): class SGD(ITrainer): - def __init__(self, update_equation): + def __init__(self, cost, parameters, update_equation): """ Simple SGD Trainer. :param update_equation: The optimizer object. 
:type update_equation: v2_optimizer.Optimizer """ + + if not isinstance(parameters, v2_parameters.Parameters): + raise TypeError('parameters should be parameters') + if not isinstance(update_equation, v2_optimizer.Optimizer): - raise ValueError("update equation parameter must be " - "paddle.v2.optimizer.Optimizer") + raise TypeError("update equation parameter must be " + "paddle.v2.optimizer.Optimizer") + topology = Topology(cost) self.__optimizer__ = update_equation + self.__topology__ = topology + self.__parameters__ = parameters + self.__topology_in_proto__ = topology.proto() + self.__data_types__ = topology.data_type() + gm = api.GradientMachine.createFromConfigProto( + self.__topology_in_proto__, api.CREATE_MODE_NORMAL, + self.__optimizer__.enable_types()) + assert isinstance(gm, api.GradientMachine) + parameters.append_gradient_machine(gm) + self.__gradient_machine__ = gm + self.__gradient_machine__.randParameters() - def train(self, - reader, - cost, - parameters, - num_passes=1, - event_handler=None, - reader_dict=None): + def train(self, reader, num_passes=1, event_handler=None, reader_dict=None): """ Training method. Will train num_passes of input data. @@ -76,27 +86,22 @@ class SGD(ITrainer): if event_handler is None: event_handler = default_event_handler - topology = Topology(cost) + if reader_dict is None: + reader_dict = self.default_reader_dict() __check_train_args__(**locals()) - gm = api.GradientMachine.createFromConfigProto( - topology.proto(), api.CREATE_MODE_NORMAL, - self.__optimizer__.enable_types()) - assert isinstance(gm, api.GradientMachine) - parameters.append_gradient_machine(gm) - gm.randParameters() updater = self.__optimizer__.create_local_updater() - updater.init(gm) + updater.init(self.__gradient_machine__) - gm.start() - batch_evaluator = gm.makeEvaluator() + self.__gradient_machine__.start() + batch_evaluator = self.__gradient_machine__.makeEvaluator() assert isinstance(batch_evaluator, api.Evaluator) - pass_evaluator = gm.makeEvaluator() + pass_evaluator = self.__gradient_machine__.makeEvaluator() assert isinstance(pass_evaluator, api.Evaluator) out_args = api.Arguments.createArguments(0) - feeder = DataFeeder(topology.data_type(), reader_dict) + feeder = DataFeeder(self.__data_types__, reader_dict) for pass_id in xrange(num_passes): event_handler(v2_event.BeginPass(pass_id)) @@ -104,16 +109,18 @@ class SGD(ITrainer): updater.startPass() for batch_id, data_batch in enumerate(reader()): pass_type = updater.startBatch(len(data_batch)) - gm.forwardBackward(feeder(data_batch), out_args, pass_type) + self.__gradient_machine__.forwardBackward( + feeder(data_batch), out_args, pass_type) batch_evaluator.start() event_handler( v2_event.BeginIteration( pass_id=pass_id, batch_id=batch_id)) pass_type = updater.startBatch(len(data_batch)) - gm.forwardBackward(feeder(data_batch), out_args, pass_type) - gm.eval(pass_evaluator) - gm.eval(batch_evaluator) - for each_param in gm.getParameters(): + self.__gradient_machine__.forwardBackward( + feeder(data_batch), out_args, pass_type) + self.__gradient_machine__.eval(pass_evaluator) + self.__gradient_machine__.eval(batch_evaluator) + for each_param in self.__gradient_machine__.getParameters(): updater.update(each_param) # Get cost. We use numpy to calculate total cost for this batch. 
cost_vec = out_args.getSlotValue(0) @@ -131,22 +138,37 @@ class SGD(ITrainer): updater.finishPass() pass_evaluator.finish() event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator)) - gm.finish() + self.__gradient_machine__.finish() + + def default_reader_dict(self): + reader_dict = dict() + for i, tp in enumerate(self.__data_types__): + reader_dict[tp[0]] = i + return reader_dict + + def test(self, reader, reader_dict=None): + if reader_dict is None: + reader_dict = self.default_reader_dict() + + feeder = DataFeeder(self.__data_types__, reader_dict) + evaluator = self.__gradient_machine__.makeEvaluator() + out_args = api.Arguments.createArguments(0) + evaluator.start() + for data_batch in reader(): + self.__gradient_machine__.forward( + feeder(data_batch), out_args, api.PASS_TEST) + self.__gradient_machine__.eval(evaluator) + + evaluator.finish() + return v2_event.TestResult(evaluator=evaluator) -def __check_train_args__(reader, topology, parameters, event_handler, **kwargs): +def __check_train_args__(reader, event_handler, **kwargs): """ Check train function's argument types """ if not callable(reader) or not isinstance(reader(), collections.Iterator): raise TypeError('train_data_reader should be a function, ' 'which can return a iterator') - - if not isinstance(topology, Topology): - raise TypeError('topology should be a model config') - - if not isinstance(parameters, v2_parameters.Parameters): - raise TypeError('parameters should be a parameter pool') - if not callable(event_handler): raise TypeError('event handler should be a function')
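
Usage notes (not part of the patch): a condensed sketch of the reworked v2 trainer API that the demo/mnist change exercises. The single softmax layer and the paddle.init arguments below are stand-ins, since the real demo defines its own network above the hunk shown; the trainer, event, and reader calls follow the diff.

import paddle.v2 as paddle

paddle.init(use_gpu=False, trainer_count=1)

# Stand-in network; the real demo builds its own `cost` above the shown hunk.
images = paddle.layer.data(
    name='pixel', type=paddle.data_type.dense_vector(784))
label = paddle.layer.data(
    name='label', type=paddle.data_type.integer_value(10))
predict = paddle.layer.fc(
    input=images, size=10, act=paddle.activation.Softmax())
cost = paddle.layer.classification_cost(input=predict, label=label)

parameters = paddle.parameters.create(cost)
adam_optimizer = paddle.optimizer.Adam(learning_rate=0.01)

# The trainer now owns the topology and parameters, so train() only takes a
# reader and an event handler. reader_dict defaults to mapping each data
# layer name to its column index, which is why the explicit
# {images.name: 0, label.name: 1} mapping was dropped from the demo.
trainer = paddle.trainer.SGD(cost=cost,
                             parameters=parameters,
                             update_equation=adam_optimizer)


def event_handler(event):
    if isinstance(event, paddle.event.EndIteration):
        if event.batch_id % 1000 == 0:
            # trainer.test() runs a forward-only pass over the reader and
            # returns a TestResult event exposing the evaluator metrics.
            result = trainer.test(reader=paddle.reader.batched(
                paddle.dataset.mnist.test(), batch_size=256))
            print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % (
                event.pass_id, event.batch_id, event.cost, event.metrics,
                result.metrics)


trainer.train(
    reader=paddle.reader.batched(
        paddle.reader.shuffle(
            paddle.dataset.mnist.train(), buf_size=8192),
        batch_size=32),
    event_handler=event_handler)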
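
The imdb and imikolov loaders share one dictionary-building recipe built on the new common.dict_add helper: count word frequencies, drop rare words, sort by descending frequency with alphabetical tie-breaking, assign dense ids, and reserve the last id for out-of-vocabulary words. A self-contained sketch of that recipe (not part of the patch; the '<unk>' marker name is an assumption):

def dict_add(a_dict, ele):
    # same helper as the one added to python/paddle/v2/dataset/common.py
    if ele in a_dict:
        a_dict[ele] += 1
    else:
        a_dict[ele] = 1


def build_dict_from_docs(docs, cutoff):
    word_freq = {}
    for doc in docs:
        for word in doc:
            dict_add(word_freq, word)
    # keep only words seen more than `cutoff` times
    word_freq = filter(lambda x: x[1] > cutoff, word_freq.items())
    # most frequent words get the smallest ids; ties break alphabetically
    dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
    word_idx = dict((w, i) for i, (w, _) in enumerate(dictionary))
    word_idx['<unk>'] = len(word_idx)  # assumed marker for unknown words
    return word_idx


docs = [['the', 'movie', 'was', 'great'], ['the', 'movie', 'was', 'bad']]
word_idx = build_dict_from_docs(docs, 0)
print sorted(word_idx.items(), key=lambda x: x[1])
# [('movie', 0), ('the', 1), ('was', 2), ('bad', 3), ('great', 4), ('<unk>', 5)]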
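
The imdb reader interleaves positive and negative reviews by pulling from two bounded queues that two loader threads fill in the background, which is why the unit test can assert that labels alternate 0, 1, 0, 1. A self-contained sketch of that interleaving, with short strings standing in for tokenized reviews (not part of the patch):

import Queue
import threading


def interleave(pos_docs, neg_docs, buffer_size=4):
    qs = [Queue.Queue(maxsize=buffer_size), Queue.Queue(maxsize=buffer_size)]

    def load(docs, queue):
        for doc in docs:
            queue.put(doc)
        queue.put(None)          # sentinel: this side is done

    for docs, q in zip([pos_docs, neg_docs], qs):
        t = threading.Thread(target=load, args=(docs, q))
        t.daemon = True
        t.start()

    i = 0
    doc = qs[0].get()
    while doc is not None:       # alternate between the two queues
        yield doc, i % 2
        i += 1
        doc = qs[i % 2].get()

    i += 1                       # one side is exhausted; drain the other
    doc = qs[i % 2].get()
    while doc is not None:
        yield doc, i % 2
        doc = qs[i % 2].get()


for doc, label in interleave(['good film', 'loved it'], ['bad film']):
    print label, doc             # 0 good film / 1 bad film / 0 loved it

The public entry points are iterated directly, as the unit test does: build a word index with imdb.build_dict(TRAIN_PATTERN, 150), then loop over imdb.train(word_idx) to get (word-id list, 0/1 label) pairs.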
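
The imikolov reader turns each PTB sentence into fixed-width n-grams of word ids: pad with start and end markers, map words to ids, then slide a window of width n. A self-contained sketch of that windowing (not part of the patch; the '<s>', '<e>', and '<unk>' marker names are assumptions):

def ngrams(line, word_idx, n):
    unk = word_idx['<unk>']
    words = ['<s>'] + line.strip().split() + ['<e>']
    if len(words) >= n:
        ids = [word_idx.get(w, unk) for w in words]
        for i in range(n, len(ids) + 1):
            yield tuple(ids[i - n:i])


word_idx = {'<s>': 0, '<e>': 1, 'the': 2, 'cat': 3, 'sat': 4, '<unk>': 5}
for gram in ngrams('the cat sat down', word_idx, 3):
    print gram
# (0, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 1)

imikolov.train(n) and imikolov.test(n) return reader functions, so the unit test calls reader() and checks that every yielded tuple has length n.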
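
topology.py now walks the layer graph with a generic callback-driven traversal instead of separate ad-hoc recursion in get_layer() and data_layers(); the callback returns True to stop descending, which is how get_layer() bails out once the named layer is found. A self-contained sketch with a minimal stand-in Layer class (not part of the patch; the real code walks V2 layer objects through __parent_layers__):

import collections


class Layer(object):
    def __init__(self, name, parents=None):
        self.name = name
        self.__parent_layers__ = parents or {}


def __flatten__(lis):
    new_lis = []
    for item in lis:
        if isinstance(item, collections.Sequence):
            new_lis.extend(__flatten__(item))
        else:
            new_lis.append(item)
    return new_lis


def __bfs_travel__(callback, *layers):
    for each_layer in __flatten__(layers):
        if callback(each_layer):
            return
        __bfs_travel__(callback, *each_layer.__parent_layers__.values())


data = Layer('pixel')
hidden = Layer('hidden', {'input': data})
cost = Layer('cost', {'input': hidden})

found = [None]


def find_pixel(l):
    if l.name == 'pixel':
        found[0] = l
        return True              # stop traversing this branch
    return False


__bfs_travel__(find_pixel, cost)
print found[0].name              # pixel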