diff --git a/benchmark/fluid/mnist.py b/benchmark/fluid/mnist.py
index 43866da9cb113e9d49fc1c51f67da94cbc6bfd8e..dc10ac2ec195acc9a5693718141ddb32417dfb71 100644
--- a/benchmark/fluid/mnist.py
+++ b/benchmark/fluid/mnist.py
@@ -139,9 +139,6 @@ def run_benchmark(model, args):
# inference program
inference_program = fluid.default_main_program().clone()
- with fluid.program_guard(inference_program):
- inference_program = fluid.io.get_inference_program(
- target_vars=[batch_acc, batch_size_tensor])
# Optimization
opt = fluid.optimizer.AdamOptimizer(
@@ -161,7 +158,7 @@ def run_benchmark(model, args):
train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=args.batch_size)
- accuracy = fluid.average.WeightedAverage()
+ accuracy = fluid.metrics.Accuracy()
iters, num_samples, start_time = 0, 0, time.time()
for pass_id in range(args.pass_num):
accuracy.reset()
@@ -184,7 +181,7 @@ def run_benchmark(model, args):
"label": y_data},
fetch_list=[avg_cost, batch_acc, batch_size_tensor]
) # The accuracy is accumulated over batches, not just that of the current batch.
- accuracy.add(value=outs[1], weight=outs[2])
+ accuracy.update(value=outs[1], weight=outs[2])
iters += 1
num_samples += len(y_data)
loss = np.array(outs[0])
diff --git a/doc/v2/howto/capi/compile_paddle_lib_en.md b/doc/v2/howto/capi/compile_paddle_lib_en.md
index 11d69b9b79c1a41898d3060d3fe25a31330334a3..6212a3081116d988630706e83d2349dd200b73ab 100644
--- a/doc/v2/howto/capi/compile_paddle_lib_en.md
+++ b/doc/v2/howto/capi/compile_paddle_lib_en.md
@@ -1,3 +1,175 @@
## Install and Build
-TBD
+### Download & Install
+
+Download the latest C-API development package from the CI system and install it. You can find the required version in the table below:
+
+
+### From source
+
+Users can also compile the C-API library from the PaddlePaddle source code, using the following compilation options:
+
+
+| Option | Value |
+| :----- | :---- |
+| WITH_C_API | ON |
+| WITH_PYTHON | OFF (recommended) |
+| WITH_SWIG_PY | OFF (recommended) |
+| WITH_GOLANG | OFF (recommended) |
+| WITH_GPU | ON/OFF |
+| WITH_MKL | ON/OFF |
+
+It is best to use the recommended values for these options to avoid linking against unnecessary libraries. Set the other compilation options as you need.
+
+Pull the latest code from GitHub and configure the compilation options with the following snippet (replace PADDLE_ROOT with the installation path of the PaddlePaddle C-API inference library):
+
+```shell
+PADDLE_ROOT=/path/of/capi
+git clone https://github.com/PaddlePaddle/Paddle.git
+cd Paddle
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=$PADDLE_ROOT \
+ -DCMAKE_BUILD_TYPE=Release \
+ -DWITH_C_API=ON \
+ -DWITH_SWIG_PY=OFF \
+ -DWITH_GOLANG=OFF \
+ -DWITH_PYTHON=OFF \
+ -DWITH_MKL=OFF \
+ -DWITH_GPU=OFF \
+ ..
+```
+
+After the above commands generate the Makefile, run `make && make install`. After successful compilation, the dependencies required by the C-API, including (1) the PaddlePaddle inference library and header files and (2) third-party libraries and header files, will be stored in the `PADDLE_ROOT` directory.
+
+If the compilation succeeds, you will see the following directory structure under `PADDLE_ROOT` (PaddlePaddle header files and libraries, plus third-party libraries and header files, depending on the chosen linking method):
+
+```text
+├── include
+│ └── paddle
+│ ├── arguments.h
+│ ├── capi.h
+│ ├── capi_private.h
+│ ├── config.h
+│ ├── error.h
+│ ├── gradient_machine.h
+│ ├── main.h
+│ ├── matrix.h
+│ ├── paddle_capi.map
+│ └── vector.h
+├── lib
+│ ├── libpaddle_capi_engine.a
+│ ├── libpaddle_capi_layers.a
+│ ├── libpaddle_capi_shared.so
+│ └── libpaddle_capi_whole.a
+└── third_party
+ ├── gflags
+ │ ├── include
+ │ │ └── gflags
+ │ │ ├── gflags_completions.h
+ │ │ ├── gflags_declare.h
+ │ │ ...
+ │ └── lib
+ │ └── libgflags.a
+ ├── glog
+ │ ├── include
+ │ │ └── glog
+ │ │ ├── config.h
+ │ │ ...
+ │ └── lib
+ │ └── libglog.a
+ ├── openblas
+ │ ├── include
+ │ │ ├── cblas.h
+ │ │ ...
+ │ └── lib
+ │ ...
+ ├── protobuf
+ │ ├── include
+ │ │ └── google
+ │ │ └── protobuf
+ │ │ ...
+ │ └── lib
+ │ └── libprotobuf-lite.a
+ └── zlib
+ ├── include
+ │ ...
+ └── lib
+ ...
+
+```
+
+### Linking Description
+
+There are three ways to link against the C-API library:
+
+1. Linking with the dynamic library `libpaddle_capi_shared.so` (this is the most convenient way; **unless you have special requirements, it is recommended**):
+    1. When compiling the CPU version with `OpenBLAS`, you only need to link the single library `libpaddle_capi_shared.so` to develop a prediction program through the C-API.
+    1. When compiling the CPU version with `MKL`, you also need to link the MKL library directly, because `MKL` ships its own dynamic libraries.
+    1. When compiling the GPU version, the CUDA libraries are loaded dynamically when the prediction program runs; also add the CUDA library path to the `LD_LIBRARY_PATH` environment variable.
+
+2. Linking with the static library `libpaddle_capi_whole.a`:
+    1. Specify the `-Wl,--whole-archive` linker option.
+    1. Explicitly link third-party libraries such as `gflags`, `glog`, `libz`, `protobuf`, etc.; you can find them under the `PADDLE_ROOT/third_party` directory.
+    1. If the C-API was compiled with OpenBLAS, explicitly link `libopenblas.a`.
+    1. If the C-API was compiled with MKL, explicitly link the MKL dynamic libraries.
+
+3. Linking with the static libraries `libpaddle_capi_layers.a` and `libpaddle_capi_engine.a`:
+    1. This linking method is mainly used for mobile prediction.
+    1. `libpaddle_capi_whole.a` is split into these two static libraries to reduce the size of the linked binaries.
+    1. Specify `-Wl,--whole-archive -lpaddle_capi_layers` and `-Wl,--no-whole-archive -lpaddle_capi_engine` for linking.
+    1. The third-party dependencies must be linked explicitly, the same as in method 2 above.
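+
+As an illustration, the link lines for the first two methods might look roughly like the following. This is a minimal sketch: the source file `main.c`, the output name `infer`, and the extra system libraries (`-lz -lpthread -lstdc++ -lm`) are assumptions to adjust for your own program; an OpenBLAS CPU build is assumed.
+
+```bash
+PADDLE_ROOT=/path/of/capi
+
+# Method 1: link the shared library (recommended).
+gcc main.c -I$PADDLE_ROOT/include \
+    -L$PADDLE_ROOT/lib -lpaddle_capi_shared -o infer
+
+# Method 2: link the whole static archive plus the third-party dependencies.
+gcc main.c -I$PADDLE_ROOT/include \
+    -Wl,--whole-archive $PADDLE_ROOT/lib/libpaddle_capi_whole.a \
+    -Wl,--no-whole-archive \
+    $PADDLE_ROOT/third_party/gflags/lib/libgflags.a \
+    $PADDLE_ROOT/third_party/glog/lib/libglog.a \
+    $PADDLE_ROOT/third_party/protobuf/lib/libprotobuf-lite.a \
+    $PADDLE_ROOT/third_party/openblas/lib/libopenblas.a \
+    -lz -lpthread -lstdc++ -lm -o infer
+```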
diff --git a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md
index bc3d50b3ffd3b703a3a656caa1f96bdcf683f68b..dee1b7554f97af17989c3f7739d8feea3b6b8e3f 100644
--- a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md
+++ b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md
@@ -1,3 +1,372 @@
-# Kubernetes Distributed
+# Distributed Training on Kubernetes
-TBD
+We introduced how to create a PaddlePaddle Job with a single node on Kubernetes in the
+previous document.
+In this article, we will introduce how to create a PaddlePaddle job with multiple nodes
+on a Kubernetes cluster.
+
+## Overall Architecture
+
+Before creating a training job, the user needs to slice the training data and deploy
+the Python scripts along with it into the distributed file system
+(we can use different types of Kubernetes Volumes to mount different distributed
+file systems). Before training starts, the program copies the training data into the
+container, and it also saves the models to the same path during training. The overall
+architecture is as follows:
+
+![PaddlePaddle on Kubernetes Architecture](src/k8s-paddle-arch.png)
+
+The figure above describes a distributed training architecture with 3 nodes. Each
+Pod mounts a folder of the distributed file system through a Kubernetes Volume to save
+training data and models. Kubernetes creates 3 Pods for this training phase and schedules them on
+3 nodes, and each Pod has a PaddlePaddle container. After the containers are created,
+PaddlePaddle starts the communication between the PServer and the Trainer and reads the
+training data for this training job.
+
+As described above, we can start a PaddlePaddle distributed training job on a
+Kubernetes-ready cluster with the following steps:
+
+1. [Build PaddlePaddle Docker Image](#Build a Docker Image)
+1. [Split training data and upload to the distributed file system](#Upload Training Data)
+1. [Edit a YAML file and create a Kubernetes Job](#Create a Job)
+1. [Check the output](#Check The Output)
+
+The following sections introduce these steps in detail:
+
+### Build a Docker Image
+
+The training Docker image needs to package the `paddle pserver` and `paddle trainer` runtimes, as well as two preparation steps before we can kick off the training:
+
+- Copying the training data into the container.
+- Generating the initialization arguments for the `Paddle PServer` and `Paddle Trainer` processes.
+
+Since the official PaddlePaddle Docker image already has the runtimes we need, we take it as the base image and pack some additional scripts for the steps mentioned above to build our training image. For more detail, see the following link:
+- https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile
+
+
+```bash
+$ cd doc/howto/usage/k8s/src/k8s_train
+$ docker build -t [YOUR_REPO]/paddle:mypaddle .
+```
+
+Then upload the new Docker image to a Docker registry:
+
+```bash
+docker push [YOUR_REPO]/paddle:mypaddle
+```
+
+**[NOTE]**: in the above commands, `[YOUR_REPO]` represents your Docker repository;
+you need to substitute your own repository name for it. Below, `[YOUR_REPO]/paddle:mypaddle`
+refers to the Docker image built in this step.
+
+### Prepare Training Data
+
+We can download and split the training data by creating a Kubernetes Job, or customize the image
+by editing [k8s_train](./src/k8s_train/).
+
+Before creating the Job, we need to bind a [persistentVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes) backed by the
+appropriate file system; the generated dataset will be saved on this volume.
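+
+For reference, the following is a minimal sketch of such a claim, assuming an NFS-backed volume and the claim name `mfs` used by the Job below; the access mode and `storage` size are illustrative values:
+
+```bash
+# Create a PersistentVolumeClaim named "mfs" (illustrative values).
+cat <<EOF | kubectl create -f -
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: mfs
+spec:
+  accessModes:
+    - ReadWriteMany
+  resources:
+    requests:
+      storage: 10Gi
+EOF
+```
+
+The data-preparation Job itself is defined as follows: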
+
+```yaml
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: paddle-data
+spec:
+ template:
+ metadata:
+ name: pi
+ spec:
+ hostNetwork: true
+ containers:
+ - name: paddle-data
+ image: paddlepaddle/paddle-tutorial:k8s_data
+ imagePullPolicy: Always
+ volumeMounts:
+ - mountPath: "/mnt"
+ name: nfs
+ env:
+ - name: OUT_DIR
+ value: /home/work/mfs/paddle-cluster-job
+ - name: SPLIT_COUNT
+ value: "3"
+ volumes:
+ - name: nfs
+ persistentVolumeClaim:
+ claimName: mfs
+ restartPolicy: Never
+```
+
+Create the Job with the following command:
+
+```bash
+> kubectl create -f xxx.yaml
+```
+
+If it is created successfully, you will see output like the following:
+
+```bash
+[root@paddle-kubernetes-node0 nfsdir]$ tree -d
+.
+`-- paddle-cluster-job
+ |-- 0
+ | `-- data
+ |-- 1
+ | `-- data
+ |-- 2
+ | `-- data
+ |-- output
+ |-- quick_start
+```
+
+The `paddle-cluster-job` above is the job name for this training job; we need 3
+PaddlePaddle training nodes, so the split training data is saved under the `paddle-cluster-job` path.
+The folders `0`, `1` and `2` represent the `training_id` of each node, the `quick_start` folder stores the training data, and the `output` folder stores the models and logs.
+
+
+### Create a Job
+
+Kubernetes allows users to describe objects with YAML files, which we can create with a
+command-line tool.
+
+The Job YAML file describes which Docker image is used in this training job, how many nodes are created, the startup arguments of the `Paddle PServer/Trainer` processes, and the type of Volumes. You can find the details of the YAML fields in the
+[Kubernetes Job API](http://kubernetes.io/docs/api-reference/batch/v1/definitions/#_v1_job).
+The following is an example for this training job:
+
+```yaml
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: paddle-cluster-job
+spec:
+ parallelism: 3
+ completions: 3
+ template:
+ metadata:
+ name: paddle-cluster-job
+ spec:
+ volumes:
+ - name: jobpath
+ hostPath:
+ path: /home/work/mfs
+ containers:
+ - name: trainer
+ image: [YOUR_REPO]/paddle:mypaddle
+        command: ["/bin/bash", "-c", "/root/start.sh"]
+ env:
+ - name: JOB_NAME
+ value: paddle-cluster-job
+ - name: JOB_PATH
+ value: /home/jobpath
+ - name: JOB_NAMESPACE
+ value: default
+ - name: TRAIN_CONFIG_DIR
+ value: recommendation
+ - name: CONF_PADDLE_NIC
+ value: eth0
+ - name: CONF_PADDLE_PORT
+ value: "7164"
+ - name: CONF_PADDLE_PORTS_NUM
+ value: "2"
+ - name: CONF_PADDLE_PORTS_NUM_SPARSE
+ value: "2"
+ - name: CONF_PADDLE_GRADIENT_NUM
+ value: "3"
+ volumeMounts:
+ - name: jobpath
+ mountPath: /home/jobpath
+ restartPolicy: Never
+```
+
+In the above YAML file:
+- `metadata.name`: the job name.
+- `parallelism`: the Kubernetes Job will create `parallelism` Pods at the same time.
+- `completions`: the Job reaches the success status only when the number of successful Pods (Pods whose exit code is 0)
+  is equal to `completions`.
+- `volumeMounts`: the `name` field `jobpath` is a key, and the `mountPath` field is the
+  path inside the container; we define `jobpath` in the `volumes` field and use `hostPath`
+  to configure the host path we want to mount.
+- `env`: the environment variables of the container; we pass the startup arguments through
+  this approach (see the sketch after this list). Some details are as follows:
+  - JOB_PATH: the mount path in the container.
+  - JOB_NAME: the job name.
+  - TRAIN_CONFIG_DIR: the job path in the container; combined with JOB_NAME, it locates
+    the training data path.
+  - CONF_PADDLE_NIC: the `--nics` argument of the `Paddle PServer` process, the network
+    device name.
+  - CONF_PADDLE_PORT: the `--port` argument of the `Paddle PServer` process.
+  - CONF_PADDLE_PORTS_NUM: the `--ports_num` argument of `Paddle PServer`, the number of
+    ports for dense parameter updates.
+  - CONF_PADDLE_PORTS_NUM_SPARSE: the `--ports_num_for_sparse` argument of `Paddle PServer`,
+    the number of ports for sparse parameter updates.
+  - CONF_PADDLE_GRADIENT_NUM: the number of training nodes, the
+    `--num_gradient_servers` argument of `Paddle PServer` and `Paddle Trainer`.
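+
+For intuition, a start script inside the container could translate these variables into a `paddle pserver` command line roughly as follows. This is only a sketch; the actual conversion is done by `start_paddle.py`, shown later in this document:
+
+```bash
+# Sketch: assemble the pserver startup command from the Job's environment.
+paddle pserver \
+    --nics=$CONF_PADDLE_NIC \
+    --port=$CONF_PADDLE_PORT \
+    --ports_num=$CONF_PADDLE_PORTS_NUM \
+    --ports_num_for_sparse=$CONF_PADDLE_PORTS_NUM_SPARSE \
+    --num_gradient_servers=$CONF_PADDLE_GRADIENT_NUM
+```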
+
+You can find more detailed information [here](http://www.paddlepaddle.org/docs/develop/documentation/zh/howto/usage/cmd_parameter/detail_introduction_cn.html).
+
+Once the YAML file is ready, we can create the Job with the Kubernetes command-line tool:
+
+```bash
+kubectl create -f job.yaml
+```
+
+Upon successful creation, Kubernetes creates 3 Pods as PaddlePaddle training nodes,
+pulls the Docker image, and begins training.
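+
+To watch the Pods come up, the standard Kubernetes tooling can be used; for example (assuming the default `job-name` label that Kubernetes attaches to a Job's Pods):
+
+```bash
+kubectl get pods -l job-name=paddle-cluster-job
+kubectl describe job paddle-cluster-job
+```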
+
+
+### Check the Output
+
+During training, we can check the logs and the output models, which are stored in
+the `output` folder.
+
+**NOTE**: `node_0`, `node_1` and `node_2` represent the
+`trainer_id` of the PaddlePaddle training job rather than the node id of Kubernetes.
+
+```bash
+[root@paddle-kubernetes-node0 output]# tree -d
+.
+├── node_0
+│ ├── server.log
+│ └── train.log
+├── node_1
+│ ├── server.log
+│ └── train.log
+├── node_2
+......
+├── pass-00002
+│ ├── done
+│ ├── ___embedding_0__.w0
+│ ├── ___embedding_1__.w0
+......
+```
+
+We can check the status of each training Pod by viewing its logs:
+
+```bash
+[root@paddle-kubernetes-node0 node_0]# cat train.log
+I1116 09:10:17.123121 50 Util.cpp:155] commandline:
+ /usr/local/bin/../opt/paddle/bin/paddle_trainer
+ --nics=eth0 --port=7164
+ --ports_num=2 --comment=paddle_process_by_paddle
+ --pservers=192.168.129.66,192.168.223.143,192.168.129.71
+ --ports_num_for_sparse=2 --config=./trainer_config.py
+ --trainer_count=4 --num_passes=10 --use_gpu=0
+ --log_period=50 --dot_period=10 --saving_period=1
+ --local=0 --trainer_id=0
+ --save_dir=/home/jobpath/paddle-cluster-job/output
+I1116 09:10:17.123440 50 Util.cpp:130] Calling runInitFunctions
+I1116 09:10:17.123764 50 Util.cpp:143] Call runInitFunctions done.
+[WARNING 2016-11-16 09:10:17,227 default_decorators.py:40] please use keyword arguments in paddle config.
+[INFO 2016-11-16 09:10:17,239 networks.py:1282] The input order is [movie_id, title, genres, user_id, gender, age, occupation, rating]
+[INFO 2016-11-16 09:10:17,239 networks.py:1289] The output order is [__square_error_cost_0__]
+I1116 09:10:17.392917 50 Trainer.cpp:170] trainer mode: Normal
+I1116 09:10:17.613910 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
+I1116 09:10:17.680917 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
+I1116 09:10:17.681543 50 GradientMachine.cpp:134] Initing parameters..
+I1116 09:10:18.012390 50 GradientMachine.cpp:141] Init parameters done.
+I1116 09:10:18.018641 50 ParameterClient2.cpp:122] pserver 0 192.168.129.66:7164
+I1116 09:10:18.018950 50 ParameterClient2.cpp:122] pserver 1 192.168.129.66:7165
+I1116 09:10:18.019069 50 ParameterClient2.cpp:122] pserver 2 192.168.223.143:7164
+I1116 09:10:18.019492 50 ParameterClient2.cpp:122] pserver 3 192.168.223.143:7165
+I1116 09:10:18.019716 50 ParameterClient2.cpp:122] pserver 4 192.168.129.71:7164
+I1116 09:10:18.019836 50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7165
+```
+
+## Some Additional Details
+
+### Using Environment Variables
+
+Usually we use environment variables to configure the PaddlePaddle Job running in
+Kubernetes; `start_paddle.py` provides a startup script that converts the environment variables
+into the startup arguments of the PaddlePaddle processes:
+
+```python
+API = "/api/v1/namespaces/"
+JOBSELECTOR = "labelSelector=job-name="
+JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME")
+JOB_PATH_OUTPUT = JOB_PATH + "/output"
+JOBNAME = os.getenv("JOB_NAME")
+NAMESPACE = os.getenv("JOB_NAMESPACE")
+PADDLE_NIC = os.getenv("CONF_PADDLE_NIC")
+PADDLE_PORT = os.getenv("CONF_PADDLE_PORT")
+PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM")
+PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE")
+PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM")
+```
+
+### Communication between Pods
+
+At the beginning of `start_paddle.py`, it initializes and parses the arguments.
+
+```python
+parser = argparse.ArgumentParser(prog="start_paddle.py",
+                                 description='simple tool for k8s')
+args, train_args_list = parser.parse_known_args()
+train_args = refine_unknown_args(train_args_list)
+train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2]))
+podlist = getPodList()
+```
+
+Then it queries the status of all the other Pods of this Job through the function `getPodList()`, and fetches the `trainer_id` through the function `getIdMap(podlist)` once all the Pods are in the `RUNNING` status.
+
+```python
+ podlist = getPodList()
+ # need to wait until all pods are running
+ while not isPodAllRunning(podlist):
+ time.sleep(10)
+ podlist = getPodList()
+ idMap = getIdMap(podlist)
+```
+
+**NOTE**: `getPodList()` fetches all the Pods in the current namespace, so if some unrelated
+Pods are already running, it may cause errors. We will use [StatefulSets](https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets) instead of
+Kubernetes Pods or ReplicaSets in the future.
+
+The function `getIdMap(podlist)` fetches the IP addresses from `podlist` and sorts them
+to generate each `trainer_id`.
+
+```python
+def getIdMap(podlist):
+ '''
+    generate trainer_id by ip
+ '''
+ ips = []
+ for pod in podlist["items"]:
+ ips.append(pod["status"]["podIP"])
+ ips.sort()
+ idMap = {}
+ for i in range(len(ips)):
+ idMap[ips[i]] = i
+ return idMap
+```
+
+After getting the `idMap`, we can generate the arguments of `Paddle PServer` and `Paddle Trainer`,
+and then start them up with `startPaddle(idMap, train_args_dict)`.
+
+### Create Job
+
+The main goal of `startPaddle` is to generate the arguments of the `Paddle PServer` and
+`Paddle Trainer` processes. Taking `Paddle Trainer` as an example, we parse the
+environment variables to get `PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM`, etc.,
+and finally find the `trainerId` in `idMap` according to the local IP address.
+
+```python
+ program = 'paddle train'
+ args = " --nics=" + PADDLE_NIC
+ args += " --port=" + str(PADDLE_PORT)
+ args += " --ports_num=" + str(PADDLE_PORTS_NUM)
+ args += " --comment=" + "paddle_process_by_paddle"
+ ip_string = ""
+ for ip in idMap.keys():
+ ip_string += (ip + ",")
+ ip_string = ip_string.rstrip(",")
+ args += " --pservers=" + ip_string
+ args_ext = ""
+ for key, value in train_args_dict.items():
+ args_ext += (' --' + key + '=' + value)
+ localIP = socket.gethostbyname(socket.gethostname())
+ trainerId = idMap[localIP]
+ args += " " + args_ext + " --trainer_id=" + \
+ str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT
+```
diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc
index fbe08349c37c4fde115ceea954ba2b84880088d7..b8847e4b909cbab67b2ddb6885b45b73d402de19 100644
--- a/paddle/fluid/framework/block_desc.cc
+++ b/paddle/fluid/framework/block_desc.cc
@@ -13,11 +13,10 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/block_desc.h"
+#include
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
-#include
-
namespace paddle {
namespace framework {
@@ -147,52 +146,7 @@ void BlockDesc::RemoveOp(size_t s, size_t e) {
if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
return;
}
- auto get_vars = [](std::deque>::iterator &op,
- std::vector &v) {
- auto in_names = (*op)->InputArgumentNames();
- v.insert(v.end(), in_names.begin(), in_names.end());
- auto out_names = (*op)->OutputArgumentNames();
- v.insert(v.end(), out_names.begin(), out_names.end());
- std::sort(v.begin(), v.end());
- auto last = std::unique(v.begin(), v.end());
- v.erase(last, v.end());
- };
- need_update_ = true;
-
- for (size_t i = s; i < e; i++) {
- // since remove op one by one, every time remove the first op.
- auto op = ops_.begin() + s;
-
- // collect input and output variables from current delete op
- std::vector cur_vars;
- get_vars(op, cur_vars);
-
- // remove current op
- ops_.erase(ops_.begin() + s);
-
- // collect input and output variables from other ops
- std::vector other_vars;
- for (auto it = ops_.begin(); it != ops_.end(); it++) {
- get_vars(it, other_vars);
- }
-
- // variables should be deleted
- std::vector delete_vars;
- // delete_vars = cur_vars - cur_vars ^ other_input_vars
- std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(),
- other_vars.end(),
- std::inserter(delete_vars, delete_vars.end()));
- // remove variables
- for (size_t i = 0; i < delete_vars.size(); i++) {
- auto name = delete_vars[i];
- auto it = vars_.find(name);
- PADDLE_ENFORCE(it != vars_.end(),
- "%s is not in variable list, it should not be deleted",
- name);
- vars_.erase(it);
- VLOG(3) << "deleting variable " << name;
- }
- }
+ ops_.erase(ops_.begin() + s, ops_.begin() + e);
}
std::vector BlockDesc::AllOps() const {
diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc
index 8155cb55a468a09320b1196b49fc3e34cea261b1..a56674cbe216e312c4394ef537140122352dc785 100644
--- a/paddle/fluid/framework/lod_tensor.cc
+++ b/paddle/fluid/framework/lod_tensor.cc
@@ -12,9 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
-#include "paddle/fluid/framework/lod_tensor.h"
+#include
+#include
+#include
+#include
+
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/framework.pb.h"
+#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/memory/memcpy.h"
#include "paddle/fluid/memory/memory.h"
@@ -22,11 +27,6 @@ limitations under the License. */
#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"
-#include
-#include
-#include
-#include
-
namespace paddle {
namespace framework {
@@ -294,7 +294,7 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
TensorFromStream(is, static_cast(tensor), dev_ctx);
}
-void WriteToRecordIO(recordio::Writer &writer,
+void WriteToRecordIO(recordio::Writer *writer,
const std::vector &tensor,
const platform::DeviceContext &dev_ctx) {
std::stringstream buffer;
@@ -303,18 +303,20 @@ void WriteToRecordIO(recordio::Writer &writer,
for (auto &each : tensor) {
SerializeToStream(buffer, each, dev_ctx);
}
- writer.Write(buffer.str());
+ writer->Write(buffer.str());
}
std::vector ReadFromRecordIO(
- recordio::Scanner &scanner, const platform::DeviceContext &dev_ctx) {
- std::istringstream sin(scanner.Next());
- uint32_t sz;
- sin.read(reinterpret_cast(&sz), sizeof(uint32_t));
+ recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) {
std::vector result;
- result.resize(sz);
- for (uint32_t i = 0; i < sz; ++i) {
- DeserializeFromStream(sin, &result[i], dev_ctx);
+ if (scanner->HasNext()) {
+ std::istringstream sin(scanner->Next());
+ uint32_t sz;
+ sin.read(reinterpret_cast(&sz), sizeof(uint32_t));
+ result.resize(sz);
+ for (uint32_t i = 0; i < sz; ++i) {
+ DeserializeFromStream(sin, &result[i], dev_ctx);
+ }
}
return result;
}
diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h
index 4f130d265900483ec7a7c541f2610d17a352913f..1159fee39b0737402c60448dcbe69e7535c9d6e1 100644
--- a/paddle/fluid/framework/lod_tensor.h
+++ b/paddle/fluid/framework/lod_tensor.h
@@ -15,6 +15,9 @@ limitations under the License. */
#pragma once
#include
+#include
+#include
+#include
#ifdef PADDLE_WITH_CUDA
#include
#include
@@ -216,12 +219,12 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor,
void DeserializeFromStream(std::istream& is, LoDTensor* tensor,
const platform::DeviceContext& dev_ctx);
-extern void WriteToRecordIO(recordio::Writer& writer,
+extern void WriteToRecordIO(recordio::Writer* writer,
const std::vector& tensor,
const platform::DeviceContext& dev_ctx);
extern std::vector ReadFromRecordIO(
- recordio::Scanner& scanner, const platform::DeviceContext& dev_ctx);
+ recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx);
} // namespace framework
} // namespace paddle
diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc
index e691e29383d4842b80769021e0e494967d38e9bb..97ab98f09b1a902a942d9667bc7716a28b98d54c 100644
--- a/paddle/fluid/framework/lod_tensor_test.cc
+++ b/paddle/fluid/framework/lod_tensor_test.cc
@@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include "paddle/fluid/framework/lod_tensor.h"
-
-#include "paddle/fluid/recordio/scanner.h"
-#include "paddle/fluid/recordio/writer.h"
-
#include
#include
#include
#include
#include
+#include "paddle/fluid/framework/lod_tensor.h"
+
+#include "paddle/fluid/recordio/scanner.h"
+#include "paddle/fluid/recordio/writer.h"
+
namespace paddle {
namespace framework {
@@ -240,8 +240,8 @@ TEST(LoDTensor, RecordIO) {
*platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
{
recordio::Writer writer(stream, recordio::Compressor::kSnappy);
- WriteToRecordIO(writer, {tensor, tensor}, ctx);
- WriteToRecordIO(writer, {tensor, tensor}, ctx);
+ WriteToRecordIO(&writer, {tensor, tensor}, ctx);
+ WriteToRecordIO(&writer, {tensor, tensor}, ctx);
writer.Flush();
}
@@ -254,11 +254,11 @@ TEST(LoDTensor, RecordIO) {
{
std::unique_ptr stream_ptr(stream);
recordio::Scanner scanner(std::move(stream_ptr));
- auto tensors = ReadFromRecordIO(scanner, ctx);
+ auto tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_EQ(tensors.size(), 2);
assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]);
- tensors = ReadFromRecordIO(scanner, ctx);
+ tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_EQ(tensors.size(), 2);
assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]);
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 74945fb4f2f745b6ca9c48adb0c8b9e6ae1e94a4..20dcc080b696431b9972c0a972904d957f9b47d8 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -115,14 +115,12 @@ void ParallelExecutor::BCastParamsToGPUs(
for (auto &var : vars) {
auto *main_var = main_scope->FindVar(var);
- if (!main_var->IsType()) {
+ if (main_var == nullptr || !main_var->IsType()) {
continue;
}
auto &main_tensor = main_var->Get();
-
auto &dims = main_tensor.dims();
-
if (paddle::platform::is_gpu_place(main_tensor.place())) {
size_t numel = main_tensor.numel();
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
@@ -174,12 +172,17 @@ void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map &feed_tensors) {
for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
+ PADDLE_ENFORCE_EQ(
+ member_->places_.size(), lod_tensors.size(),
+        "The number of samples in the current batch is less than the "
+        "number of devices; currently, this is not allowed. (%d vs %d)",
+ member_->places_.size(), lod_tensors.size());
for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var?
- member_->local_scopes_[j]
- ->Var(it.first)
- ->GetMutable()
- ->ShareDataWith(lod_tensors[j]);
+ auto t =
+ member_->local_scopes_[j]->Var(it.first)->GetMutable();
+ t->ShareDataWith(lod_tensors[j]);
+ t->set_lod(lod_tensors[j].lod());
}
}
}
diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc
index 56bf00e5f91700f0cffa917aad8608caaab0a7fe..76126f3dc64d71770d13f9d66bb30f176c112629 100644
--- a/paddle/fluid/framework/reader.cc
+++ b/paddle/fluid/framework/reader.cc
@@ -22,7 +22,9 @@ FileReader::FileReader(const std::vector &dims) : dims_(dims) {}
void FileReader::ReadNext(std::vector *out) {
ReadNextImpl(out);
- PADDLE_ENFORCE_EQ(out->size(), dims_.size());
+ if (out->empty()) {
+ return;
+ }
for (size_t i = 0; i < dims_.size(); ++i) {
auto &actual = out->at(i).dims();
auto &expect = dims_[i];
diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h
index 3573b99becf6d657c680c5fec0bda4bdde5dd7a2..3a413941df964c8d9454fafc6030c377c10f9fb1 100644
--- a/paddle/fluid/framework/reader.h
+++ b/paddle/fluid/framework/reader.h
@@ -14,14 +14,13 @@
#pragma once
+#include
+#include
+
#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/platform/place.h"
-#include
-#include
-#include
-
namespace paddle {
namespace framework {
@@ -31,8 +30,6 @@ class ReaderBase {
virtual void ReInit() = 0;
- virtual bool HasNext() const = 0;
-
virtual ~ReaderBase();
};
@@ -44,8 +41,6 @@ class DecoratedReader : public ReaderBase {
void ReInit() override { reader_->ReInit(); }
- bool HasNext() const override { return reader_->HasNext(); }
-
protected:
ReaderBase* reader_;
};
@@ -80,8 +75,6 @@ class ReaderHolder {
reader_->ReInit();
}
- bool HasNext() const { return reader_->HasNext(); }
-
private:
std::unique_ptr reader_;
};
diff --git a/paddle/fluid/operators/batch_norm_op.cc b/paddle/fluid/operators/batch_norm_op.cc
index 36049ee6a4a0d2a251b6d10cf1ff05a9d9845089..c9939e8602ed341d37784ca292a55326899e8e65 100644
--- a/paddle/fluid/operators/batch_norm_op.cc
+++ b/paddle/fluid/operators/batch_norm_op.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/batch_norm_op.h"
+#include
#include "paddle/fluid/framework/data_layout.h"
namespace paddle {
diff --git a/paddle/fluid/operators/batch_norm_op.cu.cc b/paddle/fluid/operators/batch_norm_op.cu.cc
index 6ceacc39924a7558e380aaf563aaf234f1bf30a5..eecb58e11ef57b550c79c040e6933ed6e52e2e87 100644
--- a/paddle/fluid/operators/batch_norm_op.cu.cc
+++ b/paddle/fluid/operators/batch_norm_op.cu.cc
@@ -13,9 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/batch_norm_op.h"
-#include "paddle/fluid/framework/data_layout.h"
-
#include
+#include "paddle/fluid/framework/data_layout.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/cudnn_helper.h"
#include "paddle/fluid/platform/float16.h"
diff --git a/paddle/fluid/operators/batch_size_like.h b/paddle/fluid/operators/batch_size_like.h
index 0bdf27e620a3a7c7b62b955f708a5e2aad1a6986..dd51a11fbe6ad5e528197b67536518c4b31fa355 100644
--- a/paddle/fluid/operators/batch_size_like.h
+++ b/paddle/fluid/operators/batch_size_like.h
@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
-
+#include
+#include
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
diff --git a/paddle/fluid/operators/box_coder_op.h b/paddle/fluid/operators/box_coder_op.h
index 3c7cac1cd17042994287effc31a918ebd4353c4c..77fc6c2b62af42e6526b889aeef2d9bab795baec 100644
--- a/paddle/fluid/operators/box_coder_op.h
+++ b/paddle/fluid/operators/box_coder_op.h
@@ -10,6 +10,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
+#include
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
diff --git a/paddle/fluid/operators/compare_op.cc b/paddle/fluid/operators/compare_op.cc
index 9a139ab27ec53395a8d1ab1347dbce93ea68fd8e..3a6a357e81949014a70e5bae1ee0e1c8b9d0c2ce 100644
--- a/paddle/fluid/operators/compare_op.cc
+++ b/paddle/fluid/operators/compare_op.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/compare_op.h"
+#include
#include "paddle/fluid/framework/op_registry.h"
namespace paddle {
diff --git a/paddle/fluid/operators/concat_op.cc b/paddle/fluid/operators/concat_op.cc
index 0eedd8ee51ebfff6f553d8e19e97c3a45a95fa6a..d65a7b34678cda38d5f8beb9154d61928f517ce0 100644
--- a/paddle/fluid/operators/concat_op.cc
+++ b/paddle/fluid/operators/concat_op.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/concat_op.h"
+#include
#include
namespace paddle {
diff --git a/paddle/fluid/operators/cond_op.h b/paddle/fluid/operators/cond_op.h
index a04fae2182005d4eb08305e943449977bfb637f9..d3888923dbdeee122fb3045a839c0ba639b892b1 100644
--- a/paddle/fluid/operators/cond_op.h
+++ b/paddle/fluid/operators/cond_op.h
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
+#include
#include
#include "glog/logging.h"
#include "paddle/fluid/framework/ddim.h"
diff --git a/paddle/fluid/operators/conv_transpose_op.cc b/paddle/fluid/operators/conv_transpose_op.cc
index b2a3cfc89f18eff24c941c664b1184b4485ab895..08f5939d42a41d235a94eff16cf2f558068d6aaa 100644
--- a/paddle/fluid/operators/conv_transpose_op.cc
+++ b/paddle/fluid/operators/conv_transpose_op.cc
@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/conv_transpose_op.h"
+#include
+#include
namespace paddle {
namespace operators {
diff --git a/paddle/fluid/operators/conv_transpose_op.h b/paddle/fluid/operators/conv_transpose_op.h
index d4e4b641ece9ed120904ded6f8baed65a2666213..bfc0177c2a0da1627fbca532764fdae8167b6b2a 100644
--- a/paddle/fluid/operators/conv_transpose_op.h
+++ b/paddle/fluid/operators/conv_transpose_op.h
@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
-
+#include
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/im2col.h"
diff --git a/paddle/fluid/operators/crf_decoding_op.h b/paddle/fluid/operators/crf_decoding_op.h
index 2b2a733fb9f162755e5c548fec617937d86689dd..3f5fab3b382bea97f43e4bc1b2cd436c956ba264 100644
--- a/paddle/fluid/operators/crf_decoding_op.h
+++ b/paddle/fluid/operators/crf_decoding_op.h
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
+#include
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
diff --git a/paddle/fluid/operators/crop_op.h b/paddle/fluid/operators/crop_op.h
index c5ac6849789587f2f41588f79bd538f7b79a7478..f05c2e23284e3a24cf48442996f671ec6084c391 100644
--- a/paddle/fluid/operators/crop_op.h
+++ b/paddle/fluid/operators/crop_op.h
@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
-
+#include
+#include
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/strided_memcpy.h"
diff --git a/paddle/fluid/operators/math/math_function.cu b/paddle/fluid/operators/math/math_function.cu
index 82e12943148a806bae719c722944d6a9d5236b7c..e53183603fec54ceef68873cfd97b4b985b0d437 100644
--- a/paddle/fluid/operators/math/math_function.cu
+++ b/paddle/fluid/operators/math/math_function.cu
@@ -39,13 +39,14 @@ void gemm(
cublasOperation_t cuTransB =
(transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T;
- float h_alpha = static_cast(alpha);
- float h_beta = static_cast(beta);
-
// TODO(kexinzhao): add processing code for compute capability < 53 case
PADDLE_ENFORCE_GE(context.GetComputeCapability(), 53,
"cublas fp16 gemm requires GPU compute capability >= 53");
+#if CUDA_VERSION >= 8000
+ float h_alpha = static_cast(alpha);
+ float h_beta = static_cast(beta);
+
cublasGemmAlgo_t algo = CUBLAS_GEMM_DFALT;
#if CUDA_VERSION >= 9000
if (context.GetComputeCapability() >= 70) {
@@ -56,7 +57,7 @@ void gemm(
PADDLE_ENFORCE(platform::dynload::cublasSetMathMode(context.cublas_handle(),
CUBLAS_DEFAULT_MATH));
}
-#endif
+#endif // CUDA_VERSION >= 9000
// cublasHgemm does true FP16 computation which is slow for non-Volta
// GPUs. So use cublasGemmEx instead which does pseudo FP16 computation:
@@ -66,6 +67,18 @@ void gemm(
context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, B,
CUDA_R_16F, ldb, A, CUDA_R_16F, lda, &h_beta, C, CUDA_R_16F, N,
CUDA_R_32F, algo));
+#else
+  // CUDA 7.5 does not support cublasGemmEx, hence we fall back to cublasHgemm
+ const half h_alpha = static_cast(alpha);
+ const half h_beta = static_cast(beta);
+ const half* h_A = reinterpret_cast(A);
+ const half* h_B = reinterpret_cast(B);
+ half* h_C = reinterpret_cast(C);
+
+ PADDLE_ENFORCE(platform::dynload::cublasHgemm(
+ context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, h_B, ldb,
+ h_A, lda, &h_beta, h_C, N));
+#endif // CUDA_VERSION >= 8000
}
template <>
diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc
index 2925b8a85da1b0d19672124e49c8fd22c8b4e6bf..bf02b9958927580608b95d6b8ecfddc7231a02d4 100644
--- a/paddle/fluid/operators/read_op.cc
+++ b/paddle/fluid/operators/read_op.cc
@@ -66,13 +66,7 @@ class ReadOp : public framework::OperatorBase {
std::vector out_arg_names = Outputs("Out");
std::vector ins;
reader->ReadNext(&ins);
- if (ins.empty()) {
- reader->ReInit();
- reader->ReadNext(&ins);
- PADDLE_ENFORCE(
- !ins.empty(),
- "Reader can not read the next data even it has been re-initialized.");
- }
+ PADDLE_ENFORCE(!ins.empty(), "There is no next data.");
PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size());
for (size_t i = 0; i < ins.size(); ++i) {
auto* out =
diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt
index 6fa0195b9ae103418beb56cc4b0fa9ab59e93108..845528860f91d0b479bb3c4dbbe05e32c68dc16f 100644
--- a/paddle/fluid/operators/reader/CMakeLists.txt
+++ b/paddle/fluid/operators/reader/CMakeLists.txt
@@ -22,5 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc)
reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc)
reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
+reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
# Export local libraries to parent
set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE)
diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index ed868786ab2a80efa42574ed4f579c633ce0becf..33a50b5cebc1f65ccf9a00280f0eeadd00982555 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -63,13 +63,14 @@ class DoubleBufferReader : public framework::DecoratedReader {
StartPrefetcher();
}
- bool HasNext() const override;
void ReadNext(std::vector* out) override;
void ReInit() override;
~DoubleBufferReader() { EndPrefetcher(); }
private:
+ bool HasNext() const;
+
void StartPrefetcher() {
channel_ = framework::MakeChannel- (kChannelSize);
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
@@ -109,7 +110,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
auto place_str = Attr("place");
platform::Place place;
- if (place_str == "CPU") {
+ if (place_str == "AUTO") {
+ place = dev_place;
+ } else if (place_str == "CPU") {
place = platform::CPUPlace();
} else {
std::istringstream sin(place_str);
@@ -140,28 +143,22 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
enum_range.insert(string::Sprintf("CUDA:%d", i));
}
enum_range.insert("CPU");
- AddAttr("place", "The double buffer place, default is CPU")
- .SetDefault("CPU")
+ enum_range.insert("AUTO");
+ AddAttr("place", "The double buffer place")
+ .SetDefault("AUTO")
.InEnum({enum_range});
}
};
-bool DoubleBufferReader::HasNext() const {
- while (!channel_->IsClosed() && !channel_->CanReceive()) {
- }
- return channel_->CanReceive();
-}
-
void DoubleBufferReader::ReadNext(std::vector* out) {
- if (!HasNext()) {
- PADDLE_THROW("There is no next data!");
- }
-
- Item batch;
- channel_->Receive(&batch);
- *out = batch.payloads_;
- if (batch.ctx_) {
- batch.ctx_->Wait();
+ out->clear();
+ if (HasNext()) {
+ Item batch;
+ channel_->Receive(&batch);
+ *out = batch.payloads_;
+ if (batch.ctx_) {
+ batch.ctx_->Wait();
+ }
}
}
@@ -171,16 +168,26 @@ void DoubleBufferReader::ReInit() {
StartPrefetcher();
}
+bool DoubleBufferReader::HasNext() const {
+ while (!channel_->IsClosed() && !channel_->CanReceive()) {
+ }
+ return channel_->CanReceive();
+}
+
void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts.";
std::vector> cpu_tensor_cache(kCacheSize);
std::vector> gpu_tensor_cache(kCacheSize);
size_t cached_tensor_id = 0;
- while (reader_->HasNext()) {
+ while (true) {
Item batch;
auto& cpu_batch = cpu_tensor_cache[cached_tensor_id];
reader_->ReadNext(&cpu_batch);
+ if (cpu_batch.empty()) {
+      // The underlying reader has no next data.
+ break;
+ }
if (platform::is_gpu_place(place_)) {
auto& gpu_batch = gpu_tensor_cache[cached_tensor_id];
auto* gpu_ctx = ctxs_[cached_tensor_id].get();
diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc
index b72ccc77a3e1ec30fd817471d3ffd667974ae684..0573345ba502b6a9af35710840d5acf7634f332f 100644
--- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc
@@ -25,22 +25,12 @@ class MultiPassReader : public framework::DecoratedReader {
: DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {}
void ReadNext(std::vector* out) override {
- if (!HasNext()) {
- PADDLE_THROW("There is no next data!");
- }
reader_->ReadNext(out);
- }
-
- bool HasNext() const override {
- if (reader_->HasNext()) {
- return true;
- } else {
+ if (out->empty()) {
++pass_count_;
- if (pass_count_ >= pass_num_) {
- return false;
- } else {
+ if (pass_count_ < pass_num_) {
reader_->ReInit();
- return true;
+ reader_->ReadNext(out);
}
}
}
diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc
index 95d8674c08b63e872926ff8708d0c734da33684c..d1cb8e47da70cab784858caea7e791151fc104dd 100644
--- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc
+++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc
@@ -52,8 +52,6 @@ class RandomDataGenerator : public framework::ReaderBase {
void ReInit() override { return; }
- bool HasNext() const override { return true; }
-
private:
float min_;
float max_;
@@ -74,7 +72,7 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase {
const auto& ranks = Attr>("ranks");
PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty());
PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0),
- int(shape_concat.size()),
+ static_cast(shape_concat.size()),
"The accumulate of all ranks should be equal to the "
"shape concat's length.");
std::vector shapes = RestoreShapes(shape_concat, ranks);
diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
index adaa0b9e5f1ffcfbf3e9cd8fd060153575f270a6..2ae29725561769ebe6428002c9983246b8eec724 100644
--- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include
-#include
#include "paddle/fluid/operators/reader/reader_op_registry.h"
#include "paddle/fluid/recordio/scanner.h"
@@ -35,17 +33,15 @@ class RecordIOFileReader : public framework::FileReader {
LOG(INFO) << "Creating file reader" << filename;
}
- bool HasNext() const override { return scanner_.HasNext(); }
-
void ReInit() override { scanner_.Reset(); }
protected:
void ReadNextImpl(std::vector* out) override {
if (ThreadSafe) {
std::lock_guard guard(*mutex_);
- *out = framework::ReadFromRecordIO(scanner_, dev_ctx_);
+ *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
} else {
- *out = framework::ReadFromRecordIO(scanner_, dev_ctx_);
+ *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_);
}
}
@@ -66,7 +62,7 @@ class CreateRecordIOReaderOp : public framework::OperatorBase {
const auto& ranks = Attr>("ranks");
PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty());
PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0),
- int(shape_concat.size()),
+ static_cast(shape_concat.size()),
"The accumulate of all ranks should be equal to the "
"shape concat's length.");
std::string filename = Attr("filename");
diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc
index b164ce232d6bea7b4ff0c67ee0a7dd83b14f61a2..13825d65913be95f4f444bd9d5271a036ec8b1e2 100644
--- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc
@@ -30,35 +30,33 @@ class ShuffleReader : public framework::DecoratedReader {
std::random_device device;
seed_ = device();
}
- ReadIntoBuffers();
+ ReloadBuffer();
}
void ReadNext(std::vector* out) override {
- if (!HasNext()) {
- PADDLE_THROW("There is no next data!");
- }
+ out->clear();
if (iteration_pos_ >= buffer_.size()) {
VLOG(10) << "Resetting shuffle buffer";
- ReadIntoBuffers();
+ ReloadBuffer();
+ if (buffer_.empty()) {
+ return;
+ }
}
*out = buffer_[iteration_pos_++];
}
- bool HasNext() const override {
- return iteration_pos_ < buffer_.size() || reader_->HasNext();
- }
-
private:
- void ReadIntoBuffers() {
+ void ReloadBuffer() {
buffer_.clear();
buffer_.reserve(buffer_size_);
iteration_pos_ = 0;
for (size_t i = 0; i < buffer_size_; ++i) {
- if (!reader_->HasNext()) {
+ std::vector ins;
+ reader_->ReadNext(&ins);
+ if (ins.empty()) {
break;
}
- buffer_.emplace_back();
- reader_->ReadNext(&buffer_.back());
+ buffer_.emplace_back(ins);
}
std::mt19937 g(seed_);
std::shuffle(buffer_.begin(), buffer_.end(), g);
diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc
new file mode 100644
index 0000000000000000000000000000000000000000..cbf709d5e734c0f2adf3735dc28043c1340349da
--- /dev/null
+++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc
@@ -0,0 +1,94 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/operators/detail/safe_ref.h"
+#include "paddle/fluid/operators/reader/reader_op_registry.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+class ThreadedReader : public framework::DecoratedReader {
+ public:
+ ThreadedReader(ReaderBase* reader, bool safe_mode)
+ : DecoratedReader(reader), safe_mode_(safe_mode) {}
+
+ void ReadNext(std::vector* out) override {
+ std::lock_guard lock(mutex_);
+ reader_->ReadNext(out);
+ }
+
+ void ReInit() override {
+ if (safe_mode_) {
+ PADDLE_THROW(
+ "ThreadedReader::ReInit() is disabled when 'safe_mode' is true.");
+ }
+ VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in "
+ "multi-thread environment.";
+ reader_->ReInit();
+ }
+
+ private:
+ bool safe_mode_;
+ std::mutex mutex_;
+};
+
+class CreateThreadedReaderOp : public framework::OperatorBase {
+ public:
+ using framework::OperatorBase::OperatorBase;
+
+ private:
+ void RunImpl(const framework::Scope& scope,
+ const platform::Place& dev_place) const override {
+ auto* out = detail::Ref(scope.FindVar(Output("Out")))
+ .GetMutable();
+ if (out->Get() != nullptr) {
+ return;
+ }
+ const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
+ ->Get();
+ bool safe_mode = Attr("safe_mode");
+ out->Reset(new ThreadedReader(underlying_reader.Get(), safe_mode));
+ }
+};
+
+class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase {
+ public:
+ CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
+ : DecoratedReaderMakerBase(op_proto, op_checker) {
+ AddAttr("safe_mode",
+ "When 'safe_mode' is true, 'ReInit()' is disabled to avoid "
+ "unexpected bugs in multi-thread environment.")
+ .SetDefault(true);
+ AddComment(R"DOC(
+ CreateThreadedReader Operator
+
+ This operator creates a threaded reader. A threaded reader's
+ 'ReadNext()' can be invoked by several threads at the same
+ time.
+ When the attribute 'safe_mode' is true, the threaded reader's
+ 'ReInit()' is disabled to avoid unexpected bugs in multi-thread
+ environment.
+ )DOC");
+ }
+};
+
+} // namespace reader
+} // namespace operators
+} // namespace paddle
+
+namespace reader = paddle::operators::reader;
+REGISTER_DECORATED_READER_OPERATOR(create_threaded_reader,
+ reader::CreateThreadedReaderOp,
+ reader::CreateThreadedReaderOpMaker);
diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc
index eacedeea8835d27b712b287824b9d30b03ebebbf..779dc8a6a0deb7792e0540071e3a2588102fa708 100644
--- a/paddle/fluid/operators/reader/open_files_op.cc
+++ b/paddle/fluid/operators/reader/open_files_op.cc
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include // NOLINT
+
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
@@ -19,38 +21,23 @@ namespace paddle {
namespace operators {
namespace reader {
-class MultipleReader : public framework::ReaderBase {
+class MultiFileReader : public framework::ReaderBase {
public:
- class ThreadBufferMap {
- public:
- std::vector& operator[](
- const std::thread::id& thread_id) {
- std::lock_guard lock(mutex_);
- return buffer_[thread_id];
- }
-
- void Clear() { buffer_.clear(); }
-
- private:
- std::mutex mutex_;
- std::unordered_map>
- buffer_;
- };
-
- MultipleReader(const std::vector& file_names,
- const std::vector& dims, size_t thread_num)
- : file_names_(file_names), dims_(dims) {
+ MultiFileReader(const std::vector& file_names,
+ const std::vector& dims, size_t thread_num,
+ size_t buffer_size)
+ : file_names_(file_names), dims_(dims), buffer_size_(buffer_size) {
prefetchers_.resize(thread_num);
StartNewScheduler();
}
void ReadNext(std::vector* out) override;
- bool HasNext() const override;
void ReInit() override;
- ~MultipleReader() { EndScheduler(); }
+ ~MultiFileReader() { EndScheduler(); }
private:
+ bool HasNext();
void StartNewScheduler();
void EndScheduler();
void ScheduleThreadFunc();
@@ -60,39 +47,36 @@ class MultipleReader : public framework::ReaderBase {
std::vector dims_;
std::thread scheduler_;
std::vector prefetchers_;
+ size_t buffer_size_;
framework::Channel* waiting_file_idx_;
framework::Channel* available_thread_idx_;
framework::Channel>* buffer_;
- mutable ThreadBufferMap thread_buffer_map_;
};
-void MultipleReader::ReadNext(std::vector* out) {
- if (!HasNext()) {
- PADDLE_THROW("There is no next data!");
+void MultiFileReader::ReadNext(std::vector* out) {
+ out->clear();
+ if (HasNext()) {
+ buffer_->Receive(out);
}
- auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()];
- *out = thread_local_buffer;
- thread_local_buffer.clear();
-}
-
-bool MultipleReader::HasNext() const {
- auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()];
- return thread_local_buffer.empty() ? buffer_->Receive(&thread_local_buffer)
- : true;
}
-void MultipleReader::ReInit() {
+void MultiFileReader::ReInit() {
EndScheduler();
- thread_buffer_map_.Clear();
StartNewScheduler();
}
-void MultipleReader::StartNewScheduler() {
+bool MultiFileReader::HasNext() {
+ while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
+ }
+ return buffer_->CanReceive();
+}
+
+void MultiFileReader::StartNewScheduler() {
size_t thread_num = prefetchers_.size();
waiting_file_idx_ = framework::MakeChannel(file_names_.size());
available_thread_idx_ = framework::MakeChannel(thread_num);
buffer_ =
- framework::MakeChannel>(thread_num);
+ framework::MakeChannel>(buffer_size_);
for (size_t i = 0; i < file_names_.size(); ++i) {
waiting_file_idx_->Send(&i);
@@ -105,7 +89,7 @@ void MultipleReader::StartNewScheduler() {
scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
}
-void MultipleReader::EndScheduler() {
+void MultiFileReader::EndScheduler() {
available_thread_idx_->Close();
buffer_->Close();
waiting_file_idx_->Close();
@@ -117,8 +101,8 @@ void MultipleReader::EndScheduler() {
delete waiting_file_idx_;
}
-void MultipleReader::ScheduleThreadFunc() {
- VLOG(5) << "MultipleReader schedule thread starts.";
+void MultiFileReader::ScheduleThreadFunc() {
+ VLOG(5) << "MultiFileReader schedule thread starts.";
size_t completed_thread_num = 0;
size_t thread_idx;
while (available_thread_idx_->Receive(&thread_idx)) {
@@ -150,17 +134,20 @@ void MultipleReader::ScheduleThreadFunc() {
p.join();
}
}
- VLOG(5) << "MultipleReader schedule thread terminates.";
+ VLOG(5) << "MultiFileReader schedule thread terminates.";
}
-void MultipleReader::PrefetchThreadFunc(std::string file_name,
- size_t thread_idx) {
+void MultiFileReader::PrefetchThreadFunc(std::string file_name,
+ size_t thread_idx) {
VLOG(5) << "The prefetch thread of file '" << file_name << "' starts.";
std::unique_ptr reader =
CreateReaderByFileName(file_name, dims_);
- while (reader->HasNext()) {
+ while (true) {
std::vector ins;
reader->ReadNext(&ins);
+ if (ins.empty()) {
+ break;
+ }
try {
buffer_->Send(&ins);
} catch (paddle::platform::EnforceNotMet e) {
@@ -197,11 +184,13 @@ class OpenFilesOp : public framework::OperatorBase {
const auto& file_names = Attr>("file_names");
PADDLE_ENFORCE(!file_names.empty(), "No file to be read!");
const size_t thread_num = Attr("thread_num");
+ const size_t buffer_size = Attr("buffer_size");
auto* out = scope.FindVar(Output("Out"))
->template GetMutable();
- out->Reset(new MultipleReader(
- file_names, RestoreShapes(shape_concat, ranks), thread_num));
+ out->Reset(new MultiFileReader(file_names,
+ RestoreShapes(shape_concat, ranks),
+ thread_num, buffer_size));
}
};
@@ -212,11 +201,12 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
AddAttr>("file_names", "Files to be read.");
AddAttr("thread_num", "The maximal concurrent prefetch thread number.")
.GreaterThan(0);
+ AddAttr("buffer_size", "The size of prefetch buffer.").GreaterThan(0);
AddComment(R"DOC(
OpenFiles Operator
- An OpenFilesOp creates a MultipleReader, which is able to
+ An OpenFilesOp creates a MultiFileReader, which is able to
read data multi-threaded from multiple files.
)DOC");
}
diff --git a/paddle/fluid/platform/cuda_helper.h b/paddle/fluid/platform/cuda_helper.h
index 881d611d4ac26f992036f639097815aff625227b..8758af0804ae08fec6fa64d7387f197f046ce20e 100644
--- a/paddle/fluid/platform/cuda_helper.h
+++ b/paddle/fluid/platform/cuda_helper.h
@@ -33,22 +33,26 @@ constexpr int PADDLE_CUDA_NUM_THREADS = 512;
USE_CUDA_ATOMIC(Add, float);
USE_CUDA_ATOMIC(Add, int);
USE_CUDA_ATOMIC(Add, unsigned int);
-USE_CUDA_ATOMIC(Add, unsigned long long int);
+// The CUDA API uses unsigned long long int, so we cannot use uint64_t here:
+// unsigned long long int is not necessarily uint64_t.
+USE_CUDA_ATOMIC(Add, unsigned long long int);  // NOLINT
CUDA_ATOMIC_WRAPPER(Add, int64_t) {
- static_assert(sizeof(int64_t) == sizeof(long long int),
+  // Statically check that long long int is indeed int64_t.
+ static_assert(sizeof(int64_t) == sizeof(long long int), // NOLINT
"long long should be int64");
-  return CudaAtomicAdd(reinterpret_cast<unsigned long long int*>(address),
-                       static_cast<unsigned long long int>(val));
+  return CudaAtomicAdd(
+      reinterpret_cast<unsigned long long int*>(address),  // NOLINT
+      static_cast<unsigned long long int>(val));           // NOLINT
}
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 600
USE_CUDA_ATOMIC(Add, double);
#else
CUDA_ATOMIC_WRAPPER(Add, double) {
-  unsigned long long int* address_as_ull =
-      reinterpret_cast<unsigned long long int*>(address);
-  unsigned long long int old = *address_as_ull, assumed;
+  unsigned long long int* address_as_ull =                 // NOLINT
+      reinterpret_cast<unsigned long long int*>(address);  // NOLINT
+  unsigned long long int old = *address_as_ull, assumed;   // NOLINT
do {
assumed = old;
diff --git a/paddle/fluid/platform/dynload/cublas.cc b/paddle/fluid/platform/dynload/cublas.cc
index eb541579a136de2a84ecc9773e0c312b405f7e86..361d3439b844e9f68d3fba0a0e41ec457118a4a9 100644
--- a/paddle/fluid/platform/dynload/cublas.cc
+++ b/paddle/fluid/platform/dynload/cublas.cc
@@ -28,6 +28,10 @@ CUBLAS_BLAS_ROUTINE_EACH(DEFINE_WRAP);
CUBLAS_BLAS_ROUTINE_EACH_R2(DEFINE_WRAP);
#endif
+#ifdef CUBLAS_BLAS_ROUTINE_EACH_R3
+CUBLAS_BLAS_ROUTINE_EACH_R3(DEFINE_WRAP);
+#endif
+
} // namespace dynload
} // namespace platform
} // namespace paddle
diff --git a/paddle/fluid/platform/dynload/cublas.h b/paddle/fluid/platform/dynload/cublas.h
index a41018d350e89881888d5e31089c2b9ecd76f6c0..1ab55d6b9bf8fdbd14c9c2bd978e3e99dba3e73e 100644
--- a/paddle/fluid/platform/dynload/cublas.h
+++ b/paddle/fluid/platform/dynload/cublas.h
@@ -71,7 +71,6 @@ extern void *cublas_dso_handle;
__macro(cublasDgemm_v2); \
__macro(cublasHgemm); \
__macro(cublasSgemmEx); \
- __macro(cublasGemmEx); \
__macro(cublasSgeam_v2); \
__macro(cublasDgeam_v2); \
__macro(cublasCreate_v2); \
@@ -83,11 +82,6 @@ extern void *cublas_dso_handle;
__macro(cublasDgemmBatched); \
__macro(cublasCgemmBatched); \
__macro(cublasZgemmBatched); \
- __macro(cublasSgemmStridedBatched); \
- __macro(cublasDgemmStridedBatched); \
- __macro(cublasCgemmStridedBatched); \
- __macro(cublasZgemmStridedBatched); \
- __macro(cublasHgemmStridedBatched); \
__macro(cublasSgetrfBatched); \
__macro(cublasSgetriBatched); \
__macro(cublasDgetrfBatched); \
@@ -95,10 +89,24 @@ extern void *cublas_dso_handle;
CUBLAS_BLAS_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
+// APIs available after CUDA 8.0
+#if CUDA_VERSION >= 8000
+#define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) \
+ __macro(cublasGemmEx); \
+ __macro(cublasSgemmStridedBatched); \
+ __macro(cublasDgemmStridedBatched); \
+ __macro(cublasCgemmStridedBatched); \
+ __macro(cublasZgemmStridedBatched); \
+ __macro(cublasHgemmStridedBatched);
+
+CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
+#endif
+
// APIs available after CUDA 9.0
#if CUDA_VERSION >= 9000
-#define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) __macro(cublasSetMathMode);
-CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
+#define CUBLAS_BLAS_ROUTINE_EACH_R3(__macro) __macro(cublasSetMathMode);
+
+CUBLAS_BLAS_ROUTINE_EACH_R3(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP)
#endif
#undef DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP
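
The R2/R3 split above gates symbol registration on the CUDA toolkit version. A rough Python analogy of the same pattern, assuming a `libcublas.so` resolvable by the dynamic loader and using abridged, illustrative routine lists:

```python
import ctypes

cublas = ctypes.CDLL("libcublas.so")  # assumed to be on the loader path

ROUTINE_EACH = ["cublasSgemm_v2", "cublasDgemm_v2"]  # always present
ROUTINE_EACH_R2 = ["cublasGemmEx",                   # CUDA >= 8.0
                   "cublasSgemmStridedBatched"]
ROUTINE_EACH_R3 = ["cublasSetMathMode"]              # CUDA >= 9.0

def load_routines(cuda_version):
    """Register only the symbols the detected CUDA version provides."""
    names = list(ROUTINE_EACH)
    if cuda_version >= 8000:
        names += ROUTINE_EACH_R2
    if cuda_version >= 9000:
        names += ROUTINE_EACH_R3
    return {name: getattr(cublas, name) for name in names}
```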
diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h
index 29990043206509e4192bfff84832f09ef127d9dd..ca9ab2c7aecff47924f0198802d710b7661f5576 100644
--- a/paddle/fluid/platform/nccl_helper.h
+++ b/paddle/fluid/platform/nccl_helper.h
@@ -14,8 +14,9 @@
#pragma once
-#include <thread>
+#include <thread>  // NOLINT
 #include <typeindex>
+#include <vector>
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h"
@@ -29,6 +30,8 @@ inline ncclDataType_t ToNCCLDataType(std::type_index type) {
return ncclDouble;
} else if (type == typeid(int)) { // NOLINT
return ncclInt;
+ } else if (type == typeid(int64_t)) { // NOLINT
+ return ncclInt64;
} else {
PADDLE_THROW("Not supported");
}
@@ -58,7 +61,7 @@ struct NCCLContext {
ncclComm_t comm_;
explicit NCCLContext(int dev_id)
- : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))) {}
+ : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))), comm_{nullptr} {}
cudaStream_t stream() const { return ctx_->stream(); }
@@ -66,23 +69,23 @@ struct NCCLContext {
    return boost::get<platform::CUDAPlace>(ctx_->GetPlace()).device;
}
-  static void InitNCCLContext(std::unordered_map<int, NCCLContext> &contexts,
+  static void InitNCCLContext(std::unordered_map<int, NCCLContext> *contexts,
                               const std::vector<platform::Place> &places) {
    std::vector<ncclComm_t> comms;
    std::vector<int> devs;
-    comms.resize(contexts.size());
-    devs.reserve(contexts.size());
+    comms.resize(contexts->size());
+    devs.reserve(contexts->size());
for (auto &p : places) {
      devs.push_back(boost::get<platform::CUDAPlace>(p).device);
}
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
-        &comms[0], static_cast<int>(contexts.size()), &devs[0]));
+        &comms[0], static_cast<int>(contexts->size()), &devs[0]));
int i = 0;
for (auto &dev_id : devs) {
- contexts.at(dev_id).comm_ = comms[i++];
+ contexts->at(dev_id).comm_ = comms[i++];
}
}
};
@@ -91,7 +94,8 @@ struct NCCLContextMap {
  std::unordered_map<int, NCCLContext> contexts_;
  std::vector<int> order_;
-  NCCLContextMap(const std::vector<platform::Place> &places) {
+  explicit NCCLContextMap(const std::vector<platform::Place> &places) {
+    PADDLE_ENFORCE(!places.empty());
order_.reserve(places.size());
for (auto &p : places) {
      int dev_id = boost::get<platform::CUDAPlace>(p).device;
@@ -102,15 +106,17 @@ struct NCCLContextMap {
order_.size(), contexts_.size(),
"NCCL Context Map does not support contain two or more same device");
-    std::vector<ncclComm_t> comms;
-    comms.resize(order_.size());
+    if (places.size() > 1) {
+      std::vector<ncclComm_t> comms;
+      comms.resize(order_.size());
-    PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
-        &comms[0], static_cast<int>(order_.size()), &order_[0]));
+      PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
+          &comms[0], static_cast<int>(order_.size()), &order_[0]));
- int i = 0;
- for (auto &dev_id : order_) {
- contexts_.at(dev_id).comm_ = comms[i++];
+ int i = 0;
+ for (auto &dev_id : order_) {
+ contexts_.at(dev_id).comm_ = comms[i++];
+ }
}
}
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 392404045578489014f2283b885c388d5a4586cf..f8cdbac98192d7f23ca907e1f2d38aa6e791437e 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -252,7 +252,6 @@ All parameter, weight, gradient are variables in Paddle.
py::return_value_policy::reference);
  py::class_<framework::ReaderHolder>(m, "Reader", "")
- .def("has_next", &framework::ReaderHolder::HasNext)
.def("reset", &framework::ReaderHolder::ReInit);
  py::class_<Scope>(m, "Scope", "")
diff --git a/paddle/fluid/pybind/recordio.cc b/paddle/fluid/pybind/recordio.cc
index 0644d91425af1a1ac9363b1dec9e317689331fcb..330d104e0a774d905e463566f85bd2e64a080190 100644
--- a/paddle/fluid/pybind/recordio.cc
+++ b/paddle/fluid/pybind/recordio.cc
@@ -39,7 +39,7 @@ class RecordIOWriter {
void CompleteAppendTensor() {
auto& ctx =
*platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
- framework::WriteToRecordIO(writer_, tensors_, ctx);
+ framework::WriteToRecordIO(&writer_, tensors_, ctx);
tensors_.clear();
}
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index a5a3884750cce8cf19b92f1e5f131b50a18d3c97..f757411b853bacb9e03fc42fa2ef6593c3cde00f 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -29,6 +29,7 @@ import optimizer
import backward
import regularizer
import average
+import metrics
from param_attr import ParamAttr, WeightNormParamAttr
from data_feeder import DataFeeder
from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace
diff --git a/python/paddle/fluid/average.py b/python/paddle/fluid/average.py
index ded6eb085968343fcdc9f6e4b8353c08408df426..6abe8233b07c484494848c566e9898600a7d8f5c 100644
--- a/python/paddle/fluid/average.py
+++ b/python/paddle/fluid/average.py
@@ -13,6 +13,7 @@
# limitations under the License.
import numpy as np
+import warnings
"""
Class of all kinds of Average.
@@ -22,6 +23,8 @@ import numpy as np
wrappers of Python functions.
"""
+__all__ = ["WeightedAverage"]
+
def _is_number_(var):
return isinstance(var, int) or isinstance(var, float) or (isinstance(
@@ -34,6 +37,9 @@ def _is_number_or_matrix_(var):
class WeightedAverage(object):
def __init__(self):
+ warnings.warn(
+ "The %s is deprecated, please use fluid.metrics.Accuracy instead." %
+ (self.__class__.__name__), Warning)
self.reset()
def reset(self):
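
The deprecation mirrors the benchmark change at the top of this patch. A hedged migration sketch, assuming an executor `exe`, a `feeder`, a `train_reader`, and fetch targets `avg_cost`, `batch_acc`, and `batch_size_tensor` as in the MNIST benchmark:

```python
import paddle.fluid as fluid

accuracy = fluid.metrics.Accuracy()  # replaces fluid.average.WeightedAverage()
for pass_id in range(pass_num):
    accuracy.reset()
    for data in train_reader():
        loss, acc, weight = exe.run(
            fluid.default_main_program(),
            feed=feeder.feed(data),
            fetch_list=[avg_cost, batch_acc, batch_size_tensor])
        accuracy.update(value=acc, weight=weight)  # was: accuracy.add(...)
    print("pass %d, accuracy %f" % (pass_id, accuracy.eval()))
```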
diff --git a/python/paddle/fluid/evaluator.py b/python/paddle/fluid/evaluator.py
index 19e5b61b0b32aba3fe1e7805704a3740e3854fc8..13475025b5c2a759779066f9d511ed8a786118d5 100644
--- a/python/paddle/fluid/evaluator.py
+++ b/python/paddle/fluid/evaluator.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import warnings
import numpy as np
import layers
@@ -59,6 +60,9 @@ class Evaluator(object):
"""
def __init__(self, name, **kwargs):
+ warnings.warn(
+            "The %s is deprecated, because maintaining a modified program inside an evaluator easily causes bugs. Please use fluid.metrics.%s instead."
+ % (self.__class__.__name__, self.__class__.__name__), Warning)
self.states = []
self.metrics = []
self.helper = LayerHelper(name, **kwargs)
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 33cf6918178ff746a6b130af0e23a69de0f532fe..793421a22fbf6f3c25ec6a9bf8359f4e71e905de 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -818,6 +818,11 @@ class Block(object):
del self.vars[name]
self.sync_with_cpp()
+ def remove_var(self, name):
+ self.sync_with_cpp()
+ self.desc.remove_var(name)
+ del self.vars[name]
+
def create_parameter(self, *args, **kwargs):
global_block = self.program.global_block()
param = Parameter(global_block, *args, **kwargs)
@@ -838,6 +843,11 @@ class Block(object):
self.ops.insert(index, op)
return op
+ def remove_op(self, index):
+ self.sync_with_cpp()
+ self.desc.remove_op(index, index + 1)
+ del self.ops[index]
+
def delete_ops(self, ops):
# remove from cpp
# FIXME(typhoonzero): remove only the first occurrence.
@@ -846,6 +856,7 @@ class Block(object):
end = list(self.ops).index(ops[-1])
except Exception, e:
raise e
+
self.desc.remove_op(start, end + 1)
def slice_ops(self, start, end):
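
A small sketch of the new `Block.remove_var` and `Block.remove_op` helpers; the program, variables, and scale op below are illustrative only:

```python
import paddle.fluid as fluid

prog = fluid.Program()
block = prog.global_block()
x = block.create_var(name='x', shape=[1], dtype='float32')
y = block.create_var(name='y', shape=[1], dtype='float32')
block.append_op(
    type='scale', inputs={'X': x}, outputs={'Out': y}, attrs={'scale': 2.0})

block.remove_op(0)     # drops the scale op from the C++ desc and self.ops
block.remove_var('y')  # drops the variable it produced
```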
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 969398bda4cfd0b2f5e39f45d34a1da9b216901f..e7d6c4e2521bee133c4794ed1db669b02fc2152b 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -21,8 +21,7 @@ from ..executor import global_scope
__all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
- 'open_files', 'read_file', 'create_shuffle_reader',
- 'create_double_buffer_reader', 'create_multi_pass_reader'
+ 'open_files', 'read_file', 'shuffle', 'double_buffer'
]
@@ -237,13 +236,9 @@ def monkey_patch_reader_methods(reader):
var = scope.find_var(reader.name)
return var.get_reader()
- def eof():
- return not __get_reader__().has_next()
-
def reset():
return __get_reader__().reset()
- reader.eof = eof
reader.reset = reset
reader.stop_gradient = True
reader.persistable = True
@@ -283,7 +278,42 @@ def _copy_reader_create_op_(block, op):
return new_op
-def open_recordio_file(filename, shapes, lod_levels, dtypes):
+def open_recordio_file(filename,
+ shapes,
+ lod_levels,
+ dtypes,
+ pass_num=1,
+ for_parallel=False):
+ """
+ Open a RecordIO file
+
+ This layer takes a RecordIO file to read from and returns a Reader Variable.
+ Via the Reader Variable, we can get data from the given RecordIO file.
+
+ Args:
+ filename(str): The RecordIO file's name.
+       shapes(list): List of tuples declaring the data shapes.
+       lod_levels(list): List of ints declaring the data lod_level.
+       dtypes(list): List of strs declaring the data type.
+ pass_num(int): Number of passes to run.
+       for_parallel(Bool): Set it to True if you are going to run
+ subsequent operators in parallel.
+
+ Returns:
+ Variable: A Reader Variable via which we can get RecordIO file data.
+
+ Examples:
+ .. code-block:: python
+
+ reader = fluid.layers.io.open_recordio_file(
+ filename='./data.recordio',
+            shapes=[(3, 224, 224), (1,)],
+ lod_levels=[0, 0],
+ dtypes=['float32', 'int64'])
+
+ # Via the reader, we can use 'read_file' layer to get data:
+ image, label = fluid.layers.read_file(reader)
+ """
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []
@@ -310,10 +340,63 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes):
startup_var.persistable = True
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)
+
+ if pass_num > 1:
+ main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num)
+
+ if for_parallel:
+ main_prog_var = parallel(reader=main_prog_var)
+
return monkey_patch_reader_methods(main_prog_var)
-def open_files(filenames, thread_num, shapes, lod_levels, dtypes):
+def open_files(filenames,
+ shapes,
+ lod_levels,
+ dtypes,
+ thread_num,
+ buffer_size=None,
+ pass_num=1,
+ for_parallel=False):
+ """
+ Open files
+
+ This layer takes a list of files to read from and returns a Reader Variable.
+ Via the Reader Variable, we can get data from given files. All files must
+    have name suffixes to indicate their formats, e.g., '*.recordio'.
+
+ Args:
+ filenames(list): The list of file names.
+ shapes(list): List of tuples which declaring data shapes.
+        shapes(list): List of tuples declaring the data shapes.
+        lod_levels(list): List of ints declaring the data lod_level.
+        dtypes(list): List of strs declaring the data type.
+ buffer_size(int): The size of prefetch buffer.
+ pass_num(int): Number of passes to run.
+ for_parallel(Bool): Set it as True if you are going to run
+        for_parallel(Bool): Set it to True if you are going to run
+
+ Returns:
+ Variable: A Reader Variable via which we can get file data.
+
+ Examples:
+ .. code-block:: python
+
+ reader = fluid.layers.io.open_files(filenames=['./data1.recordio',
+ './data2.recordio'],
+                                            shapes=[(3, 224, 224), (1,)],
+ lod_levels=[0, 0],
+ dtypes=['float32', 'int64'],
+ thread_num=2,
+ buffer_size=2)
+
+ # Via the reader, we can use 'read_file' layer to get data:
+ image, label = fluid.layers.io.read_file(reader)
+ """
+ if buffer_size is None:
+ buffer_size = thread_num
+ if isinstance(filenames, basestring):
+ filenames = [filenames]
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []
@@ -322,29 +405,36 @@ def open_files(filenames, thread_num, shapes, lod_levels, dtypes):
shape_concat.extend(shape)
ranks.append(len(shape))
- var_name = unique_name('multiple_reader')
-
+ multi_file_reader_name = unique_name('multi_file_reader')
startup_blk = default_startup_program().current_block()
- startup_var = startup_blk.create_var(name=var_name)
+ startup_reader = startup_blk.create_var(name=multi_file_reader_name)
startup_blk.append_op(
type='open_files',
- outputs={'Out': [startup_var]},
+ outputs={'Out': [startup_reader]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks,
'file_names': filenames,
- 'thread_num': thread_num
+ 'thread_num': thread_num,
+ 'buffer_size': buffer_size
})
- startup_var.desc.set_dtypes(dtypes)
- startup_var.persistable = True
- main_prog_var = _copy_reader_var_(default_main_program().current_block(),
- startup_var)
- return monkey_patch_reader_methods(main_prog_var)
+ startup_reader.desc.set_dtypes(dtypes)
+ startup_reader.persistable = True
+ main_prog_reader = _copy_reader_var_(default_main_program().current_block(),
+ startup_reader)
+ if pass_num > 1:
+ main_prog_reader = multi_pass(
+ reader=main_prog_reader, pass_num=pass_num)
+
+ if for_parallel:
+ main_prog_reader = parallel(reader=main_prog_reader)
+
+ return monkey_patch_reader_methods(main_prog_reader)
-def __create_decorated_reader__(op_type, reader, attrs):
+def __create_shared_decorated_reader__(op_type, reader, attrs):
var_name = unique_name(op_type)
startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=var_name)
@@ -360,22 +450,41 @@ def __create_decorated_reader__(op_type, reader, attrs):
return monkey_patch_reader_methods(main_prog_var)
-def create_shuffle_reader(reader, buffer_size):
- return __create_decorated_reader__('create_shuffle_reader', reader,
- {'buffer_size': int(buffer_size)})
+def __create_unshared_decorated_reader__(op_type, reader, attrs):
+ new_reader_name = unique_name(op_type)
+ main_blk = default_main_program().current_block()
+ new_reader = main_blk.create_var(name=new_reader_name)
+ main_blk.append_op(
+ type=op_type,
+ inputs={'UnderlyingReader': reader},
+ outputs={'Out': [new_reader]},
+ attrs=attrs)
+ new_reader.persistable = True
+ new_reader.stop_gradient = True
+ return monkey_patch_reader_methods(new_reader)
+
+
+def shuffle(reader, buffer_size):
+ return __create_unshared_decorated_reader__(
+ 'create_shuffle_reader', reader, {'buffer_size': int(buffer_size)})
-def create_double_buffer_reader(reader, place=None):
+def double_buffer(reader, place=None):
attrs = dict()
if place is not None:
attrs['place'] = str(place).upper()
- return __create_decorated_reader__('create_double_buffer_reader', reader,
- attrs)
+ return __create_unshared_decorated_reader__('create_double_buffer_reader',
+ reader, attrs)
+
+
+def multi_pass(reader, pass_num):
+ return __create_shared_decorated_reader__(
+ 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)})
-def create_multi_pass_reader(reader, pass_num):
- return __create_decorated_reader__('create_multi_pass_reader', reader,
- {'pass_num': int(pass_num)})
+def parallel(reader):
+ return __create_shared_decorated_reader__('create_threaded_reader', reader,
+ {})
def read_file(file_obj):
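
With the renames above, the decorators chain naturally; a sketch under the same assumptions as the docstring examples:

```python
reader = fluid.layers.open_files(
    filenames=['./data1.recordio', './data2.recordio'],
    shapes=[(-1, 784), (-1, 1)],
    lod_levels=[0, 0],
    dtypes=['float32', 'int64'],
    thread_num=2)
reader = fluid.layers.io.shuffle(reader, buffer_size=200)  # was create_shuffle_reader
reader = fluid.layers.io.double_buffer(reader)             # was create_double_buffer_reader
image, label = fluid.layers.read_file(reader)
```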
diff --git a/python/paddle/fluid/layers/metric.py b/python/paddle/fluid/layers/metric.py
index 3d9157ad4ef9381b70b4007c5bdca91f1482b427..f66dccfa2d040ea0a9d29daeaa1d2da640525959 100644
--- a/python/paddle/fluid/layers/metric.py
+++ b/python/paddle/fluid/layers/metric.py
@@ -15,12 +15,13 @@
All layers just related to metric.
"""
+import warnings
from ..layer_helper import LayerHelper
from ..initializer import Normal, Constant
from ..framework import Variable
from ..param_attr import ParamAttr
-__all__ = ['accuracy']
+__all__ = ['accuracy', 'auc']
def accuracy(input, label, k=1, correct=None, total=None):
@@ -55,3 +56,37 @@ def accuracy(input, label, k=1, correct=None, total=None):
"Total": [total],
})
return acc_out
+
+
+def auc(input, label, curve='ROC', num_thresholds=200):
+    warnings.warn(
+        "This interface is not recommended: fluid.layers.auc computes the AUC "
+        "of every minibatch, but the minibatch values cannot be aggregated "
+        "into a pass-level AUC, because AUC cannot be averaged (even with "
+        "weights) from minibatch AUC values. Please use fluid.metrics.Auc "
+        "instead; it computes the AUC natively in Python and can report both "
+        "minibatch and pass-level values.", Warning)
+    helper = LayerHelper("auc", **locals())
+    topk_out = helper.create_tmp_variable(dtype=input.dtype)
+    topk_indices = helper.create_tmp_variable(dtype="int64")
+    # Rank the predictions; the top-1 scores and indices feed the auc op.
+    helper.append_op(
+        type="top_k",
+        inputs={"X": [input]},
+        outputs={"Out": [topk_out],
+                 "Indices": [topk_indices]},
+        attrs={"k": 1})
+    auc_out = helper.create_tmp_variable(dtype="float32")
+    helper.append_op(
+        type="auc",
+        inputs={
+            "Out": [topk_out],
+            "Indices": [topk_indices],
+            "Label": [label]
+        },
+        attrs={"curve": curve,
+               "num_thresholds": num_thresholds},
+        outputs={"AUC": [auc_out], })
+    return auc_out
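
As the warning suggests, pass-level AUC belongs in `fluid.metrics.Auc`, defined below in this patch. A hedged sketch, assuming fetchable variables `prediction` and `label`, an executor `exe`, and a `feeder`, with `preds` arriving as an (N, k) score array and `labels` as {0, 1} values:

```python
auc_metric = fluid.metrics.Auc(name="auc")
for data in batches:
    preds, labels = exe.run(feed=feeder.feed(data),
                            fetch_list=[prediction, label])
    auc_metric.update(labels=labels, predictions=preds)
pass_auc = auc_metric.eval()
```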
diff --git a/python/paddle/fluid/metrics.py b/python/paddle/fluid/metrics.py
new file mode 100644
index 0000000000000000000000000000000000000000..99a81c1d4244b919a53dfec36fc5a6659c10adae
--- /dev/null
+++ b/python/paddle/fluid/metrics.py
@@ -0,0 +1,378 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Fluid Metrics
+
+The metrics are implemented natively in Python.
+"""
+import numpy as np
+import copy
+import warnings
+
+__all__ = [
+ 'MetricBase',
+ 'CompositeMetric',
+ 'Accuracy',
+ 'ChunkEvaluator',
+ 'EditDistance',
+ 'DetectionMAP',
+ 'Auc',
+]
+
+
+def _is_numpy_(var):
+ return isinstance(var, (np.ndarray, np.generic))
+
+
+def _is_number_(var):
+ return isinstance(var, int) or isinstance(var, float) or (isinstance(
+ var, np.ndarray) and var.shape == (1, ))
+
+
+def _is_number_or_matrix_(var):
+ return _is_number_(var) or isinstance(var, np.ndarray)
+
+
+class MetricBase(object):
+ """
+ Base Class for all evaluators
+
+ Args:
+        name(str): The name of the evaluator, e.g. "accuracy". Used to
+            generate temporary variable names.
+    Interface:
+        Note(*): the states are the attributes without a leading "_" prefix.
+
+        get_config(): print current states and configuration
+        reset(): clear the states. If a metric's states are not of type
+            (int, float, np.ndarray), please override this method.
+ update(): update states at every minibatch
+ eval(): get metric evaluation in numpy type.
+ """
+
+ def __init__(self, name, **kwargs):
+        self._name = str(name) if name is not None else self.__class__.__name__
+        self._kwargs = kwargs
+ self.reset()
+
+ def __str__(self):
+ return self._name
+
+ def reset(self):
+        """
+        Reset the states; the states are the attributes that do not have a
+        leading "_" prefix.
+        """
+ states = {
+ attr: value
+ for attr, value in self.__dict__.iteritems()
+ if not attr.startswith("_")
+ }
+ for attr, value in states.iteritems():
+ if isinstance(value, int):
+ setattr(self, attr, 0)
+ elif isinstance(value, float):
+ setattr(self, attr, .0)
+ elif isinstance(value, (np.ndarray, np.generic)):
+ setattr(self, attr, np.zeros_like(value))
+ else:
+ setattr(self, attr, None)
+
+ def get_config(self):
+ states = {
+ attr: value
+ for attr, value in self.__dict__.iteritems()
+ if not attr.startswith("_")
+ }
+ config = copy.deepcopy(self._kwargs)
+ config.update({"name": self._name, "states": copy.deepcopy(states)})
+ return config
+
+ def update(self):
+ raise NotImplementedError()
+
+ def eval(self):
+ raise NotImplementedError()
+
+
+class CompositeMetric(MetricBase):
+ """
+ Compute multiple metrics in each minibatch.
+    For example, merge F1, accuracy and recall into one metric.
+ """
+
+ def __init__(self, name=None, **kwargs):
+        super(CompositeMetric, self).__init__(name, **kwargs)
+ self._metrics = []
+
+ def add_metric(self, metric):
+ if not isinstance(metric, MetricBase):
+            raise ValueError("The added metric must inherit from MetricBase.")
+ self._metrics.append(metric)
+
+ def eval(self):
+ ans = []
+ for m in self._metrics:
+ ans.append(m.eval())
+ return ans
+
+
+class Accuracy(MetricBase):
+ """
+ Accumulate the accuracy from minibatches and compute the average accuracy
+ for every pass.
+
+ Args:
+ name: the metrics name
+
+ Example:
+ minibatch_accuracy = fluid.layers.accuracy(pred, label)
+ accuracy_evaluator = fluid.metrics.Accuracy()
+        for pass_id in range(PASS_NUM):
+            accuracy_evaluator.reset()
+            for data in batches:
+                loss, acc = exe.run(fetch_list=[cost, minibatch_accuracy])
+                accuracy_evaluator.update(value=acc, weight=len(data))
+            accuracy = accuracy_evaluator.eval()
+ """
+
+ def __init__(self, name=None):
+ super(Accuracy, self).__init__(name)
+ self.value = .0
+ self.weight = .0
+
+ def update(self, value, weight):
+ if not _is_number_or_matrix_(value):
+ raise ValueError(
+ "The 'value' must be a number(int, float) or a numpy ndarray.")
+ if not _is_number_(weight):
+ raise ValueError("The 'weight' must be a number(int, float).")
+ self.value += value * weight
+ self.weight += weight
+
+ def eval(self):
+ if self.weight == 0:
+ raise ValueError(
+                "There is no data in the Accuracy metric. Please check that the layers.accuracy output has been added to Accuracy."
+ )
+ return self.value / self.weight
+
+
+class ChunkEvaluator(MetricBase):
+ """
+ Accumulate counter numbers output by chunk_eval from mini-batches and
+ compute the precision recall and F1-score using the accumulated counter
+ numbers.
+ """
+
+ def __init__(self, name=None):
+        super(ChunkEvaluator, self).__init__(name)
+ self.num_infer_chunks = 0
+ self.num_label_chunks = 0
+ self.num_correct_chunks = 0
+
+ def update(self, num_infer_chunks, num_label_chunks, num_correct_chunks):
+ if not _is_number_or_matrix_(num_infer_chunks):
+ raise ValueError(
+ "The 'num_infer_chunks' must be a number(int, float) or a numpy ndarray."
+ )
+ if not _is_number_or_matrix_(num_label_chunks):
+ raise ValueError(
+ "The 'num_label_chunks' must be a number(int, float) or a numpy ndarray."
+ )
+ if not _is_number_or_matrix_(num_correct_chunks):
+ raise ValueError(
+ "The 'num_correct_chunks' must be a number(int, float) or a numpy ndarray."
+ )
+ self.num_infer_chunks += num_infer_chunks
+ self.num_label_chunks += num_label_chunks
+ self.num_correct_chunks += num_correct_chunks
+
+ def eval(self):
+ precision = float(
+ self.num_correct_chunks
+ ) / self.num_infer_chunks if self.num_infer_chunks else 0
+ recall = float(self.num_correct_chunks
+ ) / self.num_label_chunks if self.num_label_chunks else 0
+ f1_score = float(2 * precision * recall) / (
+ precision + recall) if self.num_correct_chunks else 0
+ return precision, recall, f1_score
+
+
+class EditDistance(MetricBase):
+ """
+ Accumulate edit distance sum and sequence number from mini-batches and
+ compute the average edit_distance and instance error of all batches.
+
+ Args:
+ name: the metrics name
+
+ Example:
+ edit_distance_metrics = fluid.layers.edit_distance(input, label)
+ distance_evaluator = fluid.metrics.EditDistance()
+        for pass_id in range(PASS_NUM):
+ distance_evaluator.reset()
+ for data in batches:
+ loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics))
+ distance_evaluator.update(*edit_distance_metrics)
+ distance, instance_error = distance_evaluator.eval()
+
+ In the above example:
+ 'distance' is the average of the edit distance in a pass.
+ 'instance_error' is the instance error rate in a pass.
+
+ """
+
+    def __init__(self, name=None):
+ super(EditDistance, self).__init__(name)
+ self.total_distance = .0
+ self.seq_num = 0
+ self.instance_error = 0
+
+ def update(self, distances, seq_num):
+ if not _is_numpy_(distances):
+ raise ValueError("The 'distances' must be a numpy ndarray.")
+ if not _is_number_(seq_num):
+ raise ValueError("The 'seq_num' must be a number(int, float).")
+ seq_right_count = np.sum(distances == 0)
+ total_distance = np.sum(distances)
+ self.seq_num += seq_num
+ self.instance_error += seq_num - seq_right_count
+ self.total_distance += total_distance
+
+    def eval(self):
+ if self.seq_num == 0:
+ raise ValueError(
+ "There is no data in EditDistance Metric. Please check layers.edit_distance output has been added to EditDistance."
+ )
+ avg_distance = self.total_distance / self.seq_num
+ avg_instance_error = self.instance_error / self.seq_num
+ return avg_distance, avg_instance_error
+
+
+class DetectionMAP(MetricBase):
+ """
+ Calculate the detection mean average precision (mAP).
+
+ TODO (Dang Qingqing): update the following doc.
+ The general steps are as follows:
+ 1. calculate the true positive and false positive according to the input
+ of detection and labels.
+    2. calculate the mAP value, supporting two versions: '11 point' and 'integral'.
+
+ Please get more information from the following articles:
+ https://sanchom.wordpress.com/tag/average-precision/
+ https://arxiv.org/abs/1512.02325
+ """
+
+    def __init__(self, name=None):
+        super(DetectionMAP, self).__init__(name)
+        # the current mAP value and its accumulated weight
+        self.value = .0
+        self.weight = .0
+
+ def update(self, value, weight):
+ if not _is_number_or_matrix_(value):
+ raise ValueError(
+ "The 'value' must be a number(int, float) or a numpy ndarray.")
+ if not _is_number_(weight):
+ raise ValueError("The 'weight' must be a number(int, float).")
+ self.value += value
+ self.weight += weight
+
+ def eval(self):
+ if self.weight == 0:
+ raise ValueError(
+ "There is no data in DetectionMAP Metrics. "
+                "Please check that the layers.detection_map output has been added to DetectionMAP."
+ )
+ return self.value / self.weight
+
+
+class Auc(MetricBase):
+ """
+    AUC metric for binary classification.
+    Note that this metric computes its value natively in Python.
+    If speed is a concern, please use fluid.layers.auc instead.
+
+ The `auc` function creates four local variables, `true_positives`,
+ `true_negatives`, `false_positives` and `false_negatives` that are used to
+ compute the AUC. To discretize the AUC curve, a linearly spaced set of
+ thresholds is used to compute pairs of recall and precision values. The area
+ under the ROC-curve is therefore computed using the height of the recall
+    values by the false positive rate, while the area under the PR-curve is
+    computed using the height of the precision values by the recall.
+
+ Args:
+ name: metric name
+ curve: Specifies the name of the curve to be computed, 'ROC' [default] or
+ 'PR' for the Precision-Recall-curve.
+ num_thresholds: The number of thresholds to use when discretizing the roc
+ curve.
+
+    NOTE: only the ROC curve type is implemented in Python for now.
+ """
+
+    def __init__(self, name, curve='ROC', num_thresholds=200):
+        super(Auc, self).__init__(name=name)
+        self._curve = curve
+        self._num_thresholds = num_thresholds
+        self._epsilon = 1e-6
+        self.tp_list = np.zeros((num_thresholds, ))
+        self.fn_list = np.zeros((num_thresholds, ))
+        self.tn_list = np.zeros((num_thresholds, ))
+        self.fp_list = np.zeros((num_thresholds, ))
+
+    def update(self, labels, predictions, axis=1):
+        if not _is_numpy_(labels):
+            raise ValueError("The 'labels' must be a numpy ndarray.")
+        if not _is_numpy_(predictions):
+            raise ValueError("The 'predictions' must be a numpy ndarray.")
+
+        num_thresholds = self._num_thresholds
+        kepsilon = 1e-7  # to account for floating point imprecision
+        thresholds = [(i + 1) * 1.0 / (num_thresholds - 1)
+                      for i in range(num_thresholds - 2)]
+        thresholds = [0.0 - kepsilon] + thresholds + [1.0 + kepsilon]
+
+        # calculate the TP, FN, TN, FP counts for every threshold
+        for idx_thresh, thresh in enumerate(thresholds):
+            tp, fn, tn, fp = 0, 0, 0, 0
+            for i, lbl in enumerate(labels):
+                if lbl:
+                    if predictions[i, 0] >= thresh:
+                        tp += 1
+                    else:
+                        fn += 1
+                else:
+                    if predictions[i, 0] >= thresh:
+                        fp += 1
+                    else:
+                        tn += 1
+            self.tp_list[idx_thresh] += tp
+            self.fn_list[idx_thresh] += fn
+            self.tn_list[idx_thresh] += tn
+            self.fp_list[idx_thresh] += fp
+
+    def eval(self):
+        epsilon = self._epsilon
+        num_thresholds = self._num_thresholds
+        tpr = (self.tp_list.astype("float32") + epsilon) / (
+            self.tp_list + self.fn_list + epsilon)
+        fpr = self.fp_list.astype("float32") / (
+            self.fp_list + self.tn_list + epsilon)
+
+        # integrate the ROC curve with the trapezoidal rule
+        x = fpr[:num_thresholds - 1] - fpr[1:]
+        y = (tpr[:num_thresholds - 1] + tpr[1:]) / 2.0
+        auc_value = np.sum(x * y)
+        return auc_value
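
`CompositeMetric` from the same module can bundle several of these; a self-contained sketch with dummy minibatch values:

```python
import paddle.fluid as fluid

composite = fluid.metrics.CompositeMetric()
acc = fluid.metrics.Accuracy()
composite.add_metric(acc)

# Sub-metrics are updated individually; CompositeMetric aggregates eval() only.
acc.update(value=0.90, weight=128)
acc.update(value=0.80, weight=128)
print(composite.eval())  # -> [0.85]
```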
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index b93f2f974ca28cfd8d03c0dbbf1d401620a15e53..24dfa6144ae9584f1678e662716da123352430dd 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -87,7 +87,8 @@ class ParallelExecutor(object):
# performance. Worth tunning for other models in the future.
num_threads = len(self._places)
else:
- min(len(self._places) * 2, multiprocessing.cpu_count())
+ num_threads = min(
+ len(self._places) * 2, multiprocessing.cpu_count())
main = main_program
main = main if main else framework.default_main_program()
diff --git a/python/paddle/fluid/tests/unittests/test_multiple_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py
similarity index 91%
rename from python/paddle/fluid/tests/unittests/test_multiple_reader.py
rename to python/paddle/fluid/tests/unittests/test_multi_file_reader.py
index a60a5d6c4af2b6b3652d0fe2089018b9403eee25..5dc41e54d6158787eb966333c894e378b5c706d0 100644
--- a/python/paddle/fluid/tests/unittests/test_multiple_reader.py
+++ b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py
@@ -61,8 +61,12 @@ class TestMultipleReader(unittest.TestCase):
exe.run(fluid.default_startup_program())
batch_count = 0
- while not data_files.eof():
- img_val, = exe.run(fetch_list=[img])
+ while True:
+ try:
+ img_val, = exe.run(fetch_list=[img])
+ except fluid.core.EnforceNotMet as ex:
+ self.assertIn("There is no next data.", ex.message)
+ break
batch_count += 1
self.assertLessEqual(img_val.shape[0], self.batch_size)
data_files.reset()
diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py
index 0b7a29075939a548320185947b5afa7261029d49..1471843ded7a42432a84a9fad76bb97dcf7fb9c2 100644
--- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py
+++ b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py
@@ -44,7 +44,7 @@ class TestMultipleReader(unittest.TestCase):
shapes=[(-1, 784), (-1, 1)],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
- data_file = fluid.layers.create_multi_pass_reader(
+ data_file = fluid.layers.io.multi_pass(
reader=data_file, pass_num=self.pass_num)
img, label = fluid.layers.read_file(data_file)
@@ -57,8 +57,12 @@ class TestMultipleReader(unittest.TestCase):
exe.run(fluid.default_startup_program())
batch_count = 0
- while not data_file.eof():
- img_val, = exe.run(fetch_list=[img])
+ while True:
+ try:
+ img_val, = exe.run(fetch_list=[img])
+ except fluid.core.EnforceNotMet as ex:
+ self.assertIn("There is no next data.", ex.message)
+ break
batch_count += 1
self.assertLessEqual(img_val.shape[0], self.batch_size)
data_file.reset()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 8401716db88ef3dda68644a052d78b4476c9fdc7..3c00f708f08b6637acd731d23a5b9eb4eed12d2a 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -26,11 +26,14 @@ def simple_fc_net(use_feed):
img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
else:
- reader = fluid.layers.open_recordio_file(
- filename='./mnist.recordio',
+ reader = fluid.layers.open_files(
+ filenames=['./mnist.recordio'],
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
- dtypes=['float32', 'int64'])
+ dtypes=['float32', 'int64'],
+ thread_num=1,
+ for_parallel=True)
+ reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader)
hidden = img
for _ in xrange(4):
@@ -51,11 +54,14 @@ def fc_with_batchnorm(use_feed):
img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
else:
- reader = fluid.layers.open_recordio_file(
- filename='./mnist.recordio',
+ reader = fluid.layers.open_files(
+ filenames=['mnist.recordio'],
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
- dtypes=['float32', 'int64'])
+ dtypes=['float32', 'int64'],
+ thread_num=1,
+ for_parallel=True)
+ reader = fluid.layers.io.double_buffer(reader)
img, label = fluid.layers.read_file(reader)
hidden = img
diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
index f98a8bbc68a4315df3ae761f2e52b8f11cb620c6..3f9059fb5b31cd009c068ccddc9a8938adae5772 100644
--- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
+++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
@@ -201,24 +201,6 @@ class TestBlockDesc(unittest.TestCase):
op1.set_type("test")
op2.set_type("test")
- var0 = block.var("var0")
- var1 = block.var("var1")
- var2 = block.var("var2")
- var3 = block.var("var3")
- var4 = block.var("var4")
- var5 = block.var("var5")
-
- op0.set_input("X", ["var0"])
- op0.set_output("Y", ["var0"])
- op1.set_input("X", ["var1", "var2"])
- op1.set_output("Y", ["var3", "var4"])
- op2.set_input("X", ["var1"])
- op2.set_output("Y", ["var4", "var5"])
-
- program.sync_with_cpp()
-
- # remove op1, its input var2 and output var3 will be removed at the same time,
- # but its input var1 and output var4 will not be removed since they are used for op2.
block.remove_op(1, 2)
program.sync_with_cpp()
@@ -226,8 +208,6 @@ class TestBlockDesc(unittest.TestCase):
for idx in xrange(0, block.op_size()):
all_ops.append(block.op(idx))
self.assertEqual(all_ops, [op0, op2])
- all_vars = block.all_vars()
- self.assertEqual(set(all_vars), {var0, var1, var4, var5})
if __name__ == '__main__':
diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py
index 24a0074d9b9621d902d12eb8cb29d9b65be22ed3..7c8e7f634fdd3ee3f056a95df774402a7c29e906 100644
--- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py
+++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py
@@ -65,8 +65,13 @@ class TestRecordIO(unittest.TestCase):
# train a pass
batch_id = 0
- while not data_file.eof():
- tmp, = exe.run(fetch_list=[avg_loss])
+ while True:
+ try:
+ tmp, = exe.run(fetch_list=[avg_loss])
+ except fluid.core.EnforceNotMet as ex:
+ self.assertIn("There is no next data.", ex.message)
+ break
+
avg_loss_np.append(tmp)
batch_id += 1
data_file.reset()
@@ -74,8 +79,8 @@ class TestRecordIO(unittest.TestCase):
self.assertLess(avg_loss_np[-1], avg_loss_np[0])
def test_shuffle_reader(self):
- self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200))
+ self.test_main(decorator_callback=lambda reader: fluid.layers.io.shuffle(reader, buffer_size=200))
def test_double_buffer_reader(self):
- self.test_main(decorator_callback=lambda reader: fluid.layers.create_double_buffer_reader(reader,
+ self.test_main(decorator_callback=lambda reader: fluid.layers.io.double_buffer(reader,
place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu'))