Unverified  Commit 1a1b2f18 authored by Chengmo, committed by GitHub

Update ctr dnn (#4265)

* update ctr/dnn for v/1.7
Parent de81b6fb
#!/bin/bash
export MKL_NUM_THREADS=1
export OMP_NUM_THREADS=1
#cudaid=${face_detection:=0} # use 0-th card as default
#export CUDA_VISIBLE_DEVICES=$cudaid
export CPU_NUM=1
export NUM_THREADS=1
FLAGS_benchmark=true python train.py --is_local 1 --cloud_train 0 --train_data_path data/raw/train.txt --enable_ce | python _ce.py
export CPU_NUM=1
export NUM_THREADS=8
FLAGS_benchmark=true python train.py --is_local 1 --cloud_train 0 --train_data_path data/raw/train.txt --enable_ce | python _ce.py
export CPU_NUM=8
export NUM_THREADS=8
FLAGS_benchmark=true python train.py --is_local 1 --cloud_train 0 --train_data_path data/raw/train.txt --enable_ce | python _ce.py
# DNN-based click-through rate prediction model
## Introduction
This model implements the DNN model proposed in the following paper:
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
## Environment
**PaddlePaddle 1.6 or above (or a suitable develop version) is required.**
Install PaddlePaddle Fluid first, then run:
```shell
pip install -r requirements.txt
```
## Dataset
This example uses the Criteo dataset from the Kaggle [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/).
Each line describes one ad impression. The first column is the label, indicating whether the impression was clicked. There are 39 features in total: 13 of them take integer values and the other 26 are categorical. The test set has no labels.
Download the dataset:
```bash
cd data && python download.py && cd ..
```
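To get a feel for the raw layout described above, here is a small, self-contained sketch that splits one tab-separated line into the label, the 13 integer features, and the 26 categorical features. The sample line is fabricated purely for illustration; it is not taken from the dataset.
```python
# Illustrative only: a fabricated line in the raw Criteo layout
# (label, 13 integer features, 26 categorical features, tab-separated).
sample_line = "1\t" + "\t".join(["5"] * 13) + "\t" + "\t".join(["a1b2c3d4"] * 26)

fields = sample_line.rstrip("\n").split("\t")
label = int(fields[0])       # 1 = clicked, 0 = not clicked
dense_raw = fields[1:14]     # 13 integer features (may be empty strings)
sparse_raw = fields[14:40]   # 26 categorical features
print(label, len(dense_raw), len(sparse_raw))  # -> 1 13 26
```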
## Model
This example implements only the DNN part of the model described in the DeepFM paper; the full DeepFM model is covered in a separate example.
## Data preparation
When preprocessing the raw dataset, the integer features are normalized to [0, 1] with min-max normalization and the categorical features are one-hot encoded. The raw dataset is split into two parts: 90% for training and the remaining 10% for validation during training.
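A minimal sketch of the min-max step is shown below. The real preprocessing lives in preprocess.py and the readers in this repo; the helper name and values here are only an illustration, and preprocess.py additionally clips each integer feature at roughly its 95% quantile before normalizing.
```python
# Minimal sketch of min-max normalization for one integer feature value,
# assuming the per-feature min and max were collected from the training data.
def min_max_normalize(val, feat_min, feat_max):
    if val == "":  # missing values are mapped to 0.0
        return 0.0
    return (float(val) - feat_min) / (feat_max - feat_min)

print(min_max_normalize("7", 0.0, 20.0))  # -> 0.35
```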
## Training
The command-line options for training can be listed with `python train.py -h`.
### Single-machine training
```bash
python train.py \
--train_data_path data/raw/train.txt \
2>&1 | tee train.log
```
### Distributed training
Launch a local distributed job with 2 trainers and 2 pservers. In the distributed setting the training data is sharded by trainer id, so the trainers never train on overlapping data and training is more efficient (see the sketch after the command below).
```bash
# This script does not support Windows
sh cluster_train.sh
```
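The sharding idea referenced above is simple: each trainer keeps only the shards whose index maps to its own trainer id; the fleet-based train.py in this commit does this per file with `fleet.split_files`. A hedged, self-contained sketch (the helper name is illustrative, not the library API):
```python
# Minimal sketch of splitting a file list across trainers by trainer id.
def split_by_trainer(file_list, trainer_id, trainer_num):
    return [f for i, f in enumerate(file_list) if i % trainer_num == trainer_id]

files = ["part-0", "part-1", "part-2", "part-3"]
print(split_by_trainer(files, 0, 2))  # -> ['part-0', 'part-2']
print(split_by_trainer(files, 1, 2))  # -> ['part-1', 'part-3']
```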
## Inference
The command-line options for inference can be listed with `python infer.py -h`.
Run inference on the test set:
```bash
python infer.py \
--model_path models/pass-2/ \
--data_path data/raw/train.txt
```
This loads the pass-2 model; the expected test AUC is `0.794`.
Note: the AUC printed at the very end of infer.py is the overall AUC for the whole inference file. reader.py splits train.txt into a training part and a test part, so the data used here does not overlap with the training data.
## Running cluster training on Baidu Cloud
1. Follow [Launch Fluid distributed training on Baidu Cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst) to set up a CPU cluster on Baidu Cloud.
1. Preprocess the training data with preprocess.py to produce train.txt.
1. Split train.txt into as many shards as there are machines in the cluster and put one shard on each machine.
1. Start the distributed job with the command line from the `Distributed training` section above.
## Running cluster training on PaddleCloud
If you are training on PaddleCloud, you can use ```cloud.py``` to submit the job; the arguments required by ```train.py``` are passed in through PaddleCloud environment variables.
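A minimal, hypothetical local dry run of that flow is sketched below: the variable names are ones cloud.py actually reads, while the values are placeholders and not a recommended configuration.
```python
# Hypothetical values only: on PaddleCloud these variables come from the job
# environment; cloud.py translates them into train.py command-line flags.
import os
import subprocess

os.environ["BATCH_SIZE"] = "1000"        # -> --batch_size 1000
os.environ["EMBEDDING_SIZE"] = "10"      # -> --embedding_size 10
os.environ["TRAINING_ROLE"] = "TRAINER"  # role of this process
os.environ["PADDLE_IS_LOCAL"] = "1"      # 1 -> --is_local 1 --cloud_train 0
subprocess.call(["python", "cloud.py"])
```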
# this file is only used for continuous evaluation test!
import os
import sys
sys.path.append(os.environ['ceroot'])
from kpi import CostKpi
from kpi import DurationKpi
from kpi import AccKpi
each_pass_duration_cpu1_thread1_kpi = DurationKpi(
'each_pass_duration_cpu1_thread1', 0.08, 0, actived=True)
train_loss_cpu1_thread1_kpi = CostKpi('train_loss_cpu1_thread1', 0.08, 0)
train_auc_val_cpu1_thread1_kpi = AccKpi('train_auc_val_cpu1_thread1', 0.08, 0)
train_batch_auc_val_cpu1_thread1_kpi = AccKpi(
'train_batch_auc_val_cpu1_thread1', 0.08, 0)
each_pass_duration_cpu1_thread8_kpi = DurationKpi(
'each_pass_duration_cpu1_thread8', 0.08, 0, actived=True)
train_loss_cpu1_thread8_kpi = CostKpi('train_loss_cpu1_thread8', 0.08, 0)
train_auc_val_cpu1_thread8_kpi = AccKpi('train_auc_val_cpu1_thread8', 0.08, 0)
train_batch_auc_val_cpu1_thread8_kpi = AccKpi(
'train_batch_auc_val_cpu1_thread8', 0.08, 0)
each_pass_duration_cpu8_thread8_kpi = DurationKpi(
'each_pass_duration_cpu8_thread8', 0.08, 0, actived=True)
train_loss_cpu8_thread8_kpi = CostKpi('train_loss_cpu8_thread8', 0.08, 0)
train_auc_val_cpu8_thread8_kpi = AccKpi('train_auc_val_cpu8_thread8', 0.08, 0)
train_batch_auc_val_cpu8_thread8_kpi = AccKpi(
'train_batch_auc_val_cpu8_thread8', 0.08, 0)
tracking_kpis = [
each_pass_duration_cpu1_thread1_kpi,
train_loss_cpu1_thread1_kpi,
train_auc_val_cpu1_thread1_kpi,
train_batch_auc_val_cpu1_thread1_kpi,
each_pass_duration_cpu1_thread8_kpi,
train_loss_cpu1_thread8_kpi,
train_auc_val_cpu1_thread8_kpi,
train_batch_auc_val_cpu1_thread8_kpi,
each_pass_duration_cpu8_thread8_kpi,
train_loss_cpu8_thread8_kpi,
train_auc_val_cpu8_thread8_kpi,
train_batch_auc_val_cpu8_thread8_kpi,
]
def parse_log(log):
'''
This method should be implemented by model developers.
The suggestion:
each line in the log should be key, value, for example:
"
train_cost\t1.0
test_cost\t1.0
train_cost\t1.0
train_cost\t1.0
train_acc\t1.2
"
'''
for line in log.split('\n'):
fs = line.strip().split('\t')
print(fs)
if len(fs) == 3 and fs[0] == 'kpis':
kpi_name = fs[1]
kpi_value = float(fs[2])
yield kpi_name, kpi_value
def log_to_ce(log):
kpi_tracker = {}
for kpi in tracking_kpis:
kpi_tracker[kpi.name] = kpi
for (kpi_name, kpi_value) in parse_log(log):
print(kpi_name, kpi_value)
kpi_tracker[kpi_name].add_record(kpi_value)
kpi_tracker[kpi_name].persist()
if __name__ == '__main__':
log = sys.stdin.read()
log_to_ce(log)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ======================================================================
#
# Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved
#
# ======================================================================
"""this file is only for PaddleCloud"""
import os
import logging
import paddle.fluid.contrib.utils.hdfs_utils as hdfs_utils
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("cloud")
logger.setLevel(logging.INFO)
def run():
cmd = "python -u train.py "
cmd += " --train_data_path %s " % "data/train.txt"
cmd += " --test_data_path %s " % "data/test.txt"
if os.getenv("BATCH_SIZE", ""):
cmd += " --batch_size %s " % os.getenv("BATCH_SIZE")
if os.getenv("EMBEDDING_SIZE", ""):
cmd += " --embedding_size %s " % os.getenv("EMBEDDING_SIZE")
if os.getenv("NUM_PASSES", ""):
cmd += " --num_passes %s " % os.getenv("NUM_PASSES")
if os.getenv("MODEL_OUTPUT_DIR", ""):
cmd += " --model_output_dir %s " % os.getenv("MODEL_OUTPUT_DIR")
if os.getenv("SPARSE_FEATURE_DIM", ""):
cmd += " --sparse_feature_dim %s " % os.getenv("SPARSE_FEATURE_DIM")
if os.getenv("ASYNC_MODE", ""):
cmd += " --async_mode "
if os.getenv("NO_SPLIT_VAR", ""):
cmd += " --no_split_var "
is_local = int(os.getenv("PADDLE_IS_LOCAL", "1"))
if is_local:
cmd += " --is_local 1 "
cmd += " --cloud_train 0 "
else:
cmd += " --is_local 0 "
cmd += " --cloud_train 1 "
trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
trainers = int(os.environ["PADDLE_TRAINERS"])
training_role = os.environ["PADDLE_TRAINING_ROLE"]
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
if training_role == "PSERVER":
cmd += " --role pserver "
else:
cmd += " --role trainer "
cmd += " --endpoints %s " % pserver_endpoints
cmd += " --current_endpoint %s " % current_endpoint
cmd += " --trainer_id %s " % trainer_id
cmd += " --trainers %s " % trainers
logging.info("run cluster commands: {}".format(cmd))
exit(os.system(cmd))
def download():
hadoop_home = os.getenv("HADOOP_HOME")
configs = {}
configs["fs.default.name"] = os.getenv("DATA_FS_NAME")
configs["hadoop.job.ugi"] = os.getenv("DATA_FS_UGI")
client = hdfs_utils.HDFSClient(hadoop_home, configs)
local_train_data_dir = os.getenv("TRAIN_DATA_LOCAL", "data")
hdfs_train_data_dir = os.getenv("TRAIN_DATA_HDFS", "")
downloads = hdfs_utils.multi_download(
client,
hdfs_train_data_dir,
local_train_data_dir,
0,
1,
multi_processes=1)
print(downloads)
for d in downloads:
base_dir = os.path.dirname(d)
tar_cmd = "tar -zxvf {} -C {}".format(d, base_dir)
        print(tar_cmd)
for d in downloads:
base_dir = os.path.dirname(d)
tar_cmd = "tar -zxvf {} -C {}".format(d, base_dir)
logging.info("DOWNLOAD DATA: {}, AND TAR IT: {}".format(d, tar_cmd))
os.system(tar_cmd)
def env_declar():
logging.info("******** Rename Cluster Env to PaddleFluid Env ********")
if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ[
"PADDLE_IS_LOCAL"] == "0":
os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
os.environ["PADDLE_TRAINER_ID"] = os.environ["PADDLE_TRAINER_ID"]
os.environ["CPU_NUM"] = os.getenv("CPU_NUM", "12")
os.environ["NUM_THREADS"] = os.getenv("NUM_THREADS", "12")
logging.info("Content-Type: text/plain\n\n")
for key in os.environ.keys():
logging.info("%30s %s \n" % (key, os.environ[key]))
logging.info("****** Rename Cluster Env to PaddleFluid Env END ******")
if __name__ == '__main__':
env_declar()
if os.getenv("NEED_CUSTOM_DOWNLOAD", ""):
if os.environ["PADDLE_TRAINING_ROLE"] == "PSERVER":
logging.info("PSERVER do not need to download datas")
else:
logging.info(
"NEED_CUSTOM_DOWNLOAD is True, will download train data with hdfs_utils"
)
download()
run()
#!/bin/bash
# start pserver0
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
import os
import shutil
import sys
import glob
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url)
print("download and extract finished")
if os.path.exists("raw"):
shutil.rmtree("raw")
os.mkdir("raw")
# mv ./*.txt raw/
files = glob.glob("*.txt")
for f in files:
shutil.move(f, "raw")
print("done")
#!/bin/bash
wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
tar zxf dac.tar.gz >/dev/null 2>&1
rm -f dac.tar.gz
mkdir raw
mv ./*.txt raw/
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
"""
    DacDataset: inherits MultiSlotDataGenerator and implements data reading.
    Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in continuous_range_:
if features[idx] == "":
dense_feature.append(0.0)
else:
dense_feature.append(
(float(features[idx]) - cont_min_[idx - 1]) /
cont_diff_[idx - 1])
for idx in categorical_range_:
sparse_feature.append(
[hash(str(idx) + features[idx]) % hash_dim_])
label = [int(features[0])]
process_line = dense_feature, sparse_feature, label
feature_name = ["dense_feature"]
for idx in categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("label")
yield zip(feature_name, [dense_feature] + sparse_feature + [label])
return reader
d = CriteoDataset()
d.run_from_stdin()
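For a quick local sanity check of the generator above (assuming the CriteoDataset class defined in this file is in scope), one can feed a single fabricated raw line through generate_sample and inspect the emitted slots; in the real pipeline, train.py pipes whole files through this script via `dataset.set_pipe_command("python dataset_generator.py")`.
```python
# Quick sanity check with a fabricated input line (illustrative only).
fake_line = "1\t" + "\t".join(["3"] * 13) + "\t" + "\t".join(["a1b2c3d4"] * 26)

d = CriteoDataset()
for record in d.generate_sample(fake_line)():
    for name, value in record:
        print(name, value)  # dense_feature, C1..C26, label
```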
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
mv ./raw_data ./train_data_full
mkdir train_data && cd train_data
cp ../train_data_full/part-0 ../train_data_full/part-1 ./ && cd ..
mv ./test_data ./test_data_full
mkdir test_data && cd test_data
cp ../test_data_full/part-220 ./ && cd ..
echo "Complete data download."
echo "Full Train data stored in ./train_data_full "
echo "Full Test data stored in ./test_data_full "
echo "Rapid Verification train data stored in ./train_data "
echo "Rapid Verification test data stored in ./test_data "
-import mmh3
+#!/usr/bin/python
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

 # There are 13 integer features and 26 categorical features
 continous_features = range(1, 14)
 categorial_features = range(14, 40)
 continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]


 class Dataset:
     def __init__(self):
         pass


-class CriteoDataset(object):
+class CriteoDataset(Dataset):
     def __init__(self, sparse_feature_dim):
         self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
         self.cont_max_ = [
@@ -28,12 +41,6 @@ class CriteoDataset(Dataset):
             line_idx = 0
             for line in f:
                 line_idx += 1
-                if is_train and line_idx > self.train_idx_:
-                    break
-                elif not is_train and line_idx <= self.train_idx_:
-                    continue
-                if line_idx % trainer_num != trainer_id:
-                    continue
                 features = line.rstrip('\n').split('\t')
                 dense_feature = []
                 sparse_feature = []
@@ -41,13 +48,13 @@ class CriteoDataset(Dataset):
                     if features[idx] == '':
                         dense_feature.append(0.0)
                     else:
-                        dense_feature.append((float(features[idx]) -
-                                              self.cont_min_[idx - 1]) /
-                                             self.cont_diff_[idx - 1])
+                        dense_feature.append(
+                            (float(features[idx]) -
+                             self.cont_min_[idx - 1]) /
+                            self.cont_diff_[idx - 1])
                 for idx in self.categorical_range_:
                     sparse_feature.append([
-                        mmh3.hash(str(idx) + features[idx]) %
-                        self.hash_dim_
+                        hash(str(idx) + features[idx]) % self.hash_dim_
                     ])
                 label = [int(features[0])]
@@ -60,6 +67,3 @@ class CriteoDataset(Dataset):
     def test(self, file_list):
         return self._reader_creator(file_list, False, 1, 0)
-
-    def infer(self, file_list):
-        return self._reader_creator(file_list, False, 1, 0)
-import argparse
-import logging
-import numpy as np
-# disable gpu training for this example
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
 import os
-os.environ["CUDA_VISIBLE_DEVICES"] = ""
+import time
+import numpy as np
+import logging
+import argparse
 import paddle
 import paddle.fluid as fluid
+from network_conf import CTR
-import reader
-from network_conf import ctr_dnn_model
-import utils
+import feed_generator as generator

 logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger("fluid")
@@ -18,17 +29,43 @@ logger.setLevel(logging.INFO)

 def parse_args():
-    parser = argparse.ArgumentParser(description="PaddlePaddle DeepFM example")
+    parser = argparse.ArgumentParser(
+        description="PaddlePaddle CTR-DNN example")
+    # -------------Data & Model Path-------------
     parser.add_argument(
-        '--model_path',
+        '--train_files_path',
         type=str,
-        required=True,
-        help="The path of model parameters gz file")
+        default='./train_data',
+        help="The path of training dataset")
     parser.add_argument(
-        '--data_path',
+        '--test_files_path',
         type=str,
-        required=True,
-        help="The path of the dataset to infer")
+        default='./test_data',
+        help="The path of testing dataset")
+    parser.add_argument(
+        '--model_path',
+        type=str,
+        default='models',
+        help='The path for model to store (default: models)')
+    # -------------Training parameter-------------
+    parser.add_argument(
+        '--learning_rate',
+        type=float,
+        default=1e-4,
+        help="Initial learning rate for training")
+    parser.add_argument(
+        '--batch_size',
+        type=int,
+        default=1000,
+        help="The size of mini-batch (default:1000)")
+    parser.add_argument(
+        "--epochs",
+        type=int,
+        default=1,
+        help="Number of epochs for training.")
+    # -------------Network parameter-------------
     parser.add_argument(
         '--embedding_size',
         type=int,
@@ -38,59 +75,118 @@ def parse_args():
         '--sparse_feature_dim',
         type=int,
         default=1000001,
-        help="The size for embedding layer (default:1000001)")
+        help='sparse feature hashing space for index processing')
     parser.add_argument(
-        '--batch_size',
+        '--dense_feature_dim',
         type=int,
-        default=1000,
-        help="The size of mini-batch (default:1000)")
+        default=13,
+        help='dense feature shape')
+    # -------------device parameter-------------
+    parser.add_argument(
+        '--is_local',
+        type=int,
+        default=0,
+        help='Local train or distributed train (default: 1)')
+    parser.add_argument(
+        '--is_cloud',
+        type=int,
+        default=0,
+        help='Local train or distributed train on paddlecloud (default: 0)')
+    parser.add_argument(
+        '--save_model',
+        type=int,
+        default=0,
+        help='Save training model or not')
+    parser.add_argument(
+        '--enable_ce',
+        action='store_true',
+        help='If set, run the task with continuous evaluation logs.')
+    parser.add_argument(
+        '--cpu_num',
+        type=int,
+        default=2,
+        help='threads for ctr training')

     return parser.parse_args()


-def infer():
-    args = parse_args()
-
+def run_infer(args, model_path):
     place = fluid.CPUPlace()
-    inference_scope = fluid.Scope()
-
-    dataset = reader.CriteoDataset(args.sparse_feature_dim)
-    test_reader = paddle.batch(
-        dataset.test([args.data_path]), batch_size=args.batch_size)
-
+    train_generator = generator.CriteoDataset(args.sparse_feature_dim)
+    file_list = [
+        str(args.test_files_path) + "/%s" % x
+        for x in os.listdir(args.test_files_path)
+    ]
+    test_reader = paddle.batch(train_generator.test(file_list),
+                               batch_size=args.batch_size)
     startup_program = fluid.framework.Program()
     test_program = fluid.framework.Program()
-    with fluid.scope_guard(inference_scope):
-        with fluid.framework.program_guard(test_program, startup_program):
-            loss, auc_var, batch_auc_var, _, data_list, auc_states = ctr_dnn_model(
-                args.embedding_size, args.sparse_feature_dim, False)
-
-            exe = fluid.Executor(place)
-
-            feeder = fluid.DataFeeder(feed_list=data_list, place=place)
-
-            fluid.io.load_persistables(
-                executor=exe,
-                dirname=args.model_path,
-                main_program=fluid.default_main_program())
-
-            def set_zero(var_name):
-                param = inference_scope.var(var_name).get_tensor()
-                param_array = np.zeros(param._get_dims()).astype("int64")
-                param.set(param_array, place)
-
-            for var in auc_states:
-                set_zero(var.name)
-
-            for batch_id, data in enumerate(test_reader()):
-                loss_val, auc_val = exe.run(test_program,
-                                            feed=feeder.feed(data),
-                                            fetch_list=[loss, auc_var])
-                if batch_id % 100 == 0:
-                    logger.info("TEST --> batch: {} loss: {} auc: {}".format(
-                        batch_id, loss_val / args.batch_size, auc_val))
-
-
-if __name__ == '__main__':
-    utils.check_version()
-    infer()
+    ctr_model = CTR()
+
+    def set_zero():
+        auc_states_names = [
+            '_generated_var_0', '_generated_var_1', '_generated_var_2',
+            '_generated_var_3'
+        ]
+        for name in auc_states_names:
+            param = fluid.global_scope().var(name).get_tensor()
+            if param:
+                param_array = np.zeros(param._get_dims()).astype("int64")
+                param.set(param_array, place)
+
+    with fluid.framework.program_guard(test_program, startup_program):
+        with fluid.unique_name.guard():
+            inputs = ctr_model.input_data(args)
+            loss, auc_var = ctr_model.net(inputs, args)
+
+            exe = fluid.Executor(place)
+            feeder = fluid.DataFeeder(feed_list=inputs, place=place)
+
+            if args.is_cloud:
+                fluid.io.load_persistables(
+                    executor=exe,
+                    dirname=model_path,
+                    main_program=fluid.default_main_program())
+            elif args.is_local:
+                fluid.load(fluid.default_main_program(),
+                           model_path + "/checkpoint", exe)
+            set_zero()
+
+            run_index = 0
+            infer_auc = 0
+            L = []
+            for batch_id, data in enumerate(test_reader()):
+                loss_val, auc_val = exe.run(test_program,
+                                            feed=feeder.feed(data),
+                                            fetch_list=[loss, auc_var])
+                run_index += 1
+                infer_auc = auc_val
+                L.append(loss_val / args.batch_size)
+                if batch_id % 100 == 0:
+                    logger.info("TEST --> batch: {} loss: {} auc: {}".format(
+                        batch_id, loss_val / args.batch_size, auc_val))
+
+            infer_loss = np.mean(L)
+            infer_result = {}
+            infer_result['loss'] = infer_loss
+            infer_result['auc'] = infer_auc
+            log_path = model_path + '/infer_result.log'
+            logger.info(str(infer_result))
+            with open(log_path, 'w+') as f:
+                f.write(str(infer_result))
+            logger.info("Inference complete")
+    return infer_result
+
+
+if __name__ == "__main__":
+    args = parse_args()
+
+    model_list = []
+    for _, dir, _ in os.walk(args.model_path):
+        for model in dir:
+            if "epoch" in model:
+                path = "/".join([args.model_path, model])
+                model_list.append(path)
+
+    for model in model_list:
+        logger.info("Test model {}".format(model))
+        run_infer(args, model)
#!/bin/bash
echo "WARNING: This script only for run Paddle Paddle CTR distribute training locally"
if [ ! -d "./models" ]; then
mkdir ./models
echo "Create model folder for store infer model"
fi
if [ ! -d "./log" ]; then
mkdir ./log
echo "Create log floder for store running log"
fi
if [ ! -d "./output" ]; then
mkdir ./output
echo "Create output floder"
fi
# kill existing server process
ps -ef|grep python|awk '{print $2}'|xargs kill -9
# environment variables for fleet distribute training
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=2
export OUTPUT_PATH="output"
export FLAGS_communicator_thread_pool_size=5
export FLAGS_communicator_fake_rpc=0
export FLAGS_communicator_is_sgd_optimizer=0
export FLAGS_rpc_retry_times=3
export PADDLE_PSERVERS_IP_PORT_LIST="127.0.0.1:36011,127.0.0.1:36012"
export PADDLE_PSERVER_PORT_ARRAY=(36011 36012)
export PADDLE_PSERVER_NUMS=2
export PADDLE_TRAINERS=2
export TRAINING_ROLE=PSERVER
export GLOG_v=0
export GLOG_logtostderr=1
train_mode=$1
for((i=0;i<$PADDLE_PSERVER_NUMS;i++))
do
cur_port=${PADDLE_PSERVER_PORT_ARRAY[$i]}
echo "PADDLE WILL START PSERVER "$cur_port
export PADDLE_PORT=${cur_port}
export POD_IP=127.0.0.1
python -u train.py --save_model=1 --is_cloud=1 --cpu_num=10 &> ./log/pserver.$i.log &
done
export TRAINING_ROLE=TRAINER
export GLOG_v=0
export GLOG_logtostderr=1
for((i=0;i<$PADDLE_TRAINERS;i++))
do
echo "PADDLE WILL START Trainer "$i
PADDLE_TRAINER_ID=$i
python -u train.py --save_model=1 --is_cloud=1 --cpu_num=10 &> ./log/trainer.$i.log &
done
echo "Training log stored in ./log/"
#!/usr/bin/python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import math
dense_feature_dim = 13
def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim,
sparse_input):
def dense_fm_layer(input, emb_dict_size, factor_size, fm_param_attr):
"""
dense_fm_layer
"""
first_order = fluid.layers.fc(input=input, size=1)
emb_table = fluid.layers.create_parameter(
shape=[emb_dict_size, factor_size],
dtype='float32',
attr=fm_param_attr)
input_mul_factor = fluid.layers.matmul(input, emb_table)
input_mul_factor_square = fluid.layers.square(input_mul_factor)
input_square = fluid.layers.square(input)
factor_square = fluid.layers.square(emb_table)
input_square_mul_factor_square = fluid.layers.matmul(input_square,
factor_square)
second_order = 0.5 * (
input_mul_factor_square - input_square_mul_factor_square)
return first_order, second_order
def sparse_fm_layer(input, emb_dict_size, factor_size, fm_param_attr):
"""
sparse_fm_layer
"""
first_embeddings = fluid.embedding(
input=input,
dtype='float32',
size=[emb_dict_size, 1],
is_sparse=True)
first_embeddings = fluid.layers.squeeze(
input=first_embeddings, axes=[1])
first_order = fluid.layers.sequence_pool(
input=first_embeddings, pool_type='sum')
nonzero_embeddings = fluid.embedding(
input=input,
dtype='float32',
size=[emb_dict_size, factor_size],
param_attr=fm_param_attr,
is_sparse=True)
nonzero_embeddings = fluid.layers.squeeze(
input=nonzero_embeddings, axes=[1])
summed_features_emb = fluid.layers.sequence_pool(
input=nonzero_embeddings, pool_type='sum')
summed_features_emb_square = fluid.layers.square(summed_features_emb)
squared_features_emb = fluid.layers.square(nonzero_embeddings)
squared_sum_features_emb = fluid.layers.sequence_pool(
input=squared_features_emb, pool_type='sum')
second_order = 0.5 * (
summed_features_emb_square - squared_sum_features_emb)
return first_order, second_order
dense_input = fluid.data(
name="dense_input", shape=[None, dense_feature_dim], dtype='float32')
sparse_input_ids = [
fluid.layers.data(
name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
for i in range(1, 27)
]
label = fluid.data(name='label', shape=[None, 1], dtype='int64')
datas = [dense_input] + sparse_input_ids + [label]
py_reader = fluid.layers.create_py_reader_by_data(
capacity=64, feed_list=datas, name='py_reader', use_double_buffer=True)
words = fluid.layers.read_file(py_reader)
sparse_fm_param_attr = fluid.param_attr.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Normal(scale=1 /
math.sqrt(sparse_feature_dim)))
dense_fm_param_attr = fluid.param_attr.ParamAttr(
name="DenseFeatFactors",
initializer=fluid.initializer.Normal(scale=1 /
math.sqrt(dense_feature_dim)))
sparse_fm_first, sparse_fm_second = sparse_fm_layer(
sparse_input, sparse_feature_dim, factor_size, sparse_fm_param_attr)
dense_fm_first, dense_fm_second = dense_fm_layer(
dense_input, dense_feature_dim, factor_size, dense_fm_param_attr)
def embedding_layer(input):
"""embedding_layer"""
emb = fluid.layers.embedding(
input=input,
dtype='float32',
size=[sparse_feature_dim, factor_size],
param_attr=sparse_fm_param_attr,
is_sparse=True)
emb = fluid.layers.squeeze(input=emb, axes=[1])
return fluid.layers.sequence_pool(input=emb, pool_type='average')
sparse_embed_seq = list(map(embedding_layer, sparse_input_ids))
concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1)
fc1 = fluid.layers.fc(input=concated,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(concated.shape[1]))))
fc2 = fluid.layers.fc(input=fc1,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(input=fc2,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))))
predict = fluid.layers.fc(input=[
sparse_fm_first, sparse_fm_second, dense_fm_first, dense_fm_second, fc3
],
size=2,
act="softmax",
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))))
cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
auc_var, batch_auc_var, auc_states = \
fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)
return avg_cost, auc_var, batch_auc_var, py_reader
def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
def embedding_layer(input):
"""embedding_layer"""
emb = fluid.embedding(
input=input,
is_sparse=True,
# you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
# if you want to set is_distributed to True
is_distributed=False,
size=[sparse_feature_dim, embedding_size],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()))
emb = fluid.layers.squeeze(input=emb, axes=[1])
return fluid.layers.sequence_pool(input=emb, pool_type='average')
dense_input = fluid.data(
name="dense_input", shape=[None, dense_feature_dim], dtype='float32')
sparse_input_ids = [
fluid.data(
name="C" + str(i), shape=[None, 1], lod_level=1, dtype='int64')
for i in range(1, 27)
]
label = fluid.data(name='label', shape=[None, 1], dtype='int64')
words = [dense_input] + sparse_input_ids + [label]
py_reader = None
if use_py_reader:
py_reader = fluid.layers.create_py_reader_by_data(
capacity=64,
feed_list=words,
name='py_reader',
use_double_buffer=True)
words = fluid.layers.read_file(py_reader)
sparse_embed_seq = list(map(embedding_layer, words[1:-1]))
concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)
fc1 = fluid.layers.fc(input=concated,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(concated.shape[1]))))
fc2 = fluid.layers.fc(input=fc1,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(input=fc2,
size=400,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))))
predict = fluid.layers.fc(input=fc3,
size=1,
act='sigmoid',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))))
cost = fluid.layers.log_loss(input=predict, label=fluid.layers.cast(words[-1], dtype="float32"))
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
auc_var, batch_auc_var, auc_states = \
fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)
    return avg_cost, auc_var, batch_auc_var, py_reader, words, auc_states


class CTR(object):
"""
DNN for Click-Through Rate prediction
"""
def input_data(self, args):
dense_input = fluid.data(name="dense_input",
shape=[-1, args.dense_feature_dim],
dtype="float32")
sparse_input_ids = [
fluid.data(name="C" + str(i),
shape=[-1, 1],
lod_level=1,
dtype="int64") for i in range(1, 27)
]
label = fluid.data(name="label", shape=[-1, 1], dtype="int64")
inputs = [dense_input] + sparse_input_ids + [label]
return inputs
def net(self, inputs, args):
def embedding_layer(input):
return fluid.layers.embedding(
input=input,
is_sparse=True,
size=[args.sparse_feature_dim, args.embedding_size],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()),
)
sparse_embed_seq = list(map(embedding_layer, inputs[1:-1]))
concated = fluid.layers.concat(sparse_embed_seq + inputs[0:1], axis=1)
fc1 = fluid.layers.fc(
input=concated,
size=400,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(concated.shape[1]))),
)
fc2 = fluid.layers.fc(
input=fc1,
size=400,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))),
)
fc3 = fluid.layers.fc(
input=fc2,
size=400,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))),
)
predict = fluid.layers.fc(
input=fc3,
size=2,
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))),
)
cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
avg_cost = fluid.layers.reduce_sum(cost)
auc_var, _, _ = fluid.layers.auc(input=predict,
label=inputs[-1],
num_thresholds=2**12,
slide_steps=20)
return avg_cost, auc_var
"""
Preprocess Criteo dataset. This dataset was used for the Display Advertising
Challenge (https://www.kaggle.com/c/criteo-display-ad-challenge).
"""
import os
import sys
import click
import random
import collections
# There are 13 integer features and 26 categorical features
continous_features = range(1, 14)
categorial_features = range(14, 40)
# Clip integer features. The clip point for each integer feature
# is derived from the 95% quantile of the total values in each feature
continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
class CategoryDictGenerator:
"""
Generate dictionary for each of the categorical features
"""
def __init__(self, num_feature):
self.dicts = []
self.num_feature = num_feature
for i in range(0, num_feature):
self.dicts.append(collections.defaultdict(int))
def build(self, datafile, categorial_features, cutoff=0):
with open(datafile, 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
for i in range(0, self.num_feature):
if features[categorial_features[i]] != '':
self.dicts[i][features[categorial_features[i]]] += 1
for i in range(0, self.num_feature):
self.dicts[i] = filter(lambda x: x[1] >= cutoff,
self.dicts[i].items())
self.dicts[i] = sorted(self.dicts[i], key=lambda x: (-x[1], x[0]))
vocabs, _ = list(zip(*self.dicts[i]))
self.dicts[i] = dict(zip(vocabs, range(1, len(vocabs) + 1)))
self.dicts[i]['<unk>'] = 0
def gen(self, idx, key):
if key not in self.dicts[idx]:
res = self.dicts[idx]['<unk>']
else:
res = self.dicts[idx][key]
return res
def dicts_sizes(self):
return list(map(len, self.dicts))
class ContinuousFeatureGenerator:
"""
Normalize the integer features to [0, 1] by min-max normalization
"""
def __init__(self, num_feature):
self.num_feature = num_feature
self.min = [sys.maxsize] * num_feature
self.max = [-sys.maxsize] * num_feature
def build(self, datafile, continous_features):
with open(datafile, 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
for i in range(0, self.num_feature):
val = features[continous_features[i]]
if val != '':
val = int(val)
if val > continous_clip[i]:
val = continous_clip[i]
self.min[i] = min(self.min[i], val)
self.max[i] = max(self.max[i], val)
def gen(self, idx, val):
if val == '':
return 0.0
val = float(val)
return (val - self.min[idx]) / (self.max[idx] - self.min[idx])
@click.command("preprocess")
@click.option("--datadir", type=str, help="Path to raw criteo dataset")
@click.option("--outdir", type=str, help="Path to save the processed data")
def preprocess(datadir, outdir):
"""
All 13 integer features are normalized to continuous values and these continuous
features are combined into one vector with dimension of 13.
Each of the 26 categorical features are one-hot encoded and all the one-hot
vectors are combined into one sparse binary vector.
"""
dists = ContinuousFeatureGenerator(len(continous_features))
dists.build(os.path.join(datadir, 'train.txt'), continous_features)
dicts = CategoryDictGenerator(len(categorial_features))
dicts.build(
os.path.join(datadir, 'train.txt'), categorial_features, cutoff=200)
dict_sizes = dicts.dicts_sizes()
categorial_feature_offset = [0]
for i in range(1, len(categorial_features)):
offset = categorial_feature_offset[i - 1] + dict_sizes[i - 1]
categorial_feature_offset.append(offset)
random.seed(0)
# 90% of the data are used for training, and 10% of the data are used
# for validation.
with open(os.path.join(outdir, 'train.txt'), 'w') as out_train:
with open(os.path.join(outdir, 'valid.txt'), 'w') as out_valid:
with open(os.path.join(datadir, 'train.txt'), 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
continous_vals = []
for i in range(0, len(continous_features)):
val = dists.gen(i, features[continous_features[i]])
continous_vals.append("{0:.6f}".format(val).rstrip('0')
.rstrip('.'))
categorial_vals = []
for i in range(0, len(categorial_features)):
val = dicts.gen(i, features[categorial_features[
i]]) + categorial_feature_offset[i]
categorial_vals.append(str(val))
continous_vals = ','.join(continous_vals)
categorial_vals = ','.join(categorial_vals)
label = features[0]
if random.randint(0, 9999) % 10 != 0:
out_train.write('\t'.join(
[continous_vals, categorial_vals, label]) + '\n')
else:
out_valid.write('\t'.join(
[continous_vals, categorial_vals, label]) + '\n')
with open(os.path.join(outdir, 'test.txt'), 'w') as out:
with open(os.path.join(datadir, 'test.txt'), 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
continous_vals = []
for i in range(0, len(continous_features)):
val = dists.gen(i, features[continous_features[i] - 1])
continous_vals.append("{0:.6f}".format(val).rstrip('0')
.rstrip('.'))
categorial_vals = []
for i in range(0, len(categorial_features)):
val = dicts.gen(i, features[categorial_features[
i] - 1]) + categorial_feature_offset[i]
categorial_vals.append(str(val))
continous_vals = ','.join(continous_vals)
categorial_vals = ','.join(categorial_vals)
out.write('\t'.join([continous_vals, categorial_vals]) + '\n')
if __name__ == "__main__":
preprocess()
#!/usr/bin/python
# -*- coding=utf-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 from __future__ import print_function
 import argparse
 import logging
 import os
 import time
+import random
 import numpy as np
 import paddle
 import paddle.fluid as fluid
-import reader
-from network_conf import ctr_dnn_model
-from multiprocessing import cpu_count
+from network_conf import CTR
 import utils
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig

 # disable gpu training for this example
 os.environ["CUDA_VISIBLE_DEVICES"] = ""
@@ -24,249 +41,224 @@ logger.setLevel(logging.INFO)

 def parse_args():
-    parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
+    parser = argparse.ArgumentParser(
+        description="PaddlePaddle CTR-DNN example")
+    # -------------Data & Model Path-------------
     parser.add_argument(
-        '--train_data_path',
+        '--train_files_path',
         type=str,
-        default='./data/raw/train.txt',
+        default='./train_data',
         help="The path of training dataset")
     parser.add_argument(
-        '--test_data_path',
+        '--test_files_path',
         type=str,
-        default='./data/raw/valid.txt',
+        default='./test_data',
         help="The path of testing dataset")
+    parser.add_argument(
+        '--model_path',
+        type=str,
+        default='models',
+        help='The path for model to store (default: models)')
+    # -------------Training parameter-------------
+    parser.add_argument(
+        '--learning_rate',
+        type=float,
+        default=1e-4,
+        help="Initial learning rate for training")
     parser.add_argument(
         '--batch_size',
         type=int,
         default=1000,
         help="The size of mini-batch (default:1000)")
+    parser.add_argument(
+        "--epochs",
+        type=int,
+        default=1,
+        help="Number of epochs for training.")
+    # -------------Network parameter-------------
     parser.add_argument(
         '--embedding_size',
         type=int,
         default=10,
         help="The size for embedding layer (default:10)")
-    parser.add_argument(
-        '--num_passes',
-        type=int,
-        default=10,
-        help="The number of passes to train (default: 10)")
-    parser.add_argument(
-        '--model_output_dir',
-        type=str,
-        default='models',
-        help='The path for model to store (default: models)')
     parser.add_argument(
         '--sparse_feature_dim',
         type=int,
         default=1000001,
         help='sparse feature hashing space for index processing')
+    parser.add_argument(
+        '--dense_feature_dim',
+        type=int,
+        default=13,
+        help='dense feature shape')
+    # -------------device parameter-------------
     parser.add_argument(
         '--is_local',
         type=int,
-        default=1,
+        default=0,
         help='Local train or distributed train (default: 1)')
     parser.add_argument(
-        '--cloud_train',
+        '--is_cloud',
         type=int,
         default=0,
         help='Local train or distributed train on paddlecloud (default: 0)')
     parser.add_argument(
-        '--async_mode',
-        action='store_true',
-        default=False,
-        help='Whether start pserver in async mode to support ASGD')
-    parser.add_argument(
-        '--no_split_var',
-        action='store_true',
-        default=False,
-        help='Whether split variables into blocks when update_method is pserver')
-    # the following arguments is used for distributed train, if is_local == false, then you should set them
-    parser.add_argument(
-        '--role',
-        type=str,
-        default='pserver',  # trainer or pserver
-        help='The path for model to store (default: models)')
-    parser.add_argument(
-        '--endpoints',
-        type=str,
-        default='127.0.0.1:6000',
-        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
-    parser.add_argument(
-        '--current_endpoint',
-        type=str,
-        default='127.0.0.1:6000',
-        help='The path for model to store (default: 127.0.0.1:6000)')
-    parser.add_argument(
-        '--trainer_id',
-        type=int,
-        default=0,
-        help='The path for model to store (default: models)')
-    parser.add_argument(
-        '--trainers',
-        type=int,
-        default=1,
-        help='The num of trianers, (default: 1)')
-    parser.add_argument(
-        '--enable_ce',
-        action='store_true',
-        help='If set, run the task with continuous evaluation logs.')
+        '--save_model',
+        type=int,
+        default=0,
+        help='Save training model or not')
+    parser.add_argument(
+        '--cpu_num',
+        type=int,
+        default=2,
+        help='threads for ctr training')

     return parser.parse_args()


-def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
-               trainer_num, trainer_id):
-    if args.enable_ce:
-        SEED = 102
-        train_program.random_seed = SEED
-        fluid.default_startup_program().random_seed = SEED
-
-    dataset = reader.CriteoDataset(args.sparse_feature_dim)
-    train_reader = paddle.batch(
-        paddle.reader.shuffle(
-            dataset.train([args.train_data_path], trainer_num, trainer_id),
-            buf_size=args.batch_size * 100),
-        batch_size=args.batch_size)
-
-    py_reader.decorate_paddle_reader(train_reader)
-    data_name_list = []
-    place = fluid.CPUPlace()
-    exe = fluid.Executor(place)
-    exec_strategy = fluid.ExecutionStrategy()
-    build_strategy = fluid.BuildStrategy()
-
-    if os.getenv("NUM_THREADS", ""):
-        exec_strategy.num_threads = int(os.getenv("NUM_THREADS"))
-    cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
-    build_strategy.reduce_strategy = \
-        fluid.BuildStrategy.ReduceStrategy.Reduce if cpu_num > 1 \
-        else fluid.BuildStrategy.ReduceStrategy.AllReduce
-
-    exe.run(fluid.default_startup_program())
-    pe = fluid.ParallelExecutor(
-        use_cuda=False,
-        loss_name=loss.name,
-        main_program=train_program,
-        build_strategy=build_strategy,
-        exec_strategy=exec_strategy)
-
-    total_time = 0
-    for pass_id in range(args.num_passes):
-        pass_start = time.time()
-        batch_id = 0
-        py_reader.start()
-
-        try:
-            while True:
-                loss_val, auc_val, batch_auc_val = pe.run(
-                    fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
-                loss_val = np.mean(loss_val)
-                auc_val = np.mean(auc_val)
-                batch_auc_val = np.mean(batch_auc_val)
-
-                logger.info(
-                    "TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
-                    .format(pass_id, batch_id, loss_val / args.batch_size,
-                            auc_val, batch_auc_val))
-                if batch_id % 1000 == 0 and batch_id != 0:
-                    model_dir = os.path.join(args.model_output_dir,
-                                             'batch-' + str(batch_id))
-                    if args.trainer_id == 0:
-                        fluid.io.save_persistables(
-                            executor=exe,
-                            dirname=model_dir,
-                            main_program=fluid.default_main_program())
-                batch_id += 1
-        except fluid.core.EOFException:
-            py_reader.reset()
-            print("pass_id: %d, pass_time_cost: %f" %
-                  (pass_id, time.time() - pass_start))
-
-        total_time += time.time() - pass_start
-
-        model_dir = os.path.join(args.model_output_dir, 'pass-' + str(pass_id))
-        if args.trainer_id == 0:
-            fluid.io.save_persistables(
-                executor=exe,
-                dirname=model_dir,
-                main_program=fluid.default_main_program())
-
-    # only for ce
-    if args.enable_ce:
-        threads_num, cpu_num = get_cards(args)
-        epoch_idx = args.num_passes
-        print("kpis\teach_pass_duration_cpu%s_thread%s\t%s" %
-              (cpu_num, threads_num, total_time / epoch_idx))
-        print("kpis\ttrain_loss_cpu%s_thread%s\t%s" %
-              (cpu_num, threads_num, loss_val / args.batch_size))
-        print("kpis\ttrain_auc_val_cpu%s_thread%s\t%s" %
-              (cpu_num, threads_num, auc_val))
-        print("kpis\ttrain_batch_auc_val_cpu%s_thread%s\t%s" %
-              (cpu_num, threads_num, batch_auc_val))
+def get_dataset(inputs, args):
+    dataset = fluid.DatasetFactory().create_dataset()
+    dataset.set_use_var(inputs)
+    dataset.set_pipe_command("python dataset_generator.py")
+    dataset.set_batch_size(args.batch_size)
+    thread_num = int(args.cpu_num)
+    dataset.set_thread(thread_num)
+    file_list = [
+        str(args.train_files_path) + "/%s" % x
+        for x in os.listdir(args.train_files_path)
+    ]
+    # Make sure every training node holds different training files.
+    # When local multi-processing simulates distributed training, each
+    # process needs its own files; fleet.split_files conveniently assigns
+    # training files to nodes by node id.
+    if int(args.is_cloud):
+        file_list = fleet.split_files(file_list)
+    logger.info("file list: {}".format(file_list))
+
+    return dataset, file_list
+
+
+def local_train(args):
+    # build the model network
+    ctr_model = CTR()
+    inputs = ctr_model.input_data(args)
+    avg_cost, auc_var = ctr_model.net(inputs, args)
+    # choose the optimizer for the backward update
+    optimizer = fluid.optimizer.Adam(args.learning_rate)
+    optimizer.minimize(avg_cost)
+
+    # create the training executor on CPU and initialize the parameters
+    exe = fluid.Executor(fluid.CPUPlace())
+    exe.run(fluid.default_startup_program())
+
+    # set up the dataset reader and the list of training files
+    dataset, file_list = get_dataset(inputs, args)
+
+    logger.info("Training Begin")
+    for epoch in range(args.epochs):
+        # shuffle at file granularity
+        random.shuffle(file_list)
+        dataset.set_filelist(file_list)
+
+        # train_from_dataset runs multi-threaded training
+        start_time = time.time()
+        exe.train_from_dataset(program=fluid.default_main_program(),
+                               dataset=dataset,
+                               fetch_list=[auc_var],
+                               fetch_info=["Epoch {} auc ".format(epoch)],
+                               print_period=100,
+                               debug=False)
+        end_time = time.time()
+        logger.info("epoch %d finished, use time=%d\n" %
+                    ((epoch), end_time - start_time))
+
+        if args.save_model:
+            model_path = (str(args.model_path) + "/" +
+                          "epoch_" + str(epoch) + "/")
+            if not os.path.isdir(model_path):
+                os.mkdir(model_path)
+            fluid.save(fluid.default_main_program(), model_path + "checkpoint")
+
+    logger.info("Train Success!")
+
+
+def distribute_train(args):
+    # determine the role this machine/process plays in distributed training
+    # from the environment variables, then initialize the node with the
+    # fleet API's init() method
+    role = role_maker.PaddleCloudRoleMaker()
+    fleet.init(role)
+
+    # the distributed run mode can be configured further through
+    # DistributeTranspilerConfig; here we set asynchronous (async) mode and
+    # split the parameters across the parameter-server nodes
+    strategy = DistributeTranspilerConfig()
+    strategy.sync_mode = False
+    strategy.runtime_split_send_recv = True
+
+    ctr_model = CTR()
+    inputs = ctr_model.input_data(args)
+    avg_cost, auc_var = ctr_model.net(inputs, args)
+    # configure the distributed optimizer with the strategy above and build the program
+    optimizer = fluid.optimizer.Adam(args.learning_rate)
+    optimizer = fleet.distributed_optimizer(optimizer, strategy)
+    optimizer.minimize(avg_cost)
+
+    # run different logic according to the node role
+    if fleet.is_server():
+        # initialize and run the parameter server node
+        fleet.init_server()
+        fleet.run_server()
+
+    elif fleet.is_worker():
+        # initialize the worker node
+        fleet.init_worker()
+        exe = fluid.Executor(fluid.CPUPlace())
+        # run fleet.startup_program, which contains the distributed startup logic
+        exe.run(fleet.startup_program)
+
+        dataset, file_list = get_dataset(inputs, args)
+
+        for epoch in range(args.epochs):
+            # shuffle at file granularity
+            random.shuffle(file_list)
+            dataset.set_filelist(file_list)
+
+            # the trainer runs fleet.main_program, pruned for distributed training
+            start_time = time.time()
+            exe.train_from_dataset(program=fleet.main_program,
+                                   dataset=dataset,
+                                   fetch_list=[auc_var],
+                                   fetch_info=["Epoch {} auc ".format(epoch)],
+                                   print_period=100,
+                                   debug=False)
+            end_time = time.time()
+            logger.info("epoch %d finished, use time=%d\n" %
+                        ((epoch), end_time - start_time))
+
+            # by default, worker 0 saves the model
+            if args.save_model and fleet.is_first_worker():
+                model_path = (str(args.model_path) + "/" + "epoch_" +
+                              str(epoch))
+                fleet.save_persistables(executor=exe, dirname=model_path)
+
+        fleet.stop_worker()
+        logger.info("Distribute Train Success!")


 def train():
     args = parse_args()

-    if not os.path.isdir(args.model_output_dir):
-        os.mkdir(args.model_output_dir)
-
-    loss, auc_var, batch_auc_var, py_reader, _, auc_states = ctr_dnn_model(
-        args.embedding_size, args.sparse_feature_dim)
-    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
-    optimizer.minimize(loss)
-
-    if args.cloud_train:
-        # the port of all pservers, needed by both trainer and pserver
-        port = os.getenv("PADDLE_PORT", "6174")
-        # comma separated ips of all pservers, needed by trainer and
-        pserver_ips = os.getenv("PADDLE_PSERVERS", "")
-        eplist = []
-        for ip in pserver_ips.split(","):
-            eplist.append(':'.join([ip, port]))
-        args.endpoints = ",".join(eplist)
-        args.trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
-        args.current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
-        args.role = os.getenv("TRAINING_ROLE", "TRAINER")
-        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
-        args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", 0)))
-
-    if args.is_local:
+    if not os.path.isdir(args.model_path):
+        os.mkdir(args.model_path)
+
+    if args.is_cloud:
+        logger.info("run cloud training")
+        distribute_train(args)
+    elif args.is_local:
         logger.info("run local training")
-        main_program = fluid.default_main_program()
-        train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var,
-                   1, 0)
+        local_train(args)
-    else:
-        logger.info("run dist training")
-        t = fluid.DistributeTranspiler()
-        t.transpile(
-            args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
-        if args.role == "pserver" or args.role == "PSERVER":
-            logger.info("run pserver")
-            prog = t.get_pserver_program(args.current_endpoint)
-            startup = t.get_startup_program(
-                args.current_endpoint, pserver_program=prog)
-            exe = fluid.Executor(fluid.CPUPlace())
-            exe.run(startup)
-            exe.run(prog)
-        elif args.role == "trainer" or args.role == "TRAINER":
-            logger.info("run trainer")
-            train_prog = t.get_trainer_program()
-            train_loop(args, train_prog, py_reader, loss, auc_var,
-                       batch_auc_var, args.trainers, args.trainer_id)
-        else:
-            raise ValueError(
-                'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
-            )
-
-
-def get_cards(args):
-    threads_num = os.environ.get('NUM_THREADS', 1)
-    cpu_num = os.environ.get('CPU_NUM', 1)
-    return int(threads_num), int(cpu_num)


 if __name__ == '__main__':
......
@@ -13,12 +13,12 @@ def check_version():
     Log error and exit when the installed version of paddlepaddle is
     not satisfied.
     """
-    err = "PaddlePaddle version 1.6 or higher is required, " \
+    err = "PaddlePaddle version 1.6.1 or higher is required, " \
           "or a suitable develop version is satisfied as well. \n" \
          "Please make sure the version is good with your code." \

     try:
-        fluid.require_version('1.6.0')
+        fluid.require_version('1.6.1')
     except Exception as e:
         logger.error(err)
         sys.exit(1)