diff --git a/doc/fluid/advanced_guide/distributed_training/cluster_quick_start.rst b/doc/fluid/advanced_guide/distributed_training/cluster_quick_start.rst
index 5509d34403aedd8fd92dd3978fed10b723073d0a..1988aee0ae578f584b723bdf38010945b264320d 100644
--- a/doc/fluid/advanced_guide/distributed_training/cluster_quick_start.rst
+++ b/doc/fluid/advanced_guide/distributed_training/cluster_quick_start.rst
@@ -14,7 +14,7 @@
 
 *
-  [x] 成功安装Paddle Fluid,如果尚未安装,请参考 `快速开始 `_
+  [x] 成功安装Paddle Fluid,如果尚未安装,请参考 `快速开始 `_
 
 * [x] 学会最基本的单机训练方法,请参考 `单机训练 `_ 中描述的单卡训练,进行学习
@@ -113,7 +113,7 @@
         main_function(args.is_local)
 
-* 说明:示例中使用的IO方法是dataset,想了解具体的文档和用法请参考 `Dataset API `_ 。示例中使用的 ``train_from_dataset`` 接口,想了解具体的文档和使用方法请参考 `Executor API `_ 。示例中的 ``from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet`` 表示引入参数服务器架构进行分布式训练,如果想更进一步了解Fleet API的更多选项和示例,请参考 `Fleet API `_
+* 说明:示例中使用的IO方法是dataset,想了解具体的文档和用法请参考 `Dataset API `_ 。示例中使用的 ``train_from_dataset`` 接口,想了解具体的文档和使用方法请参考 `Executor API `_ 。示例中的 ``from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet`` 表示引入参数服务器架构进行分布式训练,如果想更进一步了解Fleet API的更多选项和示例,请参考 `Fleet API `_
 
 单机训练启动命令
diff --git a/doc/fluid/advanced_guide/distributed_training/cluster_quick_start_en.rst b/doc/fluid/advanced_guide/distributed_training/cluster_quick_start_en.rst
index ad9868e38bf4c3f4b953749db45064436c972661..ff8ea39c02200f8397c9d3bd9454fd6d01214f51 100644
--- a/doc/fluid/advanced_guide/distributed_training/cluster_quick_start_en.rst
+++ b/doc/fluid/advanced_guide/distributed_training/cluster_quick_start_en.rst
@@ -1,193 +1,159 @@
-.. _cluster_quick_start_en:
+Quick start for distributed training
+====================================
 
-Quick Start with Distributed Training
-==========================
+Distributed training with Fleet API
+-----------------------------------
 
-Preparation
---------------------
-In this article, we'll show you how to quickly start a PaddlePaddle distributed training task in a cluster. Before you start, do some preparatory work as follows:
-
-1. Prepare a connected training cluster. Here we use 4 training nodes with format ``*.paddlepaddle.com`` to represent the host name of the node. You can modify it according to the actual situation.
-
-2. Make sure you have read :ref:`install_steps` before you start and can run PaddlePaddle on all nodes of the cluster.
+Since Paddle Fluid `Release 1.5.1 `__, it is officially recommended
+to use the Fleet API for distributed training. For an introduction to
+the Fleet API, please refer to `Fleet Design Doc `__.
 
-Example code
--------------
-
-Let's use a very simple linear regression model as an example to explain how to start a distributed training task with 2 pserver server nodes and 2 trainer nodes. You can save this code as ``dist_train.py`` .
+Preparation
+~~~~~~~~~~~
+
+- [x] Install Paddle Fluid. If it is not installed yet, please refer to
+  `Beginner's Guide `__ (a quick installation check is sketched
+  below).
+- [x] Master the most basic single-node training method. Please refer
+  to the single-card training described in `Single-node training `__.
+
+Click-through rate prediction
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Here we use a simple example, the click-through rate (CTR) prediction
+task, to illustrate how to configure the Fleet API for distributed
+training, and show how to simulate a distributed environment on a
+single node. The source code of the example comes from `CTR with
+Fleet `__.
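+
+Before running the example, it may help to confirm that the Paddle Fluid
+installation from the Preparation step works. The following minimal check is
+only a sketch and assumes Paddle Fluid 1.5+, where the ``install_check``
+module is available:
+
+.. code:: python
+
+    # quick sanity check of the local Paddle Fluid installation
+    import paddle
+    import paddle.fluid as fluid
+
+    print(paddle.__version__)        # expected to print 1.5.1 or later
+    fluid.install_check.run_check()  # runs a small program to verify the install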
+
+To make learning easier, the example mixes single-node and multi-node
+code in one script; single-node or multi-node tasks are started with
+different startup commands. For obtaining the data and the data
+preprocessing logic, please refer to the source code and description of
+`CTR with Fleet `__.
 
 .. code:: python
-
+    from __future__ import print_function
+    from args import parse_args
     import os
-    import paddle
     import paddle.fluid as fluid
-
-    # train reader
-    BATCH_SIZE = 20
-    EPOCH_NUM = 30
-    BATCH_SIZE = 8
-
-    train_reader = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.uci_housing.train(), buf_size=500),
-        batch_size=BATCH_SIZE)
-
-    def train():
-        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
-        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
-        y_predict = fluid.layers.fc(input=x, size=1, act=None)
-
-        loss = fluid.layers.square_error_cost(input=y_predict, label=y)
-        avg_loss = fluid.layers.mean(loss)
-        opt = fluid.optimizer.SGD(learning_rate=0.001)
-        opt.minimize(avg_loss)
-
-        place = fluid.CPUPlace()
-        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
-        exe = fluid.Executor(place)
-
-        # fetch distributed training environment setting
-        training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
-        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
-        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
-        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
-        eplist = []
-        for ip in pserver_ips.split(","):
-            eplist.append(':'.join([ip, port]))
-        pserver_endpoints = ",".join(eplist)
-        trainers = int(os.getenv("PADDLE_TRAINERS"))
-        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
-
-        t = fluid.DistributeTranspiler()
-        t.transpile(
-            trainer_id = trainer_id,
-            pservers = pserver_endpoints,
-            trainers = trainers)
-
-        if training_role == "PSERVER":
-            pserver_prog = t.get_pserver_program(current_endpoint)
-            startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
-            exe.run(startup_prog)
-            exe.run(pserver_prog)
-        elif training_role == "TRAINER":
-            trainer_prog = t.get_trainer_program()
+    import sys
+    from network_conf import ctr_dnn_model_dataset
+    import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+
+    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+    from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
+
+    dense_feature_dim = 13
+    sparse_feature_dim = 10000001
+    batch_size = 100
+    thread_num = 10
+    embedding_size = 10
+    args = parse_args()
+
+    def main_function(is_local):
+        # common code for local training and distributed training
+        dense_input = fluid.layers.data(
+            name="dense_input", shape=[dense_feature_dim], dtype='float32')
+
+        sparse_input_ids = [
+            fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1,
+                              dtype="int64") for i in range(1, 27)]
+
+        label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+        dataset = fluid.DatasetFactory().create_dataset()
+        dataset.set_use_var([dense_input] + sparse_input_ids + [label])
+        pipe_command = "python criteo_reader.py %d" % sparse_feature_dim
+        dataset.set_pipe_command(pipe_command)
+        dataset.set_batch_size(batch_size)
+        dataset.set_thread(thread_num)
+
+        whole_filelist = ["raw_data/part-%d" % x
+                          for x in range(len(os.listdir("raw_data")))]
+
+        dataset.set_filelist(whole_filelist)
+        loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
+            dense_input, sparse_input_ids, label, embedding_size,
+            sparse_feature_dim)
+
+        exe = fluid.Executor(fluid.CPUPlace())
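+
+        # The Dataset configured above streams every file in whole_filelist
+        # through the pipe command "python criteo_reader.py <sparse_feature_dim>",
+        # which is expected to emit samples for the variables registered via
+        # set_use_var. train_from_dataset in train_loop below consumes the
+        # Dataset directly, so no DataFeeder or explicit feed loop is needed.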
+        def train_loop(epoch=20):
+            for i in range(epoch):
+                exe.train_from_dataset(program=fluid.default_main_program(),
+                                       dataset=dataset,
+                                       fetch_list=[auc_var],
+                                       fetch_info=["auc"],
+                                       debug=False)
+
+        # local training
+        def local_train():
+            optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
+            optimizer.minimize(loss)
             exe.run(fluid.default_startup_program())
-
-        for epoch in range(EPOCH_NUM):
-            for batch_id, batch_data in enumerate(train_reader()):
-                avg_loss_value, = exe.run(trainer_prog,
-                                          feed=feeder.feed(batch_data),
-                                          fetch_list=[avg_loss])
-                if (batch_id + 1) % 10 == 0:
-                    print("Epoch: {0}, Batch: {1}, loss: {2}".format(
-                        epoch, batch_id, avg_loss_value[0]))
-        # destory the resource of current trainer node in pserver server node
-        exe.close()
+            train_loop()
+
+        # distributed training
+        def dist_train():
+            role = role_maker.PaddleCloudRoleMaker()
+            fleet.init(role)
+            strategy = DistributeTranspilerConfig()
+            strategy.sync_mode = False
+            optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
+            optimizer = fleet.distributed_optimizer(optimizer, strategy)
+            optimizer.minimize(loss)
+
+            if fleet.is_server():
+                fleet.init_server()
+                fleet.run_server()
+            elif fleet.is_worker():
+                fleet.init_worker()
+                exe.run(fluid.default_startup_program())
+                train_loop()
+
+        if is_local:
+            local_train()
         else:
-            raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
-
-    train()
-
-
-Environment Variables
-------------------------------------
-
-When starting a distributed training task, different environment variables are used to represent different node roles, details as follows:
-
-.. list-table::
-   :header-rows: 1
-
-   * - Environment Variable
-     - Data Type
-     - Example
-     - Description
-   * - :code:`PADDLE_TRAINING_ROLE`
-     - str
-     - :code:`PSERVER,TRANERR`
-     - role of current training node
-   * - :code:`PADDLE_PSERVER_IPS`
-     - str
-     - :code:`ps0.paddlepaddle.com, ps1.paddlepaddle.com`
-     - The IP addresses or hostnames of all pserver nodes in the distributed training task, separated by ","
-   * - :code:`PADDLE_PSERVER_PORT`
-     - int
-     - 6174
-     - port that the pserver process listens to
-   * - :code:`PADDLE_TRAINERS`
-     - int
-     - 2
-     - Number of trainer nodes in a distributed training task
-   * - :code:`PADDLE_CURRENT_IP`
-     - str
-     - :code:`ps0.paddlepaddle.com`
-     - IP address or hostname of the current pserver node
-   * - :code:`PADDLE_TRAINER_ID`
-     - str
-     - 0
-     - ID of the current trainer node (unique), in the range of [0, PADDLE_TRAINERS)
-
-**Note:** Environment variables are just a way to get runtime information. In practical tasks, you can use command line parameters to obtain runtime information.
-
-API related to Distributed Training
----------------------------------
-
-DistributeTranspiler
-~~~~~~~~~~~~~~~~~~~~~~
-
-The machines in distributed training tasks based on the pserver-trainer architecture are divided into two roles: Parameter Server (pserver) and trainer. In Fluid, users only need to configure the network configuration required for single node training. The ``DistributeTranspiler`` module automatically modifies the single-node network settings into settings on which pserver and trainer needs to run based on the role of current training node:
-
-.. code:: python
+            dist_train()
+
+    if __name__ == '__main__':
+        main_function(args.is_local)
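+
+The script above imports ``parse_args`` from the companion ``args.py`` of the
+CTR with Fleet example, which is not shown here. A minimal, hypothetical
+sketch of such a parser, covering only the ``--is_local`` flag used above,
+could look like this (the real file may define more options):
+
+.. code:: python
+
+    # hypothetical args.py: only the flag used in this document is sketched
+    import argparse
+
+    def parse_args():
+        parser = argparse.ArgumentParser(description="CTR with Fleet example")
+        parser.add_argument("--is_local", type=int, default=0,
+                            help="1: local training, 0: distributed training")
+        return parser.parse_args()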
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id = trainer_id,
-        pservers = pserver_endpoints,
-        trainers = trainers)
-    if PADDLE_TRAINING_ROLE == "TRAINER":
-        # fetch the trainer program and execute it
-        trainer_prog = t.get_trainer_program()
-        ...
+
+- Note: The IO method used in this example is dataset. For its
+  documentation and usage, please refer to `Dataset API `__. For the
+  ``train_from_dataset`` interface, please refer to `Executor API `__.
+  ``from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet``
+  in this example introduces the parameter server architecture for
+  distributed training; please refer to `Fleet API `__ for more
+  options and examples of the Fleet API.
-    elif PADDLE_TRAINER_ROLE == "PSERVER":
-        # fetch the pserver program and execute it
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        ...
+
+Start command of single-node training
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. code:: bash
-Exe.close()
-~~~~~~~~~~~~~~
+
+    python train.py --is_local 1
+
+Start command of single-machine simulation of distributed training
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-The status information of all trainer nodes is saved in the pserver node. When trainer finishes training, ``exe.close()`` should be called to notify all PServer nodes to release the resources of the current Trainer nodes:
+
+Here we use ``launch_ps``, a built-in launcher of Paddle, with which
+users can specify the number of workers and servers to start the
+parameter server tasks.
-.. code:: python
+
+.. code:: bash
+
+    python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py
-    exe = fluid.Executor(fluid.CPUPlace())
-    # training process ...
-    exe.close() # notify PServer to destory the resource
-
-Note: every trainer needs to call exe.close() when the trainer finishes.
-
-Start a Distributed Training Task
-----------------------------------
-
-.. list-table::
-   :header-rows: 1
-
-
-   * - Start Node
-     - Start Command
-     - Description
-   * - ps0.paddlepaddle.com
-     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps0.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com, ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
-     - Start pserver node
-   * - ps1.paddlepaddle.com
-     - :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps1.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com, ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
-     - Start pserver node
-   * - trainer0.paddlepaddle.com
-     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com, ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=0 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
-     - Start the number 0 Trainer Node
-   * - trainer1.paddlepaddle.com
-     - :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com, ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
-     - Start the number 1 trainer node
+
+The task logs can be viewed in the ``logs`` directory under the working
+directory. Once you can simulate distributed training on a single
+machine, you can move on to true multi-node distributed training. For
+that, we recommend referring directly to `百度云运行分布式任务的示例 `__
+(an example of running a distributed job on Baidu Cloud).
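+
+If you prefer to drive both runs from one Python script instead of typing the
+commands by hand, a minimal sketch using the standard ``subprocess`` module is
+shown below; it simply wraps the two start commands from this document, so run
+whichever branch matches your setup:
+
+.. code:: python
+
+    # wraps the start commands shown above; pick one of the two calls
+    import subprocess
+
+    # single-node training
+    subprocess.check_call("python train.py --is_local 1", shell=True)
+
+    # single-machine simulation of distributed training (2 workers, 2 servers)
+    subprocess.check_call(
+        "python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py",
+        shell=True)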