From b360dec9db3930cdb11cb284a6f81a83bdb8181e Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 21 Apr 2017 15:07:17 +0800 Subject: [PATCH] update doc --- doc/design/cluster_train/README.md | 4 ++-- doc/design/cluster_train/checkpointing.md | 14 ++++++------- doc/design/cluster_train/data_dispatch.md | 25 +++++++++++++++-------- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/doc/design/cluster_train/README.md b/doc/design/cluster_train/README.md index 84678481c..b88a8f382 100644 --- a/doc/design/cluster_train/README.md +++ b/doc/design/cluster_train/README.md @@ -23,9 +23,9 @@ Their relation is illustrated in the following graph: -By coordinate these processes, paddle can complete the procedure of training neural networks using SGD. Paddle can support both "synchronize SGD" and "asynchronize SGD". +By coordinating these processes, PaddlePaddle supports use both Synchronize Stochastic Gradient Descent (sync SGD) and Asynchronous Stochastic Gradient Descent (async SGD) to train user-defined neural network topologies. -When training with "sync SGD", paddle parameter servers use barriers to wait for all trainers to finish gradients update. When using "async SGD", parameter servers would not wait for all trainers, so training and parameter optimize will run in parallel. parameter servers will not depend on each other, they will receive the gradients update in parrallel; Also trainers will not depend on each other, run training jobs in parrallel. Using asyc SGD will be faster when training, but parameters on one of the parameter server will be newer than the other, but this will introduce more Randomness. +When training with sync SGD, parameter servers wait for all trainers to finish gradients update and then send the updated parameters to trainers, training can not proceed until the trainer received the updated parameters. This creates a synchronization point between trainers. When training with async SGD, each trainer upload gradient and download new parameters individually, without the synchronization with other trainers. Using asyc SGD will be faster in terms of time per pass, but have more noise in gradient since trainers are likely to have a stale model. ### Master Process diff --git a/doc/design/cluster_train/checkpointing.md b/doc/design/cluster_train/checkpointing.md index 0a2682899..a4d09004b 100644 --- a/doc/design/cluster_train/checkpointing.md +++ b/doc/design/cluster_train/checkpointing.md @@ -1,5 +1,5 @@ -## 模型参数检查点(Checkpointing) -模型数据检查点的实现,可以有效的避免parameter server的单点或多点同时故障。模型参数检查点通过定期向磁盘上保存一份存储在parameter server内存中的模型数据的完整镜像,来保证训练过程可以从中间状态重新启动。在一个不可中断并缺少备份的训练任务中,可以通过阶段性的保存每个parameter server的数据快照(snapshot)到 ***分布式存储服务*** 达到容灾的目的,比如每隔10分钟最新的快照,并删除更早的快照。在出现单点故障时,只需要恢复这台节点,或者将这台节点迁移到另一个节点并启动即可恢复训练任务。 +## 模型参数检查点(Checkpointing) +模型数据检查点的实现,可以有效的避免parameter server的单点或多点同时故障。模型参数检查点通过定期向磁盘上保存一份存储在parameter server内存中的模型数据的完整镜像,来保证训练过程可以从中间状态重新启动。在一个不可中断并缺少备份的训练任务中,可以通过阶段性的保存每个parameter server的数据快照(snapshot)到 ***分布式存储服务*** 达到容灾的目的,比如每隔10分钟最新的快照,并删除更早的快照。在出现单点故障时,只需要恢复这台节点,或者将这台节点迁移到另一个节点并启动即可恢复训练任务。 @@ -13,10 +13,10 @@ 检查点保存程序流程: -1. 如果满足条件""每个pass或每n个mini-batch"时,parameter server会锁住保存parameter的内存,开始保存检查点。如果已经正在执行保存检查点的任务,则忽略。 +1. 如果满足条件"每隔10分钟"时,parameter server会获取parameters内存的`read_lock`,启动一个新的线程开始保存检查点。如果已经正在执行保存检查点的线程,则忽略。由于对parameters的更新需要获取parameters内存的`write_lock`,所以在写入快照的过程中,parameter server会暂停参数更新并等待。 2. parameter server生成一个UUID,向指定的目录中一个新的文件(文件名为此UUID)写入快照数据。在快照写入完成后,计算这个文件的MD5 sum。然后在etcd的`/checkpoints/[pserver_id]`中写入json内容:`{"uuid": [UUID], "md5", "MD5 sum", "timestamp": xxxx}`。 3. 删除磁盘目录中不是当前uuid的快照文件。 -4. 释放对paramters内存的锁定。 +4. 释放对paramters内存的锁定,停止保存检查点的线程。 这里需要用户额外注意,在您的实际环境中,训练任务的运行可能会占满trainer和parameter server之间的网络带宽,如果parameter server此时还需要通过网络访问分布式存储以保存快照,可能会造成网络拥塞,而出现阶段性的运行停滞。 @@ -30,8 +30,8 @@ 1. 开始提供服务 ## TODO List -### 推测执行/加速执行(TODO) -在异构集群中,如果存在某些trainer执行速度过慢会影响整体集群的速度(如图中Trainer 1),此时master将负责启动一个新的Trainer(Accelerate Trainer 2),使用同样的训练数据block。哪个trainer先完成block的训练,则把另一个慢速的kill掉。 +### 推测执行/加速执行(TODO) +在异构集群中,如果存在某些trainer执行速度过慢会影响整体集群的速度(如图中Trainer 1),此时master将负责启动一个新的Trainer(Accelerate Trainer 2),使用同样的训练数据block。哪个trainer先完成block的训练,则把另一个慢速的kill掉。 ### 动态扩容/缩容 目前只考虑动态扩容trainer数量,可以减小系统复杂性。 @@ -42,4 +42,4 @@ * shard: 分片,通常指将一个整体拆分成多份的其中的一份。 * model shard: 将一个神经网络参数拆分成多份,每个shard分别存储在其中一台parameter server之上 * parameter block: 多个parameter block构成一个model shard -* 单点故障: 任意时刻只可能同时有一台服务器故障。由于集群中同时存在两台机器故障的概率极低((平均故障率*平均故障修复时间)^2)只对特殊在线系统考虑两台以上同时故障的容灾。 +* 单点故障: 任意时刻只可能同时有一台服务器故障。由于集群中同时存在两台机器故障的概率极低((平均故障率*平均故障修复时间)^2)只对特殊在线系统考虑两台以上同时故障的容灾。 diff --git a/doc/design/cluster_train/data_dispatch.md b/doc/design/cluster_train/data_dispatch.md index cce3deb00..b013b14e4 100644 --- a/doc/design/cluster_train/data_dispatch.md +++ b/doc/design/cluster_train/data_dispatch.md @@ -1,7 +1,7 @@ ## 训练数据的存储和分发 ### 流程介绍 -生产环境中的训练数据集通常体积很大,并被存储在诸如Hadoop HDFS, Ceph, AWS S3之类的分布式存储之上。这些分布式存储服务通常会把数据切割成多个分片分布式的存储在多个节点之上。这样就可以在云端执行多种数据类计算任务,包括: +生产环境中的训练数据集通常体积很大,并被存储在诸如Hadoop HDFS,Ceph,AWS S3之类的分布式存储之上。这些分布式存储服务通常会把数据切割成多个分片分布式的存储在多个节点之上。这样就可以在云端执行多种数据类计算任务,包括: * 数据预处理任务 * Paddle训练任务 @@ -9,11 +9,11 @@ -在上图中显示了在一个实际生产环境中的应用(人脸识别)的数据流图。生产环境的日志数据会通过实时流的方式(Kafka)和离线数据的方式(HDFS)存储,并在集群中运行多个分布式数据处理任务,比如流式数据处理(online data process),离线批处理(offline data process)完成数据的预处理,提供给paddle作为训练数据。用于也可以上传labeled data到分布式存储补充训练数据。在paddle之上运行的深度学习训练输出的模型会提供给在线人脸识别的应用使用。 +在上图中显示了在一个实际生产环境中的应用(人脸识别)的数据流图。生产环境的日志数据会通过实时流的方式(Kafka)和离线数据的方式(HDFS)存储,并在集群中运行多个分布式数据处理任务,比如流式数据处理(online data process),离线批处理(offline data process)完成数据的预处理,提供给paddle作为训练数据。用于也可以上传labeled data到分布式存储补充训练数据。在paddle之上运行的深度学习训练输出的模型会提供给在线人脸识别的应用使用。 ### 训练数据的存储 -选择GlusterFS作为训练数据的存储服务(后续的实现考虑HDFS)。 +选择GlusterFS作为训练数据的存储服务(后续的实现考虑HDFS)。 在Kubernetes上运行的不同的计算框架,可以通过Volume或PersistentVolume挂载存储空间到每个容器中。 @@ -39,7 +39,7 @@ paddle upload train_data.list ... ``` -对于文本类训练数据样例如下(机器翻译),一行中包含源语言,目标语言的文本(label): +对于文本类训练数据样例如下(机器翻译),一行中包含源语言,目标语言的文本(label): ``` L' inflation , en Europe , a dérapé sur l' alimentation Food : Where European inflation slipped up @@ -63,22 +63,29 @@ trainer.train内部会获取reader的内容: ``` def paddle.train(batch_reader): - r = batch_reader() # create a interator for one pass of data + r = batch_reader() # create a iterator for one pass of data for batch in r: # train ``` -这里面batch是含有128个data instance的mini-batch。每一个data instance会是一个tuple,tuple元素的顺序与`.list`文件文件中每一列的顺序是一致的。每一个data instance会是(raw_image_file_binary_data, label)。其中raw_image_file_binary_data是对应图像文件的没有解码的原始二进制数据,用户需要自己解码。label是文本类型(比如:“1“,”2“),这里用户需要的其实是整形,用户需要自己转换成整形。 +这里面batch是含有128个data instance的mini-batch。每一个data instance会是一个tuple,tuple元素的顺序与`.list`文件文件中每一列的顺序是一致的。每一个data instance会是(raw_image_file_binary_data, label)。其中raw_image_file_binary_data是对应图像文件的没有解码的原始二进制数据,用户需要自己解码。label是文本类型(比如:“1“,”2“),这里用户需要的其实是整形,用户需要自己转换成整形。 ### 实现reader reader的实现需要考虑本地训练程序实现之后,可以不修改程序直接提交集群进行分布式训练。要达到这样的目标,需要实现下面的功能: -paddle会实现内置的默认reader和对公开数据集的reader(如MNIST,BOW等)。这些内置的reader都会被一个`paddle.reader`修饰器修饰。这个修饰器会读取环境变量`PADDLE_TRAIN_LOCAL`,如果是True,则返回只有一个文件的task queue生成器,这个文件就是reader传入的文件。如果是False,则为分布式训练模式(在集群中训练的任务,都会从这个环境变量获得False值),开始读取task queue,获取分布式存储系统中的文件名数据,并返回这个task queue。不同的reader的实现,都要遵守固定的方式:从task queue生成器中逐个获取文件名,完成对不同训练数据的解析。 +paddle会封装一个在集群中使用的reader: `paddle.dist.reader()`。在集群训练时需要使用这个reader指定要使用的数据集开始训练。用户的训练程序需要按照如下方式初始化reader: -同样用户在实现自己的reader时,也需要使用此修饰器来修饰reader函数。 +```python +if os.getenv("PADDLE_TRAIN_LOCAL"): + reader = my_local_reader("dataset-name") +else: + reader = paddle.dist.reader("dataset-name") +``` + +用户训练程序提交到集群之后,集群会自动设置`PADDLE_TRAIN_LOCAL`环境变量,reader会被配置成集群训练的版本。其中`paddle.dist.reader()`需要从master的队列中获得需要开始执行的训练task,并找到对应的训练数据文件,开始训练任务。如果用户的训练数据源来自于其他服务,比如从集群中的Kafka,zeromq队列读取,也可以根据实际情况实现集群中运行的reader程序。 ## TODO -### 支持将数据合并成内部的文件格式(key-value),方便sharding与顺序读取 +### 支持将数据合并成内部的文件格式(key-value),方便sharding与顺序读取 ### 支持用户自定义的数据预处理job -- GitLab