From be6cf7ed5107e8e2c8b5c3c5a9d03bf13b6bd560 Mon Sep 17 00:00:00 2001
From: Dong Daxiang <35550832+guru4elephant@users.noreply.github.com>
Date: Fri, 26 Jul 2019 15:19:19 +0800
Subject: [PATCH] Update cluster_quick_start.rst

---
 .../howto/training/cluster_quick_start.rst    | 168 ++++++++----------
 1 file changed, 74 insertions(+), 94 deletions(-)

diff --git a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
index 236c81e6b..0d0296856 100644
--- a/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
+++ b/doc/fluid/user_guides/howto/training/cluster_quick_start.rst
@@ -7,120 +7,100 @@
 
 To help readers get started quickly, we use a click-through-rate (CTR) prediction task as the example; the related source code can be found at xxxx
 
-Single-node training code
+For ease of learning, the example given here combines single-node and multi-node training in one script; users can launch either kind of job through different launch commands.
 
 .. code:: python
 
-    def train():
-        args = parse_args()
-        if not os.path.isdir(args.model_output_dir):
-            os.mkdir(args.model_output_dir)
-
-        dense_input = fluid.layers.data(
-            name="dense_input", shape=[dense_feature_dim], dtype='float32')
-        sparse_input_ids = [
-            fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
-            for i in range(1, 27)]
-        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
-
-        loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(dense_input, sparse_input_ids, label,
-                                                             args.embedding_size, args.sparse_feature_dim)
-
-        optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
-        optimizer.minimize(loss)
-
-        exe = fluid.Executor(fluid.CPUPlace())
-        exe.run(fluid.default_startup_program())
-        dataset = fluid.DatasetFactory().create_dataset()
-        dataset.set_use_var([dense_input] + sparse_input_ids + [label])
-        pipe_command = "python criteo_reader.py %d" % args.sparse_feature_dim
-        dataset.set_pipe_command(pipe_command)
-        dataset.set_batch_size(100)
-        thread_num = 10
-        dataset.set_thread(thread_num)
-        whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
-
-        epochs = 20
-        for i in range(epochs):
-            dataset.set_filelist(whole_filelist[:int(0.8*len(whole_filelist))])
-            exe.train_from_dataset(program=fluid.default_main_program(),
-                                   dataset=dataset,
-                                   fetch_list=[auc_var],
-                                   fetch_info=["auc"],
-                                   debug=False)
-            model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model"
-            sys.stderr.write("epoch%d finished" % (i + 1))
-            fluid.io.save_inference_model(model_dir, [dense_input.name] + [x.name for x in sparse_input_ids] + [label.name], [loss, auc_var], exe)
-
-Training code using the Fleet API
-
-.. code:: python
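+    # NOTE: args.py (providing parse_args) and network_conf.py (providing
+    # the ctr_dnn_model_dataset network definition) are helper modules from
+    # the example's source tree and are not shown in this document; sketches
+    # of the data reader and of the --is_local flag handling follow the
+    # code block.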
+    from __future__ import print_function
+    from args import parse_args
+    import os
+    import paddle.fluid as fluid
+    import sys
+    from network_conf import ctr_dnn_model_dataset
 
     import paddle.fluid.incubate.fleet.base.role_maker as role_maker
     from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+    from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
 
-    def train():
-        args = parse_args()
-        if not os.path.isdir(args.model_output_dir):
-            os.mkdir(args.model_output_dir)
-
+    dense_feature_dim = 13
+    sparse_feature_dim = 10000001
+    batch_size = 100
+    thread_num = 10
+    embedding_size = 10
+
+    args = parse_args()
+
+    def main_function(is_local):
         dense_input = fluid.layers.data(
-            name="dense_input", shape=[dense_feature_dim], dtype='float32')
+            name="dense_input", shape=[dense_feature_dim], dtype='float32')
         sparse_input_ids = [
-            fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype="int64")
-            for i in range(1, 27)]
-        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+            fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1,
+                              dtype="int64") for i in range(1, 27)]
+        label = fluid.layers.data(name="label", shape=[1], dtype="int64")
 
-        loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(dense_input, sparse_input_ids, label,
-                                                             args.embedding_size, args.sparse_feature_dim)
+        dataset = fluid.DatasetFactory().create_dataset()
+        dataset.set_use_var([dense_input] + sparse_input_ids + [label])
+        # each dataset thread pipes raw lines through this command;
+        # see the reader sketch after the code block
+        pipe_command = "python criteo_reader.py %d" % sparse_feature_dim
+        dataset.set_pipe_command(pipe_command)
+        dataset.set_batch_size(batch_size)
+        dataset.set_thread(thread_num)
+        whole_filelist = ["raw_data/part-%d" % x
+                          for x in range(len(os.listdir("raw_data")))]
+        dataset.set_filelist(whole_filelist)
+        loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
+            dense_input, sparse_input_ids, label, embedding_size,
+            sparse_feature_dim)
 
-        role = role_maker.PaddleCloudRoleMaker()
         exe = fluid.Executor(fluid.CPUPlace())
-        fleet.init(role)
-
-        optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
-        strategy = DistributeTranspilerConfig()
-        strategy.sync_mode = False
-        optimizer = fleet.distributed_optimizer(optimizer, strategy)
-        optimizer.minimize(loss)
-
-        if fleet.is_server():
-            fleet.init_server()
-            fleet.run_server()
-        elif fleet.is_worker():
-            fleet.init_worker()
-            exe = fluid.Executor(fluid.CPUPlace())
-            exe.run(fluid.default_startup_program())
-            dataset = fluid.DatasetFactory().create_dataset()
-            dataset.set_use_var([dense_input] + sparse_input_ids + [label])
-            pipe_command = "python criteo_reader.py %d" % args.sparse_feature_dim
-            dataset.set_pipe_command(pipe_command)
-            dataset.set_batch_size(100)
-            thread_num = 10
-            dataset.set_thread(thread_num)
-            whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
-
-            epochs = 20
-            for i in range(epochs):
-                dataset.set_filelist(whole_filelist[:int(0.8*len(whole_filelist))])
+
+        def train_loop(epoch=20):
+            for i in range(epoch):
                 exe.train_from_dataset(program=fluid.default_main_program(),
                                        dataset=dataset,
                                        fetch_list=[auc_var],
                                        fetch_info=["auc"],
                                        debug=False)
-
-                if fleet.worker_index() == 0:
-                    model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model"
-                    sys.stderr.write("epoch%d finished" % (i + 1))
-                    fluid.io.save_inference_model(model_dir,
-                        [dense_input.name] + [x.name for x in sparse_input_ids] + [label.name], [loss, auc_var], exe)
-
-
-Launch command
+        def local_train():
+            # single-node training: a plain SGD optimizer, no distribution
+            # strategy involved
+            optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
+            optimizer.minimize(loss)
+            exe.run(fluid.default_startup_program())
+            train_loop()
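+
+The pipe command above relies on a criteo_reader.py script that turns one raw
+Criteo text line into the slots declared by dataset.set_use_var. That script
+is not shown in this document; the sketch below is one minimal way it could be
+written with paddle's MultiSlotDataGenerator. The hashing scheme and the
+skipped dense-feature normalization are simplifications for illustration, not
+the exact reader shipped with the example.
+
+.. code:: python
+
+    # criteo_reader.py (sketch): reads raw Criteo lines from stdin and emits
+    # the slots expected by dataset.set_use_var: dense_input, C1..C26, label
+    import sys
+    import paddle.fluid.incubate.data_generator as dg
+
+    # train.py passes the sparse dimension on the command line
+    sparse_feature_dim = int(sys.argv[1])
+
+    class CriteoDataset(dg.MultiSlotDataGenerator):
+        def generate_sample(self, line):
+            def reader():
+                # column 0: label; columns 1-13: dense; columns 14-39: categorical
+                fields = line.rstrip('\n').split('\t')
+                label = [int(fields[0])]
+                dense = [float(x) if x else 0.0 for x in fields[1:14]]
+                sparse = [[hash(str(i) + fields[i]) % sparse_feature_dim]
+                          for i in range(14, 40)]
+                names = (["dense_input"] +
+                         ["C%d" % i for i in range(1, 27)] + ["label"])
+                yield list(zip(names, [dense] + sparse + [label]))
+            return reader
+
+    if __name__ == "__main__":
+        CriteoDataset().run_from_stdin()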
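+
+The launch commands below switch between the two modes through an --is_local
+flag provided by the args.py helper, which is likewise not shown here. A
+minimal sketch of the relevant part might look like this; the default value
+of 0 is an assumption, chosen so that the distributed launch command can omit
+the flag:
+
+.. code:: python
+
+    # args.py (sketch): only the --is_local flag used above is shown
+    import argparse
+
+    def parse_args():
+        parser = argparse.ArgumentParser(description="CTR example")
+        # 1 = single-node training, 0 = parameter-server (fleet) training
+        parser.add_argument("--is_local", type=int, default=0)
+        return parser.parse_args()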
+
+        def dist_train():
+            # parameter-server training: wrap the optimizer with fleet under
+            # an async strategy, then branch on this process's role
+            role = role_maker.PaddleCloudRoleMaker()
+            fleet.init(role)
+            strategy = DistributeTranspilerConfig()
+            strategy.sync_mode = False
+            optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
+            optimizer = fleet.distributed_optimizer(optimizer, strategy)
+            optimizer.minimize(loss)
+
+            if fleet.is_server():
+                fleet.init_server()
+                fleet.run_server()
+            elif fleet.is_worker():
+                fleet.init_worker()
+                exe.run(fluid.default_startup_program())
+                train_loop()
+
+        if is_local:
+            local_train()
+        else:
+            dist_train()
+
+    if __name__ == '__main__':
+        main_function(args.is_local)
+
+Launch command for single-node training
+
+.. code:: bash
+
+    python train.py --is_local 1
+
+Launch command for simulating multi-node training on a single machine. Here we use launch_ps, a launcher built into paddle; users can specify the number of workers and servers to start a parameter-server job.
 
 .. code:: bash
 
-    python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 dist_train.py
+    python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py
 
 Run logs
--
GitLab