diff --git a/doc/howto/usage/cluster/cluster_train_cn.md b/doc/howto/usage/cluster/cluster_train_cn.md
index acdcfa1c0047ced85c0a9c53d691edc0b4489336..274452fbf0c595ad7b4dbeffe85ad9038f12b458 100644
--- a/doc/howto/usage/cluster/cluster_train_cn.md
+++ b/doc/howto/usage/cluster/cluster_train_cn.md
@@ -6,7 +6,7 @@
 
 在本文中,我们将阐释如何在集群上运行分布式 Paddle 训练作业。我们将以[推荐系统](https://github.com/baidu/Paddle/tree/develop/demo/recommendation)为例创建分布式的单进程训练。
 
-在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s) )的用户参考。
+在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s) )的用户参考。
 
 ## 前提条件
diff --git a/doc/howto/usage/cluster/cluster_train_en.md b/doc/howto/usage/cluster/cluster_train_en.md
index 30963dcd927250651f3ed0b39949f541cc28ed4a..c60876721cbf5565d6e48c8061811aacada748cd 100644
--- a/doc/howto/usage/cluster/cluster_train_en.md
+++ b/doc/howto/usage/cluster/cluster_train_en.md
@@ -2,7 +2,7 @@
 
 In this article, we explain how to run distributed Paddle training jobs on clusters. We will create the distributed version of the single-process training example, [recommendation](https://github.com/baidu/Paddle/tree/develop/demo/recommendation).
 
-[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s).
+[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s).
 
 ## Prerequisite
diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h
index 1831b8e170087c909f77948f2d9077c946c72507..d99e9a4ad48ea4764c7a1ea56c507d754d56853b 100644
--- a/paddle/api/PaddleAPI.h
+++ b/paddle/api/PaddleAPI.h
@@ -47,6 +47,9 @@ void setUseGpu(bool useGpu);
 /// Return true if this py_paddle is compiled in GPU Version
 bool isGpuVersion();
 
+/// Return FLAGS_trainer_count
+int getTrainerCount();
+
 /// The Error of IO Operation. Such as file not found, etc.
 class IOError {};
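The hunk above only declares the new binding; the definition follows in `paddle/api/Util.cpp` below. As a hedged illustration of how it surfaces in Python (the `initPaddle` call is borrowed from `test_data_feeder.py` later in this diff, and the `--trainer_count` flag is assumed to map onto `FLAGS_trainer_count`):

```python
# Sketch only: assumes py_paddle was built with this patch applied.
from py_paddle import swig_paddle

swig_paddle.initPaddle("--use_gpu=0", "--trainer_count=2")
print swig_paddle.getTrainerCount()  # mirrors FLAGS_trainer_count, prints 2
```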
diff --git a/paddle/api/Util.cpp b/paddle/api/Util.cpp
index 54d67aa62f4d87ad03282962c722019698dc621a..d369df5d4e04b4a8d822db0e72a8051150868ce6 100644
--- a/paddle/api/Util.cpp
+++ b/paddle/api/Util.cpp
@@ -54,5 +54,7 @@ bool isGpuVersion() {
 #endif
 }
 
+int getTrainerCount() { return FLAGS_trainer_count; }
+
 static_assert(NUM_PARAMETER_TYPES == paddle::NUM_PARAMETER_TYPES,
               "The Parameter Type should be same in core/api and core/common");
diff --git a/paddle/function/CosSimOpGpu.cu b/paddle/function/CosSimOpGpu.cu
index 1dd733674fa0542c76070955ec63e008b083c7d2..c62ab39551f02288618244871ae31c6800df5b42 100644
--- a/paddle/function/CosSimOpGpu.cu
+++ b/paddle/function/CosSimOpGpu.cu
@@ -92,7 +92,6 @@ void CosSimForward(GpuMatrix& out_mat,
   CHECK(in1_mat.useGpu_ == true && in2_mat.useGpu_ == true)
       << "Matrix type are not GPU";
 
-  size_t num_samples = out_mat.getHeight();
   size_t dim = in1_mat.getWidth();
   real* out = out_mat.getData();
   const real* x = in1_mat.getData();
diff --git a/paddle/py_paddle/dataprovider_converter.py b/paddle/py_paddle/dataprovider_converter.py
index 2690cafe1d8d32bf52cd9e5fa4dc69fbacb2d66c..c009b05cdeeb9dbe2dc70048e6827a12445f677e 100644
--- a/paddle/py_paddle/dataprovider_converter.py
+++ b/paddle/py_paddle/dataprovider_converter.py
@@ -26,6 +26,15 @@ class IScanner(object):
         if not isinstance(self.input_type, dp2.InputType):
             raise ValueError("input type should be dataprovider2.InputType")
         self.pos = pos
+        # data_in_gpu indicates whether to create the argument on GPU in
+        # GPU mode. With one thread (trainer_count=1) the trainer uses
+        # NeuralNetwork, which needs the argument created on GPU before
+        # the forward function is called, so data_in_gpu is set to True.
+        # Otherwise the trainer uses MultiGradientMachine, which transfers
+        # data from CPU to GPU inside the forward function, so data_in_gpu
+        # is set to False in that case.
+        self.data_in_gpu = swig_paddle.isUsingGpu(
+        ) and swig_paddle.getTrainerCount() == 1
 
     def scan(self, dat):
         pass
@@ -53,7 +62,8 @@
         assert isinstance(argument, swig_paddle.Arguments)
         if self.__mat__.dtype != numpy.float32:
             self.__mat__ = self.__mat__.astype(numpy.float32)
-        m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True, False)
+        m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True,
+                                                    self.data_in_gpu)
         argument.setSlotValue(self.pos, m)
 
 
@@ -75,10 +85,13 @@ class SparseBinaryScanner(IScanner):
 
     def finish_scan(self, argument):
         assert isinstance(argument, swig_paddle.Arguments)
-        m = swig_paddle.Matrix.createSparse(self.__height__,
-                                            self.input_type.dim,
-                                            len(self.__cols__),
-                                            len(self.__value__) == 0)
+        m = swig_paddle.Matrix.createSparse(
+            self.__height__,
+            self.input_type.dim,
+            len(self.__cols__),
+            len(self.__value__) == 0,
+            False,  # trans
+            False)  # TODO support GPU
         assert isinstance(m, swig_paddle.Matrix)
         m.sparseCopyFrom(self.__rows__, self.__cols__, self.__value__)
         argument.setSlotValue(self.pos, m)
@@ -102,7 +115,7 @@ class IndexScanner(IScanner):
         self.__ids__.append(dat)
 
     def finish_scan(self, argument):
-        ids = swig_paddle.IVector.create(self.__ids__)
+        ids = swig_paddle.IVector.create(self.__ids__, self.data_in_gpu)
         assert isinstance(argument, swig_paddle.Arguments)
         argument.setSlotIds(self.pos, ids)
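The comment block in `IScanner.__init__` carries the key reasoning: with `trainer_count=1` the trainer is a `NeuralNetwork`, which wants its input arguments already resident on GPU, while `trainer_count>1` means a `MultiGradientMachine`, which copies CPU data to GPU inside `forward` itself. A minimal sketch of that placement rule, using a hypothetical helper name purely for illustration:

```python
def should_create_on_gpu(use_gpu, trainer_count):
    # trainer_count == 1: NeuralNetwork expects GPU-resident arguments.
    # trainer_count > 1: MultiGradientMachine copies CPU data in forward().
    return use_gpu and trainer_count == 1

assert should_create_on_gpu(True, 1)
assert not should_create_on_gpu(True, 4)
assert not should_create_on_gpu(False, 1)
```

Note that `SparseBinaryScanner` deliberately keeps its matrix on CPU for now, as its `TODO support GPU` marker indicates.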
diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py
index a5ffe25a116e9be039bdebaaaad435685e23d372..fcf4437ffaf329f52cc5bc997eff45dee200873c 100644
--- a/python/paddle/v2/dataset/common.py
+++ b/python/paddle/v2/dataset/common.py
@@ -32,3 +32,10 @@ def download(url, module_name, md5sum):
             shutil.copyfileobj(r.raw, f)
 
     return filename
+
+
+def dict_add(a_dict, ele):
+    if ele in a_dict:
+        a_dict[ele] += 1
+    else:
+        a_dict[ele] = 1
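`dict_add` is a small frequency-counter helper shared by the new dataset modules below. A quick usage check:

```python
from paddle.v2.dataset.common import dict_add

freq = {}
for w in ['a', 'b', 'a']:
    dict_add(freq, w)
assert freq == {'a': 2, 'b': 1}
```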
diff --git a/python/paddle/v2/dataset/imdb.py b/python/paddle/v2/dataset/imdb.py
new file mode 100644
index 0000000000000000000000000000000000000000..433e37380f840f5b7ff619a5f64b99d2ad724b17
--- /dev/null
+++ b/python/paddle/v2/dataset/imdb.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+
+# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+IMDB dataset: http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz
+"""
+import paddle.v2.dataset.common
+import tarfile
+import Queue
+import re
+import string
+import threading
+
+__all__ = ['build_dict', 'train', 'test']
+
+URL = 'http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz'
+MD5 = '7c2ac02c03563afcf9b574c7e56c153a'
+
+
+# Read files that match pattern.  Tokenize and yield each file.
+def tokenize(pattern):
+    with tarfile.open(paddle.v2.dataset.common.download(URL, 'imdb',
+                                                        MD5)) as tarf:
+        # Note that we should use tarfile.next(), which does
+        # sequential access of member files, rather than
+        # tarfile.extractfile, which does random access and might
+        # destroy hard disks.
+        tf = tarf.next()
+        while tf is not None:
+            if bool(pattern.match(tf.name)):
+                # newline and punctuation removal, and ad-hoc tokenization.
+                yield tarf.extractfile(tf).read().rstrip("\n\r").translate(
+                    None, string.punctuation).lower().split()
+            tf = tarf.next()
+
+
+def build_dict(pattern, cutoff):
+    word_freq = {}
+    for doc in tokenize(pattern):
+        for word in doc:
+            paddle.v2.dataset.common.dict_add(word_freq, word)
+
+    # Not sure if we should prune less-frequent words here.
+    word_freq = filter(lambda x: x[1] > cutoff, word_freq.items())
+
+    dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
+    words, _ = list(zip(*dictionary))
+    word_idx = dict(zip(words, xrange(len(words))))
+    word_idx['<unk>'] = len(words)
+    return word_idx
+
+
+def reader_creator(pos_pattern, neg_pattern, word_idx, buffer_size):
+    UNK = word_idx['<unk>']
+
+    qs = [Queue.Queue(maxsize=buffer_size), Queue.Queue(maxsize=buffer_size)]
+
+    def load(pattern, queue):
+        for doc in tokenize(pattern):
+            queue.put(doc)
+        queue.put(None)
+
+    def reader():
+        # Create two threads that load positive and negative samples
+        # into qs.
+        t0 = threading.Thread(
+            target=load, args=(
+                pos_pattern,
+                qs[0], ))
+        t0.daemon = True
+        t0.start()
+
+        t1 = threading.Thread(
+            target=load, args=(
+                neg_pattern,
+                qs[1], ))
+        t1.daemon = True
+        t1.start()
+
+        # Read alternately from qs[0] and qs[1].
+        i = 0
+        doc = qs[i].get()
+        while doc is not None:
+            yield [word_idx.get(w, UNK) for w in doc], i % 2
+            i += 1
+            doc = qs[i % 2].get()
+
+        # Once one queue is exhausted, drain the other queue.
+        i += 1
+        doc = qs[i % 2].get()
+        while doc is not None:
+            yield [word_idx.get(w, UNK) for w in doc], i % 2
+            doc = qs[i % 2].get()
+
+    return reader()
+
+
+def train(word_idx):
+    return reader_creator(
+        re.compile("aclImdb/train/pos/.*\.txt$"),
+        re.compile("aclImdb/train/neg/.*\.txt$"), word_idx, 1000)
+
+
+def test(word_idx):
+    return reader_creator(
+        re.compile("aclImdb/test/pos/.*\.txt$"),
+        re.compile("aclImdb/test/neg/.*\.txt$"), word_idx, 1000)
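A minimal usage sketch for the new module (the first call downloads and scans the tarball, so it is slow once; the cutoff of 150 matches the value the tests below use):

```python
import re
import paddle.v2.dataset.imdb as imdb

word_idx = imdb.build_dict(re.compile("aclImdb/train/.*\.txt$"), 150)
for doc, label in imdb.train(word_idx):
    # doc is a list of word ids; labels alternate 0 (pos) and 1 (neg).
    print len(doc), label
    break
```

Note the asymmetry with imikolov below: `imdb.train(word_idx)` returns an already-instantiated generator, while `imikolov.train(n)` returns a reader function that must still be called.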
diff --git a/python/paddle/v2/dataset/imikolov.py b/python/paddle/v2/dataset/imikolov.py
new file mode 100644
index 0000000000000000000000000000000000000000..b3791ddad66e588356338150fccadbcc8fa113ca
--- /dev/null
+++ b/python/paddle/v2/dataset/imikolov.py
@@ -0,0 +1,79 @@
+"""
+imikolov's simple dataset: http://www.fit.vutbr.cz/~imikolov/rnnlm/
+"""
+import paddle.v2.dataset.common
+import tarfile
+
+__all__ = ['train', 'test']
+
+URL = 'http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz'
+MD5 = '30177ea32e27c525793142b6bf2c8e2d'
+
+
+def word_count(f, word_freq=None):
+    add = paddle.v2.dataset.common.dict_add
+    if word_freq is None:
+        word_freq = {}
+
+    for l in f:
+        for w in l.strip().split():
+            add(word_freq, w)
+        add(word_freq, '<s>')
+        add(word_freq, '<e>')
+
+    return word_freq
+
+
+def build_dict(train_filename, test_filename):
+    with tarfile.open(
+            paddle.v2.dataset.common.download(
+                paddle.v2.dataset.imikolov.URL, 'imikolov',
+                paddle.v2.dataset.imikolov.MD5)) as tf:
+        trainf = tf.extractfile(train_filename)
+        testf = tf.extractfile(test_filename)
+        word_freq = word_count(testf, word_count(trainf))
+
+        TYPO_FREQ = 50
+        word_freq = filter(lambda x: x[1] > TYPO_FREQ, word_freq.items())
+
+        dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
+        words, _ = list(zip(*dictionary))
+        word_idx = dict(zip(words, xrange(len(words))))
+        word_idx['<unk>'] = len(words)
+
+    return word_idx
+
+
+word_idx = {}
+
+
+def reader_creator(filename, n):
+    global word_idx
+    if len(word_idx) == 0:
+        word_idx = build_dict('./simple-examples/data/ptb.train.txt',
+                              './simple-examples/data/ptb.valid.txt')
+
+    def reader():
+        with tarfile.open(
+                paddle.v2.dataset.common.download(
+                    paddle.v2.dataset.imikolov.URL, 'imikolov',
+                    paddle.v2.dataset.imikolov.MD5)) as tf:
+            f = tf.extractfile(filename)
+
+            UNK = word_idx['<unk>']
+            for l in f:
+                l = ['<s>'] + l.strip().split() + ['<e>']
+                if len(l) >= n:
+                    l = [word_idx.get(w, UNK) for w in l]
+                    for i in range(n, len(l) + 1):
+                        yield tuple(l[i - n:i])
+
+    return reader
+
+
+def train(n):
+    return reader_creator('./simple-examples/data/ptb.train.txt', n)
+
+
+def test(n):
+    return reader_creator('./simple-examples/data/ptb.valid.txt', n)
diff --git a/python/paddle/v2/dataset/tests/imdb_test.py b/python/paddle/v2/dataset/tests/imdb_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..e887af16634d2db04b8cf5fa0269a69991d8baac
--- /dev/null
+++ b/python/paddle/v2/dataset/tests/imdb_test.py
@@ -0,0 +1,43 @@
+import paddle.v2.dataset.imdb
+import unittest
+import re
+
+TRAIN_POS_PATTERN = re.compile("aclImdb/train/pos/.*\.txt$")
+TRAIN_NEG_PATTERN = re.compile("aclImdb/train/neg/.*\.txt$")
+TRAIN_PATTERN = re.compile("aclImdb/train/.*\.txt$")
+
+TEST_POS_PATTERN = re.compile("aclImdb/test/pos/.*\.txt$")
+TEST_NEG_PATTERN = re.compile("aclImdb/test/neg/.*\.txt$")
+TEST_PATTERN = re.compile("aclImdb/test/.*\.txt$")
+
+
+class TestIMDB(unittest.TestCase):
+    word_idx = None
+
+    def test_build_dict(self):
+        if self.word_idx is None:
+            self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN,
+                                                              150)
+
+        self.assertEqual(len(self.word_idx), 7036)
+
+    def check_dataset(self, dataset, expected_size):
+        if self.word_idx is None:
+            self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN,
+                                                              150)
+
+        sum = 0
+        for l in dataset(self.word_idx):
+            self.assertEqual(l[1], sum % 2)
+            sum += 1
+        self.assertEqual(sum, expected_size)
+
+    def test_train(self):
+        self.check_dataset(paddle.v2.dataset.imdb.train, 25000)
+
+    def test_test(self):
+        self.check_dataset(paddle.v2.dataset.imdb.test, 25000)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/v2/dataset/tests/imikolov_test.py b/python/paddle/v2/dataset/tests/imikolov_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..9b1748eaaa7f913a6b94f2087a8089fb998570aa
--- /dev/null
+++ b/python/paddle/v2/dataset/tests/imikolov_test.py
@@ -0,0 +1,20 @@
+import paddle.v2.dataset.imikolov
+import unittest
+
+
+class TestMikolov(unittest.TestCase):
+    def check_reader(self, reader, n):
+        for l in reader():
+            self.assertEqual(len(l), n)
+
+    def test_train(self):
+        n = 5
+        self.check_reader(paddle.v2.dataset.imikolov.train(n), n)
+
+    def test_test(self):
+        n = 5
+        self.check_reader(paddle.v2.dataset.imikolov.test(n), n)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/v2/tests/test_data_feeder.py b/python/paddle/v2/tests/test_data_feeder.py
index 5f67da6a5b32d74228d727d94ec79b9f7a06dab7..ab2bc5df76cd839b5b0184e9559f0c2e03baf38b 100644
--- a/python/paddle/v2/tests/test_data_feeder.py
+++ b/python/paddle/v2/tests/test_data_feeder.py
@@ -235,4 +235,8 @@ class DataFeederTest(unittest.TestCase):
 
 if __name__ == '__main__':
     api.initPaddle("--use_gpu=0")
-    unittest.main()
+    suite = unittest.TestLoader().loadTestsFromTestCase(DataFeederTest)
+    unittest.TextTestRunner().run(suite)
+    if api.isGpuVersion():
+        api.setUseGpu(True)
+        unittest.main()
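For reference, a hedged sketch of driving the imikolov reader that `imikolov_test.py` exercises (the first call downloads the dataset and builds the dictionary as a module-level side effect):

```python
import paddle.v2.dataset.imikolov as imikolov

# train(5) returns a reader function; calling it yields 5-grams of word ids.
for gram in imikolov.train(5)():
    print gram  # a 5-tuple: 4 words of context plus the next word
    break
```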