diff --git a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst index a72d18fbeb9afcb337963ea51965deed25685129..236c81e6b527cdc0ca3a7f16b2cf97baaa4de13d 100644 --- a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst +++ b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst @@ -1,5 +1,134 @@ .. _cluster_quick_start: +从Paddle Fluid 1.5开始,我们推荐使用Fleet API进行分布式训练,关于Fleet API的介绍可以参考 :ref:`fleet_api` + +首先,我们假设读者已经学会单机训练,如果还没单机训练的经验,请参考 :ref:`single_training` +想了解分布式训练,我们可以从单机模拟分布式训练开始,在单台机器上启动多个进程代表多台机器,并进行分布式训练。 + +为了让读者快速上手,我们采用点击率预估任务作为示例,相关的源码可以参考 xxxx + +单机训练代码 + +.. code:: python + + def train(): + args = parse_args() + if not os.path.isdir(args.model_output_dir): + os.mkdir(args.model_output_dir) + + dense_input = fluid.layers.data( + name="dense_input", shape=[dense_feature_dim], dtype='float32') + sparse_input_ids = [ + fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype="int64") + for i in range(1, 27)] + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(dense_input, sparse_input_ids, label, + args.embedding_size, args.sparse_feature_dim) + + optimizer = fluid.optimizer.SGD(learning_rate=1e-4) + optimizer.minimize(loss) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_use_var([dense_input] + sparse_input_ids + [label]) + pipe_command = "python criteo_reader.py %d" % args.sparse_feature_dim + dataset.set_pipe_command(pipe_command) + dataset.set_batch_size(100) + thread_num = 10 + dataset.set_thread(thread_num) + whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))] + + epochs = 20 + for i in range(epochs): + dataset.set_filelist(whole_filelist[:int(0.8*len(whole_filelist))]) + exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=[auc_var], + fetch_info=["auc"], + debug=False) + model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model" + sys.stderr.write("epoch%d finished" % (i + 1)) + fluid.io.save_inference_model(model_dir, [dense_input.name] + [x.name for x in sparse_input_ids] + [label.name], [loss, auc_var], exe) + +使用FleetAPI进行训练的代码 + +.. code:: python + + import paddle.fluid.incubate.fleet.base.role_maker as role_maker + from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet + + def train(): + args = parse_args() + if not os.path.isdir(args.model_output_dir): + os.mkdir(args.model_output_dir) + + dense_input = fluid.layers.data( + name="dense_input", shape=[dense_feature_dim], dtype='float32') + sparse_input_ids = [ + fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype="int64") + for i in range(1, 27)] + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(dense_input, sparse_input_ids, label, + args.embedding_size, args.sparse_feature_dim) + + role = role_maker.PaddleCloudRoleMaker() + exe = fluid.Executor(fluid.CPUPlace()) + fleet.init(role) + + optimizer = fluid.optimizer.SGD(learning_rate=1e-4) + strategy = DistributeTranspilerConfig() + strategy.sync_mode = False + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(loss) + + if fleet.is_server(): + fleet.init_server() + fleet.run_server() + elif fleet.is_worker(): + fleet.init_worker() + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_use_var([dense_input] + sparse_input_ids + [label]) + pipe_command = "python criteo_reader.py %d" % args.sparse_feature_dim + dataset.set_pipe_command(pipe_command) + dataset.set_batch_size(100) + thread_num = 10 + dataset.set_thread(thread_num) + whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))] + + epochs = 20 + for i in range(epochs): + dataset.set_filelist(whole_filelist[:int(0.8*len(whole_filelist))]) + exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=[auc_var], + fetch_info=["auc"], + debug=False) + + if fleet.worker_index() == 0: + model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model" + sys.stderr.write("epoch%d finished" % (i + 1)) + fluid.io.save_inference_model(model_dir, + [dense_input.name] + [x.name for x in sparse_input_ids] + [label.name], [loss, auc_var], exe) + + +启动命令 + +.. code:: python + + python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 dist_train.py + +运行日志 + +如何进行多机分布式训练 +请参考百度云运行分布式任务的示例 + + + 分布式训练快速开始 ==================