cluster_quick_start.rst 13.1 KB
Newer Older
D
Dong Daxiang 已提交
1 2
分布式训练快速开始
==================
Y
yuyang18 已提交
3

D
Dong Daxiang 已提交
4 5 6 7 8 9 10
使用Fleet API进行分布式训练
---------------------------

从Paddle Fluid `Release
1.5.1 <https://github.com/PaddlePaddle/Paddle/releases/tag/v1.5.1>`__
开始,官方推荐使用Fleet API进行分布式训练,关于Fleet API的介绍可以参考
`Fleet Design Doc <https://github.com/PaddlePaddle/Fleet>`__
D
Dong Daxiang 已提交
11

D
Dong Daxiang 已提交
12
准备条件
D
Dong Daxiang 已提交
13 14
~~~~~~~~

D
Dong Daxiang 已提交
15 16 17 18 19 20 21
-  [x] 成功安装Paddle
   Fluid,如果尚未安装,请参考\ `快速开始 <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/beginners_guide/quick_start_cn.html>`__
-  [x]
   学会最基本的单机训练方法,请参考\ `单机训练 <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/user_guides/howto/training/single_node.html>`__\ 中描述的单卡训练,进行学习

点击率预估任务
~~~~~~~~~~~~~~
D
Dong Daxiang 已提交
22

D
Dong Daxiang 已提交
23 24 25 26
本文使用一个简单的示例,点击率预估任务,来说明如何使用Fleet
API进行分布式训练的配置方法,并利用单机环境模拟分布式环境给出运行示例。示例的源码来自\ `CTR
with
Fleet <https://github.com/PaddlePaddle/Fleet/tree/develop/examples/ctr>`__
D
Dong Daxiang 已提交
27

D
Dong Daxiang 已提交
28
为了方便学习,这里给出的示例是单机与多机混合的代码,用户可以通过不同的启动命令进行单机或多机任务的启动。
D
Dong Daxiang 已提交
29 30 31

.. code:: python

D
Dong Daxiang 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
        from __future__ import print_function
        from args import parse_args
        import os
        import paddle.fluid as fluid
        import sys
        from network_conf import ctr_dnn_model_dataset
        import paddle.fluid.incubate.fleet.base.role_maker as role_maker
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
        from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
        dense_feature_dim = 13
        sparse_feature_dim = 10000001
        batch_size = 100
        thread_num = 10
        embedding_size = 10
        args = parse_args()
        def main_function(is_local):
D
Dong Daxiang 已提交
48
            # common code for local training and distributed training
D
Dong Daxiang 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
            dense_input = fluid.layers.data(
                name="dense_input", shape=[dense_feature_dim], dtype='float32')
            sparse_input_ids = [
                fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1,
                                  dtype="int64") for i in range(1, 27)]
            label = fluid.layers.data(name="label", shape=[1], dtype="int64")
            dataset = fluid.DatasetFactory().create_dataset()
            dataset.set_use_var([dense_input] + sparse_input_ids + [label])
            pipe_command = "python criteo_reader.py %d" % sparse_feature_dim
            dataset.set_pipe_command(pipe_command)
            dataset.set_batch_size(batch_size)
            dataset.set_thread(thread_num)
            whole_filelist = ["raw_data/part-%d" % x 
                              for x in range(len(os.listdir("raw_data")))]
            dataset.set_filelist(whole_filelist)
            loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
                dense_input, sparse_input_ids, label, embedding_size,
                sparse_feature_dim)
            exe = fluid.Executor(fluid.CPUPlace())
            def train_loop(epoch=20):
                for i in range(epoch):
                    exe.train_from_dataset(program=fluid.default_main_program(),
                                           dataset=dataset,
                                           fetch_list=[auc_var],
                                           fetch_info=["auc"],
                                           debug=False)
D
Dong Daxiang 已提交
75 76
                    
            # local training
D
Dong Daxiang 已提交
77 78 79
            def local_train(optimizer):
                optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
                optimizer.minimize(loss)
D
Dong Daxiang 已提交
80 81
                exe.run(fluid.default_startup_program())
                train_loop()
D
Dong Daxiang 已提交
82 83
                
            # distributed training
D
Dong Daxiang 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
            def dist_train(optimizer):
                role = role_maker.PaddleCloudRoleMaker()
                fleet.init(role)
                strategy = DistributeTranspilerConfig()
                strategy.sync_mode = False
                optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
                optimizer = fleet.distributed_optimizer(optimizer, strategy)
                optimizer.minimize(loss)
                if fleet.is_server():
                    fleet.init_server()
                    fleet.run_server()
                elif fleet.is_worker():
                    fleet.init_worker()
                    exe.run(fluid.default_startup_program())
                    train_loop()
D
Dong Daxiang 已提交
99
                    
D
Dong Daxiang 已提交
100 101 102 103
            if is_local:
                local_train(optimizer)
            else:
                dist_train(optimizer)
D
Dong Daxiang 已提交
104
                
D
Dong Daxiang 已提交
105 106
        if __name__ == '__main__':
            main_function(args.is_local)
D
Dong Daxiang 已提交
107

D
Dong Daxiang 已提交
108 109 110 111
-  说明:示例中使用的IO方法是dataset,想了解具体的文档和用法请参考\ `Dataset
   API <hhttps://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/api_cn/dataset_cn.html>`__\ 。示例中使用的\ ``train_from_dataset``\ 接口,想了解具体的文档和使用方法请参考\ `Executor
   API <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/api_cn/executor_cn.html>`__

D
Dong Daxiang 已提交
112
单机训练启动命令
D
Dong Daxiang 已提交
113
^^^^^^^^^^^^^^^^
D
Dong Daxiang 已提交
114

D
Dong Daxiang 已提交
115
.. code:: python
D
Dong Daxiang 已提交
116

D
Dong Daxiang 已提交
117
        python train.py --is_local 1
D
Dong Daxiang 已提交
118

D
Dong Daxiang 已提交
119 120 121
单机模拟分布式训练的启动命令
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

D
Dong Daxiang 已提交
122
在单机模拟多机训练的启动命令,这里我们用到了paddle内置的一个启动器launch\_ps,用户可以指定worker和server的数量进行参数服务器任务的启动
D
Dong Daxiang 已提交
123

D
Dong Daxiang 已提交
124 125
.. code:: python

D
Dong Daxiang 已提交
126
        python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py
D
Dong Daxiang 已提交
127

D
Dong Daxiang 已提交
128
任务运行的日志在工作目录的logs目录下可以查看,当您能够使用单机模拟分布式训练,可以进行真正的多机分布式训练。我们建议用户直接参\ `百度云运行分布式任务的示例 <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/user_guides/howto/training/deploy_ctr_on_baidu_cloud_cn.html>`__
D
Dong Daxiang 已提交
129 130


Y
yuyang18 已提交
131 132 133 134 135 136 137 138 139
分布式训练快速开始
==================

准备工作
--------

在本篇文章中,我们将会在介绍如何快速在一个集群中启动一个 PaddlePaddle
的分布式训练任务,在开始之前,请按如下步骤做些准备工作:

Y
Yancey1989 已提交
140 141
1. 准备一个网络连通的训练集群,在本文中我们使用4个训练节点使用 ``*.paddlepaddle.com``
   来表示节点的主机名称,您可以根据实际情况修改它。
Y
yuyang18 已提交
142

Y
Yancey1989 已提交
143
2. 在开始之前确保已经阅读过 :ref:`install_steps`
Y
yuyang18 已提交
144 145 146
   并且可以在集群的所有节点上可以正常运行 PaddlePaddle。

样例代码
Y
Yancey1989 已提交
147
-------
Y
yuyang18 已提交
148

149 150
下面使用一个非常简单的线性回归模型作为样例来解释如何启动一个包含2个 ``PSERVER`` 节点以及
2个 ``TRAINER`` 节点的分布式训练任务,您可以将本段代码保存为 ``dist_train.py`` 运行。
Y
yuyang18 已提交
151 152 153

.. code:: python

Y
Yancey1989 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
    import os
    import paddle
    import paddle.fluid as fluid

    # train reader
    BATCH_SIZE = 20
    EPOCH_NUM = 30
    BATCH_SIZE = 8

    train_reader = paddle.batch(
        paddle.reader.shuffle(
            paddle.dataset.uci_housing.train(), buf_size=500),
        batch_size=BATCH_SIZE)

    def train():
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
        y_predict = fluid.layers.fc(input=x, size=1, act=None)

        loss = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_loss = fluid.layers.mean(loss)
        opt = fluid.optimizer.SGD(learning_rate=0.001)
        opt.minimize(avg_loss)

        place = fluid.CPUPlace()
        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
        exe = fluid.Executor(place)

        # fetch distributed training environment setting
        training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)
        trainers = int(os.getenv("PADDLE_TRAINERS"))
        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

        t = fluid.DistributeTranspiler()
        t.transpile(
            trainer_id = trainer_id,
            pservers = pserver_endpoints,
            trainers = trainers)

        if training_role == "PSERVER":
            pserver_prog = t.get_pserver_program(current_endpoint)
            startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
            exe.run(startup_prog)
            exe.run(pserver_prog)
        elif training_role == "TRAINER":
            trainer_prog = t.get_trainer_program()
            exe.run(fluid.default_startup_program())

            for epoch in range(EPOCH_NUM):
                for batch_id, batch_data in enumerate(train_reader()):
                    avg_loss_value, = exe.run(trainer_prog,
                                          feed=feeder.feed(batch_data),
                                          fetch_list=[avg_loss])
                    if (batch_id + 1) % 10 == 0:
                        print("Epoch: {0}, Batch: {1}, loss: {2}".format(
                            epoch, batch_id, avg_loss_value[0]))
            # destory the resource of current trainer node in pserver server node
            exe.close()
        else:
            raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")

    train()

环境变量说明
-----------

在启动分布式训练任务时,使用不同的环境变量来表示不同的节点角色,具体如下:
Y
yuyang18 已提交
228

Y
Yancey1989 已提交
229 230 231 232 233 234 235 236 237 238 239 240 241 242
.. list-table::
  :header-rows: 1

  * - 环境变量
    - 数据类型
    - 样例
    - 描述
  * - :code:`PADDLE_TRAINING_ROLE`
    - str
    - :code:`PSERVER,TRAINER`
    - 当前训练节点角色
  * - :code:`PADDLE_PSERVER_IPS`
    - str
    - :code:`ps0.paddlepaddle.com,ps1.paddlepaddle.com`
243
    - 分布式训练任务中所有 PSERVER 节点的 IP 地址或 hostname, 使用","分隔
Y
Yancey1989 已提交
244 245 246
  * - :code:`PADDLE_PSERVER_PORT`
    - int
    - 6174
247
    - PSERVER 进程监听的端口
Y
Yancey1989 已提交
248 249 250 251 252 253
  * - :code:`PADDLE_TRAINERS`
    - int
    - 2
    - 分布式训练任务中 trainer 节点的数量
  * - :code:`PADDLE_CURRENT_IP`
    - str
Y
Yancey1989 已提交
254
    - :code:`ps0.paddlepaddle.com`
255
    - 当前 PSERVER 节点的 IP 地址或 hostname
Y
Yancey1989 已提交
256 257 258
  * - :code:`PADDLE_TRAINER_ID`
    - str 
    - 0
259
    - 当前 TRAINER 节点的 ID (唯一), 取值范围为 [0, PADDLE_TRAINERS)
Y
Yancey1989 已提交
260

Y
Yancey1989 已提交
261
注: 环境变量只是获取运行时信息的一种方式,实际任务中可以采用命令行参数等方式获取运行时信息。
Y
Yancey1989 已提交
262 263 264 265 266 267 268

分布式训练相关 API
------------------

DistributeTranspiler
~~~~~~~~~~~~~~~~~~~~~~

269
基于 pserver-trainer 架构的的分布式训练任务分为两种角色: Parameter Server(PSERVER) 以及 TRAINER, 
Y
Yancey1989 已提交
270
在 Fluid 中,用户只需配置单机训练所需要的网络配置, ``DistributeTranspiler`` 模块会自动地根据
271
当前训练节点的角色将用户配置的单机网路配置改写成 PSERVER 和 TRAINER 需要运行的网络配置:
Y
yuyang18 已提交
272

Y
Yancey1989 已提交
273
.. code:: python
Y
yuyang18 已提交
274

Y
Yancey1989 已提交
275 276 277 278 279 280
    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id = trainer_id,                   
        pservers = pserver_endpoints,    
        trainers = trainers)
    if PADDLE_TRAINING_ROLE == "TRAINER":
H
Hao Wang 已提交
281
        # fetch the trainer program and execute it
Y
Yancey1989 已提交
282 283
        trainer_prog = t.get_trainer_program()
        ...
Y
yuyang18 已提交
284

Y
Yancey1989 已提交
285
    elif PADDLE_TRAINER_ROLE == "PSERVER":
H
Hao Wang 已提交
286
        # fetch the pserver program and execute it
Y
Yancey1989 已提交
287 288
        pserver_prog = t.get_pserver_program(current_endpoint) 
        ...
Y
yuyang18 已提交
289

Y
Yancey1989 已提交
290 291
exe.close()
~~~~~~~~~~~~~~
Y
yuyang18 已提交
292

293 294
PSERVER 节点中会保存所有 TRAINER 节点的状态信息,在 TRAINER 结束训练时需要调用 ``exe.close()``
通知所有 PSERVER 节点释放当前 TRAINER 节点的资源:
Y
yuyang18 已提交
295

Y
Yancey1989 已提交
296
.. code:: python
Y
yuyang18 已提交
297

Y
Yancey1989 已提交
298 299 300
    exe = fluid.Executor(fluid.CPUPlace())
    # training process ...
    exe.close() # notify PServer to destory the resource
Y
yuyang18 已提交
301

302 303 304
注意:所有的trainer在退出时都需要调用exe.close()。


Y
Yancey1989 已提交
305 306
启动分布式训练任务
--------------------
Y
yuyang18 已提交
307 308 309 310 311 312 313 314 315

.. list-table::
   :header-rows: 1

   * - 启动节点
     - 启动命令
     - 说明
   * - ps0.paddlepaddle.com
     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps0.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
316
     - 启动 PSERVER 节点
Y
yuyang18 已提交
317 318
   * - ps1.paddlepaddle.com
     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps1.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
319
     - 启动 PSERVER 节点
Y
yuyang18 已提交
320
   * - trainer0.paddlepaddle.com
Y
Yancey1989 已提交
321
     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=0 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
322
     - 启动第0号 TRAINER 节点
Y
yuyang18 已提交
323
   * - trainer1.paddlepaddle.com
Y
Yancey1989 已提交
324
     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
325
     - 启动第1号 TRAINER 节点