未验证 提交 b9bd9ed0 编写于 作者: C Chengmo 提交者: GitHub

Fix ctr (#4318)

* Fix ctr/dnn for v/1.7
上级 e195772b
...@@ -59,8 +59,8 @@ ...@@ -59,8 +59,8 @@
**示例训练代码仅支持在Linux环境下运行** **示例训练代码仅支持在Linux环境下运行**
- Win/Mac 暂不支持dataset数据读取方式 - Win/Mac 暂不支持dataset数据读取方式
- Win/Mac 可以使用其他数据读取方式改写本示例代码并运行(参照`infer.py`) - Win/Mac 可以使用其他数据读取方式改写本示例代码并运行(参照`infer.py`)
- 请确保您的运行环境基于Linux,示例代码支持`unbuntu``CentOS` - 目前仅支持Linux,如:`unbuntu``CentOS`
- 运行环境中python版本高于`2.7` - 目前仅支持python版本`2.7`
- 请确保您的paddle版本高于`1.6.1`,可以利用pip升级您的paddle版本 - 请确保您的paddle版本高于`1.6.1`,可以利用pip升级您的paddle版本
- 请确保您的本地模拟分布式运行环境中没有设置`http/https`代理,可以在终端键入`env`查看环境变量 - 请确保您的本地模拟分布式运行环境中没有设置`http/https`代理,可以在终端键入`env`查看环境变量
...@@ -461,8 +461,6 @@ elif fleet.is_worker(): ...@@ -461,8 +461,6 @@ elif fleet.is_worker():
return train_result return train_result
``` ```
了解了以上区别,便可以非常顺利的将单机模型升级为分布式训练模型。
### 区别五 启动训练 ### 区别五 启动训练
#### 运行单机训练 #### 运行单机训练
...@@ -493,7 +491,7 @@ Epoch 0 auc auc_0.tmp_0 lod: {} ...@@ -493,7 +491,7 @@ Epoch 0 auc auc_0.tmp_0 lod: {}
#### 运行分布式训练(本地模拟分布式) #### 运行分布式训练(本地模拟分布式)
如果暂时没有集群环境,或者想要快速调试代码,可以通过本地多进程模拟分布式来运行分布式训练的代码。 如果暂时没有集群环境,或者想要快速调试代码,可以通过本地多进程模拟分布式来运行分布式训练的代码。
运行`local_cluster.sh`脚本可以一键启动本地模拟分布式训练。 运行`local_cluster.sh`脚本可以一键启动本地模拟分布式训练。为了保证重复运行时符合预期,`该脚本会在运行前Kill掉其他python进程`,请注意该风险,也可以自行注释掉脚本中杀python进程相关的语句。
```bash ```bash
# 根据自己的运行环境,选择sh或bash # 根据自己的运行环境,选择sh或bash
sh local_cluster.sh sh local_cluster.sh
...@@ -565,7 +563,7 @@ I1126 07:38:28.947571 14715 communicator.cc:363] Communicator stop done ...@@ -565,7 +563,7 @@ I1126 07:38:28.947571 14715 communicator.cc:363] Communicator stop done
### 区别七 增量训练 ### 区别七 增量训练
#### 单机增量训练 #### 单机增量训练
单机训练的基本顺序是:`构建网络->初始化参数->加载数据A->开始训练`。而增量训练的基本顺序是:`构建网络->加载已有参数->加载数据B->开始训练`。因此需要增加加载参数的逻辑,在示例代码`infer.py`中有相关操作: 单机训练的基本顺序是:`构建网络->初始化参数->加载数据A->开始训练`。而增量训练的基本顺序是:`构建网络->加载已有参数->加载数据B->开始训练`。因此需要增加加载参数的逻辑,在示例代码`infer.py`中有相关加载参数的操作:
```python ```python
# 构建网络 # 构建网络
inputs = ctr_model.input_data(params) inputs = ctr_model.input_data(params)
...@@ -574,7 +572,11 @@ loss, auc_var = ctr_model.net(inputs, params) ...@@ -574,7 +572,11 @@ loss, auc_var = ctr_model.net(inputs, params)
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
# 不使用 exe.run(fluid.defalut_startup_program()) # 不使用 exe.run(fluid.defalut_startup_program())
# 加载已有参数到内存中,使用fluid.load接口(因为本示例使用fluid.save保存模型) # 加载已有参数到内存中,使用fluid.load接口(因为本示例使用fluid.save保存模型)
fluid.load(fluid.default_main_program(), model_path + "/checkpoint", exe) # model_path为模型保存的地址,如model_path = "./models/epoch_0"
fluid.load(fluid.default_main_program(), os.path.join(model_path, "checkpoint"), exe)
dataset, file_list = get_dataset(inputs, args)
# 下同单机训练流程
``` ```
#### 分布式增量训练 #### 分布式增量训练
...@@ -583,12 +585,13 @@ Paddle的分布式增量训练也十分易用,代码与上述分布式训练 ...@@ -583,12 +585,13 @@ Paddle的分布式增量训练也十分易用,代码与上述分布式训练
# 增量训练 # 增量训练
if fleet.is_server(): if fleet.is_server():
# 初始化参数服务器节点时,传入模型保存的地址 # 初始化参数服务器节点时,传入模型保存的地址
# 不要混用fluid.save和fleet.save_persistables保存的模型
fleet.init_server(model_path) fleet.init_server(model_path)
# 运行参数服务器节点 # 运行参数服务器节点
fleet.run_server() fleet.run_server()
elif fleet.is_worker(): elif fleet.is_worker():
# 训练节点的代码无需更改 # 训练节点的代码无需更改
# 在运行fleet.startup_program时,训练节点会从pserver上拉取最新参数 # 在运行fleet.startup_program时,训练节点会自动从pserver上拉取最新参数
``` ```
# #
...@@ -652,12 +655,12 @@ def set_zero(): ...@@ -652,12 +655,12 @@ def set_zero():
为了快速验证,我们仅取用测试数据集的一个part文件,进行测试。在代码目录下,键入以下命令,进行预测: 为了快速验证,我们仅取用测试数据集的一个part文件,进行测试。在代码目录下,键入以下命令,进行预测:
- 对单机训练的模型进行预测 - 对单机训练的模型进行预测
```python ```python
python -u infer.py --is_local=1 &> test.log & python -u infer.py --is_local=1 --infer_epoch=0 &> test.log &
``` ```
- 对分布式训练的模型进行预测 - 对分布式训练的模型进行预测
```python ```python
python -u infer.py --is_cloud=1 &> test.log & python -u infer.py --is_cloud=1 --infer_epoch=0 &> test.log &
``` ```
测试结果的日志位于`test.log`,仅训练一个epoch后,在`part-220`上的的理想测试结果为: 测试结果的日志位于`test.log`,仅训练一个epoch后,在`part-220`上的的理想测试结果为:
```bash ```bash
...@@ -672,6 +675,6 @@ open file success ...@@ -672,6 +675,6 @@ open file success
因为快速验证的训练数据与测试数据极少,同时只训练了一轮,所以远远没有达到收敛,且初始化带有随机性,在您的环境下出现测试结果与示例输出不一致是正常情况。 因为快速验证的训练数据与测试数据极少,同时只训练了一轮,所以远远没有达到收敛,且初始化带有随机性,在您的环境下出现测试结果与示例输出不一致是正常情况。
### benchmark ### benchmark
全量数据的训练与预测,请修改对应`train.py``infer.py`中对应的`train_files_path``test_files_path`超参数,分别修改为`./train_data_full``./test_data_full`。在全量数据中训练三轮后,加载epoch_2的模型,预期测试AUC可以达到0.794 全量数据的训练与预测,请修改对应`train.py``infer.py`中对应的`train_files_path``test_files_path`超参数,分别修改为`./train_data_full``./test_data_full`。在全量数据中训练三轮后,加载epoch_2的模型,`auc=0.79395`
分布式benchmark相关代码及复现方式见[Fleet Repo](https://github.com/PaddlePaddle/Fleet.git),路径为Fleet/benchmark/ps/distribute_ctr/paddle/。 分布式benchmark相关代码及复现方式见[Fleet Repo](https://github.com/PaddlePaddle/Fleet.git),路径为Fleet/benchmark/ps/distribute_ctr/paddle/。
\ No newline at end of file
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from __future__ import print_function from __future__ import print_function
import os import os
import time import time
import six
import numpy as np import numpy as np
import logging import logging
import argparse import argparse
...@@ -32,11 +33,6 @@ def parse_args(): ...@@ -32,11 +33,6 @@ def parse_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="PaddlePaddle CTR-DNN example") description="PaddlePaddle CTR-DNN example")
# -------------Data & Model Path------------- # -------------Data & Model Path-------------
parser.add_argument(
'--train_files_path',
type=str,
default='./train_data',
help="The path of training dataset")
parser.add_argument( parser.add_argument(
'--test_files_path', '--test_files_path',
type=str, type=str,
...@@ -48,23 +44,18 @@ def parse_args(): ...@@ -48,23 +44,18 @@ def parse_args():
default='models', default='models',
help='The path for model to store (default: models)') help='The path for model to store (default: models)')
# -------------Training parameter------------- # -------------Running parameter-------------
parser.add_argument(
'--learning_rate',
type=float,
default=1e-4,
help="Initial learning rate for training")
parser.add_argument( parser.add_argument(
'--batch_size', '--batch_size',
type=int, type=int,
default=1000, default=1000,
help="The size of mini-batch (default:1000)") help="The size of mini-batch (default:1000)")
parser.add_argument( parser.add_argument(
"--epochs", '--infer_epoch',
type=int, type=int,
default=1, default=0,
help="Number of epochs for training.") help='Specify which epoch to run infer'
)
# -------------Network parameter------------- # -------------Network parameter-------------
parser.add_argument( parser.add_argument(
'--embedding_size', '--embedding_size',
...@@ -93,30 +84,25 @@ def parse_args(): ...@@ -93,30 +84,25 @@ def parse_args():
type=int, type=int,
default=0, default=0,
help='Local train or distributed train on paddlecloud (default: 0)') help='Local train or distributed train on paddlecloud (default: 0)')
parser.add_argument(
'--save_model',
type=int,
default=0,
help='Save training model or not')
parser.add_argument(
'--enable_ce',
action='store_true',
help='If set, run the task with continuous evaluation logs.')
parser.add_argument(
'--cpu_num',
type=int,
default=2,
help='threads for ctr training')
return parser.parse_args() return parser.parse_args()
def print_arguments(args):
"""
print arguments
"""
logger.info('----------- Configuration Arguments -----------')
for arg, value in sorted(six.iteritems(vars(args))):
logger.info('%s: %s' % (arg, value))
logger.info('------------------------------------------------')
def run_infer(args, model_path): def run_infer(args, model_path):
place = fluid.CPUPlace() place = fluid.CPUPlace()
train_generator = generator.CriteoDataset(args.sparse_feature_dim) train_generator = generator.CriteoDataset(args.sparse_feature_dim)
file_list = [ file_list = [
str(args.test_files_path) + "/%s" % x os.path.join(args.test_files_path, x) for x in os.listdir(args.test_files_path)
for x in os.listdir(args.test_files_path)
] ]
test_reader = paddle.batch(train_generator.test(file_list), test_reader = paddle.batch(train_generator.test(file_list),
batch_size=args.batch_size) batch_size=args.batch_size)
...@@ -150,7 +136,7 @@ def run_infer(args, model_path): ...@@ -150,7 +136,7 @@ def run_infer(args, model_path):
main_program=fluid.default_main_program()) main_program=fluid.default_main_program())
elif args.is_local: elif args.is_local:
fluid.load(fluid.default_main_program(), fluid.load(fluid.default_main_program(),
model_path + "/checkpoint", exe) os.path.join(model_path, "checkpoint"), exe)
set_zero() set_zero()
run_index = 0 run_index = 0
...@@ -171,7 +157,7 @@ def run_infer(args, model_path): ...@@ -171,7 +157,7 @@ def run_infer(args, model_path):
infer_result = {} infer_result = {}
infer_result['loss'] = infer_loss infer_result['loss'] = infer_loss
infer_result['auc'] = infer_auc infer_result['auc'] = infer_auc
log_path = model_path + '/infer_result.log' log_path = os.path.join(model_path, 'infer_result.log')
logger.info(str(infer_result)) logger.info(str(infer_result))
with open(log_path, 'w+') as f: with open(log_path, 'w+') as f:
f.write(str(infer_result)) f.write(str(infer_result))
...@@ -181,12 +167,18 @@ def run_infer(args, model_path): ...@@ -181,12 +167,18 @@ def run_infer(args, model_path):
if __name__ == "__main__": if __name__ == "__main__":
args = parse_args() args = parse_args()
print_arguments(args)
model_list = [] model_list = []
for _, dir, _ in os.walk(args.model_path): for _, dir, _ in os.walk(args.model_path):
for model in dir: for model in dir:
if "epoch" in model: if "epoch" in model and args.infer_epoch == int(model.split('_')[-1]):
path = "/".join([args.model_path, model]) path = os.path.join(args.model_path, model)
model_list.append(path) model_list.append(path)
if len(model_list) == 0:
logger.info("There is no satisfactory model {} at path {}, please check your start command & env. ".format(
str("epoch_")+str(args.infer_epoch), args.model_path))
for model in model_list: for model in model_list:
logger.info("Test model {}".format(model)) logger.info("Test model {}".format(model))
run_infer(args, model) run_infer(args, model)
...@@ -18,6 +18,7 @@ from __future__ import print_function ...@@ -18,6 +18,7 @@ from __future__ import print_function
import argparse import argparse
import logging import logging
import os import os
import six
import time import time
import random import random
import numpy as np import numpy as np
...@@ -119,6 +120,16 @@ def parse_args(): ...@@ -119,6 +120,16 @@ def parse_args():
return parser.parse_args() return parser.parse_args()
def print_arguments(args):
"""
print arguments
"""
logger.info('----------- Configuration Arguments -----------')
for arg, value in sorted(six.iteritems(vars(args))):
logger.info('%s: %s' % (arg, value))
logger.info('------------------------------------------------')
def get_dataset(inputs, args): def get_dataset(inputs, args):
dataset = fluid.DatasetFactory().create_dataset() dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs) dataset.set_use_var(inputs)
...@@ -127,8 +138,7 @@ def get_dataset(inputs, args): ...@@ -127,8 +138,7 @@ def get_dataset(inputs, args):
thread_num = int(args.cpu_num) thread_num = int(args.cpu_num)
dataset.set_thread(thread_num) dataset.set_thread(thread_num)
file_list = [ file_list = [
str(args.train_files_path) + "/%s" % x os.path.join(args.train_files_path, x) for x in os.listdir(args.train_files_path)
for x in os.listdir(args.train_files_path)
] ]
# 请确保每一个训练节点都持有不同的训练文件 # 请确保每一个训练节点都持有不同的训练文件
# 当我们用本地多进程模拟分布式时,每个进程需要拿到不同的文件 # 当我们用本地多进程模拟分布式时,每个进程需要拿到不同的文件
...@@ -176,11 +186,12 @@ def local_train(args): ...@@ -176,11 +186,12 @@ def local_train(args):
((epoch), end_time - start_time)) ((epoch), end_time - start_time))
if args.save_model: if args.save_model:
model_path = (str(args.model_path) + "/" + model_path = os.path.join(
"epoch_" + str(epoch) + "/") str(args.model_path), "epoch_" + str(epoch))
if not os.path.isdir(model_path): if not os.path.isdir(model_path):
os.mkdir(model_path) os.mkdir(model_path)
fluid.save(fluid.default_main_program(), model_path + "checkpoint") fluid.save(fluid.default_main_program(),
os.path.join(model_path, "checkpoint"))
logger.info("Train Success!") logger.info("Train Success!")
...@@ -239,8 +250,8 @@ def distribute_train(args): ...@@ -239,8 +250,8 @@ def distribute_train(args):
# 默认使用0号节点保存模型 # 默认使用0号节点保存模型
if args.save_model and fleet.is_first_worker(): if args.save_model and fleet.is_first_worker():
model_path = (str(args.model_path) + "/" + "epoch_" + model_path = os.path.join(str(args.model_path), "epoch_" +
str(epoch)) str(epoch))
fleet.save_persistables(executor=exe, dirname=model_path) fleet.save_persistables(executor=exe, dirname=model_path)
fleet.stop_worker() fleet.stop_worker()
...@@ -252,7 +263,7 @@ def train(): ...@@ -252,7 +263,7 @@ def train():
if not os.path.isdir(args.model_path): if not os.path.isdir(args.model_path):
os.mkdir(args.model_path) os.mkdir(args.model_path)
print_arguments(args)
if args.is_cloud: if args.is_cloud:
logger.info("run cloud training") logger.info("run cloud training")
distribute_train(args) distribute_train(args)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册