cluster_quick_start.rst 12.9 KB
Newer Older
Y
yuyang18 已提交
1 2
..  _cluster_quick_start:

D
Dong Daxiang 已提交
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
从Paddle Fluid 1.5开始,我们推荐使用Fleet API进行分布式训练,关于Fleet API的介绍可以参考 :ref:`fleet_api`

首先,我们假设读者已经学会单机训练,如果还没单机训练的经验,请参考 :ref:`single_training`
想了解分布式训练,我们可以从单机模拟分布式训练开始,在单台机器上启动多个进程代表多台机器,并进行分布式训练。

为了让读者快速上手,我们采用点击率预估任务作为示例,相关的源码可以参考 xxxx

单机训练代码

.. code:: python

    def train():
        args = parse_args()
        if not os.path.isdir(args.model_output_dir):
            os.mkdir(args.model_output_dir)
    
        dense_input = fluid.layers.data(
               name="dense_input", shape=[dense_feature_dim], dtype='float32')
        sparse_input_ids = [
               fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
               for i in range(1, 27)]
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

        loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(dense_input, sparse_input_ids, label,
                                                             args.embedding_size, args.sparse_feature_dim)

        optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
        optimizer.minimize(loss)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        dataset = fluid.DatasetFactory().create_dataset()
        dataset.set_use_var([dense_input] + sparse_input_ids + [label])
        pipe_command = "python criteo_reader.py %d" % args.sparse_feature_dim
        dataset.set_pipe_command(pipe_command)
        dataset.set_batch_size(100)
        thread_num = 10
        dataset.set_thread(thread_num)
        whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
        
        epochs = 20
        for i in range(epochs):
            dataset.set_filelist(whole_filelist[:int(0.8*len(whole_filelist))])
            exe.train_from_dataset(program=fluid.default_main_program(),
                                   dataset=dataset,
                                   fetch_list=[auc_var],
                                   fetch_info=["auc"],
                                   debug=False)
            model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model"
            sys.stderr.write("epoch%d finished" % (i + 1))
            fluid.io.save_inference_model(model_dir, [dense_input.name] + [x.name for x in sparse_input_ids] + [label.name],                                          [loss, auc_var], exe)

使用FleetAPI进行训练的代码

.. code:: python

    import paddle.fluid.incubate.fleet.base.role_maker as role_maker
    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

    def train():
        args = parse_args()
        if not os.path.isdir(args.model_output_dir):
            os.mkdir(args.model_output_dir)
    
        dense_input = fluid.layers.data(
               name="dense_input", shape=[dense_feature_dim], dtype='float32')
        sparse_input_ids = [
               fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
               for i in range(1, 27)]
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

        loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(dense_input, sparse_input_ids, label,
                                                             args.embedding_size, args.sparse_feature_dim)

        role = role_maker.PaddleCloudRoleMaker()
        exe = fluid.Executor(fluid.CPUPlace())
        fleet.init(role)

        optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
        strategy = DistributeTranspilerConfig()
        strategy.sync_mode = False
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)

        if fleet.is_server():
            fleet.init_server()
            fleet.run_server()
        elif fleet.is_worker():
            fleet.init_worker()
            exe = fluid.Executor(fluid.CPUPlace())
            exe.run(fluid.default_startup_program())
            dataset = fluid.DatasetFactory().create_dataset()
            dataset.set_use_var([dense_input] + sparse_input_ids + [label])
            pipe_command = "python criteo_reader.py %d" % args.sparse_feature_dim
            dataset.set_pipe_command(pipe_command)
            dataset.set_batch_size(100)
            thread_num = 10
            dataset.set_thread(thread_num)
            whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
            
            epochs = 20
            for i in range(epochs):
                dataset.set_filelist(whole_filelist[:int(0.8*len(whole_filelist))])
                exe.train_from_dataset(program=fluid.default_main_program(),
                                       dataset=dataset,
                                       fetch_list=[auc_var],
                                       fetch_info=["auc"],
                                       debug=False)
                
                if fleet.worker_index() == 0:
                    model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model"
                    sys.stderr.write("epoch%d finished" % (i + 1))
                    fluid.io.save_inference_model(model_dir, 
                        [dense_input.name] + [x.name for x in sparse_input_ids] + [label.name], [loss, auc_var], exe)
                
                
启动命令

.. code:: python
    
    python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 dist_train.py

运行日志

如何进行多机分布式训练
请参考百度云运行分布式任务的示例



Y
yuyang18 已提交
132 133 134 135 136 137 138 139 140
分布式训练快速开始
==================

准备工作
--------

在本篇文章中,我们将会在介绍如何快速在一个集群中启动一个 PaddlePaddle
的分布式训练任务,在开始之前,请按如下步骤做些准备工作:

Y
Yancey1989 已提交
141 142
1. 准备一个网络连通的训练集群,在本文中我们使用4个训练节点使用 ``*.paddlepaddle.com``
   来表示节点的主机名称,您可以根据实际情况修改它。
Y
yuyang18 已提交
143

Y
Yancey1989 已提交
144
2. 在开始之前确保已经阅读过 :ref:`install_steps`
Y
yuyang18 已提交
145 146 147
   并且可以在集群的所有节点上可以正常运行 PaddlePaddle。

样例代码
Y
Yancey1989 已提交
148
-------
Y
yuyang18 已提交
149

150 151
下面使用一个非常简单的线性回归模型作为样例来解释如何启动一个包含2个 ``PSERVER`` 节点以及
2个 ``TRAINER`` 节点的分布式训练任务,您可以将本段代码保存为 ``dist_train.py`` 运行。
Y
yuyang18 已提交
152 153 154

.. code:: python

Y
Yancey1989 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
    import os
    import paddle
    import paddle.fluid as fluid

    # train reader
    BATCH_SIZE = 20
    EPOCH_NUM = 30
    BATCH_SIZE = 8

    train_reader = paddle.batch(
        paddle.reader.shuffle(
            paddle.dataset.uci_housing.train(), buf_size=500),
        batch_size=BATCH_SIZE)

    def train():
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
        y_predict = fluid.layers.fc(input=x, size=1, act=None)

        loss = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_loss = fluid.layers.mean(loss)
        opt = fluid.optimizer.SGD(learning_rate=0.001)
        opt.minimize(avg_loss)

        place = fluid.CPUPlace()
        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
        exe = fluid.Executor(place)

        # fetch distributed training environment setting
        training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)
        trainers = int(os.getenv("PADDLE_TRAINERS"))
        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

        t = fluid.DistributeTranspiler()
        t.transpile(
            trainer_id = trainer_id,
            pservers = pserver_endpoints,
            trainers = trainers)

        if training_role == "PSERVER":
            pserver_prog = t.get_pserver_program(current_endpoint)
            startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
            exe.run(startup_prog)
            exe.run(pserver_prog)
        elif training_role == "TRAINER":
            trainer_prog = t.get_trainer_program()
            exe.run(fluid.default_startup_program())

            for epoch in range(EPOCH_NUM):
                for batch_id, batch_data in enumerate(train_reader()):
                    avg_loss_value, = exe.run(trainer_prog,
                                          feed=feeder.feed(batch_data),
                                          fetch_list=[avg_loss])
                    if (batch_id + 1) % 10 == 0:
                        print("Epoch: {0}, Batch: {1}, loss: {2}".format(
                            epoch, batch_id, avg_loss_value[0]))
            # destory the resource of current trainer node in pserver server node
            exe.close()
        else:
            raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")

    train()

环境变量说明
-----------

在启动分布式训练任务时,使用不同的环境变量来表示不同的节点角色,具体如下:
Y
yuyang18 已提交
229

Y
Yancey1989 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243
.. list-table::
  :header-rows: 1

  * - 环境变量
    - 数据类型
    - 样例
    - 描述
  * - :code:`PADDLE_TRAINING_ROLE`
    - str
    - :code:`PSERVER,TRAINER`
    - 当前训练节点角色
  * - :code:`PADDLE_PSERVER_IPS`
    - str
    - :code:`ps0.paddlepaddle.com,ps1.paddlepaddle.com`
244
    - 分布式训练任务中所有 PSERVER 节点的 IP 地址或 hostname, 使用","分隔
Y
Yancey1989 已提交
245 246 247
  * - :code:`PADDLE_PSERVER_PORT`
    - int
    - 6174
248
    - PSERVER 进程监听的端口
Y
Yancey1989 已提交
249 250 251 252 253 254
  * - :code:`PADDLE_TRAINERS`
    - int
    - 2
    - 分布式训练任务中 trainer 节点的数量
  * - :code:`PADDLE_CURRENT_IP`
    - str
Y
Yancey1989 已提交
255
    - :code:`ps0.paddlepaddle.com`
256
    - 当前 PSERVER 节点的 IP 地址或 hostname
Y
Yancey1989 已提交
257 258 259
  * - :code:`PADDLE_TRAINER_ID`
    - str 
    - 0
260
    - 当前 TRAINER 节点的 ID (唯一), 取值范围为 [0, PADDLE_TRAINERS)
Y
Yancey1989 已提交
261

Y
Yancey1989 已提交
262
注: 环境变量只是获取运行时信息的一种方式,实际任务中可以采用命令行参数等方式获取运行时信息。
Y
Yancey1989 已提交
263 264 265 266 267 268 269

分布式训练相关 API
------------------

DistributeTranspiler
~~~~~~~~~~~~~~~~~~~~~~

270
基于 pserver-trainer 架构的的分布式训练任务分为两种角色: Parameter Server(PSERVER) 以及 TRAINER, 
Y
Yancey1989 已提交
271
在 Fluid 中,用户只需配置单机训练所需要的网络配置, ``DistributeTranspiler`` 模块会自动地根据
272
当前训练节点的角色将用户配置的单机网路配置改写成 PSERVER 和 TRAINER 需要运行的网络配置:
Y
yuyang18 已提交
273

Y
Yancey1989 已提交
274
.. code:: python
Y
yuyang18 已提交
275

Y
Yancey1989 已提交
276 277 278 279 280 281
    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id = trainer_id,                   
        pservers = pserver_endpoints,    
        trainers = trainers)
    if PADDLE_TRAINING_ROLE == "TRAINER":
H
Hao Wang 已提交
282
        # fetch the trainer program and execute it
Y
Yancey1989 已提交
283 284
        trainer_prog = t.get_trainer_program()
        ...
Y
yuyang18 已提交
285

Y
Yancey1989 已提交
286
    elif PADDLE_TRAINER_ROLE == "PSERVER":
H
Hao Wang 已提交
287
        # fetch the pserver program and execute it
Y
Yancey1989 已提交
288 289
        pserver_prog = t.get_pserver_program(current_endpoint) 
        ...
Y
yuyang18 已提交
290

Y
Yancey1989 已提交
291 292
exe.close()
~~~~~~~~~~~~~~
Y
yuyang18 已提交
293

294 295
PSERVER 节点中会保存所有 TRAINER 节点的状态信息,在 TRAINER 结束训练时需要调用 ``exe.close()``
通知所有 PSERVER 节点释放当前 TRAINER 节点的资源:
Y
yuyang18 已提交
296

Y
Yancey1989 已提交
297
.. code:: python
Y
yuyang18 已提交
298

Y
Yancey1989 已提交
299 300 301
    exe = fluid.Executor(fluid.CPUPlace())
    # training process ...
    exe.close() # notify PServer to destory the resource
Y
yuyang18 已提交
302

303 304 305
注意:所有的trainer在退出时都需要调用exe.close()。


Y
Yancey1989 已提交
306 307
启动分布式训练任务
--------------------
Y
yuyang18 已提交
308 309 310 311 312 313 314 315 316

.. list-table::
   :header-rows: 1

   * - 启动节点
     - 启动命令
     - 说明
   * - ps0.paddlepaddle.com
     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps0.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
317
     - 启动 PSERVER 节点
Y
yuyang18 已提交
318 319
   * - ps1.paddlepaddle.com
     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps1.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
320
     - 启动 PSERVER 节点
Y
yuyang18 已提交
321
   * - trainer0.paddlepaddle.com
Y
Yancey1989 已提交
322
     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=0 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
323
     - 启动第0号 TRAINER 节点
Y
yuyang18 已提交
324
   * - trainer1.paddlepaddle.com
Y
Yancey1989 已提交
325
     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
326
     - 启动第1号 TRAINER 节点