cluster_quick_start.rst 7.2 KB
Newer Older
Y
yuyang18 已提交
1 2 3 4 5 6 7 8 9 10 11
..  _cluster_quick_start:

分布式训练快速开始
==================

准备工作
--------

在本篇文章中,我们将会在介绍如何快速在一个集群中启动一个 PaddlePaddle
的分布式训练任务,在开始之前,请按如下步骤做些准备工作:

Y
Yancey1989 已提交
12 13
1. 准备一个网络连通的训练集群,在本文中我们使用4个训练节点使用 ``*.paddlepaddle.com``
   来表示节点的主机名称,您可以根据实际情况修改它。
Y
yuyang18 已提交
14

Y
Yancey1989 已提交
15
2. 在开始之前确保已经阅读过 :ref:`install_steps`
Y
yuyang18 已提交
16 17 18
   并且可以在集群的所有节点上可以正常运行 PaddlePaddle。

样例代码
Y
Yancey1989 已提交
19
-------
Y
yuyang18 已提交
20

21 22
下面使用一个非常简单的线性回归模型作为样例来解释如何启动一个包含2个 ``PSERVER`` 节点以及
2个 ``TRAINER`` 节点的分布式训练任务,您可以将本段代码保存为 ``dist_train.py`` 运行。
Y
yuyang18 已提交
23 24 25

.. code:: python

Y
Yancey1989 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
    import os
    import paddle
    import paddle.fluid as fluid

    # train reader
    BATCH_SIZE = 20
    EPOCH_NUM = 30
    BATCH_SIZE = 8

    train_reader = paddle.batch(
        paddle.reader.shuffle(
            paddle.dataset.uci_housing.train(), buf_size=500),
        batch_size=BATCH_SIZE)

    def train():
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
        y_predict = fluid.layers.fc(input=x, size=1, act=None)

        loss = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_loss = fluid.layers.mean(loss)
        opt = fluid.optimizer.SGD(learning_rate=0.001)
        opt.minimize(avg_loss)

        place = fluid.CPUPlace()
        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
        exe = fluid.Executor(place)

        # fetch distributed training environment setting
        training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)
        trainers = int(os.getenv("PADDLE_TRAINERS"))
        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

        t = fluid.DistributeTranspiler()
        t.transpile(
            trainer_id = trainer_id,
            pservers = pserver_endpoints,
            trainers = trainers)

        if training_role == "PSERVER":
            pserver_prog = t.get_pserver_program(current_endpoint)
            startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
            exe.run(startup_prog)
            exe.run(pserver_prog)
        elif training_role == "TRAINER":
            trainer_prog = t.get_trainer_program()
            exe.run(fluid.default_startup_program())

            for epoch in range(EPOCH_NUM):
                for batch_id, batch_data in enumerate(train_reader()):
                    avg_loss_value, = exe.run(trainer_prog,
                                          feed=feeder.feed(batch_data),
                                          fetch_list=[avg_loss])
                    if (batch_id + 1) % 10 == 0:
                        print("Epoch: {0}, Batch: {1}, loss: {2}".format(
                            epoch, batch_id, avg_loss_value[0]))
            # destory the resource of current trainer node in pserver server node
            exe.close()
        else:
            raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")

    train()

环境变量说明
-----------

在启动分布式训练任务时,使用不同的环境变量来表示不同的节点角色,具体如下:
Y
yuyang18 已提交
100

Y
Yancey1989 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114
.. list-table::
  :header-rows: 1

  * - 环境变量
    - 数据类型
    - 样例
    - 描述
  * - :code:`PADDLE_TRAINING_ROLE`
    - str
    - :code:`PSERVER,TRAINER`
    - 当前训练节点角色
  * - :code:`PADDLE_PSERVER_IPS`
    - str
    - :code:`ps0.paddlepaddle.com,ps1.paddlepaddle.com`
115
    - 分布式训练任务中所有 PSERVER 节点的 IP 地址或 hostname, 使用","分隔
Y
Yancey1989 已提交
116 117 118
  * - :code:`PADDLE_PSERVER_PORT`
    - int
    - 6174
119
    - PSERVER 进程监听的端口
Y
Yancey1989 已提交
120 121 122 123 124 125
  * - :code:`PADDLE_TRAINERS`
    - int
    - 2
    - 分布式训练任务中 trainer 节点的数量
  * - :code:`PADDLE_CURRENT_IP`
    - str
Y
Yancey1989 已提交
126
    - :code:`ps0.paddlepaddle.com`
127
    - 当前 PSERVER 节点的 IP 地址或 hostname
Y
Yancey1989 已提交
128 129 130
  * - :code:`PADDLE_TRAINER_ID`
    - str 
    - 0
131
    - 当前 TRAINER 节点的 ID (唯一), 取值范围为 [0, PADDLE_TRAINERS)
Y
Yancey1989 已提交
132

Y
Yancey1989 已提交
133
注: 环境变量只是获取运行时信息的一种方式,实际任务中可以采用命令行参数等方式获取运行时信息。
Y
Yancey1989 已提交
134 135 136 137 138 139 140

分布式训练相关 API
------------------

DistributeTranspiler
~~~~~~~~~~~~~~~~~~~~~~

141
基于 pserver-trainer 架构的的分布式训练任务分为两种角色: Parameter Server(PSERVER) 以及 TRAINER, 
Y
Yancey1989 已提交
142
在 Fluid 中,用户只需配置单机训练所需要的网络配置, ``DistributeTranspiler`` 模块会自动地根据
143
当前训练节点的角色将用户配置的单机网路配置改写成 PSERVER 和 TRAINER 需要运行的网络配置:
Y
yuyang18 已提交
144

Y
Yancey1989 已提交
145
.. code:: python
Y
yuyang18 已提交
146

Y
Yancey1989 已提交
147 148 149 150 151 152
    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id = trainer_id,                   
        pservers = pserver_endpoints,    
        trainers = trainers)
    if PADDLE_TRAINING_ROLE == "TRAINER":
H
Hao Wang 已提交
153
        # fetch the trainer program and execute it
Y
Yancey1989 已提交
154 155
        trainer_prog = t.get_trainer_program()
        ...
Y
yuyang18 已提交
156

Y
Yancey1989 已提交
157
    elif PADDLE_TRAINER_ROLE == "PSERVER":
H
Hao Wang 已提交
158
        # fetch the pserver program and execute it
Y
Yancey1989 已提交
159 160
        pserver_prog = t.get_pserver_program(current_endpoint) 
        ...
Y
yuyang18 已提交
161

Y
Yancey1989 已提交
162 163
exe.close()
~~~~~~~~~~~~~~
Y
yuyang18 已提交
164

165 166
PSERVER 节点中会保存所有 TRAINER 节点的状态信息,在 TRAINER 结束训练时需要调用 ``exe.close()``
通知所有 PSERVER 节点释放当前 TRAINER 节点的资源:
Y
yuyang18 已提交
167

Y
Yancey1989 已提交
168
.. code:: python
Y
yuyang18 已提交
169

Y
Yancey1989 已提交
170 171 172
    exe = fluid.Executor(fluid.CPUPlace())
    # training process ...
    exe.close() # notify PServer to destory the resource
Y
yuyang18 已提交
173

174 175 176
注意:所有的trainer在退出时都需要调用exe.close()。


Y
Yancey1989 已提交
177 178
启动分布式训练任务
--------------------
Y
yuyang18 已提交
179 180 181 182 183 184 185 186 187

.. list-table::
   :header-rows: 1

   * - 启动节点
     - 启动命令
     - 说明
   * - ps0.paddlepaddle.com
     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps0.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
188
     - 启动 PSERVER 节点
Y
yuyang18 已提交
189 190
   * - ps1.paddlepaddle.com
     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps1.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
191
     - 启动 PSERVER 节点
Y
yuyang18 已提交
192
   * - trainer0.paddlepaddle.com
Y
Yancey1989 已提交
193
     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=0 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
194
     - 启动第0号 TRAINER 节点
Y
yuyang18 已提交
195
   * - trainer1.paddlepaddle.com
Y
Yancey1989 已提交
196
     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
197
     - 启动第1号 TRAINER 节点