提交 f7a06f17 编写于 作者: Y Yu Yang

Merge branch 'develop' of github.com:baidu/Paddle into feature/recommendation_v2_api

......@@ -6,7 +6,7 @@
在本文中,我们将阐释如何在集群上运行分布式 Paddle 训练作业。我们将以[推荐系统](https://github.com/baidu/Paddle/tree/develop/demo/recommendation)为例创建分布式的单进程训练。
在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s) )的用户参考。
在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s) )的用户参考。
## 前提条件
......
......@@ -2,7 +2,7 @@
In this article, we explain how to run distributed Paddle training jobs on clusters. We will create the distributed version of the single-process training example, [recommendation](https://github.com/baidu/Paddle/tree/develop/demo/recommendation).
[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s).
[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s).
## Prerequisite
......
......@@ -47,6 +47,9 @@ void setUseGpu(bool useGpu);
/// Return true if this py_paddle is compiled in GPU Version
bool isGpuVersion();
/// Return FLAGS_trainer_count
int getTrainerCount();
/// The Error of IO Operation. Such as file not found, etc.
class IOError {};
......
......@@ -54,5 +54,7 @@ bool isGpuVersion() {
#endif
}
int getTrainerCount() { return FLAGS_trainer_count; }
static_assert(NUM_PARAMETER_TYPES == paddle::NUM_PARAMETER_TYPES,
"The Parameter Type should be same in core/api and core/common");
......@@ -92,7 +92,6 @@ void CosSimForward<DEVICE_TYPE_GPU>(GpuMatrix& out_mat,
CHECK(in1_mat.useGpu_ == true && in2_mat.useGpu_ == true)
<< "Matrix type are not GPU";
size_t num_samples = out_mat.getHeight();
size_t dim = in1_mat.getWidth();
real* out = out_mat.getData();
const real* x = in1_mat.getData();
......
......@@ -26,6 +26,15 @@ class IScanner(object):
if not isinstance(self.input_type, dp2.InputType):
raise ValueError("input type should be dataprovider2.InputType")
self.pos = pos
# data_in_gpu is used to indicate whether to create argument on GPU
# or not in GPU mode. Now if using one thread (trainer_count=1),
# trainer uses NeuralNetwork which needs to create argument on GPU
# before calling forward function. So, set data_in_gpu to True.
# Otherwise, trainer uses MultiGradientMachine which will transfer
# data from CPU to GPU in the forward function, set data_in_gpu to
# False in this case.
self.data_in_gpu = swig_paddle.isUsingGpu(
) and swig_paddle.getTrainerCount() == 1
def scan(self, dat):
pass
......@@ -53,7 +62,8 @@ class DenseScanner(IScanner):
assert isinstance(argument, swig_paddle.Arguments)
if self.__mat__.dtype != numpy.float32:
self.__mat__ = self.__mat__.astype(numpy.float32)
m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True, False)
m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True,
self.data_in_gpu)
argument.setSlotValue(self.pos, m)
......@@ -75,10 +85,13 @@ class SparseBinaryScanner(IScanner):
def finish_scan(self, argument):
assert isinstance(argument, swig_paddle.Arguments)
m = swig_paddle.Matrix.createSparse(self.__height__,
m = swig_paddle.Matrix.createSparse(
self.__height__,
self.input_type.dim,
len(self.__cols__),
len(self.__value__) == 0)
len(self.__value__) == 0,
False, # trans
False) # TODO supoort GPU
assert isinstance(m, swig_paddle.Matrix)
m.sparseCopyFrom(self.__rows__, self.__cols__, self.__value__)
argument.setSlotValue(self.pos, m)
......@@ -102,7 +115,7 @@ class IndexScanner(IScanner):
self.__ids__.append(dat)
def finish_scan(self, argument):
ids = swig_paddle.IVector.create(self.__ids__)
ids = swig_paddle.IVector.create(self.__ids__, self.data_in_gpu)
assert isinstance(argument, swig_paddle.Arguments)
argument.setSlotIds(self.pos, ids)
......
......@@ -32,3 +32,10 @@ def download(url, module_name, md5sum):
shutil.copyfileobj(r.raw, f)
return filename
def dict_add(a_dict, ele):
if ele in a_dict:
a_dict[ele] += 1
else:
a_dict[ele] = 1
# /usr/bin/env python
# -*- coding:utf-8 -*-
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
IMDB dataset: http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz
"""
import paddle.v2.dataset.common
import tarfile
import Queue
import re
import string
import threading
__all__ = ['build_dict', 'train', 'test']
URL = 'http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz'
MD5 = '7c2ac02c03563afcf9b574c7e56c153a'
# Read files that match pattern. Tokenize and yield each file.
def tokenize(pattern):
with tarfile.open(paddle.v2.dataset.common.download(URL, 'imdb',
MD5)) as tarf:
# Note that we should use tarfile.next(), which does
# sequential access of member files, other than
# tarfile.extractfile, which does random access and might
# destroy hard disks.
tf = tarf.next()
while tf != None:
if bool(pattern.match(tf.name)):
# newline and punctuations removal and ad-hoc tokenization.
yield tarf.extractfile(tf).read().rstrip("\n\r").translate(
None, string.punctuation).lower().split()
tf = tarf.next()
def build_dict(pattern, cutoff):
word_freq = {}
for doc in tokenize(pattern):
for word in doc:
paddle.v2.dataset.common.dict_add(word_freq, word)
# Not sure if we should prune less-frequent words here.
word_freq = filter(lambda x: x[1] > cutoff, word_freq.items())
dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
words, _ = list(zip(*dictionary))
word_idx = dict(zip(words, xrange(len(words))))
word_idx['<unk>'] = len(words)
return word_idx
def reader_creator(pos_pattern, neg_pattern, word_idx, buffer_size):
UNK = word_idx['<unk>']
qs = [Queue.Queue(maxsize=buffer_size), Queue.Queue(maxsize=buffer_size)]
def load(pattern, queue):
for doc in tokenize(pattern):
queue.put(doc)
queue.put(None)
def reader():
# Creates two threads that loads positive and negative samples
# into qs.
t0 = threading.Thread(
target=load, args=(
pos_pattern,
qs[0], ))
t0.daemon = True
t0.start()
t1 = threading.Thread(
target=load, args=(
neg_pattern,
qs[1], ))
t1.daemon = True
t1.start()
# Read alternatively from qs[0] and qs[1].
i = 0
doc = qs[i].get()
while doc != None:
yield [word_idx.get(w, UNK) for w in doc], i % 2
i += 1
doc = qs[i % 2].get()
# If any queue is empty, reads from the other queue.
i += 1
doc = qs[i % 2].get()
while doc != None:
yield [word_idx.get(w, UNK) for w in doc], i % 2
doc = qs[i % 2].get()
return reader()
def train(word_idx):
return reader_creator(
re.compile("aclImdb/train/pos/.*\.txt$"),
re.compile("aclImdb/train/neg/.*\.txt$"), word_idx, 1000)
def test(word_idx):
return reader_creator(
re.compile("aclImdb/test/pos/.*\.txt$"),
re.compile("aclImdb/test/neg/.*\.txt$"), word_idx, 1000)
"""
imikolov's simple dataset: http://www.fit.vutbr.cz/~imikolov/rnnlm/
"""
import paddle.v2.dataset.common
import tarfile
__all__ = ['train', 'test']
URL = 'http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz'
MD5 = '30177ea32e27c525793142b6bf2c8e2d'
def word_count(f, word_freq=None):
add = paddle.v2.dataset.common.dict_add
if word_freq == None:
word_freq = {}
for l in f:
for w in l.strip().split():
add(word_freq, w)
add(word_freq, '<s>')
add(word_freq, '<e>')
return word_freq
def build_dict(train_filename, test_filename):
with tarfile.open(
paddle.v2.dataset.common.download(
paddle.v2.dataset.imikolov.URL, 'imikolov',
paddle.v2.dataset.imikolov.MD5)) as tf:
trainf = tf.extractfile(train_filename)
testf = tf.extractfile(test_filename)
word_freq = word_count(testf, word_count(trainf))
TYPO_FREQ = 50
word_freq = filter(lambda x: x[1] > TYPO_FREQ, word_freq.items())
dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
words, _ = list(zip(*dictionary))
word_idx = dict(zip(words, xrange(len(words))))
word_idx['<unk>'] = len(words)
return word_idx
word_idx = {}
def reader_creator(filename, n):
global word_idx
if len(word_idx) == 0:
word_idx = build_dict('./simple-examples/data/ptb.train.txt',
'./simple-examples/data/ptb.valid.txt')
def reader():
with tarfile.open(
paddle.v2.dataset.common.download(
paddle.v2.dataset.imikolov.URL, 'imikolov',
paddle.v2.dataset.imikolov.MD5)) as tf:
f = tf.extractfile(filename)
UNK = word_idx['<unk>']
for l in f:
l = ['<s>'] + l.strip().split() + ['<e>']
if len(l) >= n:
l = [word_idx.get(w, UNK) for w in l]
for i in range(n, len(l) + 1):
yield tuple(l[i - n:i])
return reader
def train(n):
return reader_creator('./simple-examples/data/ptb.train.txt', n)
def test(n):
return reader_creator('./simple-examples/data/ptb.valid.txt', n)
import paddle.v2.dataset.imdb
import unittest
import re
TRAIN_POS_PATTERN = re.compile("aclImdb/train/pos/.*\.txt$")
TRAIN_NEG_PATTERN = re.compile("aclImdb/train/neg/.*\.txt$")
TRAIN_PATTERN = re.compile("aclImdb/train/.*\.txt$")
TEST_POS_PATTERN = re.compile("aclImdb/test/pos/.*\.txt$")
TEST_NEG_PATTERN = re.compile("aclImdb/test/neg/.*\.txt$")
TEST_PATTERN = re.compile("aclImdb/test/.*\.txt$")
class TestIMDB(unittest.TestCase):
word_idx = None
def test_build_dict(self):
if self.word_idx == None:
self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN,
150)
self.assertEqual(len(self.word_idx), 7036)
def check_dataset(self, dataset, expected_size):
if self.word_idx == None:
self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN,
150)
sum = 0
for l in dataset(self.word_idx):
self.assertEqual(l[1], sum % 2)
sum += 1
self.assertEqual(sum, expected_size)
def test_train(self):
self.check_dataset(paddle.v2.dataset.imdb.train, 25000)
def test_test(self):
self.check_dataset(paddle.v2.dataset.imdb.test, 25000)
if __name__ == '__main__':
unittest.main()
import paddle.v2.dataset.imikolov
import unittest
class TestMikolov(unittest.TestCase):
def check_reader(self, reader, n):
for l in reader():
self.assertEqual(len(l), n)
def test_train(self):
n = 5
self.check_reader(paddle.v2.dataset.imikolov.train(n), n)
def test_test(self):
n = 5
self.check_reader(paddle.v2.dataset.imikolov.test(n), n)
if __name__ == '__main__':
unittest.main()
......@@ -235,4 +235,8 @@ class DataFeederTest(unittest.TestCase):
if __name__ == '__main__':
api.initPaddle("--use_gpu=0")
suite = unittest.TestLoader().loadTestsFromTestCase(DataFeederTest)
unittest.TextTestRunner().run(suite)
if api.isGpuVersion():
api.setUseGpu(True)
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册