diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3c36cffcb4eeaaf7f8cff5167777628dd2697e7d..b1b02bcc2f4fd14297715bcf5bfd1617e3d5f0c9 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -58,6 +58,8 @@ PaddlePaddle uses this [Git branching model](http://nvie.com/posts/a-successful-
create mode 100644 233
```
+ NOTE: The `yapf` installed by `pip install pre-commit` differs slightly from the one installed by `conda install -c conda-forge pre-commit`. Paddle developers use `pip install pre-commit`.
+
1. Build and test
Users can build PaddlePaddle natively on Linux and Mac OS X. But to unify the building environment and to make it easy for debugging, the recommended way is [using Docker](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/dev/build_en.md).
diff --git a/Dockerfile b/Dockerfile
index 80a96983ec1ca6b9ec440f7e95de6c328eb1ed40..4d6165b79a1d94b8f27d7f3ee1b6e2cee5992d31 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -29,7 +29,7 @@ RUN apt-get update && \
wget unzip unrar tar xz-utils bzip2 gzip coreutils ntp \
curl sed grep graphviz libjpeg-dev zlib1g-dev \
python-matplotlib gcc-4.8 g++-4.8 \
- automake locales clang-format swig doxygen cmake \
+ automake locales clang-format swig cmake \
liblapack-dev liblapacke-dev \
clang-3.8 llvm-3.8 libclang-3.8-dev \
net-tools libtool ccache && \
diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py
index 30b070e4acac60caa97a4e8ffd07462cb347ee93..c1d458970a58bfac2a3369e8964eb100568b28f2 100644
--- a/benchmark/fluid/fluid_benchmark.py
+++ b/benchmark/fluid/fluid_benchmark.py
@@ -98,6 +98,8 @@ def parse_args():
'--use_fake_data',
action='store_true',
help='If set ommit the actual read data operators.')
+ parser.add_argument(
+ '--profile', action='store_true', help='If set, profile a few steps.')
parser.add_argument(
'--update_method',
type=str,
@@ -108,8 +110,8 @@ def parse_args():
return args
-def append_nccl2_prepare():
- if os.getenv("PADDLE_TRAINER_ID", None) != None:
+def append_nccl2_prepare(trainer_id):
+ if trainer_id >= 0:
# append gen_nccl_id at the end of startup program
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
port = os.getenv("PADDLE_PSERVER_PORT")
@@ -136,12 +138,12 @@ def append_nccl2_prepare():
})
return nccl_id_var, num_trainers, trainer_id
else:
- raise Exception(
- "must set PADDLE_TRAINER_ID env variables for dist train.")
+ raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
+ "nccl-based dist train.")
-def dist_transpile():
- if "PADDLE_TRAINING_ROLE" not in os.environ:
+def dist_transpile(trainer_id):
+ if trainer_id < 0:
return None, None
# the port of all pservers, needed by both trainer and pserver
@@ -158,9 +160,6 @@ def dist_transpile():
trainers = int(os.getenv("PADDLE_TRAINERS"))
# the IP of the local machine, needed by pserver only
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
- # the unique trainer id, starting from 0, needed by trainer
- # only
- trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
# the role, should be either PSERVER or TRAINER
training_role = os.getenv("PADDLE_TRAINING_ROLE")
@@ -295,6 +294,11 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
iters = 0
start_time = time.time()
for batch_id, data in enumerate(train_reader()):
+ if args.profile and pass_id == 0 and batch_id == 5:
+ profiler.start_profiler("All")
+ elif args.profile and pass_id == 0 and batch_id == 10:
+ profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id)
+
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
@@ -334,7 +338,11 @@ def print_arguments(args):
def main():
args = parse_args()
print_arguments(args)
- nccl_id_var, num_trainers, trainer_id = None, 1, 0
+
+ # the unique trainer id, starting from 0, needed by trainer
+ # only
+ nccl_id_var, num_trainers, trainer_id = (
+ None, 1, int(os.getenv("PADDLE_TRAINER_ID", "-1")))
if args.use_cprof:
pr = cProfile.Profile()
@@ -348,7 +356,7 @@ def main():
fluid.memory_optimize(fluid.default_main_program())
if args.update_method == "pserver":
- train_prog, startup_prog = dist_transpile()
+ train_prog, startup_prog = dist_transpile(trainer_id)
if not train_prog:
raise Exception(
"Must configure correct environments to run dist train.")
@@ -364,7 +372,7 @@ def main():
train_args.append(fluid.default_startup_program())
if args.update_method == "nccl2":
- nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare()
+ nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id)
if args.gpus == 1:
 # NOTE: parallel executor uses profiler internally
if args.use_nvprof and args.device == 'GPU':
diff --git a/benchmark/fluid/kube_gen_job.py b/benchmark/fluid/kube_gen_job.py
index 39ba207fd96f71563504017e77dc0e87c249b3f8..9da8a69af1d7b671b2648b1b3702776c1c0650b0 100644
--- a/benchmark/fluid/kube_gen_job.py
+++ b/benchmark/fluid/kube_gen_job.py
@@ -49,7 +49,7 @@ def parse_args():
parser.add_argument(
'--fluid', default=1, type=int, help='whether is fluid job')
parser.add_argument(
- '--rdma', action='store_ture', help='whether mount rdma libs')
+ '--rdma', action='store_true', help='whether mount rdma libs')
parser.add_argument(
'--disttype',
default="pserver",
diff --git a/doc/fluid/getstarted/Developer's_Guide_to_Paddle_Fluid.md b/doc/fluid/getstarted/Developer's_Guide_to_Paddle_Fluid.md
index 0c0156c8e46378e7bbeea8072938b8ccfb9ab6d7..79df6c59578e2acf495a3453ab61f069c3f09a49 100644
--- a/doc/fluid/getstarted/Developer's_Guide_to_Paddle_Fluid.md
+++ b/doc/fluid/getstarted/Developer's_Guide_to_Paddle_Fluid.md
@@ -86,7 +86,7 @@
-
+
---
@@ -123,12 +123,12 @@
- 在科学计算领域,计算图是一种描述计算的经典方式。下图展示了从前向计算图(蓝色)开始,通过添加反向(红色)和优化算法相关(绿色)操作,构建出整个计算图的过程:
--
+-
-
+
- Fluid ==使用`Program`而不是计算图==来描述模型和优化过程。`Program`由`Block`、`Operator`和`Variable`构成,相关概念会在后文详细展开。
- 编译时 Fluid 接受前向计算(这里可以先简单的理解为是一段有序的计算流)`Program`,为这段前向计算按照:前向 -> 反向 -> 梯度 clip -> 正则 -> 优化 的顺序,添加相关 `Operator`和`Variable`到`Program`到完整的计算。
@@ -328,7 +328,7 @@
----
+---
### 编译时概念 :==**[Transpiler](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/fluid/design/motivation/fluid_compiler.md)**==
@@ -402,7 +402,7 @@
- `Scope`
- 计算相关
- - `Block`
+ - `Block`
- `Kernel`、`OpWithKernel`、`OpWithoutKernel`
-- 执行相关 :`Executor`
+- 执行相关 :`Executor`
@@ -798,7 +798,7 @@ class GPUAllocator : public SystemAllocator {
- step 1:添加Place类型,由用户实现添加到框架
- 可以将Place类型理解为一个整数加上一个枚举型,包括:设备号 + 设备类型
-
+
@@ -824,7 +824,7 @@ class GPUAllocator : public SystemAllocator {
1. DataType 执行数据类型 FP32/FP64/INT32/INT64
1. Memory layout: 运行时 Tensor 在内存中的排布格式 NCHW、 NHWC
1. 使用的库
-
+
来区分Kernel,为同一个operator注册多个 Kernel。
```cpp
@@ -876,7 +876,7 @@ step 3: 运行时的 KernelType 推断和Kernel切换,
---
@@ -1107,7 +1107,7 @@ void Run(const framework::Scope &scope,
-
+
|
@@ -1127,13 +1127,13 @@ void Run(const framework::Scope &scope,
- 设计概览
- - 重构概览 [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/refactorization.md)
- - fluid [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/fluid.md)
+ - 重构概览 [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/refactorization.md)
+ - fluid [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/fluid.md)
- fluid_compiler [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/fluid/design/motivation/fluid_compiler.md)
- 核心概念
- variable 描述 [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/var_desc.md)
- Tensor [->](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/tensor.md)
- - LoDTensor [->](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor.md)
+ - LoDTensor [->](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor.md)
- TensorArray [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/tensor_array.md)
- Program [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/program.md)
- Block [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/block.md)
@@ -1152,7 +1152,7 @@ void Run(const framework::Scope &scope,
- 支持新设硬件设备库 [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/support_new_device.md)
- 添加新的Operator [->](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/dev/new_op_cn.md)
- 添加新的Kernel [->](
-https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/dev/new_op_kernel_en.md)
+https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/dev/new_op_kernel_en.md)
@@ -1167,10 +1167,10 @@ https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/dev/new_op_kernel_
Docker编译PaddlePaddle源码: [->](http://www.paddlepaddle.org/docs/develop/documentation/fluid/zh/build_and_install/docker_install_cn.html)
-
+
PaddlePaddle 在 Dockerhub 地址:[->](
https://hub.docker.com/r/paddlepaddle/paddle/tags/)
-
+
1. 获取PaddlePaddle的Docker镜像
```bash
docker pull paddlepaddle/paddle:latest-dev
@@ -1183,7 +1183,7 @@ PaddlePaddle 在 Dockerhub 地址:[->](
```
1. 进入docker container后,从源码编译,请参考文档 [->]( http://www.paddlepaddle.org/docs/develop/documentation/fluid/zh/build_and_install/build_from_source_cn.html)
-
+
---
@@ -1196,7 +1196,7 @@ PaddlePaddle 在 Dockerhub 地址:[->](
1. 开发推荐使用tag为`latest-dev`的镜像,其中打包了所有编译依赖。`latest`及`lastest-gpu`是production镜像,主要用于运行PaddlePaddle程序。
2. 在Docker中运行GPU程序,推荐使用nvidia-docker,[否则需要将CUDA库和设备挂载到Docker容器内](http://www.paddlepaddle.org/docs/develop/documentation/fluid/zh/build_and_install/docker_install_cn.html)。
-
+
```bash
nvidia-docker run -it -v $PWD/Paddle:/paddle paddlepaddle/paddle:latest-dev /bin/bash
```
@@ -1353,9 +1353,9 @@ Op注册实现在`.cc`文件;Kernel注册CPU实现在`.cc`文件中,CUDA实
}
};
```
-
+
-
+
---
###### 实现带Kernel的Operator step2: 定义Operator类
@@ -1420,11 +1420,11 @@ class ClipOp : public framework::OperatorWithKernel {
2. override InferShape函数(参考 [clip_op](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/operators/clip_op.cc#L24))
1. 什么是`functor` ?
-
+
- 类或结构体仅重载了`()`,一般是可被多个kernel复用的计算函数。
-
+
```cpp
template
class CrossEntropyFunctor {
@@ -1438,9 +1438,9 @@ class ClipOp : public framework::OperatorWithKernel {
};
```
-
+
- 在 clip_op 内也会看到将一段计算函数抽象为functor的使用法: [->](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/operators/clip_op.h#L27)。
-
+
---
@@ -1504,7 +1504,7 @@ class ClipKernel : public framework::OpKernel {
- 需要注意,Fluid中,不区分Cost Op和中间层Op,所有Op都必须正确处理接收到的梯度
2. 反向Op的输出
- 对可学习参数的求导结果
- - 对所有输入的求导结果
+ - 对所有输入的求导结果
@@ -1520,7 +1520,7 @@ class ClipKernel : public framework::OpKernel {
1. 在`.cc`文件中注册前向、反向Op类,注册CPU Kernel。
-
+
```cpp
namespace ops = paddle::operators;
REGISTER_OP(clip, ops::ClipOp, ops::ClipOpMaker, clip_grad,
@@ -1530,13 +1530,13 @@ class ClipKernel : public framework::OpKernel {
REGISTER_OP_CPU_KERNEL(
clip_grad, ops::ClipGradKernel);
```
-
+
- 在上面的代码片段中:
1. `REGISTER_OP` : 注册`ops::ClipOp`类,类型名为`clip`,该类的`ProtoMaker`为`ops::ClipOpMaker`,注册`ops::ClipOpGrad`,类型名为`clip_grad`
1. `REGISTER_OP_WITHOUT_GRADIENT` : 用于注册没有反向的Op,例如:优化算法相关的Op
1. `REGISTER_OP_CPU_KERNEL` :注册`ops::ClipKernel`类,并特化模板参数为`paddle::platform::CPUPlace`和`float`类型,同理,注册`ops::ClipGradKernel`类
-
+
1. 按照同样方法,在`.cu`文件中注册GPU Kernel
- 如果CUDA Kernel的实现基于Eigen,需在 `.cu`的开始加上宏定义 `#define EIGEN_USE_GPU`
@@ -1593,7 +1593,7 @@ class ClipKernel : public framework::OpKernel {
```bash
make test ARGS="-R test_mul_op -V"
```
-
+
或者:
```
@@ -1613,7 +1613,7 @@ class ClipKernel : public framework::OpKernel {
- 如果多个Op依赖一些共用的函数,可以创建非`*_op.*`格式的文件来存放,如`gather.h`文件。
-
+
---
### ==10.== 使用相关问题
@@ -1735,7 +1735,7 @@ class ClipKernel : public framework::OpKernel {
y_data = np.random.randint(0, 8, [1]).astype("int32")
y_tensor = core.Tensor()
y_tensor.set(y_data, place)
-
+
x_data = np.random.uniform(0.1, 1, [11, 8]).astype("float32")
x_tensor = core.Tensor()
x_tensor.set(x_data, place)
diff --git a/doc/fluid/getstarted/index_cn.rst b/doc/fluid/getstarted/index_cn.rst
index 75af7354be93a6eeabfa9ccf86903505402a7ca6..3daea71d0933a2774227ff2b5e744392ca6b1765 100644
--- a/doc/fluid/getstarted/index_cn.rst
+++ b/doc/fluid/getstarted/index_cn.rst
@@ -17,3 +17,4 @@
:maxdepth: 1
concepts/use_concepts_cn.rst
+ developer's_guide_to_paddle_fluid.md
diff --git a/doc/fluid/getstarted/index_en.rst b/doc/fluid/getstarted/index_en.rst
index 75a43f4af87c34830ec940068196e6ca72640501..fb20bb4f245281c3acf67c417979dc63c144fef3 100644
--- a/doc/fluid/getstarted/index_en.rst
+++ b/doc/fluid/getstarted/index_en.rst
@@ -16,3 +16,4 @@ Here is an example of linear regression. It introduces workflow of PaddlePaddle,
:maxdepth: 1
concepts/index_en.rst
+ developer's_guide_to_paddle_fluid.md
diff --git a/doc/fluid/getstarted/quickstart_cn.rst b/doc/fluid/getstarted/quickstart_cn.rst
index 135beb75d0330f39d062753aa2aa83a077f36bb1..6a964d4f8561f30aa10936d2399698c51583442c 100644
--- a/doc/fluid/getstarted/quickstart_cn.rst
+++ b/doc/fluid/getstarted/quickstart_cn.rst
@@ -11,7 +11,7 @@ PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.
pip install paddlepaddle
-如果需要安装支持GPU的版本(cuda7.5_cudnn5_avx_openblas),需要执行:
+如果需要安装支持GPU的版本(cuda8.0_cudnn5_avx_openblas),需要执行:
.. code-block:: bash
@@ -28,18 +28,18 @@ PaddlePaddle支持使用pip快速安装,目前支持CentOS 6以上, Ubuntu 14.
import paddle.dataset.uci_housing as uci_housing
import paddle.fluid as fluid
-
+
with fluid.scope_guard(fluid.core.Scope()):
# initialize executor with cpu
exe = fluid.Executor(place=fluid.CPUPlace())
- # load inference model
+ # load inference model
[inference_program, feed_target_names,fetch_targets] = \
fluid.io.load_inference_model(uci_housing.fluid_model(), exe)
# run inference
- result = exe.run(inference_program,
- feed={feed_target_names[0]: uci_housing.predict_reader()},
+ result = exe.run(inference_program,
+ feed={feed_target_names[0]: uci_housing.predict_reader()},
fetch_list=fetch_targets)
- # print predicted price is $12,273.97
+ # print predicted price is $12,273.97
print 'Predicted price: ${:,.2f}'.format(result[0][0][0] * 1000)
执行 :code:`python housing.py` 瞧! 它应该打印出预测住房数据的清单。
diff --git a/doc/fluid/getstarted/quickstart_en.rst b/doc/fluid/getstarted/quickstart_en.rst
index df6619cfd039fc1fdca8cde57db9cc6aebf8f029..680122f25893a5a48fac103266bda4788f891f6d 100644
--- a/doc/fluid/getstarted/quickstart_en.rst
+++ b/doc/fluid/getstarted/quickstart_en.rst
@@ -12,7 +12,7 @@ Simply run the following command to install, the version is cpu_avx_openblas:
pip install paddlepaddle
-If you need to install GPU version (cuda7.5_cudnn5_avx_openblas), run:
+If you need to install GPU version (cuda8.0_cudnn5_avx_openblas), run:
.. code-block:: bash
@@ -31,18 +31,18 @@ code:
import paddle.dataset.uci_housing as uci_housing
import paddle.fluid as fluid
-
+
with fluid.scope_guard(fluid.core.Scope()):
# initialize executor with cpu
exe = fluid.Executor(place=fluid.CPUPlace())
- # load inference model
+ # load inference model
[inference_program, feed_target_names,fetch_targets] = \
fluid.io.load_inference_model(uci_housing.fluid_model(), exe)
# run inference
- result = exe.run(inference_program,
- feed={feed_target_names[0]: uci_housing.predict_reader()},
+ result = exe.run(inference_program,
+ feed={feed_target_names[0]: uci_housing.predict_reader()},
fetch_list=fetch_targets)
- # print predicted price is $12,273.97
+ # print predicted price is $12,273.97
print 'Predicted price: ${:,.2f}'.format(result[0][0][0] * 1000)
Run :code:`python housing.py` and voila! It should print out a list of predictions
diff --git a/doc/v2/dev/contribute_to_paddle_cn.md b/doc/v2/dev/contribute_to_paddle_cn.md
index d8bf093e09b53b302225739fa67146adc7976e4b..add06e42f1bbd221b48eb83e4e84d4a7c89e7483 100644
--- a/doc/v2/dev/contribute_to_paddle_cn.md
+++ b/doc/v2/dev/contribute_to_paddle_cn.md
@@ -51,6 +51,8 @@ Paddle 开发人员使用 [pre-commit](http://pre-commit.com/) 工具来管理 G
Paddle 使用 `clang-format` 来调整 C/C++ 源代码格式,请确保 `clang-format` 版本在 3.8 以上。
+注:通过`pip install pre-commit`和`conda install -c conda-forge pre-commit`安装的`yapf`稍有不同,Paddle 开发人员使用的是`pip install pre-commit`。
+
## 开始开发
在本例中,我删除了 README.md 中的一行,并创建了一个新文件。
diff --git a/paddle/contrib/inference/CMakeLists.txt b/paddle/contrib/inference/CMakeLists.txt
index a4fe10f708e5bb8b28e34b2d91b2254c346c467f..3beb93c4e7fd1ce4dd2131cb53cb6e89e0f10ebd 100644
--- a/paddle/contrib/inference/CMakeLists.txt
+++ b/paddle/contrib/inference/CMakeLists.txt
@@ -13,7 +13,11 @@
# limitations under the License.
#
-function(inference_api_test TARGET_NAME TEST_SRC DEP_TEST)
+if(APPLE)
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=pessimizing-move")
+endif(APPLE)
+
+function(inference_api_test TARGET_NAME TEST_SRC)
set(options "")
set(oneValueArgs "")
set(multiValueArgs ARGS)
@@ -32,8 +36,10 @@ function(inference_api_test TARGET_NAME TEST_SRC DEP_TEST)
string(REGEX REPLACE "^_$" "" arg "${arg}")
cc_test(${TARGET_NAME}
SRCS ${TEST_SRC}
- DEPS paddle_fluid_api paddle_inference_api paddle_inference_api_impl
+ DEPS paddle_fluid_api paddle_inference_api
ARGS --dirname=${PYTHON_TESTS_DIR}/book/)
+ # TODO(panyx0178): Figure out how to add word2vec and image_classification
+ # as deps.
# set_tests_properties(${TARGET_NAME}
# PROPERTIES DEPENDS ${DEP_TEST})
endforeach()
@@ -41,17 +47,12 @@ endfunction(inference_api_test)
cc_library(paddle_inference_api
- SRCS paddle_inference_api.cc
+ SRCS paddle_inference_api.cc paddle_inference_api_impl.cc
DEPS ${FLUID_CORE_MODULES} ${GLOB_OP_LIB})
-cc_library(paddle_inference_api_impl
- SRCS paddle_inference_api_impl.cc
- DEPS paddle_inference_api paddle_fluid_api)
-
cc_test(test_paddle_inference_api
SRCS test_paddle_inference_api.cc
DEPS paddle_inference_api)
inference_api_test(test_paddle_inference_api_impl
- test_paddle_inference_api_impl.cc
- test_word2vec)
+ test_paddle_inference_api_impl.cc)
diff --git a/paddle/contrib/inference/paddle_inference_api.h b/paddle/contrib/inference/paddle_inference_api.h
index f804d9b28697a6703d63d9a640c4ec337effaba6..b4c7f9bef4d2e83038ff223614a89e1b0493fc6f 100644
--- a/paddle/contrib/inference/paddle_inference_api.h
+++ b/paddle/contrib/inference/paddle_inference_api.h
@@ -45,10 +45,10 @@ struct PaddleTensor {
};
/*
-* A simple Inference API for Paddle. Currently this API might just be used by
-* non-sequence scenerios.
-* TODO(Superjomn) Prepare another API for NLP-related usages.
-*/
+ * A simple Inference API for Paddle. Currently this API can be used by
+ * non-sequence scenarios.
+ * TODO(Superjomn) Support another API for NLP-related usages.
+ */
class PaddlePredictor {
public:
struct Config;
@@ -66,34 +66,38 @@ class PaddlePredictor {
// be thread-safe.
 virtual std::unique_ptr<PaddlePredictor> Clone() = 0;
- virtual bool InitShared() { return false; }
// Destroy the Predictor.
virtual ~PaddlePredictor() {}
- friend std::unique_ptr<PaddlePredictor> CreatePaddlePredictor(
- const PaddlePredictor::Config& config);
+ enum class EngineKind {
+ kNative = -1, // Use the native Fluid facility.
+ // TODO(Superjomn) support latter.
+ // kAnakin, // Use Anakin for inference.
+ // kTensorRT, // Use TensorRT for inference.
+ // kAutoMixedAnakin, // Automatically mix Fluid with Anakin.
+ // kAutoMixedTensorRT, // Automatically mix Fluid with TensorRT.
+ };
// The common configs for all the predictors.
struct Config {
- enum class EngineKind;
-
std::string model_dir; // path to the model directory.
bool enable_engine{false}; // Enable to execute (part of) the model on
- // third-party engines.
- EngineKind engine_kind{Config::EngineKind::kNone};
-
- enum class EngineKind {
- kNone = -1, // Use the native Fluid facility.
- kAnakin, // Use Anakin for inference.
- kTensorRT, // Use TensorRT for inference.
- kAutoMixedAnakin, // Automatically mix Fluid with Anakin.
- kAutoMixedTensorRT, // Automatically mix Fluid with TensorRT.
- };
};
};
+struct NativeConfig : public PaddlePredictor::Config {
+ bool use_gpu{false};
+ int device;
+ float fraction_of_gpu_memory;
+ std::string prog_file;
+ std::string param_file;
+ bool share_variables;
+};
+
// A factory to help create difference predictor.
-template <typename ConfigT>
+template <
+ typename ConfigT,
+ PaddlePredictor::EngineKind engine = PaddlePredictor::EngineKind::kNative>
 std::unique_ptr<PaddlePredictor> CreatePaddlePredictor(const ConfigT& config);
} // namespace paddle
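
Note (illustrative, not part of the patch): a minimal usage sketch of the reworked API above, based only on the declarations in this header and the updated unit test; the model path is a hypothetical placeholder.

```cpp
#include <vector>
#include "paddle/contrib/inference/paddle_inference_api.h"

void RunNativePredictorSketch() {
  paddle::NativeConfig config;
  config.model_dir = "path/to/word2vec.inference.model";  // hypothetical path
  config.use_gpu = false;

  // EngineKind::kNative is the template default, so it can be omitted here.
  auto predictor = paddle::CreatePaddlePredictor<paddle::NativeConfig>(config);

  std::vector<paddle::PaddleTensor> inputs;  // filled by the caller
  std::vector<paddle::PaddleTensor> outputs;
  if (!predictor->Run(inputs, &outputs)) {
    // handle inference failure
  }
}
```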
diff --git a/paddle/contrib/inference/paddle_inference_api_impl.cc b/paddle/contrib/inference/paddle_inference_api_impl.cc
index e7a0b341dda1ca8d2ccfc0d6c12a7ac3d4c691d5..989252f69e42778dfd791cdee02c550f2aa78803 100644
--- a/paddle/contrib/inference/paddle_inference_api_impl.cc
+++ b/paddle/contrib/inference/paddle_inference_api_impl.cc
@@ -54,7 +54,7 @@ std::string num2str(T a) {
}
} // namespace
-bool PaddlePredictorImpl::Init() {
+bool NativePaddlePredictor::Init() {
VLOG(3) << "Predictor::init()";
// TODO(panyx0718): Should CPU vs GPU device be decided by id?
@@ -96,14 +96,14 @@ bool PaddlePredictorImpl::Init() {
return true;
}
-bool PaddlePredictorImpl::Run(const std::vector<PaddleTensor> &inputs,
- std::vector<PaddleTensor> *output_data) {
+bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
+ std::vector<PaddleTensor> *output_data) {
VLOG(3) << "Predictor::predict";
Timer timer;
timer.tic();
// set feed variable
- std::map<std::string, const paddle::framework::LoDTensor *> feed_targets;
- std::vector<paddle::framework::LoDTensor> feeds;
+ std::map<std::string, const framework::LoDTensor *> feed_targets;
+ std::vector<framework::LoDTensor> feeds;
if (!SetFeed(inputs, &feeds)) {
LOG(ERROR) << "fail to set feed";
return false;
@@ -112,8 +112,8 @@ bool PaddlePredictorImpl::Run(const std::vector &inputs,
feed_targets[feed_target_names_[i]] = &feeds[i];
}
// get fetch variable
- std::map<std::string, paddle::framework::LoDTensor *> fetch_targets;
- std::vector<paddle::framework::LoDTensor> fetchs;
+ std::map<std::string, framework::LoDTensor *> fetch_targets;
+ std::vector<framework::LoDTensor> fetchs;
fetchs.resize(fetch_target_names_.size());
for (size_t i = 0; i < fetch_target_names_.size(); ++i) {
fetch_targets[fetch_target_names_[i]] = &fetchs[i];
@@ -133,76 +133,33 @@ bool PaddlePredictorImpl::Run(const std::vector &inputs,
return true;
}
-std::unique_ptr<PaddlePredictor> PaddlePredictorImpl::Clone() {
+std::unique_ptr<PaddlePredictor> NativePaddlePredictor::Clone() {
 VLOG(3) << "Predictor::clone";
- std::unique_ptr<PaddlePredictor> cls(new PaddlePredictorImpl(config_));
- if (!cls->InitShared()) {
- LOG(ERROR) << "fail to call InitShared";
+ std::unique_ptr<PaddlePredictor> cls(new NativePaddlePredictor(config_));
+
+ if (!dynamic_cast<NativePaddlePredictor *>(cls.get())->Init()) {
+ LOG(ERROR) << "fail to call Init";
return nullptr;
}
// fix manylinux compile error.
return std::move(cls);
}
-// TODO(panyx0718): Consider merge with Init()?
-bool PaddlePredictorImpl::InitShared() {
- VLOG(3) << "Predictor::init_shared";
- // 1. Define place, executor, scope
- if (this->config_.device >= 0) {
- place_ = paddle::platform::CUDAPlace();
- } else {
- place_ = paddle::platform::CPUPlace();
- }
- this->executor_.reset(new paddle::framework::Executor(this->place_));
- this->scope_.reset(new paddle::framework::Scope());
- // Initialize the inference program
- if (!this->config_.model_dir.empty()) {
- // Parameters are saved in separate files sited in
- // the specified `dirname`.
- this->inference_program_ = paddle::inference::Load(
- this->executor_.get(), this->scope_.get(), this->config_.model_dir);
- } else if (!this->config_.prog_file.empty() &&
- !this->config_.param_file.empty()) {
- // All parameters are saved in a single file.
- // The file names should be consistent with that used
- // in Python API `fluid.io.save_inference_model`.
- this->inference_program_ =
- paddle::inference::Load(this->executor_.get(),
- this->scope_.get(),
- this->config_.prog_file,
- this->config_.param_file);
- }
- this->ctx_ = this->executor_->Prepare(*this->inference_program_, 0);
- // 3. create variables
- // TODO(panyx0718): why test share_variables.
- if (config_.share_variables) {
- this->executor_->CreateVariables(
- *this->inference_program_, this->scope_.get(), 0);
- }
- // 4. Get the feed_target_names and fetch_target_names
- this->feed_target_names_ = this->inference_program_->GetFeedTargetNames();
- this->fetch_target_names_ = this->inference_program_->GetFetchTargetNames();
- return true;
-}
-
-bool PaddlePredictorImpl::SetFeed(
- const std::vector<PaddleTensor> &inputs,
- std::vector<paddle::framework::LoDTensor> *feeds) {
+bool NativePaddlePredictor::SetFeed(const std::vector<PaddleTensor> &inputs,
+ std::vector<framework::LoDTensor> *feeds) {
VLOG(3) << "Predictor::set_feed";
if (inputs.size() != feed_target_names_.size()) {
LOG(ERROR) << "wrong feed input size.";
return false;
}
for (size_t i = 0; i < feed_target_names_.size(); ++i) {
- paddle::framework::LoDTensor input;
- paddle::framework::DDim ddim =
- paddle::framework::make_ddim(inputs[i].shape);
+ framework::LoDTensor input;
+ framework::DDim ddim = framework::make_ddim(inputs[i].shape);
void *input_ptr;
if (inputs[i].dtype == PaddleDType::INT64) {
- input_ptr =
- input.mutable_data<int64_t>(ddim, paddle::platform::CPUPlace());
+ input_ptr = input.mutable_data<int64_t>(ddim, platform::CPUPlace());
 } else if (inputs[i].dtype == PaddleDType::FLOAT32) {
- input_ptr = input.mutable_data<float>(ddim, paddle::platform::CPUPlace());
+ input_ptr = input.mutable_data<float>(ddim, platform::CPUPlace());
} else {
LOG(ERROR) << "unsupported feed type " << inputs[i].dtype;
return false;
@@ -213,13 +170,12 @@ bool PaddlePredictorImpl::SetFeed(
inputs[i].data.data,
inputs[i].data.length);
feeds->push_back(input);
- LOG(ERROR) << "Actual feed type " << feeds->back().type().name();
}
return true;
}
-bool PaddlePredictorImpl::GetFetch(
- const std::vector<paddle::framework::LoDTensor> &fetchs,
+bool NativePaddlePredictor::GetFetch(
+ const std::vector<framework::LoDTensor> &fetchs,
 std::vector<PaddleTensor> *outputs) {
VLOG(3) << "Predictor::get_fetch";
outputs->resize(fetchs.size());
@@ -284,27 +240,30 @@ bool PaddlePredictorImpl::GetFetch(
return true;
}
-std::unique_ptr<PaddlePredictorImpl> CreatePaddlePredictorImpl(
- const VisConfig &config) {
- VLOG(3) << "create PaddlePredictorImpl";
- // 1. GPU memeroy
- std::vector<std::string> flags;
- if (config.fraction_of_gpu_memory >= 0.0f ||
- config.fraction_of_gpu_memory <= 0.95f) {
- flags.push_back("dummpy");
- std::string flag = "--fraction_of_gpu_memory_to_use=" +
- num2str(config.fraction_of_gpu_memory);
- flags.push_back(flag);
- VLOG(3) << "set flag: " << flag;
- framework::InitGflags(flags);
+template <>
+std::unique_ptr<PaddlePredictor>
+CreatePaddlePredictor<NativeConfig, PaddlePredictor::EngineKind::kNative>(
+ const NativeConfig &config) {
+ VLOG(3) << "create NativePaddlePredictor";
+ if (config.use_gpu) {
+ // 1. GPU memory
+ std::vector<std::string> flags;
+ if (config.fraction_of_gpu_memory >= 0.0f ||
+ config.fraction_of_gpu_memory <= 0.95f) {
+ flags.push_back("dummpy");
+ std::string flag = "--fraction_of_gpu_memory_to_use=" +
+ num2str(config.fraction_of_gpu_memory);
+ flags.push_back(flag);
+ VLOG(3) << "set flag: " << flag;
+ framework::InitGflags(flags);
+ }
}
- std::unique_ptr<PaddlePredictorImpl> predictor(
- new PaddlePredictorImpl(config));
- if (!predictor->Init()) {
+ std::unique_ptr<PaddlePredictor> predictor(new NativePaddlePredictor(config));
+ if (!dynamic_cast<NativePaddlePredictor *>(predictor.get())->Init()) {
return nullptr;
}
- return predictor;
+ return std::move(predictor);
}
} // namespace paddle
diff --git a/paddle/contrib/inference/paddle_inference_api_impl.h b/paddle/contrib/inference/paddle_inference_api_impl.h
index a0c7ff030735fc1c6b9d717f8f9e4addc7e0c6b0..84707e223d7aa3d1ebca933923e932b3973613ae 100644
--- a/paddle/contrib/inference/paddle_inference_api_impl.h
+++ b/paddle/contrib/inference/paddle_inference_api_impl.h
@@ -29,20 +29,10 @@
namespace paddle {
-struct VisConfig : public PaddlePredictor::Config {
- int device;
- float fraction_of_gpu_memory;
- std::string prog_file;
- std::string param_file;
- bool share_variables;
-};
-
-/*
- * Do not use this, just a demo indicating how to customize a Predictor.
- */
-class PaddlePredictorImpl : public PaddlePredictor {
+class NativePaddlePredictor : public PaddlePredictor {
public:
- explicit PaddlePredictorImpl(const VisConfig &config) : config_(config) {}
+ explicit NativePaddlePredictor(const NativeConfig &config)
+ : config_(config) {}
bool Init();
@@ -51,26 +41,22 @@ class PaddlePredictorImpl : public PaddlePredictor {
 std::unique_ptr<PaddlePredictor> Clone() override;
- ~PaddlePredictorImpl() override{};
+ ~NativePaddlePredictor() override{};
private:
- bool InitShared() override;
 bool SetFeed(const std::vector<PaddleTensor> &input_datas,
- std::vector<paddle::framework::LoDTensor> *feeds);
- bool GetFetch(const std::vector<paddle::framework::LoDTensor> &fetchs,
+ std::vector<framework::LoDTensor> *feeds);
+ bool GetFetch(const std::vector<framework::LoDTensor> &fetchs,
 std::vector<PaddleTensor> *output_data);
- VisConfig config_;
- paddle::platform::Place place_;
- std::unique_ptr<paddle::framework::Executor> executor_;
- std::unique_ptr<paddle::framework::Scope> scope_;
- std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx_;
- std::unique_ptr<paddle::framework::ProgramDesc> inference_program_;
+ NativeConfig config_;
+ platform::Place place_;
+ std::unique_ptr<framework::Executor> executor_;
+ std::unique_ptr<framework::Scope> scope_;
+ std::unique_ptr<framework::ExecutorPrepareContext> ctx_;
+ std::unique_ptr<framework::ProgramDesc> inference_program_;
 std::vector<std::string> feed_target_names_;
 std::vector<std::string> fetch_target_names_;
};
-std::unique_ptr<PaddlePredictorImpl> CreatePaddlePredictorImpl(
- const VisConfig &config);
-
} // namespace paddle
diff --git a/paddle/contrib/inference/test_paddle_inference_api_impl.cc b/paddle/contrib/inference/test_paddle_inference_api_impl.cc
index 2a58f6989d5dad23b2f267adafde2cc105bf5651..5240fc2f20211ac5d38c57b71db31d04a6dc536a 100644
--- a/paddle/contrib/inference/test_paddle_inference_api_impl.cc
+++ b/paddle/contrib/inference/test_paddle_inference_api_impl.cc
@@ -40,16 +40,20 @@ PaddleTensor LodTensorToPaddleTensor(framework::LoDTensor* t) {
return pt;
}
-TEST(paddle_inference_api_impl, word2vec) {
- VisConfig config;
+NativeConfig GetConfig() {
+ NativeConfig config;
config.model_dir = FLAGS_dirname + "word2vec.inference.model";
LOG(INFO) << "dirname " << config.model_dir;
config.fraction_of_gpu_memory = 0.15;
+ config.use_gpu = true;
config.device = 0;
config.share_variables = true;
+ return config;
+}
- std::unique_ptr<PaddlePredictorImpl> predictor =
- CreatePaddlePredictorImpl(config);
+TEST(paddle_inference_api_impl, word2vec) {
+ NativeConfig config = GetConfig();
+ auto predictor = CreatePaddlePredictor(config);
framework::LoDTensor first_word, second_word, third_word, fourth_word;
framework::LoD lod{{0, 1}};
@@ -60,24 +64,90 @@ TEST(paddle_inference_api_impl, word2vec) {
 SetupLoDTensor(&third_word, lod, static_cast<int64_t>(0), dict_size - 1);
 SetupLoDTensor(&fourth_word, lod, static_cast<int64_t>(0), dict_size - 1);
- std::vector<PaddleTensor> cpu_feeds;
- cpu_feeds.push_back(LodTensorToPaddleTensor(&first_word));
- cpu_feeds.push_back(LodTensorToPaddleTensor(&second_word));
- cpu_feeds.push_back(LodTensorToPaddleTensor(&third_word));
- cpu_feeds.push_back(LodTensorToPaddleTensor(&fourth_word));
+ std::vector<PaddleTensor> paddle_tensor_feeds;
+ paddle_tensor_feeds.push_back(LodTensorToPaddleTensor(&first_word));
+ paddle_tensor_feeds.push_back(LodTensorToPaddleTensor(&second_word));
+ paddle_tensor_feeds.push_back(LodTensorToPaddleTensor(&third_word));
+ paddle_tensor_feeds.push_back(LodTensorToPaddleTensor(&fourth_word));
+
+ std::vector<PaddleTensor> outputs;
+ ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
+ ASSERT_EQ(outputs.size(), 1UL);
+ size_t len = outputs[0].data.length;
+ float* data = static_cast<float*>(outputs[0].data.data);
+ for (int j = 0; j < len / sizeof(float); ++j) {
+ ASSERT_LT(data[j], 1.0);
+ ASSERT_GT(data[j], -1.0);
+ }
+
+ std::vector<framework::LoDTensor*> cpu_feeds;
+ cpu_feeds.push_back(&first_word);
+ cpu_feeds.push_back(&second_word);
+ cpu_feeds.push_back(&third_word);
+ cpu_feeds.push_back(&fourth_word);
+
+ framework::LoDTensor output1;
+ std::vector<framework::LoDTensor*> cpu_fetchs1;
+ cpu_fetchs1.push_back(&output1);
+
+ TestInference(config.model_dir, cpu_feeds, cpu_fetchs1);
+
+ float* lod_data = output1.data<float>();
+ for (size_t i = 0; i < output1.numel(); ++i) {
+ EXPECT_LT(lod_data[i] - data[i], 1e-3);
+ EXPECT_GT(lod_data[i] - data[i], -1e-3);
+ }
+
+ free(outputs[0].data.data);
+}
+
+TEST(paddle_inference_api_impl, image_classification) {
+ int batch_size = 2;
+ bool use_mkldnn = false;
+ bool repeat = false;
+ NativeConfig config = GetConfig();
+ config.model_dir =
+ FLAGS_dirname + "image_classification_resnet.inference.model";
+
+ const bool is_combined = false;
+ std::vector<std::vector<int64_t>> feed_target_shapes =
+ GetFeedTargetShapes(config.model_dir, is_combined);
+
+ framework::LoDTensor input;
+ // Use normilized image pixels as input data,
+ // which should be in the range [0.0, 1.0].
+ feed_target_shapes[0][0] = batch_size;
+ framework::DDim input_dims = framework::make_ddim(feed_target_shapes[0]);
+ SetupTensor<float>(
+ &input, input_dims, static_cast<float>(0), static_cast<float>(1));
+ std::vector<framework::LoDTensor*> cpu_feeds;
+ cpu_feeds.push_back(&input);
+
+ framework::LoDTensor output1;
+ std::vector<framework::LoDTensor*> cpu_fetchs1;
+ cpu_fetchs1.push_back(&output1);
+
+ TestInference(config.model_dir,
+ cpu_feeds,
+ cpu_fetchs1,
+ repeat,
+ is_combined,
+ use_mkldnn);
+
+ auto predictor = CreatePaddlePredictor(config);
+ std::vector<PaddleTensor> paddle_tensor_feeds;
+ paddle_tensor_feeds.push_back(LodTensorToPaddleTensor(&input));
 std::vector<PaddleTensor> outputs;
- ASSERT_TRUE(predictor->Run(cpu_feeds, &outputs));
+ ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
ASSERT_EQ(outputs.size(), 1UL);
- for (size_t i = 0; i < outputs.size(); ++i) {
- size_t len = outputs[i].data.length;
- float* data = static_cast<float*>(outputs[i].data.data);
- for (size_t j = 0; j < len / sizeof(float); ++j) {
- ASSERT_LT(data[j], 1.0);
- ASSERT_GT(data[j], -1.0);
- }
- free(outputs[i].data.data);
+ size_t len = outputs[0].data.length;
+ float* data = static_cast<float*>(outputs[0].data.data);
+ float* lod_data = output1.data<float>();
+ for (size_t j = 0; j < len / sizeof(float); ++j) {
+ EXPECT_NEAR(lod_data[j], data[j], 1e-3);
}
+ free(data);
}
} // namespace paddle
diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc
index e3d2e5377eac49003b0082c39c9dd0460e2acd92..f87d5521492418d2daf5b7fba1500c4bb31e10f5 100644
--- a/paddle/fluid/framework/operator.cc
+++ b/paddle/fluid/framework/operator.cc
@@ -469,6 +469,7 @@ class RuntimeInferShapeContext : public InferShapeContext {
protected:
DDim GetDim(const std::string& name) const override {
Variable* var = scope_.FindVar(name);
+ PADDLE_ENFORCE_NOT_NULL(var);
 if (var->IsType<LoDTensor>()) {
 return var->Get<LoDTensor>().dims();
 } else if (var->IsType<SelectedRows>()) {
diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc
index 76126f3dc64d71770d13f9d66bb30f176c112629..0b36f1116d15004b355e854e101abb9ad3297836 100644
--- a/paddle/fluid/framework/reader.cc
+++ b/paddle/fluid/framework/reader.cc
@@ -25,8 +25,10 @@ void FileReader::ReadNext(std::vector *out) {
if (out->empty()) {
return;
}
+
+ PADDLE_ENFORCE_EQ(out->size(), dims_.size());
for (size_t i = 0; i < dims_.size(); ++i) {
- auto &actual = out->at(i).dims();
+ auto &actual = (*out)[i].dims();
auto &expect = dims_[i];
PADDLE_ENFORCE_EQ(actual.size(), expect.size());
diff --git a/paddle/fluid/framework/selected_rows.cc b/paddle/fluid/framework/selected_rows.cc
index b4168f38949c7fcb057ec8c5c562d0529a6d9e48..06ed87e7e8a2d5324b48a466b05207042ec1b7fa 100644
--- a/paddle/fluid/framework/selected_rows.cc
+++ b/paddle/fluid/framework/selected_rows.cc
@@ -18,8 +18,8 @@ namespace paddle {
namespace framework {
struct ReAllocateVisitor {
- ReAllocateVisitor(framework::Tensor* tensor, const framework::DDim& dims)
- : tensor_(tensor), dims_(dims) {}
+ ReAllocateVisitor(const framework::DDim& dims, framework::Tensor* tensor)
+ : dims_(dims), tensor_(tensor) {}
 template <typename T>
void operator()() const {
@@ -34,8 +34,8 @@ struct ReAllocateVisitor {
tensor_->ShareDataWith(cpu_tensor);
}
- framework::Tensor* tensor_;
framework::DDim dims_;
+ framework::Tensor* tensor_;
};
struct TensorCopyVisitor {
@@ -158,6 +158,7 @@ bool SelectedRows::Set(int64_t key, const framework::Tensor& value) {
}
 PADDLE_ENFORCE_EQ(value.dims()[0], static_cast<int64_t>(1),
"The first dim of value should be 1.");
+ std::lock_guard<std::mutex> lock(*auto_grown_mutex_.get());
auto index = Index(key);
bool is_new_key = false;
if (index == -1) {
@@ -169,7 +170,7 @@ bool SelectedRows::Set(int64_t key, const framework::Tensor& value) {
auto dims = value_->dims();
dims[0] = (dims[0] + 1) << 1;
framework::VisitDataType(framework::ToDataType(value.type()),
- ReAllocateVisitor(value_.get(), dims));
+ ReAllocateVisitor(dims, value_.get()));
}
}
diff --git a/paddle/fluid/framework/selected_rows.h b/paddle/fluid/framework/selected_rows.h
index c80b05eed9b1c50325316057a8afc26d5d52e82c..7160670ddd204c20021ea87cdd67ee4721d03451 100644
--- a/paddle/fluid/framework/selected_rows.h
+++ b/paddle/fluid/framework/selected_rows.h
@@ -15,6 +15,8 @@ limitations under the License. */
#pragma once
#include
+#include
+#include <mutex>  // NOLINT
#include
#include
@@ -46,11 +48,13 @@ class SelectedRows {
 SelectedRows(const std::vector<int64_t>& rows, const int64_t& height)
: rows_(rows), height_(height) {
value_.reset(new Tensor());
+ auto_grown_mutex_.reset(new std::mutex);
}
SelectedRows() {
height_ = 0;
value_.reset(new Tensor());
+ auto_grown_mutex_.reset(new std::mutex);
}
platform::Place place() const { return value_->place(); }
@@ -125,6 +129,7 @@ class SelectedRows {
 Vector<int64_t> rows_;
 std::unique_ptr<Tensor> value_{nullptr};
 int64_t height_;
+ std::unique_ptr<std::mutex> auto_grown_mutex_{nullptr};
};
/*
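
Side note (illustrative, not from the patch): the `auto_grown_mutex_` added above serializes the grow-and-write path in `Set`. A generic sketch of that guard pattern, with purely hypothetical names unrelated to Paddle, looks like this.

```cpp
#include <mutex>
#include <vector>

// Illustrative only: a container whose "grow then write" path is guarded the
// same way SelectedRows::Set now guards its auto-grown value tensor.
class AutoGrowBuffer {
 public:
  void Set(size_t index, float value) {
    std::lock_guard<std::mutex> lock(mu_);  // one thread grows/writes at a time
    if (index >= data_.size()) {
      data_.resize((index + 1) * 2);  // grow geometrically, as the hunk doubles dims[0]
    }
    data_[index] = value;
  }

 private:
  std::mutex mu_;
  std::vector<float> data_;
};
```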
diff --git a/paddle/fluid/framework/tensor_impl.h b/paddle/fluid/framework/tensor_impl.h
index 0a1db7758bd9ec0dac133efcbf495de1d690021d..2f19ec0f0a9338e2b96d1f64eac45387bae4d1eb 100644
--- a/paddle/fluid/framework/tensor_impl.h
+++ b/paddle/fluid/framework/tensor_impl.h
@@ -39,7 +39,7 @@ template
inline const T* Tensor::data() const {
check_memory_size();
 PADDLE_ENFORCE(std::is_same<T, void>::value ||
- holder_->type().hash_code() == typeid(T).hash_code(),
+ holder_->type() == std::type_index(typeid(T)),
"Tensor holds the wrong type, it holds %s",
this->holder_->type().name());
@@ -53,7 +53,7 @@ template
inline T* Tensor::data() {
check_memory_size();
 PADDLE_ENFORCE(std::is_same<T, void>::value ||
- holder_->type().hash_code() == typeid(T).hash_code(),
+ holder_->type() == std::type_index(typeid(T)),
"Tensor holds the wrong type, it holds %s",
this->holder_->type().name());
 return reinterpret_cast<T*>(reinterpret_cast<uintptr_t>(holder_->ptr()) +
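
Aside (illustrative, not part of the patch): the hunk above replaces a `hash_code()` comparison with direct `std::type_index` equality. A standalone sketch of that check, unrelated to Paddle's `Tensor`, is:

```cpp
#include <typeindex>
#include <typeinfo>

// Returns true when the stored type matches T. Comparing std::type_index
// values expresses type equality directly; comparing hash_code() values only
// compares hashes, which are not guaranteed to be collision-free.
template <typename T>
bool HoldsType(const std::type_index& held) {
  return held == std::type_index(typeid(T));
}
```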
diff --git a/paddle/fluid/inference/CMakeLists.txt b/paddle/fluid/inference/CMakeLists.txt
index cc4a725dfb3b3e7723a3a3a4008b20acdb53899d..ec16a1c600a3bafc1c4cbbd920360253c106e3a1 100644
--- a/paddle/fluid/inference/CMakeLists.txt
+++ b/paddle/fluid/inference/CMakeLists.txt
@@ -5,14 +5,19 @@ cc_library(paddle_fluid_api
SRCS io.cc
DEPS ${FLUID_CORE_MODULES} ${GLOB_OP_LIB})
-# Create static library
get_property(fluid_modules GLOBAL PROPERTY FLUID_MODULES)
-cc_library(paddle_fluid DEPS ${fluid_modules})
+if(WITH_CONTRIB)
+ set(fluid_modules "${fluid_modules}" paddle_inference_api)
+endif()
+
+# Create static library
+cc_library(paddle_fluid DEPS ${fluid_modules} paddle_fluid_api)
# Create shared library
cc_library(paddle_fluid_shared SHARED
SRCS io.cc
- DEPS ${fluid_modules})
+ DEPS ${fluid_modules} paddle_fluid_api)
+
set_target_properties(paddle_fluid_shared PROPERTIES OUTPUT_NAME paddle_fluid)
if(NOT APPLE)
# TODO(liuyiqun): Temporarily disable the link flag because it is not support on Mac.
diff --git a/paddle/fluid/inference/analysis/data_flow_graph.h b/paddle/fluid/inference/analysis/data_flow_graph.h
index 9f6ce40ede25248a4f779b379c132806a4ec06ba..913e344d371ddf3ea05a53c216e5b3bea8f11c7b 100644
--- a/paddle/fluid/inference/analysis/data_flow_graph.h
+++ b/paddle/fluid/inference/analysis/data_flow_graph.h
@@ -21,7 +21,10 @@ limitations under the License. */
#include
#include
+#include
#include
+#include
+#include
#include "paddle/fluid/inference/analysis/graph_traits.h"
#include "paddle/fluid/inference/analysis/node.h"
diff --git a/paddle/fluid/inference/analysis/data_flow_graph_to_fluid_pass_tester.cc b/paddle/fluid/inference/analysis/data_flow_graph_to_fluid_pass_tester.cc
index 60f159da9140516284449a0274906df004b23ac5..dcee75cee50ede1d2b660e88e06544440bd5ef77 100644
--- a/paddle/fluid/inference/analysis/data_flow_graph_to_fluid_pass_tester.cc
+++ b/paddle/fluid/inference/analysis/data_flow_graph_to_fluid_pass_tester.cc
@@ -44,6 +44,6 @@ TEST_F(DFG_Tester, Test) {
LOG(INFO) << graph.nodes.size();
}
-} // analysis
-} // inference
-} // paddle
+}; // namespace analysis
+}; // namespace inference
+}; // namespace paddle
diff --git a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.cc b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.cc
index f848a7d1add79c3032da7defc34a406dccf29d2e..9f67c989cca4a936cd320b73efaae277263fb3e2 100644
--- a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.cc
+++ b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.cc
@@ -12,9 +12,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
-#include "paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h"
+#include
#include
+#include "paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h"
+
namespace paddle {
namespace inference {
namespace analysis {
diff --git a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h
index cd0d4fabaafe844bcc5bb8bfc2586971197d9167..33517e57becdffc0416f204247eac5feadb7ed82 100644
--- a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h
+++ b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h
@@ -19,6 +19,8 @@
#pragma once
+#include
+
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/inference/analysis/data_flow_graph.h"
#include "paddle/fluid/inference/analysis/pass.h"
diff --git a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc
index 851c98bef305fa9e20dced5f7c26e9d1b6ddf4f2..817d32c92cdbdc234eef9ed5156891c2b11ced4c 100644
--- a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc
+++ b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc
@@ -32,6 +32,6 @@ TEST_F(DFG_Tester, Init) {
LOG(INFO) << '\n' << graph.DotString();
}
-} // analysis
-} // inference
-} // paddle
+} // namespace analysis
+} // namespace inference
+} // namespace paddle
diff --git a/paddle/fluid/inference/analysis/helper.h b/paddle/fluid/inference/analysis/helper.h
index 24ea9a4bae7132eb1692b0ffb02f8ab5e02b21a9..153dca576bd6734d62f00c4a7cb9b503506b33e2 100644
--- a/paddle/fluid/inference/analysis/helper.h
+++ b/paddle/fluid/inference/analysis/helper.h
@@ -50,7 +50,7 @@ struct DataTypeNamer {
return dic_.at(x);
}
- const std::string &repr(size_t &hash) const {
+ const std::string &repr(size_t &hash) const { // NOLINT
PADDLE_ENFORCE(dic_.count(hash), "unknown type for representation");
return dic_.at(hash);
}
@@ -62,7 +62,9 @@ struct DataTypeNamer {
SET_TYPE(float);
}
- std::unordered_map dic_;
+ std::unordered_map
+ dic_;
};
#undef SET_TYPE
diff --git a/paddle/fluid/inference/analysis/pass.h b/paddle/fluid/inference/analysis/pass.h
index 5c89b1304d84abc9a4942f12da46b4bfe76f44f5..aa0e8667b5e4a9e6156c25fcad03bb8eee3287f6 100644
--- a/paddle/fluid/inference/analysis/pass.h
+++ b/paddle/fluid/inference/analysis/pass.h
@@ -16,6 +16,7 @@ limitations under the License. */
#include
#include
+#include
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/inference/analysis/data_flow_graph.h"
diff --git a/paddle/fluid/inference/analysis/subgraph_splitter.h b/paddle/fluid/inference/analysis/subgraph_splitter.h
index ed90a0dcf31e154c4d82be08ce35e2f11d11c139..a31afbe6933da8d3c7a88142cc12d63b98b55796 100644
--- a/paddle/fluid/inference/analysis/subgraph_splitter.h
+++ b/paddle/fluid/inference/analysis/subgraph_splitter.h
@@ -18,6 +18,8 @@ limitations under the License. */
#pragma once
+#include
+
#include "paddle/fluid/inference/analysis/data_flow_graph.h"
#include "paddle/fluid/inference/analysis/node.h"
diff --git a/paddle/fluid/inference/analysis/ut_helper.h b/paddle/fluid/inference/analysis/ut_helper.h
index c86083d12153921672e15c172b874f77a8b46cde..722fa99a48a5f2b0e778904de0c35977d0ee3cc0 100644
--- a/paddle/fluid/inference/analysis/ut_helper.h
+++ b/paddle/fluid/inference/analysis/ut_helper.h
@@ -15,6 +15,7 @@ limitations under the License. */
#pragma once
#include
#include
+#include
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/inference/analysis/data_flow_graph.h"
#include "paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass.h"
diff --git a/paddle/fluid/inference/tensorrt/convert/ut_helper.h b/paddle/fluid/inference/tensorrt/convert/ut_helper.h
index 37fcb5c50309db0ad0924a057a6b481750665531..e46c577cdae145c0d4ceb6bfa307f03d313514ce 100644
--- a/paddle/fluid/inference/tensorrt/convert/ut_helper.h
+++ b/paddle/fluid/inference/tensorrt/convert/ut_helper.h
@@ -19,6 +19,9 @@ limitations under the License. */
#pragma once
+#include
+#include
+
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/inference/analysis/helper.h"
@@ -58,7 +61,7 @@ class TRTConvertValidation {
public:
TRTConvertValidation() = delete;
- TRTConvertValidation(int batch_size, int workspace_size = 1 << 10) {
+ explicit TRTConvertValidation(int batch_size, int workspace_size = 1024) {
// create engine.
engine_.reset(new TensorRTEngine(10, 1 << 10, &stream_));
engine_->InitNetwork();
diff --git a/paddle/fluid/inference/tensorrt/engine.cc b/paddle/fluid/inference/tensorrt/engine.cc
index fb27c8394c1f94953093ed90627e63e6241130ed..a88236ae98e1816fc43796ead596c432b798d7de 100644
--- a/paddle/fluid/inference/tensorrt/engine.cc
+++ b/paddle/fluid/inference/tensorrt/engine.cc
@@ -131,6 +131,20 @@ void* TensorRTEngine::GetOutputInGPU(const std::string& name) {
return buffer(name).buffer;
}
+void TensorRTEngine::GetOutputInGPU(const std::string& name, void* dst,
+ size_t max_size) {
+ // determine data size
+ auto it = buffer_sizes_.find(name);
+ PADDLE_ENFORCE(it != buffer_sizes_.end());
+ PADDLE_ENFORCE_GT(it->second, 0);
+ PADDLE_ENFORCE_GE(max_size, it->second);
+ auto& buf = buffer(name);
+ PADDLE_ENFORCE_NOT_NULL(buf.buffer, "buffer should be allocated before");
+ PADDLE_ENFORCE_EQ(cudaMemcpyAsync(dst, buf.buffer, it->second,
+ cudaMemcpyDeviceToDevice, *stream_),
+ 0);
+}
+
void TensorRTEngine::GetOutputInCPU(const std::string& name, void* dst,
size_t max_size) {
// determine data size
@@ -152,7 +166,7 @@ Buffer& TensorRTEngine::buffer(const std::string& name) {
return buffers_[slot_offset];
}
-void TensorRTEngine::SetInputFromCPU(const std::string& name, void* data,
+void TensorRTEngine::SetInputFromCPU(const std::string& name, const void* data,
size_t size) {
auto& buf = buffer(name);
PADDLE_ENFORCE_NOT_NULL(buf.buffer);
@@ -162,6 +176,16 @@ void TensorRTEngine::SetInputFromCPU(const std::string& name, void* data,
cudaMemcpyHostToDevice, *stream_));
}
+void TensorRTEngine::SetInputFromGPU(const std::string& name, const void* data,
+ size_t size) {
+ auto& buf = buffer(name);
+ PADDLE_ENFORCE_NOT_NULL(buf.buffer);
+ PADDLE_ENFORCE_LE(size, buf.max_size, "buffer is too small");
+ PADDLE_ENFORCE(buf.device == DeviceType::GPU);
+ PADDLE_ENFORCE_EQ(0, cudaMemcpyAsync(buf.buffer, data, size,
+ cudaMemcpyDeviceToDevice, *stream_));
+}
+
void TensorRTEngine::SetITensor(const std::string& name,
nvinfer1::ITensor* tensor) {
PADDLE_ENFORCE(tensor != nullptr);
diff --git a/paddle/fluid/inference/tensorrt/engine.h b/paddle/fluid/inference/tensorrt/engine.h
index b8298c6059e8644327194a1fcf7a7438cc9a7286..d9d3163b66d4c4c302d12edcc42f00e1cdfa5a30 100644
--- a/paddle/fluid/inference/tensorrt/engine.h
+++ b/paddle/fluid/inference/tensorrt/engine.h
@@ -92,13 +92,15 @@ class TensorRTEngine : public EngineBase {
cudaStream_t* stream() { return stream_; }
// Fill an input from CPU memory with name and size.
- void SetInputFromCPU(const std::string& name, void* data, size_t size);
+ void SetInputFromCPU(const std::string& name, const void* data, size_t size);
// TODO(Superjomn) is this method necessary given that buffer(xxx) can be
// accessed directly. Fill an input from GPU memory with name and size.
- void SetInputFromGPU(const std::string& name, void* data, size_t size);
+ void SetInputFromGPU(const std::string& name, const void* data, size_t size);
// Get an output called name, the output of tensorrt is in GPU, so this method
- // will just return the output's GPU memory address.
+ // Return the output's GPU memory address without copy.
void* GetOutputInGPU(const std::string& name);
+ // Copy data into dst inside the GPU device.
+ void GetOutputInGPU(const std::string& name, void* dst, size_t max_size);
// LOW EFFICENCY! Get output to CPU, this will trigger a memory copy from GPU
// to CPU.
void GetOutputInCPU(const std::string& name, void* dst, size_t max_size);
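
Note (hedged sketch, not from the patch): how the two `GetOutputInGPU` variants and `GetOutputInCPU` above are expected to be used once an engine has been built and executed. The output name "prob" and the surrounding buffers are assumptions, and namespace qualification is omitted.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Assumes this sits in the same namespace as TensorRTEngine, the engine has
// already run, and d_out is a device buffer of at least out_size bytes.
void FetchEngineOutput(TensorRTEngine* engine, void* d_out, size_t out_size,
                       std::vector<float>* h_out) {
  // Zero-copy: get the engine-owned GPU address of the output buffer.
  void* gpu_ptr = engine->GetOutputInGPU("prob");
  (void)gpu_ptr;

  // New overload: asynchronous device-to-device copy into a caller buffer.
  engine->GetOutputInGPU("prob", d_out, out_size);

  // Low-efficiency path: copy the result back to host memory.
  engine->GetOutputInCPU("prob", h_out->data(), h_out->size() * sizeof(float));
}
```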
diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt
index e00cc73565fc98615090367606b6ba4f58feacfd..de6ff29c6f8edbcf930546ff157a1c226e1311db 100644
--- a/paddle/fluid/operators/CMakeLists.txt
+++ b/paddle/fluid/operators/CMakeLists.txt
@@ -168,6 +168,8 @@ function(op_library TARGET)
file(APPEND ${pybind_file} "USE_OP(relu);\n")
elseif(${TARGET} STREQUAL "reduce")
file(APPEND ${pybind_file} "USE_OP(reduce_sum);\n")
+ elseif(${TARGET} STREQUAL "fake_dequantize")
+ file(APPEND ${pybind_file} "USE_OP(fake_dequantize_max_abs);\n")
else()
file(APPEND ${pybind_file} "USE_OP(${TARGET});\n")
endif()
@@ -223,6 +225,11 @@ op_library(cross_entropy_op DEPS cross_entropy)
op_library(softmax_with_cross_entropy_op DEPS cross_entropy softmax)
op_library(softmax_op DEPS softmax)
op_library(sequence_softmax_op DEPS softmax)
+if (WITH_GPU AND TENSORRT_FOUND)
+ op_library(tensorrt_engine_op DEPS tensorrt_engine)
+else()
+ set(DEPS_OPS ${DEPS_OPS} tensorrt_engine_op)
+endif()
op_library(sum_op DEPS selected_rows_functor)
op_library(sgd_op DEPS selected_rows_functor)
op_library(print_op DEPS lod_tensor)
diff --git a/paddle/fluid/operators/cast_op.cc b/paddle/fluid/operators/cast_op.cc
index 84660d042c7b12283fabc316d29609f5eddb825d..8d6a498dc941e44688ec8a2b49a6e080608f9b85 100644
--- a/paddle/fluid/operators/cast_op.cc
+++ b/paddle/fluid/operators/cast_op.cc
@@ -89,4 +89,5 @@ REGISTER_OP_CPU_KERNEL(cast, ops::CastOpKernel,
ops::CastOpKernel,
ops::CastOpKernel,
ops::CastOpKernel,
+ ops::CastOpKernel,
ops::CastOpKernel);
diff --git a/paddle/fluid/operators/cast_op.cu b/paddle/fluid/operators/cast_op.cu
index c486c5850e25fcf4370f02cb145c244743a4cc4b..657d162878c108760585ca9bd58e2fd34bf1fef3 100644
--- a/paddle/fluid/operators/cast_op.cu
+++ b/paddle/fluid/operators/cast_op.cu
@@ -21,5 +21,5 @@ using CastOpKernel =
REGISTER_OP_CUDA_KERNEL(cast, CastOpKernel, CastOpKernel,
CastOpKernel, CastOpKernel,
- CastOpKernel,
+ CastOpKernel, CastOpKernel,
CastOpKernel);
diff --git a/paddle/fluid/operators/detail/CMakeLists.txt b/paddle/fluid/operators/detail/CMakeLists.txt
index b9a66474c9afc27462f9c47af1a0465e2cec70bc..cf20530513cf6cd420e56b2f6378225f73c2bc8b 100644
--- a/paddle/fluid/operators/detail/CMakeLists.txt
+++ b/paddle/fluid/operators/detail/CMakeLists.txt
@@ -1,6 +1,7 @@
if(WITH_DISTRIBUTE)
grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc
- grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor selected_rows)
+ request_handler_impl.cc rpc_server.cc grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor
+ selected_rows memory)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(serde_test.cc grpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(serde_test SRCS serde_test.cc variable_response.cc DEPS grpc++_unsecure grpc_unsecure gpr
diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc
index f7ce7786874285795878b655365974f082c00b44..da9ca1a0c1d55018141f0e4285fe35d7c437fd55 100644
--- a/paddle/fluid/operators/detail/grpc_client.cc
+++ b/paddle/fluid/operators/detail/grpc_client.cc
@@ -205,6 +205,8 @@ void RPCClient::AsyncSendFetchBarrier(const std::string& ep, int64_t time_out) {
}
bool RPCClient::Wait() {
+ VLOG(3) << "RPCClient begin Wait()"
+ << " req_count_:" << req_count_;
if (req_count_ <= 0) {
return true;
}
diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc
index 361cc24b5ba11e2654f1282327730befaeca9f55..e73756d89004bc48339c0aa31dd0857c2ca6722d 100644
--- a/paddle/fluid/operators/detail/grpc_server.cc
+++ b/paddle/fluid/operators/detail/grpc_server.cc
@@ -1,4 +1,4 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
+/*Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -12,19 +12,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
-#include "paddle/fluid/operators/detail/grpc_server.h"
-
#include
#include
-using ::grpc::ServerAsyncResponseWriter;
+#include "paddle/fluid/operators/detail/grpc_server.h"
-DEFINE_int32(rpc_server_handle_send_threads, 20,
- "Number of threads used to handle send at rpc server.");
-DEFINE_int32(rpc_server_handle_get_threads, 20,
- "Number of threads used to handle get at rpc server.");
-DEFINE_int32(rpc_server_handle_prefetch_threads, 1,
- "Number of threads used to handle prefetch at rpc server.");
+using ::grpc::ServerAsyncResponseWriter;
namespace paddle {
namespace operators {
@@ -36,49 +29,40 @@ enum CallStatus { PROCESS = 0, FINISH };
class RequestBase {
public:
explicit RequestBase(GrpcService::AsyncService* service,
- ::grpc::ServerCompletionQueue* cq, bool sync_mode,
- const platform::DeviceContext* dev_ctx)
+ ::grpc::ServerCompletionQueue* cq,
+ RequestHandler* request_handler, int req_id)
: service_(service),
cq_(cq),
- sync_mode_(sync_mode),
status_(PROCESS),
- dev_ctx_(dev_ctx) {
+ request_handler_(request_handler),
+ req_id_(req_id) {
PADDLE_ENFORCE(cq_);
}
virtual ~RequestBase() {}
- virtual void Process() { assert(false); }
+ virtual void Process() = 0;
CallStatus Status() { return status_; }
void SetStatus(CallStatus status) { status_ = status; }
- virtual std::string GetReqName() {
- assert(false);
- return "";
- }
+ virtual std::string GetReqName() = 0;
protected:
::grpc::ServerContext ctx_;
GrpcService::AsyncService* service_;
::grpc::ServerCompletionQueue* cq_;
- const bool sync_mode_;
CallStatus status_;
- const platform::DeviceContext* dev_ctx_;
+ RequestHandler* request_handler_;
+ int req_id_;
};
class RequestSend final : public RequestBase {
public:
explicit RequestSend(GrpcService::AsyncService* service,
- ::grpc::ServerCompletionQueue* cq, bool sync_mode,
- framework::Scope* scope, ReceivedQueue* queue,
- const platform::DeviceContext* dev_ctx, int req_id)
- : RequestBase(service, cq, sync_mode, dev_ctx),
- queue_(queue),
- responder_(&ctx_),
- req_id_(req_id) {
- if (sync_mode_) {
- request_.reset(new VariableResponse(scope, dev_ctx_, false));
- } else {
- request_.reset(new VariableResponse(scope, dev_ctx_, true));
- }
+ ::grpc::ServerCompletionQueue* cq,
+ RequestHandler* request_handler, int req_id)
+ : RequestBase(service, cq, request_handler, req_id), responder_(&ctx_) {
+ request_.reset(new VariableResponse(request_handler->scope(),
+ request_handler->dev_ctx(),
+ !request_handler->sync_mode()));
 int method_id = static_cast<int>(detail::GrpcMethod::kSendVariable);
service_->RequestAsyncUnary(
method_id, &ctx_, request_.get(), &responder_, cq_, cq_,
@@ -87,12 +71,17 @@ class RequestSend final : public RequestBase {
virtual ~RequestSend() {}
- virtual std::string GetReqName() { return request_->Varname(); }
+ std::string GetReqName() override { return request_->Varname(); }
+
+ void Process() override {
+ std::string varname = GetReqName();
+ VLOG(3) << "RequestSend var_name:" << varname;
- virtual void Process() {
- std::string var_name = GetReqName();
- VLOG(3) << "RequestSend " << var_name;
- queue_->Push(std::make_pair(var_name, request_));
+ auto scope = request_->GetMutableLocalScope();
+ auto invar = request_->GetVar();
+ framework::Variable* outvar = nullptr;
+
+ request_handler_->Handle(varname, scope, invar, &outvar);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
@@ -102,105 +91,85 @@ class RequestSend final : public RequestBase {
protected:
sendrecv::VoidMessage reply_;
 std::shared_ptr<VariableResponse> request_;
- ReceivedQueue* queue_;
 ServerAsyncResponseWriter<sendrecv::VoidMessage> responder_;
- int req_id_;
};
class RequestGet final : public RequestBase {
public:
explicit RequestGet(GrpcService::AsyncService* service,
- ::grpc::ServerCompletionQueue* cq, bool sync_mode,
- framework::Scope* scope,
- const platform::DeviceContext* dev_ctx,
- framework::BlockingQueue* queue,
- int req_id)
- : RequestBase(service, cq, sync_mode, dev_ctx),
- responder_(&ctx_),
- scope_(scope),
- queue_(queue),
- req_id_(req_id) {
+ ::grpc::ServerCompletionQueue* cq,
+ RequestHandler* request_handler, int req_id)
+ : RequestBase(service, cq, request_handler, req_id), responder_(&ctx_) {
 auto method_id = static_cast<int>(detail::GrpcMethod::kGetVariable);
service_->RequestAsyncUnary(
method_id, &ctx_, &request_, &responder_, cq_, cq_,
- reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
+ reinterpret_cast<void*>(static_cast<intptr_t>(req_id)));
}
virtual ~RequestGet() {}
- virtual std::string GetReqName() { return request_.varname(); }
+ std::string GetReqName() override { return request_.varname(); }
- virtual void Process() {
+ void Process() override {
// proc request.
- std::string var_name = request_.varname();
- VLOG(3) << "RequestGet " << var_name;
- auto* var = scope_->FindVar(var_name);
+ std::string varname = request_.varname();
+ VLOG(3) << "RequestGet " << varname;
+
+ auto scope = request_handler_->scope();
+ auto invar = scope->FindVar(varname);
+ framework::Variable* outvar = nullptr;
- if (var_name != FETCH_BARRIER_MESSAGE) {
- SerializeToByteBuffer(var_name, var, *dev_ctx_, &reply_);
+ request_handler_->Handle(varname, scope, invar, &outvar);
+
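+ // The handler may leave outvar unset (e.g. for barrier messages); only serialize when a variable is returned.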
+ if (outvar) {
+ SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
+ &reply_);
}
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
-
- if (var_name == FETCH_BARRIER_MESSAGE) {
- sendrecv::VariableMessage msg;
- MessageWithName msg_with_name = std::make_pair(var_name, msg);
- queue_->Push(msg_with_name);
- }
}
protected:
sendrecv::VariableMessage request_;
::grpc::ByteBuffer reply_;
ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_;
- framework::Scope* scope_;
- framework::BlockingQueue<MessageWithName>* queue_;
- int req_id_;
};
class RequestPrefetch final : public RequestBase {
public:
explicit RequestPrefetch(GrpcService::AsyncService* service,
- ::grpc::ServerCompletionQueue* cq, bool sync_mode,
- framework::Scope* scope,
- const platform::DeviceContext* dev_ctx,
- framework::Executor* executor,
- framework::ProgramDesc* program,
- framework::ExecutorPrepareContext* prefetch_ctx,
- int req_id)
- : RequestBase(service, cq, sync_mode, dev_ctx),
+ ::grpc::ServerCompletionQueue* cq,
+ RequestHandler* request_handler, int req_id)
+ : RequestBase(service, cq, request_handler, req_id),
responder_(&ctx_),
- scope_(scope),
- executor_(executor),
- program_(program),
- prefetch_ctx_(prefetch_ctx),
- req_id_(req_id) {
- // prefetch always create a new sub scope
- request_.reset(new VariableResponse(scope, dev_ctx_, true));
+ local_scope_(nullptr) {
+ request_.reset(new VariableResponse(request_handler->scope(),
+ request_handler->dev_ctx(), true));
int method_id = static_cast<int>(detail::GrpcMethod::kPrefetchVariable);
service_->RequestAsyncUnary(
method_id, &ctx_, request_.get(), &responder_, cq_, cq_,
- reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
+ reinterpret_cast<void*>(static_cast<intptr_t>(req_id)));
}
virtual ~RequestPrefetch() {}
- virtual std::string GetReqName() { return request_->Varname(); }
+ std::string GetReqName() override { return request_->Varname(); }
- virtual void Process() {
+ void Process() override {
// prefetch process...
+ std::string varname = request_->OutVarname();
+ VLOG(3) << "RequestPrefetch " << varname;
+
+ auto scope = request_->GetMutableLocalScope();
+ auto invar = scope->FindVar(varname);
+ framework::Variable* outvar = nullptr;
- std::string var_name = request_->OutVarname();
- VLOG(3) << "RequestPrefetch " << var_name;
- auto var_desc = program_->Block(0).FindVar(var_name);
- framework::Scope* local_scope = request_->GetMutableLocalScope();
- auto* var = local_scope->FindVar(var_name);
- InitializeVariable(var, var_desc->GetType());
- executor_->RunPreparedContext(prefetch_ctx_, local_scope);
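+ // The prefetch logic (variable initialization and executor run) now lives in the registered RequestHandler.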
+ request_handler_->Handle(varname, scope, invar, &outvar);
- SerializeToByteBuffer(var_name, var, *dev_ctx_, &reply_);
+ SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
+ &reply_);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
@@ -211,202 +180,169 @@ class RequestPrefetch final : public RequestBase {
std::shared_ptr<VariableResponse> request_;
::grpc::ByteBuffer reply_;
ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_;
- framework::Scope* scope_;
- framework::Executor* executor_;
- framework::ProgramDesc* program_;
- framework::ExecutorPrepareContext* prefetch_ctx_;
- int req_id_;
+ framework::Scope* local_scope_;
};
-void AsyncGRPCServer::WaitClientGet(int count) {
- int fetch_barriers = 0;
- while (fetch_barriers < count) {
- auto msg = var_get_queue_.Pop();
- if (msg.first == FETCH_BARRIER_MESSAGE) {
- fetch_barriers++;
- }
- }
-}
-
void AsyncGRPCServer::WaitServerReady() {
+ VLOG(3) << "AsyncGRPCServer is waiting for the server to be ready";
std::unique_lock<std::mutex> lock(this->mutex_ready_);
condition_ready_.wait(lock, [=] { return this->ready_ == 1; });
+ VLOG(3) << "AsyncGRPCServer WaitServerReady";
}
-void AsyncGRPCServer::RunSyncUpdate() {
+void AsyncGRPCServer::StartServer() {
::grpc::ServerBuilder builder;
- builder.AddListeningPort(address_, ::grpc::InsecureServerCredentials(),
+ builder.AddListeningPort(bind_address_, ::grpc::InsecureServerCredentials(),
&selected_port_);
+
builder.SetMaxSendMessageSize(std::numeric_limits<int>::max());
builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
builder.RegisterService(&service_);
- cq_send_ = builder.AddCompletionQueue();
- cq_get_ = builder.AddCompletionQueue();
- cq_prefetch_ = builder.AddCompletionQueue();
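+ // Create one completion queue per registered RPC type instead of fixed send/get/prefetch queues.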
+ for (auto t : rpc_call_map_) {
+ rpc_cq_[t.first].reset(builder.AddCompletionQueue().release());
+ }
server_ = builder.BuildAndStart();
- LOG(INFO) << "Server listening on " << address_
+ LOG(INFO) << "Server listening on " << bind_address_
<< " selected port: " << selected_port_;
- std::function<void(int)> send_register = std::bind(
- &AsyncGRPCServer::TryToRegisterNewSendOne, this, std::placeholders::_1);
- std::function<void(int)> get_register = std::bind(
- &AsyncGRPCServer::TryToRegisterNewGetOne, this, std::placeholders::_1);
- std::function<void(int)> prefetch_register =
- std::bind(&AsyncGRPCServer::TryToRegisterNewPrefetchOne, this,
- std::placeholders::_1);
+ std::function<void(const std::string&, int)> f =
+ std::bind(&AsyncGRPCServer::TryToRegisterNewOne, this,
+ std::placeholders::_1, std::placeholders::_2);
- for (int i = 0; i < kSendReqsBufSize; ++i) {
- TryToRegisterNewSendOne(i);
- }
- for (int i = 0; i < kGetReqsBufSize; ++i) {
- TryToRegisterNewGetOne(i);
- }
- for (int i = 0; i < kPrefetchReqsBufSize; ++i) {
- TryToRegisterNewPrefetchOne(i);
- }
+ for (auto& t : rpc_call_map_) {
+ auto& rpc_name = t.first;
+ auto& cq = rpc_cq_[rpc_name];
+ auto threadnum = rpc_thread_num_[rpc_name];
+ auto& reqs = rpc_reqs_[rpc_name];
- for (int i = 0; i < FLAGS_rpc_server_handle_send_threads; ++i) {
- t_sends_.emplace_back(
- new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
- cq_send_.get(), "cq_send", send_register)));
- }
- for (int i = 0; i < FLAGS_rpc_server_handle_get_threads; ++i) {
- t_gets_.emplace_back(
- new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
- cq_get_.get(), "cq_get", get_register)));
- }
- for (int i = 0; i < FLAGS_rpc_server_handle_prefetch_threads; ++i) {
- t_prefetchs_.emplace_back(new std::thread(
- std::bind(&AsyncGRPCServer::HandleRequest, this, cq_prefetch_.get(),
- "cq_prefetch", prefetch_register)));
+ reqs.reserve(kRequestBufSize);
+
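+ // Pre-register kRequestBufSize request slots, then start the configured number of handler threads for this RPC.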
+ for (int i = 0; i < kRequestBufSize; i++) {
+ TryToRegisterNewOne(rpc_name, i);
+ }
+
+ for (int i = 0; i < threadnum; i++) {
+ rpc_threads_[rpc_name].emplace_back(new std::thread(std::bind(
+ &AsyncGRPCServer::HandleRequest, this, cq.get(), rpc_name, f)));
+ VLOG(3) << t.first << " creates threads!";
+ }
}
+
{
std::lock_guard<std::mutex> lock(this->mutex_ready_);
ready_ = 1;
}
condition_ready_.notify_all();
+
// wait server
server_->Wait();
- for (int i = 0; i < FLAGS_rpc_server_handle_send_threads; ++i) {
- t_sends_[i]->join();
- }
- for (int i = 0; i < FLAGS_rpc_server_handle_get_threads; ++i) {
- t_gets_[i]->join();
- }
- for (int i = 0; i < FLAGS_rpc_server_handle_prefetch_threads; ++i) {
- t_prefetchs_[i]->join();
+
+ for (auto& t : rpc_threads_) {
+ auto& threads = t.second;
+ for (size_t i = 0; i < threads.size(); ++i) {
+ threads[i]->join();
+ VLOG(3) << t.first << " threads end!";
+ }
}
}
void AsyncGRPCServer::ShutdownQueue() {
- std::unique_lock<std::mutex> lock(cq_mutex_);
- cq_send_->Shutdown();
- cq_get_->Shutdown();
- cq_prefetch_->Shutdown();
+ for (auto& t : rpc_cq_) {
+ t.second->Shutdown();
+ VLOG(3) << t.first << " shutdown!";
+ }
}
-// This URL explains why shutdown is complicate:
-void AsyncGRPCServer::ShutDown() {
+void AsyncGRPCServer::ShutDownImpl() {
+ std::unique_lock<std::mutex> lock(cq_mutex_);
is_shut_down_ = true;
ShutdownQueue();
+
+ VLOG(3) << "server_ shutdown!";
server_->Shutdown();
}
-void AsyncGRPCServer::TryToRegisterNewSendOne(int i) {
+void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
+ int req_id) {
std::unique_lock<std::mutex> lock(cq_mutex_);
if (is_shut_down_) {
VLOG(3) << "shutdown, do not TryToRegisterNewSendOne";
return;
}
- RequestSend* send = new RequestSend(&service_, cq_send_.get(), sync_mode_,
- scope_, &var_recv_queue_, dev_ctx_, i);
- send_reqs_[i] = static_cast<RequestBase*>(send);
- VLOG(4) << "Create RequestSend status:" << send->Status();
-}
-void AsyncGRPCServer::TryToRegisterNewGetOne(int req_id) {
- std::unique_lock<std::mutex> lock(cq_mutex_);
- if (is_shut_down_) {
- VLOG(3) << "shutdown, do not TryToRegisterNewGetOne";
- return;
+ VLOG(4) << "register rpc_name:" << rpc_name
+ << ", handler:" << rpc_call_map_[rpc_name];
+
+ auto& reqs = rpc_reqs_[rpc_name];
+ auto& handler = rpc_call_map_[rpc_name];
+ auto& cq = rpc_cq_[rpc_name];
+
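+ // Create a request object of the matching type and install it in this req_id slot.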
+ RequestBase* b = nullptr;
+ if (rpc_name == kRequestSend) {
+ b = new RequestSend(&service_, cq.get(), handler, req_id);
+ } else if (rpc_name == kRequestGet) {
+ b = new RequestGet(&service_, cq.get(), handler, req_id);
+ } else if (rpc_name == kRequestPrefetch) {
+ b = new RequestPrefetch(&service_, cq.get(), handler, req_id);
+ } else {
+ PADDLE_ENFORCE(false, "not supported rpc");
}
- RequestGet* get = new RequestGet(&service_, cq_get_.get(), sync_mode_, scope_,
- dev_ctx_, &var_get_queue_, req_id);
- get_reqs_[req_id] = static_cast<RequestBase*>(get);
- VLOG(4) << "Create RequestGet status:" << get->Status();
-}
-void AsyncGRPCServer::TryToRegisterNewPrefetchOne(int req_id) {
- std::unique_lock<std::mutex> lock(cq_mutex_);
- if (is_shut_down_) {
- VLOG(3) << "shutdown, do not TryToRegisterNewPrefetchOne";
- return;
- }
- RequestPrefetch* prefetch = new RequestPrefetch(
- &service_, cq_prefetch_.get(), sync_mode_, scope_, dev_ctx_, executor_,
- program_, prefetch_ctx_.get(), req_id);
- prefetch_reqs_[req_id] = static_cast<RequestBase*>(prefetch);
+ reqs[req_id] = b;
- VLOG(4) << "Create RequestPrefetch status:" << prefetch->Status();
+ VLOG(4) << "Create " << rpc_name << " status:" << b->Status();
}
-// FIXME(typhoonzero): change cq_name to enum.
void AsyncGRPCServer::HandleRequest(
- ::grpc::ServerCompletionQueue* cq, const std::string& cq_name,
- std::function<void(int)> TryToRegisterNewOne) {
+ ::grpc::ServerCompletionQueue* cq, const std::string& rpc_name,
+ std::function<void(const std::string&, int)> TryToRegisterNewOne) {
void* tag = NULL;
bool ok = false;
while (true) {
- VLOG(3) << "HandleRequest for " << cq_name << " wait Next";
+ VLOG(3) << "HandleRequest " << rpc_name << " wait next";
if (!cq->Next(&tag, &ok)) {
- LOG(INFO) << cq_name << " CompletionQueue shutdown!";
+ LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!";
break;
}
- VLOG(3) << "HandleRequest for " << cq_name << " get Next";
- int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
- if (sync_mode_) {
- // FIXME(typhoonzero): de-couple the barriers with recv_op
- if (!is_shut_down_ && cq_name == "cq_get") WaitCond(1);
- if (!is_shut_down_ && cq_name == "cq_send") WaitCond(0);
- VLOG(3) << "HandleRequest for " << cq_name << " after WaitCond";
- }
+ int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
+ VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id
+ << " get next";
+ auto& reqs = rpc_reqs_[rpc_name];
RequestBase* base = nullptr;
{
- std::lock_guard<std::mutex> l(cq_mutex_);
- if (cq_name == "cq_get") {
- base = get_reqs_[req_id];
- } else if (cq_name == "cq_send") {
- base = send_reqs_[req_id];
- } else if (cq_name == "cq_prefetch") {
- base = prefetch_reqs_[req_id];
- }
+ PADDLE_ENFORCE(req_id >= 0 && req_id < kRequestBufSize);
+ std::unique_lock<std::mutex> lock(cq_mutex_);
+ base = reqs[req_id];
}
+
// reference:
// https://github.com/tensorflow/tensorflow/issues/5596
// https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM
// https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I
if (!ok) {
- LOG(WARNING) << cq_name << " recv no regular event:argument name["
+ LOG(WARNING) << "completion queue:" << rpc_name
+ << " recv no regular event:argument name["
<< base->GetReqName() << "]";
- TryToRegisterNewOne(req_id);
+ TryToRegisterNewOne(rpc_name, req_id);
delete base;
continue;
}
+ VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id
+ << ", status:" << base->Status();
+
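+ // PROCESS: run the handler for this request; FINISH: recycle the slot by registering a fresh request.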
switch (base->Status()) {
case PROCESS: {
base->Process();
- VLOG(4) << cq_name << " PROCESS status:" << base->Status();
break;
}
case FINISH: {
- TryToRegisterNewOne(req_id);
- VLOG(4) << cq_name << " FINISH status:" << base->Status();
+ TryToRegisterNewOne(rpc_name, req_id);
delete base;
break;
}
@@ -415,20 +351,6 @@ void AsyncGRPCServer::HandleRequest(
}
}
-void AsyncGRPCServer::WaitCond(int cond) {
- std::unique_lock lock(this->barrier_mutex_);
- barrier_condition_.wait(lock,
- [=] { return this->barrier_cond_step_ == cond; });
-}
-
-void AsyncGRPCServer::SetCond(int cond) {
- {
- std::lock_guard lock(this->barrier_mutex_);
- barrier_cond_step_ = cond;
- }
- barrier_condition_.notify_all();
-}
-
} // namespace detail
} // namespace operators
} // namespace paddle
diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h
index bdff9801a928699f8391bfb68c1c7bd2d75aa642..d1fcbc414f123c5c4810d9cecf807a406aa2c405 100644
--- a/paddle/fluid/operators/detail/grpc_server.h
+++ b/paddle/fluid/operators/detail/grpc_server.h
@@ -14,6 +14,8 @@ limitations under the License. */
#pragma once
+#include |