提交 27c39617 编写于 作者: Y Yancey1989

Merge branch 'develop' of github.com:PaddlePaddle/Paddle into fix_k8s_404

......@@ -28,6 +28,10 @@ function train() {
--test_period=100 \
--config_args=$args \
2>&1 | tee ${log}
avg_time=`tail ${log} -n 1 | awk -F ' ' '{print $8}' | sed 's/avg=//'`
fps=`awk 'BEGIN{printf "%.2f",('$bs' / '$avg_time' * 1000)}'`
echo "FPS: $fps images/sec" 2>&1 | tee -a ${log}
}
if [ ! -f "train.list" ]; then
......
set -e
function clock_to_seconds() {
hours=`echo $1 | awk -F ':' '{print $1}'`
mins=`echo $1 | awk -F ':' '{print $2}'`
secs=`echo $1 | awk -F ':' '{print $3}'`
echo `awk 'BEGIN{printf "%.2f",('$secs' + '$mins' * 60 + '$hours' * 3600)}'`
}
function infer() {
unset OMP_NUM_THREADS MKL_NUM_THREADS OMP_DYNAMIC KMP_AFFINITY
topology=$1
layer_num=$2
bs=$3
thread=`nproc`
if [ $thread -gt $bs ]; then
thread=$bs
fi
log="logs/infer-${topology}-${layer_num}-${thread}openblas-${bs}.log"
models_in="models/${topology}-${layer_num}/pass-00000/"
if [ ! -d $models_in ]; then
echo "./run_mkl_infer.sh to save the model first"
exit 0
fi
log_period=$((256 / bs))
paddle train --job=test \
--config="${topology}.py" \
--use_gpu=False \
--trainer_count=$thread \
--log_period=$log_period \
--config_args="batch_size=${bs},layer_num=${layer_num},is_infer=True" \
--init_model_path=$models_in \
2>&1 | tee ${log}
# calculate the last 5 logs period time of 1280 samples,
# the time before are burning time.
start=`tail ${log} -n 7 | head -n 1 | awk -F ' ' '{print $2}' | xargs`
end=`tail ${log} -n 2 | head -n 1 | awk -F ' ' '{print $2}' | xargs`
start_sec=`clock_to_seconds $start`
end_sec=`clock_to_seconds $end`
fps=`awk 'BEGIN{printf "%.2f",(1280 / ('$end_sec' - '$start_sec'))}'`
echo "Last 1280 samples start: ${start}(${start_sec} sec), end: ${end}(${end_sec} sec;" >> ${log}
echo "FPS: $fps images/sec" 2>&1 | tee -a ${log}
}
if [ ! -f "train.list" ]; then
echo " " > train.list
fi
if [ ! -f "test.list" ]; then
echo " " > test.list
fi
if [ ! -d "logs" ]; then
mkdir logs
fi
# inference benchmark
for batchsize in 1 2 4 8 16; do
infer googlenet v1 $batchsize
infer resnet 50 $batchsize
infer vgg 19 $batchsize
done
set -e
function train() {
unset OMP_NUM_THREADS MKL_NUM_THREADS OMP_DYNAMIC KMP_AFFINITY
topology=$1
layer_num=$2
bs=$3
thread=`nproc`
# each trainer_count use only 1 core to avoid conflict
log="logs/train-${topology}-${layer_num}-${thread}openblas-${bs}.log"
args="batch_size=${bs},layer_num=${layer_num}"
config="${topology}.py"
paddle train --job=time \
--config=$config \
--use_gpu=False \
--trainer_count=$thread \
--log_period=10 \
--test_period=100 \
--config_args=$args \
2>&1 | tee ${log}
avg_time=`tail ${log} -n 1 | awk -F ' ' '{print $8}' | sed 's/avg=//'`
fps=`awk 'BEGIN{printf "%.2f",('$bs' / '$avg_time' * 1000)}'`
echo "FPS: $fps images/sec" 2>&1 | tee -a ${log}
}
if [ ! -f "train.list" ]; then
echo " " > train.list
fi
if [ ! -d "logs" ]; then
mkdir logs
fi
# training benchmark
for batchsize in 64 128 256; do
train vgg 19 $batchsize
train resnet 50 $batchsize
train googlenet v1 $batchsize
done
......@@ -300,3 +300,7 @@ conv2d_transpose
.. autofunction:: paddle.v2.fluid.layers.conv2d_transpose
:noindex:
sequence_expand
---------
.. autofunction:: paddle.v2.fluid.layers.sequence_expand
:noindex:
# Executor Design Doc
## Motivation
In [fluid](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/fluid.md), we encourage the user to use deep learning programming paradigms to describe the training process. When the user-written Python program is executed, it will first create a protobuf message
[`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/a91efdde6910ce92a78e3aa7157412c4c88d9ee8/paddle/framework/framework.proto#L145) that describes the process and is conceptually like an [abstract syntax tree](https://en.wikipedia.org/wiki/Abstract_syntax_tree).
We use executor to do the runtime evaluation of a `ProgramDesc`.
The executor runs the `ProgramDesc` like an interpreter. `ProgramDesc` contains the intrinsics (operators in this case) and variables which will be used, executor explicitly executes the stored precompiled code.
## Overview
An executor takes a `ProgramDesc`, a `block_id` and a `Scope`. The `ProgramDesc` is a list of blocks and each block contains the protobuf definition of all the parameters and operators. The `block_id` specifies the entrance block. And the `Scope` is the container of all the variable instance, which is persistent throughout different runs.
An executor takes a `ProgramDesc`, a `block_id` and a `Scope`. The `ProgramDesc` is a list of blocks and each block contains the protobuf definition of all the parameters and operators in the block. The `block_id` specifies the entrance block. And the `Scope` is the container of all the variable instances, which is persistent throughout different runs.
### What does executor do?
## Executor
It evaluates all the operators in the `block_id`th block of a `ProgramDesc`.
The `Executor` explicitly executes all the intrinsics (operators here) in the `block_id`th block of a `ProgramDesc`. Essentially, it instantiates Variables and Operators, then runs all the operators in sequence one-by-one.
It is very similar to how a push stack frame works when entering a block, following which it cleans up all the temporary variables when a mini-batch is finished. It does not however, have the stack frame pop process.
### What does executor NOT do?
### The interface
```c++
Executor(places);
```
A executor does not own any computing resources, a user can only construct an executor using the specified places.
It does not do runtime optimization, meaning intelligently parse the dependency of each op a choose which one to be run and in which order they should be run.
### Running an Executor
It does not do graph partitioning, meaning dividing the `ProgramDesc` into several small pieces and executing them on different devices.
## Implementation
`Executor` evaluates a `ProgramDesc`. Essentially, it instantiates Variables and Operators, then run all the operators in sequence. [[code]](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.cc)
```
void Run(ProgramDesc, Scope, block_id, create_local_scope);
```
An `Executor` only provides a unified way to execute `ProgramDesc`. `ProgramDesc` is the target that will be executed, the `Scope` specifies the variable container, the `block_id` indicates the entrance block and `create_local_scope` is a boolean that states whether it will destroy the temporary variables after the execution is finished.
......@@ -30,10 +30,10 @@
由于在现有的某些情况下(例如RNN),多次调用 cblas_?gemm 会使用相同的原数据,因此,每次调用时对原数据的重复Packing便成为了冗余。
为了最大程度减少多次调用 cblas_?gemm 在Packing上的耗时,Intel® MKL 引入了以下四个API:
* cblas_?gemm_alloc
* cblas_?gemm_pack
* cblas_?gemm_compute
* cblas_?gemm_free
* [cblas_?gemm_alloc](https://software.intel.com/en-us/mkl-developer-reference-c-cblas-gemm-alloc)
* [cblas_?gemm_pack](https://software.intel.com/en-us/mkl-developer-reference-c-cblas-gemm-pack)
* [cblas_?gemm_compute](https://software.intel.com/en-us/mkl-developer-reference-c-cblas-gemm-compute)
* [cblas_?gemm_free](https://software.intel.com/en-us/mkl-developer-reference-c-cblas-gemm-free)
通过使用这些API,我们可以先完成对原数据的Packing操作,再把已转换为Packed格式的数据传递给那些复用同一数据的gemm_compute函数,从而避免了Packing冗余。
......@@ -84,7 +84,20 @@ PaddlePaddle/Paddle
2. 对比优化后layer与相对应的PaddlePaddle原有layer, 在batch mode下的结果。
### Python API
TBD
计划在`paddle/utils.Flags`中添加`use_mkl_packed`的flag,用于选择是否使用相关功能,并且当编译时`WITH_MKL=ON`的情况下,默认设置为`true`
同时,在`python/paddle/trainer/config_parser.py`中对应的layer处,添加`use_mkl_packed`这个选择,方便用户在Python端选择是否启用这个功能。
具体实现方式比如:
```python
use_mkl_packed = bool(int(g_command_config_args.get("use_mkl_packed", 0)))
if use_mkl_packed:
self.layer_type = mkl_packed_*
```
所有相关的`layer_type`会以*mkl_packed_*开头,这些会在`MKLPacked*Layer`注册layer的时候保证,以示区分。
### Benchmarking
会添加相应的脚本用于测试和对比在使用MKL Packed recurrent layers 前后的网络性能。
......
......@@ -9,9 +9,6 @@
usage/cmd_parameter/index_cn.rst
usage/cluster/cluster_train_cn.md
usage/k8s/k8s_basis_cn.md
usage/k8s/k8s_cn.md
usage/k8s/k8s_distributed_cn.md
开发标准
--------
......
......@@ -9,8 +9,6 @@ Usage
usage/cmd_parameter/index_en.rst
usage/cluster/cluster_train_en.md
usage/k8s/k8s_en.md
usage/k8s/k8s_aws_en.md
Development
------------
......
# PaddlePaddle分布式训练
* [概述](#概述)
* [环境准备](#环境准备)
* [启动参数说明](#启动参数说明)
* [启动参数服务器](#启动参数服务器)
* [启动计算节点](#启动计算节点)
* [准备数据集](#准备数据集)
* [准备训练程序](#准备训练程序)
* [使用分布式计算平台或工具](#使用分布式计算平台或工具)
* [使用Fabric启动集群作业](#使用fabric启动集群作业)
* [准备一个Linux集群](#准备一个linux集群)
* [启动集群作业](#启动集群作业)
* [终止集群作业](#终止集群作业)
* [检查集群训练结果](#检查集群训练结果)
* [检查模型输出](#检查模型输出)
* [在OpenMPI集群中提交训练作业](#在openmpi集群中提交训练作业)
* [准备OpenMPI集群](#准备OpenMPI集群)
* [启动集群作业](#启动集群作业-1)
* [在Kubernetes集群中提交训练作业](#在kubernetes集群中提交训练作业)
## 概述
本文将介绍如何使用PaddlePaddle在不同的集群框架下完成分布式训练。分布式训练架构如下图所示:
<img src="https://user-images.githubusercontent.com/13348433/31772175-5f419eca-b511-11e7-9db7-5231fe3d9ccb.png" width="500">
......@@ -32,10 +15,11 @@
在使用同步SGD训练神经网络时,PaddlePaddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大地提高了计算的并行性:参数服务器之间不相互依赖,并行地接收梯度和更新参数,参数服务器也不会等待计算节点全部都提交梯度之后才开始下一步,计算节点之间也不会相互依赖,并行地执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台参数服务器上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。
## 环境准备
1. 准备您的计算集群。计算集群通常由一组(几台到几千台规模)的Linux服务器组成。服务器之间可以通过局域网(LAN)联通,每台服务器具有集群中唯一的IP地址(或者可被DNS解析的主机名)。集群中的每台计算机通常被成为一个“节点”。
1. 我们需要在集群的所有节点上安装 PaddlePaddle。 如果要启用GPU,还需要在节点上安装对应的GPU驱动以及CUDA。PaddlePaddle的安装可以参考[build_and_install](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/getstarted/build_and_install)的多种安装方式。我们推荐使用[Docker](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/getstarted/build_and_install/docker_install_cn.rst)安装方式来快速安装PaddlePaddle。
1. 我们需要在集群的所有节点上安装 PaddlePaddle。 如果要启用GPU,还需要在节点上安装对应的GPU驱动以及CUDA。PaddlePaddle的安装可以参考[build_and_install](http://www.paddlepaddle.org/docs/develop/documentation/zh/getstarted/build_and_install/index_cn.html)的多种安装方式。我们推荐使用[Docker](http://www.paddlepaddle.org/docs/develop/documentation/zh/getstarted/build_and_install/docker_install_cn.html)安装方式来快速安装PaddlePaddle。
安装完成之后,执行下面的命令可以查看已经安装的版本(docker安装方式可以进入docker容器执行:`docker run -it paddlepaddle/paddle:[tag] /bin/bash`):
```bash
......@@ -63,12 +47,12 @@ $ paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradie
$ stdbuf -oL /usr/bin/nohup paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 &> pserver.log
```
| 参数 | 是否必选 | 默认值 | 说明 |
| ------------- | ------------- | ------------- | ------------- |
| port | 必选 | 7164 | pserver监听的起始端口,根据ports_num决定<br>总端口个数,从起始端口监听多个端口用于通信 |
| ports_num | 必选 | 1 | 监听的端口个数 |
| ports_num_for_sparse | 必选 | 1 | 用于稀疏类型参数通信的端口个数 |
| num_gradient_servers | 必选 | 1 | 当前训练任务pserver总数 |
参数说明
- port:**必选,默认7164**,pserver监听的起始端口,根据ports_num决定总端口个数,从起始端口监听多个端口用于通信
- ports_num:**必选,默认1**,监听的端口个数
- ports_num_for_sparse:**必选,默认1**,用于稀疏类型参数通信的端口个数
- num_gradient_servers:**必选,默认1**,当前训练任务pserver总数
### 启动计算节点
执行以下命令启动使用python编写的trainer程序(文件名为任意文件名,如train.py)
......@@ -105,16 +89,16 @@ paddle.init(
pservers="127.0.0.1")
```
| 参数 | 是否必选 | 默认 | 说明 |
| ------------- | ------------- | ------------- | ------------- |
| use_gpu | 可选 | False | 是否启用GPU训练 |
| trainer_count | 必选 | 1 | 当前训练任务trainer总个数 |
| port | 必选 | 7164 | 连接到pserver的端口 |
| ports_num | 必选 | 1 | 连接到pserver的端口个数 |
| ports_num_for_sparse | 必选 | 1 | 和pserver之间用于稀疏类型参数通信的端口个数 |
| num_gradient_servers | 必选 | 1 | 当前训练任务pserver总数 |
| trainer_id | 必选 | 0 | 每个trainer的唯一ID,从0开始的整数 |
| pservers | 必选 | 127.0.0.1 | 当前训练任务启动的pserver的IP列表,多个IP使用“,”隔开 |
参数说明
- use_gpu: **可选,默认False**,是否启用GPU训练
- trainer_count:**必选,默认1**,当前训练任务trainer总个数
- port:**必选,默认7164**,连接到pserver的端口
- ports_num:**必选,默认1**,连接到pserver的端口个数
- ports_num_for_sparse:**必选,默认1**,和pserver之间用于稀疏类型参数通信的端口个数
- num_gradient_servers:**必选,默认1**,当前训练任务pserver总数
- trainer_id:**必选,默认0**,每个trainer的唯一ID,从0开始的整数
- pservers:**必选,默认127.0.0.1**,当前训练任务启动的pserver的IP列表,多个IP使用“,”隔开
### 准备数据集
......@@ -171,7 +155,7 @@ test.txt-00002
- `my_lib.py`:会被`train.py`调用的一些用户定义的库函数,比如PIL库等。
- `word_dict.pickle`:在`train.py`中会使用到的字典数据文件。
- `train.py`:训练程序,代码参考[api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py)***注意:*** 对于本样例代码,在使用不同的分布式计算平台时,您可能需要修改`train.py`开头的部分(如下),以便获得训练数据的位置和获取环境变量配置:
- `train.py`:训练程序,代码参考[api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/api_train_v2_cluster.py)***注意:*** 对于本样例代码,在使用不同的分布式计算平台时,您可能需要修改`train.py`开头的部分(如下),以便获得训练数据的位置和获取环境变量配置:
```python
cluster_train_file = "./train_data_dir/train/train.txt"
......@@ -195,91 +179,10 @@ PaddlePaddle可以使用多种分布式计算平台构建分布式计算任务
在使用分布式计算平台进行训练时,任务被调度在集群中时,分布式计算平台通常会通过API或者环境变量提供任务运行需要的参数,比如节点的ID、IP和任务节点个数等。
### 使用Fabric启动集群作业
#### 准备一个Linux集群
可以在`paddle/scripts/cluster_train_v2/fabric/docker_cluster`目录下,执行`kubectl -f ssh_servers.yaml`启动一个测试集群,并使用`kubectl get po -o wide`获得这些节点的IP地址。
#### 启动集群作业
`paddle.py` 提供了自动化脚本来启动不同节点中的所有 PaddlePaddle 集群进程。默认情况下,所有命令行选项可以设置为 `paddle.py` 命令选项并且 `paddle.py` 将透明、自动地将这些选项应用到 PaddlePaddle 底层进程。
`paddle.py` 为方便作业启动提供了两个独特的命令选项。
- `job_dispatch_package` 设为本地 `workspace` 目录,它将被分发到 `conf.py` 中设置的所有节点。它有助于帮助频繁修改和访问工作区文件的用户减少负担,否则频繁的多节点工作空间部署可能会很麻烦。
- `job_workspace` 设为已部署的工作空间目录,`paddle.py` 将跳过分发阶段直接启动所有节点的集群作业。它可以帮助减少分发延迟。
`cluster_train/run.sh` 提供了命令样例来运行 `doc/howto/usage/cluster/src/word2vec` 集群任务,只需用您定义的目录修改 `job_dispatch_package``job_workspace`,然后:
```
sh run.sh
```
集群作业将会在几秒后启动。
#### 终止集群作业
`paddle.py`能获取`Ctrl + C` SIGINT 信号来自动终止它启动的所有进程。只需中断 `paddle.py` 任务来终止集群作业。如果程序崩溃你也可以手动终止。
#### 检查集群训练结果
详细信息请检查 $workspace/log 里的日志,每一个节点都有相同的日志结构。
`paddle_trainer.INFO`
提供几乎所有训练的内部输出日志,与本地训练相同。这里检验运行时间模型的收敛。
`paddle_pserver2.INFO`
提供 pserver 运行日志,有助于诊断分布式错误。
`server.log`
提供 parameter server 进程的 stderr 和 stdout。训练失败时可以检查错误日志。
`train.log`
提供训练过程的 stderr 和 stdout。训练失败时可以检查错误日志。
#### 检查模型输出
运行完成后,模型文件将被写入节点 0 的 `output` 目录中。
工作空间中的 `nodefile` 表示当前集群作业的节点 ID。
### 在OpenMPI集群中提交训练作业
#### 准备OpenMPI集群
执行下面的命令以启动3个节点的OpenMPI集群和一个"head"节点:
```bash
paddle/scripts/cluster_train_v2/openmpi/docker_cluster
kubectl create -f head.yaml
kubectl create -f mpi-nodes.yaml
```
然后可以从head节点ssh无密码登录到OpenMPI的每个节点上。
#### 启动集群作业
您可以按照下面的步骤在OpenMPI集群中提交paddle训练任务:
```bash
# 获得head和node节点的IP地址
kubectl get po -o wide
# 将node节点的IP地址保存到machines文件中
kubectl get po -o wide | grep nodes | awk '{print $6}' > machines
# 拷贝必要的文件到head节点
scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~
# ssh 登录到head节点
ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP]
# --------------- 以下操作均在head节点中执行 ---------------
# 准备训练数据
python prepare.py
# 拷贝训练程序和字典文件到每台MPI节点
cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial
# 创建日志目录
mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs
# 拷贝训练数据到各自的节点
scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial
scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial
scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial
# 启动训练任务
mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh
```
### 在Kubernetes集群中提交训练作业
## 在不同集群中运行
此部分的使用方法可以参考[here](../k8s/k8s_distributed_cn.md)
- [fabric](fabric_cn.md)
- [openmpi](openmpi_cn.md)
- [kubernetes](k8s_cn.md)
- [kubernetes distributed](k8s_distributed_cn.md)
- [kubernetes on AWS](k8s_aws_cn.md)
# PaddlePaddle Distributed Training
* [Introduction](#introduction)
* [Preparations](#preparations)
* [Command-line arguments](#command-line-arguments)
* [Starting parameter server](#starting-parameter-server)
* [Starting trainer](#starting-trainer)
* [Prepare Training Dataset](#prepare-training-dataset)
* [Prepare Training program](#prepare-training-program)
* [Use cluster platforms or cluster management tools](#use-cluster-platforms-or-cluster-management-tools)
* [Cluster Training Using Fabric](#cluster-training-using-fabric)
* [Prepare a Linux cluster](#prepare-a-linux-cluster)
* [Launching Cluster Job](#launching-cluster-job)
* [Kill Cluster Job](#kill-cluster-job)
* [Check Cluster Training Result](#check-cluster-training-result)
* [Check Model Output](#check-model-output)
* [Cluster Training Using OpenMPI](#cluster-training-using-openmpi)
* [Prepare an OpenMPI cluster](#prepare-an-openmpi-cluster)
* [Launching Cluster Job](#launching-cluster-job-1)
* [Cluster Training Using Kubernetes](#cluster-training-using-kubernetes)
## Introduction
In this article, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed trainning job:
......@@ -35,7 +16,7 @@ When training with synchronize SGD, PaddlePaddle uses an internal "synchronize b
## Preparations
1. Prepare your computer cluster. It's normally a bunch of Linux servers connected by LAN. Each server will be assigned a unique IP address. The computers in the cluster can be called "nodes".
2. Install PaddlePaddle on every node. If you are going to take advantage of GPU cards, you'll also need to install proper driver and CUDA libraries. To install PaddlePaddle please read [this build and install](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/getstarted/build_and_install) document. We strongly recommend using [Docker installation](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/getstarted/build_and_install/docker_install_en.rst).
2. Install PaddlePaddle on every node. If you are going to take advantage of GPU cards, you'll also need to install proper driver and CUDA libraries. To install PaddlePaddle please read [this build and install](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/index_en.html) document. We strongly recommend using [Docker installation](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/docker_install_en.html).
After installation, you can check the version by typing the below command (run a docker container if using docker: `docker run -it paddlepaddle/paddle:[tag] /bin/bash`):
......@@ -67,12 +48,12 @@ If you wish to run parameter servers in background, and save a log file, you can
$ stdbuf -oL /usr/bin/nohup paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 &> pserver.log
```
| param | required | default | description |
| ------------- | ------------- | ------------- | ------------- |
| port | required | 7164 | port which parameter server will listen on. If ports_num greater than 1, parameter server will listen on multiple ports for more network throughput |
| ports_num | required | 1 | total number of ports will listen on |
| ports_num_for_sparse | required | 1 | number of ports which serves sparse parameter update |
| num_gradient_servers | required | 1 | total number of gradient servers |
Parameter Description
- port: **required, default 7164**, port which parameter server will listen on. If ports_num greater than 1, parameter server will listen on multiple ports for more network throughput.
- ports_num: **required, default 1**, total number of ports will listen on.
- ports_num_for_sparse: **required, default 1**, number of ports which serves sparse parameter update.
- num_gradient_servers: **required, default 1**, total number of gradient servers.
### Starting trainer
Type the command below to start the trainer(name the file whatever you want, like "train.py")
......@@ -111,16 +92,16 @@ paddle.init(
pservers="127.0.0.1")
```
| param | required | default | description |
| ------------- | ------------- | ------------- | ------------- |
| use_gpu | optional | False | set to "True" to enable GPU training |
| trainer_count | required | 1 | total count of trainers in the training job |
| port | required | 7164 | port to connect to parameter server |
| ports_num | required | 1 | number of ports for communication |
| ports_num_for_sparse | required | 1 | number of ports for sparse type caculation |
| num_gradient_servers | required | 1 | total number of gradient server |
| trainer_id | required | 0 | ID for every trainer, start from 0 |
| pservers | required | 127.0.0.1 | list of IPs of parameter servers, separated by "," |
Parameter Description
- use_gpu: **optional, default False**, set to "True" to enable GPU training.
- trainer_count: **required, default 1**, total count of trainers in the training job.
- port: **required, default 7164**, port to connect to parameter server.
- ports_num: **required, default 1**, number of ports for communication.
- ports_num_for_sparse: **required, default 1**, number of ports for sparse type caculation.
- num_gradient_servers: **required, default 1**, total number of gradient server.
- trainer_id: **required, default 0**, ID for every trainer, start from 0.
- pservers: **required, default 127.0.0.1**, list of IPs of parameter servers, separated by ",".
### Prepare Training Dataset
......@@ -178,7 +159,7 @@ Your workspace may looks like:
- `my_lib.py`: user defined libraries, like PIL libs. This is optional.
- `word_dict.pickle`: dict file for training word embeding.
- `train.py`: training program. Sample code: [api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py). ***NOTE:*** You may need to modify the head part of `train.py` when using different cluster platform to retrive configuration environment variables:
- `train.py`: training program. Sample code: [api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/api_train_v2_cluster.py). ***NOTE:*** You may need to modify the head part of `train.py` when using different cluster platform to retrive configuration environment variables:
```python
cluster_train_file = "./train_data_dir/train/train.txt"
......@@ -202,92 +183,10 @@ We'll introduce cluster job management on these platforms. The examples can be f
These cluster platforms provide API or environment variables for training processes, when the job is dispatched to different nodes. Like node ID, IP or total number of nodes etc.
### Cluster Training Using Fabric
#### Prepare a Linux cluster
Run `kubectl -f ssh_servers.yaml` under the directory: `paddle/scripts/cluster_train_v2/fabric/docker_cluster` will launch a demo cluster. Run `kubectl get po -o wide` to get IP addresses of these nodes.
#### Launching Cluster Job
`paddle.py` provides automatical scripts to start all PaddlePaddle cluster processes in different nodes. By default, all command line options can be set as `paddle.py` command options and `paddle.py` will transparently and automatically set these options to PaddlePaddle lower level processes.
`paddle.py`provides two distinguished command option for easy job launching.
- `job_dispatch_package` set it with local `workspace` directory, it will be dispatched to all nodes which is set in `conf.py`. It could be helpful for frequently manipulating workspace files. otherwise, frequent multi-nodes workspace deployment is very annoying.
- `job_workspace` set it with already deployed workspace directory, `paddle.py` will skip dispatch stage to directly launch cluster job with all nodes. It could help to reduce heavy
dispatch latency.
`cluster_train/run.sh` provides command line sample to run `demo/recommendation` cluster job, just modify `job_dispatch_package` and `job_workspace` with your defined directory, then:
```
sh run.sh
```
The cluster Job will start in several seconds.
#### Kill Cluster Job
`paddle.py` can capture `Ctrl + C` SIGINT signal to automatically kill all processes launched by it. So just stop `paddle.py` to kill cluster job. You should manually kill the job if the program crashed.
#### Check Cluster Training Result
Check log in $workspace/log for details, each node owns same log structure.
`paddle_trainer.INFO`
It provides almost all internal output log for training, same as local training. Check runtime model convergence here.
`paddle_pserver2.INFO`
It provides parameter server running log, which could help to diagnose distributed error.
`server.log`
It provides stderr and stdout of parameter server process. Check error log if training crashes.
`train.log`
It provides stderr and stdout of trainer process. Check error log if training crashes.
#### Check Model Output
After one pass finished, model files will be written in `output` directory in node 0.
`nodefile` in workspace indicates the node id of current cluster job.
### Cluster Training Using OpenMPI
#### Prepare an OpenMPI cluster
Run the following command to start a 3-node MPI cluster and one "head" node.
```bash
cd paddle/scripts/cluster_train_v2/openmpi/docker_cluster
kubectl create -f head.yaml
kubectl create -f mpi-nodes.yaml
```
Then you can log in to every OpenMPI node using ssh without input any passwords.
#### Launching Cluster Job
Follow the steps to launch a PaddlePaddle training job in OpenMPI cluster:\
```bash
# find out node IP addresses
kubectl get po -o wide
# generate a "machines" file containing node IP addresses
kubectl get po -o wide | grep nodes | awk '{print $6}' > machines
# copy necessary files onto "head" node
scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~
# login to head node using ssh
ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP]
# --------------- in head node ---------------
# prepare training data
python prepare.py
# copy training data and dict file to MPI nodes
cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial
# creat a directory for storing log files
mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs
# copy training data to every node
scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial
scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial
scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial
# start the job
mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh
```
### Cluster Training Using Kubernetes
## Use different clusters
The details can be found [here](../k8s/k8s_cn.md)
- [fabric](fabric_en.md)
- [openmpi](openmpi_en.md)
- [kubernetes](k8s_en.md)
- kubernetes distributed
- [kubernetes on AWS](k8s_aws_en.md)
# 使用fabric启动集群训练
## 准备一个Linux集群
可以在`paddle/scripts/cluster_train_v2/fabric/docker_cluster`目录下,执行`kubectl -f ssh_servers.yaml`启动一个测试集群,并使用`kubectl get po -o wide`获得这些节点的IP地址。
## 启动集群作业
`paddle.py` 提供了自动化脚本来启动不同节点中的所有 PaddlePaddle 集群进程。默认情况下,所有命令行选项可以设置为 `paddle.py` 命令选项并且 `paddle.py` 将透明、自动地将这些选项应用到 PaddlePaddle 底层进程。
`paddle.py` 为方便作业启动提供了两个独特的命令选项。
- `job_dispatch_package` 设为本地 `workspace` 目录,它将被分发到 `conf.py` 中设置的所有节点。它有助于帮助频繁修改和访问工作区文件的用户减少负担,否则频繁的多节点工作空间部署可能会很麻烦。
- `job_workspace` 设为已部署的工作空间目录,`paddle.py` 将跳过分发阶段直接启动所有节点的集群作业。它可以帮助减少分发延迟。
`cluster_train/run.sh` 提供了命令样例来运行 `doc/howto/usage/cluster/src/word2vec` 集群任务,只需用您定义的目录修改 `job_dispatch_package``job_workspace`,然后:
```
sh run.sh
```
集群作业将会在几秒后启动。
## 终止集群作业
`paddle.py`能获取`Ctrl + C` SIGINT 信号来自动终止它启动的所有进程。只需中断 `paddle.py` 任务来终止集群作业。如果程序崩溃你也可以手动终止。
## 检查集群训练结果
详细信息请检查 $workspace/log 里的日志,每一个节点都有相同的日志结构。
`paddle_trainer.INFO`
提供几乎所有训练的内部输出日志,与本地训练相同。这里检验运行时间模型的收敛。
`paddle_pserver2.INFO`
提供 pserver 运行日志,有助于诊断分布式错误。
`server.log`
提供 parameter server 进程的 stderr 和 stdout。训练失败时可以检查错误日志。
`train.log`
提供训练过程的 stderr 和 stdout。训练失败时可以检查错误日志。
## 检查模型输出
运行完成后,模型文件将被写入节点 0 的 `output` 目录中。
工作空间中的 `nodefile` 表示当前集群作业的节点 ID。
# Cluster Training Using Fabric
## Prepare a Linux cluster
Run `kubectl -f ssh_servers.yaml` under the directory: `paddle/scripts/cluster_train_v2/fabric/docker_cluster` will launch a demo cluster. Run `kubectl get po -o wide` to get IP addresses of these nodes.
## Launching Cluster Job
`paddle.py` provides automatical scripts to start all PaddlePaddle cluster processes in different nodes. By default, all command line options can be set as `paddle.py` command options and `paddle.py` will transparently and automatically set these options to PaddlePaddle lower level processes.
`paddle.py`provides two distinguished command option for easy job launching.
- `job_dispatch_package` set it with local `workspace` directory, it will be dispatched to all nodes which is set in `conf.py`. It could be helpful for frequently manipulating workspace files. otherwise, frequent multi-nodes workspace deployment is very annoying.
- `job_workspace` set it with already deployed workspace directory, `paddle.py` will skip dispatch stage to directly launch cluster job with all nodes. It could help to reduce heavy
dispatch latency.
`cluster_train/run.sh` provides command line sample to run `demo/recommendation` cluster job, just modify `job_dispatch_package` and `job_workspace` with your defined directory, then:
```
sh run.sh
```
The cluster Job will start in several seconds.
## Kill Cluster Job
`paddle.py` can capture `Ctrl + C` SIGINT signal to automatically kill all processes launched by it. So just stop `paddle.py` to kill cluster job. You should manually kill the job if the program crashed.
## Check Cluster Training Result
Check log in $workspace/log for details, each node owns same log structure.
`paddle_trainer.INFO`
It provides almost all internal output log for training, same as local training. Check runtime model convergence here.
`paddle_pserver2.INFO`
It provides parameter server running log, which could help to diagnose distributed error.
`server.log`
It provides stderr and stdout of parameter server process. Check error log if training crashes.
`train.log`
It provides stderr and stdout of trainer process. Check error log if training crashes.
## Check Model Output
After one pass finished, model files will be written in `output` directory in node 0.
`nodefile` in workspace indicates the node id of current cluster job.
k8s_aws_en.md
\ No newline at end of file
# Kubernetes分布式训练
前一篇文章介绍了如何在Kubernetes集群上启动一个单机PaddlePaddle训练作业 (Job)。在这篇文章里,我们介绍如何在Kubernetes集群上进行分布式PaddlePaddle训练作业。关于PaddlePaddle的分布式训练,文章 [Cluster Training](https://github.com/baidu/Paddle/blob/develop/doc/cluster/opensource/cluster_train.md)介绍了一种通过SSH远程分发任务,进行分布式训练的方法,与此不同的是,本文将介绍在Kubernetes容器管理平台上快速构建PaddlePaddle容器集群,进行分布式训练的方案。
前一篇文章介绍了如何在Kubernetes集群上启动一个单机PaddlePaddle训练作业 (Job)。在这篇文章里,我们介绍如何在Kubernetes集群上进行分布式PaddlePaddle训练作业。关于PaddlePaddle的分布式训练,文章 [Cluster Training](http://www.paddlepaddle.org/docs/develop/documentation/zh/howto/usage/cluster/cluster_train_cn.html)介绍了一种通过SSH远程分发任务,进行分布式训练的方法,与此不同的是,本文将介绍在Kubernetes容器管理平台上快速构建PaddlePaddle容器集群,进行分布式训练的方案。
有关Kubernetes相关概念以及如何搭建和配置Kubernetes集群,可以参考[k8s_basis](./k8s_basis_cn.md)
......@@ -28,7 +28,7 @@ PaddlePaddle镜像需要提供`paddle pserver`与`paddle train`进程的运行
- 拷贝训练文件到容器内
- 生成`paddle pserver``paddle train`进程的启动参数,并且启动训练
因为官方镜像 `paddledev/paddle:cpu-latest` 内已经包含PaddlePaddle的执行程序但是还没上述功能,所以我们可以在这个基础上,添加启动脚本,制作新镜像来完成以上的工作。参考镜像的[*Dockerfile*](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/k8s/src/k8s_train/Dockerfile)
因为官方镜像 `paddledev/paddle:cpu-latest` 内已经包含PaddlePaddle的执行程序但是还没上述功能,所以我们可以在这个基础上,添加启动脚本,制作新镜像来完成以上的工作。参考镜像的[*Dockerfile*](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile)
```bash
$ cd doc/howto/usage/k8s/src/k8s_train
......@@ -149,20 +149,19 @@ spec:
文件中,`metadata`下的`name`表示这个job的名字。`parallelism,completions`字段表示这个job会同时开启3个PaddlePaddle节点,成功训练且退出的pod数目为3时,这个job才算成功结束。然后申明一个存储卷`jobpath`,代表宿主机目录`/home/work/mfs`,在对容器的描述`containers`字段中,将此目录挂载为容器的`/home/jobpath`目录,这样容器的`/home/jobpath`目录就成为了共享存储,放在这个目录里的文件其实是保存到了MFS上。
`env`字段表示容器的环境变量,我们将`paddle`运行的一些参数通过这种方式传递到容器内
`env`字段表示容器的环境变量,我们将`paddle`运行的一些参数通过这种方式传递到容器内
环境变量 | 说明
--- | ---
JOB_PATH | 共享存储挂在的路径
JOB_NAME | Job的名字
TRAIN_CONFIG_DIR | 本次训练文件所在目录,与JOB_PATH,JOB_NAME组合可以找到本次训练需要的文件路径
CONF_PADDLE_NIC | `paddle pserver`进程需要的`--nics`参数,即网卡名
CONF_PADDLE_PORT | `paddle paserver``--port`参数
CONF_PADDLE_PORTS_NUM | 稠密更新的端口数量,即`--ports_num`参数
CONF_PADDLE_PORTS_NUM_SPARSE | 稀疏更新的端口数量,即`--ports_num_for_sparse`参数
CONF_PADDLE_GRADIENT_NUM | 训练节点数量,即`--num_gradient_servers参数`
这些参数的具体描述,读者可以查看[这里](http://www.paddlepaddle.org/doc/ui/cmd_argument/detail_introduction.html#parameter-server-and-distributed-communication)
- JOB_PATH:共享存储挂在的路径
- JOB_NAME:Job的名字
- TRAIN_CONFIG_DIR:本次训练文件所在目录,与JOB_PATH,JOB_NAME组合可以找到本次训练需要的文件路径
- CONF_PADDLE_NIC:`paddle pserver`进程需要的`--nics`参数,即网卡名
- CONF_PADDLE_PORT:`paddle paserver``--port`参数
- CONF_PADDLE_PORTS_NUM:稠密更新的端口数量,即`--ports_num`参数
- CONF_PADDLE_PORTS_NUM_SPARSE:稀疏更新的端口数量,即`--ports_num_for_sparse`参数
- CONF_PADDLE_GRADIENT_NUM:训练节点数量,即`--num_gradient_servers参数`
这些参数的具体描述,读者可以查看[这里](http://www.paddlepaddle.org/docs/develop/documentation/zh/howto/usage/cmd_parameter/detail_introduction_cn.html)
编写完YAML文件后,可以使用Kubernetes的命令行工具创建job。
......
# 在OpenMPI集群中提交训练作业
## 准备OpenMPI集群
执行下面的命令以启动3个节点的OpenMPI集群和一个"head"节点:
```bash
paddle/scripts/cluster_train_v2/openmpi/docker_cluster
kubectl create -f head.yaml
kubectl create -f mpi-nodes.yaml
```
然后可以从head节点ssh无密码登录到OpenMPI的每个节点上。
## 启动集群作业
您可以按照下面的步骤在OpenMPI集群中提交paddle训练任务:
```bash
# 获得head和node节点的IP地址
kubectl get po -o wide
# 将node节点的IP地址保存到machines文件中
kubectl get po -o wide | grep nodes | awk '{print $6}' > machines
# 拷贝必要的文件到head节点
scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~
# ssh 登录到head节点
ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP]
# --------------- 以下操作均在head节点中执行 ---------------
# 准备训练数据
python prepare.py
# 拷贝训练程序和字典文件到每台MPI节点
cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial
# 创建日志目录
mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs
# 拷贝训练数据到各自的节点
scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial
scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial
scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial
# 启动训练任务
mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh
```
# Cluster Training Using OpenMPI
## Prepare an OpenMPI cluster
Run the following command to start a 3-node MPI cluster and one "head" node.
```bash
cd paddle/scripts/cluster_train_v2/openmpi/docker_cluster
kubectl create -f head.yaml
kubectl create -f mpi-nodes.yaml
```
Then you can log in to every OpenMPI node using ssh without input any passwords.
## Launching Cluster Job
Follow the steps to launch a PaddlePaddle training job in OpenMPI cluster:\
```bash
# find out node IP addresses
kubectl get po -o wide
# generate a "machines" file containing node IP addresses
kubectl get po -o wide | grep nodes | awk '{print $6}' > machines
# copy necessary files onto "head" node
scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~
# login to head node using ssh
ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP]
# --------------- in head node ---------------
# prepare training data
python prepare.py
# copy training data and dict file to MPI nodes
cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial
# creat a directory for storing log files
mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs
# copy training data to every node
scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial
scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial
scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial
# start the job
mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh
```
# Kubernetes 简介
[*Kubernetes*](http://kubernetes.io/)是Google开源的容器集群管理系统,其提供应用部署、维护、扩展机制等功能,利用Kubernetes能方便地管理跨机器运行容器化的应用。Kubernetes可以在物理机或虚拟机上运行,且支持部署到[AWS](http://kubernetes.io/docs/getting-started-guides/aws)[Azure](http://kubernetes.io/docs/getting-started-guides/azure/)[GCE](http://kubernetes.io/docs/getting-started-guides/gce)等多种公有云环境。介绍分布式训练之前,需要对[Kubernetes](http://kubernetes.io/)有一个基本的认识,下面先简要介绍一下本文用到的几个Kubernetes概念。
- [*Node*](http://kubernetes.io/docs/admin/node/) 表示一个Kubernetes集群中的一个工作节点,这个节点可以是物理机或者虚拟机,Kubernetes集群就是由node节点与master节点组成的。
- [*Pod*](http://kubernetes.io/docs/user-guide/pods/) 是一组(一个或多个)容器,pod是Kubernetes的最小调度单元,一个pod中的所有容器会被调度到同一个node上。Pod中的容器共享NET,PID,IPC,UTS等Linux namespace。由于容器之间共享NET namespace,所以它们使用同一个IP地址,可以通过*localhost*互相通信。不同pod之间可以通过IP地址访问。
- [*Job*](http://kubernetes.io/docs/user-guide/jobs/) 描述Kubernetes上运行的作业,一次作业称为一个job,通常每个job包括一个或者多个pods,job启动后会创建这些pod并开始执行一个程序,等待这个程序执行成功并返回0则成功退出,如果执行失败,也可以配置不同的重试机制。
- [*Volume*](http://kubernetes.io/docs/user-guide/volumes/) 存储卷,是pod内的容器都可以访问的共享目录,也是容器与node之间共享文件的方式,因为容器内的文件都是暂时存在的,当容器因为各种原因被销毁时,其内部的文件也会随之消失。通过volume,就可以将这些文件持久化存储。Kubernetes支持多种volume,例如hostPath(宿主机目录),gcePersistentDisk,awsElasticBlockStore等。
- [*Namespaces*](https://kubernetes.io/docs/user-guide/namespaces/) 命名空间,在kubernetes中创建的所有资源对象(例如上文的pod,job)等都属于一个命名空间,在同一个命名空间中,资源对象的名字是唯一的,不同空间的资源名可以重复,命名空间主要为了对象进行逻辑上的分组便于管理。本文只使用了默认命名空间。
- [*PersistentVolume*](https://kubernetes.io/docs/user-guide/persistent-volumes/): 和[*PersistentVolumeClaim*](https://kubernetes.io/docs/user-guide/persistent-volumes/#persistentvolumeclaims)结合,将外部的存储服务在Kubernetes中描述成为统一的资源形式,便于存储资源管理和Pod引用。
## 部署Kubernetes集群
Kubernetes提供了多种集群部署的方案,本文档内不重复介绍。这里给出集中常见的部署方法:
- [*minikube*](https://kubernetes.io/docs/getting-started-guides/minikube/): 快速在本地启动一个单机的kubernetes服务器,便于本地验证和测试。
- [*kubeadm*](http://kubernetes.io/docs/getting-started-guides/kubeadm/): 在不同操作系统,不同主机(Bare-Metal, AWS, GCE)条件下,快速部署集群。
- [*AWS EC2*](https://kubernetes.io/docs/getting-started-guides/aws/): 在aws上快速部署集群。
- [*Bare-Metal*](https://kubernetes.io/docs/getting-started-guides/centos/centos_manual_config/): 在物理机上手动部署。
可以参考[这个表格](https://kubernetes.io/docs/getting-started-guides/#table-of-solutions)选择适合您的场景的合适方案。
## 选择存储方案
容器不会保留在运行时生成的数据,job或者应用程序在容器中运行时生成的数据会在容器销毁时消失。为了完成分布式机器学习训练任务,需要有一个外部的存储服务来保存训练所需数据和训练输出。
常见的可选存储服务包括:
- [*NFS*](https://github.com/kubernetes/kubernetes/tree/master/examples/volumes/nfs): 可以将磁盘上某个目录共享给网络中其他机器访问。部署和配置比较简单,可以用于小量数据的验证。不提供分布式存储,高可用,冗余等功能。NFS的部署方法可以参考[这里](http://www.tecmint.com/how-to-setup-nfs-server-in-linux/)
- [*GlusterFS*](http://gluster.readthedocs.io/en/latest/Quick-Start-Guide/Quickstart/): 网络分布式文件系统,可以在Kubernetes中按照[这个](https://github.com/kubernetes/kubernetes/tree/master/examples/volumes/glusterfs)例子使用。
- [*Ceph*](http://docs.ceph.com/docs/master/): 分布式文件系统,支持rbd,POSIX API接口(ceph fs)和对象存储API,参考[这里](https://kubernetes.io/docs/user-guide/volumes/#rbd)
- [*MooseFS*](https://moosefs.com/documentation.html): 一个分布式的存储系统。需要先挂载到服务器Node上再通过kubernetes hostPath Volume挂载到容器中。
## 配置kubectl
### 安装kubectl
```
# OS X
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/darwin/amd64/kubectl
# Linux
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl
# Windows
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/windows/amd64/kubectl.exe
```
### 配置kubectl访问你的kubernetes集群
编辑`~/.kube/config`这个配置文件,修改`Master-IP`的地址。如果使用SSL认证,则需要配置`certificate-authority``users`中的用户证书。如果是使用非SSL方式访问(比如通过8080端口),也可以去掉这些证书的配置。
```
apiVersion: v1
clusters:
- cluster:
certificate-authority: /path/to/ca.crt
server: https://[Master-IP]:443
name: minikube
contexts:
- context:
cluster: minikube
user: minikube
name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
user:
client-certificate: /path/to/apiserver.crt
client-key: /Users/wuyi/.minikube/apiserver.key
```
......@@ -12,14 +12,14 @@
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/operators/seq_expand_op.h"
#include "paddle/operators/sequence_expand_op.h"
namespace paddle {
namespace operators {
using framework::Tensor;
class SeqExpandOp : public framework::OperatorWithKernel {
class SequenceExpandOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
......@@ -35,25 +35,25 @@ class SeqExpandOp : public framework::OperatorWithKernel {
}
};
class SeqExpandOpMaker : public framework::OpProtoAndCheckerMaker {
class SequenceExpandOpMaker : public framework::OpProtoAndCheckerMaker {
public:
SeqExpandOpMaker(framework::OpProto* proto,
framework::OpAttrChecker* op_checker)
SequenceExpandOpMaker(framework::OpProto* proto,
framework::OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X",
"(Tensor or LoDTensor) The input(X) of this operator can be a "
"LoDTensor or a base Tensor.");
AddInput("Y",
"(LoDTensor)The reference input(Y) of seq_expand op."
"(LoDTensor)The reference input(Y) of sequence_expand op."
"It must be a LoDTensor with k-level(k>0)."
"The input(X) will be expanded according to LOD of input(Y)."
"The element numbers of last level in input(Y) "
"must be equal to dims[0] of input(X).");
AddOutput("Out",
"(LodTensor)The output of seq_expand op."
"(LodTensor)The output of sequence_expand op."
"The lod of output will be as same as input(Y)'s lod.");
AddComment(R"DOC(
Seq Expand Operator.
Sequence Expand Operator.
This operator expands input(X) according to LOD of input(Y).
Following are cases to better explain how this works:
......@@ -124,7 +124,7 @@ then we get 2-level LoDTensor
}
};
class SeqExpandOpGrad : public framework::OperatorWithKernel {
class SequenceExpandOpGrad : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
......@@ -146,11 +146,11 @@ class SeqExpandOpGrad : public framework::OperatorWithKernel {
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP(seq_expand, ops::SeqExpandOp, ops::SeqExpandOpMaker,
seq_expand_grad, ops::SeqExpandOpGrad);
REGISTER_OP(sequence_expand, ops::SequenceExpandOp, ops::SequenceExpandOpMaker,
sequence_expand_grad, ops::SequenceExpandOpGrad);
REGISTER_OP_CPU_KERNEL(
seq_expand,
ops::SeqExpandKernel<paddle::platform::CPUDeviceContext, float>);
sequence_expand,
ops::SequenceExpandKernel<paddle::platform::CPUDeviceContext, float>);
REGISTER_OP_CPU_KERNEL(
seq_expand_grad,
ops::SeqExpandGradKernel<paddle::platform::CPUDeviceContext, float>);
sequence_expand_grad,
ops::SequenceExpandGradKernel<paddle::platform::CPUDeviceContext, float>);
......@@ -13,12 +13,12 @@
limitations under the License. */
#define EIGEN_USE_GPU
#include "paddle/operators/seq_expand_op.h"
#include "paddle/operators/sequence_expand_op.h"
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(
seq_expand,
ops::SeqExpandKernel<paddle::platform::CUDADeviceContext, float>);
sequence_expand,
ops::SequenceExpandKernel<paddle::platform::CUDADeviceContext, float>);
REGISTER_OP_CUDA_KERNEL(
seq_expand_grad,
ops::SeqExpandGradKernel<paddle::platform::CUDADeviceContext, float>);
sequence_expand_grad,
ops::SequenceExpandGradKernel<paddle::platform::CUDADeviceContext, float>);
......@@ -24,7 +24,7 @@ namespace operators {
using LoDTensor = framework::LoDTensor;
template <typename DeviceContext, typename T>
class SeqExpandKernel : public framework::OpKernel<T> {
class SequenceExpandKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& context) const override {
auto* x = context.Input<LoDTensor>("X");
......@@ -71,7 +71,7 @@ class SeqExpandKernel : public framework::OpKernel<T> {
*
* */
template <typename DeviceContext, typename T>
class SeqExpandGradKernel : public framework::OpKernel<T> {
class SequenceExpandGradKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& context) const override {
auto* d_out = context.Input<LoDTensor>(framework::GradVarName("Out"));
......
import numpy as np
import layers
from framework import Program, unique_name, Variable
from framework import Program, unique_name, Variable, program_guard
from layer_helper import LayerHelper
__all__ = ['Accuracy', 'ChunkEvaluator']
......@@ -49,15 +49,12 @@ class Evaluator(object):
if reset_program is None:
reset_program = Program()
for var in self.states:
assert isinstance(var, Variable)
g_var = _clone_var_(reset_program.current_block(), var)
layers.fill_constant(
shape=g_var.shape,
value=0.0,
dtype=g_var.dtype,
out=g_var,
main_program=reset_program)
with program_guard(main_program=reset_program):
for var in self.states:
assert isinstance(var, Variable)
g_var = _clone_var_(reset_program.current_block(), var)
layers.fill_constant(
shape=g_var.shape, value=0.0, dtype=g_var.dtype, out=g_var)
executor.run(reset_program)
......@@ -104,20 +101,14 @@ class Accuracy(Evaluator):
self.total = self.create_state(dtype='int64', shape=[1], suffix='total')
self.correct = self.create_state(
dtype='int64', shape=[1], suffix='correct')
kwargs = {'main_program': main_program}
total = self.helper.create_tmp_variable(dtype='int')
correct = self.helper.create_tmp_variable(dtype='int')
acc = layers.accuracy(
input=input,
label=label,
k=k,
total=total,
correct=correct,
**kwargs)
total = layers.cast(x=total, dtype='int64', **kwargs)
correct = layers.cast(x=correct, dtype='int64', **kwargs)
layers.sums(input=[self.total, total], out=self.total, **kwargs)
layers.sums(input=[self.correct, correct], out=self.correct, **kwargs)
input=input, label=label, k=k, total=total, correct=correct)
total = layers.cast(x=total, dtype='int64')
correct = layers.cast(x=correct, dtype='int64')
layers.sums(input=[self.total, total], out=self.total)
layers.sums(input=[self.correct, correct], out=self.correct)
self.metrics.append(acc)
......@@ -125,12 +116,12 @@ class Accuracy(Evaluator):
if eval_program is None:
eval_program = Program()
block = eval_program.current_block()
kwargs = {'main_program': eval_program}
total = _clone_var_(block, self.total)
correct = _clone_var_(block, self.correct)
total = layers.cast(total, dtype='float32', **kwargs)
correct = layers.cast(correct, dtype='float32', **kwargs)
out = layers.elementwise_div(x=correct, y=total, **kwargs)
with program_guard(main_program=eval_program):
total = _clone_var_(block, self.total)
correct = _clone_var_(block, self.correct)
total = layers.cast(total, dtype='float32')
correct = layers.cast(correct, dtype='float32')
out = layers.elementwise_div(x=correct, y=total)
return np.array(executor.run(eval_program, fetch_list=[out])[0])
......@@ -141,14 +132,14 @@ class ChunkEvaluator(Evaluator):
numbers.
"""
def __init__(self,
input,
label,
chunk_scheme,
num_chunk_types,
excluded_chunk_types=None,
**kwargs):
super(ChunkEvaluator, self).__init__("chunk_eval", **kwargs)
def __init__(
self,
input,
label,
chunk_scheme,
num_chunk_types,
excluded_chunk_types=None, ):
super(ChunkEvaluator, self).__init__("chunk_eval")
main_program = self.helper.main_program
if main_program.current_block().idx != 0:
raise ValueError("You can only invoke Evaluator in root block")
......@@ -159,26 +150,21 @@ class ChunkEvaluator(Evaluator):
dtype='int64', shape=[1], suffix='num_label_chunks')
self.num_correct_chunks = self.create_state(
dtype='int64', shape=[1], suffix='num_correct_chunks')
kwargs = {'main_program': main_program}
precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks = layers.chunk_eval(
input=input,
label=label,
chunk_scheme=chunk_scheme,
num_chunk_types=num_chunk_types,
excluded_chunk_types=excluded_chunk_types,
**kwargs)
excluded_chunk_types=excluded_chunk_types, )
layers.sums(
input=[self.num_infer_chunks, num_infer_chunks],
out=self.num_infer_chunks,
**kwargs)
out=self.num_infer_chunks)
layers.sums(
input=[self.num_label_chunks, num_label_chunks],
out=self.num_label_chunks,
**kwargs)
out=self.num_label_chunks)
layers.sums(
input=[self.num_correct_chunks, num_correct_chunks],
out=self.num_correct_chunks,
**kwargs)
out=self.num_correct_chunks)
self.metrics.extend([precision, recall, f1_score])
......@@ -186,7 +172,6 @@ class ChunkEvaluator(Evaluator):
if eval_program is None:
eval_program = Program()
block = eval_program.current_block()
kwargs = {'main_program': eval_program}
num_infer_chunks, num_label_chunks, num_correct_chunks = executor.run(
eval_program,
fetch_list=[_clone_var_(block, state) for state in self.states])
......
......@@ -21,19 +21,11 @@ class LayerHelper(object):
@property
def main_program(self):
prog = self.kwargs.get('main_program', None)
if prog is None:
return default_main_program()
else:
return prog
return default_main_program()
@property
def startup_program(self):
prog = self.kwargs.get('startup_program', None)
if prog is None:
return default_startup_program()
else:
return prog
return default_startup_program()
def append_op(self, *args, **kwargs):
return self.main_program.current_block().append_op(*args, **kwargs)
......@@ -151,13 +143,6 @@ class LayerHelper(object):
persistable=True,
initializer=initializer)
@property
def to_kwargs(self):
return {
'main_program': self.main_program,
'startup_program': self.startup_program
}
def append_bias_op(self, input_var, dim_start=1, dim_end=None):
"""
Append bias operator and return its output. If the user does not set
......
......@@ -14,11 +14,7 @@ __all__ = [
]
def split_lod_tensor(input,
mask,
level=0,
main_program=None,
startup_program=None):
def split_lod_tensor(input, mask, level=0):
helper = LayerHelper('split_lod_tensor', **locals())
out_true = helper.create_tmp_variable(dtype=input.dtype)
out_false = helper.create_tmp_variable(dtype=input.dtype)
......@@ -34,13 +30,7 @@ def split_lod_tensor(input,
return out_true, out_false
def merge_lod_tensor(in_true,
in_false,
x,
mask,
level=0,
main_program=None,
startup_program=None):
def merge_lod_tensor(in_true, in_false, x, mask, level=0):
helper = LayerHelper('merge_lod_tensor', **locals())
out = helper.create_tmp_variable(dtype=in_true.dtype)
helper.append_op(
......@@ -135,9 +125,8 @@ class StaticRNN(object):
IN_RNN_BLOCK = 1
AFTER_RNN_BLOCK = 2
def __init__(self, name=None, main_program=None):
self.helper = LayerHelper(
"static_rnn", name=name, main_program=main_program)
def __init__(self, name=None):
self.helper = LayerHelper("static_rnn", name=name)
self.memories = {} # memory map, from pre_mem.name --> MemoryLink
self.inputs = [] # input variable list in current block
self.outputs = [] # output variable list in parent block
......@@ -354,8 +343,8 @@ class While(object):
IN_WHILE_BLOCK = 1
AFTER_WHILE_BLOCK = 2
def __init__(self, cond, name=None, main_program=None):
self.helper = LayerHelper("while", name=name, main_program=main_program)
def __init__(self, cond, name=None):
self.helper = LayerHelper("while", name=name)
self.status = While.BEFORE_WHILE_BLOCK
if not isinstance(cond, Variable):
raise TypeError("condition should be a variable")
......@@ -406,7 +395,7 @@ class While(object):
attrs={'sub_block': while_block})
def lod_rank_table(x, level=0, main_program=None):
def lod_rank_table(x, level=0):
"""
This function creates an operator for creating a LOD_RANK_TABLE
using the input x.
......@@ -423,7 +412,7 @@ def lod_rank_table(x, level=0, main_program=None):
return table
def max_sequence_len(rank_table, main_program=None):
def max_sequence_len(rank_table):
"""
This function creates an operator to calculate the length of
max seqence through input rank_table(should be a lod_rank_table)
......@@ -437,7 +426,7 @@ def max_sequence_len(rank_table, main_program=None):
return res
def topk(input, k, main_program=None, startup_program=None):
def topk(input, k):
helper = LayerHelper('topk', **locals())
topk_out = helper.create_tmp_variable(dtype=input.data_type)
topk_indices = helper.create_tmp_variable(dtype='int64')
......@@ -450,7 +439,7 @@ def topk(input, k, main_program=None, startup_program=None):
return topk_out, topk_indices
def lod_tensor_to_array(x, table, main_program=None):
def lod_tensor_to_array(x, table):
"""
This function creates an operator to convert an LOD_Tensor to
an array.
......@@ -468,7 +457,7 @@ def lod_tensor_to_array(x, table, main_program=None):
return array
def array_to_lod_tensor(x, table, main_program=None, startup_program=None):
def array_to_lod_tensor(x, table):
"""
This function creates an operator to convert an array to a
LOD_Tensor.
......@@ -483,11 +472,7 @@ def array_to_lod_tensor(x, table, main_program=None, startup_program=None):
return tmp
def increment(x,
value=1.0,
in_place=True,
main_program=None,
startup_program=None):
def increment(x, value=1.0, in_place=True):
"""
This function creates an operator to increment each value in the input
`x` by an amount: `value` as mentioned in the input parameter. This
......@@ -506,7 +491,7 @@ def increment(x,
return out
def array_write(x, i, array=None, main_program=None, startup_program=None):
def array_write(x, i, array=None):
"""
This function creates an operator to write the data out as a
LOD_TENSOR_ARRAY.
......@@ -525,7 +510,7 @@ def array_write(x, i, array=None, main_program=None, startup_program=None):
return array
def create_array(dtype, main_program=None):
def create_array(dtype):
helper = LayerHelper("array", **locals())
return helper.create_variable(
name="{0}.out".format(helper.name),
......@@ -533,7 +518,7 @@ def create_array(dtype, main_program=None):
dtype=dtype)
def less_than(x, y, cond=None, main_program=None, **ignored):
def less_than(x, y, cond=None, **ignored):
helper = LayerHelper("less_than", **locals())
if cond is None:
cond = helper.create_tmp_variable(dtype='bool')
......@@ -545,7 +530,7 @@ def less_than(x, y, cond=None, main_program=None, **ignored):
return cond
def array_read(array, i, main_program=None, startup_program=None):
def array_read(array, i):
"""
This function creates an operator to read the data in as a
LOD_TENSOR_ARRAY.
......@@ -564,7 +549,7 @@ def array_read(array, i, main_program=None, startup_program=None):
return out
def shrink_memory(x, i, table, main_program=None, startup_program=None):
def shrink_memory(x, i, table):
"""
This function creates an operator to shrink_rnn_memory using the RankTable
as mentioned in the input parameter.
......@@ -581,7 +566,7 @@ def shrink_memory(x, i, table, main_program=None, startup_program=None):
return out
def array_length(array, main_program=None):
def array_length(array):
"""
This function creates an operator to find the length of the
LOD_TENSOR_ARRAY.
......@@ -611,20 +596,12 @@ class ConditionalBlockGuard(BlockGuard):
class ConditionalBlock(object):
def __init__(self,
inputs,
name=None,
main_program=None,
startup_program=None):
def __init__(self, inputs, name=None):
for each_input in inputs:
if not isinstance(each_input, Variable):
raise TypeError("Each input should be variable")
self.inputs = inputs
self.helper = LayerHelper(
'conditional_block',
name=name,
main_program=main_program,
startup_program=startup_program)
self.helper = LayerHelper('conditional_block', name=name)
def block(self):
return ConditionalBlockGuard(self)
......@@ -709,15 +686,10 @@ class IfElse(object):
IN_IF_ELSE_TRUE_BLOCKS = 1
IN_IF_ELSE_FALSE_BLOCKS = 2
def __init__(self, cond, name=None, main_program=None,
startup_program=None):
def __init__(self, cond, name=None):
if not isinstance(cond, Variable):
raise TypeError("cond must be a Variable")
self.helper = LayerHelper(
'ifelse',
name=name,
main_program=main_program,
startup_program=startup_program)
self.helper = LayerHelper('ifelse', name=name)
self.cond = cond
self.input_table = {}
self.status = IfElse.OUT_IF_ELSE_BLOCKS
......@@ -782,11 +754,7 @@ class IfElse(object):
out_table.append(outside_out)
# assign local var to outside
assign(
input=each_out,
output=outside_out,
main_program=self.helper.main_program,
startup_program=self.helper.startup_program)
assign(input=each_out, output=outside_out)
def __call__(self):
if self.status != self.OUT_IF_ELSE_BLOCKS:
......@@ -810,9 +778,7 @@ class IfElse(object):
in_false=false_var,
mask=self.cond,
x=self.cond,
level=0,
main_program=self.helper.main_program,
startup_program=self.helper.startup_program))
level=0))
return rlist
......@@ -821,12 +787,8 @@ class DynamicRNN(object):
IN_RNN = 1
AFTER_RNN = 2
def __init__(self, name=None, main_program=None, startup_program=None):
self.helper = LayerHelper(
'dynamic_rnn',
name=name,
main_program=main_program,
startup_program=startup_program)
def __init__(self, name=None):
self.helper = LayerHelper('dynamic_rnn', name=name)
self.status = DynamicRNN.BEFORE_RNN
self.lod_rank_table = None
self.max_seq_len = None
......@@ -880,8 +842,7 @@ class DynamicRNN(object):
inputs={'X': x,
'RankTable': self.lod_rank_table},
outputs={'Out': input_array})
return array_read(
array=input_array, i=self.step_idx, **self.helper.to_kwargs)
return array_read(array=input_array, i=self.step_idx)
@contextlib.contextmanager
def block(self):
......@@ -892,32 +853,18 @@ class DynamicRNN(object):
self.status = DynamicRNN.IN_RNN
with self.while_op.block():
yield
increment(
x=self.step_idx,
value=1.0,
in_place=True,
**self.helper.to_kwargs)
increment(x=self.step_idx, value=1.0, in_place=True)
for new_mem, mem_array in self.mem_link:
array_write(
x=new_mem,
i=self.step_idx,
array=mem_array,
**self.helper.to_kwargs)
less_than(
x=self.step_idx,
y=self.max_seq_len,
cond=self.cond,
**self.helper.to_kwargs)
array_write(x=new_mem, i=self.step_idx, array=mem_array)
less_than(x=self.step_idx, y=self.max_seq_len, cond=self.cond)
self.status = DynamicRNN.AFTER_RNN
for each_array in self.output_array:
self.outputs.append(
array_to_lod_tensor(
x=each_array,
table=self.lod_rank_table,
**self.helper.to_kwargs))
x=each_array, table=self.lod_rank_table))
def __call__(self, *args, **kwargs):
if self.status != DynamicRNN.AFTER_RNN:
......@@ -944,13 +891,9 @@ class DynamicRNN(object):
inputs={'X': init,
'I': self.zero_idx},
outputs={'Out': mem_array})
retv = array_read(
array=mem_array, i=self.step_idx, **self.helper.to_kwargs)
retv = array_read(array=mem_array, i=self.step_idx)
retv = shrink_memory(
x=retv,
i=self.step_idx,
table=self.lod_rank_table,
**self.helper.to_kwargs)
x=retv, i=self.step_idx, table=self.lod_rank_table)
self.mem_dict[retv.name] = mem_array
return retv
else:
......
......@@ -10,8 +10,6 @@ def data(name,
dtype='float32',
lod_level=0,
type=core.VarDesc.VarType.LOD_TENSOR,
main_program=None,
startup_program=None,
stop_gradient=True):
"""
Data Layer.
......
......@@ -10,7 +10,7 @@ __all__ = [
'fc', 'embedding', 'dynamic_lstm', 'gru_unit', 'linear_chain_crf',
'crf_decoding', 'cos_sim', 'cross_entropy', 'square_error_cost', 'accuracy',
'chunk_eval', 'sequence_conv', 'conv2d', 'sequence_pool', 'pool2d',
'batch_norm', 'beam_search_decode', 'conv2d_transpose'
'batch_norm', 'beam_search_decode', 'conv2d_transpose', 'sequence_expand'
]
......@@ -20,9 +20,7 @@ def fc(input,
param_attr=None,
bias_attr=None,
act=None,
name=None,
main_program=None,
startup_program=None):
name=None):
"""
Fully Connected Layer.
......@@ -88,13 +86,7 @@ def fc(input,
return helper.append_activation(pre_activation)
def embedding(input,
size,
is_sparse=False,
param_attr=None,
dtype='float32',
main_program=None,
startup_program=None):
def embedding(input, size, is_sparse=False, param_attr=None, dtype='float32'):
"""
Embedding Layer.
......@@ -140,9 +132,7 @@ def dynamic_lstm(input,
gate_activation='sigmoid',
cell_activation='tanh',
candidate_activation='tanh',
dtype='float32',
main_program=None,
startup_program=None):
dtype='float32'):
helper = LayerHelper('lstm', **locals())
size = size / 4
weight = helper.create_parameter(
......@@ -185,9 +175,7 @@ def gru_unit(input,
weight=None,
bias=None,
activation='tanh',
gate_activation='sigmoid',
main_program=None,
startup_program=None):
gate_activation='sigmoid'):
"""
GRUUnit Operator implements partial calculations of the GRU unit as following:
......@@ -250,11 +238,7 @@ def gru_unit(input,
return updated_hidden, reset_hidden_pre, gate
def linear_chain_crf(input,
label,
param_attr=None,
main_program=None,
startup_program=None):
def linear_chain_crf(input, label, param_attr=None):
helper = LayerHelper('linear_chain_crf', **locals())
size = input.shape[1]
transition = helper.create_parameter(
......@@ -280,11 +264,7 @@ def linear_chain_crf(input,
return log_likelihood
def crf_decoding(input,
param_attr,
label=None,
main_program=None,
startup_program=None):
def crf_decoding(input, param_attr, label=None):
helper = LayerHelper('crf_decoding', **locals())
transition = helper.get_parameter(param_attr.name)
viterbi_path = helper.create_tmp_variable(dtype=helper.input_dtype())
......@@ -392,7 +372,7 @@ def chunk_eval(input,
excluded_chunk_types=None,
**kwargs):
"""
This function computes and outputs the precision, recall and
This function computes and outputs the precision, recall and
F1-score of chunk detection.
"""
helper = LayerHelper("chunk_eval", **kwargs)
......@@ -432,9 +412,7 @@ def sequence_conv(input,
padding=None,
bias_attr=None,
param_attr=None,
act=None,
main_program=None,
startup_program=None):
act=None):
"""
This function creates the op for sequence_conv, using the inputs and
other convolutional configurations for the filters and stride as given
......@@ -477,9 +455,7 @@ def conv2d(input,
param_attr=None,
bias_attr=None,
act=None,
name=None,
main_program=None,
startup_program=None):
name=None):
"""
This function creates the op for a 2-dimensional Convolution.
This is performed using the parameters of filters(size, dimensionality etc)
......@@ -565,9 +541,7 @@ def pool2d(input,
pool_type,
pool_stride=None,
pool_padding=None,
global_pooling=False,
main_program=None,
startup_program=None):
global_pooling=False):
"""
This function adds the operator for pooling in 2 dimensions, using the
pooling configurations mentioned in input parameters.
......@@ -613,9 +587,7 @@ def batch_norm(input,
epsilon=1e-05,
param_attr=None,
bias_attr=None,
data_layout='NCHW',
main_program=None,
startup_program=None):
data_layout='NCHW'):
"""
This function helps create an operator to implement
the BatchNorm layer using the configurations from the input parameters.
......@@ -685,7 +657,7 @@ def batch_norm(input,
return helper.append_activation(batch_norm_out)
def beam_search_decode(ids, scores, main_program=None, startup_program=None):
def beam_search_decode(ids, scores):
helper = LayerHelper('beam_search_decode', **locals())
sentence_ids = helper.create_tmp_variable(dtype=ids.dtype)
sentence_scores = helper.create_tmp_variable(dtype=ids.dtype)
......@@ -708,9 +680,7 @@ def conv2d_transpose(input,
filter_size=None,
padding=None,
stride=None,
param_attr=None,
main_program=None,
startup_program=None):
param_attr=None):
"""
The transpose of conv2d layer.
......@@ -789,3 +759,70 @@ def conv2d_transpose(input,
attrs=op_attr)
return out
def sequence_expand(x, y, main_program=None, startup_program=None):
"""Sequence Expand Layer. This layer will expand the input variable **x**
according to LoD information of **y**. And the following examples will
explain how sequence_expand works:
.. code-block:: text
* Case 1
x is a LoDTensor:
x.lod = [[0, 2, 3],
[0, 1, 3, 4]]
x.data = [a, b, c, d]
x.dims = [4, 1]
y is a LoDTensor:
y.lod = [[0, 2, 4],
[0, 3, 6, 7, 8]]
with condition len(y.lod[-1]) - 1 == x.dims[0]
then output is a 2-level LoDTensor:
out.lod = [[0, 2, 4],
[0, 3, 6, 7, 8]]
out.data = [a, a, a, b, b, b, c, d]
out.dims = [8, 1]
* Case 2
x is a Tensor:
x.data = [a, b, c]
x.dims = [3, 1]
y is a LoDTensor:
y.lod = [[0, 2, 3, 6]]
with condition len(y.lod[-1]) - 1 == x.dims[0]
then output is a 1-level LoDTensor:
out.lod = [[0, 2, 3, 6]]
out.data = [a, a, b, c, c, c]
out.dims = [6, 1]
Args:
x (Variable): The input variable which is a Tensor or LoDTensor.
y (Variable): The input variable which is a LoDTensor.
main_program (Program): The main program.
startup_program (Program): The startup program.
Returns:
Variable: The expanded variable which is a LoDTensor.
Examples:
.. code-block:: python
x = fluid.layers.data(name='x', shape=[10], dtype='float32')
y = fluid.layers.data(name='y', shape=[10, 20],
dtype='float32', lod_level=1)
out = layers.sequence_expand(x=x, y=y)
"""
helper = LayerHelper('sequence_expand', input=x, **locals())
dtype = helper.input_dtype()
tmp = helper.create_tmp_variable(dtype)
helper.append_op(
type='sequence_expand', inputs={'X': x,
'Y': y}, outputs={'Out': tmp})
return tmp
......@@ -6,12 +6,12 @@ __all__ = [
]
def create_tensor(dtype, name=None, main_program=None, startup_program=None):
def create_tensor(dtype, name=None):
helper = LayerHelper("create_tensor", **locals())
return helper.create_variable(name=helper.name, dtype=dtype)
def cast(x, dtype, main_program=None):
def cast(x, dtype):
"""
This function takes in the input with input_dtype
and casts it to the output_dtype as the output.
......@@ -27,7 +27,7 @@ def cast(x, dtype, main_program=None):
return out
def concat(input, axis, main_program=None, startup_program=None):
def concat(input, axis):
"""
This function concats the input along the axis mentioned
and returns that as the output.
......@@ -42,7 +42,7 @@ def concat(input, axis, main_program=None, startup_program=None):
return out
def sums(input, out=None, main_program=None, startup_program=None):
def sums(input, out=None):
"""
This function takes in the input and performs the sum operation on it
and returns that as the output.
......@@ -54,7 +54,7 @@ def sums(input, out=None, main_program=None, startup_program=None):
return out
def assign(input, output, main_program=None, startup_program=None):
def assign(input, output):
helper = LayerHelper('assign', **locals())
helper.append_op(
type='scale',
......@@ -64,12 +64,7 @@ def assign(input, output, main_program=None, startup_program=None):
return output
def fill_constant(shape,
dtype,
value,
out=None,
main_program=None,
startup_program=None):
def fill_constant(shape, dtype, value, out=None):
"""
This function creates a tensor , with shape as mentioned in the input and
specified dtype and fills this up with a constant value that
......@@ -94,9 +89,7 @@ def fill_constant_batch_size_like(input,
dtype,
value,
input_dim_idx=0,
output_dim_idx=0,
main_program=None,
startup_program=None):
output_dim_idx=0):
helper = LayerHelper("fill_constant_batch_size_like", **locals())
out = helper.create_tmp_variable(dtype=dtype)
helper.append_op(
......@@ -114,7 +107,7 @@ def fill_constant_batch_size_like(input,
return out
def ones(shape, dtype, main_program=None):
def ones(shape, dtype):
"""
This function performs the same function as fill_constant() declared above
with the constant value being 1.0.
......@@ -122,7 +115,7 @@ def ones(shape, dtype, main_program=None):
return fill_constant(value=1.0, **locals())
def zeros(shape, dtype, main_program=None):
def zeros(shape, dtype):
"""
This function performs the same function as fill_constant() declared above
with the constant value being 0.0.
......
......@@ -10,25 +10,19 @@ def simple_img_conv_pool(input,
pool_stride,
act,
param_attr=None,
pool_type='max',
main_program=None,
startup_program=None):
pool_type='max'):
conv_out = layers.conv2d(
input=input,
num_filters=num_filters,
filter_size=filter_size,
param_attr=param_attr,
act=act,
main_program=main_program,
startup_program=startup_program)
act=act)
pool_out = layers.pool2d(
input=conv_out,
pool_size=pool_size,
pool_type=pool_type,
pool_stride=pool_stride,
main_program=main_program,
startup_program=startup_program)
pool_stride=pool_stride)
return pool_out
......@@ -42,9 +36,7 @@ def img_conv_group(input,
conv_with_batchnorm=False,
conv_batchnorm_drop_rate=None,
pool_stride=1,
pool_type=None,
main_program=None,
startup_program=None):
pool_type=None):
"""
Image Convolution Group, Used for vgg net.
"""
......@@ -75,31 +67,19 @@ def img_conv_group(input,
filter_size=conv_filter_size[i],
padding=conv_padding[i],
param_attr=param_attr[i],
act=local_conv_act,
main_program=main_program,
startup_program=startup_program)
act=local_conv_act)
if conv_with_batchnorm[i]:
tmp = layers.batch_norm(
input=tmp,
act=conv_act,
main_program=main_program,
startup_program=startup_program)
tmp = layers.batch_norm(input=tmp, act=conv_act)
drop_rate = conv_batchnorm_drop_rate[i]
if abs(drop_rate) > 1e-5:
tmp = layers.dropout(
x=tmp,
dropout_prob=drop_rate,
main_program=main_program,
startup_program=startup_program)
tmp = layers.dropout(x=tmp, dropout_prob=drop_rate)
pool_out = layers.pool2d(
input=tmp,
pool_size=pool_size,
pool_type=pool_type,
pool_stride=pool_stride,
main_program=main_program,
startup_program=startup_program)
pool_stride=pool_stride)
return pool_out
......@@ -108,21 +88,13 @@ def sequence_conv_pool(input,
filter_size,
param_attr=None,
act="sigmoid",
pool_type="max",
main_program=None,
startup_program=None):
pool_type="max"):
conv_out = layers.sequence_conv(
input=input,
num_filters=num_filters,
filter_size=filter_size,
param_attr=param_attr,
act=act,
main_program=main_program,
startup_program=startup_program)
act=act)
pool_out = layers.sequence_pool(
input=conv_out,
pool_type=pool_type,
main_program=main_program,
startup_program=startup_program)
pool_out = layers.sequence_pool(input=conv_out, pool_type=pool_type)
return pool_out
......@@ -2,7 +2,7 @@ from collections import defaultdict
import framework
from backward import append_backward_ops
from framework import unique_name
from framework import unique_name, program_guard
from initializer import Constant
from layer_helper import LayerHelper
from regularizer import append_regularization_ops
......@@ -159,34 +159,32 @@ class Optimizer(object):
# Create any accumulators
program = loss.block.program
self.helper = LayerHelper(
self.__class__.__name__,
main_program=program,
startup_program=startup_program)
self._create_accumulators(loss.block,
[p[0] for p in parameters_and_grads])
optimize_ops = []
for param_and_grad in parameters_and_grads:
if param_and_grad[0].trainable is True and param_and_grad[
1] is not None:
optimize_op = self._append_optimize_op(loss.block,
param_and_grad)
optimize_ops.append(optimize_op)
# Returned list of ops can include more ops in addition
# to optimization ops
return_ops = optimize_ops
# Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies
finish_ops = self._finish_update(loss.block)
if finish_ops is not None:
return_ops += finish_ops
if self._global_step is not None:
return_ops.append(self._increment_global_step(loss.block))
return return_ops
with program_guard(program, startup_program):
self.helper = LayerHelper(self.__class__.__name__)
self._create_accumulators(loss.block,
[p[0] for p in parameters_and_grads])
optimize_ops = []
for param_and_grad in parameters_and_grads:
if param_and_grad[0].trainable is True and param_and_grad[
1] is not None:
optimize_op = self._append_optimize_op(loss.block,
param_and_grad)
optimize_ops.append(optimize_op)
# Returned list of ops can include more ops in addition
# to optimization ops
return_ops = optimize_ops
# Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies
finish_ops = self._finish_update(loss.block)
if finish_ops is not None:
return_ops += finish_ops
if self._global_step is not None:
return_ops.append(self._increment_global_step(loss.block))
return return_ops
def minimize(self,
loss,
......
image/
fit_a_line.model/
tmp
cuda_profiler.txt
......@@ -33,11 +33,10 @@ opts = optimizer.minimize(avg_cost)
accuracy = fluid.evaluator.Accuracy(input=predict, label=label)
inference_program = fluid.default_main_program().clone()
test_accuracy = fluid.evaluator.Accuracy(
input=predict, label=label, main_program=inference_program)
test_target = [avg_cost] + test_accuracy.metrics + test_accuracy.states
inference_program = fluid.io.get_inference_program(
test_target, main_program=inference_program)
with fluid.program_guard(inference_program):
test_accuracy = fluid.evaluator.Accuracy(input=predict, label=label)
test_target = [avg_cost] + test_accuracy.metrics + test_accuracy.states
inference_program = fluid.io.get_inference_program(test_target)
train_reader = paddle.batch(
paddle.reader.shuffle(
......
......@@ -4,12 +4,7 @@ import paddle.v2.fluid as fluid
from paddle.v2.fluid.layer_helper import LayerHelper
def lstm(x,
c_pre_init,
hidden_dim,
forget_bias=None,
main_program=None,
startup_program=None):
def lstm(x, c_pre_init, hidden_dim, forget_bias=None):
"""
This function helps create an operator for the LSTM (Long Short Term
Memory) cell that can be used inside an RNN.
......@@ -20,15 +15,8 @@ def lstm(x,
c_pre = rnn.memory(init=c_pre_init)
x_t = rnn.step_input(x)
before_fc = fluid.layers.concat(
input=[x_t, c_pre],
axis=1,
main_program=main_program,
startup_program=startup_program)
after_fc = fluid.layers.fc(input=before_fc,
size=hidden_dim * 4,
main_program=main_program,
startup_program=startup_program)
before_fc = fluid.layers.concat(input=[x_t, c_pre], axis=1)
after_fc = fluid.layers.fc(input=before_fc, size=hidden_dim * 4)
dtype = x.dtype
c = helper.create_tmp_variable(dtype)
......
......@@ -5,12 +5,7 @@ import paddle.v2.fluid.nets as nets
from paddle.v2.fluid.framework import Program
def conv_block(input,
num_filter,
groups,
dropouts,
main_program=None,
startup_program=None):
def conv_block(input, num_filter, groups, dropouts):
return nets.img_conv_group(
input=input,
pool_size=2,
......@@ -20,90 +15,54 @@ def conv_block(input,
conv_act='relu',
conv_with_batchnorm=True,
conv_batchnorm_drop_rate=dropouts,
pool_type='max',
main_program=main_program,
startup_program=startup_program)
pool_type='max')
class TestLayer(unittest.TestCase):
def test_batch_norm_layer(self):
main_program = Program()
startup_program = Program()
images = fluid.layers.data(
name='pixel',
shape=[3, 48, 48],
dtype='float32',
main_program=main_program)
hidden1 = fluid.layers.batch_norm(
input=images,
main_program=main_program,
startup_program=startup_program)
hidden2 = fluid.layers.fc(input=hidden1,
size=128,
act='relu',
main_program=main_program)
hidden3 = fluid.layers.batch_norm(
input=hidden2,
main_program=main_program,
startup_program=startup_program)
with fluid.program_guard(main_program, startup_program):
images = fluid.layers.data(
name='pixel', shape=[3, 48, 48], dtype='float32')
hidden1 = fluid.layers.batch_norm(input=images)
hidden2 = fluid.layers.fc(input=hidden1, size=128, act='relu')
fluid.layers.batch_norm(input=hidden2)
print str(main_program)
def test_dropout_layer(self):
main_program = Program()
startup_program = Program()
images = fluid.layers.data(
name='pixel',
shape=[3, 48, 48],
dtype='float32',
main_program=main_program)
fluid.layers.dropout(
x=images,
dropout_prob=0.5,
main_program=main_program,
startup_program=startup_program)
with fluid.program_guard(main_program, startup_program):
images = fluid.layers.data(
name='pixel', shape=[3, 48, 48], dtype='float32')
fluid.layers.dropout(x=images, dropout_prob=0.5)
# print str(main_program)
print str(main_program)
def test_img_conv_group(self):
main_program = Program()
startup_program = Program()
images = fluid.layers.data(
name='pixel',
shape=[3, 48, 48],
dtype='float32',
main_program=main_program,
startup_program=startup_program)
conv1 = conv_block(images, 64, 2, [0.3, 0], main_program,
startup_program)
conv2 = conv_block(conv1, 256, 3, [0.4, 0.4, 0], main_program,
startup_program)
with fluid.program_guard(main_program, startup_program):
images = fluid.layers.data(
name='pixel', shape=[3, 48, 48], dtype='float32')
conv1 = conv_block(images, 64, 2, [0.3, 0])
conv_block(conv1, 256, 3, [0.4, 0.4, 0])
# print str(main_program)
print str(main_program)
def test_elementwise_add_with_act(self):
main_program = Program()
startup_program = Program()
image1 = fluid.layers.data(
name='pixel1',
shape=[3, 48, 48],
dtype='float32',
main_program=main_program,
startup_program=startup_program)
image2 = fluid.layers.data(
name='pixel2',
shape=[3, 48, 48],
dtype='float32',
main_program=main_program,
startup_program=startup_program)
out = fluid.layers.elementwise_add(
x=image1,
y=image2,
act='relu',
main_program=main_program,
startup_program=startup_program)
# print(main_program)
with fluid.program_guard(main_program, startup_program):
image1 = fluid.layers.data(
name='pixel1', shape=[3, 48, 48], dtype='float32')
image2 = fluid.layers.data(
name='pixel2', shape=[3, 48, 48], dtype='float32')
fluid.layers.elementwise_add(x=image1, y=image2, act='relu')
print(main_program)
if __name__ == '__main__':
......
......@@ -6,7 +6,7 @@ import paddle.v2.fluid.core as core
import paddle.v2.fluid.executor as executor
import paddle.v2.fluid.layers as layers
import paddle.v2.fluid.optimizer as optimizer
from paddle.v2.fluid.framework import Program
from paddle.v2.fluid.framework import Program, program_guard
from paddle.v2.fluid.io import save_inference_model, load_inference_model
......@@ -16,35 +16,18 @@ class TestBook(unittest.TestCase):
init_program = Program()
program = Program()
x = layers.data(
name='x',
shape=[2],
dtype='float32',
main_program=program,
startup_program=init_program)
y = layers.data(
name='y',
shape=[1],
dtype='float32',
main_program=program,
startup_program=init_program)
y_predict = layers.fc(input=x,
size=1,
act=None,
main_program=program,
startup_program=init_program)
cost = layers.square_error_cost(
input=y_predict,
label=y,
main_program=program,
startup_program=init_program)
avg_cost = layers.mean(
x=cost, main_program=program, startup_program=init_program)
sgd_optimizer = optimizer.SGDOptimizer(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost, init_program)
with program_guard(program, init_program):
x = layers.data(name='x', shape=[2], dtype='float32')
y = layers.data(name='y', shape=[1], dtype='float32')
y_predict = layers.fc(input=x, size=1, act=None)
cost = layers.square_error_cost(input=y_predict, label=y)
avg_cost = layers.mean(x=cost)
sgd_optimizer = optimizer.SGDOptimizer(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost, init_program)
place = core.CPUPlace()
exe = executor.Executor(place)
......
......@@ -161,6 +161,15 @@ class TestBook(unittest.TestCase):
x=dat, label=lbl))
print(str(program))
def test_seq_expand(self):
program = Program()
with program_guard(program):
x = layers.data(name='x', shape=[10], dtype='float32')
y = layers.data(
name='y', shape=[10, 20], dtype='float32', lod_level=1)
self.assertIsNotNone(layers.sequence_expand(x=x, y=y))
print(str(program))
if __name__ == '__main__':
unittest.main()
......@@ -2,7 +2,7 @@ import unittest
import paddle.v2.fluid.core as core
import numpy
import paddle.v2.fluid.layers as layers
from paddle.v2.fluid.framework import Program
from paddle.v2.fluid.framework import Program, program_guard
from paddle.v2.fluid.executor import Executor
from paddle.v2.fluid.backward import append_backward_ops
......@@ -118,16 +118,17 @@ class TestCPULoDTensorArrayOps(unittest.TestCase):
def main(self, tensor, expect_array, expect_lod, expect_max_len, level=0):
place = self.place()
program = Program()
x = layers.data(name='x', shape=[10], main_program=program)
x.persistable = True
table = layers.lod_rank_table(x, level=level, main_program=program)
max_len = layers.max_sequence_len(table, main_program=program)
max_len.persistable = True
array = layers.lod_tensor_to_array(x, table, main_program=program)
array.persistable = True
result = layers.array_to_lod_tensor(array, table, main_program=program)
result.persistable = True
with program_guard(program):
x = layers.data(name='x', shape=[10])
x.persistable = True
table = layers.lod_rank_table(x, level=level)
max_len = layers.max_sequence_len(table)
max_len.persistable = True
array = layers.lod_tensor_to_array(x, table)
array.persistable = True
result = layers.array_to_lod_tensor(array, table)
result.persistable = True
exe = Executor(place)
scope = core.Scope()
exe.run(program, feed={'x': tensor}, scope=scope)
......@@ -160,19 +161,16 @@ class TestCPULoDTensorArrayOpGrad(unittest.TestCase):
place = core.CPUPlace()
program = Program()
x = layers.data(
name='x',
shape=[1],
dtype='float32',
main_program=program,
stop_gradient=False)
table = layers.lod_rank_table(x, level=0, main_program=program)
array = layers.lod_tensor_to_array(x, table, main_program=program)
result = layers.array_to_lod_tensor(array, table, main_program=program)
with program_guard(program):
x = layers.data(
name='x', shape=[1], dtype='float32', stop_gradient=False)
table = layers.lod_rank_table(x, level=0)
array = layers.lod_tensor_to_array(x, table)
result = layers.array_to_lod_tensor(array, table)
mean = layers.mean(x=result, main_program=program)
mean = layers.mean(x=result)
append_backward_ops(mean)
append_backward_ops(mean)
tensor = core.LoDTensor()
tensor.set(numpy.arange(10).reshape(10, 1).astype('float32'), place)
......
import paddle.v2.fluid.layers as layers
from paddle.v2.fluid.framework import Program
from paddle.v2.fluid.framework import Program, program_guard, default_main_program, default_startup_program
from paddle.v2.fluid.executor import Executor
from paddle.v2.fluid.optimizer import MomentumOptimizer
import paddle.v2.fluid.core as core
......@@ -10,44 +10,42 @@ import numpy as np
class TestMNISTIfElseOp(unittest.TestCase):
def test_raw_api(self):
kwargs = {'startup_program': Program(), 'main_program': Program()}
image = layers.data(name='x', shape=[784], dtype='float32', **kwargs)
prog = Program()
startup_prog = Program()
with program_guard(prog, startup_prog):
image = layers.data(name='x', shape=[784], dtype='float32')
label = layers.data(name='y', shape=[1], dtype='int64', **kwargs)
label = layers.data(name='y', shape=[1], dtype='int64')
limit = layers.fill_constant_batch_size_like(
input=label, dtype='int64', shape=[1], value=5.0, **kwargs)
limit = layers.fill_constant_batch_size_like(
input=label, dtype='int64', shape=[1], value=5.0)
cond = layers.less_than(x=label, y=limit)
true_image, false_image = layers.split_lod_tensor(
input=image, mask=cond)
cond = layers.less_than(x=label, y=limit, **kwargs)
true_image, false_image = layers.split_lod_tensor(
input=image, mask=cond, **kwargs)
true_out = layers.create_tensor(dtype='float32')
true_cond = layers.ConditionalBlock([true_image])
true_out = layers.create_tensor(dtype='float32', **kwargs)
true_cond = layers.ConditionalBlock([true_image], **kwargs)
with true_cond.block():
hidden = layers.fc(input=true_image, size=100, act='tanh')
prob = layers.fc(input=hidden, size=10, act='softmax')
layers.assign(input=prob, output=true_out)
with true_cond.block():
hidden = layers.fc(input=true_image, size=100, act='tanh', **kwargs)
prob = layers.fc(input=hidden, size=10, act='softmax', **kwargs)
layers.assign(input=prob, output=true_out, **kwargs)
false_out = layers.create_tensor(dtype='float32')
false_cond = layers.ConditionalBlock([false_image])
false_out = layers.create_tensor(dtype='float32', **kwargs)
false_cond = layers.ConditionalBlock([false_image], **kwargs)
with false_cond.block():
hidden = layers.fc(input=false_image, size=200, act='tanh')
prob = layers.fc(input=hidden, size=10, act='softmax')
layers.assign(input=prob, output=false_out)
with false_cond.block():
hidden = layers.fc(input=false_image,
size=200,
act='tanh',
**kwargs)
prob = layers.fc(input=hidden, size=10, act='softmax', **kwargs)
layers.assign(input=prob, output=false_out, **kwargs)
prob = layers.merge_lod_tensor(
in_true=true_out, in_false=false_out, mask=cond, x=image)
loss = layers.cross_entropy(input=prob, label=label)
avg_loss = layers.mean(x=loss)
prob = layers.merge_lod_tensor(
in_true=true_out, in_false=false_out, mask=cond, x=image, **kwargs)
loss = layers.cross_entropy(input=prob, label=label, **kwargs)
avg_loss = layers.mean(x=loss, **kwargs)
optimizer = MomentumOptimizer(learning_rate=0.001, momentum=0.9)
optimizer.minimize(avg_loss, kwargs['startup_program'])
optimizer = MomentumOptimizer(learning_rate=0.001, momentum=0.9)
optimizer.minimize(avg_loss, startup_prog)
train_reader = paddle.batch(
paddle.reader.shuffle(
......@@ -57,7 +55,7 @@ class TestMNISTIfElseOp(unittest.TestCase):
place = core.CPUPlace()
exe = Executor(place)
exe.run(kwargs['startup_program'])
exe.run(startup_prog)
PASS_NUM = 100
for pass_id in range(PASS_NUM):
for data in train_reader():
......@@ -65,7 +63,7 @@ class TestMNISTIfElseOp(unittest.TestCase):
y_data = np.array(map(lambda x: x[1], data)).astype("int64")
y_data = np.expand_dims(y_data, axis=1)
outs = exe.run(kwargs['main_program'],
outs = exe.run(prog,
feed={'x': x_data,
'y': y_data},
fetch_list=[avg_loss])
......@@ -75,39 +73,36 @@ class TestMNISTIfElseOp(unittest.TestCase):
self.assertFalse(True)
def test_ifelse(self):
kwargs = {'startup_program': Program(), 'main_program': Program()}
image = layers.data(name='x', shape=[784], dtype='float32', **kwargs)
label = layers.data(name='y', shape=[1], dtype='int64', **kwargs)
limit = layers.fill_constant_batch_size_like(
input=label, dtype='int64', shape=[1], value=5.0, **kwargs)
cond = layers.less_than(x=label, y=limit, **kwargs)
ie = layers.IfElse(cond, **kwargs)
with ie.true_block():
true_image = ie.input(image)
hidden = layers.fc(input=true_image, size=100, act='tanh', **kwargs)
prob = layers.fc(input=hidden, size=10, act='softmax', **kwargs)
ie.output(prob)
with ie.false_block():
false_image = ie.input(image)
hidden = layers.fc(input=false_image,
size=200,
act='tanh',
**kwargs)
prob = layers.fc(input=hidden, size=10, act='softmax', **kwargs)
ie.output(prob)
prob = ie()
loss = layers.cross_entropy(input=prob[0], label=label, **kwargs)
avg_loss = layers.mean(x=loss, **kwargs)
optimizer = MomentumOptimizer(learning_rate=0.001, momentum=0.9)
optimizer.minimize(avg_loss, kwargs['startup_program'])
prog = Program()
startup_prog = Program()
with program_guard(prog, startup_prog):
image = layers.data(name='x', shape=[784], dtype='float32')
label = layers.data(name='y', shape=[1], dtype='int64')
limit = layers.fill_constant_batch_size_like(
input=label, dtype='int64', shape=[1], value=5.0)
cond = layers.less_than(x=label, y=limit)
ie = layers.IfElse(cond)
with ie.true_block():
true_image = ie.input(image)
hidden = layers.fc(input=true_image, size=100, act='tanh')
prob = layers.fc(input=hidden, size=10, act='softmax')
ie.output(prob)
with ie.false_block():
false_image = ie.input(image)
hidden = layers.fc(input=false_image, size=200, act='tanh')
prob = layers.fc(input=hidden, size=10, act='softmax')
ie.output(prob)
prob = ie()
loss = layers.cross_entropy(input=prob[0], label=label)
avg_loss = layers.mean(x=loss)
optimizer = MomentumOptimizer(learning_rate=0.001, momentum=0.9)
optimizer.minimize(avg_loss, startup_prog)
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=8192),
......@@ -135,4 +130,5 @@ class TestMNISTIfElseOp(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
# temp disable if else unittest since it could be buggy.
exit(0)
from __future__ import print_function
import unittest
from paddle.v2.fluid.framework import Program, default_main_program
from paddle.v2.fluid.framework import Program, default_main_program, program_guard
import paddle.v2.fluid.layers as layers
main_program = default_main_program()
......@@ -129,13 +129,10 @@ class TestProgram(unittest.TestCase):
def test_program_clone_with_parameter(self):
main_program = Program()
startup_program = Program()
kwargs = {
'main_program': main_program,
'startup_program': startup_program
}
d = layers.data(name='x', shape=[784], dtype='float32', **kwargs)
hidden = layers.fc(input=d, size=100, **kwargs)
layers.fc(input=hidden, size=100, **kwargs)
with program_guard(main_program, startup_program):
d = layers.data(name='x', shape=[784], dtype='float32')
hidden = layers.fc(input=d, size=100)
layers.fc(input=hidden, size=100)
new_program = main_program.clone()
self.assertNotEqual(0, len(new_program.blocks[0].all_parameters()))
......
......@@ -3,7 +3,7 @@ import numpy as np
from op_test import OpTest
class TestSeqExpand(OpTest):
class TestSequenceExpand(OpTest):
def set_data(self):
x_data = np.random.uniform(0.1, 1, [3, 1]).astype('float32')
y_data = np.random.uniform(0.1, 1, [8, 1]).astype('float32')
......@@ -21,7 +21,7 @@ class TestSeqExpand(OpTest):
self.outputs = {'Out': out}
def setUp(self):
self.op_type = 'seq_expand'
self.op_type = 'sequence_expand'
self.set_data()
self.compute()
......@@ -32,7 +32,7 @@ class TestSeqExpand(OpTest):
self.check_grad(["X"], "Out")
class TestSeqExpandCase1(TestSeqExpand):
class TestSequenceExpandCase1(TestSequenceExpand):
def set_data(self):
x_data = np.random.uniform(0.1, 1, [5, 1]).astype('float32')
x_lod = [[0, 2, 5]]
......@@ -41,7 +41,7 @@ class TestSeqExpandCase1(TestSeqExpand):
self.inputs = {'X': (x_data, x_lod), 'Y': (y_data, y_lod)}
class TestSeqExpandCase2(TestSeqExpand):
class TestSequenceExpandCase2(TestSequenceExpand):
def set_data(self):
x_data = np.random.uniform(0.1, 1, [1, 2, 2]).astype('float32')
x_lod = [[0, 1]]
......@@ -50,7 +50,7 @@ class TestSeqExpandCase2(TestSeqExpand):
self.inputs = {'X': (x_data, x_lod), 'Y': (y_data, y_lod)}
class TestSeqExpandCase3(TestSeqExpand):
class TestSequenceExpandCase3(TestSequenceExpand):
def set_data(self):
x_data = np.random.uniform(0.1, 1, [4, 1]).astype('float32')
x_lod = [[0, 1, 2, 3, 4]]
......
......@@ -2,7 +2,7 @@ import unittest
import paddle.v2.fluid.core as core
import numpy as np
import paddle.v2.fluid.layers as layers
from paddle.v2.fluid.framework import Program
from paddle.v2.fluid.framework import Program, program_guard
from paddle.v2.fluid.executor import Executor
from paddle.v2.fluid.backward import append_backward_ops
......@@ -75,26 +75,22 @@ class TestCPULoDTensorArrayOps(unittest.TestCase):
level=0):
place = self.place()
program = Program()
x = layers.data(name='x', shape=[1], main_program=program)
x.persistable = True
with program_guard(program):
x = layers.data(name='x', shape=[1])
x.persistable = True
y = layers.data(name='y', shape=[1], main_program=program)
y.persistable = True
y = layers.data(name='y', shape=[1])
y.persistable = True
out_true, out_false = layers.split_lod_tensor(
input=x, mask=y, level=level, main_program=program)
out_true.persistable = True
out_false.persistable = True
out_true, out_false = layers.split_lod_tensor(
input=x, mask=y, level=level)
out_true.persistable = True
out_false.persistable = True
out = layers.merge_lod_tensor(
in_true=out_true,
in_false=out_false,
mask=y,
x=x,
level=level,
main_program=program)
out = layers.merge_lod_tensor(
in_true=out_true, in_false=out_false, mask=y, x=x, level=level)
out.persistable = True
out.persistable = True
exe = Executor(place)
scope = core.Scope()
......@@ -123,34 +119,21 @@ class TestCPUSplitMergeLoDTensorGrad(unittest.TestCase):
def test_grad(self):
place = core.CPUPlace()
program = Program()
with program_guard(program):
x = layers.data(
name='x', shape=[1], dtype='float32', stop_gradient=False)
y = layers.data(
name='y', shape=[1], dtype='bool', stop_gradient=False)
x = layers.data(
name='x',
shape=[1],
dtype='float32',
main_program=program,
stop_gradient=False)
y = layers.data(
name='y',
shape=[1],
dtype='bool',
main_program=program,
stop_gradient=False)
level = 0
out_true, out_false = layers.split_lod_tensor(
input=x, mask=y, level=level, main_program=program)
out = layers.merge_lod_tensor(
in_true=out_true,
in_false=out_false,
mask=y,
x=x,
level=level,
main_program=program)
mean = layers.mean(x=out, main_program=program)
append_backward_ops(mean)
level = 0
out_true, out_false = layers.split_lod_tensor(
input=x, mask=y, level=level)
out = layers.merge_lod_tensor(
in_true=out_true, in_false=out_false, mask=y, x=x, level=level)
mean = layers.mean(x=out)
append_backward_ops(mean)
tensor = core.LoDTensor()
tensor.set(np.arange(10).reshape(10, 1).astype('float32'), place)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册