diff --git a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md index bc3d50b3ffd3b703a3a656caa1f96bdcf683f68b..dee1b7554f97af17989c3f7739d8feea3b6b8e3f 100644 --- a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md +++ b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md @@ -1,3 +1,372 @@ -# Kubernetes Distributed +# Distributed Training on Kubernetes -TBD +We introduced how to create a PaddlePaddle Job with a single node on Kuberentes in the +previous document. +In this article, we will introduce how to create a PaddlePaddle job with multiple nodes +on Kubernetes cluster. + +## Overall Architecture + +Before creating a training job, the users need to slice the training data and deploy +the Python scripts along with it into the distributed file system +(We can use the different type of Kuberentes Volumes to mount different distributed +file systems). Before training starts, The program will copy the training data into the +Container and also save the models at the same path during training. The global architecture +is as follows: + +![PaddlePaddle on Kubernetes Architecture](src/k8s-paddle-arch.png) + +The above figure describes a distributed training architecture which contains 3 nodes, each +Pod mounts a folder of the distributed file system to save training data and models +by Kubernetes Volume. Kubernetes created 3 Pods for this training phase and scheduled these on +3 nodes, each Pod has a PaddlePaddle container. After the containers car created, +PaddlePaddle starts up the communication between PServer and Trainer and read training +data for this training job. + +As the description above, we can start up a PaddlePaddle distributed training job on a +Kubernetes ready cluster with the following steps: + +1. [Build PaddlePaddle Docker Image](#Build a Docker Image) +1. [Split training data and upload to the distributed file system](#Upload Training Data) +1. [Edit a YAML file and create a Kubernetes Job](#Create a Job) +1. [Check the output](#Check The Output) + +We will introduce these steps as follows: + +### Build a Docker Image + +Training docker image needs to package the paddle pserver and paddle trainer runtimes, as well as two more processes before we can kick off the training: + +- Copying the training data into container. +- Generating the initialization arguments for `Paddle PServer` and `Paddle Training` processes. + +Since the paddlepaddle official docker image already has the runtimes we need, we'll take it as the base image and pack some additional scripts for the processes mentioned above to build our training image. for more detail, please find from the following link: +- https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile + + +```bash +$ cd doc/howto/usage/k8s/src/k8s_train +$ docker build -t [YOUR_REPO]/paddle:mypaddle . +``` + +And then upload the new Docker Image to a Docker hub: + +```bash +docker push [YOUR_REPO]/paddle:mypaddle +``` + +**[NOTE]**, in the above command arguments, `[YOUR_REPO]` represents your Docker repository, +you need to use your repository instead of it. We will replace it with your respository name to +represent the Docker Image which built in this step. + +### Prepare Training Data + +We can download and split the training job by creating a Kubernetes Job, or custom your image +by editing [k8s_train](./src/k8s_train/). + +Before creating a Job, we need to bind a [persistenVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes) by the different type of +the different file system, the generated dataset would be saved on this volume. + +```yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: paddle-data +spec: + template: + metadata: + name: pi + spec: + hostNetwork: true + containers: + - name: paddle-data + image: paddlepaddle/paddle-tutorial:k8s_data + imagePullPolicy: Always + volumeMounts: + - mountPath: "/mnt" + name: nfs + env: + - name: OUT_DIR + value: /home/work/mfs/paddle-cluster-job + - name: SPLIT_COUNT + value: "3" + volumes: + - name: nfs + persistentVolumeClaim: + claimName: mfs + restartPolicy: Never +``` + +Create the Job with the following command: + +```bash +> kubectl create -f xxx.yaml +``` + +If created successfully, you can see some information like this: + +```base +[root@paddle-kubernetes-node0 nfsdir]$ tree -d +. +`-- paddle-cluster-job + |-- 0 + | `-- data + |-- 1 + | `-- data + |-- 2 + | `-- data + |-- output + |-- quick_start +``` + +The `paddle-cluster-job` above is the job name for this training job; we need 3 +PaddlePaddle training nodes and save the split training data in `paddle-cluster-job` path, +the folder `0`, `1` and `2` represents the `training_id` on each node, `quick_start` folder is used to store training data, `output` folder is used to store the models and logs. + + +### Create a Job + +Kubernetes allow users to create objects with YAML files, and we can use a command-line tool +to create it. + +The Job YAML file describes that which Docker Image would be used in this training job, how much nodes would be created, what's the startup arguments of `Paddle PServer/Trainer` process and what's the type of Volumes. You can find the details of the YAML filed in +[Kubernetes Job API](http://kubernetes.io/docs/api-reference/batch/v1/definitions/#_v1_job). +The following is an example for this training job: + +```yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: paddle-cluster-job +spec: + parallelism: 3 + completions: 3 + template: + metadata: + name: paddle-cluster-job + spec: + volumes: + - name: jobpath + hostPath: + path: /home/work/mfs + containers: + - name: trainer + image: [YOUR_REPO]/paddle:mypaddle + command: ["bin/bash", "-c", "/root/start.sh"] + env: + - name: JOB_NAME + value: paddle-cluster-job + - name: JOB_PATH + value: /home/jobpath + - name: JOB_NAMESPACE + value: default + - name: TRAIN_CONFIG_DIR + value: recommendation + - name: CONF_PADDLE_NIC + value: eth0 + - name: CONF_PADDLE_PORT + value: "7164" + - name: CONF_PADDLE_PORTS_NUM + value: "2" + - name: CONF_PADDLE_PORTS_NUM_SPARSE + value: "2" + - name: CONF_PADDLE_GRADIENT_NUM + value: "3" + volumeMounts: + - name: jobpath + mountPath: /home/jobpath + restartPolicy: Never +``` + +In the above YAML file: +- `metadata.name`, The job name. +- `parallelism`, Whether the Kubernetes Job would create `parallelism` Pods at the same time. +- `completions`, The Job would become the success status only when the number of successful Pod(the exit code is 0) + is equal to `completions`. +- `volumeMounts`, the name field `jobpath` is a key, the `mountPath` field represents + the path in the container, and we can define the `jobpath` in `volumes` filed, use `hostPath` + to configure the host path we want to mount. +- `env`, the environment variables in the Container, we pass some startup arguments by + this approach, some details are as following: + - JOB_PATH:the mount path in the container + - JOB_NAME:the job name + - TRAIN_CONFIG_DIR:the job path in the container, we can find the training data path by + combine with JOB_NAME. + - CONF_PADDLE_NIC: the argument `--nics` of `Paddle PServer` process, the network + device name. + - CONF_PADDLE_PORT: the argument `--port` of `Paddle PServer` process. + - CONF_PADDLE_PORTS_NUM: the argument `--ports_num` of `Paddle PServer`, the port number + for dense prameter update. + - CONF_PADDLE_PORTS_NUM_SPARSE:the argument `--ports_num_for_sparse` of `Paddle PServer`, + the port number for sparse parameter update. + - CONF_PADDLE_GRADIENT_NUM:the number of training node, the argument + `--num_gradient_servers` of `Paddle PServer` and `Paddle Trainer`. + +You can find some details information at [here] +(http://www.paddlepaddle.org/docs/develop/documentation/zh/howto/usage/cmd_parameter/detail_introduction_cn.html)。 + +We can use the command-line tool of Kubernetes to create a Job when we finish the YAML file: + +```bash +kubectl create -f job.yaml +``` + +Upon successful creation, Kubernetes would create 3 Pods as PaddlePaddle training node, +pull the Docker image and begin to train. + + +### Checkout the Output + +At the process of training, we can check the logs and the output models which is stored in +the `output` folder. + +**NOTE**, `node_0`, `node_1` and `node_2` represent the +`trainer_id` of the PaddlePaddle training job rather than the node id of Kubernetes. + +```bash +[root@paddle-kubernetes-node0 output]# tree -d +. +├── node_0 +│   ├── server.log +│   └── train.log +├── node_1 +│   ├── server.log +│   └── train.log +├── node_2 +...... +├── pass-00002 +│   ├── done +│   ├── ___embedding_0__.w0 +│   ├── ___embedding_1__.w0 +...... +``` + +We can checkout the status of each training Pod by viewing the logs: + +```bash +[root@paddle-kubernetes-node0 node_0]# cat train.log +I1116 09:10:17.123121 50 Util.cpp:155] commandline: + /usr/local/bin/../opt/paddle/bin/paddle_trainer + --nics=eth0 --port=7164 + --ports_num=2 --comment=paddle_process_by_paddle + --pservers=192.168.129.66,192.168.223.143,192.168.129.71 + --ports_num_for_sparse=2 --config=./trainer_config.py + --trainer_count=4 --num_passes=10 --use_gpu=0 + --log_period=50 --dot_period=10 --saving_period=1 + --local=0 --trainer_id=0 + --save_dir=/home/jobpath/paddle-cluster-job/output +I1116 09:10:17.123440 50 Util.cpp:130] Calling runInitFunctions +I1116 09:10:17.123764 50 Util.cpp:143] Call runInitFunctions done. +[WARNING 2016-11-16 09:10:17,227 default_decorators.py:40] please use keyword arguments in paddle config. +[INFO 2016-11-16 09:10:17,239 networks.py:1282] The input order is [movie_id, title, genres, user_id, gender, age, occupation, rating] +[INFO 2016-11-16 09:10:17,239 networks.py:1289] The output order is [__square_error_cost_0__] +I1116 09:10:17.392917 50 Trainer.cpp:170] trainer mode: Normal +I1116 09:10:17.613910 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process +I1116 09:10:17.680917 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process +I1116 09:10:17.681543 50 GradientMachine.cpp:134] Initing parameters.. +I1116 09:10:18.012390 50 GradientMachine.cpp:141] Init parameters done. +I1116 09:10:18.018641 50 ParameterClient2.cpp:122] pserver 0 192.168.129.66:7164 +I1116 09:10:18.018950 50 ParameterClient2.cpp:122] pserver 1 192.168.129.66:7165 +I1116 09:10:18.019069 50 ParameterClient2.cpp:122] pserver 2 192.168.223.143:7164 +I1116 09:10:18.019492 50 ParameterClient2.cpp:122] pserver 3 192.168.223.143:7165 +I1116 09:10:18.019716 50 ParameterClient2.cpp:122] pserver 4 192.168.129.71:7164 +I1116 09:10:18.019836 50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7165 +``` + +## Some Additional Details + +### Using Environment Variables + +Usually we use the environment varialbes to configurate the PaddlePaddle Job which runs in +Kubernetes, `start_paddle.py` provides a start up script to convert the environment variable +to the start up arguments of PaddlePaddle process: + +```bash +API = "/api/v1/namespaces/" +JOBSELECTOR = "labelSelector=job-name=" +JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME") +JOB_PATH_OUTPUT = JOB_PATH + "/output" +JOBNAME = os.getenv("JOB_NAME") +NAMESPACE = os.getenv("JOB_NAMESPACE") +PADDLE_NIC = os.getenv("CONF_PADDLE_NIC") +PADDLE_PORT = os.getenv("CONF_PADDLE_PORT") +PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM") +PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE") +PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM") +``` + +### Communication between Pods + +At the begin of `start_paddle.py`, it would initializes and parses the arguments. + +```python +parser = argparse.ArgumentParser(prog="start_paddle.py", + description='simple tool for k8s') + args, train_args_list = parser.parse_known_args() + train_args = refine_unknown_args(train_args_list) + train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2])) + podlist = getPodList() +``` + +And then query the status of all the other Pods of this Job by the function `getPodList()`, and fetch `triner_id` by the function `getIdMap(podlist)` if all the Pods status is `RUNNING`. + +```python + podlist = getPodList() + # need to wait until all pods are running + while not isPodAllRunning(podlist): + time.sleep(10) + podlist = getPodList() + idMap = getIdMap(podlist) +``` + +**NOTE**: `getPodList()` would prefetch all the Pods in the current namespace, if some +Pods are alreay running, it may cause some error. We will use [statfulesets](https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets) instead of +Kubernetes Pod or Replicaset in the future. + +The function `getIdMap(podlist)` fetches IPs addresses of `podlist` and then sort them +to generate `trainer_id`. + +```python +def getIdMap(podlist): + ''' + generate tainer_id by ip + ''' + ips = [] + for pod in podlist["items"]: + ips.append(pod["status"]["podIP"]) + ips.sort() + idMap = {} + for i in range(len(ips)): + idMap[ips[i]] = i + return idMap +``` + +After getting the `idMap`, we can generate the arguments of `Paddle PServer` and `Paddle Trainer` +so that we can start up them by `startPaddle(idMap, train_args_dict)`. + +### Create Job + +The main goal of `startPaddle` is generating the arguments of `Paddle PServer` and +`Paddle Trainer` processes. Take `Paddle Trainer` as an example, we parse the +environment variable and then get `PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM` and etc..., +finally find `trainerId` from `idMap` according to its IP address. + +```python + program = 'paddle train' + args = " --nics=" + PADDLE_NIC + args += " --port=" + str(PADDLE_PORT) + args += " --ports_num=" + str(PADDLE_PORTS_NUM) + args += " --comment=" + "paddle_process_by_paddle" + ip_string = "" + for ip in idMap.keys(): + ip_string += (ip + ",") + ip_string = ip_string.rstrip(",") + args += " --pservers=" + ip_string + args_ext = "" + for key, value in train_args_dict.items(): + args_ext += (' --' + key + '=' + value) + localIP = socket.gethostbyname(socket.gethostname()) + trainerId = idMap[localIP] + args += " " + args_ext + " --trainer_id=" + \ + str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT +``` diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc index fbe08349c37c4fde115ceea954ba2b84880088d7..b8847e4b909cbab67b2ddb6885b45b73d402de19 100644 --- a/paddle/fluid/framework/block_desc.cc +++ b/paddle/fluid/framework/block_desc.cc @@ -13,11 +13,10 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/block_desc.h" +#include #include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/program_desc.h" -#include - namespace paddle { namespace framework { @@ -147,52 +146,7 @@ void BlockDesc::RemoveOp(size_t s, size_t e) { if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) { return; } - auto get_vars = [](std::deque>::iterator &op, - std::vector &v) { - auto in_names = (*op)->InputArgumentNames(); - v.insert(v.end(), in_names.begin(), in_names.end()); - auto out_names = (*op)->OutputArgumentNames(); - v.insert(v.end(), out_names.begin(), out_names.end()); - std::sort(v.begin(), v.end()); - auto last = std::unique(v.begin(), v.end()); - v.erase(last, v.end()); - }; - need_update_ = true; - - for (size_t i = s; i < e; i++) { - // since remove op one by one, every time remove the first op. - auto op = ops_.begin() + s; - - // collect input and output variables from current delete op - std::vector cur_vars; - get_vars(op, cur_vars); - - // remove current op - ops_.erase(ops_.begin() + s); - - // collect input and output variables from other ops - std::vector other_vars; - for (auto it = ops_.begin(); it != ops_.end(); it++) { - get_vars(it, other_vars); - } - - // variables should be deleted - std::vector delete_vars; - // delete_vars = cur_vars - cur_vars ^ other_input_vars - std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(), - other_vars.end(), - std::inserter(delete_vars, delete_vars.end())); - // remove variables - for (size_t i = 0; i < delete_vars.size(); i++) { - auto name = delete_vars[i]; - auto it = vars_.find(name); - PADDLE_ENFORCE(it != vars_.end(), - "%s is not in variable list, it should not be deleted", - name); - vars_.erase(it); - VLOG(3) << "deleting variable " << name; - } - } + ops_.erase(ops_.begin() + s, ops_.begin() + e); } std::vector BlockDesc::AllOps() const { diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index 8155cb55a468a09320b1196b49fc3e34cea261b1..a56674cbe216e312c4394ef537140122352dc785 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -12,9 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "paddle/fluid/framework/lod_tensor.h" +#include +#include +#include +#include + #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/framework.pb.h" +#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/memory/memcpy.h" #include "paddle/fluid/memory/memory.h" @@ -22,11 +27,6 @@ limitations under the License. */ #include "paddle/fluid/recordio/scanner.h" #include "paddle/fluid/recordio/writer.h" -#include -#include -#include -#include - namespace paddle { namespace framework { @@ -294,7 +294,7 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor, TensorFromStream(is, static_cast(tensor), dev_ctx); } -void WriteToRecordIO(recordio::Writer &writer, +void WriteToRecordIO(recordio::Writer *writer, const std::vector &tensor, const platform::DeviceContext &dev_ctx) { std::stringstream buffer; @@ -303,18 +303,20 @@ void WriteToRecordIO(recordio::Writer &writer, for (auto &each : tensor) { SerializeToStream(buffer, each, dev_ctx); } - writer.Write(buffer.str()); + writer->Write(buffer.str()); } std::vector ReadFromRecordIO( - recordio::Scanner &scanner, const platform::DeviceContext &dev_ctx) { - std::istringstream sin(scanner.Next()); - uint32_t sz; - sin.read(reinterpret_cast(&sz), sizeof(uint32_t)); + recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) { std::vector result; - result.resize(sz); - for (uint32_t i = 0; i < sz; ++i) { - DeserializeFromStream(sin, &result[i], dev_ctx); + if (scanner->HasNext()) { + std::istringstream sin(scanner->Next()); + uint32_t sz; + sin.read(reinterpret_cast(&sz), sizeof(uint32_t)); + result.resize(sz); + for (uint32_t i = 0; i < sz; ++i) { + DeserializeFromStream(sin, &result[i], dev_ctx); + } } return result; } diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index 4f130d265900483ec7a7c541f2610d17a352913f..1159fee39b0737402c60448dcbe69e7535c9d6e1 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -15,6 +15,9 @@ limitations under the License. */ #pragma once #include +#include +#include +#include #ifdef PADDLE_WITH_CUDA #include #include @@ -216,12 +219,12 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, void DeserializeFromStream(std::istream& is, LoDTensor* tensor, const platform::DeviceContext& dev_ctx); -extern void WriteToRecordIO(recordio::Writer& writer, +extern void WriteToRecordIO(recordio::Writer* writer, const std::vector& tensor, const platform::DeviceContext& dev_ctx); extern std::vector ReadFromRecordIO( - recordio::Scanner& scanner, const platform::DeviceContext& dev_ctx); + recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx); } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc index e691e29383d4842b80769021e0e494967d38e9bb..97ab98f09b1a902a942d9667bc7716a28b98d54c 100644 --- a/paddle/fluid/framework/lod_tensor_test.cc +++ b/paddle/fluid/framework/lod_tensor_test.cc @@ -12,17 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/framework/lod_tensor.h" - -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - #include #include #include #include #include +#include "paddle/fluid/framework/lod_tensor.h" + +#include "paddle/fluid/recordio/scanner.h" +#include "paddle/fluid/recordio/writer.h" + namespace paddle { namespace framework { @@ -240,8 +240,8 @@ TEST(LoDTensor, RecordIO) { *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); { recordio::Writer writer(stream, recordio::Compressor::kSnappy); - WriteToRecordIO(writer, {tensor, tensor}, ctx); - WriteToRecordIO(writer, {tensor, tensor}, ctx); + WriteToRecordIO(&writer, {tensor, tensor}, ctx); + WriteToRecordIO(&writer, {tensor, tensor}, ctx); writer.Flush(); } @@ -254,11 +254,11 @@ TEST(LoDTensor, RecordIO) { { std::unique_ptr stream_ptr(stream); recordio::Scanner scanner(std::move(stream_ptr)); - auto tensors = ReadFromRecordIO(scanner, ctx); + auto tensors = ReadFromRecordIO(&scanner, ctx); ASSERT_EQ(tensors.size(), 2); assert_tensor_ok(tensors[0]); assert_tensor_ok(tensors[1]); - tensors = ReadFromRecordIO(scanner, ctx); + tensors = ReadFromRecordIO(&scanner, ctx); ASSERT_EQ(tensors.size(), 2); assert_tensor_ok(tensors[0]); assert_tensor_ok(tensors[1]); diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index f393105fe82bfad70246952deada8e296c851ef5..20dcc080b696431b9972c0a972904d957f9b47d8 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -115,14 +115,12 @@ void ParallelExecutor::BCastParamsToGPUs( for (auto &var : vars) { auto *main_var = main_scope->FindVar(var); - if (!main_var->IsType()) { + if (main_var == nullptr || !main_var->IsType()) { continue; } auto &main_tensor = main_var->Get(); - auto &dims = main_tensor.dims(); - if (paddle::platform::is_gpu_place(main_tensor.place())) { size_t numel = main_tensor.numel(); ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type()); diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 56bf00e5f91700f0cffa917aad8608caaab0a7fe..76126f3dc64d71770d13f9d66bb30f176c112629 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -22,7 +22,9 @@ FileReader::FileReader(const std::vector &dims) : dims_(dims) {} void FileReader::ReadNext(std::vector *out) { ReadNextImpl(out); - PADDLE_ENFORCE_EQ(out->size(), dims_.size()); + if (out->empty()) { + return; + } for (size_t i = 0; i < dims_.size(); ++i) { auto &actual = out->at(i).dims(); auto &expect = dims_[i]; diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 3573b99becf6d657c680c5fec0bda4bdde5dd7a2..3a413941df964c8d9454fafc6030c377c10f9fb1 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -14,14 +14,13 @@ #pragma once +#include +#include + #include "paddle/fluid/framework/ddim.h" #include "paddle/fluid/framework/lod_tensor_array.h" #include "paddle/fluid/platform/place.h" -#include -#include -#include - namespace paddle { namespace framework { @@ -31,8 +30,6 @@ class ReaderBase { virtual void ReInit() = 0; - virtual bool HasNext() const = 0; - virtual ~ReaderBase(); }; @@ -44,8 +41,6 @@ class DecoratedReader : public ReaderBase { void ReInit() override { reader_->ReInit(); } - bool HasNext() const override { return reader_->HasNext(); } - protected: ReaderBase* reader_; }; @@ -80,8 +75,6 @@ class ReaderHolder { reader_->ReInit(); } - bool HasNext() const { return reader_->HasNext(); } - private: std::unique_ptr reader_; }; diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 2925b8a85da1b0d19672124e49c8fd22c8b4e6bf..bf02b9958927580608b95d6b8ecfddc7231a02d4 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -66,13 +66,7 @@ class ReadOp : public framework::OperatorBase { std::vector out_arg_names = Outputs("Out"); std::vector ins; reader->ReadNext(&ins); - if (ins.empty()) { - reader->ReInit(); - reader->ReadNext(&ins); - PADDLE_ENFORCE( - !ins.empty(), - "Reader can not read the next data even it has been re-initialized."); - } + PADDLE_ENFORCE(!ins.empty(), "There is no next data."); PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size()); for (size_t i = 0; i < ins.size(); ++i) { auto* out = diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 6fa0195b9ae103418beb56cc4b0fa9ab59e93108..845528860f91d0b479bb3c4dbbe05e32c68dc16f 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -22,5 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) +reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) # Export local libraries to parent set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE) diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index ed868786ab2a80efa42574ed4f579c633ce0becf..33a50b5cebc1f65ccf9a00280f0eeadd00982555 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -63,13 +63,14 @@ class DoubleBufferReader : public framework::DecoratedReader { StartPrefetcher(); } - bool HasNext() const override; void ReadNext(std::vector* out) override; void ReInit() override; ~DoubleBufferReader() { EndPrefetcher(); } private: + bool HasNext() const; + void StartPrefetcher() { channel_ = framework::MakeChannel(kChannelSize); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); @@ -109,7 +110,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase { auto place_str = Attr("place"); platform::Place place; - if (place_str == "CPU") { + if (place_str == "AUTO") { + place = dev_place; + } else if (place_str == "CPU") { place = platform::CPUPlace(); } else { std::istringstream sin(place_str); @@ -140,28 +143,22 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { enum_range.insert(string::Sprintf("CUDA:%d", i)); } enum_range.insert("CPU"); - AddAttr("place", "The double buffer place, default is CPU") - .SetDefault("CPU") + enum_range.insert("AUTO"); + AddAttr("place", "The double buffer place") + .SetDefault("AUTO") .InEnum({enum_range}); } }; -bool DoubleBufferReader::HasNext() const { - while (!channel_->IsClosed() && !channel_->CanReceive()) { - } - return channel_->CanReceive(); -} - void DoubleBufferReader::ReadNext(std::vector* out) { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); - } - - Item batch; - channel_->Receive(&batch); - *out = batch.payloads_; - if (batch.ctx_) { - batch.ctx_->Wait(); + out->clear(); + if (HasNext()) { + Item batch; + channel_->Receive(&batch); + *out = batch.payloads_; + if (batch.ctx_) { + batch.ctx_->Wait(); + } } } @@ -171,16 +168,26 @@ void DoubleBufferReader::ReInit() { StartPrefetcher(); } +bool DoubleBufferReader::HasNext() const { + while (!channel_->IsClosed() && !channel_->CanReceive()) { + } + return channel_->CanReceive(); +} + void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; std::vector> cpu_tensor_cache(kCacheSize); std::vector> gpu_tensor_cache(kCacheSize); size_t cached_tensor_id = 0; - while (reader_->HasNext()) { + while (true) { Item batch; auto& cpu_batch = cpu_tensor_cache[cached_tensor_id]; reader_->ReadNext(&cpu_batch); + if (cpu_batch.empty()) { + // The underlying reader have no next data. + break; + } if (platform::is_gpu_place(place_)) { auto& gpu_batch = gpu_tensor_cache[cached_tensor_id]; auto* gpu_ctx = ctxs_[cached_tensor_id].get(); diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc index b72ccc77a3e1ec30fd817471d3ffd667974ae684..0573345ba502b6a9af35710840d5acf7634f332f 100644 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc @@ -25,22 +25,12 @@ class MultiPassReader : public framework::DecoratedReader { : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} void ReadNext(std::vector* out) override { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); - } reader_->ReadNext(out); - } - - bool HasNext() const override { - if (reader_->HasNext()) { - return true; - } else { + if (out->empty()) { ++pass_count_; - if (pass_count_ >= pass_num_) { - return false; - } else { + if (pass_count_ < pass_num_) { reader_->ReInit(); - return true; + reader_->ReadNext(out); } } } diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index 95d8674c08b63e872926ff8708d0c734da33684c..d1cb8e47da70cab784858caea7e791151fc104dd 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -52,8 +52,6 @@ class RandomDataGenerator : public framework::ReaderBase { void ReInit() override { return; } - bool HasNext() const override { return true; } - private: float min_; float max_; @@ -74,7 +72,7 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase { const auto& ranks = Attr>("ranks"); PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - int(shape_concat.size()), + static_cast(shape_concat.size()), "The accumulate of all ranks should be equal to the " "shape concat's length."); std::vector shapes = RestoreShapes(shape_concat, ranks); diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index adaa0b9e5f1ffcfbf3e9cd8fd060153575f270a6..2ae29725561769ebe6428002c9983246b8eec724 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include "paddle/fluid/operators/reader/reader_op_registry.h" #include "paddle/fluid/recordio/scanner.h" @@ -35,17 +33,15 @@ class RecordIOFileReader : public framework::FileReader { LOG(INFO) << "Creating file reader" << filename; } - bool HasNext() const override { return scanner_.HasNext(); } - void ReInit() override { scanner_.Reset(); } protected: void ReadNextImpl(std::vector* out) override { if (ThreadSafe) { std::lock_guard guard(*mutex_); - *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_); } else { - *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_); } } @@ -66,7 +62,7 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { const auto& ranks = Attr>("ranks"); PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - int(shape_concat.size()), + static_cast(shape_concat.size()), "The accumulate of all ranks should be equal to the " "shape concat's length."); std::string filename = Attr("filename"); diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index b164ce232d6bea7b4ff0c67ee0a7dd83b14f61a2..13825d65913be95f4f444bd9d5271a036ec8b1e2 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -30,35 +30,33 @@ class ShuffleReader : public framework::DecoratedReader { std::random_device device; seed_ = device(); } - ReadIntoBuffers(); + ReloadBuffer(); } void ReadNext(std::vector* out) override { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); - } + out->clear(); if (iteration_pos_ >= buffer_.size()) { VLOG(10) << "Resetting shuffle buffer"; - ReadIntoBuffers(); + ReloadBuffer(); + if (buffer_.empty()) { + return; + } } *out = buffer_[iteration_pos_++]; } - bool HasNext() const override { - return iteration_pos_ < buffer_.size() || reader_->HasNext(); - } - private: - void ReadIntoBuffers() { + void ReloadBuffer() { buffer_.clear(); buffer_.reserve(buffer_size_); iteration_pos_ = 0; for (size_t i = 0; i < buffer_size_; ++i) { - if (!reader_->HasNext()) { + std::vector ins; + reader_->ReadNext(&ins); + if (ins.empty()) { break; } - buffer_.emplace_back(); - reader_->ReadNext(&buffer_.back()); + buffer_.emplace_back(ins); } std::mt19937 g(seed_); std::shuffle(buffer_.begin(), buffer_.end(), g); diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..cbf709d5e734c0f2adf3735dc28043c1340349da --- /dev/null +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -0,0 +1,94 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/operators/detail/safe_ref.h" +#include "paddle/fluid/operators/reader/reader_op_registry.h" + +namespace paddle { +namespace operators { +namespace reader { + +class ThreadedReader : public framework::DecoratedReader { + public: + ThreadedReader(ReaderBase* reader, bool safe_mode) + : DecoratedReader(reader), safe_mode_(safe_mode) {} + + void ReadNext(std::vector* out) override { + std::lock_guard lock(mutex_); + reader_->ReadNext(out); + } + + void ReInit() override { + if (safe_mode_) { + PADDLE_THROW( + "ThreadedReader::ReInit() is disabled when 'safe_mode' is true."); + } + VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in " + "multi-thread environment."; + reader_->ReInit(); + } + + private: + bool safe_mode_; + std::mutex mutex_; +}; + +class CreateThreadedReaderOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + private: + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + auto* out = detail::Ref(scope.FindVar(Output("Out"))) + .GetMutable(); + if (out->Get() != nullptr) { + return; + } + const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) + ->Get(); + bool safe_mode = Attr("safe_mode"); + out->Reset(new ThreadedReader(underlying_reader.Get(), safe_mode)); + } +}; + +class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { + public: + CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker) + : DecoratedReaderMakerBase(op_proto, op_checker) { + AddAttr("safe_mode", + "When 'safe_mode' is true, 'ReInit()' is disabled to avoid " + "unexpected bugs in multi-thread environment.") + .SetDefault(true); + AddComment(R"DOC( + CreateThreadedReader Operator + + This operator creates a threaded reader. A threaded reader's + 'ReadNext()' can be invoked by several threads at the same + time. + When the attribute 'safe_mode' is true, the threaded reader's + 'ReInit()' is disabled to avoid unexpected bugs in multi-thread + environment. + )DOC"); + } +}; + +} // namespace reader +} // namespace operators +} // namespace paddle + +namespace reader = paddle::operators::reader; +REGISTER_DECORATED_READER_OPERATOR(create_threaded_reader, + reader::CreateThreadedReaderOp, + reader::CreateThreadedReaderOpMaker); diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index eacedeea8835d27b712b287824b9d30b03ebebbf..779dc8a6a0deb7792e0540071e3a2588102fa708 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include // NOLINT + #include "paddle/fluid/framework/channel.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" @@ -19,38 +21,23 @@ namespace paddle { namespace operators { namespace reader { -class MultipleReader : public framework::ReaderBase { +class MultiFileReader : public framework::ReaderBase { public: - class ThreadBufferMap { - public: - std::vector& operator[]( - const std::thread::id& thread_id) { - std::lock_guard lock(mutex_); - return buffer_[thread_id]; - } - - void Clear() { buffer_.clear(); } - - private: - std::mutex mutex_; - std::unordered_map> - buffer_; - }; - - MultipleReader(const std::vector& file_names, - const std::vector& dims, size_t thread_num) - : file_names_(file_names), dims_(dims) { + MultiFileReader(const std::vector& file_names, + const std::vector& dims, size_t thread_num, + size_t buffer_size) + : file_names_(file_names), dims_(dims), buffer_size_(buffer_size) { prefetchers_.resize(thread_num); StartNewScheduler(); } void ReadNext(std::vector* out) override; - bool HasNext() const override; void ReInit() override; - ~MultipleReader() { EndScheduler(); } + ~MultiFileReader() { EndScheduler(); } private: + bool HasNext(); void StartNewScheduler(); void EndScheduler(); void ScheduleThreadFunc(); @@ -60,39 +47,36 @@ class MultipleReader : public framework::ReaderBase { std::vector dims_; std::thread scheduler_; std::vector prefetchers_; + size_t buffer_size_; framework::Channel* waiting_file_idx_; framework::Channel* available_thread_idx_; framework::Channel>* buffer_; - mutable ThreadBufferMap thread_buffer_map_; }; -void MultipleReader::ReadNext(std::vector* out) { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); +void MultiFileReader::ReadNext(std::vector* out) { + out->clear(); + if (HasNext()) { + buffer_->Receive(out); } - auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()]; - *out = thread_local_buffer; - thread_local_buffer.clear(); -} - -bool MultipleReader::HasNext() const { - auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()]; - return thread_local_buffer.empty() ? buffer_->Receive(&thread_local_buffer) - : true; } -void MultipleReader::ReInit() { +void MultiFileReader::ReInit() { EndScheduler(); - thread_buffer_map_.Clear(); StartNewScheduler(); } -void MultipleReader::StartNewScheduler() { +bool MultiFileReader::HasNext() { + while (!buffer_->IsClosed() && !buffer_->CanReceive()) { + } + return buffer_->CanReceive(); +} + +void MultiFileReader::StartNewScheduler() { size_t thread_num = prefetchers_.size(); waiting_file_idx_ = framework::MakeChannel(file_names_.size()); available_thread_idx_ = framework::MakeChannel(thread_num); buffer_ = - framework::MakeChannel>(thread_num); + framework::MakeChannel>(buffer_size_); for (size_t i = 0; i < file_names_.size(); ++i) { waiting_file_idx_->Send(&i); @@ -105,7 +89,7 @@ void MultipleReader::StartNewScheduler() { scheduler_ = std::thread([this] { ScheduleThreadFunc(); }); } -void MultipleReader::EndScheduler() { +void MultiFileReader::EndScheduler() { available_thread_idx_->Close(); buffer_->Close(); waiting_file_idx_->Close(); @@ -117,8 +101,8 @@ void MultipleReader::EndScheduler() { delete waiting_file_idx_; } -void MultipleReader::ScheduleThreadFunc() { - VLOG(5) << "MultipleReader schedule thread starts."; +void MultiFileReader::ScheduleThreadFunc() { + VLOG(5) << "MultiFileReader schedule thread starts."; size_t completed_thread_num = 0; size_t thread_idx; while (available_thread_idx_->Receive(&thread_idx)) { @@ -150,17 +134,20 @@ void MultipleReader::ScheduleThreadFunc() { p.join(); } } - VLOG(5) << "MultipleReader schedule thread terminates."; + VLOG(5) << "MultiFileReader schedule thread terminates."; } -void MultipleReader::PrefetchThreadFunc(std::string file_name, - size_t thread_idx) { +void MultiFileReader::PrefetchThreadFunc(std::string file_name, + size_t thread_idx) { VLOG(5) << "The prefetch thread of file '" << file_name << "' starts."; std::unique_ptr reader = CreateReaderByFileName(file_name, dims_); - while (reader->HasNext()) { + while (true) { std::vector ins; reader->ReadNext(&ins); + if (ins.empty()) { + break; + } try { buffer_->Send(&ins); } catch (paddle::platform::EnforceNotMet e) { @@ -197,11 +184,13 @@ class OpenFilesOp : public framework::OperatorBase { const auto& file_names = Attr>("file_names"); PADDLE_ENFORCE(!file_names.empty(), "No file to be read!"); const size_t thread_num = Attr("thread_num"); + const size_t buffer_size = Attr("buffer_size"); auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new MultipleReader( - file_names, RestoreShapes(shape_concat, ranks), thread_num)); + out->Reset(new MultiFileReader(file_names, + RestoreShapes(shape_concat, ranks), + thread_num, buffer_size)); } }; @@ -212,11 +201,12 @@ class OpenFilesOpMaker : public FileReaderMakerBase { AddAttr>("file_names", "Files to be read."); AddAttr("thread_num", "The maximal concurrent prefetch thread number.") .GreaterThan(0); + AddAttr("buffer_size", "The size of prefetch buffer.").GreaterThan(0); AddComment(R"DOC( OpenFiles Operator - An OpenFilesOp creates a MultipleReader, which is able to + An OpenFilesOp creates a MultiFileReader, which is able to read data multi-threaded from multiple files. )DOC"); } diff --git a/paddle/fluid/platform/cuda_helper.h b/paddle/fluid/platform/cuda_helper.h index 881d611d4ac26f992036f639097815aff625227b..8758af0804ae08fec6fa64d7387f197f046ce20e 100644 --- a/paddle/fluid/platform/cuda_helper.h +++ b/paddle/fluid/platform/cuda_helper.h @@ -33,22 +33,26 @@ constexpr int PADDLE_CUDA_NUM_THREADS = 512; USE_CUDA_ATOMIC(Add, float); USE_CUDA_ATOMIC(Add, int); USE_CUDA_ATOMIC(Add, unsigned int); -USE_CUDA_ATOMIC(Add, unsigned long long int); +// CUDA API uses unsigned long long int, we cannot use uint64_t here. +// It because unsigned long long int is not necessarily uint64_t +USE_CUDA_ATOMIC(Add, unsigned long long int); // NOLINT CUDA_ATOMIC_WRAPPER(Add, int64_t) { - static_assert(sizeof(int64_t) == sizeof(long long int), + // Here, we check long long int must be int64_t. + static_assert(sizeof(int64_t) == sizeof(long long int), // NOLINT "long long should be int64"); - return CudaAtomicAdd(reinterpret_cast(address), - static_cast(val)); + return CudaAtomicAdd( + reinterpret_cast(address), // NOLINT + static_cast(val)); // NOLINT } #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 600 USE_CUDA_ATOMIC(Add, double); #else CUDA_ATOMIC_WRAPPER(Add, double) { - unsigned long long int* address_as_ull = - reinterpret_cast(address); - unsigned long long int old = *address_as_ull, assumed; + unsigned long long int* address_as_ull = // NOLINT + reinterpret_cast(address); // NOLINT + unsigned long long int old = *address_as_ull, assumed; // NOLINT do { assumed = old; diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 29990043206509e4192bfff84832f09ef127d9dd..ca9ab2c7aecff47924f0198802d710b7661f5576 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -14,8 +14,9 @@ #pragma once -#include +#include // NOLINT #include +#include #include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/enforce.h" @@ -29,6 +30,8 @@ inline ncclDataType_t ToNCCLDataType(std::type_index type) { return ncclDouble; } else if (type == typeid(int)) { // NOLINT return ncclInt; + } else if (type == typeid(int64_t)) { // NOLINT + return ncclInt64; } else { PADDLE_THROW("Not supported"); } @@ -58,7 +61,7 @@ struct NCCLContext { ncclComm_t comm_; explicit NCCLContext(int dev_id) - : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))) {} + : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))), comm_{nullptr} {} cudaStream_t stream() const { return ctx_->stream(); } @@ -66,23 +69,23 @@ struct NCCLContext { return boost::get(ctx_->GetPlace()).device; } - static void InitNCCLContext(std::unordered_map &contexts, + static void InitNCCLContext(std::unordered_map *contexts, const std::vector &places) { std::vector comms; std::vector devs; - comms.resize(contexts.size()); - devs.reserve(contexts.size()); + comms.resize(contexts->size()); + devs.reserve(contexts->size()); for (auto &p : places) { devs.push_back(boost::get(p).device); } PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( - &comms[0], static_cast(contexts.size()), &devs[0])); + &comms[0], static_cast(contexts->size()), &devs[0])); int i = 0; for (auto &dev_id : devs) { - contexts.at(dev_id).comm_ = comms[i++]; + contexts->at(dev_id).comm_ = comms[i++]; } } }; @@ -91,7 +94,8 @@ struct NCCLContextMap { std::unordered_map contexts_; std::vector order_; - NCCLContextMap(const std::vector &places) { + explicit NCCLContextMap(const std::vector &places) { + PADDLE_ENFORCE(!places.empty()); order_.reserve(places.size()); for (auto &p : places) { int dev_id = boost::get(p).device; @@ -102,15 +106,17 @@ struct NCCLContextMap { order_.size(), contexts_.size(), "NCCL Context Map does not support contain two or more same device"); - std::vector comms; - comms.resize(order_.size()); + if (places.size() > 1) { + std::vector comms; + comms.resize(order_.size()); - PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( - &comms[0], static_cast(order_.size()), &order_[0])); + PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( + &comms[0], static_cast(order_.size()), &order_[0])); - int i = 0; - for (auto &dev_id : order_) { - contexts_.at(dev_id).comm_ = comms[i++]; + int i = 0; + for (auto &dev_id : order_) { + contexts_.at(dev_id).comm_ = comms[i++]; + } } } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index a9a5d87d77ef9074e1d073d6b16083360110f670..d559743a69584c96c9f543692191964c68c7b5e1 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -252,7 +252,6 @@ All parameter, weight, gradient are variables in Paddle. py::return_value_policy::reference); py::class_(m, "Reader", "") - .def("has_next", &framework::ReaderHolder::HasNext) .def("reset", &framework::ReaderHolder::ReInit); py::class_(m, "Scope", "") diff --git a/paddle/fluid/pybind/recordio.cc b/paddle/fluid/pybind/recordio.cc index 0644d91425af1a1ac9363b1dec9e317689331fcb..330d104e0a774d905e463566f85bd2e64a080190 100644 --- a/paddle/fluid/pybind/recordio.cc +++ b/paddle/fluid/pybind/recordio.cc @@ -39,7 +39,7 @@ class RecordIOWriter { void CompleteAppendTensor() { auto& ctx = *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); - framework::WriteToRecordIO(writer_, tensors_, ctx); + framework::WriteToRecordIO(&writer_, tensors_, ctx); tensors_.clear(); } diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 33cf6918178ff746a6b130af0e23a69de0f532fe..793421a22fbf6f3c25ec6a9bf8359f4e71e905de 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -818,6 +818,11 @@ class Block(object): del self.vars[name] self.sync_with_cpp() + def remove_var(self, name): + self.sync_with_cpp() + self.desc.remove_var(name) + del self.vars[name] + def create_parameter(self, *args, **kwargs): global_block = self.program.global_block() param = Parameter(global_block, *args, **kwargs) @@ -838,6 +843,11 @@ class Block(object): self.ops.insert(index, op) return op + def remove_op(self, index): + self.sync_with_cpp() + self.desc.remove_op(index, index + 1) + del self.ops[index] + def delete_ops(self, ops): # remove from cpp # FIXME(typhoonzero): remove only the first occurrence. @@ -846,6 +856,7 @@ class Block(object): end = list(self.ops).index(ops[-1]) except Exception, e: raise e + self.desc.remove_op(start, end + 1) def slice_ops(self, start, end): diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 969398bda4cfd0b2f5e39f45d34a1da9b216901f..e7d6c4e2521bee133c4794ed1db669b02fc2152b 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -21,8 +21,7 @@ from ..executor import global_scope __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', - 'open_files', 'read_file', 'create_shuffle_reader', - 'create_double_buffer_reader', 'create_multi_pass_reader' + 'open_files', 'read_file', 'shuffle', 'double_buffer' ] @@ -237,13 +236,9 @@ def monkey_patch_reader_methods(reader): var = scope.find_var(reader.name) return var.get_reader() - def eof(): - return not __get_reader__().has_next() - def reset(): return __get_reader__().reset() - reader.eof = eof reader.reset = reset reader.stop_gradient = True reader.persistable = True @@ -283,7 +278,42 @@ def _copy_reader_create_op_(block, op): return new_op -def open_recordio_file(filename, shapes, lod_levels, dtypes): +def open_recordio_file(filename, + shapes, + lod_levels, + dtypes, + pass_num=1, + for_parallel=False): + """ + Open a RecordIO file + + This layer takes a RecordIO file to read from and returns a Reader Variable. + Via the Reader Variable, we can get data from the given RecordIO file. + + Args: + filename(str): The RecordIO file's name. + shapes(list): List of tuples which declaring data shapes. + lod_levels(list): List of ints which declaring data lod_level. + dtypes(list): List of strs which declaring data type. + pass_num(int): Number of passes to run. + for_parallel(Bool): Set it as True if you are going to run + subsequent operators in parallel. + + Returns: + Variable: A Reader Variable via which we can get RecordIO file data. + + Examples: + .. code-block:: python + + reader = fluid.layers.io.open_recordio_file( + filename='./data.recordio', + shapes=[(3,224,224), (1)], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + + # Via the reader, we can use 'read_file' layer to get data: + image, label = fluid.layers.read_file(reader) + """ dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] shape_concat = [] ranks = [] @@ -310,10 +340,63 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes): startup_var.persistable = True main_prog_var = _copy_reader_var_(default_main_program().current_block(), startup_var) + + if pass_num > 1: + main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) + + if for_parallel: + main_prog_var = parallel(reader=main_prog_var) + return monkey_patch_reader_methods(main_prog_var) -def open_files(filenames, thread_num, shapes, lod_levels, dtypes): +def open_files(filenames, + shapes, + lod_levels, + dtypes, + thread_num, + buffer_size=None, + pass_num=1, + for_parallel=False): + """ + Open files + + This layer takes a list of files to read from and returns a Reader Variable. + Via the Reader Variable, we can get data from given files. All files must + have name suffixs to indicate their formats, e.g., '*.recordio'. + + Args: + filenames(list): The list of file names. + shapes(list): List of tuples which declaring data shapes. + lod_levels(list): List of ints which declaring data lod_level. + dtypes(list): List of strs which declaring data type. + thread_num(int): The maximal concurrent prefetch thread number. + buffer_size(int): The size of prefetch buffer. + pass_num(int): Number of passes to run. + for_parallel(Bool): Set it as True if you are going to run + subsequent operators in parallel. + + Returns: + Variable: A Reader Variable via which we can get file data. + + Examples: + .. code-block:: python + + reader = fluid.layers.io.open_files(filenames=['./data1.recordio', + './data2.recordio'], + shapes=[(3,224,224), (1)], + lod_levels=[0, 0], + dtypes=['float32', 'int64'], + thread_num=2, + buffer_size=2) + + # Via the reader, we can use 'read_file' layer to get data: + image, label = fluid.layers.io.read_file(reader) + """ + if buffer_size is None: + buffer_size = thread_num + if isinstance(filenames, basestring): + filenames = [filenames] dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] shape_concat = [] ranks = [] @@ -322,29 +405,36 @@ def open_files(filenames, thread_num, shapes, lod_levels, dtypes): shape_concat.extend(shape) ranks.append(len(shape)) - var_name = unique_name('multiple_reader') - + multi_file_reader_name = unique_name('multi_file_reader') startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=var_name) + startup_reader = startup_blk.create_var(name=multi_file_reader_name) startup_blk.append_op( type='open_files', - outputs={'Out': [startup_var]}, + outputs={'Out': [startup_reader]}, attrs={ 'shape_concat': shape_concat, 'lod_levels': lod_levels, 'ranks': ranks, 'file_names': filenames, - 'thread_num': thread_num + 'thread_num': thread_num, + 'buffer_size': buffer_size }) - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - return monkey_patch_reader_methods(main_prog_var) + startup_reader.desc.set_dtypes(dtypes) + startup_reader.persistable = True + main_prog_reader = _copy_reader_var_(default_main_program().current_block(), + startup_reader) + if pass_num > 1: + main_prog_reader = multi_pass( + reader=main_prog_reader, pass_num=pass_num) + + if for_parallel: + main_prog_reader = parallel(reader=main_prog_reader) + + return monkey_patch_reader_methods(main_prog_reader) -def __create_decorated_reader__(op_type, reader, attrs): +def __create_shared_decorated_reader__(op_type, reader, attrs): var_name = unique_name(op_type) startup_blk = default_startup_program().current_block() startup_var = startup_blk.create_var(name=var_name) @@ -360,22 +450,41 @@ def __create_decorated_reader__(op_type, reader, attrs): return monkey_patch_reader_methods(main_prog_var) -def create_shuffle_reader(reader, buffer_size): - return __create_decorated_reader__('create_shuffle_reader', reader, - {'buffer_size': int(buffer_size)}) +def __create_unshared_decorated_reader__(op_type, reader, attrs): + new_reader_name = unique_name(op_type) + main_blk = default_main_program().current_block() + new_reader = main_blk.create_var(name=new_reader_name) + main_blk.append_op( + type=op_type, + inputs={'UnderlyingReader': reader}, + outputs={'Out': [new_reader]}, + attrs=attrs) + new_reader.persistable = True + new_reader.stop_gradient = True + return monkey_patch_reader_methods(new_reader) + + +def shuffle(reader, buffer_size): + return __create_unshared_decorated_reader__( + 'create_shuffle_reader', reader, {'buffer_size': int(buffer_size)}) -def create_double_buffer_reader(reader, place=None): +def double_buffer(reader, place=None): attrs = dict() if place is not None: attrs['place'] = str(place).upper() - return __create_decorated_reader__('create_double_buffer_reader', reader, - attrs) + return __create_unshared_decorated_reader__('create_double_buffer_reader', + reader, attrs) + + +def multi_pass(reader, pass_num): + return __create_shared_decorated_reader__( + 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) -def create_multi_pass_reader(reader, pass_num): - return __create_decorated_reader__('create_multi_pass_reader', reader, - {'pass_num': int(pass_num)}) +def parallel(reader): + return __create_shared_decorated_reader__('create_threaded_reader', reader, + {}) def read_file(file_obj): diff --git a/python/paddle/fluid/tests/unittests/test_multiple_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py similarity index 91% rename from python/paddle/fluid/tests/unittests/test_multiple_reader.py rename to python/paddle/fluid/tests/unittests/test_multi_file_reader.py index a60a5d6c4af2b6b3652d0fe2089018b9403eee25..5dc41e54d6158787eb966333c894e378b5c706d0 100644 --- a/python/paddle/fluid/tests/unittests/test_multiple_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py @@ -61,8 +61,12 @@ class TestMultipleReader(unittest.TestCase): exe.run(fluid.default_startup_program()) batch_count = 0 - while not data_files.eof(): - img_val, = exe.run(fetch_list=[img]) + while True: + try: + img_val, = exe.run(fetch_list=[img]) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break batch_count += 1 self.assertLessEqual(img_val.shape[0], self.batch_size) data_files.reset() diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py index 0b7a29075939a548320185947b5afa7261029d49..1471843ded7a42432a84a9fad76bb97dcf7fb9c2 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py @@ -44,7 +44,7 @@ class TestMultipleReader(unittest.TestCase): shapes=[(-1, 784), (-1, 1)], lod_levels=[0, 0], dtypes=['float32', 'int64']) - data_file = fluid.layers.create_multi_pass_reader( + data_file = fluid.layers.io.multi_pass( reader=data_file, pass_num=self.pass_num) img, label = fluid.layers.read_file(data_file) @@ -57,8 +57,12 @@ class TestMultipleReader(unittest.TestCase): exe.run(fluid.default_startup_program()) batch_count = 0 - while not data_file.eof(): - img_val, = exe.run(fetch_list=[img]) + while True: + try: + img_val, = exe.run(fetch_list=[img]) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break batch_count += 1 self.assertLessEqual(img_val.shape[0], self.batch_size) data_file.reset() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 8401716db88ef3dda68644a052d78b4476c9fdc7..3c00f708f08b6637acd731d23a5b9eb4eed12d2a 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -26,11 +26,14 @@ def simple_fc_net(use_feed): img = fluid.layers.data(name='image', shape=[784], dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') else: - reader = fluid.layers.open_recordio_file( - filename='./mnist.recordio', + reader = fluid.layers.open_files( + filenames=['./mnist.recordio'], shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], - dtypes=['float32', 'int64']) + dtypes=['float32', 'int64'], + thread_num=1, + for_parallel=True) + reader = fluid.layers.io.double_buffer(reader) img, label = fluid.layers.read_file(reader) hidden = img for _ in xrange(4): @@ -51,11 +54,14 @@ def fc_with_batchnorm(use_feed): img = fluid.layers.data(name='image', shape=[784], dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') else: - reader = fluid.layers.open_recordio_file( - filename='./mnist.recordio', + reader = fluid.layers.open_files( + filenames=['mnist.recordio'], shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], - dtypes=['float32', 'int64']) + dtypes=['float32', 'int64'], + thread_num=1, + for_parallel=True) + reader = fluid.layers.io.double_buffer(reader) img, label = fluid.layers.read_file(reader) hidden = img diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py index f98a8bbc68a4315df3ae761f2e52b8f11cb620c6..3f9059fb5b31cd009c068ccddc9a8938adae5772 100644 --- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py +++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py @@ -201,24 +201,6 @@ class TestBlockDesc(unittest.TestCase): op1.set_type("test") op2.set_type("test") - var0 = block.var("var0") - var1 = block.var("var1") - var2 = block.var("var2") - var3 = block.var("var3") - var4 = block.var("var4") - var5 = block.var("var5") - - op0.set_input("X", ["var0"]) - op0.set_output("Y", ["var0"]) - op1.set_input("X", ["var1", "var2"]) - op1.set_output("Y", ["var3", "var4"]) - op2.set_input("X", ["var1"]) - op2.set_output("Y", ["var4", "var5"]) - - program.sync_with_cpp() - - # remove op1, its input var2 and output var3 will be removed at the same time, - # but its input var1 and output var4 will not be removed since they are used for op2. block.remove_op(1, 2) program.sync_with_cpp() @@ -226,8 +208,6 @@ class TestBlockDesc(unittest.TestCase): for idx in xrange(0, block.op_size()): all_ops.append(block.op(idx)) self.assertEqual(all_ops, [op0, op2]) - all_vars = block.all_vars() - self.assertEqual(set(all_vars), {var0, var1, var4, var5}) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index 24a0074d9b9621d902d12eb8cb29d9b65be22ed3..7c8e7f634fdd3ee3f056a95df774402a7c29e906 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -65,8 +65,13 @@ class TestRecordIO(unittest.TestCase): # train a pass batch_id = 0 - while not data_file.eof(): - tmp, = exe.run(fetch_list=[avg_loss]) + while True: + try: + tmp, = exe.run(fetch_list=[avg_loss]) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break + avg_loss_np.append(tmp) batch_id += 1 data_file.reset() @@ -74,8 +79,8 @@ class TestRecordIO(unittest.TestCase): self.assertLess(avg_loss_np[-1], avg_loss_np[0]) def test_shuffle_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200)) + self.test_main(decorator_callback=lambda reader: fluid.layers.io.shuffle(reader, buffer_size=200)) def test_double_buffer_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.create_double_buffer_reader(reader, + self.test_main(decorator_callback=lambda reader: fluid.layers.io.double_buffer(reader, place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu'))