diff --git a/PaddleRec/ctr/dnn/README.md b/PaddleRec/ctr/dnn/README.md index 5f57b5907d03ed821e0ea2005f03b0797eccfc49..f0ece05cd8908c910f6f58658ed9315939bb56dd 100644 --- a/PaddleRec/ctr/dnn/README.md +++ b/PaddleRec/ctr/dnn/README.md @@ -59,8 +59,8 @@ **示例训练代码仅支持在Linux环境下运行** - Win/Mac 暂不支持dataset数据读取方式 - Win/Mac 可以使用其他数据读取方式改写本示例代码并运行(参照`infer.py`) -- 请确保您的运行环境基于Linux,示例代码支持`unbuntu`及`CentOS` -- 运行环境中python版本高于`2.7` +- 目前仅支持Linux,如:`unbuntu`及`CentOS` +- 目前仅支持python版本`2.7` - 请确保您的paddle版本高于`1.6.1`,可以利用pip升级您的paddle版本 - 请确保您的本地模拟分布式运行环境中没有设置`http/https`代理,可以在终端键入`env`查看环境变量 @@ -461,8 +461,6 @@ elif fleet.is_worker(): return train_result ``` -了解了以上区别,便可以非常顺利的将单机模型升级为分布式训练模型。 - ### 区别五 启动训练 #### 运行单机训练 @@ -493,7 +491,7 @@ Epoch 0 auc auc_0.tmp_0 lod: {} #### 运行分布式训练(本地模拟分布式) 如果暂时没有集群环境,或者想要快速调试代码,可以通过本地多进程模拟分布式来运行分布式训练的代码。 -运行`local_cluster.sh`脚本可以一键启动本地模拟分布式训练。 +运行`local_cluster.sh`脚本可以一键启动本地模拟分布式训练。为了保证重复运行时符合预期,`该脚本会在运行前Kill掉其他python进程`,请注意该风险,也可以自行注释掉脚本中杀python进程相关的语句。 ```bash # 根据自己的运行环境,选择sh或bash sh local_cluster.sh @@ -565,7 +563,7 @@ I1126 07:38:28.947571 14715 communicator.cc:363] Communicator stop done ### 区别七 增量训练 #### 单机增量训练 -单机训练的基本顺序是:`构建网络->初始化参数->加载数据A->开始训练`。而增量训练的基本顺序是:`构建网络->加载已有参数->加载数据B->开始训练`。因此需要增加加载参数的逻辑,在示例代码`infer.py`中有相关操作: +单机训练的基本顺序是:`构建网络->初始化参数->加载数据A->开始训练`。而增量训练的基本顺序是:`构建网络->加载已有参数->加载数据B->开始训练`。因此需要增加加载参数的逻辑,在示例代码`infer.py`中有相关加载参数的操作: ```python # 构建网络 inputs = ctr_model.input_data(params) @@ -574,7 +572,11 @@ loss, auc_var = ctr_model.net(inputs, params) exe = fluid.Executor(fluid.CPUPlace()) # 不使用 exe.run(fluid.defalut_startup_program()) # 加载已有参数到内存中,使用fluid.load接口(因为本示例使用fluid.save保存模型) -fluid.load(fluid.default_main_program(), model_path + "/checkpoint", exe) +# model_path为模型保存的地址,如model_path = "./models/epoch_0" +fluid.load(fluid.default_main_program(), os.path.join(model_path, "checkpoint"), exe) +dataset, file_list = get_dataset(inputs, args) + +# 下同单机训练流程 ``` #### 分布式增量训练 @@ -583,12 +585,13 @@ Paddle的分布式增量训练也十分易用,代码与上述分布式训练 # 增量训练 if fleet.is_server(): # 初始化参数服务器节点时,传入模型保存的地址 + # 不要混用fluid.save和fleet.save_persistables保存的模型 fleet.init_server(model_path) # 运行参数服务器节点 fleet.run_server() elif fleet.is_worker(): # 训练节点的代码无需更改 - # 在运行fleet.startup_program时,训练节点会从pserver上拉取最新参数 + # 在运行fleet.startup_program时,训练节点会自动从pserver上拉取最新参数 ``` # @@ -652,12 +655,12 @@ def set_zero(): 为了快速验证,我们仅取用测试数据集的一个part文件,进行测试。在代码目录下,键入以下命令,进行预测: - 对单机训练的模型进行预测 ```python -python -u infer.py --is_local=1 &> test.log & +python -u infer.py --is_local=1 --infer_epoch=0 &> test.log & ``` - 对分布式训练的模型进行预测 ```python -python -u infer.py --is_cloud=1 &> test.log & +python -u infer.py --is_cloud=1 --infer_epoch=0 &> test.log & ``` 测试结果的日志位于`test.log`,仅训练一个epoch后,在`part-220`上的的理想测试结果为: ```bash @@ -672,6 +675,6 @@ open file success 因为快速验证的训练数据与测试数据极少,同时只训练了一轮,所以远远没有达到收敛,且初始化带有随机性,在您的环境下出现测试结果与示例输出不一致是正常情况。 ### benchmark -全量数据的训练与预测,请修改对应`train.py`与`infer.py`中对应的`train_files_path`与`test_files_path`超参数,分别修改为`./train_data_full`与`./test_data_full`。在全量数据中训练三轮后,加载epoch_2的模型,预期测试AUC可以达到0.794。 +全量数据的训练与预测,请修改对应`train.py`与`infer.py`中对应的`train_files_path`与`test_files_path`超参数,分别修改为`./train_data_full`与`./test_data_full`。在全量数据中训练三轮后,加载epoch_2的模型,`auc=0.79395`。 分布式benchmark相关代码及复现方式见[Fleet Repo](https://github.com/PaddlePaddle/Fleet.git),路径为Fleet/benchmark/ps/distribute_ctr/paddle/。 \ No newline at end of file diff --git a/PaddleRec/ctr/dnn/infer.py b/PaddleRec/ctr/dnn/infer.py index 00d06b1f8975205353c5c9899661d6a84eb3adbf..11bbdbaddb61b75d02e44639494692768ca29766 100644 --- a/PaddleRec/ctr/dnn/infer.py +++ b/PaddleRec/ctr/dnn/infer.py @@ -15,6 +15,7 @@ from __future__ import print_function import os import time +import six import numpy as np import logging import argparse @@ -32,11 +33,6 @@ def parse_args(): parser = argparse.ArgumentParser( description="PaddlePaddle CTR-DNN example") # -------------Data & Model Path------------- - parser.add_argument( - '--train_files_path', - type=str, - default='./train_data', - help="The path of training dataset") parser.add_argument( '--test_files_path', type=str, @@ -48,23 +44,18 @@ def parse_args(): default='models', help='The path for model to store (default: models)') - # -------------Training parameter------------- - parser.add_argument( - '--learning_rate', - type=float, - default=1e-4, - help="Initial learning rate for training") + # -------------Running parameter------------- parser.add_argument( '--batch_size', type=int, default=1000, help="The size of mini-batch (default:1000)") parser.add_argument( - "--epochs", + '--infer_epoch', type=int, - default=1, - help="Number of epochs for training.") - + default=0, + help='Specify which epoch to run infer' + ) # -------------Network parameter------------- parser.add_argument( '--embedding_size', @@ -93,30 +84,25 @@ def parse_args(): type=int, default=0, help='Local train or distributed train on paddlecloud (default: 0)') - parser.add_argument( - '--save_model', - type=int, - default=0, - help='Save training model or not') - parser.add_argument( - '--enable_ce', - action='store_true', - help='If set, run the task with continuous evaluation logs.') - parser.add_argument( - '--cpu_num', - type=int, - default=2, - help='threads for ctr training') return parser.parse_args() +def print_arguments(args): + """ + print arguments + """ + logger.info('----------- Configuration Arguments -----------') + for arg, value in sorted(six.iteritems(vars(args))): + logger.info('%s: %s' % (arg, value)) + logger.info('------------------------------------------------') + + def run_infer(args, model_path): place = fluid.CPUPlace() train_generator = generator.CriteoDataset(args.sparse_feature_dim) file_list = [ - str(args.test_files_path) + "/%s" % x - for x in os.listdir(args.test_files_path) + os.path.join(args.test_files_path, x) for x in os.listdir(args.test_files_path) ] test_reader = paddle.batch(train_generator.test(file_list), batch_size=args.batch_size) @@ -150,7 +136,7 @@ def run_infer(args, model_path): main_program=fluid.default_main_program()) elif args.is_local: fluid.load(fluid.default_main_program(), - model_path + "/checkpoint", exe) + os.path.join(model_path, "checkpoint"), exe) set_zero() run_index = 0 @@ -171,7 +157,7 @@ def run_infer(args, model_path): infer_result = {} infer_result['loss'] = infer_loss infer_result['auc'] = infer_auc - log_path = model_path + '/infer_result.log' + log_path = os.path.join(model_path, 'infer_result.log') logger.info(str(infer_result)) with open(log_path, 'w+') as f: f.write(str(infer_result)) @@ -181,12 +167,18 @@ def run_infer(args, model_path): if __name__ == "__main__": args = parse_args() + print_arguments(args) model_list = [] for _, dir, _ in os.walk(args.model_path): for model in dir: - if "epoch" in model: - path = "/".join([args.model_path, model]) + if "epoch" in model and args.infer_epoch == int(model.split('_')[-1]): + path = os.path.join(args.model_path, model) model_list.append(path) + + if len(model_list) == 0: + logger.info("There is no satisfactory model {} at path {}, please check your start command & env. ".format( + str("epoch_")+str(args.infer_epoch), args.model_path)) + for model in model_list: logger.info("Test model {}".format(model)) run_infer(args, model) diff --git a/PaddleRec/ctr/dnn/train.py b/PaddleRec/ctr/dnn/train.py index 3546aa1987f4d871691e561e775941073fe05751..19da71c85971ea7df456be6a46720bad5c7e9cfc 100644 --- a/PaddleRec/ctr/dnn/train.py +++ b/PaddleRec/ctr/dnn/train.py @@ -18,6 +18,7 @@ from __future__ import print_function import argparse import logging import os +import six import time import random import numpy as np @@ -119,6 +120,16 @@ def parse_args(): return parser.parse_args() +def print_arguments(args): + """ + print arguments + """ + logger.info('----------- Configuration Arguments -----------') + for arg, value in sorted(six.iteritems(vars(args))): + logger.info('%s: %s' % (arg, value)) + logger.info('------------------------------------------------') + + def get_dataset(inputs, args): dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) @@ -127,8 +138,7 @@ def get_dataset(inputs, args): thread_num = int(args.cpu_num) dataset.set_thread(thread_num) file_list = [ - str(args.train_files_path) + "/%s" % x - for x in os.listdir(args.train_files_path) + os.path.join(args.train_files_path, x) for x in os.listdir(args.train_files_path) ] # 请确保每一个训练节点都持有不同的训练文件 # 当我们用本地多进程模拟分布式时,每个进程需要拿到不同的文件 @@ -176,11 +186,12 @@ def local_train(args): ((epoch), end_time - start_time)) if args.save_model: - model_path = (str(args.model_path) + "/" + - "epoch_" + str(epoch) + "/") + model_path = os.path.join( + str(args.model_path), "epoch_" + str(epoch)) if not os.path.isdir(model_path): os.mkdir(model_path) - fluid.save(fluid.default_main_program(), model_path + "checkpoint") + fluid.save(fluid.default_main_program(), + os.path.join(model_path, "checkpoint")) logger.info("Train Success!") @@ -239,8 +250,8 @@ def distribute_train(args): # 默认使用0号节点保存模型 if args.save_model and fleet.is_first_worker(): - model_path = (str(args.model_path) + "/" + "epoch_" + - str(epoch)) + model_path = os.path.join(str(args.model_path), "epoch_" + + str(epoch)) fleet.save_persistables(executor=exe, dirname=model_path) fleet.stop_worker() @@ -252,7 +263,7 @@ def train(): if not os.path.isdir(args.model_path): os.mkdir(args.model_path) - + print_arguments(args) if args.is_cloud: logger.info("run cloud training") distribute_train(args)