提交 1aada352 编写于 作者: Y Yancey1989

Merge branch 'develop' of github.com:PaddlePaddle/Paddle into random_selected_rows_value

...@@ -139,9 +139,6 @@ def run_benchmark(model, args): ...@@ -139,9 +139,6 @@ def run_benchmark(model, args):
# inference program # inference program
inference_program = fluid.default_main_program().clone() inference_program = fluid.default_main_program().clone()
with fluid.program_guard(inference_program):
inference_program = fluid.io.get_inference_program(
target_vars=[batch_acc, batch_size_tensor])
# Optimization # Optimization
opt = fluid.optimizer.AdamOptimizer( opt = fluid.optimizer.AdamOptimizer(
...@@ -161,7 +158,7 @@ def run_benchmark(model, args): ...@@ -161,7 +158,7 @@ def run_benchmark(model, args):
train_reader = paddle.batch( train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=args.batch_size) paddle.dataset.mnist.train(), batch_size=args.batch_size)
accuracy = fluid.average.WeightedAverage() accuracy = fluid.metrics.Accuracy()
iters, num_samples, start_time = 0, 0, time.time() iters, num_samples, start_time = 0, 0, time.time()
for pass_id in range(args.pass_num): for pass_id in range(args.pass_num):
accuracy.reset() accuracy.reset()
...@@ -184,7 +181,7 @@ def run_benchmark(model, args): ...@@ -184,7 +181,7 @@ def run_benchmark(model, args):
"label": y_data}, "label": y_data},
fetch_list=[avg_cost, batch_acc, batch_size_tensor] fetch_list=[avg_cost, batch_acc, batch_size_tensor]
) # The accuracy is the accumulation of batches, but not the current batch. ) # The accuracy is the accumulation of batches, but not the current batch.
accuracy.add(value=outs[1], weight=outs[2]) accuracy.update(value=outs[1], weight=outs[2])
iters += 1 iters += 1
num_samples += len(y_data) num_samples += len(y_data)
loss = np.array(outs[0]) loss = np.array(outs[0])
......
## Install and Build ## Install and Build
TBD ### Download & Install
Download the latest C-API development package from CI system and install. You can find the required version in the table below:
<table>
<thead>
<tr>
<th>Version Tips</th>
<th>C-API</th>
</tr>
</thead>
<tbody>
<tr>
<td>cpu_avx_mkl</td>
<td><a href="https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_CpuAvxCp27cp27mu/.lastSuccessful/paddle.tgz" rel="nofollow">paddle.tgz</a></td>
</tr>
<tr>
<td>cpu_avx_openblas</td>
<td>-</td>
</tr>
<tr>
<td>cpu_noavx_openblas</td>
<td><a href="https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_CpuNoavxOpenblas/.lastSuccessful/paddle.tgz" rel="nofollow">paddle.tgz</a></td>
</tr>
<tr>
<td>cuda7.5_cudnn5_avx_mkl</td>
<td><a href="https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda75cudnn5cp27cp27mu/.lastSuccessful/paddle.tgz" rel="nofollow">paddle.tgz</a></td>
</tr>
<tr>
<td>cuda8.0_cudnn5_avx_mkl</td>
<td><a href="https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda80cudnn5cp27cp27mu/.lastSuccessful/paddle.tgz" rel="nofollow">paddle.tgz</a></td>
</tr>
<tr>
<td>cuda8.0_cudnn7_avx_mkl</td>
<td><a href="https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda8cudnn7cp27cp27mu/.lastSuccessful/paddle.tgz" rel="nofollow">paddle.tgz</a></td>
</tr></tbody></table>
### From source
Users can also compile the C-API library from PaddlePaddle source code by compiling with the following compilation options:
<table>
<thead>
<tr>
<th>Options</th>
<th>Value</th>
</tr>
</thead>
<tbody>
<tr>
<td>WITH_C_API</td>
<td>ON</td>
</tr>
<tr>
<td>WITH_PYTHON</td>
<td>OFF(recommended)</td>
</tr>
<tr>
<td>WITH_SWIG_PY</td>
<td>OFF(recommended)</td>
</tr>
<tr>
<td>WITH_GOLANG</td>
<td>OFF(recommended)</td>
</tr>
<tr>
<td>WITH_GPU</td>
<td>ON/OFF</td>
</tr>
<tr>
<td>WITH_MKL</td>
<td>ON/OFF</td>
</tr></tbody></table>
It is best to set up with recommended values to avoid linking with unnecessary libraries. Set other compilation options as you need.
Pull the latest following code snippet from github, and configure compilation options(replace PADDLE_ROOT with the installation path of the PaddlePaddle C-API inference library):
```shell
PADDLE_ROOT=/path/of/capi
git clone https://github.com/PaddlePaddle/Paddle.git
cd Paddle
mkdir build
cd build
cmake -DCMAKE_INSTALL_PREFIX=$PADDLE_ROOT \
-DCMAKE_BUILD_TYPE=Release \
-DWITH_C_API=ON \
-DWITH_SWIG_PY=OFF \
-DWITH_GOLANG=OFF \
-DWITH_PYTHON=OFF \
-DWITH_MKL=OFF \
-DWITH_GPU=OFF \
..
```
After running the above code to generate Makefile , run: `make && make install`. After successful compilation, the dependencies required by C-API(includes: (1)PaddlePaddle inference library and header files; (2) Third-party libraries and header files) will be stored in the `PADDLE_ROOT` directory.
If the compilation is successful, see the following directory structure under `PADDLE_ROOT`(includes PaddlePaddle header files and libraries, and third-party libraries and header files(determined by the link methods if necessary)):
```text
├── include
│   └── paddle
│   ├── arguments.h
│   ├── capi.h
│   ├── capi_private.h
│   ├── config.h
│   ├── error.h
│   ├── gradient_machine.h
│   ├── main.h
│   ├── matrix.h
│   ├── paddle_capi.map
│   └── vector.h
├── lib
│   ├── libpaddle_capi_engine.a
│   ├── libpaddle_capi_layers.a
│   ├── libpaddle_capi_shared.so
│   └── libpaddle_capi_whole.a
└── third_party
├── gflags
│   ├── include
│   │   └── gflags
│   │   ├── gflags_completions.h
│   │   ├── gflags_declare.h
│   │   ...
│   └── lib
│   └── libgflags.a
├── glog
│   ├── include
│   │   └── glog
│   │   ├── config.h
│   │   ...
│   └── lib
│   └── libglog.a
├── openblas
│   ├── include
│   │   ├── cblas.h
│   │   ...
│   └── lib
│   ...
├── protobuf
│   ├── include
│   │   └── google
│   │   └── protobuf
│   │   ...
│   └── lib
│   └── libprotobuf-lite.a
└── zlib
├── include
│   ...
└── lib
...
```
### Linking Description:
There are three kinds of linking methods:
1. Linking with dynamic library `libpaddle_capi_shared.so`(This way is much more convenient and easier, **Without special requirements, it is recommended**), refer to the following:
1. Compiling with CPU version and using `OpenBLAS`; only need to link one library named `libpaddle_capi_shared.so` to develop prediction program through C-API.
1. Compiling with CPU version and using `MKL` lib, you need to link MKL library directly to develop prediction program through PaddlePaddle C-API, due to `MKL` has its own dynamic library.
1. Compiling with GPU version, CUDA library will be loaded dynamically on prediction program run-time, and also set CUDA library to  `LD_LIBRARY_PATH` environment variable.
2. Linking with static library `libpaddle_capi_whole.a`,refer to the following:
1. Specify `-Wl,--whole-archive` linking options.
1. Explicitly link third-party libraries such as `gflags``glog``libz``protobuf` .etc, you can find them under `PADDLE_ROOT/third_party` directory.
1. Use OpenBLAS library if compiling C-API,must explicitly link `libopenblas.a`.
1. Use MKL when compiling C-API, must explicitly link MKL dynamic library.
3. Linking with static library `libpaddle_capi_layers.a` and `libpaddle_capi_engine.a`,refer to the following:
1. This linking methods is mainly used for mobile prediction.
1. Split `libpaddle_capi_whole.a` into two static linking library at least to reduce the size of linking libraries.
1. Specify `-Wl,--whole-archive -lpaddle_capi_layers`  and `-Wl,--no-whole-archive -lpaddle_capi_engine` for linking.
1. The third-party dependencies need explicitly link same as method 2 above.
# Kubernetes Distributed # Distributed Training on Kubernetes
TBD We introduced how to create a PaddlePaddle Job with a single node on Kuberentes in the
previous document.
In this article, we will introduce how to create a PaddlePaddle job with multiple nodes
on Kubernetes cluster.
## Overall Architecture
Before creating a training job, the users need to slice the training data and deploy
the Python scripts along with it into the distributed file system
(We can use the different type of Kuberentes Volumes to mount different distributed
file systems). Before training starts, The program will copy the training data into the
Container and also save the models at the same path during training. The global architecture
is as follows:
![PaddlePaddle on Kubernetes Architecture](src/k8s-paddle-arch.png)
The above figure describes a distributed training architecture which contains 3 nodes, each
Pod mounts a folder of the distributed file system to save training data and models
by Kubernetes Volume. Kubernetes created 3 Pods for this training phase and scheduled these on
3 nodes, each Pod has a PaddlePaddle container. After the containers car created,
PaddlePaddle starts up the communication between PServer and Trainer and read training
data for this training job.
As the description above, we can start up a PaddlePaddle distributed training job on a
Kubernetes ready cluster with the following steps:
1. [Build PaddlePaddle Docker Image](#Build a Docker Image)
1. [Split training data and upload to the distributed file system](#Upload Training Data)
1. [Edit a YAML file and create a Kubernetes Job](#Create a Job)
1. [Check the output](#Check The Output)
We will introduce these steps as follows:
### Build a Docker Image
Training docker image needs to package the paddle pserver and paddle trainer runtimes, as well as two more processes before we can kick off the training:
- Copying the training data into container.
- Generating the initialization arguments for `Paddle PServer` and `Paddle Training` processes.
Since the paddlepaddle official docker image already has the runtimes we need, we'll take it as the base image and pack some additional scripts for the processes mentioned above to build our training image. for more detail, please find from the following link:
- https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile
```bash
$ cd doc/howto/usage/k8s/src/k8s_train
$ docker build -t [YOUR_REPO]/paddle:mypaddle .
```
And then upload the new Docker Image to a Docker hub:
```bash
docker push [YOUR_REPO]/paddle:mypaddle
```
**[NOTE]**, in the above command arguments, `[YOUR_REPO]` represents your Docker repository,
you need to use your repository instead of it. We will replace it with your respository name to
represent the Docker Image which built in this step.
### Prepare Training Data
We can download and split the training job by creating a Kubernetes Job, or custom your image
by editing [k8s_train](./src/k8s_train/).
Before creating a Job, we need to bind a [persistenVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes) by the different type of
the different file system, the generated dataset would be saved on this volume.
```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: paddle-data
spec:
template:
metadata:
name: pi
spec:
hostNetwork: true
containers:
- name: paddle-data
image: paddlepaddle/paddle-tutorial:k8s_data
imagePullPolicy: Always
volumeMounts:
- mountPath: "/mnt"
name: nfs
env:
- name: OUT_DIR
value: /home/work/mfs/paddle-cluster-job
- name: SPLIT_COUNT
value: "3"
volumes:
- name: nfs
persistentVolumeClaim:
claimName: mfs
restartPolicy: Never
```
Create the Job with the following command:
```bash
> kubectl create -f xxx.yaml
```
If created successfully, you can see some information like this:
```base
[root@paddle-kubernetes-node0 nfsdir]$ tree -d
.
`-- paddle-cluster-job
|-- 0
| `-- data
|-- 1
| `-- data
|-- 2
| `-- data
|-- output
|-- quick_start
```
The `paddle-cluster-job` above is the job name for this training job; we need 3
PaddlePaddle training nodes and save the split training data in `paddle-cluster-job` path,
the folder `0`, `1` and `2` represents the `training_id` on each node, `quick_start` folder is used to store training data, `output` folder is used to store the models and logs.
### Create a Job
Kubernetes allow users to create objects with YAML files, and we can use a command-line tool
to create it.
The Job YAML file describes that which Docker Image would be used in this training job, how much nodes would be created, what's the startup arguments of `Paddle PServer/Trainer` process and what's the type of Volumes. You can find the details of the YAML filed in
[Kubernetes Job API](http://kubernetes.io/docs/api-reference/batch/v1/definitions/#_v1_job).
The following is an example for this training job:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: paddle-cluster-job
spec:
parallelism: 3
completions: 3
template:
metadata:
name: paddle-cluster-job
spec:
volumes:
- name: jobpath
hostPath:
path: /home/work/mfs
containers:
- name: trainer
image: [YOUR_REPO]/paddle:mypaddle
command: ["bin/bash", "-c", "/root/start.sh"]
env:
- name: JOB_NAME
value: paddle-cluster-job
- name: JOB_PATH
value: /home/jobpath
- name: JOB_NAMESPACE
value: default
- name: TRAIN_CONFIG_DIR
value: recommendation
- name: CONF_PADDLE_NIC
value: eth0
- name: CONF_PADDLE_PORT
value: "7164"
- name: CONF_PADDLE_PORTS_NUM
value: "2"
- name: CONF_PADDLE_PORTS_NUM_SPARSE
value: "2"
- name: CONF_PADDLE_GRADIENT_NUM
value: "3"
volumeMounts:
- name: jobpath
mountPath: /home/jobpath
restartPolicy: Never
```
In the above YAML file:
- `metadata.name`, The job name.
- `parallelism`, Whether the Kubernetes Job would create `parallelism` Pods at the same time.
- `completions`, The Job would become the success status only when the number of successful Pod(the exit code is 0)
is equal to `completions`.
- `volumeMounts`, the name field `jobpath` is a key, the `mountPath` field represents
the path in the container, and we can define the `jobpath` in `volumes` filed, use `hostPath`
to configure the host path we want to mount.
- `env`, the environment variables in the Container, we pass some startup arguments by
this approach, some details are as following:
- JOB_PATH:the mount path in the container
- JOB_NAME:the job name
- TRAIN_CONFIG_DIR:the job path in the container, we can find the training data path by
combine with JOB_NAME.
- CONF_PADDLE_NIC: the argument `--nics` of `Paddle PServer` process, the network
device name.
- CONF_PADDLE_PORT: the argument `--port` of `Paddle PServer` process.
- CONF_PADDLE_PORTS_NUM: the argument `--ports_num` of `Paddle PServer`, the port number
for dense prameter update.
- CONF_PADDLE_PORTS_NUM_SPARSE:the argument `--ports_num_for_sparse` of `Paddle PServer`,
the port number for sparse parameter update.
- CONF_PADDLE_GRADIENT_NUM:the number of training node, the argument
`--num_gradient_servers` of `Paddle PServer` and `Paddle Trainer`.
You can find some details information at [here]
(http://www.paddlepaddle.org/docs/develop/documentation/zh/howto/usage/cmd_parameter/detail_introduction_cn.html)。
We can use the command-line tool of Kubernetes to create a Job when we finish the YAML file:
```bash
kubectl create -f job.yaml
```
Upon successful creation, Kubernetes would create 3 Pods as PaddlePaddle training node,
pull the Docker image and begin to train.
### Checkout the Output
At the process of training, we can check the logs and the output models which is stored in
the `output` folder.
**NOTE**, `node_0`, `node_1` and `node_2` represent the
`trainer_id` of the PaddlePaddle training job rather than the node id of Kubernetes.
```bash
[root@paddle-kubernetes-node0 output]# tree -d
.
├── node_0
│   ├── server.log
│   └── train.log
├── node_1
│   ├── server.log
│   └── train.log
├── node_2
......
├── pass-00002
│   ├── done
│   ├── ___embedding_0__.w0
│   ├── ___embedding_1__.w0
......
```
We can checkout the status of each training Pod by viewing the logs:
```bash
[root@paddle-kubernetes-node0 node_0]# cat train.log
I1116 09:10:17.123121 50 Util.cpp:155] commandline:
/usr/local/bin/../opt/paddle/bin/paddle_trainer
--nics=eth0 --port=7164
--ports_num=2 --comment=paddle_process_by_paddle
--pservers=192.168.129.66,192.168.223.143,192.168.129.71
--ports_num_for_sparse=2 --config=./trainer_config.py
--trainer_count=4 --num_passes=10 --use_gpu=0
--log_period=50 --dot_period=10 --saving_period=1
--local=0 --trainer_id=0
--save_dir=/home/jobpath/paddle-cluster-job/output
I1116 09:10:17.123440 50 Util.cpp:130] Calling runInitFunctions
I1116 09:10:17.123764 50 Util.cpp:143] Call runInitFunctions done.
[WARNING 2016-11-16 09:10:17,227 default_decorators.py:40] please use keyword arguments in paddle config.
[INFO 2016-11-16 09:10:17,239 networks.py:1282] The input order is [movie_id, title, genres, user_id, gender, age, occupation, rating]
[INFO 2016-11-16 09:10:17,239 networks.py:1289] The output order is [__square_error_cost_0__]
I1116 09:10:17.392917 50 Trainer.cpp:170] trainer mode: Normal
I1116 09:10:17.613910 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
I1116 09:10:17.680917 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
I1116 09:10:17.681543 50 GradientMachine.cpp:134] Initing parameters..
I1116 09:10:18.012390 50 GradientMachine.cpp:141] Init parameters done.
I1116 09:10:18.018641 50 ParameterClient2.cpp:122] pserver 0 192.168.129.66:7164
I1116 09:10:18.018950 50 ParameterClient2.cpp:122] pserver 1 192.168.129.66:7165
I1116 09:10:18.019069 50 ParameterClient2.cpp:122] pserver 2 192.168.223.143:7164
I1116 09:10:18.019492 50 ParameterClient2.cpp:122] pserver 3 192.168.223.143:7165
I1116 09:10:18.019716 50 ParameterClient2.cpp:122] pserver 4 192.168.129.71:7164
I1116 09:10:18.019836 50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7165
```
## Some Additional Details
### Using Environment Variables
Usually we use the environment varialbes to configurate the PaddlePaddle Job which runs in
Kubernetes, `start_paddle.py` provides a start up script to convert the environment variable
to the start up arguments of PaddlePaddle process:
```bash
API = "/api/v1/namespaces/"
JOBSELECTOR = "labelSelector=job-name="
JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME")
JOB_PATH_OUTPUT = JOB_PATH + "/output"
JOBNAME = os.getenv("JOB_NAME")
NAMESPACE = os.getenv("JOB_NAMESPACE")
PADDLE_NIC = os.getenv("CONF_PADDLE_NIC")
PADDLE_PORT = os.getenv("CONF_PADDLE_PORT")
PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM")
PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE")
PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM")
```
### Communication between Pods
At the begin of `start_paddle.py`, it would initializes and parses the arguments.
```python
parser = argparse.ArgumentParser(prog="start_paddle.py",
description='simple tool for k8s')
args, train_args_list = parser.parse_known_args()
train_args = refine_unknown_args(train_args_list)
train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2]))
podlist = getPodList()
```
And then query the status of all the other Pods of this Job by the function `getPodList()`, and fetch `triner_id` by the function `getIdMap(podlist)` if all the Pods status is `RUNNING`.
```python
podlist = getPodList()
# need to wait until all pods are running
while not isPodAllRunning(podlist):
time.sleep(10)
podlist = getPodList()
idMap = getIdMap(podlist)
```
**NOTE**: `getPodList()` would prefetch all the Pods in the current namespace, if some
Pods are alreay running, it may cause some error. We will use [statfulesets](https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets) instead of
Kubernetes Pod or Replicaset in the future.
The function `getIdMap(podlist)` fetches IPs addresses of `podlist` and then sort them
to generate `trainer_id`.
```python
def getIdMap(podlist):
'''
generate tainer_id by ip
'''
ips = []
for pod in podlist["items"]:
ips.append(pod["status"]["podIP"])
ips.sort()
idMap = {}
for i in range(len(ips)):
idMap[ips[i]] = i
return idMap
```
After getting the `idMap`, we can generate the arguments of `Paddle PServer` and `Paddle Trainer`
so that we can start up them by `startPaddle(idMap, train_args_dict)`.
### Create Job
The main goal of `startPaddle` is generating the arguments of `Paddle PServer` and
`Paddle Trainer` processes. Take `Paddle Trainer` as an example, we parse the
environment variable and then get `PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM` and etc...,
finally find `trainerId` from `idMap` according to its IP address.
```python
program = 'paddle train'
args = " --nics=" + PADDLE_NIC
args += " --port=" + str(PADDLE_PORT)
args += " --ports_num=" + str(PADDLE_PORTS_NUM)
args += " --comment=" + "paddle_process_by_paddle"
ip_string = ""
for ip in idMap.keys():
ip_string += (ip + ",")
ip_string = ip_string.rstrip(",")
args += " --pservers=" + ip_string
args_ext = ""
for key, value in train_args_dict.items():
args_ext += (' --' + key + '=' + value)
localIP = socket.gethostbyname(socket.gethostname())
trainerId = idMap[localIP]
args += " " + args_ext + " --trainer_id=" + \
str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT
```
...@@ -13,11 +13,10 @@ See the License for the specific language governing permissions and ...@@ -13,11 +13,10 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/framework/block_desc.h" #include "paddle/fluid/framework/block_desc.h"
#include <queue>
#include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/program_desc.h"
#include <queue>
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -147,52 +146,7 @@ void BlockDesc::RemoveOp(size_t s, size_t e) { ...@@ -147,52 +146,7 @@ void BlockDesc::RemoveOp(size_t s, size_t e) {
if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) { if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
return; return;
} }
auto get_vars = [](std::deque<std::unique_ptr<OpDesc>>::iterator &op, ops_.erase(ops_.begin() + s, ops_.begin() + e);
std::vector<std::string> &v) {
auto in_names = (*op)->InputArgumentNames();
v.insert(v.end(), in_names.begin(), in_names.end());
auto out_names = (*op)->OutputArgumentNames();
v.insert(v.end(), out_names.begin(), out_names.end());
std::sort(v.begin(), v.end());
auto last = std::unique(v.begin(), v.end());
v.erase(last, v.end());
};
need_update_ = true;
for (size_t i = s; i < e; i++) {
// since remove op one by one, every time remove the first op.
auto op = ops_.begin() + s;
// collect input and output variables from current delete op
std::vector<std::string> cur_vars;
get_vars(op, cur_vars);
// remove current op
ops_.erase(ops_.begin() + s);
// collect input and output variables from other ops
std::vector<std::string> other_vars;
for (auto it = ops_.begin(); it != ops_.end(); it++) {
get_vars(it, other_vars);
}
// variables should be deleted
std::vector<std::string> delete_vars;
// delete_vars = cur_vars - cur_vars ^ other_input_vars
std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(),
other_vars.end(),
std::inserter(delete_vars, delete_vars.end()));
// remove variables
for (size_t i = 0; i < delete_vars.size(); i++) {
auto name = delete_vars[i];
auto it = vars_.find(name);
PADDLE_ENFORCE(it != vars_.end(),
"%s is not in variable list, it should not be deleted",
name);
vars_.erase(it);
VLOG(3) << "deleting variable " << name;
}
}
} }
std::vector<OpDesc *> BlockDesc::AllOps() const { std::vector<OpDesc *> BlockDesc::AllOps() const {
......
...@@ -12,9 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,9 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h" #include <stdint.h>
#include <string.h>
#include <algorithm>
#include <iterator>
#include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/framework.pb.h" #include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/memory/memcpy.h" #include "paddle/fluid/memory/memcpy.h"
#include "paddle/fluid/memory/memory.h" #include "paddle/fluid/memory/memory.h"
...@@ -22,11 +27,6 @@ limitations under the License. */ ...@@ -22,11 +27,6 @@ limitations under the License. */
#include "paddle/fluid/recordio/scanner.h" #include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h" #include "paddle/fluid/recordio/writer.h"
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <iterator>
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -294,7 +294,7 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor, ...@@ -294,7 +294,7 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
TensorFromStream(is, static_cast<Tensor *>(tensor), dev_ctx); TensorFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
} }
void WriteToRecordIO(recordio::Writer &writer, void WriteToRecordIO(recordio::Writer *writer,
const std::vector<LoDTensor> &tensor, const std::vector<LoDTensor> &tensor,
const platform::DeviceContext &dev_ctx) { const platform::DeviceContext &dev_ctx) {
std::stringstream buffer; std::stringstream buffer;
...@@ -303,18 +303,20 @@ void WriteToRecordIO(recordio::Writer &writer, ...@@ -303,18 +303,20 @@ void WriteToRecordIO(recordio::Writer &writer,
for (auto &each : tensor) { for (auto &each : tensor) {
SerializeToStream(buffer, each, dev_ctx); SerializeToStream(buffer, each, dev_ctx);
} }
writer.Write(buffer.str()); writer->Write(buffer.str());
} }
std::vector<LoDTensor> ReadFromRecordIO( std::vector<LoDTensor> ReadFromRecordIO(
recordio::Scanner &scanner, const platform::DeviceContext &dev_ctx) { recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) {
std::istringstream sin(scanner.Next());
uint32_t sz;
sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
std::vector<LoDTensor> result; std::vector<LoDTensor> result;
result.resize(sz); if (scanner->HasNext()) {
for (uint32_t i = 0; i < sz; ++i) { std::istringstream sin(scanner->Next());
DeserializeFromStream(sin, &result[i], dev_ctx); uint32_t sz;
sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
result.resize(sz);
for (uint32_t i = 0; i < sz; ++i) {
DeserializeFromStream(sin, &result[i], dev_ctx);
}
} }
return result; return result;
} }
......
...@@ -15,6 +15,9 @@ limitations under the License. */ ...@@ -15,6 +15,9 @@ limitations under the License. */
#pragma once #pragma once
#include <memory> #include <memory>
#include <string>
#include <utility>
#include <vector>
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
#include <thrust/device_vector.h> #include <thrust/device_vector.h>
#include <thrust/host_vector.h> #include <thrust/host_vector.h>
...@@ -216,12 +219,12 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, ...@@ -216,12 +219,12 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor,
void DeserializeFromStream(std::istream& is, LoDTensor* tensor, void DeserializeFromStream(std::istream& is, LoDTensor* tensor,
const platform::DeviceContext& dev_ctx); const platform::DeviceContext& dev_ctx);
extern void WriteToRecordIO(recordio::Writer& writer, extern void WriteToRecordIO(recordio::Writer* writer,
const std::vector<LoDTensor>& tensor, const std::vector<LoDTensor>& tensor,
const platform::DeviceContext& dev_ctx); const platform::DeviceContext& dev_ctx);
extern std::vector<LoDTensor> ReadFromRecordIO( extern std::vector<LoDTensor> ReadFromRecordIO(
recordio::Scanner& scanner, const platform::DeviceContext& dev_ctx); recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx);
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -12,17 +12,17 @@ ...@@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"
#include <glog/logging.h> #include <glog/logging.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <algorithm> #include <algorithm>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -240,8 +240,8 @@ TEST(LoDTensor, RecordIO) { ...@@ -240,8 +240,8 @@ TEST(LoDTensor, RecordIO) {
*platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); *platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
{ {
recordio::Writer writer(stream, recordio::Compressor::kSnappy); recordio::Writer writer(stream, recordio::Compressor::kSnappy);
WriteToRecordIO(writer, {tensor, tensor}, ctx); WriteToRecordIO(&writer, {tensor, tensor}, ctx);
WriteToRecordIO(writer, {tensor, tensor}, ctx); WriteToRecordIO(&writer, {tensor, tensor}, ctx);
writer.Flush(); writer.Flush();
} }
...@@ -254,11 +254,11 @@ TEST(LoDTensor, RecordIO) { ...@@ -254,11 +254,11 @@ TEST(LoDTensor, RecordIO) {
{ {
std::unique_ptr<std::istream> stream_ptr(stream); std::unique_ptr<std::istream> stream_ptr(stream);
recordio::Scanner scanner(std::move(stream_ptr)); recordio::Scanner scanner(std::move(stream_ptr));
auto tensors = ReadFromRecordIO(scanner, ctx); auto tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_EQ(tensors.size(), 2); ASSERT_EQ(tensors.size(), 2);
assert_tensor_ok(tensors[0]); assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]); assert_tensor_ok(tensors[1]);
tensors = ReadFromRecordIO(scanner, ctx); tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_EQ(tensors.size(), 2); ASSERT_EQ(tensors.size(), 2);
assert_tensor_ok(tensors[0]); assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]); assert_tensor_ok(tensors[1]);
......
...@@ -115,14 +115,12 @@ void ParallelExecutor::BCastParamsToGPUs( ...@@ -115,14 +115,12 @@ void ParallelExecutor::BCastParamsToGPUs(
for (auto &var : vars) { for (auto &var : vars) {
auto *main_var = main_scope->FindVar(var); auto *main_var = main_scope->FindVar(var);
if (!main_var->IsType<LoDTensor>()) { if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
continue; continue;
} }
auto &main_tensor = main_var->Get<LoDTensor>(); auto &main_tensor = main_var->Get<LoDTensor>();
auto &dims = main_tensor.dims(); auto &dims = main_tensor.dims();
if (paddle::platform::is_gpu_place(main_tensor.place())) { if (paddle::platform::is_gpu_place(main_tensor.place())) {
size_t numel = main_tensor.numel(); size_t numel = main_tensor.numel();
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type()); ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
...@@ -174,12 +172,17 @@ void ParallelExecutor::SplitTensorToPlaces( ...@@ -174,12 +172,17 @@ void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) { const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
for (auto it : feed_tensors) { for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_); auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
PADDLE_ENFORCE_EQ(
member_->places_.size(), lod_tensors.size(),
"The number of samples of current batch is less than the count of "
"devices, currently, it is not allowed. (%d vs %d)",
member_->places_.size(), lod_tensors.size());
for (size_t j = 0; j < member_->places_.size(); ++j) { for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var? // TODO(panxy0718): Do I need to delete this var?
member_->local_scopes_[j] auto t =
->Var(it.first) member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
->GetMutable<LoDTensor>() t->ShareDataWith(lod_tensors[j]);
->ShareDataWith(lod_tensors[j]); t->set_lod(lod_tensors[j].lod());
} }
} }
} }
......
...@@ -22,7 +22,9 @@ FileReader::FileReader(const std::vector<DDim> &dims) : dims_(dims) {} ...@@ -22,7 +22,9 @@ FileReader::FileReader(const std::vector<DDim> &dims) : dims_(dims) {}
void FileReader::ReadNext(std::vector<LoDTensor> *out) { void FileReader::ReadNext(std::vector<LoDTensor> *out) {
ReadNextImpl(out); ReadNextImpl(out);
PADDLE_ENFORCE_EQ(out->size(), dims_.size()); if (out->empty()) {
return;
}
for (size_t i = 0; i < dims_.size(); ++i) { for (size_t i = 0; i < dims_.size(); ++i) {
auto &actual = out->at(i).dims(); auto &actual = out->at(i).dims();
auto &expect = dims_[i]; auto &expect = dims_[i];
......
...@@ -14,14 +14,13 @@ ...@@ -14,14 +14,13 @@
#pragma once #pragma once
#include <memory>
#include <vector>
#include "paddle/fluid/framework/ddim.h" #include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor_array.h" #include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/place.h"
#include <memory>
#include <thread>
#include <vector>
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -31,8 +30,6 @@ class ReaderBase { ...@@ -31,8 +30,6 @@ class ReaderBase {
virtual void ReInit() = 0; virtual void ReInit() = 0;
virtual bool HasNext() const = 0;
virtual ~ReaderBase(); virtual ~ReaderBase();
}; };
...@@ -44,8 +41,6 @@ class DecoratedReader : public ReaderBase { ...@@ -44,8 +41,6 @@ class DecoratedReader : public ReaderBase {
void ReInit() override { reader_->ReInit(); } void ReInit() override { reader_->ReInit(); }
bool HasNext() const override { return reader_->HasNext(); }
protected: protected:
ReaderBase* reader_; ReaderBase* reader_;
}; };
...@@ -80,8 +75,6 @@ class ReaderHolder { ...@@ -80,8 +75,6 @@ class ReaderHolder {
reader_->ReInit(); reader_->ReInit();
} }
bool HasNext() const { return reader_->HasNext(); }
private: private:
std::unique_ptr<ReaderBase> reader_; std::unique_ptr<ReaderBase> reader_;
}; };
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/batch_norm_op.h" #include "paddle/fluid/operators/batch_norm_op.h"
#include <string>
#include "paddle/fluid/framework/data_layout.h" #include "paddle/fluid/framework/data_layout.h"
namespace paddle { namespace paddle {
......
...@@ -13,9 +13,8 @@ See the License for the specific language governing permissions and ...@@ -13,9 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/batch_norm_op.h" #include "paddle/fluid/operators/batch_norm_op.h"
#include "paddle/fluid/framework/data_layout.h"
#include <cfloat> #include <cfloat>
#include "paddle/fluid/framework/data_layout.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/cudnn_helper.h" #include "paddle/fluid/platform/cudnn_helper.h"
#include "paddle/fluid/platform/float16.h" #include "paddle/fluid/platform/float16.h"
......
...@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and ...@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#include <algorithm>
#include <vector>
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
......
...@@ -10,6 +10,7 @@ See the License for the specific language governing permissions and ...@@ -10,6 +10,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#include <string>
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/compare_op.h" #include "paddle/fluid/operators/compare_op.h"
#include <string>
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
namespace paddle { namespace paddle {
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/concat_op.h" #include "paddle/fluid/operators/concat_op.h"
#include <string>
#include <vector> #include <vector>
namespace paddle { namespace paddle {
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#include <string>
#include <vector> #include <vector>
#include "glog/logging.h" #include "glog/logging.h"
#include "paddle/fluid/framework/ddim.h" #include "paddle/fluid/framework/ddim.h"
......
...@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/operators/conv_transpose_op.h" #include "paddle/fluid/operators/conv_transpose_op.h"
#include <string>
#include <vector>
namespace paddle { namespace paddle {
namespace operators { namespace operators {
......
...@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#include <vector>
#include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/im2col.h" #include "paddle/fluid/operators/math/im2col.h"
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#include <limits>
#include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
......
...@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and ...@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#include <utility>
#include <vector>
#include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/strided_memcpy.h" #include "paddle/fluid/operators/strided_memcpy.h"
......
...@@ -39,13 +39,14 @@ void gemm<platform::CUDADeviceContext, float16>( ...@@ -39,13 +39,14 @@ void gemm<platform::CUDADeviceContext, float16>(
cublasOperation_t cuTransB = cublasOperation_t cuTransB =
(transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T; (transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T;
float h_alpha = static_cast<float>(alpha);
float h_beta = static_cast<float>(beta);
// TODO(kexinzhao): add processing code for compute capability < 53 case // TODO(kexinzhao): add processing code for compute capability < 53 case
PADDLE_ENFORCE_GE(context.GetComputeCapability(), 53, PADDLE_ENFORCE_GE(context.GetComputeCapability(), 53,
"cublas fp16 gemm requires GPU compute capability >= 53"); "cublas fp16 gemm requires GPU compute capability >= 53");
#if CUDA_VERSION >= 8000
float h_alpha = static_cast<float>(alpha);
float h_beta = static_cast<float>(beta);
cublasGemmAlgo_t algo = CUBLAS_GEMM_DFALT; cublasGemmAlgo_t algo = CUBLAS_GEMM_DFALT;
#if CUDA_VERSION >= 9000 #if CUDA_VERSION >= 9000
if (context.GetComputeCapability() >= 70) { if (context.GetComputeCapability() >= 70) {
...@@ -56,7 +57,7 @@ void gemm<platform::CUDADeviceContext, float16>( ...@@ -56,7 +57,7 @@ void gemm<platform::CUDADeviceContext, float16>(
PADDLE_ENFORCE(platform::dynload::cublasSetMathMode(context.cublas_handle(), PADDLE_ENFORCE(platform::dynload::cublasSetMathMode(context.cublas_handle(),
CUBLAS_DEFAULT_MATH)); CUBLAS_DEFAULT_MATH));
} }
#endif #endif // CUDA_VERSION >= 9000
// cublasHgemm does true FP16 computation which is slow for non-Volta // cublasHgemm does true FP16 computation which is slow for non-Volta
// GPUs. So use cublasGemmEx instead which does pesudo FP16 computation: // GPUs. So use cublasGemmEx instead which does pesudo FP16 computation:
...@@ -66,6 +67,18 @@ void gemm<platform::CUDADeviceContext, float16>( ...@@ -66,6 +67,18 @@ void gemm<platform::CUDADeviceContext, float16>(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, B, context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, B,
CUDA_R_16F, ldb, A, CUDA_R_16F, lda, &h_beta, C, CUDA_R_16F, N, CUDA_R_16F, ldb, A, CUDA_R_16F, lda, &h_beta, C, CUDA_R_16F, N,
CUDA_R_32F, algo)); CUDA_R_32F, algo));
#else
// CUDA 7.5 does not support cublasGemmEx, hence we fall back to use hgemm
const half h_alpha = static_cast<const half>(alpha);
const half h_beta = static_cast<const half>(beta);
const half* h_A = reinterpret_cast<const half*>(A);
const half* h_B = reinterpret_cast<const half*>(B);
half* h_C = reinterpret_cast<half*>(C);
PADDLE_ENFORCE(platform::dynload::cublasHgemm(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, h_B, ldb,
h_A, lda, &h_beta, h_C, N));
#endif // CUDA_VERSION >= 8000
} }
template <> template <>
......
...@@ -66,13 +66,7 @@ class ReadOp : public framework::OperatorBase { ...@@ -66,13 +66,7 @@ class ReadOp : public framework::OperatorBase {
std::vector<std::string> out_arg_names = Outputs("Out"); std::vector<std::string> out_arg_names = Outputs("Out");
std::vector<framework::LoDTensor> ins; std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins); reader->ReadNext(&ins);
if (ins.empty()) { PADDLE_ENFORCE(!ins.empty(), "There is no next data.");
reader->ReInit();
reader->ReadNext(&ins);
PADDLE_ENFORCE(
!ins.empty(),
"Reader can not read the next data even it has been re-initialized.");
}
PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size()); PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size());
for (size_t i = 0; i < ins.size(); ++i) { for (size_t i = 0; i < ins.size(); ++i) {
auto* out = auto* out =
......
...@@ -22,5 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) ...@@ -22,5 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc)
reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc)
reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
# Export local libraries to parent # Export local libraries to parent
set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE) set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE)
...@@ -63,13 +63,14 @@ class DoubleBufferReader : public framework::DecoratedReader { ...@@ -63,13 +63,14 @@ class DoubleBufferReader : public framework::DecoratedReader {
StartPrefetcher(); StartPrefetcher();
} }
bool HasNext() const override;
void ReadNext(std::vector<framework::LoDTensor>* out) override; void ReadNext(std::vector<framework::LoDTensor>* out) override;
void ReInit() override; void ReInit() override;
~DoubleBufferReader() { EndPrefetcher(); } ~DoubleBufferReader() { EndPrefetcher(); }
private: private:
bool HasNext() const;
void StartPrefetcher() { void StartPrefetcher() {
channel_ = framework::MakeChannel<Item>(kChannelSize); channel_ = framework::MakeChannel<Item>(kChannelSize);
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
...@@ -109,7 +110,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase { ...@@ -109,7 +110,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
auto place_str = Attr<std::string>("place"); auto place_str = Attr<std::string>("place");
platform::Place place; platform::Place place;
if (place_str == "CPU") { if (place_str == "AUTO") {
place = dev_place;
} else if (place_str == "CPU") {
place = platform::CPUPlace(); place = platform::CPUPlace();
} else { } else {
std::istringstream sin(place_str); std::istringstream sin(place_str);
...@@ -140,28 +143,22 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { ...@@ -140,28 +143,22 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
enum_range.insert(string::Sprintf("CUDA:%d", i)); enum_range.insert(string::Sprintf("CUDA:%d", i));
} }
enum_range.insert("CPU"); enum_range.insert("CPU");
AddAttr<std::string>("place", "The double buffer place, default is CPU") enum_range.insert("AUTO");
.SetDefault("CPU") AddAttr<std::string>("place", "The double buffer place")
.SetDefault("AUTO")
.InEnum({enum_range}); .InEnum({enum_range});
} }
}; };
bool DoubleBufferReader::HasNext() const {
while (!channel_->IsClosed() && !channel_->CanReceive()) {
}
return channel_->CanReceive();
}
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) { void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
if (!HasNext()) { out->clear();
PADDLE_THROW("There is no next data!"); if (HasNext()) {
} Item batch;
channel_->Receive(&batch);
Item batch; *out = batch.payloads_;
channel_->Receive(&batch); if (batch.ctx_) {
*out = batch.payloads_; batch.ctx_->Wait();
if (batch.ctx_) { }
batch.ctx_->Wait();
} }
} }
...@@ -171,16 +168,26 @@ void DoubleBufferReader::ReInit() { ...@@ -171,16 +168,26 @@ void DoubleBufferReader::ReInit() {
StartPrefetcher(); StartPrefetcher();
} }
bool DoubleBufferReader::HasNext() const {
while (!channel_->IsClosed() && !channel_->CanReceive()) {
}
return channel_->CanReceive();
}
void DoubleBufferReader::PrefetchThreadFunc() { void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts."; VLOG(5) << "A new prefetch thread starts.";
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(kCacheSize); std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(kCacheSize);
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(kCacheSize); std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(kCacheSize);
size_t cached_tensor_id = 0; size_t cached_tensor_id = 0;
while (reader_->HasNext()) { while (true) {
Item batch; Item batch;
auto& cpu_batch = cpu_tensor_cache[cached_tensor_id]; auto& cpu_batch = cpu_tensor_cache[cached_tensor_id];
reader_->ReadNext(&cpu_batch); reader_->ReadNext(&cpu_batch);
if (cpu_batch.empty()) {
// The underlying reader have no next data.
break;
}
if (platform::is_gpu_place(place_)) { if (platform::is_gpu_place(place_)) {
auto& gpu_batch = gpu_tensor_cache[cached_tensor_id]; auto& gpu_batch = gpu_tensor_cache[cached_tensor_id];
auto* gpu_ctx = ctxs_[cached_tensor_id].get(); auto* gpu_ctx = ctxs_[cached_tensor_id].get();
......
...@@ -25,22 +25,12 @@ class MultiPassReader : public framework::DecoratedReader { ...@@ -25,22 +25,12 @@ class MultiPassReader : public framework::DecoratedReader {
: DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {}
void ReadNext(std::vector<framework::LoDTensor>* out) override { void ReadNext(std::vector<framework::LoDTensor>* out) override {
if (!HasNext()) {
PADDLE_THROW("There is no next data!");
}
reader_->ReadNext(out); reader_->ReadNext(out);
} if (out->empty()) {
bool HasNext() const override {
if (reader_->HasNext()) {
return true;
} else {
++pass_count_; ++pass_count_;
if (pass_count_ >= pass_num_) { if (pass_count_ < pass_num_) {
return false;
} else {
reader_->ReInit(); reader_->ReInit();
return true; reader_->ReadNext(out);
} }
} }
} }
......
...@@ -52,8 +52,6 @@ class RandomDataGenerator : public framework::ReaderBase { ...@@ -52,8 +52,6 @@ class RandomDataGenerator : public framework::ReaderBase {
void ReInit() override { return; } void ReInit() override { return; }
bool HasNext() const override { return true; }
private: private:
float min_; float min_;
float max_; float max_;
...@@ -74,7 +72,7 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase { ...@@ -74,7 +72,7 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase {
const auto& ranks = Attr<std::vector<int>>("ranks"); const auto& ranks = Attr<std::vector<int>>("ranks");
PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty());
PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0),
int(shape_concat.size()), static_cast<int>(shape_concat.size()),
"The accumulate of all ranks should be equal to the " "The accumulate of all ranks should be equal to the "
"shape concat's length."); "shape concat's length.");
std::vector<framework::DDim> shapes = RestoreShapes(shape_concat, ranks); std::vector<framework::DDim> shapes = RestoreShapes(shape_concat, ranks);
......
...@@ -12,8 +12,6 @@ ...@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <mutex>
#include <thread>
#include "paddle/fluid/operators/reader/reader_op_registry.h" #include "paddle/fluid/operators/reader/reader_op_registry.h"
#include "paddle/fluid/recordio/scanner.h" #include "paddle/fluid/recordio/scanner.h"
...@@ -35,17 +33,15 @@ class RecordIOFileReader : public framework::FileReader { ...@@ -35,17 +33,15 @@ class RecordIOFileReader : public framework::FileReader {
LOG(INFO) << "Creating file reader" << filename; LOG(INFO) << "Creating file reader" << filename;
} }
bool HasNext() const override { return scanner_.HasNext(); }
void ReInit() override { scanner_.Reset(); } void ReInit() override { scanner_.Reset(); }
protected: protected:
void ReadNextImpl(std::vector<framework::LoDTensor>* out) override { void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
if (ThreadSafe) { if (ThreadSafe) {
std::lock_guard<std::mutex> guard(*mutex_); std::lock_guard<std::mutex> guard(*mutex_);
*out = framework::ReadFromRecordIO(scanner_, dev_ctx_); *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
} else { } else {
*out = framework::ReadFromRecordIO(scanner_, dev_ctx_); *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
} }
} }
...@@ -66,7 +62,7 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { ...@@ -66,7 +62,7 @@ class CreateRecordIOReaderOp : public framework::OperatorBase {
const auto& ranks = Attr<std::vector<int>>("ranks"); const auto& ranks = Attr<std::vector<int>>("ranks");
PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty());
PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0),
int(shape_concat.size()), static_cast<int>(shape_concat.size()),
"The accumulate of all ranks should be equal to the " "The accumulate of all ranks should be equal to the "
"shape concat's length."); "shape concat's length.");
std::string filename = Attr<std::string>("filename"); std::string filename = Attr<std::string>("filename");
......
...@@ -30,35 +30,33 @@ class ShuffleReader : public framework::DecoratedReader { ...@@ -30,35 +30,33 @@ class ShuffleReader : public framework::DecoratedReader {
std::random_device device; std::random_device device;
seed_ = device(); seed_ = device();
} }
ReadIntoBuffers(); ReloadBuffer();
} }
void ReadNext(std::vector<framework::LoDTensor>* out) override { void ReadNext(std::vector<framework::LoDTensor>* out) override {
if (!HasNext()) { out->clear();
PADDLE_THROW("There is no next data!");
}
if (iteration_pos_ >= buffer_.size()) { if (iteration_pos_ >= buffer_.size()) {
VLOG(10) << "Resetting shuffle buffer"; VLOG(10) << "Resetting shuffle buffer";
ReadIntoBuffers(); ReloadBuffer();
if (buffer_.empty()) {
return;
}
} }
*out = buffer_[iteration_pos_++]; *out = buffer_[iteration_pos_++];
} }
bool HasNext() const override {
return iteration_pos_ < buffer_.size() || reader_->HasNext();
}
private: private:
void ReadIntoBuffers() { void ReloadBuffer() {
buffer_.clear(); buffer_.clear();
buffer_.reserve(buffer_size_); buffer_.reserve(buffer_size_);
iteration_pos_ = 0; iteration_pos_ = 0;
for (size_t i = 0; i < buffer_size_; ++i) { for (size_t i = 0; i < buffer_size_; ++i) {
if (!reader_->HasNext()) { std::vector<framework::LoDTensor> ins;
reader_->ReadNext(&ins);
if (ins.empty()) {
break; break;
} }
buffer_.emplace_back(); buffer_.emplace_back(ins);
reader_->ReadNext(&buffer_.back());
} }
std::mt19937 g(seed_); std::mt19937 g(seed_);
std::shuffle(buffer_.begin(), buffer_.end(), g); std::shuffle(buffer_.begin(), buffer_.end(), g);
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/detail/safe_ref.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace paddle {
namespace operators {
namespace reader {
class ThreadedReader : public framework::DecoratedReader {
public:
ThreadedReader(ReaderBase* reader, bool safe_mode)
: DecoratedReader(reader), safe_mode_(safe_mode) {}
void ReadNext(std::vector<framework::LoDTensor>* out) override {
std::lock_guard<std::mutex> lock(mutex_);
reader_->ReadNext(out);
}
void ReInit() override {
if (safe_mode_) {
PADDLE_THROW(
"ThreadedReader::ReInit() is disabled when 'safe_mode' is true.");
}
VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in "
"multi-thread environment.";
reader_->ReInit();
}
private:
bool safe_mode_;
std::mutex mutex_;
};
class CreateThreadedReaderOp : public framework::OperatorBase {
public:
using framework::OperatorBase::OperatorBase;
private:
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
auto* out = detail::Ref(scope.FindVar(Output("Out")))
.GetMutable<framework::ReaderHolder>();
if (out->Get() != nullptr) {
return;
}
const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
->Get<framework::ReaderHolder>();
bool safe_mode = Attr<bool>("safe_mode");
out->Reset(new ThreadedReader(underlying_reader.Get(), safe_mode));
}
};
class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase {
public:
CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
: DecoratedReaderMakerBase(op_proto, op_checker) {
AddAttr<bool>("safe_mode",
"When 'safe_mode' is true, 'ReInit()' is disabled to avoid "
"unexpected bugs in multi-thread environment.")
.SetDefault(true);
AddComment(R"DOC(
CreateThreadedReader Operator
This operator creates a threaded reader. A threaded reader's
'ReadNext()' can be invoked by several threads at the same
time.
When the attribute 'safe_mode' is true, the threaded reader's
'ReInit()' is disabled to avoid unexpected bugs in multi-thread
environment.
)DOC");
}
};
} // namespace reader
} // namespace operators
} // namespace paddle
namespace reader = paddle::operators::reader;
REGISTER_DECORATED_READER_OPERATOR(create_threaded_reader,
reader::CreateThreadedReaderOp,
reader::CreateThreadedReaderOpMaker);
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <thread> // NOLINT
#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h" #include "paddle/fluid/operators/reader/reader_op_registry.h"
...@@ -19,38 +21,23 @@ namespace paddle { ...@@ -19,38 +21,23 @@ namespace paddle {
namespace operators { namespace operators {
namespace reader { namespace reader {
class MultipleReader : public framework::ReaderBase { class MultiFileReader : public framework::ReaderBase {
public: public:
class ThreadBufferMap { MultiFileReader(const std::vector<std::string>& file_names,
public: const std::vector<framework::DDim>& dims, size_t thread_num,
std::vector<framework::LoDTensor>& operator[]( size_t buffer_size)
const std::thread::id& thread_id) { : file_names_(file_names), dims_(dims), buffer_size_(buffer_size) {
std::lock_guard<std::mutex> lock(mutex_);
return buffer_[thread_id];
}
void Clear() { buffer_.clear(); }
private:
std::mutex mutex_;
std::unordered_map<std::thread::id, std::vector<framework::LoDTensor>>
buffer_;
};
MultipleReader(const std::vector<std::string>& file_names,
const std::vector<framework::DDim>& dims, size_t thread_num)
: file_names_(file_names), dims_(dims) {
prefetchers_.resize(thread_num); prefetchers_.resize(thread_num);
StartNewScheduler(); StartNewScheduler();
} }
void ReadNext(std::vector<framework::LoDTensor>* out) override; void ReadNext(std::vector<framework::LoDTensor>* out) override;
bool HasNext() const override;
void ReInit() override; void ReInit() override;
~MultipleReader() { EndScheduler(); } ~MultiFileReader() { EndScheduler(); }
private: private:
bool HasNext();
void StartNewScheduler(); void StartNewScheduler();
void EndScheduler(); void EndScheduler();
void ScheduleThreadFunc(); void ScheduleThreadFunc();
...@@ -60,39 +47,36 @@ class MultipleReader : public framework::ReaderBase { ...@@ -60,39 +47,36 @@ class MultipleReader : public framework::ReaderBase {
std::vector<framework::DDim> dims_; std::vector<framework::DDim> dims_;
std::thread scheduler_; std::thread scheduler_;
std::vector<std::thread> prefetchers_; std::vector<std::thread> prefetchers_;
size_t buffer_size_;
framework::Channel<size_t>* waiting_file_idx_; framework::Channel<size_t>* waiting_file_idx_;
framework::Channel<size_t>* available_thread_idx_; framework::Channel<size_t>* available_thread_idx_;
framework::Channel<std::vector<framework::LoDTensor>>* buffer_; framework::Channel<std::vector<framework::LoDTensor>>* buffer_;
mutable ThreadBufferMap thread_buffer_map_;
}; };
void MultipleReader::ReadNext(std::vector<framework::LoDTensor>* out) { void MultiFileReader::ReadNext(std::vector<framework::LoDTensor>* out) {
if (!HasNext()) { out->clear();
PADDLE_THROW("There is no next data!"); if (HasNext()) {
buffer_->Receive(out);
} }
auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()];
*out = thread_local_buffer;
thread_local_buffer.clear();
}
bool MultipleReader::HasNext() const {
auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()];
return thread_local_buffer.empty() ? buffer_->Receive(&thread_local_buffer)
: true;
} }
void MultipleReader::ReInit() { void MultiFileReader::ReInit() {
EndScheduler(); EndScheduler();
thread_buffer_map_.Clear();
StartNewScheduler(); StartNewScheduler();
} }
void MultipleReader::StartNewScheduler() { bool MultiFileReader::HasNext() {
while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
}
return buffer_->CanReceive();
}
void MultiFileReader::StartNewScheduler() {
size_t thread_num = prefetchers_.size(); size_t thread_num = prefetchers_.size();
waiting_file_idx_ = framework::MakeChannel<size_t>(file_names_.size()); waiting_file_idx_ = framework::MakeChannel<size_t>(file_names_.size());
available_thread_idx_ = framework::MakeChannel<size_t>(thread_num); available_thread_idx_ = framework::MakeChannel<size_t>(thread_num);
buffer_ = buffer_ =
framework::MakeChannel<std::vector<framework::LoDTensor>>(thread_num); framework::MakeChannel<std::vector<framework::LoDTensor>>(buffer_size_);
for (size_t i = 0; i < file_names_.size(); ++i) { for (size_t i = 0; i < file_names_.size(); ++i) {
waiting_file_idx_->Send(&i); waiting_file_idx_->Send(&i);
...@@ -105,7 +89,7 @@ void MultipleReader::StartNewScheduler() { ...@@ -105,7 +89,7 @@ void MultipleReader::StartNewScheduler() {
scheduler_ = std::thread([this] { ScheduleThreadFunc(); }); scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
} }
void MultipleReader::EndScheduler() { void MultiFileReader::EndScheduler() {
available_thread_idx_->Close(); available_thread_idx_->Close();
buffer_->Close(); buffer_->Close();
waiting_file_idx_->Close(); waiting_file_idx_->Close();
...@@ -117,8 +101,8 @@ void MultipleReader::EndScheduler() { ...@@ -117,8 +101,8 @@ void MultipleReader::EndScheduler() {
delete waiting_file_idx_; delete waiting_file_idx_;
} }
void MultipleReader::ScheduleThreadFunc() { void MultiFileReader::ScheduleThreadFunc() {
VLOG(5) << "MultipleReader schedule thread starts."; VLOG(5) << "MultiFileReader schedule thread starts.";
size_t completed_thread_num = 0; size_t completed_thread_num = 0;
size_t thread_idx; size_t thread_idx;
while (available_thread_idx_->Receive(&thread_idx)) { while (available_thread_idx_->Receive(&thread_idx)) {
...@@ -150,17 +134,20 @@ void MultipleReader::ScheduleThreadFunc() { ...@@ -150,17 +134,20 @@ void MultipleReader::ScheduleThreadFunc() {
p.join(); p.join();
} }
} }
VLOG(5) << "MultipleReader schedule thread terminates."; VLOG(5) << "MultiFileReader schedule thread terminates.";
} }
void MultipleReader::PrefetchThreadFunc(std::string file_name, void MultiFileReader::PrefetchThreadFunc(std::string file_name,
size_t thread_idx) { size_t thread_idx) {
VLOG(5) << "The prefetch thread of file '" << file_name << "' starts."; VLOG(5) << "The prefetch thread of file '" << file_name << "' starts.";
std::unique_ptr<framework::ReaderBase> reader = std::unique_ptr<framework::ReaderBase> reader =
CreateReaderByFileName(file_name, dims_); CreateReaderByFileName(file_name, dims_);
while (reader->HasNext()) { while (true) {
std::vector<framework::LoDTensor> ins; std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins); reader->ReadNext(&ins);
if (ins.empty()) {
break;
}
try { try {
buffer_->Send(&ins); buffer_->Send(&ins);
} catch (paddle::platform::EnforceNotMet e) { } catch (paddle::platform::EnforceNotMet e) {
...@@ -197,11 +184,13 @@ class OpenFilesOp : public framework::OperatorBase { ...@@ -197,11 +184,13 @@ class OpenFilesOp : public framework::OperatorBase {
const auto& file_names = Attr<std::vector<std::string>>("file_names"); const auto& file_names = Attr<std::vector<std::string>>("file_names");
PADDLE_ENFORCE(!file_names.empty(), "No file to be read!"); PADDLE_ENFORCE(!file_names.empty(), "No file to be read!");
const size_t thread_num = Attr<int>("thread_num"); const size_t thread_num = Attr<int>("thread_num");
const size_t buffer_size = Attr<int>("buffer_size");
auto* out = scope.FindVar(Output("Out")) auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>(); ->template GetMutable<framework::ReaderHolder>();
out->Reset(new MultipleReader( out->Reset(new MultiFileReader(file_names,
file_names, RestoreShapes(shape_concat, ranks), thread_num)); RestoreShapes(shape_concat, ranks),
thread_num, buffer_size));
} }
}; };
...@@ -212,11 +201,12 @@ class OpenFilesOpMaker : public FileReaderMakerBase { ...@@ -212,11 +201,12 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
AddAttr<std::vector<std::string>>("file_names", "Files to be read."); AddAttr<std::vector<std::string>>("file_names", "Files to be read.");
AddAttr<int>("thread_num", "The maximal concurrent prefetch thread number.") AddAttr<int>("thread_num", "The maximal concurrent prefetch thread number.")
.GreaterThan(0); .GreaterThan(0);
AddAttr<int>("buffer_size", "The size of prefetch buffer.").GreaterThan(0);
AddComment(R"DOC( AddComment(R"DOC(
OpenFiles Operator OpenFiles Operator
An OpenFilesOp creates a MultipleReader, which is able to An OpenFilesOp creates a MultiFileReader, which is able to
read data multi-threaded from multiple files. read data multi-threaded from multiple files.
)DOC"); )DOC");
} }
......
...@@ -33,22 +33,26 @@ constexpr int PADDLE_CUDA_NUM_THREADS = 512; ...@@ -33,22 +33,26 @@ constexpr int PADDLE_CUDA_NUM_THREADS = 512;
USE_CUDA_ATOMIC(Add, float); USE_CUDA_ATOMIC(Add, float);
USE_CUDA_ATOMIC(Add, int); USE_CUDA_ATOMIC(Add, int);
USE_CUDA_ATOMIC(Add, unsigned int); USE_CUDA_ATOMIC(Add, unsigned int);
USE_CUDA_ATOMIC(Add, unsigned long long int); // CUDA API uses unsigned long long int, we cannot use uint64_t here.
// It because unsigned long long int is not necessarily uint64_t
USE_CUDA_ATOMIC(Add, unsigned long long int); // NOLINT
CUDA_ATOMIC_WRAPPER(Add, int64_t) { CUDA_ATOMIC_WRAPPER(Add, int64_t) {
static_assert(sizeof(int64_t) == sizeof(long long int), // Here, we check long long int must be int64_t.
static_assert(sizeof(int64_t) == sizeof(long long int), // NOLINT
"long long should be int64"); "long long should be int64");
return CudaAtomicAdd(reinterpret_cast<unsigned long long int*>(address), return CudaAtomicAdd(
static_cast<unsigned long long int>(val)); reinterpret_cast<unsigned long long int*>(address), // NOLINT
static_cast<unsigned long long int>(val)); // NOLINT
} }
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 600 #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 600
USE_CUDA_ATOMIC(Add, double); USE_CUDA_ATOMIC(Add, double);
#else #else
CUDA_ATOMIC_WRAPPER(Add, double) { CUDA_ATOMIC_WRAPPER(Add, double) {
unsigned long long int* address_as_ull = unsigned long long int* address_as_ull = // NOLINT
reinterpret_cast<unsigned long long int*>(address); reinterpret_cast<unsigned long long int*>(address); // NOLINT
unsigned long long int old = *address_as_ull, assumed; unsigned long long int old = *address_as_ull, assumed; // NOLINT
do { do {
assumed = old; assumed = old;
......
...@@ -28,6 +28,10 @@ CUBLAS_BLAS_ROUTINE_EACH(DEFINE_WRAP); ...@@ -28,6 +28,10 @@ CUBLAS_BLAS_ROUTINE_EACH(DEFINE_WRAP);
CUBLAS_BLAS_ROUTINE_EACH_R2(DEFINE_WRAP); CUBLAS_BLAS_ROUTINE_EACH_R2(DEFINE_WRAP);
#endif #endif
#ifdef CUBLAS_BLAS_ROUTINE_EACH_R3
CUBLAS_BLAS_ROUTINE_EACH_R3(DEFINE_WRAP);
#endif
} // namespace dynload } // namespace dynload
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -71,7 +71,6 @@ extern void *cublas_dso_handle; ...@@ -71,7 +71,6 @@ extern void *cublas_dso_handle;
__macro(cublasDgemm_v2); \ __macro(cublasDgemm_v2); \
__macro(cublasHgemm); \ __macro(cublasHgemm); \
__macro(cublasSgemmEx); \ __macro(cublasSgemmEx); \
__macro(cublasGemmEx); \
__macro(cublasSgeam_v2); \ __macro(cublasSgeam_v2); \
__macro(cublasDgeam_v2); \ __macro(cublasDgeam_v2); \
__macro(cublasCreate_v2); \ __macro(cublasCreate_v2); \
...@@ -83,11 +82,6 @@ extern void *cublas_dso_handle; ...@@ -83,11 +82,6 @@ extern void *cublas_dso_handle;
__macro(cublasDgemmBatched); \ __macro(cublasDgemmBatched); \
__macro(cublasCgemmBatched); \ __macro(cublasCgemmBatched); \
__macro(cublasZgemmBatched); \ __macro(cublasZgemmBatched); \
__macro(cublasSgemmStridedBatched); \
__macro(cublasDgemmStridedBatched); \
__macro(cublasCgemmStridedBatched); \
__macro(cublasZgemmStridedBatched); \
__macro(cublasHgemmStridedBatched); \
__macro(cublasSgetrfBatched); \ __macro(cublasSgetrfBatched); \
__macro(cublasSgetriBatched); \ __macro(cublasSgetriBatched); \
__macro(cublasDgetrfBatched); \ __macro(cublasDgetrfBatched); \
...@@ -95,10 +89,24 @@ extern void *cublas_dso_handle; ...@@ -95,10 +89,24 @@ extern void *cublas_dso_handle;
CUBLAS_BLAS_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP) CUBLAS_BLAS_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
// APIs available after CUDA 8.0
#if CUDA_VERSION >= 8000
#define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) \
__macro(cublasGemmEx); \
__macro(cublasSgemmStridedBatched); \
__macro(cublasDgemmStridedBatched); \
__macro(cublasCgemmStridedBatched); \
__macro(cublasZgemmStridedBatched); \
__macro(cublasHgemmStridedBatched);
CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
#endif
// APIs available after CUDA 9.0 // APIs available after CUDA 9.0
#if CUDA_VERSION >= 9000 #if CUDA_VERSION >= 9000
#define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) __macro(cublasSetMathMode); #define CUBLAS_BLAS_ROUTINE_EACH_R3(__macro) __macro(cublasSetMathMode);
CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
CUBLAS_BLAS_ROUTINE_EACH_R3(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
#endif #endif
#undef DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP #undef DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP
......
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
#pragma once #pragma once
#include <thread> #include <thread> // NOLINT
#include <typeindex> #include <typeindex>
#include <vector>
#include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
...@@ -29,6 +30,8 @@ inline ncclDataType_t ToNCCLDataType(std::type_index type) { ...@@ -29,6 +30,8 @@ inline ncclDataType_t ToNCCLDataType(std::type_index type) {
return ncclDouble; return ncclDouble;
} else if (type == typeid(int)) { // NOLINT } else if (type == typeid(int)) { // NOLINT
return ncclInt; return ncclInt;
} else if (type == typeid(int64_t)) { // NOLINT
return ncclInt64;
} else { } else {
PADDLE_THROW("Not supported"); PADDLE_THROW("Not supported");
} }
...@@ -58,7 +61,7 @@ struct NCCLContext { ...@@ -58,7 +61,7 @@ struct NCCLContext {
ncclComm_t comm_; ncclComm_t comm_;
explicit NCCLContext(int dev_id) explicit NCCLContext(int dev_id)
: ctx_(new CUDADeviceContext(CUDAPlace(dev_id))) {} : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))), comm_{nullptr} {}
cudaStream_t stream() const { return ctx_->stream(); } cudaStream_t stream() const { return ctx_->stream(); }
...@@ -66,23 +69,23 @@ struct NCCLContext { ...@@ -66,23 +69,23 @@ struct NCCLContext {
return boost::get<platform::CUDAPlace>(ctx_->GetPlace()).device; return boost::get<platform::CUDAPlace>(ctx_->GetPlace()).device;
} }
static void InitNCCLContext(std::unordered_map<int, NCCLContext> &contexts, static void InitNCCLContext(std::unordered_map<int, NCCLContext> *contexts,
const std::vector<platform::Place> &places) { const std::vector<platform::Place> &places) {
std::vector<ncclComm_t> comms; std::vector<ncclComm_t> comms;
std::vector<int> devs; std::vector<int> devs;
comms.resize(contexts.size()); comms.resize(contexts->size());
devs.reserve(contexts.size()); devs.reserve(contexts->size());
for (auto &p : places) { for (auto &p : places) {
devs.push_back(boost::get<platform::CUDAPlace>(p).device); devs.push_back(boost::get<platform::CUDAPlace>(p).device);
} }
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
&comms[0], static_cast<int>(contexts.size()), &devs[0])); &comms[0], static_cast<int>(contexts->size()), &devs[0]));
int i = 0; int i = 0;
for (auto &dev_id : devs) { for (auto &dev_id : devs) {
contexts.at(dev_id).comm_ = comms[i++]; contexts->at(dev_id).comm_ = comms[i++];
} }
} }
}; };
...@@ -91,7 +94,8 @@ struct NCCLContextMap { ...@@ -91,7 +94,8 @@ struct NCCLContextMap {
std::unordered_map<int, NCCLContext> contexts_; std::unordered_map<int, NCCLContext> contexts_;
std::vector<int> order_; std::vector<int> order_;
NCCLContextMap(const std::vector<platform::Place> &places) { explicit NCCLContextMap(const std::vector<platform::Place> &places) {
PADDLE_ENFORCE(!places.empty());
order_.reserve(places.size()); order_.reserve(places.size());
for (auto &p : places) { for (auto &p : places) {
int dev_id = boost::get<CUDAPlace>(p).device; int dev_id = boost::get<CUDAPlace>(p).device;
...@@ -102,15 +106,17 @@ struct NCCLContextMap { ...@@ -102,15 +106,17 @@ struct NCCLContextMap {
order_.size(), contexts_.size(), order_.size(), contexts_.size(),
"NCCL Context Map does not support contain two or more same device"); "NCCL Context Map does not support contain two or more same device");
std::vector<ncclComm_t> comms; if (places.size() > 1) {
comms.resize(order_.size()); std::vector<ncclComm_t> comms;
comms.resize(order_.size());
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
&comms[0], static_cast<int>(order_.size()), &order_[0])); &comms[0], static_cast<int>(order_.size()), &order_[0]));
int i = 0; int i = 0;
for (auto &dev_id : order_) { for (auto &dev_id : order_) {
contexts_.at(dev_id).comm_ = comms[i++]; contexts_.at(dev_id).comm_ = comms[i++];
}
} }
} }
......
...@@ -252,7 +252,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -252,7 +252,6 @@ All parameter, weight, gradient are variables in Paddle.
py::return_value_policy::reference); py::return_value_policy::reference);
py::class_<framework::ReaderHolder>(m, "Reader", "") py::class_<framework::ReaderHolder>(m, "Reader", "")
.def("has_next", &framework::ReaderHolder::HasNext)
.def("reset", &framework::ReaderHolder::ReInit); .def("reset", &framework::ReaderHolder::ReInit);
py::class_<Scope>(m, "Scope", "") py::class_<Scope>(m, "Scope", "")
......
...@@ -39,7 +39,7 @@ class RecordIOWriter { ...@@ -39,7 +39,7 @@ class RecordIOWriter {
void CompleteAppendTensor() { void CompleteAppendTensor() {
auto& ctx = auto& ctx =
*platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); *platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
framework::WriteToRecordIO(writer_, tensors_, ctx); framework::WriteToRecordIO(&writer_, tensors_, ctx);
tensors_.clear(); tensors_.clear();
} }
......
...@@ -29,6 +29,7 @@ import optimizer ...@@ -29,6 +29,7 @@ import optimizer
import backward import backward
import regularizer import regularizer
import average import average
import metrics
from param_attr import ParamAttr, WeightNormParamAttr from param_attr import ParamAttr, WeightNormParamAttr
from data_feeder import DataFeeder from data_feeder import DataFeeder
from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import numpy as np import numpy as np
import warnings
""" """
Class of all kinds of Average. Class of all kinds of Average.
...@@ -22,6 +23,8 @@ import numpy as np ...@@ -22,6 +23,8 @@ import numpy as np
wrappers of Python functions. wrappers of Python functions.
""" """
__all__ = ["WeightedAverage"]
def _is_number_(var): def _is_number_(var):
return isinstance(var, int) or isinstance(var, float) or (isinstance( return isinstance(var, int) or isinstance(var, float) or (isinstance(
...@@ -34,6 +37,9 @@ def _is_number_or_matrix_(var): ...@@ -34,6 +37,9 @@ def _is_number_or_matrix_(var):
class WeightedAverage(object): class WeightedAverage(object):
def __init__(self): def __init__(self):
warnings.warn(
"The %s is deprecated, please use fluid.metrics.Accuracy instead." %
(self.__class__.__name__), Warning)
self.reset() self.reset()
def reset(self): def reset(self):
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import warnings
import numpy as np import numpy as np
import layers import layers
...@@ -59,6 +60,9 @@ class Evaluator(object): ...@@ -59,6 +60,9 @@ class Evaluator(object):
""" """
def __init__(self, name, **kwargs): def __init__(self, name, **kwargs):
warnings.warn(
"The %s is deprecated, because maintain a modified program inside evaluator cause bug easily, please use fluid.metrics.%s instead."
% (self.__class__.__name__, self.__class__.__name__), Warning)
self.states = [] self.states = []
self.metrics = [] self.metrics = []
self.helper = LayerHelper(name, **kwargs) self.helper = LayerHelper(name, **kwargs)
......
...@@ -818,6 +818,11 @@ class Block(object): ...@@ -818,6 +818,11 @@ class Block(object):
del self.vars[name] del self.vars[name]
self.sync_with_cpp() self.sync_with_cpp()
def remove_var(self, name):
self.sync_with_cpp()
self.desc.remove_var(name)
del self.vars[name]
def create_parameter(self, *args, **kwargs): def create_parameter(self, *args, **kwargs):
global_block = self.program.global_block() global_block = self.program.global_block()
param = Parameter(global_block, *args, **kwargs) param = Parameter(global_block, *args, **kwargs)
...@@ -838,6 +843,11 @@ class Block(object): ...@@ -838,6 +843,11 @@ class Block(object):
self.ops.insert(index, op) self.ops.insert(index, op)
return op return op
def remove_op(self, index):
self.sync_with_cpp()
self.desc.remove_op(index, index + 1)
del self.ops[index]
def delete_ops(self, ops): def delete_ops(self, ops):
# remove from cpp # remove from cpp
# FIXME(typhoonzero): remove only the first occurrence. # FIXME(typhoonzero): remove only the first occurrence.
...@@ -846,6 +856,7 @@ class Block(object): ...@@ -846,6 +856,7 @@ class Block(object):
end = list(self.ops).index(ops[-1]) end = list(self.ops).index(ops[-1])
except Exception, e: except Exception, e:
raise e raise e
self.desc.remove_op(start, end + 1) self.desc.remove_op(start, end + 1)
def slice_ops(self, start, end): def slice_ops(self, start, end):
......
...@@ -21,8 +21,7 @@ from ..executor import global_scope ...@@ -21,8 +21,7 @@ from ..executor import global_scope
__all__ = [ __all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
'open_files', 'read_file', 'create_shuffle_reader', 'open_files', 'read_file', 'shuffle', 'double_buffer'
'create_double_buffer_reader', 'create_multi_pass_reader'
] ]
...@@ -237,13 +236,9 @@ def monkey_patch_reader_methods(reader): ...@@ -237,13 +236,9 @@ def monkey_patch_reader_methods(reader):
var = scope.find_var(reader.name) var = scope.find_var(reader.name)
return var.get_reader() return var.get_reader()
def eof():
return not __get_reader__().has_next()
def reset(): def reset():
return __get_reader__().reset() return __get_reader__().reset()
reader.eof = eof
reader.reset = reset reader.reset = reset
reader.stop_gradient = True reader.stop_gradient = True
reader.persistable = True reader.persistable = True
...@@ -283,7 +278,42 @@ def _copy_reader_create_op_(block, op): ...@@ -283,7 +278,42 @@ def _copy_reader_create_op_(block, op):
return new_op return new_op
def open_recordio_file(filename, shapes, lod_levels, dtypes): def open_recordio_file(filename,
shapes,
lod_levels,
dtypes,
pass_num=1,
for_parallel=False):
"""
Open a RecordIO file
This layer takes a RecordIO file to read from and returns a Reader Variable.
Via the Reader Variable, we can get data from the given RecordIO file.
Args:
filename(str): The RecordIO file's name.
shapes(list): List of tuples which declaring data shapes.
lod_levels(list): List of ints which declaring data lod_level.
dtypes(list): List of strs which declaring data type.
pass_num(int): Number of passes to run.
for_parallel(Bool): Set it as True if you are going to run
subsequent operators in parallel.
Returns:
Variable: A Reader Variable via which we can get RecordIO file data.
Examples:
.. code-block:: python
reader = fluid.layers.io.open_recordio_file(
filename='./data.recordio',
shapes=[(3,224,224), (1)],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
# Via the reader, we can use 'read_file' layer to get data:
image, label = fluid.layers.read_file(reader)
"""
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = [] shape_concat = []
ranks = [] ranks = []
...@@ -310,10 +340,63 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes): ...@@ -310,10 +340,63 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes):
startup_var.persistable = True startup_var.persistable = True
main_prog_var = _copy_reader_var_(default_main_program().current_block(), main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var) startup_var)
if pass_num > 1:
main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num)
if for_parallel:
main_prog_var = parallel(reader=main_prog_var)
return monkey_patch_reader_methods(main_prog_var) return monkey_patch_reader_methods(main_prog_var)
def open_files(filenames, thread_num, shapes, lod_levels, dtypes): def open_files(filenames,
shapes,
lod_levels,
dtypes,
thread_num,
buffer_size=None,
pass_num=1,
for_parallel=False):
"""
Open files
This layer takes a list of files to read from and returns a Reader Variable.
Via the Reader Variable, we can get data from given files. All files must
have name suffixs to indicate their formats, e.g., '*.recordio'.
Args:
filenames(list): The list of file names.
shapes(list): List of tuples which declaring data shapes.
lod_levels(list): List of ints which declaring data lod_level.
dtypes(list): List of strs which declaring data type.
thread_num(int): The maximal concurrent prefetch thread number.
buffer_size(int): The size of prefetch buffer.
pass_num(int): Number of passes to run.
for_parallel(Bool): Set it as True if you are going to run
subsequent operators in parallel.
Returns:
Variable: A Reader Variable via which we can get file data.
Examples:
.. code-block:: python
reader = fluid.layers.io.open_files(filenames=['./data1.recordio',
'./data2.recordio'],
shapes=[(3,224,224), (1)],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
thread_num=2,
buffer_size=2)
# Via the reader, we can use 'read_file' layer to get data:
image, label = fluid.layers.io.read_file(reader)
"""
if buffer_size is None:
buffer_size = thread_num
if isinstance(filenames, basestring):
filenames = [filenames]
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = [] shape_concat = []
ranks = [] ranks = []
...@@ -322,29 +405,36 @@ def open_files(filenames, thread_num, shapes, lod_levels, dtypes): ...@@ -322,29 +405,36 @@ def open_files(filenames, thread_num, shapes, lod_levels, dtypes):
shape_concat.extend(shape) shape_concat.extend(shape)
ranks.append(len(shape)) ranks.append(len(shape))
var_name = unique_name('multiple_reader') multi_file_reader_name = unique_name('multi_file_reader')
startup_blk = default_startup_program().current_block() startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=var_name) startup_reader = startup_blk.create_var(name=multi_file_reader_name)
startup_blk.append_op( startup_blk.append_op(
type='open_files', type='open_files',
outputs={'Out': [startup_var]}, outputs={'Out': [startup_reader]},
attrs={ attrs={
'shape_concat': shape_concat, 'shape_concat': shape_concat,
'lod_levels': lod_levels, 'lod_levels': lod_levels,
'ranks': ranks, 'ranks': ranks,
'file_names': filenames, 'file_names': filenames,
'thread_num': thread_num 'thread_num': thread_num,
'buffer_size': buffer_size
}) })
startup_var.desc.set_dtypes(dtypes) startup_reader.desc.set_dtypes(dtypes)
startup_var.persistable = True startup_reader.persistable = True
main_prog_var = _copy_reader_var_(default_main_program().current_block(), main_prog_reader = _copy_reader_var_(default_main_program().current_block(),
startup_var) startup_reader)
return monkey_patch_reader_methods(main_prog_var) if pass_num > 1:
main_prog_reader = multi_pass(
reader=main_prog_reader, pass_num=pass_num)
if for_parallel:
main_prog_reader = parallel(reader=main_prog_reader)
return monkey_patch_reader_methods(main_prog_reader)
def __create_decorated_reader__(op_type, reader, attrs): def __create_shared_decorated_reader__(op_type, reader, attrs):
var_name = unique_name(op_type) var_name = unique_name(op_type)
startup_blk = default_startup_program().current_block() startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=var_name) startup_var = startup_blk.create_var(name=var_name)
...@@ -360,22 +450,41 @@ def __create_decorated_reader__(op_type, reader, attrs): ...@@ -360,22 +450,41 @@ def __create_decorated_reader__(op_type, reader, attrs):
return monkey_patch_reader_methods(main_prog_var) return monkey_patch_reader_methods(main_prog_var)
def create_shuffle_reader(reader, buffer_size): def __create_unshared_decorated_reader__(op_type, reader, attrs):
return __create_decorated_reader__('create_shuffle_reader', reader, new_reader_name = unique_name(op_type)
{'buffer_size': int(buffer_size)}) main_blk = default_main_program().current_block()
new_reader = main_blk.create_var(name=new_reader_name)
main_blk.append_op(
type=op_type,
inputs={'UnderlyingReader': reader},
outputs={'Out': [new_reader]},
attrs=attrs)
new_reader.persistable = True
new_reader.stop_gradient = True
return monkey_patch_reader_methods(new_reader)
def shuffle(reader, buffer_size):
return __create_unshared_decorated_reader__(
'create_shuffle_reader', reader, {'buffer_size': int(buffer_size)})
def create_double_buffer_reader(reader, place=None): def double_buffer(reader, place=None):
attrs = dict() attrs = dict()
if place is not None: if place is not None:
attrs['place'] = str(place).upper() attrs['place'] = str(place).upper()
return __create_decorated_reader__('create_double_buffer_reader', reader, return __create_unshared_decorated_reader__('create_double_buffer_reader',
attrs) reader, attrs)
def multi_pass(reader, pass_num):
return __create_shared_decorated_reader__(
'create_multi_pass_reader', reader, {'pass_num': int(pass_num)})
def create_multi_pass_reader(reader, pass_num): def parallel(reader):
return __create_decorated_reader__('create_multi_pass_reader', reader, return __create_shared_decorated_reader__('create_threaded_reader', reader,
{'pass_num': int(pass_num)}) {})
def read_file(file_obj): def read_file(file_obj):
......
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
All layers just related to metric. All layers just related to metric.
""" """
import warnings
from ..layer_helper import LayerHelper from ..layer_helper import LayerHelper
from ..initializer import Normal, Constant from ..initializer import Normal, Constant
from ..framework import Variable from ..framework import Variable
from ..param_attr import ParamAttr from ..param_attr import ParamAttr
__all__ = ['accuracy'] __all__ = ['accuracy', 'auc']
def accuracy(input, label, k=1, correct=None, total=None): def accuracy(input, label, k=1, correct=None, total=None):
...@@ -55,3 +56,37 @@ def accuracy(input, label, k=1, correct=None, total=None): ...@@ -55,3 +56,37 @@ def accuracy(input, label, k=1, correct=None, total=None):
"Total": [total], "Total": [total],
}) })
return acc_out return acc_out
def auc(input, label, curve='ROC', num_thresholds=200):
warnings.warn(
"This interface not recommended, fluid.layers.auc compute the auc at every minibatch, \
but can not aggregate them and get the pass AUC, because pass \
auc can not be averaged with weighted from the minibatch auc value. \
Please use fluid.metrics.Auc, it can compute the auc value via Python natively, \
which can get every minibatch and every pass auc value.", Warning)
helper = LayerHelper("auc", **locals())
topk_out = helper.create_tmp_variable(dtype=input.dtype)
topk_indices = helper.create_tmp_variable(dtype="int64")
helper.append_op(
type="top_k",
inputs={"X": [input]},
outputs={"Out": [topk_out],
"Indices": [topk_indices]},
attrs={"k": k})
auc_out = helper.create_tmp_variable(dtype="float32")
if correct is None:
correct = helper.create_tmp_variable(dtype="int64")
if total is None:
total = helper.create_tmp_variable(dtype="int64")
helper.append_op(
type="accuracy",
inputs={
"Out": [topk_out],
"Indices": [topk_indices],
"Label": [label]
},
attrs={"curve": curve,
"num_thresholds": num_thresholds},
outputs={"AUC": [auc_out], })
return auc_out
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Fluid Metrics
The metrics are accomplished via Python natively.
"""
import numpy as np
import copy
import warnings
__all__ = [
'MetricBase',
'CompositeMetric',
'Accuracy',
'ChunkEvaluator',
'EditDistance',
'DetectionMAP',
'Auc',
]
def _is_numpy_(var):
return isinstance(var, (np.ndarray, np.generic))
def _is_number_(var):
return isinstance(var, int) or isinstance(var, float) or (isinstance(
var, np.ndarray) and var.shape == (1, ))
def _is_number_or_matrix_(var):
return _is_number_(var) or isinstance(var, np.ndarray)
class MetricBase(object):
"""
Base Class for all evaluators
Args:
name(str): The name of evaluator. such as, "accuracy". Used for generate
temporary variable name.
Interface:
Note(*) : the states is the attributes who not has _ prefix.
get_config(): print current states and configuration
reset(): clear the states. If the Metrics states type is not (int, float, np.ndarray),
Please override this method.
update(): update states at every minibatch
eval(): get metric evaluation in numpy type.
"""
def __init__(self, name, **kwargs):
self._name = str(name) if name != None else self.__class__.__name__
self._kwargs = kwargs if kwargs != None else dict()
self.reset()
def __str__(self):
return self._name
def reset(self):
"""
states is the attributes who not has _ prefix.
reset the states of metrics.
"""
states = {
attr: value
for attr, value in self.__dict__.iteritems()
if not attr.startswith("_")
}
for attr, value in states.iteritems():
if isinstance(value, int):
setattr(self, attr, 0)
elif isinstance(value, float):
setattr(self, attr, .0)
elif isinstance(value, (np.ndarray, np.generic)):
setattr(self, attr, np.zeros_like(value))
else:
setattr(self, attr, None)
def get_config(self):
states = {
attr: value
for attr, value in self.__dict__.iteritems()
if not attr.startswith("_")
}
config = copy.deepcopy(self._kwargs)
config.update({"name": self._name, "states": copy.deepcopy(states)})
return config
def update(self):
raise NotImplementedError()
def eval(self):
raise NotImplementedError()
class CompositeMetric(MetricBase):
"""
Compute multiple metrics in each minibatch.
for example, merge F1, accuracy, recall into one Metric.
"""
def __init__(self, name=None, **kwargs):
super(CompositeMetric, self).__init__(name, kwargs)
self._metrics = []
def add_metric(self, metric):
if not isinstance(metric, MetricBase):
raise ValueError("SubMetric should be inherit from MetricBase.")
self._metrics.append(metric)
def eval(self):
ans = []
for m in self._metrics:
ans.append(m.eval())
return ans
class Accuracy(MetricBase):
"""
Accumulate the accuracy from minibatches and compute the average accuracy
for every pass.
Args:
name: the metrics name
Example:
minibatch_accuracy = fluid.layers.accuracy(pred, label)
accuracy_evaluator = fluid.metrics.Accuracy()
for epoch in PASS_NUM:
accuracy_evaluator.reset()
for data in batches:
loss = exe.run(fetch_list=[cost, minibatch_accuracy])
accuracy_evaluator.update(value=minibatch_accuracy, weight=batches)
accuracy = accuracy_evaluator.eval()
"""
def __init__(self, name=None):
super(Accuracy, self).__init__(name)
self.value = .0
self.weight = .0
def update(self, value, weight):
if not _is_number_or_matrix_(value):
raise ValueError(
"The 'value' must be a number(int, float) or a numpy ndarray.")
if not _is_number_(weight):
raise ValueError("The 'weight' must be a number(int, float).")
self.value += value * weight
self.weight += weight
def eval(self):
if self.weight == 0:
raise ValueError(
"There is no data in Accuracy Metrics. Please check layers.accuracy output has added to Accuracy."
)
return self.value / self.weight
class ChunkEvalutor(MetricBase):
"""
Accumulate counter numbers output by chunk_eval from mini-batches and
compute the precision recall and F1-score using the accumulated counter
numbers.
"""
def __init__(self, name=None):
super(ChunkEvalutor, self).__init__(name)
self.num_infer_chunks = 0
self.num_label_chunks = 0
self.num_correct_chunks = 0
def update(self, num_infer_chunks, num_label_chunks, num_correct_chunks):
if not _is_number_or_matrix_(num_infer_chunks):
raise ValueError(
"The 'num_infer_chunks' must be a number(int, float) or a numpy ndarray."
)
if not _is_number_or_matrix_(num_label_chunks):
raise ValueError(
"The 'num_label_chunks' must be a number(int, float) or a numpy ndarray."
)
if not _is_number_or_matrix_(num_correct_chunks):
raise ValueError(
"The 'num_correct_chunks' must be a number(int, float) or a numpy ndarray."
)
self.num_infer_chunks += num_infer_chunks
self.num_label_chunks += num_label_chunks
self.num_correct_chunks += num_correct_chunks
def eval(self):
precision = float(
self.num_correct_chunks
) / self.num_infer_chunks if self.num_infer_chunks else 0
recall = float(self.num_correct_chunks
) / self.num_label_chunks if self.num_label_chunks else 0
f1_score = float(2 * precision * recall) / (
precision + recall) if self.num_correct_chunks else 0
return precision, recall, f1_score
class EditDistance(MetricBase):
"""
Accumulate edit distance sum and sequence number from mini-batches and
compute the average edit_distance and instance error of all batches.
Args:
name: the metrics name
Example:
edit_distance_metrics = fluid.layers.edit_distance(input, label)
distance_evaluator = fluid.metrics.EditDistance()
for epoch in PASS_NUM:
distance_evaluator.reset()
for data in batches:
loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics))
distance_evaluator.update(*edit_distance_metrics)
distance, instance_error = distance_evaluator.eval()
In the above example:
'distance' is the average of the edit distance in a pass.
'instance_error' is the instance error rate in a pass.
"""
def __init__(self, name):
super(EditDistance, self).__init__(name)
self.total_distance = .0
self.seq_num = 0
self.instance_error = 0
def update(self, distances, seq_num):
if not _is_numpy_(distances):
raise ValueError("The 'distances' must be a numpy ndarray.")
if not _is_number_(seq_num):
raise ValueError("The 'seq_num' must be a number(int, float).")
seq_right_count = np.sum(distances == 0)
total_distance = np.sum(distances)
self.seq_num += seq_num
self.instance_error += seq_num - seq_right_count
self.total_distance += total_distance
def eval():
if self.seq_num == 0:
raise ValueError(
"There is no data in EditDistance Metric. Please check layers.edit_distance output has been added to EditDistance."
)
avg_distance = self.total_distance / self.seq_num
avg_instance_error = self.instance_error / self.seq_num
return avg_distance, avg_instance_error
class DetectionMAP(MetricBase):
"""
Calculate the detection mean average precision (mAP).
TODO (Dang Qingqing): update the following doc.
The general steps are as follows:
1. calculate the true positive and false positive according to the input
of detection and labels.
2. calculate mAP value, support two versions: '11 point' and 'integral'.
Please get more information from the following articles:
https://sanchom.wordpress.com/tag/average-precision/
https://arxiv.org/abs/1512.02325
"""
def __init__(self, name=None):
super(DetectionMAP, self).__init__(name)
# the current map value
self.value = .0
def update(self, value, weight):
if not _is_number_or_matrix_(value):
raise ValueError(
"The 'value' must be a number(int, float) or a numpy ndarray.")
if not _is_number_(weight):
raise ValueError("The 'weight' must be a number(int, float).")
self.value += value
self.weight += weight
def eval(self):
if self.weight == 0:
raise ValueError(
"There is no data in DetectionMAP Metrics. "
"Please check layers.detection_map output has added to DetectionMAP."
)
return self.value / self.weight
class Auc(MetricBase):
"""
Auc Metrics which adapts to binary classification.
Need to note that auc metrics compute the value via Python natively.
If you concern the speed, please use the fluid.layers.auc instead.
The `auc` function creates four local variables, `true_positives`,
`true_negatives`, `false_positives` and `false_negatives` that are used to
compute the AUC. To discretize the AUC curve, a linearly spaced set of
thresholds is used to compute pairs of recall and precision values. The area
under the ROC-curve is therefore computed using the height of the recall
values by the false positive rate, while the area under the PR-curve is the
computed using the height of the precision values by the recall.
Args:
name: metric name
curve: Specifies the name of the curve to be computed, 'ROC' [default] or
'PR' for the Precision-Recall-curve.
num_thresholds: The number of thresholds to use when discretizing the roc
curve.
"NOTE: only implement the ROC curve type via Python now."
"""
def __init__(self, name, curve='ROC', num_thresholds=200):
super(MetricBase, self).__init__(name, curve, num_thresholds)
self._curve = curve
self._num_thresholds = num_thresholds
self._epsilon = 1e-6
self.tp_list = np.ndarray((num_thresholds, ))
self.fn_list = np.ndarray((num_thresholds, ))
self.tn_list = np.ndarray((num_thresholds, ))
self.fp_list = np.ndarray((num_thresholds, ))
def update(self, labels, predictions, axis=1):
if not _is_numpy_(labels):
raise ValueError("The 'labels' must be a numpy ndarray.")
if not _is_numpy_(predictions):
raise ValueError("The 'predictions' must be a numpy ndarray.")
kepsilon = 1e-7 # to account for floating point imprecisions
thresholds = [(i + 1) * 1.0 / (num_thresholds - 1)
for i in range(num_thresholds - 2)]
thresholds = [0.0 - kepsilon] + thresholds + [1.0 + kepsilon]
# caculate TP, FN, TN, FP count
for idx_thresh, thresh in enumerate(thresholds):
tp, fn, tn, fp = 0, 0, 0, 0
for i, lbl in enumerate(labels):
if lbl:
if predictions[i, 0] >= thresh:
tp += 1
else:
fn += 1
else:
if predictions[i, 0] >= thresh:
fp += 1
else:
tn += 1
tp_list[idx_thresh] += tp
fn_list[idx_thresh] += fn
tn_list[idx_thresh] += tn
fp_list[idx_thresh] += fp
def eval(self):
epsilon = self._epsilon
num_thresholds = self._num_thresholds
tpr = (tp_list.astype("float32") + epsilon) / (
tp_list + fn_list + epsilon)
fpr = fp_list.astype("float32") / (fp_list + tn_list + epsilon)
rec = (tp_list.astype("float32") + epsilon) / (
tp_list + fp_list + epsilon)
x = fpr[:num_thresholds - 1] - fpr[1:]
y = (tpr[:num_thresholds - 1] + tpr[1:]) / 2.0
auc_value = np.sum(x * y)
return auc_value
...@@ -87,7 +87,8 @@ class ParallelExecutor(object): ...@@ -87,7 +87,8 @@ class ParallelExecutor(object):
# performance. Worth tunning for other models in the future. # performance. Worth tunning for other models in the future.
num_threads = len(self._places) num_threads = len(self._places)
else: else:
min(len(self._places) * 2, multiprocessing.cpu_count()) num_threads = min(
len(self._places) * 2, multiprocessing.cpu_count())
main = main_program main = main_program
main = main if main else framework.default_main_program() main = main if main else framework.default_main_program()
......
...@@ -61,8 +61,12 @@ class TestMultipleReader(unittest.TestCase): ...@@ -61,8 +61,12 @@ class TestMultipleReader(unittest.TestCase):
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
batch_count = 0 batch_count = 0
while not data_files.eof(): while True:
img_val, = exe.run(fetch_list=[img]) try:
img_val, = exe.run(fetch_list=[img])
except fluid.core.EnforceNotMet as ex:
self.assertIn("There is no next data.", ex.message)
break
batch_count += 1 batch_count += 1
self.assertLessEqual(img_val.shape[0], self.batch_size) self.assertLessEqual(img_val.shape[0], self.batch_size)
data_files.reset() data_files.reset()
......
...@@ -44,7 +44,7 @@ class TestMultipleReader(unittest.TestCase): ...@@ -44,7 +44,7 @@ class TestMultipleReader(unittest.TestCase):
shapes=[(-1, 784), (-1, 1)], shapes=[(-1, 784), (-1, 1)],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64']) dtypes=['float32', 'int64'])
data_file = fluid.layers.create_multi_pass_reader( data_file = fluid.layers.io.multi_pass(
reader=data_file, pass_num=self.pass_num) reader=data_file, pass_num=self.pass_num)
img, label = fluid.layers.read_file(data_file) img, label = fluid.layers.read_file(data_file)
...@@ -57,8 +57,12 @@ class TestMultipleReader(unittest.TestCase): ...@@ -57,8 +57,12 @@ class TestMultipleReader(unittest.TestCase):
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
batch_count = 0 batch_count = 0
while not data_file.eof(): while True:
img_val, = exe.run(fetch_list=[img]) try:
img_val, = exe.run(fetch_list=[img])
except fluid.core.EnforceNotMet as ex:
self.assertIn("There is no next data.", ex.message)
break
batch_count += 1 batch_count += 1
self.assertLessEqual(img_val.shape[0], self.batch_size) self.assertLessEqual(img_val.shape[0], self.batch_size)
data_file.reset() data_file.reset()
......
...@@ -26,11 +26,14 @@ def simple_fc_net(use_feed): ...@@ -26,11 +26,14 @@ def simple_fc_net(use_feed):
img = fluid.layers.data(name='image', shape=[784], dtype='float32') img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64') label = fluid.layers.data(name='label', shape=[1], dtype='int64')
else: else:
reader = fluid.layers.open_recordio_file( reader = fluid.layers.open_files(
filename='./mnist.recordio', filenames=['./mnist.recordio'],
shapes=[[-1, 784], [-1, 1]], shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64']) dtypes=['float32', 'int64'],
thread_num=1,
for_parallel=True)
reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader) img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
for _ in xrange(4): for _ in xrange(4):
...@@ -51,11 +54,14 @@ def fc_with_batchnorm(use_feed): ...@@ -51,11 +54,14 @@ def fc_with_batchnorm(use_feed):
img = fluid.layers.data(name='image', shape=[784], dtype='float32') img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64') label = fluid.layers.data(name='label', shape=[1], dtype='int64')
else: else:
reader = fluid.layers.open_recordio_file( reader = fluid.layers.open_files(
filename='./mnist.recordio', filenames=['mnist.recordio'],
shapes=[[-1, 784], [-1, 1]], shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0], lod_levels=[0, 0],
dtypes=['float32', 'int64']) dtypes=['float32', 'int64'],
thread_num=1,
for_parallel=True)
reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader) img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
......
...@@ -201,24 +201,6 @@ class TestBlockDesc(unittest.TestCase): ...@@ -201,24 +201,6 @@ class TestBlockDesc(unittest.TestCase):
op1.set_type("test") op1.set_type("test")
op2.set_type("test") op2.set_type("test")
var0 = block.var("var0")
var1 = block.var("var1")
var2 = block.var("var2")
var3 = block.var("var3")
var4 = block.var("var4")
var5 = block.var("var5")
op0.set_input("X", ["var0"])
op0.set_output("Y", ["var0"])
op1.set_input("X", ["var1", "var2"])
op1.set_output("Y", ["var3", "var4"])
op2.set_input("X", ["var1"])
op2.set_output("Y", ["var4", "var5"])
program.sync_with_cpp()
# remove op1, its input var2 and output var3 will be removed at the same time,
# but its input var1 and output var4 will not be removed since they are used for op2.
block.remove_op(1, 2) block.remove_op(1, 2)
program.sync_with_cpp() program.sync_with_cpp()
...@@ -226,8 +208,6 @@ class TestBlockDesc(unittest.TestCase): ...@@ -226,8 +208,6 @@ class TestBlockDesc(unittest.TestCase):
for idx in xrange(0, block.op_size()): for idx in xrange(0, block.op_size()):
all_ops.append(block.op(idx)) all_ops.append(block.op(idx))
self.assertEqual(all_ops, [op0, op2]) self.assertEqual(all_ops, [op0, op2])
all_vars = block.all_vars()
self.assertEqual(set(all_vars), {var0, var1, var4, var5})
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -65,8 +65,13 @@ class TestRecordIO(unittest.TestCase): ...@@ -65,8 +65,13 @@ class TestRecordIO(unittest.TestCase):
# train a pass # train a pass
batch_id = 0 batch_id = 0
while not data_file.eof(): while True:
tmp, = exe.run(fetch_list=[avg_loss]) try:
tmp, = exe.run(fetch_list=[avg_loss])
except fluid.core.EnforceNotMet as ex:
self.assertIn("There is no next data.", ex.message)
break
avg_loss_np.append(tmp) avg_loss_np.append(tmp)
batch_id += 1 batch_id += 1
data_file.reset() data_file.reset()
...@@ -74,8 +79,8 @@ class TestRecordIO(unittest.TestCase): ...@@ -74,8 +79,8 @@ class TestRecordIO(unittest.TestCase):
self.assertLess(avg_loss_np[-1], avg_loss_np[0]) self.assertLess(avg_loss_np[-1], avg_loss_np[0])
def test_shuffle_reader(self): def test_shuffle_reader(self):
self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200)) self.test_main(decorator_callback=lambda reader: fluid.layers.io.shuffle(reader, buffer_size=200))
def test_double_buffer_reader(self): def test_double_buffer_reader(self):
self.test_main(decorator_callback=lambda reader: fluid.layers.create_double_buffer_reader(reader, self.test_main(decorator_callback=lambda reader: fluid.layers.io.double_buffer(reader,
place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu')) place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu'))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册