diff --git a/doc/design/cluster_train/src/trainer.graffle b/doc/design/cluster_train/src/trainer.graffle index 42384a3f059966e22e22f5fa4295cc9ead5cef83..43415ed8cf61a5acfa34f8e56b9577f338dbf254 100644 Binary files a/doc/design/cluster_train/src/trainer.graffle and b/doc/design/cluster_train/src/trainer.graffle differ diff --git a/doc/design/prune.md b/doc/design/prune.md new file mode 100644 index 0000000000000000000000000000000000000000..4a5cf10c79a554779137f0cce5494fdd96ef6b7a --- /dev/null +++ b/doc/design/prune.md @@ -0,0 +1,63 @@ +# Prune + +## Motivation + +We want to support running inference, training and checkpointing in one `ProgramDesc`. We implement +`void Prune(const ProgramDesc* input, ProgramDesc* output)` function, which takes a `ProgramDesc` +and generate a pruned `ProgramDesc`. + +## Challenge + +Pruning need to support both variables and operators being evaluation targets. Consider the following +different situations. + +```python +# Case 1: run foward pass. +cost_np = session.run(target=cost) +# Case 2: run backward passing. +opts_np, _ = session.run(target=[cost, opt]) +# Case 3: run checkpointing +_ = session.run(target=checkpoint) +``` + +## Solution + +To support evaluation of operators, we add `is_target` field in the `OpDesc`. + +```c++ +message OpDesc { + required string type = 3; + repeated Var inputs = 1; + repeated Var outputs = 2; + repeated Attr attrs = 4; + optional bool is_target = 5 [ default = false ]; +}; +``` + +To support evaluation of variables, we add [fetch_op](https://github.com/PaddlePaddle/Paddle/pull/4599). +For each variable in the `target`, we insert a `fetch_op` into the `ProgramDesc` with `variable` being +`fetch_op`'s input. Then we also set `fetch_op` is a target. + +### Algorithm + +If an operator needs to be run, it must fall into one of the following cases: + +1. It is the target. +2. It is depended by some other ops, meaning its output is some other op's input. + +The first case can be checked by `op_desc.is_traget()` . The second case can be implement as + +```c++ +bool HasDependentVar(const OpDesc& op_desc, const std::set& dependent_vars) { + for (auto& var : op_desc.outputs()) { + for (auto& argu : var.arguments()) { + if (dependent_vars.count(argu) != 0) { + return true; + } + } + } + return false; +} +``` + +Then the whole algorithm can be implemented as the following [code](https://github.com/tonyyang-svail/Paddle/blob/prune_impl/paddle/framework/prune.cc). diff --git a/doc/design/refactorization.md b/doc/design/refactorization.md index ec51aa1a0ec667175ff7215dcd359023e296769f..f93d6155e1764386b01d2f0df3f141ab75cd55d4 100644 --- a/doc/design/refactorization.md +++ b/doc/design/refactorization.md @@ -177,9 +177,6 @@ REGISTER_OP(op_type, op_class, op_maker_class, grad_op_type, grad_op_class) REGISTER_OP_WITHOUT_GRADIENT(op_type, op_class, op_maker_class) ``` -### USE Macros -Make sure the registration process is executed and linked. - --- # Registration Process 1. Write an Op class and its gradient Op class, if required. @@ -188,8 +185,6 @@ Make sure the registration process is executed and linked. 1. Call maker class to complete `proto` and `checker` 2. Using the completed `proto` and `checker`, it will add a new key-value pair to the `OpInfoMap` -4. Invoke the `USE` macro in which the Op is used to make sure that it is linked. - --- # Backward Module (1/2) ### Create Backward Operator diff --git a/doc/howto/usage/cluster/cluster_train_cn.md b/doc/howto/usage/cluster/cluster_train_cn.md index 274452fbf0c595ad7b4dbeffe85ad9038f12b458..93c5544bcfa911f8bdcdaea39a75b3ab7ef218f8 100644 --- a/doc/howto/usage/cluster/cluster_train_cn.md +++ b/doc/howto/usage/cluster/cluster_train_cn.md @@ -1,135 +1,215 @@ -```eval_rst -.. _cluster_train: +# PaddlePaddle分布式训练 + +* [概述](#概述) +* [环境准备](#环境准备) +* [启动参数说明](#启动参数说明) + * [启动参数服务器](#启动参数服务器) + * [启动计算节点](#启动计算节点) + * [准备数据集](#准备数据集) + * [准备训练程序](#准备训练程序) +* [使用分布式计算平台或工具](#使用分布式计算平台或工具) + * [使用Fabric启动集群作业](#使用fabric启动集群作业) + * [准备一个Linux集群](#准备一个linux集群) + * [启动集群作业](#启动集群作业) + * [终止集群作业](#终止集群作业) + * [检查集群训练结果](#检查集群训练结果) + * [检查模型输出](#检查模型输出) + * [在OpenMPI集群中提交训练作业](#在openmpi集群中提交训练作业) + * [准备OpenMPI集群](#准备OpenMPI集群) + * [启动集群作业](#启动集群作业-1) + * [在Kubernetes集群中提交训练作业](#在kubernetes集群中提交训练作业) + +# 概述 +本文将介绍如何使用PaddlePaddle在不同的集群框架下完成分布式训练。分布式训练架构如下图所示: + + + +- 数据分片(Data shard): 用于训练神经网络的数据,被切分成多个部分,每个部分分别给每个trainer使用。 +- 计算节点(Trainer): 每个trainer启动后读取切分好的一部分数据,开始神经网络的“前馈”和“后馈”计算,并和参数服务器通信。在完成一定量数据的训练后,上传计算得出的梯度(gradients),然后下载优化更新后的神经网络参数(parameters)。 +- 参数服务器(Parameter server):每个参数服务器只保存整个神经网络所有参数的一部分。参数服务器接收从计算节点上传的梯度,并完成参数优化更新,再将更新后的参数下发到每个计算节点。 + +这样,通过计算节点和参数服务器的分布式协作,可以完成神经网络的SGD方法的训练。PaddlePaddle可以同时支持同步随机梯度下降(SGD)和异步随机梯度下降。 + +在使用同步SGD训练神经网络时,PaddlePaddle使用同步屏障(barrier),使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中,则并不会等待所有trainer提交梯度才更新参数,这样极大地提高了计算的并行性:参数服务器之间不相互依赖,并行地接收梯度和更新参数,参数服务器也不会等待计算节点全部都提交梯度之后才开始下一步,计算节点之间也不会相互依赖,并行地执行模型的训练。可以看出,虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新,在任意时间某一台参数服务器上保存的参数可能比另一台要更新,与同步SGD相比,梯度会有噪声。 + +# 环境准备 + +1. 准备您的计算集群。计算集群通常由一组(几台到几千台规模)的Linux服务器组成。服务器之间可以通过局域网(LAN)联通,每台服务器具有集群中唯一的IP地址(或者可被DNS解析的主机名)。集群中的每台计算机通常被成为一个“节点”。 +1. 我们需要在集群的所有节点上安装 PaddlePaddle。 如果要启用GPU,还需要在节点上安装对应的GPU驱动以及CUDA。PaddlePaddle的安装可以参考[build_and_install](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/getstarted/build_and_install)的多种安装方式。我们推荐使用[Docker](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/getstarted/build_and_install/docker_install_cn.rst)安装方式来快速安装PaddlePaddle。 + +安装完成之后,执行下面的命令可以查看已经安装的版本(docker安装方式可以进入docker容器执行:`docker run -it paddlepaddle/paddle:[tag] /bin/bash`): +```bash +$ paddle version +PaddlePaddle 0.10.0, compiled with + with_avx: ON + with_gpu: OFF + with_double: OFF + with_python: ON + with_rdma: OFF + with_timer: OFF ``` -# 运行分布式训练 +下面以`doc/howto/usage/cluster/src/word2vec`中的代码作为实例,介绍使用PaddlePaddle v2 API完成分布式训练。 -在本文中,我们将阐释如何在集群上运行分布式 Paddle 训练作业。我们将以[推荐系统](https://github.com/baidu/Paddle/tree/develop/demo/recommendation)为例创建分布式的单进程训练。 +# 启动参数说明 +## 启动参数服务器 +执行以下的命令启动一个参数服务器并等待和计算节点的数据交互 +```bash +$ paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 +``` -在本文中使用的[脚本](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train)通过 SSH 运行分布式作业。 它们还可以供那些运行更复杂的集群管理系统(如 MPI 和 [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s) )的用户参考。 +如果希望可以在后台运行pserver程序,并保存输出到一个日志文件,可以运行: +```bash +$ stdbuf -oL /usr/bin/nohup paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 &> pserver.log +``` -## 前提条件 +| 参数 | 是否必选 | 默认值 | 说明 | +| ------------- | ------------- | ------------- | ------------- | +| port | 必选 | 7164 | pserver监听的起始端口,根据ports_num决定
总端口个数,从起始端口监听多个端口用于通信 | +| ports_num | 必选 | 1 | 监听的端口个数 | +| ports_num_for_sparse | 必选 | 1 | 用于稀疏类型参数通信的端口个数 | +| num_gradient_servers | 必选 | 1 | 当前训练任务pserver总数 | + +## 启动计算节点 +执行以下命令启动使用python编写的trainer程序(文件名为任意文件名,如train.py) +```bash +$ python train.py +``` -1. 上述脚本使用 Python 库 [fabric](http://www.fabfile.org/) 来运行 SSH 命令。 我们使用 `pip` 来安装 fabric: +trainer需要和pserver保持网络联通以完成训练。trainer启动需要传入端口、pserver地址等参数使trainer可以正确连接到pserver。这些参数可以通过环境变量(https://zh.wikipedia.org/wiki/环境变量 )或编写程序时`paddle.init()`中传入参数。如果同时使用`paddle.init()`参数和环境变量,将会优先使用`paddle.init()`中传入的参数。 - ```bash - pip install fabric - ``` +使用环境变量: -2. 我们需要在集群的所有节点上安装 PaddlePaddle。 如果要启用GPU,需要在 `/usr/local/cuda` 中安装 CUDA; 否则 Paddle 将在运行时报错。 +```bash +export PADDLE_INIT_USE_GPU=False +export PADDLE_INIT_TRAINER_COUNT=1 +export PADDLE_INIT_PORT=7164 +export PADDLE_INIT_PORTS_NUM=1 +export PADDLE_INIT_PORTS_NUM_FOR_SPARSE=1 +export PADDLE_INIT_NUM_GRADIENT_SERVERS=1 +export PADDLE_INIT_TRAINER_ID=0 +export PADDLE_INIT_PSERVERS=127.0.0.1 +``` -3. 在 [`cluster_train/conf.py`] 中设置 `ROOT_DIR`, 该 ROOT_DIR 要在所有节点上存在。为了方便起见,我们通常在所有节点上创建一个 Unix 用户 `paddle`,并设置 `ROOT_DIR=/home/paddle`。这样,我们可以将 SSH 公钥写入 `/home/paddle/.ssh/authorized_keys`,以便用户 `paddle` 可以 SSH 到所有节点而不用密码。 +使用参数: -## 准备工作空间 +```python +paddle.init( + use_gpu=False, + trainer_count=1, + port=7164, + ports_num=1, + ports_num_for_sparse=1, + num_gradient_servers=1, + trainer_id=0, + pservers="127.0.0.1") +``` -我们将放置依赖库、配置等文件的目录视为 *工作空间(workspace)*。 +| 参数 | 是否必选 | 默认 | 说明 | +| ------------- | ------------- | ------------- | ------------- | +| use_gpu | 可选 | False | 是否启用GPU训练 | +| trainer_count | 必选 | 1 | 当前训练任务trainer总个数 | +| port | 必选 | 7164 | 连接到pserver的端口 | +| ports_num | 必选 | 1 | 连接到pserver的端口个数 | +| ports_num_for_sparse | 必选 | 1 | 和pserver之间用于稀疏类型参数通信的端口个数 | +| num_gradient_servers | 必选 | 1 | 当前训练任务pserver总数 | +| trainer_id | 必选 | 0 | 每个trainer的唯一ID,从0开始的整数 | +| pservers | 必选 | 127.0.0.1 | 当前训练任务启动的pserver的IP列表,多个IP使用“,”隔开 | -这些 `train/test` 数据应该在启动集群作业之前准备好。 为了满足训练/测试数据放置在工作空间中不同目录的要求,PADDLE 根据在模型配置文件中使用的名为 `train.list/test.list` 的索引文件引用训练/测试数据,所以训练/测试数据也包含 train.list/test.list 两个列表文件。所有本地训练 demo 已经提供了脚本来帮助您创建这两个文件,并且集群作业中的所有节点将在正常情况下处理具有相同逻辑代码的文件。 -通常,你可以使用本地训练中的相同模型文件进行集群训练。请记住,在模型文件的 `setting`函数中设置的 `batch_size` 表示在集群作业**每个**节点中的 batch 大小,而不是使用同步 SGD 的总 batch 大小。 +## 准备数据集 -以下步骤基于 demo 目录中的 [demo/recommendation](https://github.com/PaddlePaddle/Paddle/tree/develop/demo/recommendation)。 +参考样例数据准备脚本[prepare.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py),准备训练数据和验证数据集,我们使用paddle.dataset.imikolov数据集,并根据分布式训练并发数(trainer节点个数),在`prepare.py`开头部分指定`SPLIT_COUNT`将数据切分成多份。 -你只需完成 demo/recommendation 教程文档到 `Train` 的部分,之后你会得到训练/测试数据和模型配置文件。最后,只需使用 demo/recommendation 作为集群训练的工作空间。 +在线上系统中,通常会使用MapReduce任务的输出结果作为训练结果,这样训练文件的个数会比较多,而且个数并不确定。在trainer中可以使用下面取模的方法为每个trainer分配训练数据文件: -最后,你的工作空间应如下所示: -``` -. -|-- common_utils.py -|-- data -| |-- config.json -| |-- config_generator.py -| |-- meta.bin -| |-- meta_config.json -| |-- meta_generator.py -| |-- ml-1m -| |-- ml_data.sh -| |-- ratings.dat.test -| |-- ratings.dat.train -| |-- split.py -| |-- test.list -| `-- train.list -|-- dataprovider.py -|-- evaluate.sh -|-- prediction.py -|-- preprocess.sh -|-- requirements.txt -|-- run.sh -`-- trainer_config.py +```python +import os +train_list = [] +flist = os.listdir("/train_data/") +for f in flist: + suffix = int(f.split("-")[1]) + if suffix % TRAINER_COUNT == TRAINER_ID: + train_list.append(f) ``` -虽然这些文件并非都需要集群训练,但是也没有必要删除无用的文件。 - -`trainer_config.py` -表示模型配置文件。 -`train.list` 和 `test.list` -文件索引。它存储当前节点所有训练/测试数据的所有相对或绝对文件路径。 +示例程序`prepare.py`会把训练集和测试集分别分割成多个文件(例子中为3个,后缀为`-00000`、`-00001`和`-00002`): +``` +train.txt +train.txt-00000 +train.txt-00001 +train.txt-00002 +test.txt +test.txt-00000 +test.txt-00001 +test.txt-00002 +``` -`dataprovider.py` -用于读取训练/测试样本。这与本地训练相同。 +在进行分布式训练时,每个trainer进程需要能够读取属于自己的一份数据。在一些分布式系统中,系统会提供一个分布式存储服务,这样保存在分布式存储中的数据可以被集群中的每个节点读取到。如果不使用分布式存储,则需要手动拷贝属于每个trainer节点的训练数据到对应的节点上。 -`data` -数据目录中的所有文件被 train.list/test.list 引用。 +对于不同的训练任务,训练数据格式和训练程序的`reader()`会大不相同,所以开发者需要根据自己训练任务的实际场景完成训练数据的分割和`reader()`的编写。 +## 准备训练程序 -## 准备集群作业配置 +我们会对每个训练任务都会在每个节点上创建一个工作空间(workspace),其中包含了用户的训练程序、程序依赖、挂载或下载的训练数据分片。 -以下选项必须在 cluster_train/conf.py 中认真设置 +最后,工作空间应如下所示: +``` +. +|-- my_lib.py +|-- word_dict.pickle +|-- train.py +|-- train_data_dir/ +| |-- train.txt-00000 +| |-- train.txt-00001 +| |-- train.txt-00002 +`-- test_data_dir/ + |-- test.txt-00000 + |-- test.txt-00001 + `-- test.txt-00002 +``` -`HOSTS` 所有节点运行集群作业的主机名或 IP 。你还可以将用户和 ssh 端口附加到主机名上,例如 root@192.168.100.17:9090。 +- `my_lib.py`:会被`train.py`调用的一些用户定义的库函数,比如PIL库等。 +- `word_dict.pickle`:在`train.py`中会使用到的字典数据文件。 +- `train.py`:训练程序,代码参考[api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py)。***注意:*** 对于本样例代码,在使用不同的分布式计算平台时,您可能需要修改`train.py`开头的部分(如下),以便获得训练数据的位置和获取环境变量配置: -`ROOT_DIR` 用于放置 JOB 工作空间目录的工作空间 ROOT 目录 + ```python + cluster_train_file = "./train_data_dir/train/train.txt" + cluster_test_file = "./test_data_dir/test/test.txt" + node_id = os.getenv("OMPI_COMM_WORLD_RANK") + if not node_id: + raise EnvironmentError("must provied OMPI_COMM_WORLD_RANK") + ``` -`PADDLE_NIC` 集群通信通道的 NIC(Network Interface Card, 网络接口卡) 接口名称,例如以太网的 eth0,infiniband 的 ib0。 +- `train_data_dir`:包含训练数据的目录,可以是从分布式存储挂载过来的,也可以是在任务启动前下载到本地的。 +- `test_data_dir`:包含测试数据集的目录。 -`PADDLE_PORT` 集群通信通道的端口号 +# 使用分布式计算平台或工具 -`PADDLE_PORTS_NUM` 用于集群通信通道的端口数。 如果集群节点数量少(少于5〜6个节点),建议将其设置为较大,如2〜8,以获得更好的网络性能。 +PaddlePaddle可以使用多种分布式计算平台构建分布式计算任务,包括: +- [Kubernetes](http://kubernetes.io) Google开源的容器集群的调度框架,支持大规模集群生产环境的完整集群方案。 +- [OpenMPI](https://www.open-mpi.org) 成熟的高性能并行计算框架。 +- [Fabric](http://www.fabfile.org) 集群管理工具。可以使用`Fabric`编写集群任务提交和管理脚本。 -`PADDLE_PORTS_NUM_FOR_SPARSE` 用于 sparse remote updater 集群通信信道的端口数。如果使用 sparse remote update,则可以像 `PADDLE_PORTS_NUM` 一样设置。 +对于不同的集群平台,会分别介绍集群作业的启动和停止方法。这些例子都可以在[cluster_train_v2](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/scripts/cluster_train_v2)找到。 -`LD_LIBRARY_PATH` 为集群作业设置额外的 LD_LIBRARY_PATH。你可以使用它来设置 CUDA 库的路径。 +在使用分布式计算平台进行训练时,任务被调度在集群中时,分布式计算平台通常会通过API或者环境变量提供任务运行需要的参数,比如节点的ID、IP和任务节点个数等。 -默认配置如下: +## 使用Fabric启动集群作业 -```python -HOSTS = [ - "root@192.168.100.17", - "root@192.168.100.18", - ] - -''' -工作空间配置 -''' - -#工作空间根目录 -ROOT_DIR = "/home/paddle" - -''' -网络配置 -''' -#pserver NIC -PADDLE_NIC = "eth0" -#pserver 端口 -PADDLE_PORT = 7164 -#pserver 端口数 -PADDLE_PORTS_NUM = 2 -#pserver sparse ports num -PADDLE_PORTS_NUM_FOR_SPARSE = 2 - -#集群作业中所有进程的环境设置 -LD_LIBRARY_PATH="/usr/local/cuda/lib64:/usr/lib64" -``` +### 准备一个Linux集群 +可以在`paddle/scripts/cluster_train_v2/fabric/docker_cluster`目录下,执行`kubectl -f ssh_servers.yaml`启动一个测试集群,并使用`kubectl get po -o wide`获得这些节点的IP地址。 ### 启动集群作业 -`paddle.py` 提供了自动化脚本来启动不同节点中的所有 PaddlePaddle 集群进程。默认情况下,所有命令行选项可以设置为```paddle.py``` 命令选项并且 `paddle.py` 将透明、自动地将这些选项应用到 PaddlePaddle 底层进程。 + +`paddle.py` 提供了自动化脚本来启动不同节点中的所有 PaddlePaddle 集群进程。默认情况下,所有命令行选项可以设置为 `paddle.py` 命令选项并且 `paddle.py` 将透明、自动地将这些选项应用到 PaddlePaddle 底层进程。 `paddle.py` 为方便作业启动提供了两个独特的命令选项。 -`job_dispatch_package` 设为本地 `workspace` 目录,它将被分发到 conf.py 中设置的所有节点。 它有助于帮助频繁修改和访问工作区文件的用户减少负担,否则频繁的多节点工作空间部署可能会很麻烦。 -`job_workspace` 设为已部署的工作空间目录,`paddle.py` 将跳过分发阶段直接启动所有节点的集群作业。它可以帮助减少分发延迟。 +- `job_dispatch_package` 设为本地 `workspace` 目录,它将被分发到 `conf.py` 中设置的所有节点。它有助于帮助频繁修改和访问工作区文件的用户减少负担,否则频繁的多节点工作空间部署可能会很麻烦。 +- `job_workspace` 设为已部署的工作空间目录,`paddle.py` 将跳过分发阶段直接启动所有节点的集群作业。它可以帮助减少分发延迟。 -`cluster_train/run.sh` 提供了命令样例来运行 `demo/recommendation` 集群工作,只需用你定义的目录修改 `job_dispatch_package` 和 `job_workspace`,然后: +`cluster_train/run.sh` 提供了命令样例来运行 `doc/howto/usage/cluster/src/word2vec` 集群任务,只需用您定义的目录修改 `job_dispatch_package` 和 `job_workspace`,然后: ``` sh run.sh ``` @@ -149,7 +229,7 @@ sh run.sh 提供 pserver 运行日志,有助于诊断分布式错误。 `server.log` -提供 pserver 进程的 stderr 和 stdout。训练失败时可以检查错误日志。 +提供 parameter server 进程的 stderr 和 stdout。训练失败时可以检查错误日志。 `train.log` 提供训练过程的 stderr 和 stdout。训练失败时可以检查错误日志。 @@ -157,3 +237,49 @@ sh run.sh ### 检查模型输出 运行完成后,模型文件将被写入节点 0 的 `output` 目录中。 工作空间中的 `nodefile` 表示当前集群作业的节点 ID。 + +## 在OpenMPI集群中提交训练作业 + +### 准备OpenMPI集群 + +执行下面的命令以启动3个节点的OpenMPI集群和一个"head"节点: + +```bash +paddle/scripts/cluster_train_v2/openmpi/docker_cluster +kubectl create -f head.yaml +kubectl create -f mpi-nodes.yaml +``` + +然后可以从head节点ssh无密码登录到OpenMPI的每个节点上。 + +### 启动集群作业 + +您可以按照下面的步骤在OpenMPI集群中提交paddle训练任务: + +```bash +# 获得head和node节点的IP地址 +kubectl get po -o wide +# 将node节点的IP地址保存到machines文件中 +kubectl get po -o wide | grep nodes | awk '{print $6}' > machines +# 拷贝必要的文件到head节点 +scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~ +# ssh 登录到head节点 +ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP] +# --------------- 以下操作均在head节点中执行 --------------- +# 准备训练数据 +python prepare.py +# 拷贝训练程序和字典文件到每台MPI节点 +cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial +# 创建日志目录 +mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs +# 拷贝训练数据到各自的节点 +scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial +scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial +scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial +# 启动训练任务 +mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh +``` + +## 在Kubernetes集群中提交训练作业 + +此部分的使用方法可以参考[here](../k8s/k8s_distributed_cn.md)。 diff --git a/doc/howto/usage/cluster/cluster_train_en.md b/doc/howto/usage/cluster/cluster_train_en.md index c60876721cbf5565d6e48c8061811aacada748cd..1e8b4d54b9ffa99b3beef35ecaf95bbd0866535f 100644 --- a/doc/howto/usage/cluster/cluster_train_en.md +++ b/doc/howto/usage/cluster/cluster_train_en.md @@ -1,129 +1,220 @@ -# Run Distributed Training +# PaddlePaddle Distributed Training + +* [Introduction](#introduction) +* [Preparations](#preparations) +* [Command-line arguments](#command-line-arguments) + * [Starting parameter server](#starting-parameter-server) + * [Starting trainer](#starting-trainer) + * [Prepare Training Dataset](#prepare-training-dataset) + * [Prepare Training program](#prepare-training-program) +* [Use cluster platforms or cluster management tools](#use-cluster-platforms-or-cluster-management-tools) + * [Cluster Training Using Fabric](#cluster-training-using-fabric) + * [Prepare a Linux cluster](#prepare-a-linux-cluster) + * [Launching Cluster Job](#launching-cluster-job) + * [Kill Cluster Job](#kill-cluster-job) + * [Check Cluster Training Result](#check-cluster-training-result) + * [Check Model Output](#check-model-output) + * [Cluster Training Using OpenMPI](#cluster-training-using-openmpi) + * [Prepare an OpenMPI cluster](#prepare-an-openmpi-cluster) + * [Launching Cluster Job](#launching-cluster-job-1) + * [Cluster Training Using Kubernetes](#cluster-training-using-kubernetes) + +# Introduction + +In this article, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed trainning job: + + + +- Data shard: training data will be split into multiple partitions, trainers use the partitions of the whole dataset to do the training job. +- Trainer: each trainer reads the data shard, and train the neural network. Then the trainer will upload calculated "gradients" to parameter servers, and wait for parameters to be optimized on the parameter server side. When that finishes, the trainer download optimized parameters and continues its training. +- Parameter server: every parameter server stores part of the whole neural network model data. They will do optimization calculations when gradients are uploaded from trainers, and then send updated parameters to trainers. + +PaddlePaddle can support both synchronize stochastic gradient descent (SGD) and asynchronous SGD. + +When training with synchronize SGD, PaddlePaddle uses an internal "synchronize barrier" which makes gradients update and parameter download in strict order. On the other hand, asynchronous SGD won't wait for all trainers to finish upload at a single step, this will increase the parallelism of distributed training: parameter servers do not depend on each other, they'll do parameter optimization concurrently. Parameter servers will not wait for trainers, so trainers will also do their work concurrently. But asynchronous SGD will introduce more randomness and noises in the gradient. + +# Preparations +1. Prepare your computer cluster. It's normally a bunch of Linux servers connected by LAN. Each server will be assigned a unique IP address. The computers in the cluster can be called "nodes". +2. Install PaddlePaddle on every node. If you are going to take advantage of GPU cards, you'll also need to install proper driver and CUDA libraries. To install PaddlePaddle please read [this build and install](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/getstarted/build_and_install) document. We strongly recommend using [Docker installation](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/getstarted/build_and_install/docker_install_en.rst). + +After installation, you can check the version by typing the below command (run a docker container if using docker: `docker run -it paddlepaddle/paddle:[tag] /bin/bash`): + +```bash +$ paddle version +PaddlePaddle 0.10.0rc, compiled with + with_avx: ON + with_gpu: OFF + with_double: OFF + with_python: ON + with_rdma: OFF + with_timer: OFF +``` -In this article, we explain how to run distributed Paddle training jobs on clusters. We will create the distributed version of the single-process training example, [recommendation](https://github.com/baidu/Paddle/tree/develop/demo/recommendation). +We'll take `doc/howto/usage/cluster/src/word2vec` as an example to introduce distributed training using PaddlePaddle v2 API. -[Scripts](https://github.com/baidu/Paddle/tree/develop/paddle/scripts/cluster_train) used in this article launch distributed jobs via SSH. They also work as a reference for users running more sophisticated cluster management systems like MPI and [Kubernetes](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/k8s). +# Command-line arguments -## Prerequisite +## Starting parameter server -1. Aforementioned scripts use a Python library [fabric](http://www.fabfile.org/) to run SSH commands. We can use `pip` to install fabric: +Type the below command to start a parameter server which will wait for trainers to connect: - ```bash - pip install fabric - ``` +```bash +$ paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 +``` -1. We need to install PaddlePaddle on all nodes in the cluster. To enable GPUs, we need to install CUDA in `/usr/local/cuda`; otherwise Paddle would report errors at runtime. +If you wish to run parameter servers in background, and save a log file, you can type: +```bash +$ stdbuf -oL /usr/bin/nohup paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 &> pserver.log +``` -1. Set the `ROOT_DIR` variable in [`cluster_train/conf.py`] on all nodes. For convenience, we often create a Unix user `paddle` on all nodes and set `ROOT_DIR=/home/paddle`. In this way, we can write public SSH keys into `/home/paddle/.ssh/authorized_keys` so that user `paddle` can SSH to all nodes without password. +| param | required | default | description | +| ------------- | ------------- | ------------- | ------------- | +| port | required | 7164 | port which parameter server will listen on. If ports_num greater than 1, parameter server will listen on multiple ports for more network throughput | +| ports_num | required | 1 | total number of ports will listen on | +| ports_num_for_sparse | required | 1 | number of ports which serves sparse parameter update | +| num_gradient_servers | required | 1 | total number of gradient servers | -## Prepare Job Workspace +## Starting trainer +Type the command below to start the trainer(name the file whatever you want, like "train.py") -We refer to the directory where we put dependent libraries, config files, etc., as *workspace*. +```bash +$ python train.py +``` -These `train/test` data should be prepared before launching cluster job. To satisfy the requirement that train/test data are placed in different directory from workspace, PADDLE refers train/test data according to index file named as `train.list/test.list` which are used in model config file. So the train/test data also contains train.list/test.list two list file. All local training demo already provides scripts to help you create these two files, and all nodes in cluster job will handle files with same logical code in normal condition. +Trainers' network need to be connected with parameter servers' network to finish the job. Trainers need to know port and IPs to locate parameter servers. You can pass arguments to trainers through [environment variables](https://en.wikipedia.org/wiki/Environment_variable) or pass to `paddle.init()` function. Arguments passed to the `paddle.init()` function will overwrite environment variables. -Generally, you can use same model file from local training for cluster training. What you should have in mind that, the `batch_size` set in `setting` function in model file means batch size in `each` node of cluster job instead of total batch size if synchronization SGD was used. +Use environment viriables: -Following steps are based on [demo/recommendation](https://github.com/PaddlePaddle/Paddle/tree/develop/demo/recommendation) demo in demo directory. +```bash +export PADDLE_INIT_USE_GPU=False +export PADDLE_INIT_TRAINER_COUNT=1 +export PADDLE_INIT_PORT=7164 +export PADDLE_INIT_PORTS_NUM=1 +export PADDLE_INIT_PORTS_NUM_FOR_SPARSE=1 +export PADDLE_INIT_NUM_GRADIENT_SERVERS=1 +export PADDLE_INIT_TRAINER_ID=0 +export PADDLE_INIT_PSERVERS=127.0.0.1 +python train.py +``` -You just go through demo/recommendation tutorial doc until `Train` section, and at last you will get train/test data and model configuration file. Finaly, just use demo/recommendation as workspace for cluster training. +Pass arguments: -At last your workspace should look like as follow: +```python +paddle.init( + use_gpu=False, + trainer_count=1, + port=7164, + ports_num=1, + ports_num_for_sparse=1, + num_gradient_servers=1, + trainer_id=0, + pservers="127.0.0.1") ``` -. -|-- common_utils.py -|-- data -| |-- config.json -| |-- config_generator.py -| |-- meta.bin -| |-- meta_config.json -| |-- meta_generator.py -| |-- ml-1m -| |-- ml_data.sh -| |-- ratings.dat.test -| |-- ratings.dat.train -| |-- split.py -| |-- test.list -| `-- train.list -|-- dataprovider.py -|-- evaluate.sh -|-- prediction.py -|-- preprocess.sh -|-- requirements.txt -|-- run.sh -`-- trainer_config.py + +| param | required | default | description | +| ------------- | ------------- | ------------- | ------------- | +| use_gpu | optional | False | set to "True" to enable GPU training | +| trainer_count | required | 1 | total count of trainers in the training job | +| port | required | 7164 | port to connect to parameter server | +| ports_num | required | 1 | number of ports for communication | +| ports_num_for_sparse | required | 1 | number of ports for sparse type caculation | +| num_gradient_servers | required | 1 | total number of gradient server | +| trainer_id | required | 0 | ID for every trainer, start from 0 | +| pservers | required | 127.0.0.1 | list of IPs of parameter servers, separated by "," | + +## Prepare Training Dataset + +Here's some example code [prepare.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py), it will download public `imikolov` dataset and split it into multiple files according to job parallelism(trainers count). Modify `SPLIT_COUNT` at the begining of `prepare.py` to change the count of output files. + +In the real world, we often use `MapReduce` job's output as training data, so there will be lots of files. You can use `mod` to assign training file to trainers: + +```python +import os +train_list = [] +flist = os.listdir("/train_data/") +for f in flist: + suffix = int(f.split("-")[1]) + if suffix % TRAINER_COUNT == TRAINER_ID: + train_list.append(f) +``` + +Example code `prepare.py` will split training data and testing data into 3 files with digital suffix like `-00000`, `-00001` and`-00002`: + +``` +train.txt +train.txt-00000 +train.txt-00001 +train.txt-00002 +test.txt +test.txt-00000 +test.txt-00001 +test.txt-00002 ``` -Not all of these files are needed for cluster training, but it's not necessary to remove useless files. -`trainer_config.py` -Indicates the model config file. +When job started, every trainer needs to get it's own part of data. In some distributed systems a storage service will be provided, so the date under that path can be accessed by all the trainer nodes. Without the storage service, you must copy the training data to each trainer node. -`train.list` and `test.list` -File index. It stores all relative or absolute file paths of all train/test data at current node. +Different training jobs may have different data format and `reader()` function, developers may need to write different data prepare scripts and `reader()` functions for their job. -`dataprovider.py` -used to read train/test samples. It's same as local training. +## Prepare Training program -`data` -all files in data directory are refered by train.list/test.list which are refered by data provider. +We'll create a *workspace* directory on each node, storing your training program, dependencies, mounted or downloaded dataset directory. -## Prepare Cluster Job Configuration +Your workspace may looks like: +``` +. +|-- my_lib.py +|-- word_dict.pickle +|-- train.py +|-- train_data_dir/ +| |-- train.txt-00000 +| |-- train.txt-00001 +| |-- train.txt-00002 +`-- test_data_dir/ + |-- test.txt-00000 + |-- test.txt-00001 + `-- test.txt-00002 +``` -The options below must be carefully set in cluster_train/conf.py +- `my_lib.py`: user defined libraries, like PIL libs. This is optional. +- `word_dict.pickle`: dict file for training word embeding. +- `train.py`: training program. Sample code: [api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py). ***NOTE:*** You may need to modify the head part of `train.py` when using different cluster platform to retrive configuration environment variables: -`HOSTS` all nodes hostname or ip that will run cluster job. You can also append user and ssh port with hostname, such as root@192.168.100.17:9090. + ```python + cluster_train_file = "./train_data_dir/train/train.txt" + cluster_test_file = "./test_data_dir/test/test.txt" + node_id = os.getenv("OMPI_COMM_WORLD_RANK") + if not node_id: + raise EnvironmentError("must provied OMPI_COMM_WORLD_RANK") + ``` -`ROOT_DIR` workspace ROOT directory for placing JOB workspace directory +- `train_data_dir`: containing training data. Mount from storage service or copy trainning data to here. +- `test_data_dir`: containing testing data. -`PADDLE_NIC` the NIC(Network Interface Card) interface name for cluster communication channel, such as eth0 for ethternet, ib0 for infiniband. +# Use cluster platforms or cluster management tools -`PADDLE_PORT` port number for cluster commnunication channel +PaddlePaddle supports running jobs on several platforms including: +- [Kubernetes](http://kubernetes.io) open-source system for automating deployment, scaling, and management of containerized applications from Google. +- [OpenMPI](https://www.open-mpi.org) Mature high performance parallel computing framework. +- [Fabric](http://www.fabfile.org) A cluster management tool. Write scripts to submit jobs or manage the cluster. -`PADDLE_PORTS_NUM` the number of port used for cluster communication channle. if the number of cluster nodes is small(less than 5~6nodes), recommend you set it to larger, such as 2 ~ 8, for better network performance. +We'll introduce cluster job management on these platforms. The examples can be found under [cluster_train_v2](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/scripts/cluster_train_v2). -`PADDLE_PORTS_NUM_FOR_SPARSE` the number of port used for sparse updater cluster commnunication channel. if sparse remote update is used, set it like `PADDLE_PORTS_NUM` +These cluster platforms provide API or environment variables for training processes, when the job is dispatched to different nodes. Like node ID, IP or total number of nodes etc. -`LD_LIBRARY_PATH` set addtional LD_LIBRARY_PATH for cluster job. You can use it to set CUDA libraries path. +## Cluster Training Using Fabric -Default Configuration as follow: +### Prepare a Linux cluster -```python -HOSTS = [ - "root@192.168.100.17", - "root@192.168.100.18", - ] - -''' -workspace configuration -''' - -#root dir for workspace -ROOT_DIR = "/home/paddle" - -''' -network configuration -''' -#pserver nics -PADDLE_NIC = "eth0" -#pserver port -PADDLE_PORT = 7164 -#pserver ports num -PADDLE_PORTS_NUM = 2 -#pserver sparse ports num -PADDLE_PORTS_NUM_FOR_SPARSE = 2 - -#environments setting for all processes in cluster job -LD_LIBRARY_PATH="/usr/local/cuda/lib64:/usr/lib64" -``` +Run `kubectl -f ssh_servers.yaml` under the directory: `paddle/scripts/cluster_train_v2/fabric/docker_cluster` will launch a demo cluster. Run `kubectl get po -o wide` to get IP addresses of these nodes. ### Launching Cluster Job -`paddle.py` provides automatical scripts to start all PaddlePaddle cluster processes in different nodes. By default, all command line options can set as `paddle.py` command options and `paddle.py` will transparently and automatically set these options to PaddlePaddle lower level processes. +`paddle.py` provides automatical scripts to start all PaddlePaddle cluster processes in different nodes. By default, all command line options can be set as `paddle.py` command options and `paddle.py` will transparently and automatically set these options to PaddlePaddle lower level processes. `paddle.py`provides two distinguished command option for easy job launching. -`job_dispatch_package` set it with local `workspace`directory, it will be dispatched to all nodes set in conf.py. It could be helpful for frequent hacking workspace files, otherwise frequent mulit-nodes workspace deployment could make your crazy. -`job_workspace` set it with already deployed workspace directory, `paddle.py` will skip dispatch stage to directly launch cluster job with all nodes. It could help to reduce heavy +- `job_dispatch_package` set it with local `workspace` directory, it will be dispatched to all nodes which is set in `conf.py`. It could be helpful for frequently manipulating workspace files. otherwise, frequent multi-nodes workspace deployment is very annoying. +- `job_workspace` set it with already deployed workspace directory, `paddle.py` will skip dispatch stage to directly launch cluster job with all nodes. It could help to reduce heavy dispatch latency. `cluster_train/run.sh` provides command line sample to run `demo/recommendation` cluster job, just modify `job_dispatch_package` and `job_workspace` with your defined directory, then: @@ -134,23 +225,69 @@ sh run.sh The cluster Job will start in several seconds. ### Kill Cluster Job -`paddle.py` can capture `Ctrl + C` SIGINT signal to automatically kill all processes launched by it. So just stop `paddle.py` to kill cluster job. You should mannally kill job if program crashed. +`paddle.py` can capture `Ctrl + C` SIGINT signal to automatically kill all processes launched by it. So just stop `paddle.py` to kill cluster job. You should manually kill the job if the program crashed. ### Check Cluster Training Result Check log in $workspace/log for details, each node owns same log structure. `paddle_trainer.INFO` -It provides almost all interal output log for training, same as local training. Check runtime model convergence here. +It provides almost all internal output log for training, same as local training. Check runtime model convergence here. `paddle_pserver2.INFO` -It provides pserver running log, which could help to diagnose distributed error. +It provides parameter server running log, which could help to diagnose distributed error. `server.log` -It provides stderr and stdout of pserver process. Check error log if training crashs. +It provides stderr and stdout of parameter server process. Check error log if training crashes. `train.log` -It provides stderr and stdout of trainer process. Check error log if training crashs. +It provides stderr and stdout of trainer process. Check error log if training crashes. ### Check Model Output -After one pass finished, model files will be writed in `output` directory in node 0. +After one pass finished, model files will be written in `output` directory in node 0. `nodefile` in workspace indicates the node id of current cluster job. + +## Cluster Training Using OpenMPI + +### Prepare an OpenMPI cluster + +Run the following command to start a 3-node MPI cluster and one "head" node. + +```bash +cd paddle/scripts/cluster_train_v2/openmpi/docker_cluster +kubectl create -f head.yaml +kubectl create -f mpi-nodes.yaml +``` + +Then you can log in to every OpenMPI node using ssh without input any passwords. + +### Launching Cluster Job + +Follow the steps to launch a PaddlePaddle training job in OpenMPI cluster:\ + +```bash +# find out node IP addresses +kubectl get po -o wide +# generate a "machines" file containing node IP addresses +kubectl get po -o wide | grep nodes | awk '{print $6}' > machines +# copy necessary files onto "head" node +scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~ +# login to head node using ssh +ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP] +# --------------- in head node --------------- +# prepare training data +python prepare.py +# copy training data and dict file to MPI nodes +cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial +# creat a directory for storing log files +mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs +# copy training data to every node +scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial +scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial +scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial +# start the job +mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh +``` + +## Cluster Training Using Kubernetes + +The details can be found [here](../k8s/k8s_cn.md) diff --git a/doc/howto/usage/cluster/src/trainer.png b/doc/howto/usage/cluster/src/trainer.png new file mode 100644 index 0000000000000000000000000000000000000000..6537d3d56589ca9f19a77a50a970e4b5275e6ce0 Binary files /dev/null and b/doc/howto/usage/cluster/src/trainer.png differ diff --git a/doc/howto/usage/cluster/src/trainer_cn.png b/doc/howto/usage/cluster/src/trainer_cn.png new file mode 100644 index 0000000000000000000000000000000000000000..f9525739cc8bc6506adde642aafa0a85ae3ebebc Binary files /dev/null and b/doc/howto/usage/cluster/src/trainer_cn.png differ diff --git a/doc/howto/usage/cluster/src/word2vec/api_train_v2.py b/doc/howto/usage/cluster/src/word2vec/api_train_v2.py new file mode 100644 index 0000000000000000000000000000000000000000..c0940f0e56eafa22f8aeb7052c0ddc79d8862917 --- /dev/null +++ b/doc/howto/usage/cluster/src/word2vec/api_train_v2.py @@ -0,0 +1,100 @@ +import gzip +import math + +import paddle.v2 as paddle + +embsize = 32 +hiddensize = 256 +N = 5 + + +def wordemb(inlayer): + wordemb = paddle.layer.embedding( + input=inlayer, + size=embsize, + param_attr=paddle.attr.Param( + name="_proj", + initial_std=0.001, + learning_rate=1, + l2_rate=0, + sparse_update=True)) + return wordemb + + +def main(): + # for local training + cluster_train = False + + if not cluster_train: + paddle.init(use_gpu=False, trainer_count=1) + else: + paddle.init( + use_gpu=False, + trainer_count=2, + port=7164, + ports_num=1, + ports_num_for_sparse=1, + num_gradient_servers=1) + word_dict = paddle.dataset.imikolov.build_dict() + dict_size = len(word_dict) + firstword = paddle.layer.data( + name="firstw", type=paddle.data_type.integer_value(dict_size)) + secondword = paddle.layer.data( + name="secondw", type=paddle.data_type.integer_value(dict_size)) + thirdword = paddle.layer.data( + name="thirdw", type=paddle.data_type.integer_value(dict_size)) + fourthword = paddle.layer.data( + name="fourthw", type=paddle.data_type.integer_value(dict_size)) + nextword = paddle.layer.data( + name="fifthw", type=paddle.data_type.integer_value(dict_size)) + + Efirst = wordemb(firstword) + Esecond = wordemb(secondword) + Ethird = wordemb(thirdword) + Efourth = wordemb(fourthword) + + contextemb = paddle.layer.concat(input=[Efirst, Esecond, Ethird, Efourth]) + hidden1 = paddle.layer.fc(input=contextemb, + size=hiddensize, + act=paddle.activation.Sigmoid(), + layer_attr=paddle.attr.Extra(drop_rate=0.5), + bias_attr=paddle.attr.Param(learning_rate=2), + param_attr=paddle.attr.Param( + initial_std=1. / math.sqrt(embsize * 8), + learning_rate=1)) + predictword = paddle.layer.fc(input=hidden1, + size=dict_size, + bias_attr=paddle.attr.Param(learning_rate=2), + act=paddle.activation.Softmax()) + + def event_handler(event): + if isinstance(event, paddle.event.EndIteration): + if event.batch_id % 100 == 0: + with gzip.open("batch-" + str(event.batch_id) + ".tar.gz", + 'w') as f: + trainer.save_parameter_to_tar(f) + result = trainer.test( + paddle.batch( + paddle.dataset.imikolov.test(word_dict, N), 32)) + print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % ( + event.pass_id, event.batch_id, event.cost, event.metrics, + result.metrics) + + cost = paddle.layer.classification_cost(input=predictword, label=nextword) + + parameters = paddle.parameters.create(cost) + adagrad = paddle.optimizer.AdaGrad( + learning_rate=3e-3, + regularization=paddle.optimizer.L2Regularization(8e-4)) + trainer = paddle.trainer.SGD(cost, + parameters, + adagrad, + is_local=not cluster_train) + trainer.train( + paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32), + num_passes=30, + event_handler=event_handler) + + +if __name__ == '__main__': + main() diff --git a/doc/howto/usage/cluster/src/word2vec/api_train_v2_cluster.py b/doc/howto/usage/cluster/src/word2vec/api_train_v2_cluster.py new file mode 100644 index 0000000000000000000000000000000000000000..2e6d8887124a5524505b097803a60a35478ca644 --- /dev/null +++ b/doc/howto/usage/cluster/src/word2vec/api_train_v2_cluster.py @@ -0,0 +1,123 @@ +import math +import os +import paddle.v2 as paddle +import pickle + +embsize = 32 +hiddensize = 256 +N = 5 +cluster_train_file = "./train_data_dir/train/train.txt" +cluster_test_file = "./test_data_dir/test/test.txt" +node_id = os.getenv("OMPI_COMM_WORLD_RANK") +if not node_id: + raise EnvironmentError("must provied OMPI_COMM_WORLD_RANK") + + +def wordemb(inlayer): + wordemb = paddle.layer.embedding( + input=inlayer, + size=embsize, + param_attr=paddle.attr.Param( + name="_proj", + initial_std=0.001, + learning_rate=1, + l2_rate=0, + sparse_update=True)) + return wordemb + + +def cluster_reader_cluster(filename, node_id): + def cluster_reader(): + with open("-".join([filename, "%05d" % int(node_id)]), "r") as f: + for l in f: + csv_data = [int(cell) for cell in l.split(",")] + yield tuple(csv_data) + + return cluster_reader + + +def main(): + # get arguments from env + + # for local training + TRUTH = ["true", "True", "TRUE", "1", "yes", "Yes", "YES"] + cluster_train = os.getenv('PADDLE_CLUSTER_TRAIN', "False") in TRUTH + use_gpu = os.getenv('PADDLE_INIT_USE_GPU', "False") + + if not cluster_train: + paddle.init( + use_gpu=use_gpu, + trainer_count=int(os.getenv("PADDLE_INIT_TRAINER_COUNT", "1"))) + else: + paddle.init( + use_gpu=use_gpu, + trainer_count=int(os.getenv("PADDLE_INIT_TRAINER_COUNT", "1")), + port=int(os.getenv("PADDLE_INIT_PORT", "7164")), + ports_num=int(os.getenv("PADDLE_INIT_PORTS_NUM", "1")), + ports_num_for_sparse=int( + os.getenv("PADDLE_INIT_PORTS_NUM_FOR_SPARSE", "1")), + num_gradient_servers=int( + os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS", "1")), + trainer_id=int(os.getenv("PADDLE_INIT_TRAINER_ID", "0")), + pservers=os.getenv("PADDLE_INIT_PSERVERS", "127.0.0.1")) + fn = open("thirdparty/wuyi_train_thdpty/word_dict.pickle", "r") + word_dict = pickle.load(fn) + fn.close() + dict_size = len(word_dict) + firstword = paddle.layer.data( + name="firstw", type=paddle.data_type.integer_value(dict_size)) + secondword = paddle.layer.data( + name="secondw", type=paddle.data_type.integer_value(dict_size)) + thirdword = paddle.layer.data( + name="thirdw", type=paddle.data_type.integer_value(dict_size)) + fourthword = paddle.layer.data( + name="fourthw", type=paddle.data_type.integer_value(dict_size)) + nextword = paddle.layer.data( + name="fifthw", type=paddle.data_type.integer_value(dict_size)) + + Efirst = wordemb(firstword) + Esecond = wordemb(secondword) + Ethird = wordemb(thirdword) + Efourth = wordemb(fourthword) + + contextemb = paddle.layer.concat(input=[Efirst, Esecond, Ethird, Efourth]) + hidden1 = paddle.layer.fc(input=contextemb, + size=hiddensize, + act=paddle.activation.Sigmoid(), + layer_attr=paddle.attr.Extra(drop_rate=0.5), + bias_attr=paddle.attr.Param(learning_rate=2), + param_attr=paddle.attr.Param( + initial_std=1. / math.sqrt(embsize * 8), + learning_rate=1)) + predictword = paddle.layer.fc(input=hidden1, + size=dict_size, + bias_attr=paddle.attr.Param(learning_rate=2), + act=paddle.activation.Softmax()) + + def event_handler(event): + if isinstance(event, paddle.event.EndIteration): + if event.batch_id % 100 == 0: + result = trainer.test( + paddle.batch( + cluster_reader_cluster(cluster_test_file, node_id), 32)) + print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % ( + event.pass_id, event.batch_id, event.cost, event.metrics, + result.metrics) + + cost = paddle.layer.classification_cost(input=predictword, label=nextword) + parameters = paddle.parameters.create(cost) + adagrad = paddle.optimizer.AdaGrad( + learning_rate=3e-3, + regularization=paddle.optimizer.L2Regularization(8e-4)) + trainer = paddle.trainer.SGD(cost, + parameters, + adagrad, + is_local=not cluster_train) + trainer.train( + paddle.batch(cluster_reader_cluster(cluster_train_file, node_id), 32), + num_passes=30, + event_handler=event_handler) + + +if __name__ == '__main__': + main() diff --git a/doc/howto/usage/cluster/src/word2vec/prepare.py b/doc/howto/usage/cluster/src/word2vec/prepare.py new file mode 100644 index 0000000000000000000000000000000000000000..24f5c5b26d37ea03de3ab4dc2d967a4bd009eef0 --- /dev/null +++ b/doc/howto/usage/cluster/src/word2vec/prepare.py @@ -0,0 +1,41 @@ +import paddle.v2 as paddle +import tarfile +import os +import pickle + +SPLIT_COUNT = 3 +N = 5 + + +def file_len(fd): + for i, l in enumerate(fd): + pass + return i + 1 + + +def split_from_reader_by_line(filename, reader, split_count): + fn = open(filename, "w") + for batch_id, batch_data in enumerate(reader()): + batch_data_str = [str(d) for d in batch_data] + fn.write(",".join(batch_data_str)) + fn.write("\n") + fn.close() + + fn = open(filename, "r") + total_line_count = file_len(fn) + fn.close() + per_file_lines = total_line_count / split_count + 1 + cmd = "split -d -a 5 -l %d %s %s-" % (per_file_lines, filename, filename) + os.system(cmd) + + +word_dict = paddle.dataset.imikolov.build_dict() +with open("word_dict.pickle", "w") as dict_f: + pickle.dump(word_dict, dict_f) + +split_from_reader_by_line("train.txt", + paddle.dataset.imikolov.train(word_dict, N), + SPLIT_COUNT) +split_from_reader_by_line("test.txt", + paddle.dataset.imikolov.test(word_dict, N), + SPLIT_COUNT) diff --git a/paddle/framework/CMakeLists.txt b/paddle/framework/CMakeLists.txt index 4bc3fdeeea461ea2a1f82caa00d6c0c11a2775d0..774c7b021754be607cd895ca910583e992ed26a0 100644 --- a/paddle/framework/CMakeLists.txt +++ b/paddle/framework/CMakeLists.txt @@ -20,13 +20,14 @@ proto_library(framework_proto SRCS framework.proto) cc_library(attribute SRCS attribute.cc DEPS framework_proto) cc_library(proto_desc SRCS var_desc.cc op_desc.cc block_desc.cc program_desc.cc DEPS attribute ddim op_info) +cc_test(program_desc_test SRCS program_desc_test.cc DEPS proto_desc) cc_library(op_proto_maker SRCS op_proto_maker.cc DEPS framework_proto attribute) cc_test(op_proto_maker_test SRCS op_proto_maker_test.cc DEPS op_proto_maker) cc_library(op_info SRCS op_info.cc DEPS attribute framework_proto) -cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope proto_desc) +cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope proto_desc glog) cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry) -cc_library(op_registry SRCS op_registry.cc DEPS op_proto_maker op_info operator) +cc_library(op_registry SRCS op_registry.cc DEPS op_proto_maker op_info operator glog) cc_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry) py_proto_compile(framework_py_proto SRCS framework.proto) @@ -42,7 +43,10 @@ add_custom_command(TARGET framework_py_proto POST_BUILD cc_library(backward SRCS backward.cc DEPS net_op) cc_test(backward_test SRCS backward_test.cc DEPS backward recurrent_op device_context) -cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto backward) +cc_library(executor SRCS executor.cc DEPS op_registry device_context scope framework_proto backward glog) + +cc_library(prune SRCS prune.cc DEPS framework_proto) +cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) cc_library(tensor_array SRCS tensor_array.cc DEPS lod_tensor) cc_test(tensor_array_test SRCS tensor_array_test.cc DEPS tensor_array place) diff --git a/paddle/framework/block_desc.cc b/paddle/framework/block_desc.cc index 92ac302e46ae1a5c90bec8522a2974298311284a..21d4fdaf0680036a484ee4258e47c6c8854967c3 100644 --- a/paddle/framework/block_desc.cc +++ b/paddle/framework/block_desc.cc @@ -107,6 +107,19 @@ BlockDesc *BlockDescBind::Proto() { Flush(); return desc_; } +BlockDescBind::BlockDescBind(const BlockDescBind &other, BlockDesc *desc, + ProgramDescBind *prog) + : prog_(prog), desc_(desc) { + need_update_ = true; + for (auto &op : other.ops_) { + ops_.emplace_back(new OpDescBind(*op)); + } + + for (auto &it : other.vars_) { + auto *var = new VarDescBind(*it.second); + vars_[it.first].reset(var); + } +} void BlockDescBind::ClearPBOps() { auto ops = this->desc_->mutable_ops(); diff --git a/paddle/framework/block_desc.h b/paddle/framework/block_desc.h index 5e1f10c1aef8bb7b13d75baad49167052dd01e49..7d1d33f6860aa90518abb379a5e9964d6029c6fa 100644 --- a/paddle/framework/block_desc.h +++ b/paddle/framework/block_desc.h @@ -16,8 +16,10 @@ limitations under the License. */ #include #include +#include #include #include + #include "paddle/framework/op_desc.h" #include "paddle/framework/var_desc.h" #include "paddle/platform/macros.h" @@ -36,6 +38,9 @@ class BlockDescBind { BlockDescBind(ProgramDescBind *prog, BlockDesc *desc) : prog_(prog), desc_(desc), need_update_(false) {} + BlockDescBind(const BlockDescBind &other, BlockDesc *desc, + ProgramDescBind *prog); + ~BlockDescBind() { this->ClearPBVars(); this->ClearPBOps(); @@ -51,6 +56,14 @@ class BlockDescBind { bool HasVar(const std::string &var_name) const; + std::set LocalVarNames() const { + std::set var_names; + for (auto &var : vars_) { + var_names.insert(var.first); + } + return var_names; + } + std::vector AllVars() const; BlockDescBind *ParentBlock() const; diff --git a/paddle/framework/executor.cc b/paddle/framework/executor.cc index 00caa6e1d53a4bcfae56c4459413bc1622321960..d50f0da03245783f8f0de481d7be0699fd10feac 100644 --- a/paddle/framework/executor.cc +++ b/paddle/framework/executor.cc @@ -68,9 +68,13 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id) { for (auto& var : block.vars()) { if (var.persistable()) { - scope->Var(var.name()); + auto* ptr = scope->Var(var.name()); + VLOG(3) << "Create Variable " << var.name() + << " global, which pointer is " << ptr; } else { - local_scope.Var(var.name()); + auto* ptr = local_scope.Var(var.name()); + VLOG(3) << "Create Variable " << var.name() + << " locally, which pointer is " << ptr; } } diff --git a/paddle/framework/feed_fetch_method.h b/paddle/framework/feed_fetch_method.h index 826d180bfc5445224a8d9292f06eeb58d9a46b29..9b23ad271cb3782794f624cb17eaf28fd3ca801a 100644 --- a/paddle/framework/feed_fetch_method.h +++ b/paddle/framework/feed_fetch_method.h @@ -13,6 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include "glog/logging.h" +#include "paddle/framework/feed_fetch_type.h" #include "paddle/framework/scope.h" #include "paddle/framework/variable.h" @@ -24,6 +26,7 @@ void SetFeedVariable(const LoDTensor& input, const std::string& var_name, size_t index) { // If var_name Variable is not found in GlobalScope, a new variable will // be created. + VLOG(3) << "SetFeedVariable name=" << var_name << " index=" << index; Variable* g_feed_value = GetGlobalScope().Var(var_name); auto& feed_inputs = *(g_feed_value->GetMutable>()); @@ -40,10 +43,15 @@ LoDTensor& GetFetchVariable(const std::string& var_name, size_t index) { // Since we want to fetch LodTensor from a variable, the variable must // be created alreadly. Variable* g_fetch_value = GetGlobalScope().FindVar(var_name); - auto& fetch_outputs = - *(g_fetch_value->GetMutable>()); + PADDLE_ENFORCE(g_fetch_value->IsType(), + "Only %s can be invoked by GetFetchVariable", + typeid(FeedFetchList).name()); + auto& fetch_outputs = *g_fetch_value->GetMutable(); + auto& tensor = fetch_outputs[index]; + VLOG(3) << "Fetch " << var_name << " with index " << index + << " shape= " << tensor.dims(); PADDLE_ENFORCE_LT(index, fetch_outputs.size()); - return fetch_outputs[index]; + return tensor; } } // namespace framework diff --git a/paddle/framework/framework.proto b/paddle/framework/framework.proto index 65760b07ada7a63a568cb8296eef35a8aa18d9ff..2aa961f1407c44fb4d4a149c40b3dad5b243c354 100644 --- a/paddle/framework/framework.proto +++ b/paddle/framework/framework.proto @@ -55,6 +55,7 @@ message OpDesc { repeated Var inputs = 1; repeated Var outputs = 2; repeated Attr attrs = 4; + optional bool is_target = 5 [ default = false ]; }; // OpProto describes a C++ framework::OperatorBase derived class. @@ -111,6 +112,8 @@ message VarDesc { enum VarType { LOD_TENSOR = 1; SELECTED_ROWS = 2; + FEED_MINIBATCH = 3; + FETCH_LIST = 4; } required string name = 1; required VarType type = 2; diff --git a/paddle/framework/lod_tensor.h b/paddle/framework/lod_tensor.h index 4db36ee76609ac6360fe2fc7b4a366e0284d1016..3d893baa35391d38372c735ad62576f3dc35a99b 100644 --- a/paddle/framework/lod_tensor.h +++ b/paddle/framework/lod_tensor.h @@ -74,12 +74,12 @@ class LoDTensor : public Tensor { LoD lod() const { return lod_; } /* - * Get a element from LoD. + * Get the start offset and end offset of an element from LoD. */ - size_t lod_element(size_t level, size_t elem) const { + std::pair lod_element(size_t level, size_t elem) const { PADDLE_ENFORCE_LT(level, NumLevels()); PADDLE_ENFORCE_LT(elem, NumElements(level)); - return (lod_)[level][elem]; + return std::make_pair((lod_)[level][elem], (lod_)[level][elem + 1]); } /* diff --git a/paddle/framework/lod_tensor_test.cu b/paddle/framework/lod_tensor_test.cu index 647d07536dd070bc37137fc01f683ec07ba7d6f4..25041024cb51d4d2f360edb06571a0a99dcf29b1 100644 --- a/paddle/framework/lod_tensor_test.cu +++ b/paddle/framework/lod_tensor_test.cu @@ -36,8 +36,8 @@ TEST(LoDTensor, LoDInGPU) { lod_tensor.mutable_data(place); lod_tensor.set_lod(src_lod); - CHECK_EQ(lod_tensor.lod_element(0, 2), 4UL); - CHECK_EQ(lod_tensor.lod_element(0, 4), 8UL); + CHECK_EQ(lod_tensor.lod_element(0, 2).first, 4UL); + CHECK_EQ(lod_tensor.lod_element(0, 4).first, 8UL); auto lod = lod_tensor.lod(); diff --git a/paddle/framework/op_registry.h b/paddle/framework/op_registry.h index d25b4abccb11bfe4f208ffc5a3113f3624fee914..ed85c386ec2632604bf5faf0ff9b1a087eb9c276 100644 --- a/paddle/framework/op_registry.h +++ b/paddle/framework/op_registry.h @@ -20,6 +20,8 @@ limitations under the License. */ #include #include #include + +#include "glog/logging.h" // For VLOG() #include "paddle/framework/attribute.h" #include "paddle/framework/details/op_registry.h" #include "paddle/framework/framework.pb.h" diff --git a/paddle/framework/operator.h b/paddle/framework/operator.h index cf15f9933ab3bc881add3d45b7ca17194a70e0f1..12cd307297d010201a47e048089ed7be0db52647 100644 --- a/paddle/framework/operator.h +++ b/paddle/framework/operator.h @@ -20,12 +20,13 @@ limitations under the License. */ #include #include -#include "op_info.h" +#include "glog/logging.h" // For VLOG #include "paddle/framework/attribute.h" #include "paddle/framework/block_desc.h" #include "paddle/framework/data_type.h" #include "paddle/framework/framework.pb.h" #include "paddle/framework/lod_tensor.h" +#include "paddle/framework/op_info.h" #include "paddle/framework/scope.h" #include "paddle/framework/shape_inference.h" #include "paddle/framework/tensor.h" @@ -573,6 +574,7 @@ class OperatorWithKernel : public OperatorBase { void Run(const Scope& scope, const platform::DeviceContext& dev_ctx) const final { + VLOG(3) << "Running operator " << this->Type(); RuntimeInferShapeContext infer_shape_ctx(*this, scope); this->InferShape(&infer_shape_ctx); diff --git a/paddle/framework/program_desc.cc b/paddle/framework/program_desc.cc index df846f115a5f6dc202b872349d258ac33366e518..e2349cefe09a6c1e0b11f77775426fe5c7594c9d 100644 --- a/paddle/framework/program_desc.cc +++ b/paddle/framework/program_desc.cc @@ -39,5 +39,14 @@ ProgramDescBind::ProgramDescBind() { block->set_parent_idx(-1); blocks_.emplace_back(new BlockDescBind(this, block)); } + +ProgramDescBind::ProgramDescBind(const ProgramDescBind &o) { + prog_ = o.prog_; + + for (int i = 0; i < prog_.blocks_size(); ++i) { + auto *block = prog_.mutable_blocks(i); + blocks_.emplace_back(new BlockDescBind(*o.blocks_[i], block, this)); + } +} } // namespace framework } // namespace paddle diff --git a/paddle/framework/program_desc.h b/paddle/framework/program_desc.h index 514b62654df8f2179ab06e57074b1857d1200ed8..20cc1a2325ffd6f8ef52879a749f106c268376d4 100644 --- a/paddle/framework/program_desc.h +++ b/paddle/framework/program_desc.h @@ -28,6 +28,8 @@ class ProgramDescBind { public: ProgramDescBind(); + ProgramDescBind(const ProgramDescBind &o); + BlockDescBind *AppendBlock(const BlockDescBind &parent); BlockDescBind *Block(size_t idx) { return blocks_[idx].get(); } @@ -40,8 +42,6 @@ class ProgramDescBind { ProgramDesc prog_; std::vector> blocks_; - - DISABLE_COPY_AND_ASSIGN(ProgramDescBind); }; } // namespace framework } // namespace paddle diff --git a/paddle/framework/program_desc_test.cc b/paddle/framework/program_desc_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..c9709a2d3f1d9e0be2bda1e8e9e7835ca49141b1 --- /dev/null +++ b/paddle/framework/program_desc_test.cc @@ -0,0 +1,83 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ + +#include "paddle/framework/program_desc.h" +#include "gtest/gtest.h" +#include "paddle/framework/block_desc.h" + +namespace paddle { +namespace framework { +TEST(ProgramDesc, copy_ctor) { + ProgramDescBind program; + auto* global_block = program.Block(0); + auto* x = global_block->Var("X"); + x->SetType(VarDesc_VarType_LOD_TENSOR); + x->SetLoDLevel(0); + x->SetDataType(FP32); + x->SetShape({1000, 784}); + + auto* y = global_block->Var("Y"); + y->SetType(VarDesc_VarType_LOD_TENSOR); + y->SetLoDLevel(0); + y->SetDataType(FP32); + y->SetShape({784, 100}); + + auto* op = global_block->AppendOp(); + op->SetType("mul"); + op->SetInput("X", {x->Name()}); + op->SetInput("Y", {y->Name()}); + + auto* out = global_block->Var("Out"); + out->SetType(VarDesc_VarType_LOD_TENSOR); + op->SetOutput("Y", {out->Name()}); + + ProgramDescBind program_copy(program); + + auto* global_block_copy = program_copy.Block(0); + ASSERT_NE(global_block, global_block_copy); + + auto assert_same_var = [&](const std::string& name, VarDescBind* var_before) { + ASSERT_TRUE(global_block_copy->HasVar(name)); + auto* copy = global_block_copy->Var(name); + ASSERT_NE(copy, var_before); + ASSERT_EQ(copy->Name(), var_before->Name()); + ASSERT_EQ(copy->GetType(), var_before->GetType()); + ASSERT_EQ(copy->Shape(), var_before->Shape()); + ASSERT_EQ(copy->Proto()->SerializeAsString(), + var_before->Proto()->SerializeAsString()); + }; + + ASSERT_EQ(global_block->LocalVarNames(), global_block_copy->LocalVarNames()); + ASSERT_EQ(3, global_block_copy->LocalVarNames().size()); + assert_same_var("X", x); + assert_same_var("Y", y); + assert_same_var("Out", out); + + for (size_t i = 0; i < global_block->OpSize(); ++i) { + auto op_origin = global_block->Op(i); + auto op_copy = global_block->Op(i); + + ASSERT_EQ(op_origin->Type(), op_copy->Type()); + ASSERT_EQ(op_origin->Inputs(), op_copy->Inputs()); + ASSERT_EQ(op_origin->Outputs(), op_copy->Outputs()); + + ASSERT_EQ(op_copy->Proto()->SerializeAsString(), + op_origin->Proto()->SerializeAsString()); + } + + // Not check block's protostr are same it because the order of vars could be + // different and it is correct. +} +} // namespace framework +} // namespace paddle diff --git a/paddle/framework/prune.cc b/paddle/framework/prune.cc new file mode 100644 index 0000000000000000000000000000000000000000..95833692925af4477fe575d6bd908a2ce7653c1b --- /dev/null +++ b/paddle/framework/prune.cc @@ -0,0 +1,109 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/framework/prune.h" + +#include +#include +#include +#include + +#include + +namespace paddle { +namespace framework { + +const std::string kFeedOpType = "feed"; +const std::string kFetchOpType = "fetch"; + +bool HasDependentVar(const OpDesc& op_desc, + const std::set& dependent_vars) { + for (auto& var : op_desc.outputs()) { + for (auto& argu : var.arguments()) { + if (dependent_vars.count(argu) != 0) { + return true; + } + } + } + return false; +} + +bool IsTarget(const OpDesc& op_desc) { + if (op_desc.has_is_target()) { + return op_desc.is_target(); + } + return false; +} + +void prune_impl(const ProgramDesc& input, ProgramDesc& output, int block_id) { + // TODO(tonyyang-svail): + // - will change to use multiple blocks for RNN op and Cond Op + + auto& block = input.blocks(block_id); + auto& ops = block.ops(); + + bool expect_feed = true; + for (auto& op_desc : ops) { + PADDLE_ENFORCE(op_desc.type() != kFeedOpType || expect_feed, + "All FeedOps are at the beginning of the ProgramDesc"); + expect_feed = (op_desc.type() == kFeedOpType); + } + + bool expect_fetch = true; + for (auto op_iter = ops.rbegin(); op_iter != ops.rend(); ++op_iter) { + auto& op_desc = *op_iter; + PADDLE_ENFORCE(op_desc.type() != kFetchOpType || expect_fetch, + "All FetchOps must at the end of the ProgramDesc"); + expect_fetch = (op_desc.type() == kFetchOpType); + } + + std::set dependent_vars; + std::vector should_run; + for (auto op_iter = ops.rbegin(); op_iter != ops.rend(); ++op_iter) { + auto& op_desc = *op_iter; + + if (IsTarget(op_desc) || HasDependentVar(op_desc, dependent_vars)) { + // insert its input to the dependency graph + for (auto& var : op_desc.inputs()) { + for (auto& argu : var.arguments()) { + dependent_vars.insert(argu); + } + } + + should_run.push_back(true); + } else { + should_run.push_back(false); + } + } + + // since we are traversing the ProgramDesc in reverse order + // we reverse the should_run vector + std::reverse(should_run.begin(), should_run.end()); + + output = input; + auto* op_field = output.mutable_blocks(block_id)->mutable_ops(); + op_field->Clear(); + for (size_t i = 0; i < should_run.size(); ++i) { + if (should_run[i]) { + *op_field->Add() = input.blocks(block_id).ops(i); + } + } +} + +void Prune(const ProgramDesc& input, ProgramDesc& output) { + prune_impl(input, output, 0); +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/framework/prune.h b/paddle/framework/prune.h new file mode 100644 index 0000000000000000000000000000000000000000..9414ac64f9491c07aabb216a4c81dfe6e78e8043 --- /dev/null +++ b/paddle/framework/prune.h @@ -0,0 +1,26 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include "paddle/framework/framework.pb.h" +#include "paddle/platform/enforce.h" + +namespace paddle { +namespace framework { + +void Prune(const ProgramDesc& input, ProgramDesc& output); + +} // namespace framework +} // namespace paddle diff --git a/paddle/framework/prune_test.cc b/paddle/framework/prune_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..3ab4b43d9256af5880083b00df446c451e3f598b --- /dev/null +++ b/paddle/framework/prune_test.cc @@ -0,0 +1,138 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ + +#include "paddle/framework/prune.h" + +#include "paddle/framework/attribute.h" +#include "paddle/framework/operator.h" +#include "paddle/operators/net_op.h" + +#include "paddle/framework/block_desc.h" +#include "paddle/framework/op_desc.h" +#include "paddle/framework/program_desc.h" + +#include + +namespace f = paddle::framework; +namespace ops = paddle::operators; + +void AddOp(const std::string &type, const f::VariableNameMap &inputs, + const f::VariableNameMap &outputs, f::AttributeMap attrs, + paddle::framework::BlockDescBind *block) { + // insert output + for (auto kv : outputs) { + for (auto v : kv.second) { + auto var = block->Var(v); + var->SetDataType(paddle::framework::DataType::FP32); + } + } + + // insert op + auto op = block->AppendOp(); + op->SetType(type); + for (auto &kv : inputs) { + op->SetInput(kv.first, kv.second); + } + for (auto &kv : outputs) { + op->SetOutput(kv.first, kv.second); + } + op->SetAttrMap(attrs); +} + +TEST(Prune, one_operator) { + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + + AddOp("one_one", {{"input", {"a"}}}, {{"output", {"b"}}}, {}, block); + + f::ProgramDesc *pdesc = program.Proto(); + f::ProgramDesc pruned; + + Prune(*pdesc, pruned); + PADDLE_ENFORCE_EQ(pruned.blocks(0).ops_size(), 0); + + pdesc->mutable_blocks(0)->mutable_ops(0)->set_is_target(true); + Prune(*pdesc, pruned); + PADDLE_ENFORCE_EQ(pruned.blocks(0).ops_size(), 1); +} + +TEST(Prune, forward) { + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + + AddOp("one_one", {{"input", {"a"}}}, {{"output", {"b"}}}, {}, block); + AddOp("one_one", {{"input", {"b"}}}, {{"output", {"c"}}}, {}, block); + AddOp("one_one", {{"input", {"c"}}}, {{"output", {"d"}}}, {}, block); + AddOp("one_one", {{"input", {"d"}}}, {{"output", {"e"}}}, {}, block); + + f::ProgramDesc *pdesc = program.Proto(); + + for (int i = 0; i < pdesc->blocks(0).ops_size(); ++i) { + f::ProgramDesc pruned; + pdesc->mutable_blocks(0)->mutable_ops(i)->set_is_target(true); + Prune(*pdesc, pruned); + PADDLE_ENFORCE_EQ(pruned.blocks(0).ops_size(), i + 1); + } +} + +TEST(Prune, multi_input_op) { + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + + AddOp("one_one", {{"input", {"a0"}}}, {{"output", {"b0"}}}, {}, block); + AddOp("one_one", {{"input", {"a1"}}}, {{"output", {"b1"}}}, {}, block); + AddOp("one_one", {{"input", {"a2"}}}, {{"output", {"b2"}}}, {}, block); + AddOp("three_one", {{"input", {"b0", "b1", "b2"}}}, {{"output", {"c"}}}, {}, + block); + + f::ProgramDesc *pdesc = program.Proto(); + pdesc->mutable_blocks(0)->mutable_ops(3)->set_is_target(true); + + f::ProgramDesc pruned; + Prune(*pdesc, pruned); + PADDLE_ENFORCE_EQ(pruned.blocks(0).ops_size(), 4); +} + +TEST(Prune, multi_output_op) { + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + + AddOp("one_two", {{"input", {"a"}}}, {{"output", {"b", "c"}}}, {}, block); + AddOp("one_one", {{"input", {"b"}}}, {{"output", {"b1"}}}, {}, block); + AddOp("one_one", {{"input", {"c"}}}, {{"output", {"c1"}}}, {}, block); + + f::ProgramDesc *pdesc = program.Proto(); + pdesc->mutable_blocks(0)->mutable_ops(2)->set_is_target(true); + + f::ProgramDesc pruned; + Prune(*pdesc, pruned); + PADDLE_ENFORCE_EQ(pruned.blocks(0).ops_size(), 2); +} + +TEST(Prune, multi_target) { + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + + AddOp("one_two", {{"input", {"a"}}}, {{"output", {"b", "c"}}}, {}, block); + AddOp("one_one", {{"input", {"b"}}}, {{"output", {"b1"}}}, {}, block); + AddOp("one_one", {{"input", {"c"}}}, {{"output", {"c1"}}}, {}, block); + + f::ProgramDesc *pdesc = program.Proto(); + pdesc->mutable_blocks(0)->mutable_ops(1)->set_is_target(true); + pdesc->mutable_blocks(0)->mutable_ops(2)->set_is_target(true); + + f::ProgramDesc pruned; + Prune(*pdesc, pruned); + PADDLE_ENFORCE_EQ(pruned.blocks(0).ops_size(), 3); +} diff --git a/paddle/framework/var_desc.h b/paddle/framework/var_desc.h index 688a46f83982fc464c7602ec1041ad3f42122211..af4c26ca0a77b444852cc01545a8b585a5c3afcc 100644 --- a/paddle/framework/var_desc.h +++ b/paddle/framework/var_desc.h @@ -79,6 +79,10 @@ class VarDescBind { void SetType(VarDesc::VarType type) { desc_.set_type(type); } + bool Persistable() const { return desc_.persistable(); } + + void SetPersistable(bool persistable) { desc_.set_persistable(persistable); } + private: const TensorDesc &tensor_desc() const; TensorDesc *mutable_tensor_desc(); diff --git a/paddle/framework/variable.h b/paddle/framework/variable.h index 38fc2720a3023039aa113b32a394bda9c5def4c0..a80f0e66b5a59bf95efc200d159ad5dd9cf4111a 100644 --- a/paddle/framework/variable.h +++ b/paddle/framework/variable.h @@ -25,7 +25,10 @@ class Variable { public: template const T& Get() const { - PADDLE_ENFORCE(IsType(), "Variable must be type %s", typeid(T).name()); + PADDLE_ENFORCE(holder_ != nullptr, "Variable must hold some thing"); + PADDLE_ENFORCE(IsType(), + "Variable must be type %s, the holding type is %s", + typeid(T).name(), holder_->Type().name()); return *static_cast(holder_->Ptr()); } diff --git a/paddle/operators/adam_op.cc b/paddle/operators/adam_op.cc index e3db70ea129880434add21e71d15e5129c4551bd..3572de06bd60f7979e3bfbf39856b04942ce81c0 100644 --- a/paddle/operators/adam_op.cc +++ b/paddle/operators/adam_op.cc @@ -43,10 +43,6 @@ class AdamOp : public framework::OperatorWithKernel { "Output(Moment1Out) of AdamOp should not be null."); PADDLE_ENFORCE(ctx->HasOutput("Moment2Out"), "Output(Moment2Out) of AdamOp should not be null."); - PADDLE_ENFORCE(ctx->HasOutput("Beta1PowOut"), - "Output(Beta1PowOut) of AdamOp should not be null."); - PADDLE_ENFORCE(ctx->HasOutput("Beta2PowOut"), - "Output(Beta2PowOut) of AdamOp should not be null."); auto lr_dims = ctx->GetInputDim("LearningRate"); PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1, @@ -72,8 +68,6 @@ class AdamOp : public framework::OperatorWithKernel { ctx->SetOutputDim("ParamOut", param_dims); ctx->SetOutputDim("Moment1Out", param_dims); ctx->SetOutputDim("Moment2Out", param_dims); - ctx->SetOutputDim("Beta1PowOut", beta1_pow_dims); - ctx->SetOutputDim("Beta2PowOut", beta2_pow_dims); } }; @@ -92,8 +86,6 @@ class AdamOpMaker : public framework::OpProtoAndCheckerMaker { AddOutput("ParamOut", "(Tensor) Output parameter"); AddOutput("Moment1Out", "(Tensor) Output first moment"); AddOutput("Moment2Out", "(Tensor) Output second moment"); - AddOutput("Beta1PowOut", "(Tensor) Output beta1 power accumulator"); - AddOutput("Beta2PowOut", "(Tensor) Output beta2 power accumulator"); AddAttr("beta1", "(float, default 0.9) " @@ -121,10 +113,8 @@ Adam updates: moment1_out = beta1 * moment1 + (1 − beta1) * grad moment2_out = beta2 * moment2 + (1 − beta2) * grad * grad -beta1_pow_out = beta1_pow * beta1 -beta2_pow_out = beta2_pow * beta2 learning_rate_t = learning_rate_t * - sqrt(1 - beta2_pow_out) / (1 - beta1_pow_out) + sqrt(1 - beta2_pow) / (1 - beta1_pow) param_out = param - learning_rate_t * moment1/ (sqrt(moment2) + epsilon) References: diff --git a/paddle/operators/adam_op.h b/paddle/operators/adam_op.h index 789c2f14b32478bf9ddc967fc5725bcf65ed2146..45938006db1231a7a134964d729df6ca114d4dbe 100644 --- a/paddle/operators/adam_op.h +++ b/paddle/operators/adam_op.h @@ -26,14 +26,10 @@ class AdamOpKernel : public framework::OpKernel { auto param_out_tensor = ctx.Output("ParamOut"); auto moment1_out_tensor = ctx.Output("Moment1Out"); auto moment2_out_tensor = ctx.Output("Moment2Out"); - auto beta1_pow_out_tensor = ctx.Output("Beta1PowOut"); - auto beta2_pow_out_tensor = ctx.Output("Beta2PowOut"); param_out_tensor->mutable_data(ctx.GetPlace()); moment1_out_tensor->mutable_data(ctx.GetPlace()); moment2_out_tensor->mutable_data(ctx.GetPlace()); - beta1_pow_out_tensor->mutable_data(ctx.GetPlace()); - beta2_pow_out_tensor->mutable_data(ctx.GetPlace()); float beta1 = ctx.Attr("beta1"); float beta2 = ctx.Attr("beta2"); @@ -56,18 +52,13 @@ class AdamOpKernel : public framework::OpKernel { auto param_out = framework::EigenVector::Flatten(*param_out_tensor); auto moment1_out = framework::EigenVector::Flatten(*moment1_out_tensor); auto moment2_out = framework::EigenVector::Flatten(*moment2_out_tensor); - auto beta1_pow_out = - framework::EigenVector::Flatten(*beta1_pow_out_tensor); - auto beta2_pow_out = - framework::EigenVector::Flatten(*beta2_pow_out_tensor); auto place = ctx.GetEigenDevice(); moment1_out.device(place) = beta1 * moment1 + (1 - beta1) * grad; moment2_out.device(place) = beta2 * moment2 + (1 - beta2) * grad.square(); - beta1_pow_out.device(place) = beta1_pow * beta1; - beta2_pow_out.device(place) = beta2_pow * beta2; + // All of these are tensors of 1 element - auto lr_t = lr * (1 - beta2_pow_out).sqrt() / (1 - beta1_pow_out); + auto lr_t = lr * (1 - beta2_pow).sqrt() / (1 - beta1_pow); // Eigen does not support automatic broadcast // Get dimensions of moment vector to broadcast lr_t Eigen::DSizes m_dsize(moment1_out_tensor->numel()); diff --git a/paddle/operators/adamax_op.cc b/paddle/operators/adamax_op.cc index e848333ef8a819648cc3056ae2f4a0e33fc58405..ff2565774115571166712b03c8990e5bf8de12a5 100644 --- a/paddle/operators/adamax_op.cc +++ b/paddle/operators/adamax_op.cc @@ -41,8 +41,6 @@ class AdamaxOp : public framework::OperatorWithKernel { "Output(MomentOut) of AdamaxOp should not be null."); PADDLE_ENFORCE(ctx->HasOutput("InfNormOut"), "Output(InfNormOut) of AdamaxOp should not be null."); - PADDLE_ENFORCE(ctx->HasOutput("Beta1PowOut"), - "Output(Beta1PowOut) of AdamaxOp should not be null."); auto lr_dims = ctx->GetInputDim("LearningRate"); PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1, @@ -64,7 +62,6 @@ class AdamaxOp : public framework::OperatorWithKernel { ctx->SetOutputDim("ParamOut", param_dims); ctx->SetOutputDim("MomentOut", param_dims); ctx->SetOutputDim("InfNormOut", param_dims); - ctx->SetOutputDim("Beta1PowOut", beta1_pow_dims); } }; @@ -86,7 +83,6 @@ class AdamaxOpMaker : public framework::OpProtoAndCheckerMaker { AddOutput("InfNormOut", "(Tensor) " "Output exponentially weighted infinity norm"); - AddOutput("Beta1PowOut", "(Tensor) Output beta1 power accumulator"); AddAttr("beta1", "(float, default 0.9) " @@ -113,8 +109,7 @@ Adamax updates: moment_out = beta1 * moment + (1 - beta1) * grad inf_norm_out = max(beta2 * inf_norm + epsilon, abs(grad)) -beta1_pow_out = beta1_pow * beta1 -learning_rate_t = learning_rate/(1 - beta1_pow_out) +learning_rate_t = learning_rate/(1 - beta1_pow) param_out = param - learning_rate_t * moment_out/inf_norm_out The original paper does not have an epsilon attribute. diff --git a/paddle/operators/adamax_op.h b/paddle/operators/adamax_op.h index 9677b1bb786002aadfaeb571b2ba2e6aa2481ca5..2c99832ec08e9c1d9b5458c467d5238f9b1b3c37 100644 --- a/paddle/operators/adamax_op.h +++ b/paddle/operators/adamax_op.h @@ -26,12 +26,10 @@ class AdamaxOpKernel : public framework::OpKernel { auto param_out_tensor = ctx.Output("ParamOut"); auto moment_out_tensor = ctx.Output("MomentOut"); auto inf_norm_out_tensor = ctx.Output("InfNormOut"); - auto beta1_pow_out_tensor = ctx.Output("Beta1PowOut"); param_out_tensor->mutable_data(ctx.GetPlace()); moment_out_tensor->mutable_data(ctx.GetPlace()); inf_norm_out_tensor->mutable_data(ctx.GetPlace()); - beta1_pow_out_tensor->mutable_data(ctx.GetPlace()); float beta1 = ctx.Attr("beta1"); float beta2 = ctx.Attr("beta2"); @@ -53,15 +51,12 @@ class AdamaxOpKernel : public framework::OpKernel { auto moment_out = framework::EigenVector::Flatten(*moment_out_tensor); auto inf_norm_out = framework::EigenVector::Flatten(*inf_norm_out_tensor); - auto beta1_pow_out = - framework::EigenVector::Flatten(*beta1_pow_out_tensor); auto place = ctx.GetEigenDevice(); moment_out.device(place) = beta1 * moment + (1 - beta1) * grad; inf_norm_out.device(place) = grad.abs().cwiseMax((beta2 * inf_norm) + epsilon); - beta1_pow_out.device(place) = beta1_pow * beta1; - auto lr_t = lr / (1 - beta1_pow_out); + auto lr_t = lr / (1 - beta1_pow); Eigen::DSizes m_dsize(moment_out_tensor->numel()); param_out.device(place) = param - lr_t.broadcast(m_dsize) * (moment_out / inf_norm_out); diff --git a/paddle/operators/feed_op.cc b/paddle/operators/feed_op.cc index d742bbe51b678fcdaf54826947d29060bf3e4e0d..bf453c85966848d492606644a380a57196ab9869 100644 --- a/paddle/operators/feed_op.cc +++ b/paddle/operators/feed_op.cc @@ -26,8 +26,9 @@ class FeedOp : public framework::OperatorBase { : OperatorBase(type, inputs, outputs, attrs) {} void Run(const framework::Scope &scope, const platform::DeviceContext &dev_ctx) const override { - auto feed_var_name = Input("Input"); + auto feed_var_name = Input("X"); auto *feed_var = scope.FindVar(feed_var_name); + PADDLE_ENFORCE(feed_var != nullptr, "Cannot find feed_var in scope, feed_var_name is %s", feed_var_name); @@ -40,6 +41,9 @@ class FeedOp : public framework::OperatorBase { auto col = Attr("col"); + VLOG(3) << "Feed Var " << feed_var_name << "'s " << col << " column to var" + << out_name; + auto &feed_list = feed_var->Get(); auto &feed_item = feed_list.at(static_cast(col)); auto *out_item = out_var->GetMutable(); @@ -48,10 +52,21 @@ class FeedOp : public framework::OperatorBase { } }; +class FeedOpInfoMaker : public framework::OpProtoAndCheckerMaker { + public: + FeedOpInfoMaker(framework::OpProto *proto, + framework::OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput("X", "The input of feed op"); + AddOutput("Out", "The output of feed op"); + AddComment("feed op, it should not be configured by users directly"); + AddAttr("col", "column of feed"); + } +}; + } // namespace operators } // namespace paddle -// We do not need to register OpInfoMaker, -// since feed operator will not be used by end users directly REGISTER_OPERATOR(feed, paddle::operators::FeedOp, - paddle::framework::EmptyGradOpMaker); + paddle::framework::EmptyGradOpMaker, + paddle::operators::FeedOpInfoMaker); diff --git a/paddle/operators/fetch_op.cc b/paddle/operators/fetch_op.cc index 55d6ac093959a6e1c11457085a8ebdd8a14adaf3..524e77d6ad3a1c7a96e104405827205f704f8a59 100644 --- a/paddle/operators/fetch_op.cc +++ b/paddle/operators/fetch_op.cc @@ -27,7 +27,7 @@ class FetchOp : public framework::OperatorBase { void Run(const framework::Scope &scope, const platform::DeviceContext &dev_ctx) const override { - auto fetch_var_name = Input("Input"); + auto fetch_var_name = Input("X"); auto *fetch_var = scope.FindVar(fetch_var_name); PADDLE_ENFORCE(fetch_var != nullptr, "Cannot find fetch variable in scope, fetch_var_name is %s", @@ -52,13 +52,25 @@ class FetchOp : public framework::OperatorBase { // FIXME(yuyang18): Should we assume the fetch operator always generate // CPU outputs? dst_item.CopyFromTensor(src_item, platform::CPUPlace(), dev_ctx); + + VLOG(3) << "Fetch variable " << fetch_var_name << " to " << out_name; } }; +class FetchOpInfoMaker : public framework::OpProtoAndCheckerMaker { + public: + FetchOpInfoMaker(framework::OpProto *proto, + framework::OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput("X", "The input of fetch op"); + AddOutput("Out", "The output of fetch op"); + AddComment("fetch op, it should not be configured by users directly"); + AddAttr("col", "column of fetch"); + } +}; } // namespace operators } // namespace paddle -// We do not need to register OpInfoMaker, -// since fetch operator will not be used by end users directly REGISTER_OPERATOR(fetch, paddle::operators::FetchOp, - paddle::framework::EmptyGradOpMaker); + paddle::framework::EmptyGradOpMaker, + paddle::operators::FetchOpInfoMaker); diff --git a/paddle/operators/sgd_op.cc b/paddle/operators/sgd_op.cc index 0f78eeab9bc643a1a70c4b6ab02a160bbeda2b33..2acb96d1b4f5903ff6c57b10e7621c8adaf73171 100644 --- a/paddle/operators/sgd_op.cc +++ b/paddle/operators/sgd_op.cc @@ -21,7 +21,7 @@ class SGDOp : public framework::OperatorWithKernel { public: using framework::OperatorWithKernel::OperatorWithKernel; - void InferShape(framework::InferShapeContext *ctx) const override { + void InferShape(framework::InferShapeContext* ctx) const override { PADDLE_ENFORCE(ctx->HasInput("Param"), "Input(Param) of SGDOp should not be null."); PADDLE_ENFORCE(ctx->HasInput("Grad"), @@ -35,15 +35,15 @@ class SGDOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1, "Learning rate should have 1 element"); auto param_dim = ctx->GetInputDim("Param"); - PADDLE_ENFORCE_EQ(param_dim, ctx->GetInputDim("Grad"), - "Two input of SGD Op's dimension must be same."); + // TODO(qijun): check dimensions of Param and Grad at complie + // and run time. ctx->SetOutputDim("ParamOut", param_dim); } }; class SGDOpMaker : public framework::OpProtoAndCheckerMaker { public: - SGDOpMaker(framework::OpProto *proto, framework::OpAttrChecker *op_checker) + SGDOpMaker(framework::OpProto* proto, framework::OpAttrChecker* op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { AddInput("Param", "Input parameter"); AddInput("LearningRate", "Learning rate of SGD"); @@ -58,6 +58,38 @@ param_out = param - learning_rate * grad; )DOC"); } }; + +template +struct SparseSGDFunctor { + void operator()(const platform::DeviceContext& context, + const framework::SelectedRows& input, + const framework::Tensor& learning_rate, + framework::Tensor* output) { + auto in_height = input.height(); + auto out_dims = output->dims(); + PADDLE_ENFORCE_EQ(in_height, out_dims[0]); + + auto& in_value = input.value(); + auto& in_rows = input.rows(); + + int64_t in_row_numel = in_value.numel() / in_rows.size(); + PADDLE_ENFORCE_EQ(in_row_numel, output->numel() / in_height); + + auto* in_data = in_value.data(); + auto* out_data = output->data(); + auto* lr = learning_rate.data(); + + for (size_t i = 0; i < in_rows.size(); i++) { + for (int64_t j = 0; j < in_row_numel; j++) { + out_data[in_rows[i] * in_row_numel + j] -= + lr[0] * in_data[i * in_row_numel + j]; + } + } + } +}; + +template struct SparseSGDFunctor; + } // namespace operators } // namespace paddle diff --git a/paddle/operators/sgd_op.cu b/paddle/operators/sgd_op.cu index f5ba6d3c29f8dfbfdea4fbf2c3d5fd7f5b358666..106f9b746ba6614d8fa68b677c47ec04ed26fb81 100644 --- a/paddle/operators/sgd_op.cu +++ b/paddle/operators/sgd_op.cu @@ -14,6 +14,66 @@ #define EIGEN_USE_GPU #include "paddle/operators/sgd_op.h" +#include "paddle/platform/cuda_helper.h" + +namespace paddle { +namespace operators { + +namespace { +template +__global__ void SparseSGDFunctorKernel(const T* selected_rows, + const int64_t* rows, + const T* learning_rate, T* tensor_out, + int64_t row_numel, int block_size) { + const int ty = blockIdx.y; + int tid = threadIdx.x; + + selected_rows += ty * row_numel; + tensor_out += rows[ty] * row_numel; + + for (int index = tid; index < row_numel; index += block_size) { + // Since index in rows of SelectedRows can be duplicate, we have to use + // Atomic Operation to avoid concurrent write error. + paddle::platform::CudaAtomicAdd( + tensor_out + index, -1.0 * learning_rate[0] * selected_rows[index]); + } +} +} // namespace + +template +struct SparseSGDFunctor { + void operator()(const platform::DeviceContext& context, + const framework::SelectedRows& input, + const framework::Tensor& learning_rate, + framework::Tensor* output) { + auto in_height = input.height(); + auto out_dims = output->dims(); + PADDLE_ENFORCE_EQ(in_height, out_dims[0]); + + auto& in_value = input.value(); + auto& in_rows = input.rows(); + + int64_t in_row_numel = in_value.numel() / in_rows.size(); + PADDLE_ENFORCE_EQ(in_row_numel, output->numel() / in_height); + + auto* in_data = in_value.data(); + auto* out_data = output->data(); + + int block_size = 256; + dim3 threads(block_size, 1); + dim3 grid(1, in_rows.size()); + SparseSGDFunctorKernel< + T><<(context) + .stream()>>>(in_data, in_rows.data(), learning_rate.data(), + out_data, in_row_numel, block_size); + } +}; + +template struct SparseSGDFunctor; + +} // namespace operators +} // namespace paddle namespace ops = paddle::operators; REGISTER_OP_GPU_KERNEL(sgd, diff --git a/paddle/operators/sgd_op.h b/paddle/operators/sgd_op.h index 26f4012f258771794c736dbfad4af174b017f410..78b595fc6c63d775b627f23cafa9458f1dadd4e5 100644 --- a/paddle/operators/sgd_op.h +++ b/paddle/operators/sgd_op.h @@ -15,31 +15,53 @@ limitations under the License. */ #pragma once #include "paddle/framework/eigen.h" #include "paddle/framework/op_registry.h" +#include "paddle/framework/selected_rows.h" namespace paddle { namespace operators { +template +struct SparseSGDFunctor { + void operator()(const platform::DeviceContext& context, + const framework::SelectedRows& input, + const framework::Tensor& learning_rate, + framework::Tensor* output); +}; + template class SGDOpKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& ctx) const override { - auto param = ctx.Input("Param"); - auto grad = ctx.Input("Grad"); - auto param_out = ctx.Output("ParamOut"); - auto learning_rate = ctx.Input("LearningRate"); + auto* param = ctx.Input("Param"); + auto* param_out = ctx.Output("ParamOut"); + auto* learning_rate = ctx.Input("LearningRate"); - param_out->mutable_data(ctx.GetPlace()); + auto* grad_var = ctx.InputVar("Grad"); + // Actually, all tensors are LoDTensor except SelectedRows. + if (grad_var->IsType()) { + param_out->mutable_data(ctx.GetPlace()); + auto* grad = ctx.Input("Grad"); - auto p = framework::EigenVector::Flatten(*param); - auto g = framework::EigenVector::Flatten(*grad); - auto o = framework::EigenVector::Flatten(*param_out); - auto lr = framework::EigenVector::Flatten(*learning_rate); - auto place = ctx.GetEigenDevice(); + auto p = framework::EigenVector::Flatten(*param); + auto g = framework::EigenVector::Flatten(*grad); + auto o = framework::EigenVector::Flatten(*param_out); + auto lr = framework::EigenVector::Flatten(*learning_rate); + auto place = ctx.GetEigenDevice(); - Eigen::DSizes grad_dsize(grad->numel()); - o.device(place) = p - lr.broadcast(grad_dsize) * g; + Eigen::DSizes grad_dsize(grad->numel()); + o.device(place) = p - lr.broadcast(grad_dsize) * g; + } else if (grad_var->IsType()) { + // TODO(qijun): In Sparse SGD operator, in-place update is enforced. + // This manual optimization brings difficulty to track data dependency. + // It's better to find a more elegant solution. + PADDLE_ENFORCE_EQ(param, param_out); + auto* grad = ctx.Input("Grad"); + SparseSGDFunctor functor; + functor(ctx.device_context(), *grad, *learning_rate, param_out); + } else { + PADDLE_THROW("Unsupported Variable Type of Grad"); + } } }; - } // namespace operators } // namespace paddle diff --git a/paddle/parameter/FirstOrderOptimizer.h b/paddle/parameter/FirstOrderOptimizer.h index 895e8d6a63d1fad0ee7a6f5647402435d418b2f1..f157188a4f736319ea187052b90a17f8be9e9edb 100644 --- a/paddle/parameter/FirstOrderOptimizer.h +++ b/paddle/parameter/FirstOrderOptimizer.h @@ -265,6 +265,10 @@ public: addParameterType(PARAMETER_SECOND_MOMENTUM); } + virtual void startBatch(int64_t numSamplesProcessed) { + learningRate_ = calcLearningRate(numSamplesProcessed, pass_); + } + virtual void finishBatch() { ++step_; } virtual void update(const VectorPtr vecs[], diff --git a/paddle/pybind/protobuf.cc b/paddle/pybind/protobuf.cc index d9647717d2a397a972d29e93545424431c21d495..405ac544e10f19a33399a649f76699fefc3d49b9 100644 --- a/paddle/pybind/protobuf.cc +++ b/paddle/pybind/protobuf.cc @@ -101,6 +101,10 @@ using namespace paddle::framework; // NOLINT void BindProgramDesc(py::module &m) { py::class_(m, "ProgramDesc", "") .def(py::init<>()) + .def("__init__", + [](ProgramDescBind &self, const ProgramDescBind &other) { + new (&self) ProgramDescBind(other); + }) .def("append_block", &ProgramDescBind::AppendBlock, py::return_value_policy::reference) .def("append_backward", @@ -202,20 +206,25 @@ void BindVarDsec(py::module &m) { .def("set_lod_level", &VarDescBind::SetLoDLevel) .def("type", &VarDescBind::GetType) .def("set_type", &VarDescBind::SetType) - .def("serialize_to_string", [](VarDescBind &var_desc) -> py::bytes { - const VarDesc *desc = var_desc.Proto(); - PADDLE_ENFORCE(desc->IsInitialized(), - "VarDesc has not been initialized."); - std::string res; - PADDLE_ENFORCE( - desc->SerializeToString(&res), - "Serialize VarDesc Error. This could be a bug of Paddle."); - return res; - }); + .def("serialize_to_string", + [](VarDescBind &var_desc) -> py::bytes { + const VarDesc *desc = var_desc.Proto(); + PADDLE_ENFORCE(desc->IsInitialized(), + "VarDesc has not been initialized."); + std::string res; + PADDLE_ENFORCE( + desc->SerializeToString(&res), + "Serialize VarDesc Error. This could be a bug of Paddle."); + return res; + }) + .def("persistable", &VarDescBind::Persistable) + .def("set_persistable", &VarDescBind::SetPersistable); py::enum_(var_desc, "VarType", "") .value("LOD_TENSOR", VarDesc::LOD_TENSOR) - .value("SELECTED_ROWS", VarDesc::SELECTED_ROWS); + .value("SELECTED_ROWS", VarDesc::SELECTED_ROWS) + .value("FEED_MINIBATCH", VarDesc::FEED_MINIBATCH) + .value("FETCH_LIST", VarDesc::FETCH_LIST); } void BindOpDesc(py::module &m) { diff --git a/paddle/pybind/pybind.cc b/paddle/pybind/pybind.cc index 9eb1bf4a16ef40bb3044f46db9777fd2f6c341d2..84ebe3c2b84a5b4fd3fb5d49494a19dea873b9c4 100644 --- a/paddle/pybind/pybind.cc +++ b/paddle/pybind/pybind.cc @@ -111,6 +111,7 @@ PYBIND11_PLUGIN(core) { new (&instance) LoDTensor(new_lod); #endif }) + .def("__init__", [](LoDTensor &instance) { new (&instance) LoDTensor(); }) .def("set_lod", [](LoDTensor &self, const std::vector> &lod) { #ifndef PADDLE_WITH_CUDA @@ -154,7 +155,15 @@ PYBIND11_PLUGIN(core) { py::return_value_policy::reference) .def("set_height", &SelectedRows::set_height) .def("height", &SelectedRows::height) - .def("set_rows", &SelectedRows::set_rows) + .def("set_rows", + [](SelectedRows &self, std::vector rows) { +#ifndef PADDLE_WITH_CUDA + self.set_rows(rows); +#else + Vector new_rows(rows); + self.set_rows(new_rows); +#endif + }) .def("rows", [](SelectedRows &self) { #ifndef PADDLE_WITH_CUDA return self.rows(); @@ -187,6 +196,11 @@ All parameter, weight, gradient are variables in Paddle. return self.GetMutable(); }, py::return_value_policy::reference) + .def("get_selected_rows", + [](Variable &self) -> SelectedRows * { + return self.GetMutable(); + }, + py::return_value_policy::reference) .def("get_net", [](Variable &self) -> operators::NetOp * { return self.GetMutable(); @@ -203,7 +217,8 @@ All parameter, weight, gradient are variables in Paddle. .def(py::init<>()) .def("new_scope", [](Scope &self) -> Scope * { return &self.NewScope(); }, py::return_value_policy::reference) - .def("drop_kids", &Scope::DropKids); + .def("drop_kids", &Scope::DropKids) + .def_static("global_scope", &GetGlobalScope); //! @note: Be careful! PyBind will return std::string as an unicode, not //! Python str. If you want a str object, you should cast them in Python. @@ -251,6 +266,17 @@ All parameter, weight, gradient are variables in Paddle. .def(py::init<>()) .def("__str__", string::to_string); + py::class_(m, "Place") + .def(py::init<>()) + .def("set_place", + [](platform::Place &self, const platform::CPUPlace &cpu_place) { + self = cpu_place; + }) + .def("set_place", + [](platform::Place &self, const platform::GPUPlace &gpu_place) { + self = gpu_place; + }); + py::class_(m, "Operator") .def_static("create", [](py::bytes protobin) { @@ -424,14 +450,15 @@ All parameter, weight, gradient are variables in Paddle. py::class_(m, "Executor") .def(py::init &>()) .def("run", - [](Executor &self, const ProgramDesc &program_desc, int block_id) { + [](Executor &self, ProgramDescBind *program_bind, int block_id) { framework::Scope &global_scope = GetGlobalScope(); - self.Run(program_desc, &global_scope, block_id); + self.Run(*program_bind->Proto(), &global_scope, block_id); }); m.def("unique_integer", UniqueIntegerGenerator); m.def("is_compile_gpu", IsCompileGPU); + //! FIXME: it is no need to `set_xxx_float/double/int` m.def("set_feed_variable_float", framework::SetFeedVariable); m.def("set_feed_variable_double", framework::SetFeedVariable); m.def("set_feed_variable_int", framework::SetFeedVariable); diff --git a/paddle/scripts/cluster_train_v2/fabric/conf.py b/paddle/scripts/cluster_train_v2/fabric/conf.py new file mode 100644 index 0000000000000000000000000000000000000000..e96503d093a4317df7bb006043eb42098f51b6f5 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/fabric/conf.py @@ -0,0 +1,39 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +HOSTS = [ + "root@10.1.9.7", + "root@10.1.18.7", + "root@10.1.32.9", +] +''' +workspace configuration +''' +#root dir for workspace, can be set as any director with real user account +ROOT_DIR = "/root" +''' +network configuration +''' +#pserver nics +PADDLE_NIC = "eth0" +#pserver port +PADDLE_PORT = 7164 +#pserver ports num +PADDLE_PORTS_NUM = 1 +#pserver sparse ports num +PADDLE_PORTS_NUM_FOR_SPARSE = 1 +#trainer whether use gpu +PADDLE_USE_GPU = "False" +#environments setting for all processes in cluster job +LD_LIBRARY_PATH = "/usr/local/cuda/lib64:/usr/lib64" diff --git a/paddle/scripts/cluster_train_v2/fabric/docker_cluster/Dockerfile b/paddle/scripts/cluster_train_v2/fabric/docker_cluster/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..6606c01265af1fa8009e67906a3dbbe5c95ebc0d --- /dev/null +++ b/paddle/scripts/cluster_train_v2/fabric/docker_cluster/Dockerfile @@ -0,0 +1,11 @@ +FROM docker.paddlepaddlehub.com/paddle:0.10.0rc2 +RUN apt-get update && apt-get install -y openssh-server +RUN mkdir /var/run/sshd + +RUN echo 'root:root' |chpasswd + +RUN sed -ri 's/^PermitRootLogin\s+.*/PermitRootLogin yes/' /etc/ssh/sshd_config +RUN sed -ri 's/UsePAM yes/#UsePAM yes/g' /etc/ssh/sshd_config + +EXPOSE 22 +CMD ["/usr/sbin/sshd", "-D"] diff --git a/paddle/scripts/cluster_train_v2/fabric/docker_cluster/ssh_servers.yaml b/paddle/scripts/cluster_train_v2/fabric/docker_cluster/ssh_servers.yaml new file mode 100644 index 0000000000000000000000000000000000000000..0784b2d1b8785796f94fff1607643218564fc126 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/fabric/docker_cluster/ssh_servers.yaml @@ -0,0 +1,23 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: ssh-servers +spec: + replicas: 3 + template: + metadata: + labels: + app: ssh-servers + spec: + containers: + - name: ssh-servers + image: docker.paddlepaddlehub.com/paddlessh + resources: + limits: + cpu: 500m + memory: 1Gi + requests: + cpu: 500m + memory: 1Gi + ports: + - containerPort: 22 diff --git a/paddle/scripts/cluster_train_v2/fabric/run.sh b/paddle/scripts/cluster_train_v2/fabric/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..f6324bcb136803ebc30e69bcdaa2f8725cb0ccba --- /dev/null +++ b/paddle/scripts/cluster_train_v2/fabric/run.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +python paddle.py \ + --job_dispatch_package="/root/wuyi/fabric_submit/workspace" \ + --dot_period=10 \ + --ports_num_for_sparse=1 \ + --log_period=50 \ + --num_passes=5 \ + --trainer_count=2 \ + --saving_period=1 \ + --local=0 \ + --config=./trainer_config.py \ + --save_dir=./output \ + --use_gpu=0 diff --git a/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/Dockerfile b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..1a2d19e823541750830fcaa25f65b2f8e1ea2b49 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/Dockerfile @@ -0,0 +1,43 @@ +# Build this image: docker build -t mpi . +# + +FROM paddledev/paddle:0.10.0rc3 + +ENV DEBIAN_FRONTEND noninteractive + +RUN apt-get update -y && \ + apt-get upgrade -y && \ + apt-get install -y openssh-server zip unzip vim sudo \ +gcc gfortran openmpi-checkpoint binutils wget curl git openmpi-bin openmpi-common libopenmpi-dev && \ +pip install mpi4py numpy virtualenv scipy matplotlib lxml sqlalchemy suds ipython obspy && \ +mkdir /var/run/sshd && \ +echo 'root:tutorial' | chpasswd && \ +sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config && \ +# SSH login fix. Otherwise user is kicked off after login +sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd && \ +echo "export VISIBLE=now" >> /etc/profile && \ +adduser --disabled-password --gecos "" tutorial && \ +echo "tutorial ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers && \ +mkdir /home/tutorial/.ssh/ + +ENV HOME /home/tutorial +ENV NOTVISIBLE "in users profile" + +# ------------------------------------------------------------ +# Set-Up SSH with our Github deploy key +# ------------------------------------------------------------ + +ADD ssh/config /home/tutorial/.ssh/config +ADD ssh/id_rsa.mpi /home/tutorial/.ssh/id_rsa +ADD ssh/id_rsa.mpi.pub /home/tutorial/.ssh/id_rsa.pub +ADD ssh/id_rsa.mpi.pub /home/tutorial/.ssh/authorized_keys + +#--------------------------------------------------------------- +#LD_LIBRARY_PATH +#--------------------------------------------------------------- + +RUN export LD_LIBRARY_PATH=/usr/lib/openmpi/lib/ + +WORKDIR /home/tutorial +EXPOSE 22 +CMD ["/usr/sbin/sshd", "-D"] diff --git a/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/head.yaml b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/head.yaml new file mode 100644 index 0000000000000000000000000000000000000000..34835e5eb8d7cb92ad3cf7758a47c9e565a7dcf6 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/head.yaml @@ -0,0 +1,25 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: mpi-header + labels: + app: mpi-header +spec: + replicas: 1 + template: + metadata: + labels: + app: mpi-header + spec: + containers: + - image: typhoon1986/paddle-openmpi + name : mpi-header + resources: + limits: + cpu: 500m + memory: 2Gi + requests: + cpu: 500m + memory: 2Gi + ports: + - containerPort: 22 diff --git a/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/mpi-nodes.yaml b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/mpi-nodes.yaml new file mode 100644 index 0000000000000000000000000000000000000000..2fd5cb4d44a25efac68dd8c9195dea9fd8f84a26 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/mpi-nodes.yaml @@ -0,0 +1,26 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: mpi-nodes + labels: + app: mpi-nodes +spec: + replicas: 3 + template: + metadata: + labels: + app: mpi-nodes + spec: + containers: + - image: typhoon1986/paddle-openmpi + name : mpi-nodes + resources: + limits: + cpu: 500m + memory: 2Gi + requests: + cpu: 500m + memory: 2Gi + ports: + - containerPort: 22 + imagePullPolicy: Always diff --git a/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/config b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/config new file mode 100644 index 0000000000000000000000000000000000000000..a9ecad07c39e4a9d6f0572d6cbf77795d99681f2 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/config @@ -0,0 +1 @@ +StrictHostKeyChecking no diff --git a/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/id_rsa.mpi b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/id_rsa.mpi new file mode 100644 index 0000000000000000000000000000000000000000..23768343edf5258cf525523d471f67071a24f5de --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/id_rsa.mpi @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEA7PWLZmgdJ508dD15T6+xqGDvL9Ehzo9SgsnN6xJ+qpUvvOi4 +1axW0AqR4MnPTg/uuvk+x4tUpuufOW4w22UTGjsdvmIVWa9ujLtcRiN3YPY+SU+Y +O5FfqKg7r/hBn+/GMcSoffwSs7vVgmhBBnp/mJh2O1cOAFZEe98/47mbg3/kHBAk +36NOQktaU3l48B38EhBTnjWfcEGm1HcTRPFxXV5Wiko6ZhKFEuHcTVKng4ROtUqE +mgHyI0aB7TAxg4na0ejItsYWEPWGeDOw6ms/4MwylxNosWzHFPW9p4zgLCLNr+b6 +bDDfYKjXZflAuTQtQhLmJUwD9uuYLAijpSE2fQIDAQABAoIBADgcgRET8Gt0CV/B +OtvKz/f+VEVvcWD3gWNlJDTZIVOFllNWjIZUlA4ZoqenQkbK8Q4nfV1FOht4yjCQ +TlN1oMtiWk297i5Zo4UBzPzy4w774I39oh/g8dT/WXr2/5s+7SDV38xNh6Q2A34o +79T35wUcfUrZ93/O7dKjb/6d8hx2FMha0wVKqY4lmG1lQE3bbx3kakec0PdvU5kO +YHKlpqj3pMR7CpMa+4yL/iXFwWYmnK+uu+zw7JR7PwvH1CzrnvW438wjQ1QmYbSx +mHHOE89X67Lsl5hn81qYWBhpwAlBwi1qscsE0cV9GcFyKqWFqZsj5coM9u3CRfvy +lrWe1OUCgYEA+LBUFEd3Hxs4sFiYElJ8R9SAs1udaqPvAl01hTEijJLfYlMMVs/y +rgNN7j22zjDak2f8QdyMJZX7EZdRmdYcHO0csYOwbYvalzcnwk+U3mxmdD3r4xSo +DSvkJ70fogAqUlcVIg2re6fCmZVJQTvMQYTVEM8zQomJRt/Lb2esSfsCgYEA8+zv +44aToe8uqiDs4w8guRW7LCDkTw4z4IVo9JUibIaPjaAs5bZEBXSB43EEywXCR75H +fML0rU1PVvKh1rqcvZdVzm+XMWVr3asPk0sapaiHaTcmyZvJRDxxqbLFp0zRP1T6 +cCtXNFdHWU4KiuKrUi6cDyOKchpfkSZa4seiT+cCgYB+n4FgBfdQPlMB70oW4irn +g/q32CjxuGCk6oKqu5bkzo+xB6obtavSEFqouIGQwO056tNVUY+GP7Rjg5GH663K +yKw4cl3tmS0Gm43B8TVSfw03mKO3rrfWZQe5eCFYIg9qd26KNT2gK435FzsCXQkm +PxUhhu6JrW/ZR2/U3Iur6wKBgADrWLAb1ryagSuE+j+U1AO+kDkHWrTtkcZ72jxp +v3p3O11GSEUJXdJDcSXhTCpTuDq6/dv7hB6PFwh126RKicKxKlKf2wsFndV1Cpb8 +hnovW2tLGOtTmfuW2rrQAKyzvmolsNfxYd/BoHQ2thV16z1hDZeFA8WQUeHjKh6G +sBbrAoGATdtQlaUxx4izua6k02ihkxx/cRYwDl2N8UDvDBHokS7vJFMX8b8NpsGg +zMElnqSpu/pe/0UG7N2MtPF6uyMcX8AZzzcsRkiMkDvWJzYt8Jpf+Eyd/uryF+Yv +yrXaOEY83tm6x/fny5ZaZmk8lNth7bfWywuTMkZLX3fYpWtIeE4= +-----END RSA PRIVATE KEY----- diff --git a/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/id_rsa.mpi.pub b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/id_rsa.mpi.pub new file mode 100644 index 0000000000000000000000000000000000000000..015f2b42e71920e00de090cbb1108d9a12ed5f0c --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/docker_cluster/ssh/id_rsa.mpi.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDs9YtmaB0nnTx0PXlPr7GoYO8v0SHOj1KCyc3rEn6qlS+86LjVrFbQCpHgyc9OD+66+T7Hi1Sm6585bjDbZRMaOx2+YhVZr26Mu1xGI3dg9j5JT5g7kV+oqDuv+EGf78YxxKh9/BKzu9WCaEEGen+YmHY7Vw4AVkR73z/juZuDf+QcECTfo05CS1pTeXjwHfwSEFOeNZ9wQabUdxNE8XFdXlaKSjpmEoUS4dxNUqeDhE61SoSaAfIjRoHtMDGDidrR6Mi2xhYQ9YZ4M7Dqaz/gzDKXE2ixbMcU9b2njOAsIs2v5vpsMN9gqNdl+UC5NC1CEuYlTAP265gsCKOlITZ9 oweidner@peahi diff --git a/paddle/scripts/cluster_train_v2/openmpi/start_mpi_train.sh b/paddle/scripts/cluster_train_v2/openmpi/start_mpi_train.sh new file mode 100644 index 0000000000000000000000000000000000000000..c645495448f9844de5ae9024b6a0f41452522765 --- /dev/null +++ b/paddle/scripts/cluster_train_v2/openmpi/start_mpi_train.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# General trainning configurations + +NICS=eth0 +PADDLE_INIT_PORT=7164 +PADDLE_INIT_PORTS_NUM=1 +PADDLE_INIT_PORTS_NUM_FOR_SPARSE=1 +PADDLE_INIT_PSERVERS=$(cat machines | sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/,/g') +PADDLE_INIT_USE_GPU=False + +PADDLE_INIT_NUM_GRADIENT_SERVERS=${OMPI_COMM_WORLD_SIZE} +PADDLE_INIT_TRAINER_ID=${OMPI_COMM_WORLD_RANK} +PADDLE_CLUSTER_TRAIN=True + +env + +# start pserver +stdbuf -oL nohup paddle pserver --port=$PADDLE_INIT_PORT --ports_num=$PADDLE_INIT_PORTS_NUM \ + --ports_num_for_sparse=$PADDLE_INIT_PORTS_NUM_FOR_SPARSE --nics=$NICS \ + --comment=paddle_cluster_pserver \ + --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS &> logs/pserver.log & + +# start trainer +# NOTE: train.py will use the above environment variables as configuration +python train.py &> logs/train.log + +# kill background pservers when train finishes +ps -ef | grep pserver | awk '{print $2}' | xargs kill diff --git a/python/paddle/v2/framework/executor.py b/python/paddle/v2/framework/executor.py new file mode 100644 index 0000000000000000000000000000000000000000..8da5daad993e9ceaff93b5271c30a3b260b7abcc --- /dev/null +++ b/python/paddle/v2/framework/executor.py @@ -0,0 +1,59 @@ +import paddle.v2.framework.core as core +from paddle.v2.framework.framework import Block, Program + + +class Executor(object): + def __init__(self, places): + if not isinstance(places, list) and not isinstance(places, tuple): + places = [places] + + act_places = [] + for each in places: + p = core.Place() + p.set_place(each) + act_places.append(p) + + self.executor = core.Executor(act_places) + + def run(self, + program, + feed, + fetch_list, + feed_var_name='feed', + fetch_var_name='fetch'): + if not isinstance(program, Program): + raise TypeError() + + program = program.clone() + global_block = program.global_block() + feed_var = global_block.create_var( + name=feed_var_name, + type=core.VarDesc.VarType.FEED_MINIBATCH, + persistable=True) + + for i, name in enumerate(feed): + out = global_block.var(name) + global_block.prepend_op( + 'feed', + inputs={'X': [feed_var]}, + outputs={'Out': [out]}, + attrs={'col': i}) + # FIXME + core.set_feed_variable_float(feed[name], feed_var.name, i) + + fetch_var = global_block.create_var( + name=fetch_var_name, + type=core.VarDesc.VarType.FETCH_LIST, + persistable=True) + for i, var in enumerate(fetch_list): + global_block.append_op( + type='fetch', + inputs={'X': [var]}, + outputs={'Out': [fetch_var]}, + attrs={'col': i}) + + self.executor.run(program.desc, 0) + return [ + core.get_fetch_variable(fetch_var_name, i) + for i in xrange(len(fetch_list)) + ] diff --git a/python/paddle/v2/framework/framework.py b/python/paddle/v2/framework/framework.py index 5a8ded46ea4277f3e2032932e1774ba3ad056db9..622e09fdde9de1f05d141780e9f2fb9fb6416acd 100644 --- a/python/paddle/v2/framework/framework.py +++ b/python/paddle/v2/framework/framework.py @@ -15,6 +15,7 @@ class Variable(object): shape=None, dtype=None, lod_level=None, + persistable=False, **kwargs): self.block = block @@ -70,6 +71,17 @@ class Variable(object): "lod_level is {2}. They are not " "matched".format(self.name, self.lod_level, lod_level)) + if persistable is not None: + if is_new_var: + self.desc.set_persistable(persistable) + else: + if persistable != self.persistable: + raise ValueError( + "Variable {0} has been created before." + "The previous persistable is {1}; the new " + "persistable is {2}. They are not matched".format( + self.name, self.persistable, persistable)) + self.block.vars[name] = self self.op = None @@ -80,6 +92,10 @@ class Variable(object): __repr__ = __str__ + @property + def persistable(self): + return self.desc.persistable() + @property def name(self): return self.desc.name() @@ -240,7 +256,8 @@ class Operator(object): self.desc.set_block_attr(attr_name, attrs[attr_name].desc) self.desc.check_attrs() - self.desc.infer_shape(self.block.desc) + if type not in {'feed', 'fetch'}: + self.desc.infer_shape(self.block.desc) def __str__(self): protostr = self.desc.serialize_to_string() @@ -307,9 +324,12 @@ class Block(object): return self.desc.id def var(self, name): - if name not in self.vars: + if not isinstance(name, basestring): + raise TypeError() + v = self.vars.get(name, None) + if v is None: raise ValueError("var %s not in this block" % name) - return self.vars[name] + return v def all_parameters(self): return {v for k, v in self.vars.iteritems() if isinstance(v, Parameter)} @@ -348,18 +368,22 @@ class Block(object): for op_idx in range(0, self.desc.op_size()): ops_in_cpp.append(self.desc.op(op_idx)) - first_op_in_python = self.ops[0].desc - last_op_in_python = self.ops[len(self.ops) - 1].desc - start_index = None - end_index = None - for index in range(len(ops_in_cpp)): - if first_op_in_python == ops_in_cpp[index]: - start_index = index - if last_op_in_python == ops_in_cpp[index]: - end_index = index - assert start_index is not None - assert end_index is not None - assert start_index <= end_index + if len(self.ops) != 0: + first_op_in_python = self.ops[0].desc + last_op_in_python = self.ops[len(self.ops) - 1].desc + start_index = None + end_index = None + for index in range(len(ops_in_cpp)): + if first_op_in_python == ops_in_cpp[index]: + start_index = index + if last_op_in_python == ops_in_cpp[index]: + end_index = index + assert start_index is not None + assert end_index is not None + assert start_index <= end_index + else: + start_index = 0 + end_index = -1 # sync ops append to the head of cpp_ops for index in range((start_index - 1 - 1), -1, -1): @@ -379,14 +403,6 @@ class Block(object): class Program(object): - @classmethod - def instance(cls): - # From https://stackoverflow.com/questions/8212053 - # Making Program as a Singleton class. - if not hasattr(cls, '_instance'): - cls._instance = cls() - return cls._instance - def __init__(self): self.desc = core.ProgramDesc() self.blocks = [Block(self, 0)] @@ -397,7 +413,15 @@ class Program(object): proto = framework_pb2.ProgramDesc.FromString(str(protostr)) return proto.__str__() - __repr__ = __str__ + def clone(self): + p = Program() + p.desc = core.ProgramDesc(self.desc) + p.blocks = [Block(p, i) for i in xrange(self.desc.num_blocks())] + p.sync_with_cpp() + return p + + def __repr__(self): + return str(self) def global_block(self): return self.blocks[0] @@ -408,11 +432,13 @@ class Program(object): def current_block(self): return self.blocks[self.current_block_idx] - def append_backward(self, target, no_grad_set): + def append_backward(self, target, no_grad_set=None): """ return map(param_name -> (grad_name, block_index, op_index)) """ assert isinstance(target, Variable) + if no_grad_set is None: + no_grad_set = set() param_to_grad_info = self.desc.append_backward(target.desc, no_grad_set) self.sync_with_cpp() return param_to_grad_info @@ -445,7 +471,9 @@ class Parameter(Variable): if each < 0: raise ValueError("Parameter shape should not be related with " "batch-size") - Variable.__init__(self, block, shape=shape, dtype=dtype, **kwargs) + + Variable.__init__( + self, block, persistable=True, shape=shape, dtype=dtype, **kwargs) self.trainable = kwargs.get('trainable', True) self.init_attr = kwargs.get('initialize_attr', { 'type': 'uniform_random', @@ -470,4 +498,4 @@ class Parameter(Variable): # program is a global instance. -g_program = Program.instance() +g_program = Program() diff --git a/python/paddle/v2/framework/layers.py b/python/paddle/v2/framework/layers.py index c7397716c47dd7088d840edb00d96dda2fe88f1d..236427efcefafd8dc15f3f184f568887fdb00992 100644 --- a/python/paddle/v2/framework/layers.py +++ b/python/paddle/v2/framework/layers.py @@ -3,7 +3,7 @@ import paddle.v2.framework.core as core from paddle.v2.framework.framework import OpProtoHolder, Variable import re -__all__ = ['fc', 'data', 'cross_entropy', 'conv2d'] +__all__ = ['fc', 'data', 'cross_entropy', 'conv2d', 'pool2d'] def fc(input, @@ -35,7 +35,10 @@ def fc(input, "Y": w, }, outputs={"Out": tmp}, - attrs={'x_num_col_dims': num_flatten_dims}) + attrs={ + 'x_num_col_dims': num_flatten_dims, + 'y_num_col_dims': len(input_shape) - num_flatten_dims + }) mul_results.append(tmp) # sum @@ -55,9 +58,11 @@ def data(name, shape, data_type='float32', type=core.VarDesc.VarType.LOD_TENSOR, + append_batch_size=True, program=None): helper = LayerHelper('data', **locals()) - shape = [-1] + shape # append batch size as -1 + if append_batch_size: + shape = [-1] + shape # append batch size as -1 return helper.create_global_variable( name=name, shape=shape, dtype=data_type, type=type) @@ -112,7 +117,7 @@ def _create_op_func_(op_type): _create_op_func_('mean') -_create_op_func_('pool2d') +_create_op_func_('mul') def cross_entropy(input, label, **kwargs): @@ -167,6 +172,13 @@ def conv2d(input, raise ValueError("num_channels must be divisible by groups.") num_filter_channels = num_channels / groups + if isinstance(filter_size, int): + filter_size = [filter_size, filter_size] + if isinstance(stride, int): + stride = [stride, stride] + if isinstance(padding, int): + padding = [padding, padding] + input_shape = input.shape filter_shape = [num_filters, num_filter_channels] + filter_size filter = helper.create_parameter( @@ -187,3 +199,40 @@ def conv2d(input, pre_act = helper.append_bias_op(pre_bias) return helper.append_activation(pre_act) + + +def pool2d(input, + pool_size, + pool_type, + pool_stride=[1, 1], + pool_padding=[0, 0], + global_pooling=False, + program=None): + if pool_type not in ["max", "avg"]: + raise ValueError( + "Unknown pool_type: '%s'. It can only be 'max' or 'avg'.", + str(pool_type)) + if isinstance(pool_size, int): + pool_size = [pool_size, pool_size] + if isinstance(pool_stride, int): + pool_stride = [pool_stride, pool_stride] + if isinstance(pool_padding, int): + pool_padding = [pool_padding, pool_padding] + + helper = LayerHelper('conv2d', **locals()) + dtype = helper.input_dtype() + pool_out = helper.create_tmp_variable(dtype) + + helper.append_op( + type="pool2d", + inputs={"X": input}, + outputs={"Out": pool_out}, + attrs={ + "pooling_type": pool_type, + "ksize": pool_size, + "global_pooling": global_pooling, + "strides": pool_stride, + "paddings": pool_padding + }) + + return pool_out diff --git a/python/paddle/v2/framework/nets.py b/python/paddle/v2/framework/nets.py new file mode 100644 index 0000000000000000000000000000000000000000..381da55da3cd4e32fe09241a00d74e74e2de44f7 --- /dev/null +++ b/python/paddle/v2/framework/nets.py @@ -0,0 +1,24 @@ +import paddle.v2.framework.layers as layers + + +def simple_img_conv_pool(input, + filter_size, + num_filters, + pool_size, + pool_stride, + act, + program=None): + conv_out = layers.conv2d( + input=input, + num_filters=num_filters, + filter_size=filter_size, + act=act, + program=program) + + pool_out = layers.pool2d( + input=conv_out, + pool_size=pool_size, + pool_type='max', + pool_stride=pool_stride, + program=program) + return pool_out diff --git a/python/paddle/v2/framework/tests/test_adam_op.py b/python/paddle/v2/framework/tests/test_adam_op.py index ff6faafa6e2119fde11b9eb6cd2a65a75334ebe6..a0d6655d4cbcff8ed3d55df0f4e68fc6591fbb11 100644 --- a/python/paddle/v2/framework/tests/test_adam_op.py +++ b/python/paddle/v2/framework/tests/test_adam_op.py @@ -33,14 +33,12 @@ class TestAdamOp1(OpTest): self.attrs = {'epsilon': epsilon, 'beta1': beta1, 'beta2': beta2} - param_out, moment1_out, moment2_out, beta1_pow_out, \ - beta2_pow_out = adam_step(self.inputs, self.attrs) + param_out, moment1_out, \ + moment2_out = adam_step(self.inputs, self.attrs) self.outputs = { 'Moment1Out': moment1_out, 'Moment2Out': moment2_out, - 'Beta1PowOut': beta1_pow_out, - 'Beta2PowOut': beta2_pow_out, 'ParamOut': param_out } @@ -78,14 +76,12 @@ class TestAdamOp2(OpTest): attributes = {'epsilon': epsilon, 'beta1': beta1, 'beta2': beta2} - param_out, moment1_out, moment2_out, beta1_pow_out, \ - beta2_pow_out = adam_step(self.inputs, attributes) + param_out, moment1_out, \ + moment2_out = adam_step(self.inputs, attributes) self.outputs = { 'Moment1Out': moment1_out, 'Moment2Out': moment2_out, - 'Beta1PowOut': beta1_pow_out, - 'Beta2PowOut': beta2_pow_out, 'ParamOut': param_out } @@ -127,14 +123,12 @@ class TestAdamOpMultipleSteps(OpTest): def test_check_output(self): for _ in range(self.num_steps): - param_out, moment1_out, moment2_out, beta1_pow_out, \ - beta2_pow_out = adam_step(self.inputs, self.attrs) + param_out, moment1_out, \ + moment2_out = adam_step(self.inputs, self.attrs) self.outputs = { 'Moment1Out': moment1_out, 'Moment2Out': moment2_out, - 'Beta1PowOut': beta1_pow_out, - 'Beta2PowOut': beta2_pow_out, 'ParamOut': param_out } @@ -145,8 +139,10 @@ class TestAdamOpMultipleSteps(OpTest): self.inputs['Param'] = param_out self.inputs['Moment1'] = moment1_out self.inputs['Moment2'] = moment2_out - self.inputs['Beta1Pow'] = beta1_pow_out - self.inputs['Beta2Pow'] = beta2_pow_out + + # Update powers of Beta1 and Beta2 for next time step + self.inputs['Beta1Pow'] *= self.attrs['beta1'] + self.inputs['Beta2Pow'] *= self.attrs['beta1'] # Randomize gradient for next step self.inputs['Grad'] = np.random.uniform( @@ -175,11 +171,9 @@ def adam_step(inputs, attributes): moment1_out = beta1 * moment1 + (1 - beta1) * grad moment2_out = beta2 * moment2 + (1 - beta2) * np.square(grad) - beta1_pow_out = beta1_pow * beta1 - beta2_pow_out = beta2_pow * beta2 - lr_t = lr * np.sqrt(1 - beta2_pow_out) / (1 - beta1_pow_out) + lr_t = lr * np.sqrt(1 - beta2_pow) / (1 - beta1_pow) param_out = param - lr_t * (moment1_out / (np.sqrt(moment2_out) + epsilon)) - return param_out, moment1_out, moment2_out, beta1_pow_out, beta2_pow_out + return param_out, moment1_out, moment2_out if __name__ == "__main__": diff --git a/python/paddle/v2/framework/tests/test_adamax_op.py b/python/paddle/v2/framework/tests/test_adamax_op.py index af81075d6ad508dcd473ed596b00b036d87d894f..8e5a15aa3d12bbaae99cae6fcb627a336e48f684 100644 --- a/python/paddle/v2/framework/tests/test_adamax_op.py +++ b/python/paddle/v2/framework/tests/test_adamax_op.py @@ -31,14 +31,13 @@ class TestAdamaxOp1(OpTest): self.attrs = {'beta1': beta1, 'beta2': beta2, 'epsilon': epsilon} - param_out, moment_out, inf_norm_out, beta1_pow_out = adamax_step( - self.inputs, self.attrs) + param_out, moment_out, inf_norm_out = adamax_step(self.inputs, + self.attrs) self.outputs = { 'ParamOut': param_out, 'MomentOut': moment_out, - 'InfNormOut': inf_norm_out, - 'Beta1PowOut': beta1_pow_out + 'InfNormOut': inf_norm_out } def test_check_output(self): @@ -73,14 +72,12 @@ class TestAdamaxOp2(OpTest): } attrs = {'beta1': beta1, 'beta2': beta2, 'epsilon': epsilon} - param_out, moment_out, inf_norm_out, beta1_pow_out = adamax_step( - self.inputs, attrs) + param_out, moment_out, inf_norm_out = adamax_step(self.inputs, attrs) self.outputs = { 'ParamOut': param_out, 'MomentOut': moment_out, - 'InfNormOut': inf_norm_out, - 'Beta1PowOut': beta1_pow_out + 'InfNormOut': inf_norm_out } def test_check_output(self): @@ -117,19 +114,15 @@ class TestAdamaxOpMultipleSteps(OpTest): self.attrs = {'beta1': beta1, 'beta2': beta2, 'epsilon': epsilon} - param_out, moment_out, inf_norm_out, beta1_pow_out = adamax_step( - self.inputs, self.attrs) - def test_check_output(self): for _ in range(self.num_steps): - param_out, moment_out, inf_norm_out, beta1_pow_out = adamax_step( - self.inputs, self.attrs) + param_out, moment_out, inf_norm_out = adamax_step(self.inputs, + self.attrs) self.outputs = { 'ParamOut': param_out, 'MomentOut': moment_out, - 'InfNormOut': inf_norm_out, - 'Beta1PowOut': beta1_pow_out + 'InfNormOut': inf_norm_out } # Verify output for this step @@ -139,7 +132,9 @@ class TestAdamaxOpMultipleSteps(OpTest): self.inputs['Param'] = param_out self.inputs['Moment'] = moment_out self.inputs['InfNorm'] = inf_norm_out - self.inputs['Beta1Pow'] = beta1_pow_out + + # Update Beta1 Power accumulator for next step + self.inputs['Beta1Pow'] *= self.attrs['beta1'] # Randomize gradient for next step self.inputs['Grad'] = np.random.uniform( @@ -167,11 +162,10 @@ def adamax_step(inputs, attributes): moment_out = beta1 * moment + (1 - beta1) * grad inf_norm_out = np.maximum(beta2 * inf_norm + epsilon, np.abs(grad)) - beta1_pow_out = beta1_pow * beta1 - lr_t = (lr / (1 - beta1_pow_out)) + lr_t = (lr / (1 - beta1_pow)) param_out = param - lr_t * np.divide(moment_out, inf_norm_out) - return param_out, moment_out, inf_norm_out, beta1_pow_out + return param_out, moment_out, inf_norm_out if __name__ == "__main__": diff --git a/python/paddle/v2/framework/tests/test_executor_and_mul.py b/python/paddle/v2/framework/tests/test_executor_and_mul.py new file mode 100644 index 0000000000000000000000000000000000000000..35f775711167ce0d210044ab4cb382db802f39a5 --- /dev/null +++ b/python/paddle/v2/framework/tests/test_executor_and_mul.py @@ -0,0 +1,36 @@ +import unittest +from paddle.v2.framework.layers import mul, data +import paddle.v2.framework.core as core +from paddle.v2.framework.executor import Executor +from paddle.v2.framework.framework import g_program +import numpy + + +class TestExecutor(unittest.TestCase): + def test_mul(self): + a = data(name='a', shape=[784], data_type='float32') + b = data( + name='b', + shape=[784, 100], + data_type='float32', + append_batch_size=False) + out = mul(x=a, y=b) + place = core.CPUPlace() + a_np = numpy.random.random((100, 784)).astype('float32') + tensor_a = core.LoDTensor() + tensor_a.set(a_np, place) + b_np = numpy.random.random((784, 100)).astype('float32') + tensor_b = core.LoDTensor() + tensor_b.set(b_np, place) + exe = Executor(place) + outs = exe.run(g_program, + feed={'a': tensor_a, + 'b': tensor_b}, + fetch_list=[out]) + out = numpy.array(outs[0]) + self.assertEqual((100, 100), out.shape) + self.assertTrue(numpy.allclose(out, numpy.dot(a_np, b_np))) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/v2/framework/tests/test_layers.py b/python/paddle/v2/framework/tests/test_layers.py index dbbb6535389e2336d156734cc672e0cc7bba175c..4ecc02b12d8db53e897dea10186bc053d05be303 100644 --- a/python/paddle/v2/framework/tests/test_layers.py +++ b/python/paddle/v2/framework/tests/test_layers.py @@ -1,4 +1,5 @@ import paddle.v2.framework.layers as layers +import paddle.v2.framework.nets as nets from paddle.v2.framework.framework import Program, g_program import paddle.v2.framework.core as core import unittest @@ -18,7 +19,7 @@ class TestBook(unittest.TestCase): avg_cost = layers.mean(x=cost, program=program) self.assertIsNotNone(avg_cost) - program.append_backward(avg_cost, set()) + program.append_backward(avg_cost) print str(program) def test_recognize_digits_mlp(self): @@ -38,24 +39,52 @@ class TestBook(unittest.TestCase): cost = layers.cross_entropy(input=predict, label=label, program=program) avg_cost = layers.mean(x=cost, program=program) self.assertIsNotNone(avg_cost) - # print str(program) + print str(program) def test_simple_conv2d(self): - pd = core.ProgramDesc.__create_program_desc__() - program = Program(desc=pd) - images = data_layer( + program = Program() + images = layers.data( name='pixel', shape=[3, 48, 48], data_type='int32', program=program) - conv2d_layer( + layers.conv2d( input=images, num_filters=3, filter_size=[4, 4], program=program) - # print str(program) + print str(program) - def test_simple_conv2d(self): + def test_recognize_digits_conv(self): program = Program() + images = layers.data( - name='pixel', shape=[3, 48, 48], data_type='int32', program=program) - layers.conv2d( - input=images, num_filters=3, filter_size=[4, 4], program=program) + name='pixel', + shape=[1, 28, 28], + data_type='float32', + program=program) + label = layers.data( + name='label', shape=[1], data_type='int32', program=program) + conv_pool_1 = nets.simple_img_conv_pool( + input=images, + filter_size=5, + num_filters=2, + pool_size=2, + pool_stride=2, + act="relu", + program=program) + conv_pool_2 = nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=4, + pool_size=2, + pool_stride=2, + act="relu", + program=program) + + predict = layers.fc(input=conv_pool_2, + size=10, + act="softmax", + program=program) + cost = layers.cross_entropy(input=predict, label=label, program=program) + avg_cost = layers.mean(x=cost, program=program) + + program.append_backward(avg_cost) print str(program) diff --git a/python/paddle/v2/framework/tests/test_program.py b/python/paddle/v2/framework/tests/test_program.py index c98dc3492b9506f5a1639d26e3f0f1c2c9ad75d7..c55dd8de7282d4c941777054ad9d6437c87f0bc6 100644 --- a/python/paddle/v2/framework/tests/test_program.py +++ b/python/paddle/v2/framework/tests/test_program.py @@ -34,8 +34,26 @@ class TestProgram(unittest.TestCase): self.assertEqual(1, b.idx) self.assertEqual(0, b.parent_idx) + def test_program_clone(self): + prog = Program() + + x = prog.global_block().create_var( + name='X', shape=[1000, 784], dtype='float32') + + y = prog.global_block().create_var( + name='Y', shape=[784, 100], dtype='float32') + out = prog.global_block().create_var(name='Out', dtype='float32') + prog.global_block().append_op( + type="mul", inputs={'X': [x], + 'Y': [y]}, outputs={'Out': [out]}) + + # FIXME(yuyang18): We manual compare the output string, since the order + # of variable could be changed. + print prog + print prog.clone() + def test_append_backward(self): - prog = Program.instance() + prog = Program() block = prog.global_block() mul_x = block.create_var( diff --git a/python/paddle/v2/framework/tests/test_selected_rows.py b/python/paddle/v2/framework/tests/test_selected_rows.py index 661e81817951f5605ba3ca7fb0cc667074b1e37c..e8a930cb08c42b48f678bdd7bdb7698923535d4f 100644 --- a/python/paddle/v2/framework/tests/test_selected_rows.py +++ b/python/paddle/v2/framework/tests/test_selected_rows.py @@ -8,29 +8,30 @@ class TestSelectedRows(unittest.TestCase): place = core.CPUPlace() height = 10 rows = [0, 4, 7] - row_numel = 10 - selcted_rows = core.SelectedRows(rows, row_numel) - np_array = np.ones((len(rows), height)).astype("float32") + row_numel = 12 + selected_rows = core.SelectedRows(rows, height) + np_array = np.ones((len(rows), row_numel)).astype("float32") np_array[0, 0] = 2.0 np_array[2, 8] = 4.0 - tensor = selcted_rows.get_tensor() + tensor = selected_rows.get_tensor() tensor.set(np_array, place) # compare rows - self.assertEqual(0, selcted_rows.rows()[0]) - self.assertEqual(4, selcted_rows.rows()[1]) - self.assertEqual(7, selcted_rows.rows()[2]) + self.assertEqual(0, selected_rows.rows()[0]) + self.assertEqual(4, selected_rows.rows()[1]) + self.assertEqual(7, selected_rows.rows()[2]) # compare height - self.assertEqual(10, selcted_rows.height()) + self.assertEqual(10, selected_rows.height()) # compare tensor self.assertAlmostEqual(2.0, - selcted_rows.get_tensor().get_float_element(0)) + selected_rows.get_tensor().get_float_element(0)) self.assertAlmostEqual(1.0, - selcted_rows.get_tensor().get_float_element(1)) + selected_rows.get_tensor().get_float_element(1)) self.assertAlmostEqual( - 4.0, selcted_rows.get_tensor().get_float_element(2 * row_numel + 8)) + 4.0, + selected_rows.get_tensor().get_float_element(2 * row_numel + 8)) if __name__ == "__main__": diff --git a/python/paddle/v2/framework/tests/test_sgd_op.py b/python/paddle/v2/framework/tests/test_sgd_op.py index 2dd881e5e107249277a91bd8e3a72567269e1cd4..01262bba4d43adaed179baef88ccab6e69b0884b 100644 --- a/python/paddle/v2/framework/tests/test_sgd_op.py +++ b/python/paddle/v2/framework/tests/test_sgd_op.py @@ -1,5 +1,7 @@ import unittest import numpy as np +import paddle.v2.framework.core as core +from paddle.v2.framework.op import Operator from op_test import OpTest @@ -17,5 +19,70 @@ class TestSGDOp(OpTest): self.check_output() +class TestSparseSGDOp(unittest.TestCase): + def check_with_place(self, place): + scope = core.Scope() + + # create and initialize Grad Variable + height = 10 + rows = [0, 4, 7] + row_numel = 12 + + grad_selected_rows = scope.var('Grad').get_selected_rows() + grad_selected_rows.set_height(height) + grad_selected_rows.set_rows(rows) + np_array = np.ones((len(rows), row_numel)).astype("float32") + np_array[0, 0] = 2.0 + np_array[2, 8] = 4.0 + + grad_tensor = grad_selected_rows.get_tensor() + grad_tensor.set(np_array, place) + + # create and initialize Param Variable + param = scope.var('Param').get_tensor() + param_array = np.full((height, row_numel), 5.0).astype("float32") + param.set(param_array, place) + + # create and initialize LeraningRate Variable + lr = scope.var('LearningRate').get_tensor() + lr_array = np.full((1), 2.0).astype("float32") + lr.set(lr_array, place) + + # create and run sgd operator + sgd_op = Operator( + "sgd", + Param='Param', + Grad='Grad', + ParamOut='Param', + LearningRate='LearningRate') + ctx = core.DeviceContext.create(place) + sgd_op.run(scope, ctx) + + # get and compare result + result_array = np.array(param) + + # rows[0] = 0, 5.0 - 2.0 * 2.0 + self.assertAlmostEqual(1.0, result_array[rows[0], 0]) + # rows[0] = 0, 5.0 - 2.0 * 1.0 + self.assertAlmostEqual(3.0, result_array[rows[0], 2]) + # 5.0 - 2.0 * 0.0 + self.assertAlmostEqual(5.0, result_array[1, 0]) + # rows[1] = 4, 5.0 - 2.0 * 1.0 + self.assertAlmostEqual(3.0, result_array[rows[1], 10]) + # 5.0 - 2.0 * 0.0 + self.assertAlmostEqual(5.0, result_array[5, 8]) + # rows[2] = 7, 5.0 - 2.0 * 1.0 + self.assertAlmostEqual(3.0, result_array[rows[2], 1]) + # rows[2] = 7, 5.0 - 2.0 * 4.0 + self.assertAlmostEqual(-3.0, result_array[rows[2], 8]) + + def test_sparse_sgd(self): + places = [core.CPUPlace()] + if core.is_compile_gpu(): + places.append(core.GPUPlace(0)) + for place in places: + self.check_with_place(place) + + if __name__ == "__main__": unittest.main()