提交 b35ea1a4 编写于 作者: Y Yancey1989

Merge branch 'develop' of github.com:PaddlePaddle/Paddle into overlap_send_op

......@@ -9,7 +9,7 @@ import subprocess
import platform
COPYRIGHT = '''
Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
......@@ -62,9 +62,9 @@ Please refer to our [release announcement](https://github.com/PaddlePaddle/Paddl
## Installation
It is recommended to check out the
[Docker installation guide](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/docker_install_en.html)
[Docker installation guide](http://www.paddlepaddle.org/docs/develop/documentation/fluid/en/build_and_install/docker_install_en.html)
before looking into the
[build from source guide](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/build_from_source_en.html).
[build from source guide](http://www.paddlepaddle.org/docs/develop/documentation/fluid/en/build_and_install/build_from_source_en.html).
## Documentation
......
......@@ -45,22 +45,25 @@ IF(${CBLAS_PROVIDER} STREQUAL "MKLML")
ELSE()
MESSAGE(FATAL_ERROR "Should enable MKLML when build MKLDNN")
ENDIF()
SET(MKLDNN_CFLAG "${CMAKE_C_FLAGS} -Wno-error=strict-overflow")
SET(MKLDNN_CXXFLAG "${CMAKE_CXX_FLAGS} -Wno-error=strict-overflow")
SET(MKLDNN_FLAG "-Wno-error=strict-overflow -Wno-error=unused-result -Wno-unused-result")
SET(MKLDNN_CFLAG "${CMAKE_C_FLAGS} ${MKLDNN_FLAG}")
SET(MKLDNN_CXXFLAG "${CMAKE_CXX_FLAGS} ${MKLDNN_FLAG}")
ExternalProject_Add(
${MKLDNN_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS ${MKLDNN_DEPENDS}
GIT_REPOSITORY "https://github.com/01org/mkl-dnn.git"
GIT_TAG "v0.11"
GIT_TAG "v0.14"
PREFIX ${MKLDNN_SOURCES_DIR}
UPDATE_COMMAND ""
# Patch MKLDNN to compile with gcc 4.8, the related issue is in intel/mkl-dnn#237.
PATCH_COMMAND ${CMAKE_COMMAND} -E copy_if_different ${CMAKE_CURRENT_SOURCE_DIR}/patches/mkldnn.hpp ${MKLDNN_SOURCES_DIR}/src/extern_mkldnn/include/mkldnn.hpp
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${MKLDNN_INSTALL_DIR}
CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
CMAKE_ARGS -DMKLROOT=${MKLML_ROOT}
CMAKE_ARGS -DCMAKE_C_FLAGS=${MKLDNN_CFLAG}
CMAKE_ARGS -DCMAKE_CXX_FLAGS=${MKLDNN_CXXFLAG}
CMAKE_ARGS -DWITH_TEST=OFF -DWITH_EXAMPLE=OFF
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${MKLDNN_INSTALL_DIR}
-DMKLROOT:PATH=${MKLML_ROOT}
)
......
......@@ -27,7 +27,7 @@ ENDIF()
INCLUDE(ExternalProject)
SET(MKLML_PROJECT "extern_mklml")
SET(MKLML_VER "mklml_lnx_2018.0.1.20171007")
SET(MKLML_VER "mklml_lnx_2018.0.3.20180406")
SET(MKLML_URL "http://paddlepaddledeps.bj.bcebos.com/${MKLML_VER}.tgz")
SET(MKLML_SOURCE_DIR "${THIRD_PARTY_PATH}/mklml")
SET(MKLML_DOWNLOAD_DIR "${MKLML_SOURCE_DIR}/src/${MKLML_PROJECT}")
......
# Embed Paddle Inference in Your Application
Paddle inference offers the APIs in `C` and `C++` languages.
One can easily deploy a model trained by Paddle following the steps as below:
1. Optimize the native model;
2. Write some codes for deployment.
Let's explain the steps in detail.
## Optimize the native Fluid Model
The native model that get from the training phase needs to be optimized for that.
- Clean the noise such as the cost operators that do not need inference;
- Prune unnecessary computation fork that has nothing to do with the output;
- Remove extraneous variables;
- Memory reuse for native Fluid executor;
- Translate the model storage format to some third-party engine's, so that the inference API can utilize the engine for acceleration;
We have an official tool to do the optimization, call `paddle_inference_optimize --help` for more information.
## Write some codes
Read `paddle_inference_api.h` for more information.
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <string>
#include <vector>
namespace paddle {
class Predictor {
public:
struct Attr;
Predictor() = default;
// Build the network before inference.
bool Init(const Attr& attr);
// Predict an record.
// Arguments:
// inputs: the name of the input variables.
// outputs: the name of the output varaibles.
// input_shapes: the shape of the input variables.
// output_shapes: the shape of the output variables.
// input_data: the data of the input variables.
// output_data: the data of the output variables.
bool Run(const std::vector<std::string>& inputs,
const std::vector<std::string>& outputs,
const std::vector<std::vector<int>>& input_shapes,
const std::vector<std::vector<int>>& output_shapes,
const std::vector<std::vector<float>>& input_data,
std::vector<std::vector<float>>* output_data);
// Clone a predictor that share the model weights.
Predictor* Clone();
// Destroy the Predictor.
~Predictor();
struct Attr {
enum class EngineKind;
std::string model_dir; // path to the model directory.
bool enable_engine{false}; // Enable to execute (part of) the model on
// third-party engines.
EngineKind engine_kind{Attr::EngineKind::kNone};
enum class EngineKind {
kNone = -1, // Use the native Fluid facility.
kAnakin, // Use Anakin for inference.
kTensorRT, // Use TensorRT for inference.
kAutoMixedAnakin, // Automatically mix Fluid with Anakin.
kAutoMixedTensorRT, // Automatically mix Fluid with TensorRT.
};
};
};
} // namespace paddle
../../v2/build_and_install/paddleci.png
\ No newline at end of file
......@@ -155,7 +155,7 @@ into offsets
3 2+3 4+5 1+9 2+10 3+12
```
so we know that the first sentence is from word 0 to word 3, and the second sentence from work 3 to word 5.
so we know that the first sentence is from word 0 to word 3, and the second sentence from word 3 to word 5.
Similarly, the lengths in the top level LoD
......
......@@ -77,8 +77,7 @@ print "The sematic-vector of testA: ", paddle.infer(fA, parameters, testA)
### Example 2. Sharing Parameters between "Models"
We use [GAN](https://github.com/PaddlePaddle/book/tree/develop/gan) in
this example. In the following example program, `d0` and `d1`
We use GAN in this example. In the following example program, `d0` and `d1`
correspond to the two networks in the following figure:
<img src="https://github.com/wangyang59/book/raw/00036f4b0da5225041a6824587c1a01cf20159b1/gan/image/gan_ig.png" width=400 />
......
......@@ -125,12 +125,12 @@ Compile Time -> IR -> Runtime
## Operator/OpWithKernel/OpKernel
![class_diagram](http://api.paddlepaddle.org/graphviz?dot=https://gist.githubusercontent.com/reyoung/53df507f6749762675dff3e7ce53372f/raw/49caf1fb70820fb4a6c217634317c9306f361f36/op_op_with_kern_class_diagram.dot)
![class_diagram](https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/op_op_with_kern_class_diagram.dot)
---
## Operator
![class_diagram](http://api.paddlepaddle.org/graphviz?dot=https://gist.githubusercontent.com/reyoung/53df507f6749762675dff3e7ce53372f/raw/dd598e8f1976f5759f58af5e5ef94738a6b2e661/op.dot)
![class_diagram](https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/op.dot)
* `Operator` is the fundamental building block of the user interface.
* Operator stores input/output variable names and attributes.
......@@ -141,7 +141,7 @@ Compile Time -> IR -> Runtime
## OpWithKernel/Kernel
![class_diagram](http://api.paddlepaddle.org/graphviz?dot=https://gist.githubusercontent.com/reyoung/53df507f6749762675dff3e7ce53372f/raw/9d7f4eba185cf41c8e2fbfb40ae21890dbddcd39/op_with_kernel.dot)
![class_diagram](https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/op_with_kernel.dot)
* `OpWithKernel` inherits `Operator`.
* `OpWithKernel` contains a Kernel map.
......
......@@ -75,7 +75,7 @@ Different layout leads to different implementation of the operator kernel. There
- The inference of Layout is at run-time, not at compile-time.
- Every operator has to implement different kernels for different layouts. Let's take MKLDNN as an example. If we want to implement an MKLDNN convolution operator, we have to implement all the kernels for different layouts, which are listed [here](http://01org.github.io/mkl-dnn/structmkldnn_1_1memory.html). And we will have a special macro to register kernels for MKLDNN operators.
- Every operator has to implement different kernels for different layouts. Let's take MKLDNN as an example. If we want to implement an MKLDNN convolution operator, we have to implement all the kernels for different layouts, which are listed [here](http://intel.github.io/mkl-dnn/structmkldnn_1_1memory.html). And we will have a special macro to register kernels for MKLDNN operators.
`Layout` is also defined as a enum variable:
......
# Distributed Training with NCCL2 and RDMA
When doing distributed multi-GPU training, network bandwith often becomes the
bottle neck. We introduce a way to use NCCL2 to do such training job to
achieve best performace.
## Prepare Hardwares with RDMA and Multiple GPUs
I'm using two Linux servers each of them is installed with 8 GPUs and
one 100Gb RDMA card.
Base environment is:
* OS: CentOS 7.4
* RDMA device: "Mellanox Technologies MT27700 Family [ConnectX-4]"
* Kernel version: `4.4.88-1.el7.elrepo.x86_64`
* Docker version: `1.12.6`
* Docker storage driver: `overlay2`
* IP addresses: 192.168.16.30,192.168.16.34
In general, the steps including:
1. Install GPU drivers
1. Install RDMA drivers
1. Install "InfiniBand Support"
1. Use docker to run tests and make sure GPUs and RDMA can work inside
the container.
I'll ommit section "Install GPU drivers" because we can find it easily
somewhere else.
### Install RDMA drivers
For my case, I've got two machines with device
"Mellanox Technologies MT27700 Family [ConnectX-4]" installed. The OS was
"CentOS 7.4" and I updated the kernel to version 4.4 so that docker can
work with latest overlay2 filesystem.
***NOTE: before you start, make sure you have a way to get a console
of the server other than ssh because we may need to re-configure the
network device.***
1. Go to http://www.mellanox.com/page/products_dyn?product_family=26,
download `MLNX_OFED` software in the bottom of the page, and upload it
onto the server.
1. Run `./mlnxofedinstall --add-kernel-support` in the software package.
1. Run `/etc/init.d/openibd restart` to make everything work, note that
this operation may cause the network goes down if you are using this
RDMA device as default network device and use ssh to login the server.
1. Re-configure the network interface, for example:
`ifconfig eth2 192.168.16.30/20 up`, then add routes if needed:
`ip route add default via 192.168.16.1 dev eth2`.
1. Do the same thing on the other node.
1. Use `ping` to test if the two nodes have typical ICMP connection.
1. Use either `udaddy` or `ib_write_bw` to test the network connection is
ready and have the desired bandwith.
### Prepare Docker Image to Run RDMA Programs
1. Build a docker image using cuda base image like: `nvidia/cuda:8.0-cudnn5-devel-ubuntu16.04` and install paddlepaddle whl
package in it.
1. Start a docker container and mount GPU driver libs into it (you can
skip this step if you are using nvidia-docker).
1. Mount RDMA dirvers and libs into the docker image (see below section),
also `udaddy` and `ib_write_bw` if needed.
1. Mount GPU devices and RDMA devices into the container using `--device`
or just use privileged mode `--privileged`.
1. Start the container using host network mode: `--net=host`
### RDMA Library Files Needed
Usually, `MLNX_OFED` install latest supported libs under
`/usr/lib64/mlnx_ofed/valgrind`. Other libs also needed to run RDMA programs
is listed below. These libs must be mounted into the docker container.
* Libs under `/usr/lib64/mlnx_ofed/valgrind`
* libibcm.so
* libibverbs.so
* libmlx4.so
* libmlx5.so
* libmlx5-rdmav2.so
* librdmacm.so
* Other libs:
* libnl-3.so.200
* libnl-route-3.so.200
* libnuma.so.1
## Start to Run the Training Job
Setting NCCL environment variables to turn NCCL switches on and off:
| Env Name | Description |
| --- | --- |
| NCCL_SOCKET_IFNAME | The RDMA device, e.g. eth2 |
| NCCL_P2P_DISABLE | Set to 1 to disable P2P transfer between GPUs |
| NCCL_IB_DISABLE | Set to 1 to disable using RDMA |
| NCCL_IB_CUDA_SUPPORT | Set to 1 to enable GPU Direct if supported |
| NCCL_DEBUG | Set debug level: VERSION, WARN, INFO |
My two servers are: `192.168.16.30,192.168.16.34`, On node 1, Run :
```bash
PADDLE_TRAINER_ID=0 PADDLE_PORT=48372 PADDLE_WORKERS=192.168.16.30,192.168.16.34 POD_IP=192.168.16.30 stdbuf -oL python vgg16.py
```
On node 2, Run:
```bash
PADDLE_TRAINER_ID=1 PADDLE_PORT=48372 PADDLE_WORKERS=192.168.16.30,192.168.16.34 POD_IP=192.168.16.34 stdbuf -oL python vgg16.py
```
digraph sample {
graph [rankdir=TD]; node [shape=record];
op [label="{Operator| InferShape()=0\lRun()=0\l | map&#60;string, string[]&#62; inputs_\lmap&#60;string, string[]&#62; outputs_ \l AttributeMap attrs_\l}"];
}
\ No newline at end of file
digraph sample {
graph [rankdir=TD]; node [shape=record];
op [label="{Operator| InferShape()=0\lRun()=0\l | map&#60;string, string[]&#62; inputs_\lmap&#60;string, string[]&#62; outputs_ \l AttributeMap attrs_\l}"];
op_with_kern [label="{OpWithKernel | InferShape()=0\lRun()\l | map&#60;OpKernelKey,OpKernel&#62;kernels_ }"]
op_kernel [label="{OpKernel | Compute()=0}"]
op_kernel_key [label="{OpKernelKey| Place place\n...}"]
op -> op_with_kern [dir=back, arrowtail=onormal]
op_with_kern -> op_kernel [arrowhead=vee, label="contains many"]
{
rank=same;
op_with_kern
op_kernel
}
op_kernel -> op_kernel_key [style=invis]
{
rank=same;
op_kernel
op_kernel_key
}
op_with_kern -> op_kernel_key [arrowhead=vee, label ="\nas map key"]
mul_op [label="MulOp"]
op_with_kern -> mul_op [dir=back, arrowtail=onormal]
mul_kernel [label="template &#60;typename Place&#62;\lclass MulOpKernel\l"]
op_kernel -> mul_kernel [dir=back, arrowtail=onormal]
mul_op -> mul_kernel [arrowhead=vee, label="register many"]
{
rank=same;
mul_op;
mul_kernel;
}
}
\ No newline at end of file
digraph sample {
graph [rankdir=TD]; node [shape=record];
op [label="{Operator}"];
op_with_kern [label="{OpWithKernel | InferShape()=0\lRun()\l | map&#60;OpKernelKey,OpKernel&#62;kernels_ }"]
op_kernel [label="{OpKernel | Compute()=0}"]
op_kernel_key [label="{OpKernelKey| Place place\n...}"]
op -> op_with_kern [dir=back, arrowtail=onormal]
op_with_kern -> op_kernel [arrowhead=vee, label="contains many"]
{
rank=same;
op_with_kern
op_kernel
}
op_kernel -> op_kernel_key [style=invis]
{
rank=same;
op_kernel
op_kernel_key
}
op_with_kern -> op_kernel_key [arrowhead=vee, label ="\nas map key"]
}
\ No newline at end of file
......@@ -5,7 +5,7 @@
充分展现英特尔平台的优势,有效提升PaddlePaddle在英特尔架构上的性能。
<div align="center">
<img src="image/overview.png"><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/v2/images/overview.png"><br/>
Figure 1. PaddlePaddle on IA
</div>
......@@ -42,16 +42,43 @@ Figure 1. PaddlePaddle on IA
MKL,MKLML以及MKL-DNN三者关系如下表:
| Name | Open Source | License | Descriptions |
| :---------- | :--------------- | :---------- | :------------ |
| MKL | No | Proprietary | Accelerate math processing routines |
| MKLML | No | Proprietary | Small package of MKL, especially for Machine Learning |
| MKL-DNN | Yes | Apache 2.0 | Accelerate primitives processing routines especially for Deep Neural Networks |
<table>
<thead>
<tr>
<th>Name</th>
<th>Open Source</th>
<th>License</th>
<th>Descriptions</th>
</tr>
</thead>
<tbody>
<tr>
<td>MKL</td>
<td>No</td>
<td>Proprietary</td>
<td>Accelerate math processing routines</td>
</tr>
<tr>
<td>MKLML</td>
<td>No</td>
<td>Proprietary</td>
<td>Small package of MKL, especially for Machine Learning</td>
</tr>
<tr>
<td>MKL-DNN</td>
<td>Yes</td>
<td>Apache 2.0</td>
<td>Accelerate primitives processing routines especially for Deep Neural Networks</td>
</tr>
</tbody>
</table>
MKLML可以与MKL-DNN共同使用,以此达到最好的性能。
<div align="center">
<img src="image/engine.png"><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/v2/images/engine.png"><br/>
Figure 2. PaddlePaddle with MKL Engines
</div>
......@@ -103,7 +130,7 @@ MKL-DNN的库目前只有动态库`libmkldnn.so`。
所以我们定义了一个`MKLDNNMatrix`用于管理MKL-DNN数据的不同格式以及相互之间的转换。
<div align="center">
<img src="image/matrix.png"><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/v2/images/matrix.png"><br/>
Figure 3. MKLDNNMatrix
</div>
......@@ -113,7 +140,7 @@ Figure 3. MKLDNNMatrix
子类只需要使用定义好的接口,实现具体的函数功能即可。
<div align="center">
<img src="image/layers.png"><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/v2/images/layers.png"><br/>
Figure 4. MKLDNNLayer
</div>
......@@ -150,7 +177,7 @@ Figure 4. MKLDNNLayer
所以整体上,在实现每个子类的时候就不需要关心分支的事情了。
<div align="center">
<img src="image/gradients.png"><br/>
<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/v2/images/gradients.png"><br/>
Figure 5. Merge Gradients
</div>
......
digraph G{
subgraph cluster_timestep0 {
label="recurrent timestep i-1"
bgcolor=lightgray
node [style=filled,color=white]
fc0_0 [label="fc 0"]
fc0_1 [label="fc 1"]
fc0_2 [label="fc 2"]
fc0_0 -> fc0_1
fc0_1 -> fc0_2
}
subgraph cluster_timestep1 {
label="recurrent timestep i"
node [style=filled];
fc1_0 [label="fc 0"]
fc1_1 [label="fc 1"]
fc1_2 [label="fc 2"]
color=blue
fc1_0 -> fc1_1
fc1_1 -> fc1_2
}
subgraph cluster_timestep2 {
label="recurrent timestep i+1"
bgcolor=lightgray
node [style=filled,color=white]
fc2_0 [label="fc 0"]
fc2_1 [label="fc 1"]
fc2_2 [label="fc 2"]
fc2_0 -> fc2_1
fc2_1 -> fc2_2
}
fc0_1 -> fc1_1 [style="dotted" constraint=false]
fc1_1 -> fc2_1 [style="dotted" constraint=false]
}
\ No newline at end of file
digraph G{
subgraph cluster_timestep0 {
label="recurrent timestep i-1"
bgcolor=lightgray
node [style=filled,color=white]
fc0_0 [label="fc 0"]
fc0_1 [label="fc 1"]
fc0_2 [label="fc 2"]
m0 [label="memory"]
fc0_0 -> fc0_1
fc0_1 -> fc0_2
fc0_1 -> m0
m0 -> fc0_1
}
subgraph cluster_timestep1 {
label="recurrent timestep i"
node [style=filled];
fc1_0 [label="fc 0"]
fc1_1 [label="fc 1"]
fc1_2 [label="fc 2"]
m1 [label="memory"]
color=blue
fc1_0 -> fc1_1
fc1_1 -> fc1_2
fc1_1 -> m1
m1 -> fc1_1
}
subgraph cluster_timestep2 {
label="recurrent timestep i+1"
bgcolor=lightgray
node [style=filled,color=white]
fc2_0 [label="fc 0"]
fc2_1 [label="fc 1"]
fc2_2 [label="fc 2"]
m2 [label="memory"]
fc2_0 -> fc2_1
fc2_1 -> fc2_2
fc2_1 -> m2
m2 -> fc2_1
}
m0 -> m1 [style="dotted" constraint=false]
m1 -> m2 [style="dotted" constraint=false]
}
\ No newline at end of file
digraph G {
rankdir=LR;
subgraph cluster_t0 {
a [label="4"]
b [label="5"]
c [label="2"]
}
subgraph cluster_t1 {
d [label="0"]
e [label="9"]
}
subgraph cluster_t2 {
f [label="8"]
g [label="1"]
h [label="4"]
}
a -> b;
b -> c;
c -> d [constraint=false];
d -> e;
e -> f [constraint=false];
f -> g;
g -> h;
}
\ No newline at end of file
digraph G {
rankdir=LR;
a [label="4"]
b [label="5"]
c [label="2"]
d [label="0"]
e [label="9"]
f [label="8"]
g [label="1"]
h [label="4"]
a -> b;
b -> c;
c -> d;
d -> e;
e -> f;
f -> g;
g -> h;
}
\ No newline at end of file
......@@ -57,7 +57,7 @@ cc_library(data_transform SRCS data_transform.cc DEPS math_function tensor
cc_library(attribute SRCS attribute.cc DEPS framework_proto boost)
cc_test(program_desc_test SRCS program_desc_test.cc DEPS proto_desc
device_context)
cc_library(op_proto_maker SRCS op_proto_maker.cc DEPS framework_proto attribute)
cc_library(op_proto_maker SRCS op_proto_maker.cc DEPS framework_proto attribute glog)
cc_test(op_proto_maker_test SRCS op_proto_maker_test.cc DEPS op_proto_maker)
cc_library(op_info SRCS op_info.cc DEPS attribute framework_proto)
cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_context)
......
......@@ -134,6 +134,11 @@ OpDesc *BlockDesc::PrependOp() {
return ops_.front().get();
}
void BlockDesc::PrependAllocatedOp(std::unique_ptr<OpDesc> &&op_desc) {
need_update_ = true;
ops_.emplace_front(std::move(op_desc));
}
OpDesc *BlockDesc::InsertOp(size_t index) {
need_update_ = true;
auto it = ops_.begin() + index;
......
......@@ -88,6 +88,8 @@ class BlockDesc {
OpDesc *PrependOp();
void PrependAllocatedOp(std::unique_ptr<OpDesc> &&op_desc);
OpDesc *InsertOp(size_t index);
/*
......
......@@ -32,8 +32,7 @@ struct AddFunctor {
class OpKernelTestProtoAndCheckerMaker : public OpProtoAndCheckerMaker {
public:
OpKernelTestProtoAndCheckerMaker(OpProto* proto, OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
void Make() {
AddInput("input", "input1 of test op");
AddOutput("output", "output of test op");
AddAttr<bool>("use_gpu", "force to use gpu kernel").SetDefault(false);
......
......@@ -38,9 +38,7 @@ void BroadcastOpHandle::RunImpl() {
out_var_handles.size(), places_.size(),
"The number of output should equal to the number of places.");
// Wait input done, this Wait is asynchronous operation platform::Place
// &in_place;
WaitInputVarGenerated(*in_var_handle);
WaitInputVarGenerated();
std::vector<const Scope *> var_scopes;
for (auto *s : local_scopes_) {
......@@ -50,29 +48,9 @@ void BroadcastOpHandle::RunImpl() {
auto *in_var =
var_scopes.at(in_var_handle->scope_idx_)->FindVar(in_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(in_var);
Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
// NOTE: The tensors' Place of input and output must be all on GPU or all on
// CPU.
for (auto *out_var_handle : out_var_handles) {
if (out_var_handle->IsTheSameVar(*in_var_handle)) {
continue;
}
auto t_out_p = out_var_handle->place_;
auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
if (platform::is_gpu_place(in_tensor.place())) {
PADDLE_ENFORCE(platform::is_gpu_place(t_out_p),
"Places of input and output must be all on GPU.");
} else {
t_out_p = platform::CPUPlace();
}
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(t_out_p,
in_tensor.type());
}
InitOutputValue(*in_var_handle, out_var_handles);
if (platform::is_cpu_place(in_tensor.place())) {
for (auto *out_var_handle : out_var_handles) {
......@@ -147,11 +125,37 @@ void BroadcastOpHandle::RunImpl() {
}
}
void BroadcastOpHandle::WaitInputVarGenerated(const VarHandle &in_var) {
if (in_var.generated_op_) {
for (auto &pair : dev_ctxes_) {
in_var.generated_op_->Wait(pair.second);
void BroadcastOpHandle::InitOutputValue(
const VarHandle &in_var_handle,
const std::vector<VarHandle *> &out_var_handles) const {
std::vector<const Scope *> var_scopes;
for (auto *s : local_scopes_) {
var_scopes.emplace_back(s->FindVar(kLocalExecScopeName)->Get<Scope *>());
}
auto *in_var =
var_scopes.at(in_var_handle.scope_idx_)->FindVar(in_var_handle.name_);
Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
// NOTE: The tensors' Place of input and output must be all on GPU or all on
// CPU.
for (auto *out_var_handle : out_var_handles) {
if (out_var_handle->IsTheSameVar(in_var_handle)) {
continue;
}
auto t_out_p = out_var_handle->place_;
auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
if (is_gpu_place(in_tensor.place())) {
PADDLE_ENFORCE(platform::is_gpu_place(t_out_p),
"Places of input and output must be all on GPU.");
} else {
t_out_p = platform::CPUPlace();
}
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(t_out_p,
in_tensor.type());
}
}
......
......@@ -57,7 +57,6 @@ struct BroadcastOpHandle : public OpHandleBase {
protected:
void RunImpl() override;
void WaitInputVarGenerated(const VarHandle &in_var);
private:
const std::vector<Scope *> &local_scopes_;
......@@ -65,6 +64,9 @@ struct BroadcastOpHandle : public OpHandleBase {
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
#endif
void InitOutputValue(const VarHandle &in_var_handle,
const std::vector<VarHandle *> &out_var_handles) const;
};
} // namespace details
} // namespace framework
......
......@@ -26,20 +26,20 @@ ComputationOpHandle::ComputationOpHandle(const OpDesc &op_desc, Scope *scope,
place_(place) {}
void ComputationOpHandle::RunImpl() {
auto *cur_ctx = dev_ctxes_[place_];
for (auto *in : inputs_) {
bool need_wait = in->generated_op_ &&
in->generated_op_->DeviceContext(place_) != cur_ctx;
if (need_wait) {
in->generated_op_->Wait(cur_ctx);
}
}
WaitInputVarGenerated(place_);
this->RunAndRecordEvent([this] {
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
});
}
bool ComputationOpHandle::NeedWait(VarHandleBase *in_var) {
bool need_wait =
in_var && in_var->generated_op_ &&
in_var->generated_op_->DeviceContext(place_) != dev_ctxes_[place_];
return need_wait;
}
std::string ComputationOpHandle::Name() const { return op_->Type(); }
} // namespace details
} // namespace framework
......
......@@ -36,6 +36,8 @@ struct ComputationOpHandle : public OpHandleBase {
protected:
void RunImpl() override;
bool NeedWait(VarHandleBase *in_var) override;
private:
std::unique_ptr<OperatorBase> op_;
Scope *scope_;
......
......@@ -31,7 +31,7 @@ FetchOpHandle::~FetchOpHandle() {
}
}
void FetchOpHandle::Wait(platform::DeviceContext *waited_dev) {
void FetchOpHandle::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) {
PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
}
......@@ -45,12 +45,8 @@ void FetchOpHandle::WaitAndMergeCPUTensors() const {
}
void FetchOpHandle::RunImpl() {
auto cpu_ctx =
platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
for (auto *input : inputs_) {
auto *var = static_cast<VarHandle *>(input);
var->generated_op_->Wait(cpu_ctx);
}
WaitInputVarGenerated(platform::CPUPlace());
tensors_.resize(inputs_.size());
auto *var_handle = static_cast<VarHandle *>(inputs_[0]);
auto &var_name = var_handle->name_;
......@@ -77,6 +73,15 @@ void FetchOpHandle::RunImpl() {
this->WaitAndMergeCPUTensors();
}
void FetchOpHandle::WaitInputVarGenerated(const platform::Place &place) {
auto cpu_ctx = platform::DeviceContextPool::Instance().Get(place);
for (auto *input : inputs_) {
if (input->generated_op_) {
input->generated_op_->RecordWaitEventOnCtx(cpu_ctx);
}
}
}
std::string FetchOpHandle::Name() const { return "Fetch"; }
} // namespace details
......
......@@ -33,7 +33,7 @@ struct FetchOpHandle : public OpHandleBase {
~FetchOpHandle();
void Wait(platform::DeviceContext *waited_dev) override;
void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) override;
void WaitAndMergeCPUTensors() const;
......@@ -42,6 +42,8 @@ struct FetchOpHandle : public OpHandleBase {
protected:
void RunImpl() override;
void WaitInputVarGenerated(const platform::Place &place) override;
private:
FeedFetchList *data_;
size_t offset_;
......
......@@ -55,7 +55,7 @@ void GatherOpHandle::RunImpl() {
"Currently, gather_op only can gather SelectedRows.");
// Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles);
WaitInputVarGenerated();
auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
std::vector<int64_t> out_rows;
......@@ -111,17 +111,6 @@ void GatherOpHandle::RunImpl() {
});
}
void GatherOpHandle::WaitInputVarGenerated(
const std::vector<VarHandle *> &in_var_handles) {
for (auto *in : in_var_handles) {
if (in->generated_op_) {
for (auto pair : dev_ctxes_) {
in->generated_op_->Wait(pair.second);
}
}
}
}
std::string GatherOpHandle::Name() const { return "gather"; }
} // namespace details
} // namespace framework
......
......@@ -39,7 +39,6 @@ struct GatherOpHandle : public OpHandleBase {
protected:
void RunImpl() override;
void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);
private:
const std::vector<Scope *> &local_scopes_;
......
......@@ -37,20 +37,26 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale)
platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards)
: loss_var_name_(loss_var_name),
places_(places),
local_scopes_(local_scopes),
nccl_ctxs_(nccl_ctxs) {
nccl_ctxs_(nccl_ctxs),
balance_parameter_opt_between_cards_(
balance_parameter_opt_between_cards) {
#else
MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes, bool use_default_grad_scale)
const std::vector<Scope *> &local_scopes, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards)
: loss_var_name_(loss_var_name),
places_(places),
local_scopes_(local_scopes) {
local_scopes_(local_scopes),
balance_parameter_opt_between_cards_(
balance_parameter_opt_between_cards) {
#endif
for (auto &p : params) {
grad_names_.insert(GradVarName(p));
......@@ -124,6 +130,12 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
// Find "send" op first for split is in front of send.
OpDesc *send_op = GetSendOpDesc(program);
size_t cur_device_id = 0;
std::vector<std::unordered_set<std::string>> var_name_on_devices;
std::vector<std::unordered_set<std::string>> bcast_var_name_set;
var_name_on_devices.resize(places_.size());
bcast_var_name_set.resize(places_.size());
bool is_forwarding = true;
for (auto *op : program.Block(0).AllOps()) {
if (op->Type() == "send") {
......@@ -139,17 +151,33 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
}
is_forwarding = false;
} else {
CreateComputationalOps(&result, *op, places_.size());
int op_dev_id = GetOpDeviceID(var_name_on_devices, *op);
if (op_dev_id == -1) { // var on all device
CreateComputationalOps(&result, *op, places_.size());
} else {
CreateComputationalOp(&result, *op, op_dev_id);
for (auto &var_name : op->OutputArgumentNames()) {
var_name_on_devices[op_dev_id].emplace(var_name);
}
}
if (!is_forwarding && places_.size() > 1) {
// Currently, we assume that once gradient is generated, it can be
// broadcast, and each gradient is only broadcast once.
for (auto &og : op->OutputArgumentNames()) {
if (IsParameterGradientOnce(og, &og_has_been_broadcast)) {
if (IsSparseGradient(var_types, og)) {
CreateReduceOp(&result, og, 0);
CreateBroadcastOp(&result, og, 0);
if (balance_parameter_opt_between_cards_) {
CreateReduceOp(&result, og, cur_device_id);
var_name_on_devices[cur_device_id].emplace(og);
bcast_var_name_set[cur_device_id].emplace(
og.substr(0, og.size() - strlen(kGradVarSuffix)));
cur_device_id = (cur_device_id + 1) % places_.size();
} else {
InsertNCCLAllReduceOp(&result, og);
if (IsSparseGradient(var_types, og)) {
CreateReduceOp(&result, og, 0);
CreateBroadcastOp(&result, og, 0);
} else {
InsertNCCLAllReduceOp(&result, og);
}
}
}
}
......@@ -157,6 +185,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
}
}
// Insert BCast Ops
for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
auto &to_bcast_set = bcast_var_name_set[dev_id];
for (auto &bcast_name : to_bcast_set) {
CreateBroadcastOp(&result, bcast_name, dev_id);
}
}
/*
Dependency graph has been constructed. However, there are still data
harzaeds need to be handled.
......@@ -265,6 +300,26 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce(
return is_pg_once;
}
int MultiDevSSAGraphBuilder::GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const {
if (!balance_parameter_opt_between_cards_) {
return -1;
}
int var_dev_id = -1;
for (auto &var_name : op.InputArgumentNames()) {
if (var_dev_id != -1) break;
for (size_t i = 0; i < var_name_on_devices.size(); ++i) {
if (var_name_on_devices[i].count(var_name)) {
var_dev_id = static_cast<int>(i);
break;
}
}
}
return var_dev_id;
}
void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(SSAGraph *result) const {
for (size_t i = 0; i < places_.size(); ++i) {
// Insert ScaleCost OpHandle
......
......@@ -36,13 +36,15 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
platform::NCCLContextMap *nccl_ctxs,
bool use_default_grad_scale);
bool use_default_grad_scale,
bool balance_parameter_opt_between_cards);
#else
MultiDevSSAGraphBuilder(const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
bool use_default_grad_scale);
bool use_default_grad_scale,
bool balance_parameter_opt_between_cards);
#endif
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
......@@ -60,6 +62,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#ifdef PADDLE_WITH_CUDA
platform::NCCLContextMap *nccl_ctxs_;
#endif
bool balance_parameter_opt_between_cards_;
bool use_default_grad_scale_;
bool IsScaleLossOp(const OpDesc &op) const;
......@@ -84,6 +87,10 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::string &og,
std::unordered_set<std::string> *og_has_been_broadcast) const;
int GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const;
void InsertNCCLAllReduceOp(SSAGraph *result, const std::string &og) const;
void CreateBroadcastOp(SSAGraph *result, const std::string &p_name,
......
......@@ -34,10 +34,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
return; // No need to all reduce when GPU count = 1;
} else {
// Wait input done
for (auto *in : inputs_) {
auto &p = static_cast<VarHandle *>(in)->place_;
in->generated_op_->Wait(dev_ctxes_[p]);
}
WaitInputVarGenerated();
auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
int dtype = -1;
......
......@@ -56,15 +56,15 @@ void OpHandleBase::Run(bool use_event) {
RunImpl();
}
void OpHandleBase::Wait(platform::DeviceContext *waited_dev) {
void OpHandleBase::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_cpu_place(waited_dev->GetPlace()) || events_.empty()) {
if (platform::is_cpu_place(waited_ctx->GetPlace()) || events_.empty()) {
for (auto &dev_ctx : dev_ctxes_) {
dev_ctx.second->Wait();
}
} else {
auto stream =
static_cast<platform::CUDADeviceContext *>(waited_dev)->stream();
static_cast<platform::CUDADeviceContext *>(waited_ctx)->stream();
for (auto &ev : events_) {
PADDLE_ENFORCE(cudaStreamWaitEvent(stream, ev.second, 0));
}
......@@ -86,6 +86,28 @@ void OpHandleBase::AddOutput(VarHandleBase *out) {
out->generated_op_ = this;
}
void OpHandleBase::WaitInputVarGenerated() {
for (auto in_var : inputs_) {
if (NeedWait(in_var)) {
for (auto &pair : dev_ctxes_) {
in_var->generated_op_->RecordWaitEventOnCtx(pair.second);
}
}
}
}
void OpHandleBase::WaitInputVarGenerated(const platform::Place &place) {
for (auto *in : inputs_) {
if (NeedWait(in)) {
in->generated_op_->RecordWaitEventOnCtx(dev_ctxes_[place]);
}
}
}
bool OpHandleBase::NeedWait(VarHandleBase *in_var) {
return in_var && in_var->generated_op_;
}
void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
#ifdef PADDLE_WITH_CUDA
if (!events_.empty()) { // Use event
......
......@@ -38,12 +38,24 @@ class OpHandleBase {
void Run(bool use_event);
virtual void Wait(platform::DeviceContext *waited_dev);
virtual void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx);
void AddInput(VarHandleBase *in);
void AddOutput(VarHandleBase *out);
// This method adds the wait events of all the input on all the device
// context.
// NODE: This Wait is asynchronous operation.
virtual void WaitInputVarGenerated();
// This method adds the wait events of all the input on the specified device
// context.
// NODE: This Wait is asynchronous operation.
virtual void WaitInputVarGenerated(const platform::Place &place);
virtual bool NeedWait(VarHandleBase *in_var);
// If the Op involves data transfer of multiple devices that
// will likely block other computations.
virtual bool IsMultiDeviceTransfer() { return false; }
......
......@@ -95,7 +95,10 @@ struct OpInfoFiller<T, kOpProtoAndCheckerMaker> {
void operator()(const char* op_type, OpInfo* info) const {
info->proto_ = new proto::OpProto;
info->checker_ = new OpAttrChecker();
auto maker = T(info->proto_, info->checker_);
T maker;
maker.SetProto(info->proto_);
maker.SetChecker(info->checker_);
maker.Make();
maker.Validate();
info->proto_->set_type(op_type);
PADDLE_ENFORCE(
......
......@@ -51,7 +51,7 @@ void ReduceOpHandle::RunImpl() {
PADDLE_ENFORCE_NOT_NULL(pre_in_var);
// Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles);
WaitInputVarGenerated();
// NOTE: The Places of all input tensor must be all on CPU or all on GPU.
std::vector<platform::Place> in_places; // used to get dev_ctx
......@@ -80,19 +80,21 @@ void ReduceOpHandle::RunImpl() {
}
if (pre_in_var->IsType<framework::SelectedRows>()) {
std::vector<const SelectedRows *> in_selected_rows =
GetInputValues<SelectedRows>(in_var_handles, var_scopes);
GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p,
out_var->GetMutable<framework::SelectedRows>());
this->RunAndRecordEvent([&] {
std::vector<const SelectedRows *> in_selected_rows =
GetInputValues<SelectedRows>(in_var_handles, var_scopes);
GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p,
out_var->GetMutable<framework::SelectedRows>());
});
} else {
std::vector<const LoDTensor *> lod_tensors =
GetInputValues<LoDTensor>(in_var_handles, var_scopes);
if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
ReduceLoDTensor func(lod_tensors,
out_var->GetMutable<framework::LoDTensor>());
VisitDataType(ToDataType(lod_tensors[0]->type()), func);
this->RunAndRecordEvent([&] {
ReduceLoDTensor func(lod_tensors,
out_var->GetMutable<framework::LoDTensor>());
VisitDataType(ToDataType(lod_tensors[0]->type()), func);
});
} else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
#ifdef PADDLE_WITH_CUDA
auto pre_in = pre_in_var->Get<framework::LoDTensor>();
......@@ -157,17 +159,6 @@ std::vector<const T *> ReduceOpHandle::GetInputValues(
return in_selected_rows;
}
void ReduceOpHandle::WaitInputVarGenerated(
const std::vector<VarHandle *> &in_var_handles) {
for (auto *in : in_var_handles) {
if (in->generated_op_) {
for (auto pair : dev_ctxes_) {
in->generated_op_->Wait(pair.second);
}
}
}
}
std::string ReduceOpHandle::Name() const { return "reduce"; }
} // namespace details
} // namespace framework
......
......@@ -60,8 +60,6 @@ struct ReduceOpHandle : public OpHandleBase {
protected:
void RunImpl() override;
void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);
template <typename T>
std::vector<const T *> GetInputValues(
const std::vector<VarHandle *> &in_var_handles,
......
......@@ -29,6 +29,7 @@ ScaleLossGradOpHandle::ScaleLossGradOpHandle(size_t num_dev, Scope *scope,
ScaleLossGradOpHandle::~ScaleLossGradOpHandle() {}
void ScaleLossGradOpHandle::RunImpl() {
// Doesn't wait any event
std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
auto &local_scope = *scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
......
......@@ -26,13 +26,16 @@ SendOpHandle::SendOpHandle(const framework::OpDesc &op_desc,
place_(place) {}
void SendOpHandle::RunImpl() {
// TODO(wuyi): need further analysis whether wait VarDummyHandle.
// Wait input done
for (auto *in : inputs_) {
auto &p = static_cast<VarHandle *>(in)->place_;
if (in->DebugString() == "dummy") { // HACK
continue;
}
in->generated_op_->Wait(dev_ctxes_[p]);
if (in->generated_op_) {
in->generated_op_->RecordWaitEventOnCtx(dev_ctxes_[p]);
}
}
auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
// FIXME(wuyi): can not use RunAndRecordEvent here, for it will cause dead
......
......@@ -14,8 +14,6 @@
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/fetch_op_handle.h"
namespace paddle {
namespace framework {
namespace details {
......@@ -45,73 +43,33 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// Should revisit it if overlapping is available.
std::unordered_set<OpHandleBase *> delayed_ops;
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var);
if (var.generated_op_ == nullptr) {
ready_vars.Push(&var);
}
};
auto InsertPendingOp = [&pending_ops](OpHandleBase &op_instance) {
pending_ops.insert({&op_instance, op_instance.Inputs().size()});
};
// Transform SSAGraph to pending_ops & pending_vars
for (auto &var_map : graph_->vars_) {
for (auto &name_pair : var_map) {
for (auto &version_pair : name_pair.second) {
InsertPendingVar(*version_pair);
InsertPendingVar(&pending_vars, &ready_vars, version_pair.get());
}
}
}
for (auto &var : graph_->dep_vars_) {
InsertPendingVar(*var);
InsertPendingVar(&pending_vars, &ready_vars, var.get());
}
for (auto &op : graph_->ops_) {
if (op->Inputs().empty()) { // Special case, Op has no input.
ready_ops.insert(op.get());
} else {
InsertPendingOp(*op);
InsertPendingOp(&pending_ops, op.get());
}
}
// Step 2. Insert FetchOps
std::vector<std::unique_ptr<FetchOpHandle>> fetch_ops;
FeedFetchList fetch_data(fetch_tensors.size());
std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
for (auto &fetch_var_name : fetch_tensors) {
for (auto &var_map : graph_->vars_) {
auto it = var_map.find(fetch_var_name);
if (it != var_map.end()) {
fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
}
}
}
std::unordered_set<std::unique_ptr<VarHandleBase>> fetch_dependencies;
for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars.at(var_name);
auto *op = new FetchOpHandle(&fetch_data, i, &local_scopes_);
fetch_ops.emplace_back(op);
for (auto &p : places_) {
op->SetDeviceContext(p, fetch_ctxs_.Get(p));
}
for (auto *var : vars) {
op->AddInput(var);
}
FeedFetchList fetch_data(fetch_tensors.size());
auto *fetch_dummy = new DummyVarHandle();
op->AddOutput(fetch_dummy);
fetch_dependencies.emplace(fetch_dummy);
InsertPendingVar(*fetch_dummy);
InsertPendingOp(*op);
}
InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &pending_ops,
&pending_vars, &ready_vars, &fetch_data);
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) {
......@@ -174,6 +132,60 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
return fetch_data;
}
void ThreadedSSAGraphExecutor::InsertFetchOps(
const std::vector<std::string> &fetch_tensors,
std::vector<std::unique_ptr<FetchOpHandle>> *fetch_ops,
std::unordered_set<std::unique_ptr<VarHandleBase>> *fetch_dependencies,
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data) {
std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
for (auto &fetch_var_name : fetch_tensors) {
for (auto &var_map : graph_->vars_) {
auto it = var_map.find(fetch_var_name);
if (it != var_map.end()) {
fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
}
}
}
for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i];
auto &vars = fetched_vars.at(var_name);
auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_);
fetch_ops->emplace_back(op);
for (auto &p : places_) {
op->SetDeviceContext(p, fetch_ctxs_.Get(p));
}
for (auto *var : vars) {
op->AddInput(var);
}
auto *fetch_dummy = new DummyVarHandle();
op->AddOutput(fetch_dummy);
fetch_dependencies->emplace(fetch_dummy);
this->InsertPendingVar(pending_vars, ready_vars, fetch_dummy);
this->InsertPendingOp(pending_ops, op);
}
}
void ThreadedSSAGraphExecutor::InsertPendingOp(
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
OpHandleBase *op_instance) const {
pending_ops->insert({op_instance, op_instance->Inputs().size()});
}
void ThreadedSSAGraphExecutor::InsertPendingVar(
std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars, VarHandleBase *var) const {
pending_vars->insert(var);
if (var->generated_op_ == nullptr) {
ready_vars->Push(var);
}
}
void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] {
......
......@@ -23,6 +23,7 @@
#include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/details/fetch_op_handle.h"
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
namespace paddle {
......@@ -58,6 +59,21 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;
bool allow_op_delay_;
void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops,
OpHandleBase *op_instance) const;
void InsertPendingVar(std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars,
VarHandleBase *var) const;
void InsertFetchOps(
const std::vector<std::string> &fetch_tensors,
std::vector<std::unique_ptr<FetchOpHandle>> *fetch_ops,
std::unordered_set<std::unique_ptr<VarHandleBase>> *fetch_dependencies,
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
std::unordered_set<VarHandleBase *> *pending_vars,
BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data);
};
} // namespace details
......
......@@ -14,56 +14,57 @@ limitations under the License. */
#pragma once
#include <string>
#include "glog/logging.h"
#include "paddle/fluid/framework/attribute.h"
#include "paddle/fluid/framework/framework.pb.h"
namespace paddle {
namespace framework {
// this class not only make proto but also init attribute checkers.
class OpProtoAndCheckerMaker {
public:
using OpProto = proto::OpProto;
using OpAttrChecker = framework::OpAttrChecker;
OpProtoAndCheckerMaker(OpProto* proto, OpAttrChecker* op_checker)
: proto_(proto), op_checker_(op_checker) {}
virtual void Make() = 0;
virtual ~OpProtoAndCheckerMaker() {
PADDLE_ENFORCE(validated_, "should call Validate after build");
CHECK(validated_) << "should call Validate after build";
}
void SetProto(proto::OpProto *proto) { proto_ = proto; }
void SetChecker(OpAttrChecker *attr_checker) { op_checker_ = attr_checker; }
void Validate();
protected:
struct VariableBuilder {
OpProto::Var* var_;
proto::OpProto::Var *var_;
VariableBuilder& AsDuplicable() {
VariableBuilder &AsDuplicable() {
var_->set_duplicable(true);
return *this;
}
VariableBuilder& AsIntermediate() {
VariableBuilder &AsIntermediate() {
var_->set_intermediate(true);
return *this;
}
VariableBuilder& AsDispensable() {
VariableBuilder &AsDispensable() {
var_->set_dispensable(true);
return *this;
}
};
VariableBuilder AddInput(const std::string& name, const std::string& comment);
VariableBuilder AddInput(const std::string &name, const std::string &comment);
VariableBuilder AddOutput(const std::string& name,
const std::string& comment);
VariableBuilder AddOutput(const std::string &name,
const std::string &comment);
template <typename T>
TypedAttrChecker<T>& AddAttr(const std::string& name,
const std::string& comment,
TypedAttrChecker<T> &AddAttr(const std::string &name,
const std::string &comment,
bool generated = false) {
auto* attr = proto_->add_attrs();
auto *attr = proto_->add_attrs();
attr->set_name(name);
attr->set_comment(comment);
attr->set_generated(generated);
......@@ -71,21 +72,14 @@ class OpProtoAndCheckerMaker {
return op_checker_->AddAttrChecker<T>(name);
}
void AddComment(const std::string& comment) { proto_->set_comment(comment); }
void AddComment(const std::string &comment) { proto_->set_comment(comment); }
private:
void CheckNoDuplicatedInOutAttrs();
OpProto* proto_;
OpAttrChecker* op_checker_;
proto::OpProto *proto_;
OpAttrChecker *op_checker_;
bool validated_{false};
};
class NOPMaker : public OpProtoAndCheckerMaker {
public:
NOPMaker(OpProto* proto, framework::OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {}
};
} // namespace framework
} // namespace paddle
......@@ -18,9 +18,7 @@ limitations under the License. */
class TestAttrProtoMaker : public paddle::framework::OpProtoAndCheckerMaker {
public:
TestAttrProtoMaker(paddle::framework::proto::OpProto* proto,
paddle::framework::OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
void Make() {
AddAttr<float>("scale", "scale of test op");
AddAttr<float>("scale", "scale of test op");
}
......@@ -29,15 +27,16 @@ class TestAttrProtoMaker : public paddle::framework::OpProtoAndCheckerMaker {
TEST(ProtoMaker, DuplicatedAttr) {
paddle::framework::proto::OpProto op_proto;
paddle::framework::OpAttrChecker op_checker;
auto proto_maker = TestAttrProtoMaker(&op_proto, &op_checker);
TestAttrProtoMaker proto_maker;
proto_maker.SetProto(&op_proto);
proto_maker.SetChecker(&op_checker);
proto_maker.Make();
ASSERT_THROW(proto_maker.Validate(), paddle::platform::EnforceNotMet);
}
class TestInOutProtoMaker : public paddle::framework::OpProtoAndCheckerMaker {
public:
TestInOutProtoMaker(paddle::framework::proto::OpProto* proto,
paddle::framework::OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
void Make() {
AddInput("input", "input of test op");
AddInput("input", "input of test op");
}
......@@ -46,6 +45,9 @@ class TestInOutProtoMaker : public paddle::framework::OpProtoAndCheckerMaker {
TEST(ProtoMaker, DuplicatedInOut) {
paddle::framework::proto::OpProto op_proto;
paddle::framework::OpAttrChecker op_checker;
auto proto_maker = TestInOutProtoMaker(&op_proto, &op_checker);
TestAttrProtoMaker proto_maker;
proto_maker.SetProto(&op_proto);
proto_maker.SetChecker(&op_checker);
proto_maker.Make();
ASSERT_THROW(proto_maker.Validate(), paddle::platform::EnforceNotMet);
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册