Commit 8efdc772 authored by Yu Yang

Merge branch 'develop' of github.com:baidu/Paddle into add_comments_to_v2_module

......@@ -66,7 +66,7 @@ def main():
sys.stdout.flush()
if isinstance(event, paddle.event.EndPass):
result = trainer.test(
reader=paddle.reader.batched(
reader=paddle.batch(
paddle.dataset.cifar.test10(), batch_size=128),
reader_dict={'image': 0,
'label': 1})
......@@ -77,7 +77,7 @@ def main():
parameters=parameters,
update_equation=momentum_optimizer)
trainer.train(
reader=paddle.reader.batched(
reader=paddle.batch(
paddle.reader.shuffle(
paddle.dataset.cifar.train10(), buf_size=50000),
batch_size=128),
......
......@@ -98,7 +98,7 @@ def main():
result.metrics['classification_error_evaluator']))
trainer.train(
reader=paddle.reader.batched(
reader=paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=8192),
batch_size=128),
......@@ -115,7 +115,7 @@ def main():
probs = paddle.infer(
output=predict,
parameters=parameters,
reader=paddle.reader.batched(
reader=paddle.batch(
paddle.reader.firstn(
paddle.reader.map_readers(lambda item: (item[0], ),
paddle.dataset.mnist.test()),
......
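In the inference snippet above, `paddle.reader.map_readers` keeps only the image field of each MNIST test sample and `paddle.reader.firstn` caps how many samples are read. A hedged standalone sketch of the same composition (the cap of 5 is an assumption; the original value is elided by the diff):

```python
# keep just the images of the first 5 MNIST test samples
infer_reader = paddle.reader.firstn(
    paddle.reader.map_readers(lambda item: (item[0], ),
                              paddle.dataset.mnist.test()),
    5)
```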
......@@ -72,31 +72,35 @@ def main():
# define network topology
cost = seqToseq_net_v2(source_dict_dim, target_dict_dim)
parameters = paddle.parameters.create(cost)
optimizer = paddle.optimizer.Adam(learning_rate=1e-4)
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 10 == 0:
print "Pass %d, Batch %d, Cost %f, %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics)
# define optimize method and trainer
optimizer = paddle.optimizer.Adam(learning_rate=1e-4)
trainer = paddle.trainer.SGD(cost=cost,
parameters=parameters,
update_equation=optimizer)
# define data reader
reader_dict = {
'source_language_word': 0,
'target_language_word': 1,
'target_language_next_word': 2
}
trn_reader = paddle.reader.batched(
wmt14_reader = paddle.reader.batched(
paddle.reader.shuffle(
train_reader("data/pre-wmt14/train/train"), buf_size=8192),
batch_size=5)
# define event_handler callback
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 10 == 0:
print "Pass %d, Batch %d, Cost %f, %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics)
# start to train
trainer.train(
reader=trn_reader,
reader=wmt14_reader,
event_handler=event_handler,
num_passes=10000,
reader_dict=reader_dict)
......
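In the training call above, `reader_dict` maps each data layer name to the position of the corresponding field in the tuples the reader yields. A hedged sketch of a reader compatible with that mapping (the sample values are made up, and the batching uses the `paddle.batch` helper this commit introduces):

```python
# each sample is (source ids, target ids, target next-word ids), so
# tuple indices 0, 1, 2 line up with the reader_dict entries above
def toy_reader():
    yield [2, 7, 4], [0, 9, 5], [9, 5, 1]

toy_batches = paddle.batch(toy_reader, batch_size=5)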
import paddle.v2.activation as activation
import paddle.v2.attr as attr
import paddle.v2.data_type as data_type
import paddle.v2.layer as layer
import paddle.v2.networks as networks
import paddle.v2 as paddle
def seqToseq_net_v2(source_dict_dim, target_dict_dim):
......@@ -12,64 +8,70 @@ def seqToseq_net_v2(source_dict_dim, target_dict_dim):
encoder_size = 512  # dimension of the hidden units in the GRU encoder
#### Encoder
src_word_id = layer.data(
src_word_id = paddle.layer.data(
name='source_language_word',
type=data_type.integer_value_sequence(source_dict_dim))
src_embedding = layer.embedding(
type=paddle.data_type.integer_value_sequence(source_dict_dim))
src_embedding = paddle.layer.embedding(
input=src_word_id,
size=word_vector_dim,
param_attr=attr.ParamAttr(name='_source_language_embedding'))
src_forward = networks.simple_gru(input=src_embedding, size=encoder_size)
src_backward = networks.simple_gru(
param_attr=paddle.attr.ParamAttr(name='_source_language_embedding'))
src_forward = paddle.networks.simple_gru(
input=src_embedding, size=encoder_size)
src_backward = paddle.networks.simple_gru(
input=src_embedding, size=encoder_size, reverse=True)
encoded_vector = layer.concat(input=[src_forward, src_backward])
encoded_vector = paddle.layer.concat(input=[src_forward, src_backward])
#### Decoder
with layer.mixed(size=decoder_size) as encoded_proj:
encoded_proj += layer.full_matrix_projection(input=encoded_vector)
with paddle.layer.mixed(size=decoder_size) as encoded_proj:
encoded_proj += paddle.layer.full_matrix_projection(
input=encoded_vector)
backward_first = layer.first_seq(input=src_backward)
backward_first = paddle.layer.first_seq(input=src_backward)
with layer.mixed(size=decoder_size, act=activation.Tanh()) as decoder_boot:
decoder_boot += layer.full_matrix_projection(input=backward_first)
with paddle.layer.mixed(
size=decoder_size, act=paddle.activation.Tanh()) as decoder_boot:
decoder_boot += paddle.layer.full_matrix_projection(
input=backward_first)
def gru_decoder_with_attention(enc_vec, enc_proj, current_word):
decoder_mem = layer.memory(
decoder_mem = paddle.layer.memory(
name='gru_decoder', size=decoder_size, boot_layer=decoder_boot)
context = networks.simple_attention(
context = paddle.networks.simple_attention(
encoded_sequence=enc_vec,
encoded_proj=enc_proj,
decoder_state=decoder_mem)
with layer.mixed(size=decoder_size * 3) as decoder_inputs:
decoder_inputs += layer.full_matrix_projection(input=context)
decoder_inputs += layer.full_matrix_projection(input=current_word)
with paddle.layer.mixed(size=decoder_size * 3) as decoder_inputs:
decoder_inputs += paddle.layer.full_matrix_projection(input=context)
decoder_inputs += paddle.layer.full_matrix_projection(
input=current_word)
gru_step = layer.gru_step(
gru_step = paddle.layer.gru_step(
name='gru_decoder',
input=decoder_inputs,
output_mem=decoder_mem,
size=decoder_size)
with layer.mixed(
size=target_dict_dim, bias_attr=True,
act=activation.Softmax()) as out:
out += layer.full_matrix_projection(input=gru_step)
with paddle.layer.mixed(
size=target_dict_dim,
bias_attr=True,
act=paddle.activation.Softmax()) as out:
out += paddle.layer.full_matrix_projection(input=gru_step)
return out
decoder_group_name = "decoder_group"
group_input1 = layer.StaticInputV2(input=encoded_vector, is_seq=True)
group_input2 = layer.StaticInputV2(input=encoded_proj, is_seq=True)
group_input1 = paddle.layer.StaticInputV2(input=encoded_vector, is_seq=True)
group_input2 = paddle.layer.StaticInputV2(input=encoded_proj, is_seq=True)
group_inputs = [group_input1, group_input2]
trg_embedding = layer.embedding(
input=layer.data(
trg_embedding = paddle.layer.embedding(
input=paddle.layer.data(
name='target_language_word',
type=data_type.integer_value_sequence(target_dict_dim)),
type=paddle.data_type.integer_value_sequence(target_dict_dim)),
size=word_vector_dim,
param_attr=attr.ParamAttr(name='_target_language_embedding'))
param_attr=paddle.attr.ParamAttr(name='_target_language_embedding'))
group_inputs.append(trg_embedding)
# For a decoder equipped with an attention mechanism, in training,
......@@ -77,14 +79,14 @@ def seqToseq_net_v2(source_dict_dim, target_dict_dim):
# while the encoded source sequence is accessed as an unbounded memory.
# Here, the StaticInput defines a read-only memory
# for the recurrent_group.
decoder = layer.recurrent_group(
decoder = paddle.layer.recurrent_group(
name=decoder_group_name,
step=gru_decoder_with_attention,
input=group_inputs)
lbl = layer.data(
lbl = paddle.layer.data(
name='target_language_next_word',
type=data_type.integer_value_sequence(target_dict_dim))
cost = layer.classification_cost(input=decoder, label=lbl)
type=paddle.data_type.integer_value_sequence(target_dict_dim))
cost = paddle.layer.classification_cost(input=decoder, label=lbl)
return cost
import math
import paddle.v2 as paddle
dictsize = 1953
embsize = 32
hiddensize = 256
N = 5
def wordemb(inlayer):
wordemb = paddle.layer.table_projection(
input=inlayer,
size=embsize,
param_attr=paddle.attr.Param(
name="_proj",
initial_std=0.001,
learning_rate=1,
l2_rate=0, ))
return wordemb
def main():
paddle.init(use_gpu=False, trainer_count=1)
word_dict = paddle.dataset.imikolov.build_dict()
dict_size = len(word_dict)
firstword = paddle.layer.data(
name="firstw", type=paddle.data_type.integer_value(dict_size))
secondword = paddle.layer.data(
name="secondw", type=paddle.data_type.integer_value(dict_size))
thirdword = paddle.layer.data(
name="thirdw", type=paddle.data_type.integer_value(dict_size))
fourthword = paddle.layer.data(
name="fourthw", type=paddle.data_type.integer_value(dict_size))
nextword = paddle.layer.data(
name="fifthw", type=paddle.data_type.integer_value(dict_size))
Efirst = wordemb(firstword)
Esecond = wordemb(secondword)
Ethird = wordemb(thirdword)
Efourth = wordemb(fourthword)
contextemb = paddle.layer.concat(input=[Efirst, Esecond, Ethird, Efourth])
hidden1 = paddle.layer.fc(input=contextemb,
size=hiddensize,
act=paddle.activation.Sigmoid(),
layer_attr=paddle.attr.Extra(drop_rate=0.5),
bias_attr=paddle.attr.Param(learning_rate=2),
param_attr=paddle.attr.Param(
initial_std=1. / math.sqrt(embsize * 8),
learning_rate=1))
predictword = paddle.layer.fc(input=hidden1,
size=dict_size,
bias_attr=paddle.attr.Param(learning_rate=2),
act=paddle.activation.Softmax())
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 100 == 0:
result = trainer.test(
paddle.batch(
paddle.dataset.imikolov.test(word_dict, N), 32))
print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics,
result.metrics)
cost = paddle.layer.classification_cost(input=predictword, label=nextword)
parameters = paddle.parameters.create(cost)
adam_optimizer = paddle.optimizer.Adam(
learning_rate=3e-3,
regularization=paddle.optimizer.L2Regularization(8e-4))
trainer = paddle.trainer.SGD(cost, parameters, adam_optimizer)
trainer.train(
paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32),
num_passes=30,
event_handler=event_handler)
if __name__ == '__main__':
main()
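After training, the learned embedding table can be read back from `parameters` by parameter name. A hedged sketch, assuming it runs inside `main()` after `trainer.train` finishes, that the `_proj` name from `wordemb` is used, and that `parameters.get` returns the parameter as a numpy array:

```python
# fetch the shared projection table: one embsize-wide row per word
embeddings = parameters.get("_proj").reshape(len(word_dict), embsize)
# assuming 'apple' is in the dictionary, this is its 32-dim vector
print embeddings[word_dict['apple']]
```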
......@@ -22,6 +22,13 @@ Reader
.. automodule:: paddle.v2.reader.creator
:members:
#########
minibatch
#########
.. automodule:: paddle.v2.minibatch
:members:
#######
Dataset
#######
......
......@@ -43,22 +43,55 @@ docker push [YOUR_REPO]/paddle:mypaddle
Note that `[YOUR_REPO]` in the commands above stands for the Docker registry the reader uses; replace it with your own registry address. Below, `[YOUR_REPO]/paddle:mypaddle` refers to the image built in this step.
### Upload the training files
### Prepare the training data
This walkthrough uses PaddlePaddle's official [recommendation demo](http://www.paddlepaddle.org/doc/demo/index.html#recommendation) as the training workload. We place the training files and data in a directory named after the job and upload it to the shared storage backing the volume (different distributed storage systems mount differently: mount the directory first, then copy the data in). When finished, the files in the volume look roughly like this:
Here we start a Job on the Kubernetes cluster to download and split the data; the image can also be customized by editing [k8s_train](./src/k8s_train/README.md).
```bash
[root@paddle-kubernetes-node0 mfs]# tree -d
Before starting the Job, bind a [persistentVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes/) appropriate to your distributed storage; the generated data will be stored under that volume.
```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: paddle-data
spec:
template:
metadata:
name: pi
spec:
hostNetwork: true
containers:
- name: paddle-data
image: paddledev/paddle-tutorial:k8s_data
imagePullPolicy: Always
volumeMounts:
- mountPath: "/mnt"
name: nfs
env:
- name: OUT_DIR
value: /home/work/mfs/paddle-cluster-job
- name: SPLIT_COUNT
value: "3"
volumes:
- name: nfs
persistentVolumeClaim:
claimName: mfs
restartPolicy: Never
```
When the Job completes, the files in the volume look roughly like this:
```bash
[root@paddle-kubernetes-node0 nfsdir]$ tree -d
.
└── paddle-cluster-job
    ├── data
    │   ├── 0
    │   ├── 1
    │   └── 2
    ├── output
    └── recommendation
`-- paddle-cluster-job
    |-- 0
    |   `-- data
    |-- 1
    |   `-- data
    |-- 2
    |   `-- data
    |-- output
    |-- quick_start
```
In this listing, paddle-cluster-job is the job name for this training run; the job uses 3 PaddlePaddle nodes. The split data is stored under the paddle-cluster-job data directories, where folders 0, 1, and 2 correspond to the trainer_id of the 3 nodes. The recommendation folder holds the training files, and the output folder holds the training results and logs.
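A hedged sketch of how a trainer could resolve its own shard under the new layout (the `TRAINER_ID` variable is illustrative and not defined in this document):

```python
import os

# hypothetical: pick this trainer's shard, e.g. .../paddle-cluster-job/0/data
shard = os.path.join("/home/work/mfs/paddle-cluster-job",
                     os.environ.get("TRAINER_ID", "0"), "data")
```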
......@@ -118,15 +151,16 @@ spec:
The `env` field sets the container's environment variables; some of `paddle`'s runtime arguments are passed into the container this way.
`JOB_PATH` is the mount path of the shared storage, `JOB_NAME` is the job name, and `TRAIN_CONFIG_DIR` is the directory holding this run's training files; combining the three yields the paths to the files this training run needs.
`CONF_PADDLE_NIC` is the `--nics` argument required by the `paddle pserver` process, i.e. the network interface name.
`CONF_PADDLE_PORT` is the `--port` argument of `paddle pserver`, and `CONF_PADDLE_PORTS_NUM` is the number of ports used for dense updates, i.e. the `--ports_num` argument.
`CONF_PADDLE_PORTS_NUM_SPARSE` is the number of ports used for sparse updates, i.e. the `--ports_num_for_sparse` argument.
`CONF_PADDLE_GRADIENT_NUM` is the number of training nodes, i.e. the `--num_gradient_servers` argument.
Environment variable | Description
--- | ---
JOB_PATH | mount path of the shared storage
JOB_NAME | name of the Job
TRAIN_CONFIG_DIR | directory holding this run's training files; combined with JOB_PATH and JOB_NAME it yields the paths this run needs
CONF_PADDLE_NIC | the `--nics` argument required by the `paddle pserver` process, i.e. the network interface name
CONF_PADDLE_PORT | the `--port` argument of `paddle pserver`
CONF_PADDLE_PORTS_NUM | number of ports for dense updates, i.e. the `--ports_num` argument
CONF_PADDLE_PORTS_NUM_SPARSE | number of ports for sparse updates, i.e. the `--ports_num_for_sparse` argument
CONF_PADDLE_GRADIENT_NUM | number of training nodes, i.e. the `--num_gradient_servers` argument
For detailed descriptions of these arguments, see [here](http://www.paddlepaddle.org/doc/ui/cmd_argument/detail_introduction.html#parameter-server-and-distributed-communication); a hedged sketch of how a start script might consume them follows.
......
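As a hedged sketch (hypothetical, not part of this commit) of how a start script inside the container might consume the variables documented above:

```python
import os

# resolve the training-file directory from the three path variables
conf_dir = os.path.join(os.environ["JOB_PATH"], os.environ["JOB_NAME"],
                        os.environ["TRAIN_CONFIG_DIR"])

# assemble the `paddle pserver` command line from the CONF_* variables
cmd = ["paddle", "pserver",
       "--nics=%s" % os.environ["CONF_PADDLE_NIC"],
       "--port=%s" % os.environ["CONF_PADDLE_PORT"],
       "--ports_num=%s" % os.environ["CONF_PADDLE_PORTS_NUM"],
       "--ports_num_for_sparse=%s" % os.environ["CONF_PADDLE_PORTS_NUM_SPARSE"],
       "--num_gradient_servers=%s" % os.environ["CONF_PADDLE_GRADIENT_NUM"]]
print " ".join(cmd)
```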
......@@ -28,6 +28,7 @@ import pooling
import inference
import networks
import py_paddle.swig_paddle as api
import minibatch
__all__ = [
'optimizer', 'layer', 'activation', 'parameters', 'init', 'trainer',
......@@ -45,3 +46,4 @@ def init(**kwargs):
infer = inference.infer
batch = minibatch.batch
......@@ -23,8 +23,9 @@ import movielens
import conll05
import uci_housing
import sentiment
import wmt14
__all__ = [
'mnist', 'imikolov', 'imdb', 'cifar', 'movielens', 'conll05', 'sentiment',
'uci_housing'
'uci_housing', 'wmt14'
]
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
wmt14 dataset
"""
import paddle.v2.dataset.common
import tarfile
import os.path
import itertools
__all__ = ['train', 'test', 'build_dict']
URL_DEV_TEST = 'http://www-lium.univ-lemans.fr/~schwenk/cslm_joint_paper/data/dev+test.tgz'
MD5_DEV_TEST = '7d7897317ddd8ba0ae5c5fa7248d3ff5'
URL_TRAIN = 'http://localhost:8000/train.tgz'
MD5_TRAIN = '72de99da2830ea5a3a2c4eb36092bbc7'
def word_count(f, word_freq=None):
add = paddle.v2.dataset.common.dict_add
if word_freq is None:
word_freq = {}
for l in f:
for w in l.strip().split():
add(word_freq, w)
add(word_freq, '<s>')
add(word_freq, '<e>')
return word_freq
def get_word_idx(word_freq):
TYPO_FREQ = 50
word_freq = filter(lambda x: x[1] > TYPO_FREQ, word_freq.items())
word_freq_sorted = sorted(word_freq, key=lambda x: (-x[1], x[0]))
words, _ = list(zip(*word_freq_sorted))
word_idx = dict(zip(words, xrange(len(words))))
word_idx['<unk>'] = len(words)
return word_idx
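# Illustration (hypothetical counts, not part of the dataset): with
# TYPO_FREQ = 50, words seen at most 50 times are dropped, the rest
# are ranked by descending frequency (ties broken alphabetically),
# and '<unk>' is appended as the last index, e.g.
#   {'the': 100, 'cat': 60, 'rare': 3} -> {'the': 0, 'cat': 1, '<unk>': 2}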
def get_word_freq(train, dev):
word_freq = word_count(train, word_count(dev))
if '<unk>' in word_freq:
# remove <unk> for now, since we will set it as last index
del word_freq['<unk>']
return word_freq
def build_dict():
base_dir = './wmt14-data'
train_en_filename = base_dir + '/train/train.en'
train_fr_filename = base_dir + '/train/train.fr'
dev_en_filename = base_dir + '/dev/ntst1213.en'
dev_fr_filename = base_dir + '/dev/ntst1213.fr'
if not os.path.exists(train_en_filename) or not os.path.exists(
train_fr_filename):
with tarfile.open(
paddle.v2.dataset.common.download(URL_TRAIN, 'wmt14',
MD5_TRAIN)) as tf:
tf.extractall(base_dir)
if not os.path.exists(dev_en_filename) or not os.path.exists(
dev_fr_filename):
with tarfile.open(
paddle.v2.dataset.common.download(URL_DEV_TEST, 'wmt14',
MD5_DEV_TEST)) as tf:
tf.extractall(base_dir)
f_en = open(train_en_filename)
f_fr = open(train_fr_filename)
f_en_dev = open(dev_en_filename)
f_fr_dev = open(dev_fr_filename)
word_freq_en = get_word_freq(f_en, f_en_dev)
word_freq_fr = get_word_freq(f_fr, f_fr_dev)
f_en.close()
f_fr.close()
f_en_dev.close()
f_fr_dev.close()
return get_word_idx(word_freq_en), get_word_idx(word_freq_fr)
def reader_creator(directory, path_en, path_fr, URL, MD5, dict_en, dict_fr):
def reader():
if not os.path.exists(path_en) or not os.path.exists(path_fr):
with tarfile.open(
paddle.v2.dataset.common.download(URL, 'wmt14', MD5)) as tf:
tf.extractall(directory)
f_en = open(path_en)
f_fr = open(path_fr)
UNK_en = dict_en['<unk>']
UNK_fr = dict_fr['<unk>']
for en, fr in itertools.izip(f_en, f_fr):
src_ids = [dict_en.get(w, UNK_en) for w in en.strip().split()]
tar_ids = [
dict_fr.get(w, UNK_fr)
for w in ['<s>'] + fr.strip().split() + ['<e>']
]
# skip empty/degenerate pairs and sequences longer than 80 tokens
if len(src_ids) == 0 or len(tar_ids) <= 1 or len(
src_ids) > 80 or len(tar_ids) > 80:
continue
yield src_ids, tar_ids[:-1], tar_ids[1:]
f_en.close()
f_fr.close()
return reader
def train(dict_en, dict_fr):
directory = './wmt14-data'
return reader_creator(directory, directory + '/train/train.en',
directory + '/train/train.fr', URL_TRAIN, MD5_TRAIN,
dict_en, dict_fr)
def test(dict_en, dict_fr):
directory = './wmt14-data'
return reader_creator(directory, directory + '/dev/ntst1213.en',
directory + '/dev/ntst1213.fr', URL_DEV_TEST,
MD5_DEV_TEST, dict_en, dict_fr)
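# Usage sketch (hedged; downloads the data on first use): build the
# dictionaries once, then create the readers, mirroring how the
# trainer snippets above consume them.
#   dict_en, dict_fr = paddle.v2.dataset.wmt14.build_dict()
#   train_reader = paddle.v2.dataset.wmt14.train(dict_en, dict_fr)
#   src_ids, trg_ids, trg_next_ids = next(train_reader())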
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = ['batch']
def batch(reader, batch_size):
"""
Create a batched reader.
:param reader: the data reader to read from.
:type reader: callable
:param batch_size: size of each mini-batch
:type batch_size: int
:return: the batched reader.
:rtype: callable
"""
def batch_reader():
r = reader()
b = []
for instance in r:
b.append(instance)
if len(b) == batch_size:
yield b
b = []
if b:
yield b
return batch_reader
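A short usage sketch of `batch` (hedged; the counting reader is made up for illustration):

```python
def count_reader():
    # a plain single-sample reader yielding 0..9
    for i in range(10):
        yield i

for b in batch(count_reader, 4)():
    print b  # [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]
```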
......@@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = [
'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
'ComposeNotAligned', 'firstn'
]
import itertools
import random
from Queue import Queue
from threading import Thread
__all__ = [
'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
'ComposeNotAligned', 'batched', 'firstn'
]
def map_readers(func, *readers):
"""
......@@ -202,32 +202,6 @@ def buffered(reader, size):
return data_reader
def batched(reader, batch_size):
"""
Create a batched reader.
:param reader: the data reader to read from.
:type reader: callable
:param batch_size: size of each mini-batch
:type batch_size: int
:return: the batched reader.
:rtype: callable
"""
def batched_reader():
r = reader()
batch = []
for instance in r:
batch.append(instance)
if len(batch) == batch_size:
yield batch
batch = []
if batch:
yield batch
return batched_reader
def firstn(reader, n):
"""
Limit the maximum number of samples the reader can return.
......
......@@ -109,7 +109,7 @@ class SGD(object):
for each_param in self.__gradient_machine__.getNonStaticParameters(
):
updater.update(each_param)
cost_sum = out_args.sumCosts()
cost_sum = out_args.sum()
cost = cost_sum / len(data_batch)
updater.finishBatch(cost)
batch_evaluator.finish()
......@@ -145,7 +145,7 @@ class SGD(object):
num_samples += len(data_batch)
self.__gradient_machine__.forward(
feeder(data_batch), out_args, api.PASS_TEST)
total_cost += out_args.sumCosts()
total_cost += out_args.sum()
self.__gradient_machine__.eval(evaluator)
evaluator.finish()
......