提交 246f6135 编写于 作者: W weixing02

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into format

......@@ -7,3 +7,6 @@ paddle/rnn/imdb.pkl
caffe/image/logs
tensorflow/image/logs
tensorflow/rnn/logs
fluid/models/*.pyc
fluid/logs
fluid/nohup.out
......@@ -40,10 +40,7 @@ def parse_args():
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size.')
parser.add_argument(
'--learning_rate',
type=float,
default=0.001,
help='The minibatch size.')
'--learning_rate', type=float, default=0.001, help='The learning rate.')
# TODO(wuyi): add "--use_fake_data" option back.
parser.add_argument(
'--skip_batch_num',
......@@ -88,8 +85,8 @@ def parse_args():
help='If set, use nvprof for CUDA.')
parser.add_argument(
'--no_test',
action='store_false',
help='If set, test the testset during training.')
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
......@@ -231,13 +228,10 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
train_losses.append(loss)
print("Pass: %d, Iter: %d, Loss: %f\n" %
(pass_id, iters, np.mean(train_losses)))
train_elapsed = time.time() - start_time
examples_per_sec = num_samples / train_elapsed
print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
(num_samples, train_elapsed, examples_per_sec))
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses)))
print_train_time(start_time, time.time(), num_samples)
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
# evaluation
if not args.no_test and batch_acc != None:
if not args.no_test and batch_acc:
pass_test_acc = test(exe, infer_prog, test_reader, feeder,
batch_acc)
print(", Test Accuracy: %f" % pass_test_acc)
......@@ -315,11 +309,8 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
if batch_id % 1 == 0:
print("Pass %d, batch %d, loss %s" %
(pass_id, batch_id, np.array(loss)))
train_elapsed = time.time() - start_time
examples_per_sec = num_samples / train_elapsed
print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
(num_samples, train_elapsed, examples_per_sec))
if not args.no_test and batch_acc != None:
print_train_time(start_time, time.time(), num_samples)
if not args.no_test and batch_acc:
test_acc = test(startup_exe, infer_prog, test_reader, feeder,
batch_acc)
print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc))
......@@ -329,12 +320,19 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
def print_arguments(args):
vars(args)['use_nvprof'] = (vars(args)['use_nvprof'] and
vars(args)['device'] == 'GPU')
print('----------- resnet Configuration Arguments -----------')
print('----------- Configuration Arguments -----------')
for arg, value in sorted(vars(args).iteritems()):
print('%s: %s' % (arg, value))
print('------------------------------------------------')
def print_train_time(start_time, end_time, num_samples):
train_elapsed = end_time - start_time
examples_per_sec = num_samples / train_elapsed
print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
(num_samples, train_elapsed, examples_per_sec))
def main():
args = parse_args()
print_arguments(args)
......@@ -342,7 +340,7 @@ def main():
# the unique trainer id, starting from 0, needed by trainer
# only
nccl_id_var, num_trainers, trainer_id = (
None, 1, int(os.getenv("PADDLE_TRAINER_ID", "-1")))
None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))
if args.use_cprof:
pr = cProfile.Profile()
......
......@@ -2,6 +2,7 @@
# This script benchmarking the PaddlePaddle Fluid on
# single thread single GPU.
mkdir -p logs
#export FLAGS_fraction_of_gpu_memory_to_use=0.0
export CUDNN_PATH=/paddle/cudnn_v5
......@@ -35,6 +36,7 @@ nohup stdbuf -oL nvidia-smi \
--format=csv \
--filename=mem.log \
-l 1 &
# mnist
# mnist gpu mnist 128
FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
......@@ -43,7 +45,7 @@ FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--batch_size=128 \
--skip_batch_num=5 \
--iterations=500 \
2>&1 | tee -a mnist_gpu_128.log
2>&1 | tee -a logs/mnist_gpu_128.log
# vgg16
# gpu cifar10 128
......@@ -53,7 +55,7 @@ FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--batch_size=128 \
--skip_batch_num=5 \
--iterations=30 \
2>&1 | tee -a vgg16_gpu_128.log
2>&1 | tee -a logs/vgg16_gpu_128.log
# flowers gpu 128
FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
......@@ -63,28 +65,28 @@ FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--data_set=flowers \
--skip_batch_num=5 \
--iterations=30 \
2>&1 | tee -a vgg16_gpu_flowers_32.log
2>&1 | tee -a logs/vgg16_gpu_flowers_32.log
# resnet50
# resnet50 gpu cifar10 128
FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--model=resnet50 \
--model=resnet \
--device=GPU \
--batch_size=128 \
--data_set=cifar10 \
--skip_batch_num=5 \
--iterations=30 \
2>&1 | tee -a resnet50_gpu_128.log
2>&1 | tee -a logs/resnet50_gpu_128.log
# resnet50 gpu flowers 64
FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--model=resnet50 \
--model=resnet \
--device=GPU \
--batch_size=64 \
--data_set=flowers \
--skip_batch_num=5 \
--iterations=30 \
2>&1 | tee -a resnet50_gpu_flowers_64.log
2>&1 | tee -a logs/resnet50_gpu_flowers_64.log
# lstm
# lstm gpu imdb 32 # tensorflow only support batch=32
......@@ -94,7 +96,7 @@ FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--batch_size=32 \
--skip_batch_num=5 \
--iterations=30 \
2>&1 | tee -a lstm_gpu_32.log
2>&1 | tee -a logs/lstm_gpu_32.log
# seq2seq
# seq2seq gpu wmb 128
......@@ -104,4 +106,4 @@ FLAGS_benchmark=true stdbuf -oL python fluid_benchmark.py \
--batch_size=128 \
--skip_batch_num=5 \
--iterations=30 \
2>&1 | tee -a lstm_gpu_128.log
2>&1 | tee -a logs/lstm_gpu_128.log
......@@ -33,10 +33,19 @@ ELSE()
SET(BUILD_CMD make HAS_SYSTEM_PROTOBUF=false -s -j ${NUM_OF_PROCESSOR} static grpc_cpp_plugin)
ENDIF()
# FIXME(wuyi): do not build zlib cares protobuf twice, find a way to build grpc with them
ExternalProject_Add(
extern_grpc
DEPENDS protobuf zlib
URL "http://paddlepaddledeps.bj.bcebos.com/grpc.tar.xz"
# NOTE(wuyi):
# this package is generated by following steps:
# 1. git clone -b v1.8.x https://github.com/grpc/grpc.git
# 2. submodule update --init
# 3. keep only zlib, cares, protobuf, boringssl under "third_party",
# checkout and clean other dirs under third_party
# 4. remove .git, and package the directory.
URL "http://paddlepaddledeps.bj.bcebos.com/grpc-v1.8.x.tar.gz"
URL_MD5 "c9c58ee7d0e8929a63155af6a2ecdbd0"
PREFIX ${GRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
......@@ -49,7 +58,6 @@ ExternalProject_Add(
INSTALL_COMMAND make prefix=${GRPC_INSTALL_DIR} install
)
# FIXME(typhoonzero): hack to get static lib path, try a better way like merge them.
ADD_LIBRARY(grpc++_unsecure STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET grpc++_unsecure PROPERTY IMPORTED_LOCATION
"${GRPC_INSTALL_DIR}/lib/libgrpc++_unsecure.a")
......
......@@ -59,3 +59,21 @@ get_inference_program
.. autofunction:: paddle.fluid.io.get_inference_program
:noindex:
save_checkpoint
---------------
.. autofunction:: paddle.fluid.io.save_checkpoint
:noindex:
load_checkpoint
---------------
.. autofunction:: paddle.fluid.io.load_checkpoint
:noindex:
clean_checkpoint
----------------
.. autofunction:: paddle.fluid.io.clean_checkpoint
:noindex:
......@@ -181,6 +181,12 @@ Print
.. autofunction:: paddle.fluid.layers.Print
:noindex:
is_empty
--------
.. autofunction:: paddle.fluid.layers.is_empty
:noindex:
device
======
......@@ -255,6 +261,19 @@ double_buffer
.. autofunction:: paddle.fluid.layers.double_buffer
:noindex:
random_data_generator
---------------------
.. autofunction:: paddle.fluid.layers.random_data_generator
:noindex:
Preprocessor
------------
.. autoclass:: paddle.fluid.layers.Preprocessor
:members:
:noindex:
nn
==
......@@ -594,6 +613,29 @@ roi_pool
.. autofunction:: paddle.fluid.layers.roi_pool
:noindex:
dice_loss
---------
.. autofunction:: paddle.fluid.layers.dice_loss
:noindex:
resize_bilinear
---------------
.. autofunction:: paddle.fluid.layers.resize_bilinear
:noindex:
gather
------
.. autofunction:: paddle.fluid.layers.gather
:noindex:
random_crop
-----------
.. autofunction:: paddle.fluid.layers.random_crop
:noindex:
ops
===
......@@ -742,6 +784,12 @@ sum
.. autofunction:: paddle.fluid.layers.sum
:noindex:
shape
-----
.. autofunction:: paddle.fluid.layers.shape
:noindex:
sigmoid
-------
......@@ -991,27 +1039,3 @@ zeros
.. autofunction:: paddle.fluid.layers.zeros
:noindex:
topk
----
.. autofunction:: paddle.fluid.layers.topk
:noindex:
dice_loss
----
.. autofunction:: paddle.fluid.layers.dice_loss
:noindex:
resize_bilinear
____
.. autofunction:: paddle.fluid.layers.resize_bilinear
:noindex:
gather
____
.. autofunction:: paddle.fluid.layers.gather
:noindex:
......@@ -47,28 +47,6 @@ DecayedAdagrad
:members:
:noindex:
Adadelta
-----------------
.. autoclass:: paddle.fluid.optimizer.Adadelta
:members:
:noindex:
RMSProp
-----------------
.. autoclass:: paddle.fluid.optimizer.RMSProp
:members:
:noindex:
ModelAverage
-----------------
.. autoclass:: paddle.fluid.optimizer.ModelAverage
:members:
:noindex:
SGDOptimizer
------------
......@@ -111,19 +89,24 @@ DecayedAdagradOptimizer
:members:
:noindex:
RMSPropOptimizer
----------------
AdadeltaOptimizer
-----------------
.. autoclass:: paddle.fluid.optimizer.AdadeltaOptimizer
.. autoclass:: paddle.fluid.optimizer.RMSPropOptimizer
:members:
:noindex:
Adadelta
--------
RMSPropOptimizer
-----------------
.. autoclass:: paddle.fluid.optimizer.Adadelta
:members:
:noindex:
.. autoclass:: paddle.fluid.optimizer.RMSPropOptimizer
ModelAverage
------------
.. autoclass:: paddle.fluid.optimizer.ModelAverage
:members:
:noindex:
......@@ -133,3 +116,4 @@ Optimizer
.. autoclass:: paddle.fluid.optimizer.Optimizer
:members:
:noindex:
......@@ -23,3 +23,15 @@ profiler
.. autofunction:: paddle.fluid.profiler.profiler
:noindex:
start_profiler
--------------
.. autofunction:: paddle.fluid.profiler.start_profiler
:noindex:
stop_profiler
-------------
.. autofunction:: paddle.fluid.profiler.stop_profiler
:noindex:
# How to use RecordIO in Fluid
If you want to use RecordIO as your training data format, you need to convert to your training data
to RecordIO files and reading them in the process of training, PaddlePaddle Fluid provides some
interface to deal with the RecordIO files.
## Generate RecordIO File
Before start training with RecordIO files, you need to convert your training data
to RecordIO format by `fluid.recordio_writer.convert_reader_to_recordio_file`, the sample codes
as follows:
```python
reader = paddle.batch(mnist.train(), batch_size=1)
feeder = fluid.DataFeeder(
feed_list=[ # order is image and label
fluid.layers.data(
name='image', shape=[784]),
fluid.layers.data(
name='label', shape=[1], dtype='int64'),
],
place=fluid.CPUPlace())
fluid.recordio_writer.convert_reader_to_recordio_file('./mnist.recordio', reader, feeder)
```
The above code snippet would generate a RecordIO `./mnist.recordio` on your host.
**NOTE**: we recommend users to set `batch_size=1` when generating the recordio files so that users can
adjust it flexibly while reading it.
## Use the RecordIO file in a Local Training Job
PaddlePaddle Fluid provides an interface `fluid.layers.io.open_recordio_file` to load your RecordIO file
and then you can use them as a Layer in your network configuration, the sample codes as follows:
```python
data_file = fluid.layers.io.open_recordio_file(
filename="./mnist.recordio",
shapes=[(-1, 784),(-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int32"])
data_file = fluid.layers.io.batch(data_file, batch_size=4)
img, label = fluid.layers.io.read_file(data_file)
hidden = fluid.layers.fc(input=img, size=100, act='tanh')
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=prediction, label=label)
avg_loss = fluid.layers.mean(loss)
fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
avg_loss_np = []
# train a pass
batch_id = 0
while True:
tmp, = exe.run(fetch_list=[avg_loss])
avg_loss_np.append(tmp)
print(batch_id)
batch_id += 1
```
## Use the RecordIO files in Distributed Training
1. generate multiple RecordIO files
For a distributed training job, you may have multiple trainer nodes,
and one or more RecordIO files for one trainer node, you can use the interface
`fluid.recordio_writer.convert_reader_to_recordio_files` to convert your training data
into multiple RecordIO files, the sample codes as follows:
```python
reader = paddle.batch(mnist.train(), batch_size=1)
feeder = fluid.DataFeeder(
feed_list=[ # order is image and label
fluid.layers.data(
name='image', shape=[784]),
fluid.layers.data(
name='label', shape=[1], dtype='int64'),
],
place=fluid.CPUPlace())
fluid.recordio_writer.convert_reader_to_recordio_files(
filename_suffix='./mnist.recordio', batch_per_file=100, reader, feeder)
```
The above codes would generate multiple RecordIO files on your host like:
```bash
.
\_mnist-00000.recordio
|-mnist-00001.recordio
|-mnist-00002.recordio
|-mnist-00003.recordio
|-mnist-00004.recordio
```
2. open multiple RecordIO files by `fluid.layers.io.open_files`
For a distributed training job, the distributed operator system will schedule trainer process on multiple nodes,
each trainer process reads parts of the whole training data, we usually take the following approach to make the training
data allocated by each trainer process as uniform as possiable:
```python
def gen_train_list(file_pattern, trainers, trainer_id):
file_list = glob.glob(file_pattern)
ret_list = []
for idx, f in enumerate(file_list):
if (idx + trainers) % trainers == trainer_id:
ret_list.append(f)
return ret_list
trainers = int(os.getenv("TRAINERS"))
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
data_file = fluid.layers.io.open_files(
filenames=gen_train_list("./mnist-[0-9]*.recordio", 2, 0),
thread_num=1,
shapes=[(-1, 784),(-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int32"])
img, label = fluid.layers.io.read_file(data_files)
...
```
../../../../../benchmark/cluster/README.md
\ No newline at end of file
../../../../../../benchmark/cluster/vgg16/README.md
\ No newline at end of file
## 堆内存分析和优化
计算机程序都可能有内存泄露的风险。**内存泄露**一般是由于程序在堆(heap)上分配了内存而没有释放,随着程序的运行占用的内存越来越大,一方面会影响程序的稳定性,可能让运行速度越来越慢,或者造成oom,甚至会影响运行程序的机器的稳定性,造成宕机。
目前有很多内存泄露分析工具,比较经典的有[valgrind](http://valgrind.org/docs/manual/quick-start.html#quick-start.intro), [gperftools](https://gperftools.github.io/gperftools/)
因为Fluid是用Python驱动C++ core来运行,valgrind直接分析非常困难,需要自己编译debug版本的、带valgrind支持的专用Python版本,而且输出的信息中大部分是Python自己的符号和调用信息,分析起来很困难,另外使用valgrind会让程序运行速度变得非常慢,所以不建议使用。
本教程主要介绍[gperftools](https://gperftools.github.io/gperftools/)的使用。
gperftool主要支持以下四个功能:
- thread-caching malloc
- heap-checking using tcmalloc
- heap-profiling using tcmalloc
- CPU profiler
Paddle也提供了基于gperftool的[CPU性能分析教程](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/fluid/howto/optimization/cpu_profiling_cn.md)
对于堆内存的分析,主要用到thread-caching malloc和heap-profiling using tcmalloc。
## 使用流程
#### 环境
本教程基于paddle提供的Docker开发环境paddlepaddle/paddle:latest-dev,基于Ubuntu 16.04.4 LTS环境。
#### 使用流程
- 安装google-perftools
```
apt-get install libunwind-dev
apt-get install google-perftools
```
- 安装pprof
```
go get -u github.com/google/pprof
```
- 设置运行环境
```
export PPROF_PATH=/root/gopath/bin/pprof
export PPROF_BINARY_PATH=/root/gopath/bin/pprof
export LD_PRELOAD=/usr/lib/libtcmalloc.so.4
```
- 使用heap profile来运行python程序。本质上是周期性的对堆的分配情况做一次快照。
```
# HEAPPROFILE 设置生成的堆分析文件的目录和文件前缀
# HEAP_PROFILE_ALLOCATION_INTERVAL 设置每分配多少存储dump一次dump,默认1GB
env HEAPPROFILE="./perf_log/test.log" HEAP_PROFILE_ALLOCATION_INTERVAL=209715200 python trainer.py
```
随着程序的运行,会在perf_log这个文件夹下生成很多文件,如下:
```
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0001.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0002.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0003.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0004.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0005.heap
-rw-r--r-- 1 root root 1.0M Jun 1 15:00 test.log.0006.heap
```
- 使用pprof对heap文件进行分析。分析有两种模式:
- 完整模式。会对当前heap做一个分析,显示目前分配内存一些调用路径。
```
pprof --pdf python test.log.0012.heap
```
上述命令会生成一个profile00x.pdf的文件,可以直接打开,例如:[allocator](https://github.com/jacquesqiao/Paddle/blob/tutorial-of-memory-profile/doc/fluid/howto/optimization/memory_cpu_allocator.pdf)。从下图可以看出,在CPU版本fluid的运行过程中,分配存储最多的模块式CPUAllocator. 而别的模块相对而言分配内存较少,所以被忽略了,这对于分配内存泄露是很不方便的,因为泄露是一个缓慢的过程,在这种图中是无法看到的。
![result](https://user-images.githubusercontent.com/3048612/40964027-a54033e4-68dc-11e8-836a-144910c4bb8c.png)
- Diff模式。可以对两个时刻的heap做diff,把一些内存分配没有发生变化的模块去掉,而把增量部分显示出来。
```
pprof --pdf --base test.log.0010.heap python test.log.1045.heap
```
生成的结果为:[`memory_leak_protobuf`](https://github.com/jacquesqiao/Paddle/blob/tutorial-of-memory-profile/doc/fluid/howto/optimization/memory_leak_protobuf.pdf)
从图中可以看出:ProgramDesc这个结构,在两个版本之间增长了200MB+,所以这里有很大的内存泄露的可能性,最终结果也确实证明是这里造成了泄露。
![result](https://user-images.githubusercontent.com/3048612/40964057-b434d5e4-68dc-11e8-894b-8ab62bcf26c2.png)
![result](https://user-images.githubusercontent.com/3048612/40964063-b7dbee44-68dc-11e8-9719-da279f86477f.png)
......@@ -63,16 +63,16 @@ Android的Docker开发镜像向用户提供两个可配置的参数:
- 编译`armeabi-v7a``Android API 21`的PaddlePaddle库
```bash
$ docker run -it --rm -v $PWD:/paddle -e "ANDROID_ABI=armeabi-v7a" -e "ANDROID_API=21" username/paddle-android:dev
$ docker run -it --rm -v $PWD:/paddle -w /paddle -e "ANDROID_ABI=armeabi-v7a" -e "ANDROID_API=21" username/paddle-android:dev ./paddle/scripts/paddle_build.sh build_android
```
- 编译`arm64-v8a``Android API 21`的PaddlePaddle库
```bash
$ docker run -it --rm -v $PWD:/paddle -e "ANDROID_ABI=arm64-v8a" -e "ANDROID_API=21" username/paddle-android:dev
$ docker run -it --rm -v $PWD:/paddle -w /paddle -e "ANDROID_ABI=arm64-v8a" -e "ANDROID_API=21" username/paddle-android:dev ./paddle/scripts/paddle_build.sh build_android
```
执行上述`docker run`命令时,容器默认执行[paddle/scripts/docker/build_android.sh](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build_android.sh)脚本。该脚本中记录了交叉编译Android版PaddlePaddle库常用的CMake配置,并且会根据`ANDROID_ABI``ANDROID_API`自动构建独立工具链、进行编译和安装。由于arm64架构要求Android API不小于21。因此当`ANDROID_ABI=arm64-v8a``ANDROID_API<21`时,Docker容器中将默认使用`Android API 21`的编译工具链。用户可以参考下文[配置交叉编译参数](#配置交叉编译参数)章节,根据个人的需求修改定制Docker容器所执行的脚本。编译安装结束之后,PaddlePaddle的C-API库将被安装到`$PWD/install_android`目录,所依赖的第三方库同时也被安装到`$PWD/install_android/third_party`目录。
执行上述`docker run`命令时,容器执行[paddle/scripts/paddle_build.sh build_android](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/paddle_build.sh)脚本。该脚本中记录了交叉编译Android版PaddlePaddle库常用的CMake配置,并且会根据`ANDROID_ABI``ANDROID_API`自动构建独立工具链、进行编译和安装。由于arm64架构要求Android API不小于21。因此当`ANDROID_ABI=arm64-v8a``ANDROID_API<21`时,Docker容器中将默认使用`Android API 21`的编译工具链。用户可以参考下文[配置交叉编译参数](#配置交叉编译参数)章节,根据个人的需求修改定制Docker容器所执行的脚本。编译安装结束之后,PaddlePaddle的C-API库将被安装到`$PWD/install_android`目录,所依赖的第三方库同时也被安装到`$PWD/install_android/third_party`目录。
## 基于Linux交叉编译环境的编译方式
本文档将以Linux x86-64平台为例,介绍交叉编译Android平台上适用的PaddlePaddle库的方法和步骤。
......
......@@ -36,7 +36,7 @@ $ docker pull docker.paddlepaddlehub.com/paddle:latest-dev-android
We can run the Docker image we just created to build the inference library of PaddlePaddle for Android using the command below:
```bash
$ docker run -it --rm -v $PWD:/paddle -e "ANDROID_ABI=armeabi-v7a" -e "ANDROID_API=21" paddle:dev-android
$ docker run -it --rm -v $PWD:/paddle -w /paddle -e "ANDROID_ABI=armeabi-v7a" -e "ANDROID_API=21" paddle:dev-android ./paddle/scripts/paddle_build.sh build_android
```
The Docker image accepts two arguments `ANDROID_ABI` and `ANDROID_API`:
......@@ -70,7 +70,7 @@ The Docker image accepts two arguments `ANDROID_ABI` and `ANDROID_API`:
The ARM-64 architecture (`arm64-v8a`) requires at least level 21 of Android API.
The default entry-point of the Docker image, [`paddle/scripts/docker/build_android.sh`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build_android.sh) generates the [Android cross-compiling standalone toolchain](https://developer.android.com/ndk/guides/standalone_toolchain.html) based on the argument: `ANDROID_ABI` or `ANDROID_API`. For information about other configuration arguments, please continue reading.
The build command, [`paddle/scripts/paddle_build.sh build_android`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/paddle_build.sh) generates the [Android cross-compiling standalone toolchain](https://developer.android.com/ndk/guides/standalone_toolchain.html) based on the argument: `ANDROID_ABI` or `ANDROID_API`. For information about other configuration arguments, please continue reading.
The above command generates and outputs the inference library in `$PWD/install_android` and puts third-party libraries in `$PWD/install_android/third_party`.
......
......@@ -23,7 +23,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
在 `这里 <https://github.com/PaddlePaddle/Paddle/tree/develop/tools/manylinux1/>`__ 找到 paddle_manylinux_devel
镜像的编译以及使用方法。或者参考下述可选步骤,从源码中构建用于编译PaddlePaddle的Docker镜像。
如果您选择不使用Docker镜像,则需要在本机安装下面章节列出的 `编译依赖`_ 之后才能开始编译的步骤。
如果您选择不使用Docker镜像,则需要在本机安装下面章节列出的 :ref:`编译依赖 <_compile_deps>` 之后才能开始编译的步骤。
编译PaddlePaddle,需要执行:
......@@ -106,7 +106,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
- 学习 Docker 有多难?
理解 Docker 并不难,大概花十分钟看一下[这篇文章](https://zhuanlan.zhihu.com/p/19902938)。这可以帮您省掉花一小时安装和配置各种开发工具,以及切换机器时需要新安装的辛苦。别忘了 PaddlePaddle 更新可能导致需要新的开发工具。更别提简化问题复现带来的好处了。
理解 Docker 并不难,大概花十分钟看一下 `这篇文章 <https://zhuanlan.zhihu.com/p/19902938>`_ 。这可以帮您省掉花一小时安装和配置各种开发工具,以及切换机器时需要新安装的辛苦。别忘了 PaddlePaddle 更新可能导致需要新的开发工具。更别提简化问题复现带来的好处了。
- 我可以用 IDE 吗?
......@@ -123,7 +123,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
- 可以并行编译吗?
是的。我们的 Docker image 运行一个 [Bash 脚本](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build.sh)。这个脚本调用 `make -j$(nproc)` 来启动和 CPU 核一样多的进程来并行编译。
是的。我们的 Docker image 运行一个 `Bash脚本 <https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build.sh>`_ 。这个脚本调用 `make -j$(nproc)` 来启动和 CPU 核一样多的进程来并行编译。
- Docker 需要 sudo
......@@ -131,11 +131,11 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安
- 在 Windows/MacOS 上编译很慢
Docker 在 Windows 和 MacOS 都可以运行。不过实际上是运行在一个 Linux 虚拟机上。可能需要注意给这个虚拟机多分配一些 CPU 和内存,以保证编译高效。具体做法请参考[这个issue](https://github.com/PaddlePaddle/Paddle/issues/627)
Docker 在 Windows 和 MacOS 都可以运行。不过实际上是运行在一个 Linux 虚拟机上。可能需要注意给这个虚拟机多分配一些 CPU 和内存,以保证编译高效。具体做法请参考 `这个issue <https://github.com/PaddlePaddle/Paddle/issues/627>`_
- 磁盘不够
本文中的例子里,`docker run` 命令里都用了 `--rm` 参数,这样保证运行结束之后的 containers 不会保留在磁盘上。可以用 `docker ps -a` 命令看到停止后但是没有删除的 containers。`docker build` 命令有时候会产生一些中间结果,是没有名字的 images,也会占用磁盘。可以参考[这篇文章](https://zaiste.net/posts/removing_docker_containers/)来清理这些内容。
本文中的例子里,`docker run` 命令里都用了 `--rm` 参数,这样保证运行结束之后的 containers 不会保留在磁盘上。可以用 `docker ps -a` 命令看到停止后但是没有删除的 containers。`docker build` 命令有时候会产生一些中间结果,是没有名字的 images,也会占用磁盘。可以参考 `这篇文章 <https://zaiste.net/posts/removing_docker_containers/>`_ 来清理这些内容。
.. _compile_deps:
......
......@@ -26,6 +26,8 @@ you can also find how to build and use paddle_manylinux_devel Docker image from
`here <https://github.com/PaddlePaddle/Paddle/tree/develop/tools/manylinux1/>`__
Or you can build your own image from source as the optional step below:
If you don't wish to use docker,you need to install several compile dependencies manually as :ref:`Compile Dependencies <_compile_deps>` shows to start compilation.
.. code-block:: bash
# 1. clone the source code
......@@ -108,7 +110,7 @@ Frequently Asked Questions
- How difficult is it to learn Docker?
It takes you ten minutes to read [an introductory article](https://docs.docker.com/get-started) and saves you more than one hour to install all required build tools, configure them, especially when new versions of PaddlePaddle require some new tools. Not even to mention the time saved when other people trying to reproduce the issue you have.
It takes you ten minutes to read `an introductory article <https://docs.docker.com/get-started>`_ and saves you more than one hour to install all required build tools, configure them, especially when new versions of PaddlePaddle require some new tools. Not even to mention the time saved when other people trying to reproduce the issue you have.
- Can I use my favorite IDE?
......@@ -125,7 +127,7 @@ Frequently Asked Questions
- Does Docker do parallel building?
Our building Docker image runs a [Bash script](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build.sh), which calls `make -j$(nproc)` to starts as many processes as the number of your CPU cores.
Our building Docker image runs a `Bash script <https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/scripts/docker/build.sh>`_ , which calls `make -j$(nproc)` to starts as many processes as the number of your CPU cores.
- Docker requires sudo
......@@ -133,11 +135,11 @@ Frequently Asked Questions
- Docker on Windows/MacOS builds slowly
On Windows and MacOS, Docker containers run in a Linux VM. You might want to give this VM some more memory and CPUs so to make the building efficient. Please refer to [this issue](https://github.com/PaddlePaddle/Paddle/issues/627) for details.
On Windows and MacOS, Docker containers run in a Linux VM. You might want to give this VM some more memory and CPUs so to make the building efficient. Please refer to `this issue <https://github.com/PaddlePaddle/Paddle/issues/627>`_ for details.
- Not enough disk space
Examples in this article use option `--rm` with the `docker run` command. This option ensures that stopped containers do not exist on hard disks. We can use `docker ps -a` to list all containers, including stopped. Sometimes `docker build` generates some intermediate dangling images, which also take disk space. To clean them, please refer to [this article](https://zaiste.net/posts/removing_docker_containers/).
Examples in this article use option `--rm` with the `docker run` command. This option ensures that stopped containers do not exist on hard disks. We can use `docker ps -a` to list all containers, including stopped. Sometimes `docker build` generates some intermediate dangling images, which also take disk space. To clean them, please refer to `this article <https://zaiste.net/posts/removing_docker_containers/>`_ .
.. _compile_deps:
......
......@@ -17,6 +17,42 @@ if(APPLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=pessimizing-move")
endif(APPLE)
set(ANAKIN_INCLUDE "" CACHE STRING "root of Anakin header files")
set(ANAKIN_LIBRARY "" CACHE STRING "path of Anakin library")
set(inference_deps paddle_inference_api paddle_fluid_api)
# if anakin is set enable anakin api implementation
if(ANAKIN_INCLUDE_DIR AND ANAKIN_LIBRARY)
set(ANAKIN_FOUND ON)
else()
set(ANAKIN_FOUND OFF)
endif()
if (ANAKIN_FOUND)
# Anakin's code style doesn't follow google c style.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=comment
-Wno-error=reorder
-Wno-error=format
-Wno-error=switch
-Wno-error=return-type
-Wno-error=non-virtual-dtor
-Wno-error=cpp")
message(STATUS "Anakin for inference is enabled")
message(STATUS "Anakin is set INCLUDE:${ANAKIN_INCLUDE} LIBRARY:${ANAKIN_LIBRARY}")
include_directories("${ANAKIN_INCLUDE}")
# Anakin's source path is a mass, need to set sub-directories trivially.
include_directories("${ANAKIN_INCLUDE}/saber")
link_directories("${ANAKIN_LIBRARY}")
nv_library(inference_anakin_api SRCS paddle_inference_api_anakin_engine.cc)
target_link_libraries(inference_anakin_api anakin)
list(APPEND inference_deps inference_anakin_api)
endif()
function(inference_api_test TARGET_NAME)
if (WITH_TESTING)
set(options "")
......@@ -27,7 +63,7 @@ function(inference_api_test TARGET_NAME)
set(PYTHON_TESTS_DIR ${PADDLE_BINARY_DIR}/python/paddle/fluid/tests)
cc_test(${TARGET_NAME}
SRCS ${TARGET_NAME}.cc
DEPS paddle_fluid paddle_inference_api
DEPS "${inference_deps}"
ARGS --dirname=${PYTHON_TESTS_DIR}/book/)
if(inference_test_ARGS)
set_tests_properties(${TARGET_NAME}
......@@ -47,6 +83,11 @@ cc_test(test_paddle_inference_api
inference_api_test(test_paddle_inference_api_impl
ARGS test_word2vec test_image_classification)
if (ANAKIN_FOUND)
nv_test(inference_anakin_test SRCS paddle_inference_api_anakin_engine_tester.cc
DEPS ${inference_deps} protobuf)
endif()
if(WITH_TESTING)
add_subdirectory(demo)
endif()
......@@ -54,7 +54,7 @@ void Main(bool use_gpu) {
CHECK(predictor->Run(slots, &outputs));
//# 4. Get output.
ASSERT_EQ(outputs.size(), 1);
ASSERT_EQ(outputs.size(), 1UL);
LOG(INFO) << "output buffer size: " << outputs.front().data.length;
const size_t num_elements = outputs.front().data.length / sizeof(float);
// The outputs' buffers are in CPU memory.
......@@ -65,7 +65,10 @@ void Main(bool use_gpu) {
}
TEST(demo, word2vec_cpu) { Main(false /*use_gpu*/); }
#ifdef PADDLE_WITH_CUDA
TEST(demo, word2vec_gpu) { Main(true /*use_gpu*/); }
#endif
} // namespace demo
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
/*
* This file contains the definition of a simple Inference API for Paddle.
......@@ -47,8 +47,8 @@ struct PaddleTensor {
enum class PaddleEngineKind {
kNative = 0, // Use the native Fluid facility.
kAnakin, // Use Anakin for inference.
// TODO(Superjomn) support following engines latter.
// kAnakin, // Use Anakin for inference.
// kTensorRT, // Use TensorRT for inference.
// kAutoMixedAnakin, // Automatically mix Fluid with Anakin.
// kAutoMixedTensorRT, // Automatically mix Fluid with TensorRT.
......@@ -63,6 +63,7 @@ class PaddlePredictor {
struct Config;
PaddlePredictor() = default;
PaddlePredictor(const PaddlePredictor&) = delete;
PaddlePredictor& operator=(const PaddlePredictor&) = delete;
// Predict an record.
// The caller should be responsible for allocating and releasing the memory of
......@@ -76,7 +77,7 @@ class PaddlePredictor {
virtual std::unique_ptr<PaddlePredictor> Clone() = 0;
// Destroy the Predictor.
virtual ~PaddlePredictor() {}
virtual ~PaddlePredictor() = default;
// The common configs for all the predictors.
struct Config {
......@@ -95,6 +96,13 @@ struct NativeConfig : public PaddlePredictor::Config {
std::string param_file;
};
// Configurations for Anakin engine.
struct AnakinConfig : public PaddlePredictor::Config {
int device;
std::string model_file;
int max_batch_size{-1};
};
// A factory to help create different predictors.
//
// FOR EXTENSION DEVELOPER:
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cuda.h>
#include "paddle/contrib/inference/paddle_inference_api_anakin_engine.h"
namespace paddle {
PaddleInferenceAnakinPredictor::PaddleInferenceAnakinPredictor(
const AnakinConfig &config) {
CHECK(Init(config));
}
bool PaddleInferenceAnakinPredictor::Init(const AnakinConfig &config) {
// TODO(Superjomn) Tell anakin to support return code.
engine_.Build(config.model_file, config.max_batch_size);
return true;
}
bool PaddleInferenceAnakinPredictor::Run(
const std::vector<PaddleTensor> &inputs,
std::vector<PaddleTensor> *output_data) {
for (const auto &input : inputs) {
if (input.dtype != PaddleDType::FLOAT32) {
LOG(ERROR) << "Only support float type inputs. " << input.name
<< "'s type is not float";
return false;
}
engine_.SetInputFromCPU(
input.name, static_cast<float *>(input.data.data), input.data.length);
}
// TODO(Superjomn) Tell anakin to support return code.
engine_.Execute();
if (output_data->empty()) {
LOG(ERROR) << "At least one output should be set with tensors' names.";
return false;
}
for (auto &output : *output_data) {
auto *tensor = engine_.GetOutputInGPU(output.name);
output.shape = tensor->shape();
// Copy data from GPU -> CPU
if (cudaMemcpy(output.data.data,
tensor->data(),
tensor->size(),
cudaMemcpyDeviceToHost) != 0) {
LOG(ERROR) << "copy data from GPU to CPU error";
return false;
}
}
return true;
}
// TODO(Superjomn) To implement latter.
std::unique_ptr<PaddlePredictor> PaddleInferenceAnakinPredictor::Clone() {
return nullptr;
}
// A factory to help create difference predictor.
template <>
std::unique_ptr<PaddlePredictor>
CreatePaddlePredictor<AnakinConfig, PaddleEngineKind::kAnakin>(
const AnakinConfig &config) {
std::unique_ptr<PaddlePredictor> x(
new PaddleInferenceAnakinPredictor(config));
return x;
};
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
/*
* This file contains the implementation of inference API with Anakin engine
* embeded, this API can only support Anakin models.
*/
#pragma once
// NOTE This header file do not have namespace.
// TODO(Superjomn) Tell Anakin to provide better APIs.
#include <test/framework/net/paddle_api.h>
#include "paddle/contrib/inference/paddle_inference_api.h"
namespace paddle {
class PaddleInferenceAnakinPredictor : public PaddlePredictor {
public:
PaddleInferenceAnakinPredictor(const AnakinConfig& config);
// NOTE Unlike the native engine, the buffers of anakin engine's output_data
// should be allocated first.
// TODO(Superjomn) should unify all the behaviors of output_data accross all
// the engines.
bool Run(const std::vector<PaddleTensor>& inputs,
std::vector<PaddleTensor>* output_data) override;
std::unique_ptr<PaddlePredictor> Clone() override;
private:
bool Init(const AnakinConfig& config);
anakin::AnakinEngine<anakin::NV,
anakin::saber::AK_FLOAT,
anakin::Precision::FP32>
engine_;
};
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/contrib/inference/paddle_inference_api.h"
#include <gtest/gtest.h>
namespace paddle {
TEST(inference, anakin) {
AnakinConfig config;
auto engine =
CreatePaddlePredictor<AnakinConfig, PaddleEngineKind::kAnakin>(config);
}
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <sys/time.h>
#include <algorithm>
......@@ -54,7 +54,8 @@ std::string num2str(T a) {
}
} // namespace
bool NativePaddlePredictor::Init() {
bool NativePaddlePredictor::Init(
std::shared_ptr<framework::Scope> parent_scope) {
VLOG(3) << "Predictor::init()";
if (config_.use_gpu) {
......@@ -62,9 +63,15 @@ bool NativePaddlePredictor::Init() {
} else {
place_ = paddle::platform::CPUPlace();
}
if (parent_scope) {
scope_ = parent_scope;
sub_scope_ = &(parent_scope->NewScope());
} else {
paddle::framework::InitDevices(false);
executor_.reset(new paddle::framework::Executor(place_));
scope_.reset(new paddle::framework::Scope());
}
executor_.reset(new paddle::framework::Executor(place_));
// Initialize the inference program
if (!config_.model_dir.empty()) {
......@@ -83,13 +90,8 @@ bool NativePaddlePredictor::Init() {
return false;
}
ctx_ = executor_->Prepare(*inference_program_, 0);
// Create temporary variables first, so that the first batch do not need to
// create variables in the runtime. This is the logics of the old inference
// API.
// TODO(Superjomn) this should be modified when `Clone` is valid for
// multi-thread application.
executor_->CreateVariables(*inference_program_, scope_.get(), 0);
executor_->CreateVariables(
*inference_program_, sub_scope_ ? sub_scope_ : scope_.get(), 0);
// Get the feed_target_names and fetch_target_names
feed_target_names_ = inference_program_->GetFeedTargetNames();
......@@ -97,6 +99,13 @@ bool NativePaddlePredictor::Init() {
return true;
}
NativePaddlePredictor::~NativePaddlePredictor() {
if (sub_scope_) {
PADDLE_ENFORCE_NOT_NULL(scope_, "Should have parent scope!");
scope_->DeleteScope(sub_scope_);
}
};
bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
std::vector<PaddleTensor> *output_data) {
VLOG(3) << "Predictor::predict";
......@@ -121,8 +130,9 @@ bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
}
// Run the inference program
// if share variables, we need not create variables
executor_->RunPreparedContext(ctx_.get(),
scope_.get(),
executor_->RunPreparedContext(
ctx_.get(),
sub_scope_ != nullptr ? sub_scope_ : scope_.get(),
&feed_targets,
&fetch_targets,
false /* don't create variable eatch time */);
......@@ -138,7 +148,7 @@ std::unique_ptr<PaddlePredictor> NativePaddlePredictor::Clone() {
VLOG(3) << "Predictor::clone";
std::unique_ptr<PaddlePredictor> cls(new NativePaddlePredictor(config_));
if (!dynamic_cast<NativePaddlePredictor *>(cls.get())->Init()) {
if (!dynamic_cast<NativePaddlePredictor *>(cls.get())->Init(scope_)) {
LOG(ERROR) << "fail to call Init";
return nullptr;
}
......@@ -266,7 +276,7 @@ CreatePaddlePredictor<NativeConfig, PaddleEngineKind::kNative>(
}
std::unique_ptr<PaddlePredictor> predictor(new NativePaddlePredictor(config));
if (!dynamic_cast<NativePaddlePredictor *>(predictor.get())->Init()) {
if (!dynamic_cast<NativePaddlePredictor *>(predictor.get())->Init(nullptr)) {
return nullptr;
}
return std::move(predictor);
......
......@@ -34,14 +34,15 @@ class NativePaddlePredictor : public PaddlePredictor {
explicit NativePaddlePredictor(const NativeConfig &config)
: config_(config) {}
bool Init();
// will only create sub scope if have global scope
bool Init(std::shared_ptr<framework::Scope> parent_scope);
bool Run(const std::vector<PaddleTensor> &inputs,
std::vector<PaddleTensor> *output_data) override;
std::unique_ptr<PaddlePredictor> Clone() override;
~NativePaddlePredictor() override{};
~NativePaddlePredictor() override;
private:
bool SetFeed(const std::vector<PaddleTensor> &input_datas,
......@@ -52,11 +53,13 @@ class NativePaddlePredictor : public PaddlePredictor {
NativeConfig config_;
platform::Place place_;
std::unique_ptr<framework::Executor> executor_;
std::unique_ptr<framework::Scope> scope_;
std::shared_ptr<framework::Scope> scope_;
std::unique_ptr<framework::ExecutorPrepareContext> ctx_;
std::unique_ptr<framework::ProgramDesc> inference_program_;
std::vector<std::string> feed_target_names_;
std::vector<std::string> fetch_target_names_;
// Do not use unique_ptr, use parent scope to delete
framework::Scope *sub_scope_{nullptr};
};
} // namespace paddle
......@@ -169,17 +169,13 @@ void BlockDesc::Flush() {
}
if (need_update_) {
auto &op_field = *this->desc_->mutable_ops();
this->ClearPBOps();
op_field.Reserve(static_cast<int>(ops_.size()));
this->desc_->mutable_ops()->Clear();
for (auto &op_desc : ops_) {
op_field.AddAllocated(op_desc->Proto());
this->desc_->mutable_ops()->Add()->CopyFrom(*op_desc->Proto());
}
auto &var_field = *this->desc_->mutable_vars();
this->ClearPBVars();
var_field.Reserve(static_cast<int>(vars_.size()));
this->desc_->mutable_vars()->Clear();
for (auto &var_desc : vars_) {
var_field.AddAllocated(var_desc.second->Proto());
this->desc_->mutable_vars()->Add()->CopyFrom(*var_desc.second->Proto());
}
need_update_ = false;
}
......@@ -217,22 +213,6 @@ BlockDesc::BlockDesc(const BlockDesc &other, proto::BlockDesc *desc,
}
}
void BlockDesc::ClearPBOps() {
auto ops = this->desc_->mutable_ops();
while (!ops->empty()) {
// we do not own the OpDesc, so release the ownership.
ops->ReleaseLast();
}
}
void BlockDesc::ClearPBVars() {
auto vars = this->desc_->mutable_vars();
while (!vars->empty()) {
// we do not own the VarDesc, so release the ownership.
vars->ReleaseLast();
}
}
void BlockDesc::SetForwardBlockID(int32_t forward_block_id) {
PADDLE_ENFORCE(!desc_->has_forward_block_idx(),
"Parent block ID has been set to %d. Cannot set to %d",
......
......@@ -41,11 +41,6 @@ class BlockDesc {
BlockDesc(const BlockDesc &other, proto::BlockDesc *desc, ProgramDesc *prog);
~BlockDesc() {
this->ClearPBVars();
this->ClearPBOps();
}
int32_t ID() const { return desc_->idx(); }
int32_t Parent() const { return desc_->parent_idx(); }
......@@ -113,10 +108,6 @@ class BlockDesc {
ProgramDesc *Program() const { return this->prog_; }
private:
void ClearPBOps();
void ClearPBVars();
private:
ProgramDesc *prog_; // not_own
proto::BlockDesc *desc_; // not_own
......
......@@ -220,8 +220,10 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
has_fetch_operators(program.Block(0), *fetch_targets, fetch_holder_name);
ProgramDesc* copy_program = const_cast<ProgramDesc*>(&program);
std::unique_ptr<ProgramDesc> unique_ptr_of_copy_program;
if (!has_feed_ops || !has_fetch_ops) {
copy_program = std::unique_ptr<ProgramDesc>(new ProgramDesc(program)).get();
unique_ptr_of_copy_program.reset(new ProgramDesc(program));
copy_program = unique_ptr_of_copy_program.get();
}
auto* global_block = copy_program->MutableBlock(0);
......
......@@ -38,3 +38,11 @@ inference_test(recommender_system)
#inference_test(rnn_encoder_decoder)
#inference_test(understand_sentiment ARGS conv)
inference_test(word2vec)
# This is an unly work around to make this test run
# TODO(TJ): clean me up
cc_test(test_inference_nlp
SRCS test_inference_nlp.cc
DEPS paddle_fluid
ARGS
--model_path=${PADDLE_BINARY_DIR}/python/paddle/fluid/tests/book/recognize_digits_mlp.inference.model)
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <sys/time.h>
#include <time.h>
#include <fstream>
#include <thread> // NOLINT
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "paddle/fluid/inference/tests/test_helper.h"
#ifdef PADDLE_WITH_MKLML
#include <mkl_service.h>
#include <omp.h>
#endif
DEFINE_string(model_path, "", "Directory of the inference model.");
DEFINE_string(data_file, "", "File of input index data.");
DEFINE_int32(repeat, 100, "Running the inference program repeat times");
DEFINE_bool(use_mkldnn, false, "Use MKLDNN to run inference");
DEFINE_bool(prepare_vars, true, "Prepare variables before executor");
DEFINE_int32(num_threads, 1, "Number of threads should be used");
inline double GetCurrentMs() {
struct timeval time;
gettimeofday(&time, NULL);
return 1e+3 * time.tv_sec + 1e-3 * time.tv_usec;
}
// This function just give dummy data for recognize_digits model.
size_t DummyData(std::vector<paddle::framework::LoDTensor>* out) {
paddle::framework::LoDTensor input;
SetupTensor<float>(&input, {1, 1, 28, 28}, -1.f, 1.f);
out->emplace_back(input);
return 1;
}
// Load the input word index data from file and save into LodTensor.
// Return the size of words.
size_t LoadData(std::vector<paddle::framework::LoDTensor>* out,
const std::string& filename) {
if (filename.empty()) {
return DummyData(out);
}
size_t sz = 0;
std::fstream fin(filename);
std::string line;
out->clear();
while (getline(fin, line)) {
std::istringstream iss(line);
std::vector<int64_t> ids;
std::string field;
while (getline(iss, field, ' ')) {
ids.push_back(stoi(field));
}
if (ids.size() >= 1024) {
// Synced with NLP guys, they will ignore input larger then 1024
continue;
}
paddle::framework::LoDTensor words;
paddle::framework::LoD lod{{0, ids.size()}};
words.set_lod(lod);
int64_t* pdata = words.mutable_data<int64_t>(
{static_cast<int64_t>(ids.size()), 1}, paddle::platform::CPUPlace());
memcpy(pdata, ids.data(), words.numel() * sizeof(int64_t));
out->emplace_back(words);
sz += ids.size();
}
return sz;
}
// Split input data samples into small pieces jobs as balanced as possible,
// according to the number of threads.
void SplitData(
const std::vector<paddle::framework::LoDTensor>& datasets,
std::vector<std::vector<const paddle::framework::LoDTensor*>>* jobs,
const int num_threads) {
size_t s = 0;
jobs->resize(num_threads);
while (s < datasets.size()) {
for (auto it = jobs->begin(); it != jobs->end(); it++) {
it->emplace_back(&datasets[s]);
s++;
if (s >= datasets.size()) {
break;
}
}
}
}
void ThreadRunInfer(
const int tid, paddle::framework::Executor* executor,
paddle::framework::Scope* scope,
const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
const std::vector<std::vector<const paddle::framework::LoDTensor*>>& jobs) {
auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
new paddle::framework::ProgramDesc(*inference_program));
auto& sub_scope = scope->NewScope();
std::string feed_holder_name = "feed_" + paddle::string::to_string(tid);
std::string fetch_holder_name = "fetch_" + paddle::string::to_string(tid);
copy_program->SetFeedHolderName(feed_holder_name);
copy_program->SetFetchHolderName(fetch_holder_name);
const std::vector<std::string>& feed_target_names =
copy_program->GetFeedTargetNames();
const std::vector<std::string>& fetch_target_names =
copy_program->GetFetchTargetNames();
PADDLE_ENFORCE_EQ(fetch_target_names.size(), 1UL);
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
paddle::framework::LoDTensor outtensor;
fetch_targets[fetch_target_names[0]] = &outtensor;
std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
PADDLE_ENFORCE_EQ(feed_target_names.size(), 1UL);
auto& inputs = jobs[tid];
auto start_ms = GetCurrentMs();
for (size_t i = 0; i < inputs.size(); ++i) {
feed_targets[feed_target_names[0]] = inputs[i];
executor->Run(*copy_program, &sub_scope, &feed_targets, &fetch_targets,
true /*create_local_scope*/, true /*create_vars*/,
feed_holder_name, fetch_holder_name);
}
auto stop_ms = GetCurrentMs();
scope->DeleteScope(&sub_scope);
LOG(INFO) << "Tid: " << tid << ", process " << inputs.size()
<< " samples, avg time per sample: "
<< (stop_ms - start_ms) / inputs.size() << " ms";
}
TEST(inference, nlp) {
if (FLAGS_model_path.empty()) {
LOG(FATAL) << "Usage: ./example --model_path=path/to/your/model";
}
if (FLAGS_data_file.empty()) {
LOG(WARNING) << "No data file provided, will use dummy data!"
<< "Note: if you use nlp model, please provide data file.";
}
LOG(INFO) << "Model Path: " << FLAGS_model_path;
LOG(INFO) << "Data File: " << FLAGS_data_file;
std::vector<paddle::framework::LoDTensor> datasets;
size_t num_total_words = LoadData(&datasets, FLAGS_data_file);
LOG(INFO) << "Number of samples (seq_len<1024): " << datasets.size();
LOG(INFO) << "Total number of words: " << num_total_words;
const bool model_combined = false;
// 0. Call `paddle::framework::InitDevices()` initialize all the devices
// 1. Define place, executor, scope
auto place = paddle::platform::CPUPlace();
auto executor = paddle::framework::Executor(place);
std::unique_ptr<paddle::framework::Scope> scope(
new paddle::framework::Scope());
// 2. Initialize the inference_program and load parameters
std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
inference_program =
InitProgram(&executor, scope.get(), FLAGS_model_path, model_combined);
if (FLAGS_use_mkldnn) {
EnableMKLDNN(inference_program);
}
#ifdef PADDLE_WITH_MKLML
// only use 1 thread number per std::thread
omp_set_dynamic(0);
omp_set_num_threads(1);
mkl_set_num_threads(1);
#endif
double start_ms = 0, stop_ms = 0;
if (FLAGS_num_threads > 1) {
std::vector<std::vector<const paddle::framework::LoDTensor*>> jobs;
SplitData(datasets, &jobs, FLAGS_num_threads);
std::vector<std::unique_ptr<std::thread>> threads;
start_ms = GetCurrentMs();
for (int i = 0; i < FLAGS_num_threads; ++i) {
threads.emplace_back(
new std::thread(ThreadRunInfer, i, &executor, scope.get(),
std::ref(inference_program), std::ref(jobs)));
}
for (int i = 0; i < FLAGS_num_threads; ++i) {
threads[i]->join();
}
stop_ms = GetCurrentMs();
} else {
if (FLAGS_prepare_vars) {
executor.CreateVariables(*inference_program, scope.get(), 0);
}
// always prepare context
std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx;
ctx = executor.Prepare(*inference_program, 0);
// preapre fetch
const std::vector<std::string>& fetch_target_names =
inference_program->GetFetchTargetNames();
PADDLE_ENFORCE_EQ(fetch_target_names.size(), 1UL);
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
paddle::framework::LoDTensor outtensor;
fetch_targets[fetch_target_names[0]] = &outtensor;
// prepare feed
const std::vector<std::string>& feed_target_names =
inference_program->GetFeedTargetNames();
PADDLE_ENFORCE_EQ(feed_target_names.size(), 1UL);
std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
// feed data and run
start_ms = GetCurrentMs();
for (size_t i = 0; i < datasets.size(); ++i) {
feed_targets[feed_target_names[0]] = &(datasets[i]);
executor.RunPreparedContext(ctx.get(), scope.get(), &feed_targets,
&fetch_targets, !FLAGS_prepare_vars);
}
stop_ms = GetCurrentMs();
LOG(INFO) << "Tid: 0, process " << datasets.size()
<< " samples, avg time per sample: "
<< (stop_ms - start_ms) / datasets.size() << " ms";
}
LOG(INFO) << "Total inference time with " << FLAGS_num_threads
<< " threads : " << (stop_ms - start_ms) / 1000.0
<< " sec, QPS: " << datasets.size() / ((stop_ms - start_ms) / 1000);
}
......@@ -222,35 +222,35 @@ struct MKLDNNActivationGradFunc : public BaseActivationFunctor<T> {
};
template <typename T>
using ReluMkldnnFunctor =
using ReluMKLDNNFunctor =
MKLDNNActivationFunc<T, mkldnn::algorithm::eltwise_relu>;
template <typename T>
using TanhMkldnnFunctor =
using TanhMKLDNNFunctor =
MKLDNNActivationFunc<T, mkldnn::algorithm::eltwise_tanh>;
template <typename T>
using SqrtMkldnnFunctor =
using SqrtMKLDNNFunctor =
MKLDNNActivationFunc<T, mkldnn::algorithm::eltwise_sqrt>;
template <typename T>
using AbsMkldnnFunctor =
using AbsMKLDNNFunctor =
MKLDNNActivationFunc<T, mkldnn::algorithm::eltwise_abs>;
template <typename T>
using ReluMkldnnGradFunctor =
using ReluMKLDNNGradFunctor =
MKLDNNActivationGradFunc<T, mkldnn::algorithm::eltwise_relu>;
template <typename T>
using TanhMkldnnGradFunctor =
using TanhMKLDNNGradFunctor =
MKLDNNActivationGradFunc<T, mkldnn::algorithm::eltwise_tanh>;
template <typename T>
using SqrtMkldnnGradFunctor =
using SqrtMKLDNNGradFunctor =
MKLDNNActivationGradFunc<T, mkldnn::algorithm::eltwise_sqrt>;
template <typename T>
using AbsMkldnnGradFunctor =
using AbsMKLDNNGradFunctor =
MKLDNNActivationGradFunc<T, mkldnn::algorithm::eltwise_abs>;
} // namespace operators
} // namespace paddle
......@@ -265,9 +265,9 @@ namespace ops = paddle::operators;
ops::MKLDNNActivationGradKernel<ops::grad_functor<float>>);
#define FOR_EACH_MKLDNN_KERNEL_FUNCTOR(__macro) \
__macro(relu, ReluMkldnnFunctor, ReluMkldnnGradFunctor); \
__macro(tanh, TanhMkldnnFunctor, TanhMkldnnGradFunctor); \
__macro(sqrt, SqrtMkldnnFunctor, SqrtMkldnnGradFunctor); \
__macro(abs, AbsMkldnnFunctor, AbsMkldnnGradFunctor);
__macro(relu, ReluMKLDNNFunctor, ReluMKLDNNGradFunctor); \
__macro(tanh, TanhMKLDNNFunctor, TanhMKLDNNGradFunctor); \
__macro(sqrt, SqrtMKLDNNFunctor, SqrtMKLDNNGradFunctor); \
__macro(abs, AbsMKLDNNFunctor, AbsMKLDNNGradFunctor);
FOR_EACH_MKLDNN_KERNEL_FUNCTOR(REGISTER_ACTIVATION_MKLDNN_KERNEL);
......@@ -38,6 +38,25 @@ void RPCClient::Init() {
if (rpc_client_.get() == nullptr) {
rpc_client_.reset(new RPCClient());
}
rpc_client_->InitEventLoop();
}
void RPCClient::InitEventLoop() {
// start the client process thread
// TODO(wuyi): can make this in a threadpool
client_thread_.reset(new std::thread(std::bind(&RPCClient::Proceed, this)));
}
RPCClient::~RPCClient() {
Wait();
cq_.Shutdown();
{
std::lock_guard<std::mutex> guard(chan_mutex_);
for (auto& it : channels_) {
it.second.reset();
}
}
client_thread_->join();
}
bool RPCClient::AsyncSendVariable(const std::string& ep,
......@@ -204,70 +223,37 @@ void RPCClient::AsyncSendFetchBarrier(const std::string& ep, int64_t time_out) {
req_count_++;
}
bool RPCClient::Wait() {
VLOG(3) << "RPCClient begin Wait()"
<< " req_count_:" << req_count_;
if (req_count_ <= 0) {
return true;
}
const size_t kReqCnt = req_count_;
bool a[kReqCnt];
std::vector<std::future<void>> waits(req_count_);
std::mutex mu;
for (int i = 0; i < req_count_; i++) {
waits[i] = framework::AsyncIO([i, &a, &mu, this] {
bool ret = Proceed();
std::lock_guard<std::mutex> l(mu);
a[i] = ret;
});
}
for (int i = 0; i < req_count_; i++) {
waits[i].wait();
}
int last_req_count = req_count_;
req_count_ = 0;
for (int i = 0; i < last_req_count; i++) {
if (!a[i]) {
return false;
}
}
return true;
void RPCClient::Wait() {
std::unique_lock<std::mutex> lk(sync_mutex_);
sync_cond_.wait(lk, [this] { return req_count_ == 0; });
}
bool RPCClient::Proceed() {
void* tag = NULL;
void RPCClient::Proceed() {
void* tag = nullptr;
bool ok = false;
// request counts.
if (!cq_.Next(&tag, &ok)) {
LOG(ERROR) << "Get meets CompletionQueue error";
return false;
}
GPR_ASSERT(ok);
PADDLE_ENFORCE(tag);
// TODO(gongwb): add more retries.
while (cq_.Next(&tag, &ok)) {
BaseProcessor* c = static_cast<BaseProcessor*>(tag);
if (!c->status_.ok()) {
LOG(ERROR) << "proc param error:" << c->var_h_.String()
GPR_ASSERT(ok);
PADDLE_ENFORCE(c);
if (c->status_.ok()) {
c->Process();
} else {
LOG(ERROR) << "var: " << c->var_h_.String()
<< " grpc error:" << c->status_.error_message();
delete c;
return false;
}
c->Process();
delete c;
return true;
{
std::lock_guard<std::mutex> lk(sync_mutex_);
req_count_--;
}
sync_cond_.notify_all();
}
}
std::shared_ptr<grpc::Channel> RPCClient::GetChannel(const std::string& ep) {
// TODO(Yancey1989): make grpc client completely thread-safe
std::unique_lock<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> guard(chan_mutex_);
auto it = channels_.find(ep);
if (it != channels_.end()) {
return it->second;
......
......@@ -17,14 +17,17 @@ limitations under the License. */
#include <time.h>
#include <chrono> // NOLINT
#include <condition_variable> // NOLINT
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "grpc++/channel.h"
#include "grpc++/generic/generic_stub.h"
#include "grpc++/grpc++.h"
#include "grpc++/support/byte_buffer.h"
......@@ -164,6 +167,7 @@ class FetchBarrierProcessor : public BaseProcessor {
class RPCClient {
public:
RPCClient() {}
~RPCClient();
static RPCClient* GetInstance();
......@@ -192,19 +196,28 @@ class RPCClient {
void AsyncSendFetchBarrier(const std::string& ep,
int64_t time_out = 600 * 1000);
bool Wait();
void Wait();
// InitEventLoop should only be called by Init()
void InitEventLoop();
private:
bool Proceed();
void Proceed();
std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);
// Init is called by GetInstance.
static void Init();
private:
grpc::CompletionQueue cq_;
std::map<std::string, std::shared_ptr<grpc::Channel>> channels_;
std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channels_;
std::unique_ptr<std::thread> client_thread_;
// mutex for Wait client sync
std::mutex sync_mutex_;
std::condition_variable sync_cond_;
std::atomic<int64_t> req_count_{0};
std::mutex mutex_;
// mutex for GetChannel thread safety
std::mutex chan_mutex_;
static std::unique_ptr<RPCClient> rpc_client_;
static std::once_flag init_flag_;
DISABLE_COPY_AND_ASSIGN(RPCClient);
......
......@@ -68,9 +68,7 @@ class RequestSend final : public RequestBase {
method_id, &ctx_, request_.get(), &responder_, cq_, cq_,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id)));
}
virtual ~RequestSend() {}
std::string GetReqName() override { return request_->Varname(); }
void Process() override {
......@@ -82,7 +80,6 @@ class RequestSend final : public RequestBase {
framework::Variable* outvar = nullptr;
request_handler_->Handle(varname, scope, invar, &outvar);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
......@@ -125,7 +122,6 @@ class RequestGet final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
}
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
......@@ -170,10 +166,9 @@ class RequestPrefetch final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
status_ = FINISH;
}
protected:
......
......@@ -71,8 +71,6 @@ class AsyncGRPCServer final : public RPCServer {
std::unique_ptr<::grpc::Server> server_;
// condition of the sub program
std::mutex barrier_mutex_;
mutable int barrier_cond_step_;
std::condition_variable barrier_condition_;
std::mutex mutex_ready_;
......
......@@ -113,10 +113,6 @@ void StartServer() {
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));
// FIXME(gongwb): don't use hard time.
sleep(10);
LOG(INFO) << "got nccl id and stop server...";
g_rpc_service->ShutDown();
server_thread.join();
}
......@@ -127,7 +123,7 @@ TEST(PREFETCH, CPU) {
std::thread server_thread(StartServer);
g_rpc_service->WaitServerReady();
detail::RPCClient client;
detail::RPCClient* client = detail::RPCClient::GetInstance();
int port = g_rpc_service->GetSelectedPort();
std::string ep = paddle::string::Sprintf("127.0.0.1:%d", port);
......@@ -141,8 +137,8 @@ TEST(PREFETCH, CPU) {
std::string in_var_name("ids");
std::string out_var_name("out");
client.AsyncPrefetchVariable(ep, ctx, scope, in_var_name, out_var_name);
client.Wait();
client->AsyncPrefetchVariable(ep, ctx, scope, in_var_name, out_var_name);
client->Wait();
auto var = scope.Var(out_var_name);
auto value = var->GetMutable<framework::SelectedRows>()->value();
auto ptr = value.mutable_data<float>(place);
......@@ -152,6 +148,7 @@ TEST(PREFETCH, CPU) {
}
}
g_rpc_service->ShutDown();
server_thread.join();
LOG(INFO) << "begin reset";
g_rpc_service.reset(nullptr);
......
......@@ -45,13 +45,13 @@ class FetchBarrierOp : public framework::OperatorBase {
auto rpc_client = detail::RPCClient::GetInstance();
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
for (auto& ep : eps) {
VLOG(3) << "fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
};
......
......@@ -222,8 +222,8 @@ static void FillRequestCtx(detail::RequestHandler *h, framework::Scope *scope,
h->SetDevCtx(dev_ctx);
h->SetExecutor(executor);
h->SetProgram(program);
h->SetPrefetchPreparedCtx(std::move(
std::unique_ptr<framework::ExecutorPrepareContext>(prefetch_ctx)));
h->SetPrefetchPreparedCtx(
std::unique_ptr<framework::ExecutorPrepareContext>(prefetch_ctx));
h->SetRPCServer(rpc_server);
}
......
......@@ -53,7 +53,7 @@ class PrefetchOp : public framework::OperatorBase {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
};
......
......@@ -26,7 +26,11 @@ class MultiFileReader : public framework::ReaderBase {
MultiFileReader(const std::vector<std::string>& file_names,
const std::vector<framework::DDim>& dims, size_t thread_num,
size_t buffer_size)
: file_names_(file_names), dims_(dims), buffer_size_(buffer_size) {
: buffer_size_(buffer_size) {
readers_.reserve(file_names.size());
for (const std::string& f_name : file_names) {
readers_.emplace_back(CreateReaderByFileName(f_name, dims));
}
prefetchers_.resize(thread_num);
StartNewScheduler();
}
......@@ -40,14 +44,13 @@ class MultiFileReader : public framework::ReaderBase {
void StartNewScheduler();
void EndScheduler();
void ScheduleThreadFunc();
void PrefetchThreadFunc(std::string file_name, size_t thread_idx);
void PrefetchThreadFunc(size_t reader_idx, size_t thread_idx);
std::vector<std::string> file_names_;
std::vector<framework::DDim> dims_;
std::vector<std::unique_ptr<framework::ReaderBase>> readers_;
std::thread scheduler_;
std::vector<std::thread> prefetchers_;
size_t buffer_size_;
reader::BlockingQueue<size_t>* waiting_file_idx_;
reader::BlockingQueue<size_t>* waiting_reader_idx_;
reader::BlockingQueue<size_t>* available_thread_idx_;
reader::BlockingQueue<std::vector<framework::LoDTensor>>* buffer_;
};
......@@ -65,15 +68,15 @@ void MultiFileReader::ReInit() {
void MultiFileReader::StartNewScheduler() {
size_t thread_num = prefetchers_.size();
waiting_file_idx_ = new reader::BlockingQueue<size_t>(file_names_.size());
waiting_reader_idx_ = new reader::BlockingQueue<size_t>(readers_.size());
available_thread_idx_ = new reader::BlockingQueue<size_t>(thread_num);
buffer_ = new reader::BlockingQueue<std::vector<framework::LoDTensor>>(
buffer_size_);
for (size_t i = 0; i < file_names_.size(); ++i) {
waiting_file_idx_->Send(i);
for (size_t i = 0; i < readers_.size(); ++i) {
waiting_reader_idx_->Send(i);
}
waiting_file_idx_->Close();
waiting_reader_idx_->Close();
for (size_t i = 0; i < thread_num; ++i) {
available_thread_idx_->Send(i);
}
......@@ -84,13 +87,13 @@ void MultiFileReader::StartNewScheduler() {
void MultiFileReader::EndScheduler() {
available_thread_idx_->Close();
buffer_->Close();
waiting_file_idx_->Close();
waiting_reader_idx_->Close();
if (scheduler_.joinable()) {
scheduler_.join();
}
delete buffer_;
delete available_thread_idx_;
delete waiting_file_idx_;
delete waiting_reader_idx_;
}
void MultiFileReader::ScheduleThreadFunc() {
......@@ -102,12 +105,11 @@ void MultiFileReader::ScheduleThreadFunc() {
if (prefetcher.joinable()) {
prefetcher.join();
}
size_t file_idx;
if (waiting_file_idx_->Receive(&file_idx)) {
size_t reader_idx;
if (waiting_reader_idx_->Receive(&reader_idx)) {
// Still have files to read. Start a new prefetch thread.
std::string file_name = file_names_[file_idx];
prefetcher = std::thread([this, file_name, thread_idx] {
PrefetchThreadFunc(file_name, thread_idx);
prefetcher = std::thread([this, reader_idx, thread_idx] {
PrefetchThreadFunc(reader_idx, thread_idx);
});
} else {
// No more file to read.
......@@ -129,23 +131,22 @@ void MultiFileReader::ScheduleThreadFunc() {
VLOG(5) << "MultiFileReader schedule thread terminates.";
}
void MultiFileReader::PrefetchThreadFunc(std::string file_name,
size_t thread_idx) {
VLOG(5) << "The prefetch thread of file '" << file_name << "' starts.";
std::unique_ptr<framework::ReaderBase> reader =
CreateReaderByFileName(file_name, dims_);
void MultiFileReader::PrefetchThreadFunc(size_t reader_idx, size_t thread_idx) {
VLOG(5) << "The prefetch thread of file idx '" << reader_idx << "' starts.";
std::unique_ptr<framework::ReaderBase>& reader = readers_[reader_idx];
while (true) {
std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins);
if (ins.empty()) {
reader->ReInit();
break;
}
try {
buffer_->Send(std::move(ins));
} catch (paddle::platform::EnforceNotMet e) {
VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
"thread of file '"
<< file_name << "' will terminate.";
"thread of file idx '"
<< reader_idx << "' will terminate.";
break;
}
}
......@@ -154,7 +155,8 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
"Fail to send thread_idx.";
}
VLOG(5) << "The prefetch thread of file '" << file_name << "' terminates.";
VLOG(5) << "The prefetch thread of file idx '" << reader_idx
<< "' terminates.";
}
class OpenFilesOp : public framework::OperatorBase {
......
......@@ -51,7 +51,7 @@ class RecvOp : public framework::OperatorBase {
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
if (sync_mode) {
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
}
};
......
......@@ -135,15 +135,16 @@ class ReduceKernel : public framework::OpKernel<T> {
} else {
int ndim = context.Input<Tensor>("X")->dims().size();
int rdim = context.Attr<std::vector<int>>("dim").size();
HANDLE_DIM(6, 5);
HANDLE_DIM(6, 4);
HANDLE_DIM(6, 3);
HANDLE_DIM(6, 2);
HANDLE_DIM(6, 1);
HANDLE_DIM(5, 4);
HANDLE_DIM(5, 3);
HANDLE_DIM(5, 2);
HANDLE_DIM(5, 1);
// comments for accelerating compiling temporarily.
// HANDLE_DIM(6, 5);
// HANDLE_DIM(6, 4);
// HANDLE_DIM(6, 3);
// HANDLE_DIM(6, 2);
// HANDLE_DIM(6, 1);
// HANDLE_DIM(5, 4);
// HANDLE_DIM(5, 3);
// HANDLE_DIM(5, 2);
// HANDLE_DIM(5, 1);
HANDLE_DIM(4, 3);
HANDLE_DIM(4, 2);
HANDLE_DIM(4, 1);
......
......@@ -49,13 +49,13 @@ class SendBarrierOp : public framework::OperatorBase {
VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode;
// need to wait before sending send_barrier message
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
if (sync_mode) {
for (auto& ep : eps) {
VLOG(3) << "send barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
}
};
......
......@@ -59,14 +59,14 @@ class SendOp : public framework::OperatorBase {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
if (sync_mode) {
for (auto& ep : endpoints) {
VLOG(3) << "batch barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
if (outs.size() > 0) {
......@@ -74,13 +74,13 @@ class SendOp : public framework::OperatorBase {
VLOG(2) << "getting " << outs[i] << " from " << epmap[i];
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
// tell pservers that current trainer have called fetch
for (auto& ep : endpoints) {
VLOG(2) << "send fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
rpc_client->Wait();
}
}
};
......
......@@ -114,7 +114,7 @@ class SGDOpKernel : public framework::OpKernel<T> {
int64_t id_index = param.Index(grad.rows()[i]);
PADDLE_ENFORCE_GE(id_index, static_cast<int64_t>(0),
"id should be in the table");
for (size_t j = 0; j < grad_row_width; j++) {
for (int64_t j = 0; j < grad_row_width; j++) {
out_data[id_index * grad_row_width + j] -=
lr[0] * grad_data[i * grad_row_width + j];
}
......
......@@ -61,7 +61,6 @@ void StartServer() {
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));
g_rpc_service->SetCond(detail::kRequestSend);
std::cout << "before WaitFanInOfSend" << std::endl;
g_rpc_service->WaitBarrier(detail::kRequestSend);
LOG(INFO) << "got nccl id and stop server...";
......@@ -88,12 +87,12 @@ TEST(SendNcclId, GrpcServer) {
int port = g_rpc_service->GetSelectedPort();
std::string ep = string::Sprintf("127.0.0.1:%d", port);
detail::RPCClient client;
LOG(INFO) << "connect to server" << ep;
client.AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
client.Wait();
client.AsyncSendBatchBarrier(ep);
client.Wait();
detail::RPCClient* client = detail::RPCClient::GetInstance();
LOG(INFO) << "connect to server " << ep;
client->AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
client->Wait();
client->AsyncSendBatchBarrier(ep);
client->Wait();
server_thread.join();
g_rpc_service.reset(nullptr);
......
......@@ -119,40 +119,56 @@ bool Chunk::Write(std::ostream& os, Compressor ct) const {
}
bool Chunk::Parse(std::istream& sin) {
Header hdr;
bool ok = hdr.Parse(sin);
ChunkParser parser(sin);
if (!parser.Init()) {
return false;
}
Clear();
while (parser.HasNext()) {
Add(parser.Next());
}
return true;
}
ChunkParser::ChunkParser(std::istream& sin) : in_(sin) {}
bool ChunkParser::Init() {
pos_ = 0;
bool ok = header_.Parse(in_);
if (!ok) {
return ok;
}
auto beg_pos = sin.tellg();
uint32_t crc = Crc32Stream(sin, hdr.CompressSize());
PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);
Clear();
sin.seekg(beg_pos, sin.beg);
std::unique_ptr<std::istream> compressed_stream;
switch (hdr.CompressType()) {
auto beg_pos = in_.tellg();
uint32_t crc = Crc32Stream(in_, header_.CompressSize());
PADDLE_ENFORCE_EQ(header_.Checksum(), crc);
in_.seekg(beg_pos, in_.beg);
switch (header_.CompressType()) {
case Compressor::kNoCompress:
break;
case Compressor::kSnappy:
compressed_stream.reset(new snappy::iSnappyStream(sin));
compressed_stream_.reset(new snappy::iSnappyStream(in_));
break;
default:
PADDLE_THROW("Not implemented");
}
return true;
}
std::istream& stream = compressed_stream ? *compressed_stream : sin;
bool ChunkParser::HasNext() const { return pos_ < header_.NumRecords(); }
for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
std::string ChunkParser::Next() {
if (!HasNext()) {
return "";
}
++pos_;
std::istream& stream = compressed_stream_ ? *compressed_stream_ : in_;
uint32_t rec_len;
stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
std::string buf;
buf.resize(rec_len);
stream.read(&buf[0], rec_len);
PADDLE_ENFORCE_EQ(rec_len, stream.gcount());
Add(buf);
}
return true;
return buf;
}
} // namespace recordio
} // namespace paddle
......@@ -13,6 +13,7 @@
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
......@@ -53,9 +54,20 @@ class Chunk {
DISABLE_COPY_AND_ASSIGN(Chunk);
};
size_t CompressData(const char* in, size_t in_length, Compressor ct, char* out);
class ChunkParser {
public:
explicit ChunkParser(std::istream& sin);
bool Init();
std::string Next();
bool HasNext() const;
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out);
private:
Header header_;
uint32_t pos_{0};
std::istream& in_;
std::unique_ptr<std::istream> compressed_stream_;
};
} // namespace recordio
} // namespace paddle
......@@ -22,35 +22,33 @@ namespace paddle {
namespace recordio {
Scanner::Scanner(std::unique_ptr<std::istream> &&stream)
: stream_(std::move(stream)) {
: stream_(std::move(stream)), parser_(*stream_) {
Reset();
}
Scanner::Scanner(const std::string &filename) {
stream_.reset(new std::ifstream(filename));
Scanner::Scanner(const std::string &filename)
: stream_(new std::ifstream(filename)), parser_(*stream_) {
Reset();
}
void Scanner::Reset() {
stream_->clear();
stream_->seekg(0, std::ios::beg);
ParseNextChunk();
parser_.Init();
}
std::string Scanner::Next() {
PADDLE_ENFORCE(!eof_, "StopIteration");
auto rec = cur_chunk_.Record(offset_++);
if (offset_ == cur_chunk_.NumRecords()) {
ParseNextChunk();
if (stream_->eof()) {
return "";
}
return rec;
}
void Scanner::ParseNextChunk() {
eof_ = !cur_chunk_.Parse(*stream_);
offset_ = 0;
auto res = parser_.Next();
if (!parser_.HasNext() && HasNext()) {
parser_.Init();
}
return res;
}
bool Scanner::HasNext() const { return !eof_; }
bool Scanner::HasNext() const { return !stream_->eof(); }
} // namespace recordio
} // namespace paddle
......@@ -37,11 +37,7 @@ class Scanner {
private:
std::unique_ptr<std::istream> stream_;
Chunk cur_chunk_;
size_t offset_;
bool eof_;
void ParseNextChunk();
ChunkParser parser_;
};
} // namespace recordio
} // namespace paddle
......@@ -145,19 +145,17 @@ function check_style() {
trap 'abort' 0
set -e
# install glide
curl https://glide.sh/get | bash
if [ -x "$(command -v gimme)" ]; then
eval "$(GIMME_GO_VERSION=1.8.3 gimme)"
fi
# set up go environment for running gometalinter
mkdir -p $GOPATH/src/github.com/PaddlePaddle/
ln -sf ${PADDLE_ROOT} $GOPATH/src/github.com/PaddlePaddle/Paddle
cd $GOPATH/src/github.com/PaddlePaddle/Paddle/go; glide install; cd -
go get github.com/alecthomas/gometalinter
gometalinter --install
mkdir -p ./build/go
cp go/glide.* build/go
cd build/go; glide install; cd -
cd ${PADDLE_ROOT}
export PATH=/usr/bin:$PATH
pre-commit install
clang-format --version
......
......@@ -71,6 +71,7 @@ __all__ = [
'cumsum',
'scatter',
'sum',
'polygon_box_transform',
'shape',
] + __activations__
......
......@@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import core
import contextlib
__all__ = ['convert_reader_to_recordio_file']
__all__ = [
'convert_reader_to_recordio_file', 'convert_reader_to_recordio_files'
]
@contextlib.contextmanager
......@@ -46,3 +48,36 @@ def convert_reader_to_recordio_file(
writer.complete_append_tensor()
counter += 1
return counter
def convert_reader_to_recordio_files(
filename,
batch_per_file,
reader_creator,
feeder,
compressor=core.RecordIOWriter.Compressor.Snappy,
max_num_records=1000,
feed_order=None):
if feed_order is None:
feed_order = feeder.feed_names
f_name, f_ext = os.path.splitext(filename)
assert (f_ext == ".recordio")
lines = []
f_idx = 0
counter = 0
for idx, batch in enumerate(reader_creator()):
lines.append(batch)
if idx >= batch_per_file and idx % batch_per_file == 0:
filename = "%s-%05d%s" % (f_name, f_idx, f_ext)
with create_recordio_writer(filename, compressor,
max_num_records) as writer:
for l in lines:
res = feeder.feed(l)
for each in feed_order:
writer.append_tensor(res[each])
writer.complete_append_tensor()
counter += 1
lines = []
f_idx += 1
return counter
......@@ -48,13 +48,15 @@ def linear():
return avg_loss
def optimizer_func():
return fluid.optimizer.SGD(learning_rate=0.001)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
trainer = fluid.Trainer(
train_func=train_program,
place=place,
optimizer=fluid.optimizer.SGD(learning_rate=0.001))
train_func=train_program, place=place, optimizer_func=optimizer_func)
def event_handler(event):
if isinstance(event, fluid.EndStepEvent):
......
......@@ -85,6 +85,10 @@ def train_network():
return [avg_cost, accuracy]
def optimizer_func():
return fluid.optimizer.Adam(learning_rate=0.001)
def train(use_cuda, train_program, params_dirname):
BATCH_SIZE = 128
EPOCH_NUM = 1
......@@ -111,9 +115,7 @@ def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
trainer = fluid.Trainer(
train_func=train_program,
optimizer=fluid.optimizer.Adam(learning_rate=0.001),
place=place)
train_func=train_program, optimizer_func=optimizer_func, place=place)
trainer.train(
reader=train_reader,
......
......@@ -64,6 +64,10 @@ def train_network():
return [avg_cost, accuracy]
def optimizer_func():
return fluid.optimizer.Adam(learning_rate=0.001)
def train(use_cuda, train_program, params_dirname):
BATCH_SIZE = 128
train_reader = paddle.batch(
......@@ -88,9 +92,7 @@ def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
trainer = fluid.Trainer(
train_func=train_program,
place=place,
optimizer=fluid.optimizer.Adam(learning_rate=0.001))
train_func=train_program, place=place, optimizer_func=optimizer_func)
trainer.train(
reader=train_reader,
......
......@@ -141,12 +141,16 @@ def train_program():
return [avg_cost]
def optimize_func():
return fluid.optimizer.SGD(learning_rate=fluid.layers.exponential_decay(
learning_rate=0.01, decay_steps=100000, decay_rate=0.5, staircase=True))
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
trainer = fluid.Trainer(
train_func=train_program, place=place, optimizer=optimizer)
train_func=train_program, place=place, optimizer_func=optimize_func)
feed_order = [
'word_data', 'ctx_n2_data', 'ctx_n1_data', 'ctx_0_data', 'ctx_p1_data',
......@@ -245,7 +249,7 @@ def infer(use_cuda, inference_program, params_dirname):
},
return_numpy=False)
print("infer results: ", np.array(results[0]))
print("infer results: ", np.array(results[0]).shape)
def main(use_cuda):
......
......@@ -158,6 +158,13 @@ def train_program(is_sparse):
return avg_cost
def optimizer_func():
return fluid.optimizer.Adagrad(
learning_rate=1e-4,
regularization=fluid.regularizer.L2DecayRegularizer(
regularization_coeff=0.1))
def train(use_cuda, is_sparse, is_local=True):
EPOCH_NUM = 1
......@@ -182,11 +189,8 @@ def train(use_cuda, is_sparse, is_local=True):
trainer = fluid.Trainer(
train_func=partial(train_program, is_sparse),
optimizer=fluid.optimizer.Adagrad(
learning_rate=1e-4,
regularization=fluid.regularizer.L2DecayRegularizer(
regularization_coeff=0.1)),
place=place)
place=place,
optimizer_func=optimizer_func)
trainer.train(
reader=train_reader,
......
......@@ -57,14 +57,17 @@ def train_program():
return [avg_cost, acc]
def optimizer_func():
return fluid.optimizer.Adam(learning_rate=0.001)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
trainer = fluid.Trainer(
train_func=train_program,
place=place,
optimizer=optimizer,
optimizer_func=optimizer_func,
parallel=True)
def event_handler(event):
......
......@@ -44,12 +44,15 @@ def train_program():
return [avg_cost, acc]
def optimizer_func():
return fluid.optimizer.Adam(learning_rate=0.001)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
trainer = fluid.Trainer(
train_func=train_program, place=place, optimizer=optimizer)
train_func=train_program, place=place, optimizer_func=optimizer_func)
def event_handler(event):
if isinstance(event, fluid.EndEpochEvent):
......
......@@ -155,12 +155,15 @@ def train_program():
return [avg_cost, scale_infer]
def optimizer_func():
return fluid.optimizer.SGD(learning_rate=0.2)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.SGD(learning_rate=0.2)
trainer = fluid.Trainer(
train_func=train_program, place=place, optimizer=optimizer)
train_func=train_program, place=place, optimizer_func=optimizer_func)
feed_order = [
'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', 'category_id',
......
......@@ -64,15 +64,18 @@ def train_program(word_dict):
return [avg_cost, accuracy]
def optimizer_func():
return fluid.optimizer.Adagrad(learning_rate=0.002)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adagrad(learning_rate=0.002)
word_dict = paddle.dataset.imdb.word_dict()
trainer = fluid.Trainer(
train_func=partial(train_program, word_dict),
place=place,
optimizer=optimizer)
optimizer_func=optimizer_func)
def event_handler(event):
if isinstance(event, fluid.EndEpochEvent):
......
......@@ -79,15 +79,18 @@ def train_program(word_dict):
return [avg_cost, accuracy]
def optimizer_func():
return fluid.optimizer.Adagrad(learning_rate=0.002)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adagrad(learning_rate=0.002)
word_dict = paddle.dataset.imdb.word_dict()
trainer = fluid.Trainer(
train_func=partial(train_program, word_dict),
place=place,
optimizer=optimizer)
optimizer_func=optimizer_func)
def event_handler(event):
if isinstance(event, fluid.EndEpochEvent):
......
......@@ -71,15 +71,18 @@ def train_program(word_dict):
return [avg_cost, accuracy]
def optimizer_func():
return fluid.optimizer.Adagrad(learning_rate=0.002)
def train(use_cuda, train_program, params_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
optimizer = fluid.optimizer.Adagrad(learning_rate=0.002)
word_dict = paddle.dataset.imdb.word_dict()
trainer = fluid.Trainer(
train_func=partial(train_program, word_dict),
place=place,
optimizer=optimizer)
optimizer_func=optimizer_func)
def event_handler(event):
if isinstance(event, fluid.EndEpochEvent):
......
......@@ -80,6 +80,10 @@ def train_program(is_sparse):
return avg_cost
def optimizer_func():
return fluid.optimizer.SGD(learning_rate=0.001)
def train(use_cuda, train_program, params_dirname):
train_reader = paddle.batch(
paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
......@@ -104,9 +108,7 @@ def train(use_cuda, train_program, params_dirname):
sys.exit("got NaN loss, training failed.")
trainer = fluid.Trainer(
train_func=train_program,
optimizer=fluid.optimizer.SGD(learning_rate=0.001),
place=place)
train_func=train_program, optimizer_func=optimizer_func, place=place)
trainer.train(
reader=train_reader,
......
......@@ -80,21 +80,6 @@ def encoder_decoder():
return rnn()
def to_lodtensor(data, place):
seq_lens = [len(seq) for seq in data]
cur_len = 0
lod = [cur_len]
for l in seq_lens:
cur_len += l
lod.append(cur_len)
flattened_data = np.concatenate(data, axis=0).astype("int64")
flattened_data = flattened_data.reshape([len(flattened_data), 1])
res = core.LoDTensor()
res.set(flattened_data, place)
res.set_lod([lod])
return res
def main():
rnn_out = encoder_decoder()
label = layers.data(
......@@ -122,18 +107,21 @@ def main():
exe.run(framework.default_startup_program())
feed_order = [
'src_word_id', 'target_language_word', 'target_language_next_word'
]
feed_list = [
fluid.default_main_program().global_block().var(var_name)
for var_name in feed_order
]
feeder = fluid.DataFeeder(feed_list, place)
batch_id = 0
for pass_id in xrange(10):
for data in train_data():
word_data = to_lodtensor(map(lambda x: x[0], data), place)
trg_word = to_lodtensor(map(lambda x: x[1], data), place)
trg_word_next = to_lodtensor(map(lambda x: x[2], data), place)
outs = exe.run(fluid.default_main_program(),
feed={
'src_word_id': word_data,
'target_language_word': trg_word,
'target_language_next_word': trg_word_next
},
feed=feeder.feed(data),
fetch_list=[avg_cost])
avg_cost_val = np.array(outs[0])
print('pass_id=' + str(pass_id) + ' batch=' + str(batch_id) +
......
......@@ -48,5 +48,7 @@ foreach(TEST_OP ${TEST_OPS})
endforeach(TEST_OP)
py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=${WARPCTC_LIB_DIR} SERIAL)
py_test_modules(test_dist_train MODULES test_dist_train SERIAL)
# tests that need to be done in fixed timeout
set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20)
# FIXME(Yancey1989): this test would cost much more time on CUDAPlace
# since load cudnn libraries, so we use a longer timeout to make this
# unit test stability.
set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 30)
......@@ -379,6 +379,14 @@ class TestBook(unittest.TestCase):
self.assertIsNotNone(output)
print(str(program))
def test_polygon_box_transform(self):
program = Program()
with program_guard(program):
x = layers.data(name='x', shape=[8, 4, 4], dtype="float32")
output = layers.polygon_box_transform(input=x)
self.assertIsNotNone(output)
print(str(program))
if __name__ == '__main__':
unittest.main()
......@@ -23,7 +23,7 @@ from multiprocessing import Process
from op_test import OpTest
def run_pserver(use_cuda, sync_mode, ip, port, trainer_count, trainer_id):
def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
......@@ -39,15 +39,8 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainer_count, trainer_id):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
port = os.getenv("PADDLE_INIT_PORT", port)
pserver_ips = os.getenv("PADDLE_INIT_PSERVERS", ip) # ip,ip...
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist) # ip:port,ip:port...
trainers = int(os.getenv("TRAINERS", trainer_count))
current_endpoint = os.getenv("POD_IP", ip) + ":" + port
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID", trainer_id))
pserver_endpoints = ip + ":" + port
current_endpoint = ip + ":" + port
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id,
......@@ -62,47 +55,51 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainer_count, trainer_id):
class TestListenAndServOp(OpTest):
def setUp(self):
self.sleep_time = 5
self.ps_timeout = 5
self.ip = "127.0.0.1"
self.port = "6173"
self.trainer_count = 1
self.trainers = 1
self.trainer_id = 1
def _raise_signal(self, parent_pid, raised_signal):
time.sleep(self.sleep_time)
ps_command = subprocess.Popen(
"ps -o pid --ppid %d --noheaders" % parent_pid,
shell=True,
stdout=subprocess.PIPE)
ps_output = ps_command.stdout.read()
retcode = ps_command.wait()
assert retcode == 0, "ps command returned %d" % retcode
for pid_str in ps_output.split("\n")[:-1]:
try:
os.kill(int(pid_str), raised_signal)
except Exception:
continue
def _start_pserver(self, use_cuda, sync_mode):
p = Process(
target=run_pserver,
args=(use_cuda, sync_mode, self.ip, self.port, self.trainer_count,
args=(use_cuda, sync_mode, self.ip, self.port, self.trainers,
self.trainer_id))
p.start()
return p.pid
def _wait_ps_ready(self, pid):
retry_times = self.ps_timeout
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(0.5)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
retry_times -= 1
def test_rpc_interfaces(self):
# TODO(Yancey1989): need to make sure the rpc interface correctly.
pass
def test_handle_signal_in_serv_op(self):
# run pserver on CPU in sync mode
self._start_pserver(False, True)
pid = self._start_pserver(False, True)
self._wait_ps_ready(pid)
# raise SIGINT to pserver
self._raise_signal(os.getpid(), signal.SIGINT)
# raise SIGTERM to pserver
os.kill(pid, signal.SIGTERM)
# run pserver on CPU in async mode
self._start_pserver(False, False)
pid = self._start_pserver(False, False)
self._wait_ps_ready(pid)
# raise SIGTERM to pserver
self._raise_signal(os.getpid(), signal.SIGTERM)
os.kill(pid, signal.SIGTERM)
if __name__ == '__main__':
......
......@@ -90,13 +90,13 @@ class Trainer(object):
Args:
train_func(callable): A function which will return loss. The loss must be a scalar.
optimizer(optimizer.Optimizer): The optimizer should be an instance of Optimizer
optimizer_func(callable): A function that returns an Optimizer object.
place: The device place of this trainer.
"""
def __init__(self,
train_func,
optimizer,
optimizer_func,
param_path=None,
place=None,
parallel=False):
......@@ -105,8 +105,6 @@ class Trainer(object):
# 1. we need to generate a framework.Program by calling
# program_func. Reference: fluid.program_guard in
# test_word2vec.py
if not isinstance(optimizer, opt_module.Optimizer):
raise TypeError("The optimizer should be an instance of Optimizer")
self.scope = core.Scope()
......@@ -118,11 +116,14 @@ class Trainer(object):
self.train_func_outputs = program_func_outs if isinstance(
program_func_outs, list) else [program_func_outs]
self.test_program = self.train_program.clone()
# The fisrt element of program_func_outs is loss.
loss = self.train_func_outputs[0]
optimizer = optimizer_func()
if not isinstance(optimizer, opt_module.Optimizer):
raise TypeError(
"The optimizer should be an instance of Optimizer")
# The fisrt element of program_func_outs is loss.
loss = self.train_func_outputs[0]
optimize_ops, params_grads = optimizer.minimize(loss)
self.place = check_and_get_place(place)
......
......@@ -187,12 +187,17 @@ class DistributeTranspiler:
param_list = []
grad_list = []
param_grad_set = set()
for p, g in self.params_grads:
# skip parameter marked not trainable
if type(p) == Parameter and p.trainable == False:
continue
if p.name not in param_grad_set:
param_list.append(p)
param_grad_set.add(p.name)
if g.name not in param_grad_set:
grad_list.append(g)
param_grad_set.add(g.name)
self._update_dist_lookup_table_vars(param_list, grad_list,
self.params_grads)
......@@ -829,6 +834,9 @@ class DistributeTranspiler:
if not block_map.has_key(varname):
block_map[varname] = []
block_map[varname].append((long(offset), long(size)))
# Do not remove this important debug message:
print("block map: %s" % block_map)
for varname, splited in block_map.iteritems():
orig_var = program.global_block().var(varname)
if len(splited) == 1:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册