提交 d447f062 编写于 作者: D dangqingqing

update

......@@ -20,26 +20,29 @@ def main():
adam_optimizer = paddle.optimizer.Adam(learning_rate=0.01)
trainer = paddle.trainer.SGD(cost=cost,
parameters=parameters,
update_equation=adam_optimizer)
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 100 == 0:
print "Pass %d, Batch %d, Cost %f, %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics)
if event.batch_id % 1000 == 0:
result = trainer.test(reader=paddle.reader.batched(
paddle.dataset.mnist.test(), batch_size=256))
print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics,
result.metrics)
else:
pass
trainer = paddle.trainer.SGD(update_equation=adam_optimizer)
trainer.train(
reader=paddle.reader.batched(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=8192),
batch_size=32),
cost=cost,
parameters=parameters,
event_handler=event_handler,
reader_dict={images.name: 0,
label.name: 1})
event_handler=event_handler)
if __name__ == '__main__':
......
......@@ -6,7 +6,7 @@
在本文中,我们将阐释如何在集群上运行分布式 Paddle 训练作业。我们将以[推荐系统](https://github.com/baidu/Paddle/tree/develop/demo/recommendation)为例创建分布式的单进程训练。
在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s) )的用户参考。
在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s) )的用户参考。
## 前提条件
......
......@@ -2,7 +2,7 @@
In this article, we explain how to run distributed Paddle training jobs on clusters. We will create the distributed version of the single-process training example, [recommendation](https://github.com/baidu/Paddle/tree/develop/demo/recommendation).
[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/k8s).
[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s).
## Prerequisite
......
......@@ -32,3 +32,10 @@ def download(url, module_name, md5sum):
shutil.copyfileobj(r.raw, f)
return filename
def dict_add(a_dict, ele):
if ele in a_dict:
a_dict[ele] += 1
else:
a_dict[ele] = 1
# /usr/bin/env python
# -*- coding:utf-8 -*-
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
IMDB dataset: http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz
"""
import paddle.v2.dataset.common
import tarfile
import Queue
import re
import string
import threading
__all__ = ['build_dict', 'train', 'test']
URL = 'http://ai.stanford.edu/%7Eamaas/data/sentiment/aclImdb_v1.tar.gz'
MD5 = '7c2ac02c03563afcf9b574c7e56c153a'
# Read files that match pattern. Tokenize and yield each file.
def tokenize(pattern):
with tarfile.open(paddle.v2.dataset.common.download(URL, 'imdb',
MD5)) as tarf:
# Note that we should use tarfile.next(), which does
# sequential access of member files, other than
# tarfile.extractfile, which does random access and might
# destroy hard disks.
tf = tarf.next()
while tf != None:
if bool(pattern.match(tf.name)):
# newline and punctuations removal and ad-hoc tokenization.
yield tarf.extractfile(tf).read().rstrip("\n\r").translate(
None, string.punctuation).lower().split()
tf = tarf.next()
def build_dict(pattern, cutoff):
word_freq = {}
for doc in tokenize(pattern):
for word in doc:
paddle.v2.dataset.common.dict_add(word_freq, word)
# Not sure if we should prune less-frequent words here.
word_freq = filter(lambda x: x[1] > cutoff, word_freq.items())
dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
words, _ = list(zip(*dictionary))
word_idx = dict(zip(words, xrange(len(words))))
word_idx['<unk>'] = len(words)
return word_idx
def reader_creator(pos_pattern, neg_pattern, word_idx, buffer_size):
UNK = word_idx['<unk>']
qs = [Queue.Queue(maxsize=buffer_size), Queue.Queue(maxsize=buffer_size)]
def load(pattern, queue):
for doc in tokenize(pattern):
queue.put(doc)
queue.put(None)
def reader():
# Creates two threads that loads positive and negative samples
# into qs.
t0 = threading.Thread(
target=load, args=(
pos_pattern,
qs[0], ))
t0.daemon = True
t0.start()
t1 = threading.Thread(
target=load, args=(
neg_pattern,
qs[1], ))
t1.daemon = True
t1.start()
# Read alternatively from qs[0] and qs[1].
i = 0
doc = qs[i].get()
while doc != None:
yield [word_idx.get(w, UNK) for w in doc], i % 2
i += 1
doc = qs[i % 2].get()
# If any queue is empty, reads from the other queue.
i += 1
doc = qs[i % 2].get()
while doc != None:
yield [word_idx.get(w, UNK) for w in doc], i % 2
doc = qs[i % 2].get()
return reader()
def train(word_idx):
return reader_creator(
re.compile("aclImdb/train/pos/.*\.txt$"),
re.compile("aclImdb/train/neg/.*\.txt$"), word_idx, 1000)
def test(word_idx):
return reader_creator(
re.compile("aclImdb/test/pos/.*\.txt$"),
re.compile("aclImdb/test/neg/.*\.txt$"), word_idx, 1000)
"""
imikolov's simple dataset: http://www.fit.vutbr.cz/~imikolov/rnnlm/
"""
import paddle.v2.dataset.common
import tarfile
__all__ = ['train', 'test']
URL = 'http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz'
MD5 = '30177ea32e27c525793142b6bf2c8e2d'
def word_count(f, word_freq=None):
add = paddle.v2.dataset.common.dict_add
if word_freq == None:
word_freq = {}
for l in f:
for w in l.strip().split():
add(word_freq, w)
add(word_freq, '<s>')
add(word_freq, '<e>')
return word_freq
def build_dict(train_filename, test_filename):
with tarfile.open(
paddle.v2.dataset.common.download(
paddle.v2.dataset.imikolov.URL, 'imikolov',
paddle.v2.dataset.imikolov.MD5)) as tf:
trainf = tf.extractfile(train_filename)
testf = tf.extractfile(test_filename)
word_freq = word_count(testf, word_count(trainf))
TYPO_FREQ = 50
word_freq = filter(lambda x: x[1] > TYPO_FREQ, word_freq.items())
dictionary = sorted(word_freq, key=lambda x: (-x[1], x[0]))
words, _ = list(zip(*dictionary))
word_idx = dict(zip(words, xrange(len(words))))
word_idx['<unk>'] = len(words)
return word_idx
word_idx = {}
def reader_creator(filename, n):
global word_idx
if len(word_idx) == 0:
word_idx = build_dict('./simple-examples/data/ptb.train.txt',
'./simple-examples/data/ptb.valid.txt')
def reader():
with tarfile.open(
paddle.v2.dataset.common.download(
paddle.v2.dataset.imikolov.URL, 'imikolov',
paddle.v2.dataset.imikolov.MD5)) as tf:
f = tf.extractfile(filename)
UNK = word_idx['<unk>']
for l in f:
l = ['<s>'] + l.strip().split() + ['<e>']
if len(l) >= n:
l = [word_idx.get(w, UNK) for w in l]
for i in range(n, len(l) + 1):
yield tuple(l[i - n:i])
return reader
def train(n):
return reader_creator('./simple-examples/data/ptb.train.txt', n)
def test(n):
return reader_creator('./simple-examples/data/ptb.valid.txt', n)
......@@ -9,9 +9,9 @@ __all__ = ['train', 'test']
URL_PREFIX = 'http://yann.lecun.com/exdb/mnist/'
TEST_IMAGE_URL = URL_PREFIX + 't10k-images-idx3-ubyte.gz'
TEST_IMAGE_MD5 = '25e3cc63507ef6e98d5dc541e8672bb6'
TEST_IMAGE_MD5 = '9fb629c4189551a2d022fa330f9573f3'
TEST_LABEL_URL = URL_PREFIX + 't10k-labels-idx1-ubyte.gz'
TEST_LABEL_MD5 = '4e9511fe019b2189026bd0421ba7b688'
TEST_LABEL_MD5 = 'ec29112dd5afa0611ce80d1b7f02629c'
TRAIN_IMAGE_URL = URL_PREFIX + 'train-images-idx3-ubyte.gz'
TRAIN_IMAGE_MD5 = 'f68b3c2dcbeaaa9fbdd348bbdeb94873'
TRAIN_LABEL_URL = URL_PREFIX + 'train-labels-idx1-ubyte.gz'
......
import paddle.v2.dataset.imdb
import unittest
import re
TRAIN_POS_PATTERN = re.compile("aclImdb/train/pos/.*\.txt$")
TRAIN_NEG_PATTERN = re.compile("aclImdb/train/neg/.*\.txt$")
TRAIN_PATTERN = re.compile("aclImdb/train/.*\.txt$")
TEST_POS_PATTERN = re.compile("aclImdb/test/pos/.*\.txt$")
TEST_NEG_PATTERN = re.compile("aclImdb/test/neg/.*\.txt$")
TEST_PATTERN = re.compile("aclImdb/test/.*\.txt$")
class TestIMDB(unittest.TestCase):
word_idx = None
def test_build_dict(self):
if self.word_idx == None:
self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN,
150)
self.assertEqual(len(self.word_idx), 7036)
def check_dataset(self, dataset, expected_size):
if self.word_idx == None:
self.word_idx = paddle.v2.dataset.imdb.build_dict(TRAIN_PATTERN,
150)
sum = 0
for l in dataset(self.word_idx):
self.assertEqual(l[1], sum % 2)
sum += 1
self.assertEqual(sum, expected_size)
def test_train(self):
self.check_dataset(paddle.v2.dataset.imdb.train, 25000)
def test_test(self):
self.check_dataset(paddle.v2.dataset.imdb.test, 25000)
if __name__ == '__main__':
unittest.main()
import paddle.v2.dataset.imikolov
import unittest
class TestMikolov(unittest.TestCase):
def check_reader(self, reader, n):
for l in reader():
self.assertEqual(len(l), n)
def test_train(self):
n = 5
self.check_reader(paddle.v2.dataset.imikolov.train(n), n)
def test_test(self):
n = 5
self.check_reader(paddle.v2.dataset.imikolov.test(n), n)
if __name__ == '__main__':
unittest.main()
......@@ -11,7 +11,10 @@ There are:
TODO(yuyang18): Complete it!
"""
import py_paddle.swig_paddle as api
__all__ = ['EndIteration', 'BeginIteration', 'BeginPass', 'EndPass']
__all__ = [
'EndIteration', 'BeginIteration', 'BeginPass', 'EndPass', 'TestResult'
]
class WithMetric(object):
......@@ -30,6 +33,11 @@ class WithMetric(object):
return retv
class TestResult(WithMetric):
def __init__(self, evaluator):
super(TestResult, self).__init__(evaluator)
class BeginPass(object):
"""
Event On One Pass Training Start.
......
......@@ -21,6 +21,28 @@ import layer as v2_layer
__all__ = ['Topology']
def __flatten__(lis):
"""
Given a list, possibly nested to any level, return it flattened.
"""
new_lis = []
for item in lis:
if isinstance(item, collections.Sequence):
new_lis.extend(__flatten__(item))
else:
new_lis.append(item)
return new_lis
def __bfs_travel__(callback, *layers):
layers = __flatten__(layers)
for each_layer in layers:
__break__ = callback(each_layer)
if __break__:
return
__bfs_travel__(callback, *each_layer.__parent_layers__.values())
class Topology(object):
"""
Topology is used to store the information about all layers
......@@ -46,21 +68,17 @@ class Topology(object):
:param name:
:return:
"""
result_layer = []
def find_layer_by_name(layer, layer_name):
if len(result_layer) == 1:
return
elif layer.name == layer_name:
result_layer.append(layer)
else:
for parent_layer in layer.__parent_layers__.values():
find_layer_by_name(parent_layer, layer_name)
result_layer = [None]
for layer in self.layers:
find_layer_by_name(layer, name)
def __impl__(l):
if l.name == name:
result_layer[0] = l
return True # break
return False
assert len(result_layer) == 1
__bfs_travel__(__impl__, *self.layers)
if result_layer[0] is None:
raise ValueError("No such layer %s" % name)
return result_layer[0]
def data_layers(self):
......@@ -70,18 +88,11 @@ class Topology(object):
"""
data_layers = dict()
def find_data_layer(layer):
if isinstance(layer, v2_layer.DataLayerV2):
data_layers[layer.name] = layer
if not isinstance(layer, collections.Sequence):
layer = [layer]
for each_l in layer:
for parent_layer in each_l.__parent_layers__.values():
find_data_layer(parent_layer)
for layer in self.layers:
find_data_layer(layer)
def __impl__(l):
if isinstance(l, v2_layer.DataLayerV2):
data_layers[l.name] = l
__bfs_travel__(__impl__, *self.layers)
return data_layers
def data_type(self):
......@@ -89,12 +100,9 @@ class Topology(object):
get data_type from proto, such as:
[('image', dense_vector(768)), ('label', integer_value(10))]
"""
data_types_lists = []
data_layers = self.data_layers()
for each in self.__model_config__.input_layer_names:
data_types_lists.append((each, data_layers[each].type))
return data_types_lists
return [(nm, data_layers[nm].type)
for nm in self.proto().input_layer_names]
def __check_layer_type__(layer):
......
......@@ -42,25 +42,35 @@ class ITrainer(object):
class SGD(ITrainer):
def __init__(self, update_equation):
def __init__(self, cost, parameters, update_equation):
"""
Simple SGD Trainer.
:param update_equation: The optimizer object.
:type update_equation: v2_optimizer.Optimizer
"""
if not isinstance(parameters, v2_parameters.Parameters):
raise TypeError('parameters should be parameters')
if not isinstance(update_equation, v2_optimizer.Optimizer):
raise ValueError("update equation parameter must be "
"paddle.v2.optimizer.Optimizer")
raise TypeError("update equation parameter must be "
"paddle.v2.optimizer.Optimizer")
topology = Topology(cost)
self.__optimizer__ = update_equation
self.__topology__ = topology
self.__parameters__ = parameters
self.__topology_in_proto__ = topology.proto()
self.__data_types__ = topology.data_type()
gm = api.GradientMachine.createFromConfigProto(
self.__topology_in_proto__, api.CREATE_MODE_NORMAL,
self.__optimizer__.enable_types())
assert isinstance(gm, api.GradientMachine)
parameters.append_gradient_machine(gm)
self.__gradient_machine__ = gm
self.__gradient_machine__.randParameters()
def train(self,
reader,
cost,
parameters,
num_passes=1,
event_handler=None,
reader_dict=None):
def train(self, reader, num_passes=1, event_handler=None, reader_dict=None):
"""
Training method. Will train num_passes of input data.
......@@ -76,27 +86,22 @@ class SGD(ITrainer):
if event_handler is None:
event_handler = default_event_handler
topology = Topology(cost)
if reader_dict is None:
reader_dict = self.default_reader_dict()
__check_train_args__(**locals())
gm = api.GradientMachine.createFromConfigProto(
topology.proto(), api.CREATE_MODE_NORMAL,
self.__optimizer__.enable_types())
assert isinstance(gm, api.GradientMachine)
parameters.append_gradient_machine(gm)
gm.randParameters()
updater = self.__optimizer__.create_local_updater()
updater.init(gm)
updater.init(self.__gradient_machine__)
gm.start()
batch_evaluator = gm.makeEvaluator()
self.__gradient_machine__.start()
batch_evaluator = self.__gradient_machine__.makeEvaluator()
assert isinstance(batch_evaluator, api.Evaluator)
pass_evaluator = gm.makeEvaluator()
pass_evaluator = self.__gradient_machine__.makeEvaluator()
assert isinstance(pass_evaluator, api.Evaluator)
out_args = api.Arguments.createArguments(0)
feeder = DataFeeder(topology.data_type(), reader_dict)
feeder = DataFeeder(self.__data_types__, reader_dict)
for pass_id in xrange(num_passes):
event_handler(v2_event.BeginPass(pass_id))
......@@ -104,16 +109,18 @@ class SGD(ITrainer):
updater.startPass()
for batch_id, data_batch in enumerate(reader()):
pass_type = updater.startBatch(len(data_batch))
gm.forwardBackward(feeder(data_batch), out_args, pass_type)
self.__gradient_machine__.forwardBackward(
feeder(data_batch), out_args, pass_type)
batch_evaluator.start()
event_handler(
v2_event.BeginIteration(
pass_id=pass_id, batch_id=batch_id))
pass_type = updater.startBatch(len(data_batch))
gm.forwardBackward(feeder(data_batch), out_args, pass_type)
gm.eval(pass_evaluator)
gm.eval(batch_evaluator)
for each_param in gm.getParameters():
self.__gradient_machine__.forwardBackward(
feeder(data_batch), out_args, pass_type)
self.__gradient_machine__.eval(pass_evaluator)
self.__gradient_machine__.eval(batch_evaluator)
for each_param in self.__gradient_machine__.getParameters():
updater.update(each_param)
# Get cost. We use numpy to calculate total cost for this batch.
cost_vec = out_args.getSlotValue(0)
......@@ -131,22 +138,37 @@ class SGD(ITrainer):
updater.finishPass()
pass_evaluator.finish()
event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator))
gm.finish()
self.__gradient_machine__.finish()
def default_reader_dict(self):
reader_dict = dict()
for i, tp in enumerate(self.__data_types__):
reader_dict[tp[0]] = i
return reader_dict
def test(self, reader, reader_dict=None):
if reader_dict is None:
reader_dict = self.default_reader_dict()
feeder = DataFeeder(self.__data_types__, reader_dict)
evaluator = self.__gradient_machine__.makeEvaluator()
out_args = api.Arguments.createArguments(0)
evaluator.start()
for data_batch in reader():
self.__gradient_machine__.forward(
feeder(data_batch), out_args, api.PASS_TEST)
self.__gradient_machine__.eval(evaluator)
evaluator.finish()
return v2_event.TestResult(evaluator=evaluator)
def __check_train_args__(reader, topology, parameters, event_handler, **kwargs):
def __check_train_args__(reader, event_handler, **kwargs):
"""
Check train function's argument types
"""
if not callable(reader) or not isinstance(reader(), collections.Iterator):
raise TypeError('train_data_reader should be a function, '
'which can return a iterator')
if not isinstance(topology, Topology):
raise TypeError('topology should be a model config')
if not isinstance(parameters, v2_parameters.Parameters):
raise TypeError('parameters should be a parameter pool')
if not callable(event_handler):
raise TypeError('event handler should be a function')
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册