Commit adf7eca6 authored by Z zhoushiyu, committed by Thunderbrook

Add local distributed training in PaddleRec (#3367)

* PaddleRec xdeepfm add local distributed training

* PaddleRec deepfm dcn add local distributed training

* change PaddleRec deepfm README.md

* add embedding is_sparse flag in argparser

* fix deepfm failed in ver 1.5.2 when use transpiler

* fix xdeepfm failed in ver 1.5.2 when use transpiler

* fix some bugs in dnn deepfm and dcn

* fix embedding size in deepfm
Parent dad22cfa
......@@ -4,15 +4,18 @@
```text
.
├── README.md                   # documentation
├── local_train.py              # local training script
├── infer.py                    # inference script
├── network.py                  # network structure
├── config.py                   # argument/parameter configuration
├── reader.py                   # data reading helpers
├── data/
    ├── download.sh                 # script to download the full dataset
    ├── preprocess.py               # data preprocessing script
├── dist_data/
    ├── dist_data_download.sh       # script to download the small demo dataset for locally emulated distributed training
    ├── preprocess_dist.py          # preprocessing script for the small demo dataset
```
......@@ -20,7 +23,7 @@
For an introduction to the DCN model, see the paper [Deep & Cross Network for Ad Click Predictions](https://arxiv.org/abs/1708.05123).
## Environment
- PaddlePaddle 1.5.2
## Data Download
......@@ -62,3 +65,39 @@ nohup python -u infer.py --test_epoch 2 > test.log &
```text
loss: [0.44703564] auc_val: [0.80654419]
```
## Distributed Training
First, download and preprocess the small demo dataset:
```bash
cd dist_data && sh dist_data_download.sh && cd ..
```
Then run the following command to emulate a multi-machine setup on a single machine. By default a 2 x 2 layout is used, i.e. 2 pservers and 2 trainers:
```bash
sh cluster_train.sh
```
Parameter description:
- train_data_dir: directory of the training data
- model_output_dir: directory where models are saved
- is_local: whether to train locally on a single machine (set to 0 when emulating distributed training on one machine)
- is_sparse: whether the embedding uses sparse updates; defaults to False if not set
- role: role of the process (pserver or trainer)
- endpoints: addresses and ports of all pservers
- current_endpoint: address and port of the current pserver (when role is pserver)
- trainers: number of trainers

Other parameters are documented in cluster_train.py. A minimal manual launch using these flags is sketched below.
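The following is an illustrative sketch only: it starts a single pserver and a single trainer on localhost with the flags above, while cluster_train.sh performs the full 2 pserver x 2 trainer launch:
```bash
# sketch: one pserver plus one trainer on localhost (cluster_train.sh automates the 2 x 2 case)
python -u cluster_train.py --is_local 0 --is_sparse --role pserver \
    --endpoints 127.0.0.1:6000 --current_endpoint 127.0.0.1:6000 \
    --trainers 1 --model_output_dir cluster_model > pserver0.log 2>&1 &

python -u cluster_train.py --is_local 0 --is_sparse --role trainer --use_gpu 0 \
    --endpoints 127.0.0.1:6000 --trainers 1 --trainer_id 0 \
    --train_data_dir dist_data/dist_train_data \
    --model_output_dir cluster_model > trainer0.log 2>&1 &
```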
Infer
```bash
python infer.py --model_output_dir cluster_model --test_epoch 10 --test_valid_data_dir dist_data/dist_test_valid_data --vocab_dir dist_data/vocab --cat_feat_num dist_data/cat_feature_num.txt
```
Notes:
- Local emulation requires proxies to be disabled, e.g. unset http_proxy, unset https_proxy
- Only trainer 0 saves the model parameters
- After each training run the pserver processes must be stopped manually; list them with the command below (a cleanup one-liner is sketched after these notes):
>ps -ef | grep python
- Data loading uses the Dataset API, which is currently supported on Linux only
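A possible cleanup one-liner, assuming all processes were started through cluster_train.py as cluster_train.sh does (adjust the grep pattern otherwise):
```bash
# stop every local pserver/trainer process started by cluster_train.sh
ps -ef | grep cluster_train.py | grep -v grep | awk '{print $2}' | xargs kill
```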
import argparse
import os
import sys
import time
from collections import OrderedDict
import paddle.fluid as fluid
from network import DCN
def parse_args():
parser = argparse.ArgumentParser("dcn cluster train.")
parser.add_argument(
'--train_data_dir',
type=str,
default='dist_data/dist_train_data',
help='The path of train data')
parser.add_argument(
'--test_valid_data_dir',
type=str,
default='dist_data/dist_test_valid_data',
help='The path of test and valid data')
parser.add_argument(
'--vocab_dir',
type=str,
default='dist_data/vocab',
help='The path of generated vocabs')
parser.add_argument(
'--cat_feat_num',
type=str,
default='dist_data/cat_feature_num.txt',
help='The path of generated cat_feature_num.txt')
parser.add_argument(
'--batch_size', type=int, default=512, help="Batch size")
parser.add_argument('--num_epoch', type=int, default=10, help="train epoch")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The path for model to store')
parser.add_argument(
'--num_thread', type=int, default=1, help='The number of threads')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--dnn_hidden_units',
nargs='+',
type=int,
default=[1024, 1024],
help='DNN layers and hidden units')
parser.add_argument(
'--cross_num',
type=int,
default=6,
help='The number of Cross network layers')
parser.add_argument('--lr', type=float, default=1e-4, help='Learning rate')
parser.add_argument(
'--l2_reg_cross',
type=float,
default=1e-5,
help='Cross net l2 regularizer coefficient')
parser.add_argument(
'--use_bn',
type=bool,
default=True,
        help='Whether to use batch norm in the dnn part')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
parser.add_argument(
'--clip_by_norm', type=float, default=100.0, help="gradient clip norm")
parser.add_argument('--print_steps', type=int, default=5)
parser.add_argument('--use_gpu', type=int, default=1)
# dist params
parser.add_argument('--is_local', type=int, default=1, help='whether local')
parser.add_argument(
'--num_devices', type=int, default=1, help='Number of GPU devices')
parser.add_argument(
'--role', type=str, default='pserver', help='trainer or pserver')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000, 127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The current_endpoint')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
        help='trainer id; only trainer_id=0 saves the model')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')
args = parser.parse_args()
return args
def train():
""" do training """
args = parse_args()
print(args)
if args.trainer_id == 0 and not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
cat_feat_dims_dict = OrderedDict()
for line in open(args.cat_feat_num):
spls = line.strip().split()
assert len(spls) == 2
cat_feat_dims_dict[spls[0]] = int(spls[1])
dcn_model = DCN(args.cross_num, args.dnn_hidden_units, args.l2_reg_cross,
args.use_bn, args.clip_by_norm, cat_feat_dims_dict,
args.is_sparse)
dcn_model.build_network()
optimizer = fluid.optimizer.Adam(learning_rate=args.lr)
optimizer.minimize(dcn_model.loss)
def train_loop(main_program):
""" train network """
start_time = time.time()
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(dcn_model.data_list)
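        # the Dataset pipes each raw input line through reader.py, which
        # converts Criteo records into the slots declared in data_list above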
pipe_command = 'python reader.py {}'.format(args.vocab_dir)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(args.batch_size)
dataset.set_thread(args.num_thread)
train_filelist = [
os.path.join(args.train_data_dir, fname)
for fname in next(os.walk(args.train_data_dir))[2]
]
dataset.set_filelist(train_filelist)
if args.use_gpu == 1:
exe = fluid.Executor(fluid.CUDAPlace(0))
dataset.set_thread(1)
else:
exe = fluid.Executor(fluid.CPUPlace())
dataset.set_thread(args.num_thread)
exe.run(fluid.default_startup_program())
for epoch_id in range(args.num_epoch):
start = time.time()
sys.stderr.write('\nepoch%d start ...\n' % (epoch_id + 1))
exe.train_from_dataset(
program=main_program,
dataset=dataset,
fetch_list=[
dcn_model.loss, dcn_model.avg_logloss, dcn_model.auc_var
],
fetch_info=['total_loss', 'avg_logloss', 'auc'],
debug=False,
print_period=args.print_steps)
model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
sys.stderr.write('epoch%d is finished and takes %f s\n' % (
(epoch_id + 1), time.time() - start))
if args.trainer_id == 0: # only trainer 0 save model
print("save model in {}".format(model_dir))
fluid.io.save_persistables(
executor=exe, dirname=model_dir, main_program=main_program)
print("train time cost {:.4f}".format(time.time() - start_time))
print("finish training")
if args.is_local:
print("run local training")
train_loop(fluid.default_main_program())
else:
print("run distribute training")
t = fluid.DistributeTranspiler()
t.transpile(
args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
if args.role == "pserver":
print("run psever")
pserver_prog, pserver_startup = t.get_pserver_programs(
args.current_endpoint)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(pserver_startup)
exe.run(pserver_prog)
elif args.role == "trainer":
print("run trainer")
train_loop(t.get_trainer_program())
if __name__ == "__main__":
train()
#!/bin/bash
#export GLOG_v=30
#export GLOG_logtostderr=1
# start pserver0
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--is_local 0 \
--is_sparse \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--is_local 0 \
--is_sparse \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
#CUDA_VISIBLE_DEVICES=1 python cluster_train.py \
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--use_gpu 0 \
--is_local 0 \
--is_sparse \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
#CUDA_VISIBLE_DEVICES=2 python cluster_train.py \
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--use_gpu 0 \
--is_local 0 \
--is_sparse \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
echo "2 pservers and 2 trainers started."
\ No newline at end of file
......@@ -18,6 +18,16 @@ def parse_args():
type=str,
default='data/test_valid',
help='The path of test and valid data')
parser.add_argument(
'--vocab_dir',
type=str,
default='data/vocab',
help='The path of generated vocabs')
parser.add_argument(
'--cat_feat_num',
type=str,
default='data/cat_feature_num.txt',
help='The path of generated cat_feature_num.txt')
parser.add_argument(
'--batch_size', type=int, default=512, help="Batch size")
parser.add_argument(
......@@ -56,6 +66,12 @@ def parse_args():
type=bool,
default=True,
        help='Whether to use batch norm in the dnn part')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
parser.add_argument(
'--clip_by_norm', type=float, default=100.0, help="gradient clip norm")
parser.add_argument('--print_steps', type=int, default=100)
......
......@@ -15,7 +15,7 @@ wget --no-check-certificate -c https://s3-eu-west-1.amazonaws.com/kaggle-display
echo "download finished"
echo "extracting ..."
tar xzvf dac.tar.gz
tar xzf dac.tar.gz >/dev/null 2>&1
wc -l $trainfile | awk '{print $1}' > line_nums.log
echo "extract finished"
......
#!/bin/bash
# download small demo dataset
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz
tar xzvf dist_data_demo.tar.gz
# preprocess demo dataset
python dist_preprocess.py
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, absolute_import, division
import os
import sys
from collections import Counter
import numpy as np
"""
preprocess Criteo train data, generate extra statistic files for model input.
"""
# input filename
FILENAME = 'dist_data_demo.txt'
# global vars
CAT_FEATURE_NUM = 'cat_feature_num.txt'
INT_FEATURE_MINMAX = 'int_feature_minmax.txt'
VOCAB_DIR = 'vocab'
TRAIN_DIR = 'dist_train_data'
TEST_DIR = 'dist_test_valid_data'
TRAIN_FILE = os.path.join(TRAIN_DIR, 'tr')
TEST_FILE = os.path.join(TEST_DIR, 'ev')
SPLIT_RATIO = 0.9
LINE_NUMS = "line_nums.log"
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
CAT_COLUMN_NAMES = ['C' + str(i) for i in range(1, 27)]
def check_statfiles():
"""
check if statistic files of Criteo exists
:return:
"""
statsfiles = [CAT_FEATURE_NUM, INT_FEATURE_MINMAX] + [
os.path.join(VOCAB_DIR, cat_fn + '.txt') for cat_fn in CAT_COLUMN_NAMES
]
if all([os.path.exists(fn) for fn in statsfiles]):
return True
return False
def create_statfiles():
"""
create statistic files of Criteo, including:
        min/max of integer features
        counts of categorical features
        vocab of each categorical feature
:return:
"""
int_minmax_list = [[sys.maxsize, -sys.maxsize]
for _ in range(13)] # count integer feature min max
cat_ct_list = [Counter() for _ in range(26)] # count categorical features
for idx, line in enumerate(open(FILENAME)):
spls = line.rstrip('\n').split('\t')
assert len(spls) == 40
for i in range(13):
if not spls[1 + i]: continue
int_val = int(spls[1 + i])
int_minmax_list[i][0] = min(int_minmax_list[i][0], int_val)
int_minmax_list[i][1] = max(int_minmax_list[i][1], int_val)
for i in range(26):
cat_ct_list[i].update([spls[14 + i]])
# save min max of integer features
with open(INT_FEATURE_MINMAX, 'w') as f:
for name, minmax in zip(INT_COLUMN_NAMES, int_minmax_list):
print("{} {} {}".format(name, minmax[0], minmax[1]), file=f)
# remove '' from all cat_set[i] and filter low freq categorical value
cat_set_list = [set() for i in range(len(cat_ct_list))]
for i, ct in enumerate(cat_ct_list):
if '' in ct: del ct['']
for key in list(ct.keys()):
if ct[key] >= FREQ_THR:
cat_set_list[i].add(key)
del cat_ct_list
# create vocab dir
if not os.path.exists(VOCAB_DIR):
os.makedirs(VOCAB_DIR)
# write vocab file of categorical features
with open(CAT_FEATURE_NUM, 'w') as cat_feat_count_file:
for name, s in zip(CAT_COLUMN_NAMES, cat_set_list):
print('{} {}'.format(name, len(s)), file=cat_feat_count_file)
vocabfile = os.path.join(VOCAB_DIR, name + '.txt')
with open(vocabfile, 'w') as f:
for vocab_val in s:
print(vocab_val, file=f)
def split_data():
"""
split train.txt into train and test_valid files.
:return:
"""
if not os.path.exists(TRAIN_DIR):
os.makedirs(TRAIN_DIR)
if not os.path.exists(TEST_DIR):
os.makedirs(TEST_DIR)
all_lines = []
for line in open(FILENAME):
all_lines.append(line)
split_line_idx = int(len(all_lines) * SPLIT_RATIO)
with open(TRAIN_FILE, 'w') as f:
f.writelines(all_lines[:split_line_idx])
with open(TEST_FILE, 'w') as f:
f.writelines(all_lines[split_line_idx:])
if __name__ == '__main__':
if not check_statfiles():
        print('create statistic files of Criteo...')
create_statfiles()
print('split train.txt...')
split_data()
print('done')
......@@ -15,6 +15,7 @@ import paddle.fluid as fluid
from config import parse_args
from reader import CriteoDataset
from network import DCN
from collections import OrderedDict
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('fluid')
......@@ -28,16 +29,18 @@ def infer():
place = fluid.CPUPlace()
inference_scope = fluid.Scope()
test_files = [
test_valid_files = [
os.path.join(args.test_valid_data_dir, fname)
for fname in next(os.walk(args.test_valid_data_dir))[2]
]
test_files = random.sample(test_files, int(len(test_files) * 0.5))
test_files = random.sample(test_valid_files,
int(len(test_valid_files) * 0.5))
if not test_files:
test_files = test_valid_files
print('test files num {}'.format(len(test_files)))
criteo_dataset = CriteoDataset()
criteo_dataset.setup()
criteo_dataset.setup(args.vocab_dir)
test_reader = criteo_dataset.test_reader(test_files, args.batch_size, 100)
startup_program = fluid.framework.Program()
......@@ -46,8 +49,14 @@ def infer():
with fluid.scope_guard(inference_scope):
with fluid.framework.program_guard(test_program, startup_program):
cat_feat_dims_dict = OrderedDict()
for line in open(args.cat_feat_num):
spls = line.strip().split()
assert len(spls) == 2
cat_feat_dims_dict[spls[0]] = int(spls[1])
dcn_model = DCN(args.cross_num, args.dnn_hidden_units,
args.l2_reg_cross, args.use_bn)
args.l2_reg_cross, args.use_bn, args.clip_by_norm,
cat_feat_dims_dict, args.is_sparse)
dcn_model.build_network(is_test=True)
exe = fluid.Executor(place)
......
......@@ -5,6 +5,7 @@ import os
import random
import sys
import time
from collections import OrderedDict
import paddle.fluid as fluid
......@@ -21,15 +22,21 @@ def train(args):
:param args: hyperparams of model
:return:
"""
cat_feat_dims_dict = OrderedDict()
for line in open(args.cat_feat_num):
spls = line.strip().split()
assert len(spls) == 2
cat_feat_dims_dict[spls[0]] = int(spls[1])
dcn_model = DCN(args.cross_num, args.dnn_hidden_units, args.l2_reg_cross,
args.use_bn, args.clip_by_norm)
args.use_bn, args.clip_by_norm, cat_feat_dims_dict,
args.is_sparse)
dcn_model.build_network()
dcn_model.backward(args.lr)
# config dataset
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(dcn_model.data_list)
pipe_command = 'python reader.py'
pipe_command = 'python reader.py {}'.format(args.vocab_dir)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(args.batch_size)
dataset.set_thread(args.num_thread)
......
......@@ -14,24 +14,22 @@ class DCN(object):
dnn_hidden_units=(128, 128),
l2_reg_cross=1e-5,
dnn_use_bn=False,
clip_by_norm=None):
clip_by_norm=None,
cat_feat_dims_dict=None,
is_sparse=False):
self.cross_num = cross_num
self.dnn_hidden_units = dnn_hidden_units
self.l2_reg_cross = l2_reg_cross
self.dnn_use_bn = dnn_use_bn
self.clip_by_norm = clip_by_norm
self.cat_feat_dims_dict = OrderedDict()
self.cat_feat_dims_dict = cat_feat_dims_dict if cat_feat_dims_dict else OrderedDict(
)
self.is_sparse = is_sparse
self.dense_feat_names = ['I' + str(i) for i in range(1, 14)]
self.sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
target = ['label']
for line in open('data/cat_feature_num.txt'):
spls = line.strip().split()
assert len(spls) == 2
self.cat_feat_dims_dict[spls[0]] = int(spls[1])
# {feat_name: dims}
self.feat_dims_dict = OrderedDict(
[(feat_name, 1) for feat_name in self.dense_feat_names])
......@@ -122,16 +120,14 @@ class DCN(object):
def _create_embedding_input(self, data_dict):
# sparse embedding
sparse_emb_dict = OrderedDict((
name, fluid.layers.embedding(
input=fluid.layers.cast(
data_dict[name], dtype='int64'),
size=[
self.feat_dims_dict[name] + 1,
6 * int(pow(self.feat_dims_dict[name], 0.25))
]))
for name in self.sparse_feat_names
)
sparse_emb_dict = OrderedDict((name, fluid.layers.embedding(
input=fluid.layers.cast(
data_dict[name], dtype='int64'),
size=[
self.feat_dims_dict[name] + 1,
6 * int(pow(self.feat_dims_dict[name], 0.25))
],
is_sparse=self.is_sparse)) for name in self.sparse_feat_names)
# combine dense and sparse_emb
dense_input_list = [
......
......@@ -12,7 +12,7 @@ import os
class CriteoDataset(dg.MultiSlotDataGenerator):
def setup(self):
def setup(self, vocab_dir):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
5775, 257675, 65535, 969, 23159456, 431037, 56311, 6047, 29019, 11,
......@@ -35,7 +35,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
for i in range(26):
lookup_idx = 1 # remain 0 for default value
for line in open(
os.path.join('data/vocab', 'C' + str(i + 1) + '.txt')):
os.path.join(vocab_dir, 'C' + str(i + 1) + '.txt')):
self.cat_feat_idx_dict_list[i][line.strip()] = lookup_idx
lookup_idx += 1
......@@ -89,5 +89,8 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
if __name__ == '__main__':
criteo_dataset = CriteoDataset()
criteo_dataset.setup()
if len(sys.argv) <= 1:
sys.stderr.write("feat_dict needed for criteo reader.")
exit(1)
criteo_dataset.setup(sys.argv[1])
criteo_dataset.run_from_stdin()
......@@ -30,13 +30,13 @@ cd data && sh download_preprocess.sh && cd ..
After executing these commands, 3 folders "train_data", "test_data" and "aid_data" will be generated. The folder "train_data" contains 90% of the raw data, while the remaining 10% is in "test_data". The folder "aid_data" contains a generated feature dictionary "feat_dict.pkl2".
## Local Train
```bash
nohup python local_train.py --model_output_dir models >> train_log 2>&1 &
```
## Local Infer
```bash
nohup python infer.py --model_output_dir models --test_epoch 1 >> infer_log 2>&1 &
```
......@@ -49,3 +49,47 @@ When the training set is iterated to the 22nd round, the testing Logloss is `0.4
<p align="center">
<img src="./picture/deepfm_result.png" height=200 hspace='10'/> <br />
</p>
## Distributed Train
We emulate distributed training on a local machine. By default, a 2 x 2 layout is used, i.e. 2 pservers and 2 trainers.
### Download and preprocess distributed demo dataset
This small demo dataset (a few lines sampled from the Criteo dataset) is only intended to verify that distributed training runs end to end.
```bash
cd dist_data && sh dist_data_download.sh && cd ..
```
### Distributed Train and Infer
Train
```bash
sh cluster_train.sh
```
Parameters of cluster_train.sh:
- train_data_dir: path of the training data
- model_output_dir: path where the model is saved
- is_local: local or distributed training (set to 0 for distributed training)
- is_sparse: whether to use sparse updates in the embedding. If not set, the default is False.
- role: role of the process (pserver or trainer)
- endpoints: ip:port of all pservers
- current_endpoint: ip:port of the current pserver (role should be pserver)
- trainers: the number of trainers

Other parameters are explained in cluster_train.py. A quick way to monitor a run is sketched below.
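Once cluster_train.sh is running, one quick way to follow progress and find the saved models; the log and directory names below are the ones produced by the script and cluster_train.py above:
```bash
# follow both trainer logs written by cluster_train.sh
tail -f trainer0.log trainer1.log
# models saved by trainer 0 land under cluster_model/epoch_<N>
ls cluster_model
```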
Infer
```bash
python infer.py --model_output_dir cluster_model --test_epoch 50 --test_data_dir=dist_data/dist_test_data --feat_dict='dist_data/aid_data/feat_dict_10.pkl2'
```
Notes:
- **Proxies must be disabled**, e.g. unset http_proxy, unset https_proxy.
- The first trainer (with trainer_id 0) saves the model parameters.
- After each training run, the pserver processes should be stopped manually. You can list them with the command below (a port sanity check is sketched after these notes):
>ps -ef | grep python
- We use the Dataset API to load data; it is only supported on Linux for now.
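Optionally, before the trainers connect you can confirm that both pservers are listening on the endpoints configured in cluster_train.sh (ports 6000 and 6001 are the script's defaults); this assumes netstat is available:
```bash
# sanity check: both pserver endpoints should show up in LISTEN state
netstat -lnt | grep -E ':(6000|6001)'
```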
## Distributed Train with Fleet
Fleet is a high-level API for distributed training in PaddlePaddle. See the [DeepFM example](https://github.com/PaddlePaddle/Fleet/tree/develop/examples/deepFM) in the Fleet repo.
......@@ -13,6 +13,11 @@ def parse_args():
type=str,
default='data/test_data',
help='The path of test data (default: models)')
parser.add_argument(
'--feat_dict',
type=str,
default='data/aid_data/feat_dict_10.pkl2',
help='The path of feat_dict')
parser.add_argument(
'--batch_size',
type=int,
......@@ -50,6 +55,12 @@ def parse_args():
type=str,
default='relu',
help='The activation of each layers (default: relu)')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
parser.add_argument(
'--lr', type=float, default=1e-4, help='Learning rate (default: 1e-4)')
parser.add_argument(
......
import argparse
import os
import sys
import time
from network_conf import ctr_deepfm_model
import paddle.fluid as fluid
def parse_args():
parser = argparse.ArgumentParser("deepfm cluster train.")
parser.add_argument(
'--train_data_dir',
type=str,
default='dist_data/dist_train_data',
help='The path of train data (default: data/train_data)')
parser.add_argument(
'--test_data_dir',
type=str,
default='dist_data/dist_test_data',
help='The path of test data (default: models)')
parser.add_argument(
'--feat_dict',
type=str,
default='dist_data/aid_data/feat_dict_10.pkl2',
help='The path of feat_dict')
parser.add_argument(
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:100)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_epoch',
type=int,
default=50,
help="The number of epochs to train (default: 50)")
parser.add_argument(
'--model_output_dir',
type=str,
required=True,
help='The path for model to store (default: models)')
parser.add_argument(
'--num_thread',
type=int,
default=1,
help='The number of threads (default: 1)')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--layer_sizes',
nargs='+',
type=int,
default=[400, 400, 400],
        help='The size of each layer (default: [400, 400, 400])')
parser.add_argument(
'--act',
type=str,
default='relu',
help='The activation of each layers (default: relu)')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
parser.add_argument(
'--lr', type=float, default=1e-4, help='Learning rate (default: 1e-4)')
parser.add_argument(
        '--reg', type=float, default=1e-4, help='Regularization coefficient (default: 1e-4)')
parser.add_argument('--num_field', type=int, default=39)
parser.add_argument('--num_feat', type=int, default=135483)
parser.add_argument('--use_gpu', type=int, default=1)
# dist params
parser.add_argument('--is_local', type=int, default=1, help='whether local')
parser.add_argument(
'--num_devices', type=int, default=1, help='Number of GPU devices')
parser.add_argument(
'--role', type=str, default='pserver', help='trainer or pserver')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000, 127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The current_endpoint')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
        help='trainer id; only trainer_id=0 saves the model')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')
args = parser.parse_args()
return args
def train():
""" do training """
args = parse_args()
print(args)
if args.trainer_id == 0 and not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, auc, data_list = ctr_deepfm_model(args.embedding_size, args.num_field,
args.num_feat, args.layer_sizes,
args.act, args.reg, args.is_sparse)
optimizer = fluid.optimizer.SGD(
learning_rate=args.lr,
regularization=fluid.regularizer.L2DecayRegularizer(args.reg))
optimizer.minimize(loss)
def train_loop(main_program):
""" train network """
start_time = time.time()
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(data_list)
pipe_command = 'python criteo_reader.py {}'.format(args.feat_dict)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(args.batch_size)
dataset.set_thread(args.num_thread)
train_filelist = [
args.train_data_dir + '/' + x
for x in os.listdir(args.train_data_dir)
]
if args.use_gpu == 1:
exe = fluid.Executor(fluid.CUDAPlace(0))
dataset.set_thread(1)
else:
exe = fluid.Executor(fluid.CPUPlace())
dataset.set_thread(args.num_thread)
exe.run(fluid.default_startup_program())
for epoch_id in range(args.num_epoch):
start = time.time()
sys.stderr.write('\nepoch%d start ...\n' % (epoch_id + 1))
dataset.set_filelist(train_filelist)
exe.train_from_dataset(
program=main_program,
dataset=dataset,
fetch_list=[loss],
fetch_info=['epoch %d batch loss' % (epoch_id + 1)],
print_period=20,
debug=False)
model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
sys.stderr.write('epoch%d is finished and takes %f s\n' % (
(epoch_id + 1), time.time() - start))
if args.trainer_id == 0: # only trainer 0 save model
print("save model in {}".format(model_dir))
fluid.io.save_persistables(
executor=exe, dirname=model_dir, main_program=main_program)
print("train time cost {:.4f}".format(time.time() - start_time))
print("finish training")
if args.is_local:
print("run local training")
train_loop(fluid.default_main_program())
else:
print("run distribute training")
t = fluid.DistributeTranspiler()
t.transpile(
args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
if args.role == "pserver":
print("run psever")
pserver_prog, pserver_startup = t.get_pserver_programs(
args.current_endpoint)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(pserver_startup)
exe.run(pserver_prog)
elif args.role == "trainer":
print("run trainer")
train_loop(t.get_trainer_program())
if __name__ == "__main__":
train()
#!/bin/bash
#export GLOG_v=30
#export GLOG_logtostderr=1
# start pserver0
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--is_local 0 \
--is_sparse \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--is_local 0 \
--is_sparse \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
#CUDA_VISIBLE_DEVICES=1 python cluster_train.py \
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--use_gpu 0 \
--is_local 0 \
--is_sparse \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
#CUDA_VISIBLE_DEVICES=2 python cluster_train.py \
python -u cluster_train.py \
--train_data_dir dist_data/dist_train_data \
--model_output_dir cluster_model \
--use_gpu 0 \
--is_local 0 \
--is_sparse \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
echo "2 pservers and 2 trainers started."
\ No newline at end of file
......@@ -9,7 +9,7 @@ import os
class CriteoDataset(dg.MultiSlotDataGenerator):
def setup(self):
def setup(self, feat_dict_name):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
5775, 257675, 65535, 969, 23159456, 431037, 56311, 6047, 29019, 46,
......@@ -21,8 +21,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
]
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
self.feat_dict_ = pickle.load(
open('data/aid_data/feat_dict_10.pkl2', 'rb'))
self.feat_dict_ = pickle.load(open(feat_dict_name, 'rb'))
def _process_line(self, line):
features = line.rstrip('\n').split('\t')
......@@ -68,5 +67,8 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
if __name__ == '__main__':
criteo_dataset = CriteoDataset()
criteo_dataset.setup()
if len(sys.argv) <= 1:
sys.stderr.write("feat_dict needed for criteo reader.")
exit(1)
criteo_dataset.setup(sys.argv[1])
criteo_dataset.run_from_stdin()
......@@ -2,7 +2,7 @@
wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2 -O ./aid_data/feat_dict_10.pkl2 || rm -f ./aid_data/feat_dict_10.pkl2
tar zxf dac.tar.gz
tar zxf dac.tar.gz >/dev/null 2>&1
rm -f dac.tar.gz
python preprocess.py
......
......@@ -92,7 +92,7 @@ def get_feat_dict():
# Save dictionary
with open(dir_feat_dict_, 'wb') as fout:
pickle.dump(feat_dict, fout)
pickle.dump(feat_dict, fout, protocol=2)
print('args.num_feat ', len(feat_dict) + 1)
......
#!/bin/bash
# download small demo dataset
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz
tar xzvf dist_data_demo.tar.gz
# preprocess dataset
python preprocess_dist.py
import os
import numpy
from collections import Counter
import shutil
import pickle
SPLIT_RATIO = 0.9
INPUT_FILE = 'dist_data_demo.txt'
TRAIN_FILE = os.path.join('dist_train_data', 'tr')
TEST_FILE = os.path.join('dist_test_data', 'ev')
def split_data():
all_lines = []
for line in open(INPUT_FILE):
all_lines.append(line)
split_line_idx = int(len(all_lines) * SPLIT_RATIO)
with open(TRAIN_FILE, 'w') as f:
f.writelines(all_lines[:split_line_idx])
with open(TEST_FILE, 'w') as f:
f.writelines(all_lines[split_line_idx:])
def get_feat_dict():
freq_ = 10
dir_feat_dict_ = 'aid_data/feat_dict_' + str(freq_) + '.pkl2'
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
if not os.path.exists(dir_feat_dict_):
# print('generate a feature dict')
# Count the number of occurrences of discrete features
feat_cnt = Counter()
with open(INPUT_FILE, 'r') as fin:
for line_idx, line in enumerate(fin):
features = line.lstrip('\n').split('\t')
for idx in categorical_range_:
if features[idx] == '': continue
feat_cnt.update([features[idx]])
# Only retain discrete features with high frequency
# not filter low freq in small dataset
freq_ = 0
feat_set = set()
for feat, ot in feat_cnt.items():
if ot >= freq_:
feat_set.add(feat)
# Create a dictionary for continuous and discrete features
feat_dict = {}
tc = 1
# Continuous features
for idx in continuous_range_:
feat_dict[idx] = tc
tc += 1
# Discrete features
cnt_feat_set = set()
with open(INPUT_FILE, 'r') as fin:
for line_idx, line in enumerate(fin):
features = line.rstrip('\n').split('\t')
for idx in categorical_range_:
if features[idx] == '' or features[idx] not in feat_set:
continue
if features[idx] not in cnt_feat_set:
cnt_feat_set.add(features[idx])
feat_dict[features[idx]] = tc
tc += 1
# Save dictionary
with open(dir_feat_dict_, 'wb') as fout:
pickle.dump(feat_dict, fout, protocol=2)
print('args.num_feat ', len(feat_dict) + 1)
if __name__ == '__main__':
if not os.path.isdir('dist_train_data'):
os.mkdir('dist_train_data')
if not os.path.isdir('dist_test_data'):
os.mkdir('dist_test_data')
if not os.path.isdir('aid_data'):
os.mkdir('aid_data')
split_data()
get_feat_dict()
print('Done!')
......@@ -27,7 +27,7 @@ def infer():
args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir)
]
criteo_dataset = CriteoDataset()
criteo_dataset.setup()
criteo_dataset.setup(args.feat_dict)
test_reader = paddle.batch(
criteo_dataset.test(test_files), batch_size=args.batch_size)
......
......@@ -30,7 +30,7 @@ def train():
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(data_list)
pipe_command = 'python criteo_reader.py'
pipe_command = 'python criteo_reader.py {}'.format(args.feat_dict)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(args.batch_size)
dataset.set_thread(args.num_thread)
......
......@@ -2,8 +2,13 @@ import paddle.fluid as fluid
import math
def ctr_deepfm_model(embedding_size, num_field, num_feat, layer_sizes, act,
reg):
def ctr_deepfm_model(embedding_size,
num_field,
num_feat,
layer_sizes,
act,
reg,
is_sparse=False):
init_value_ = 0.1
raw_feat_idx = fluid.layers.data(
......@@ -14,35 +19,41 @@ def ctr_deepfm_model(embedding_size, num_field, num_feat, layer_sizes, act,
name='label', shape=[1], dtype='float32') # None * 1
feat_idx = fluid.layers.reshape(raw_feat_idx,
[-1, num_field, 1]) # None * num_field * 1
[-1, 1]) # (None * num_field) * 1
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
# -------------------- first order term --------------------
first_weights = fluid.layers.embedding(
first_weights_re = fluid.layers.embedding(
input=feat_idx,
is_sparse=is_sparse,
dtype='float32',
size=[num_feat + 1, 1],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_),
regularizer=fluid.regularizer.L1DecayRegularizer(
reg))) # None * num_field * 1
regularizer=fluid.regularizer.L1DecayRegularizer(reg)))
first_weights = fluid.layers.reshape(
first_weights_re, shape=[-1, num_field, 1]) # None * num_field * 1
y_first_order = fluid.layers.reduce_sum((first_weights * feat_value), 1)
# -------------------- second order term --------------------
feat_embeddings = fluid.layers.embedding(
feat_embeddings_re = fluid.layers.embedding(
input=feat_idx,
is_sparse=is_sparse,
dtype='float32',
size=[num_feat + 1, embedding_size],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_ / math.sqrt(float(
embedding_size))))) # None * num_field * embedding_size
loc=0.0, scale=init_value_ / math.sqrt(float(embedding_size)))))
feat_embeddings = fluid.layers.reshape(
feat_embeddings_re,
shape=[-1, num_field,
embedding_size]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# sum_square part
......
#!/bin/bash
wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
tar zxf dac.tar.gz
tar zxf dac.tar.gz >/dev/null 2>&1
rm -f dac.tar.gz
mkdir raw
......
......@@ -29,5 +29,39 @@ test_epoch is set to load the model from the 10th training epoch.
Note: the final log info is the overall LogLoss and AUC on the test set.
## Single-machine Results
After 10 epochs on the training set, the test-set LogLoss is `0.48657` and the AUC is `0.7308`.
## Distributed Training
Run the following command to emulate a multi-machine setup on a single machine. By default a 2 x 2 layout is used, i.e. 2 pservers and 2 trainers.
Data download is the same as in the commands above.
```bash
sh cluster_train.sh
```
Parameter description:
- train_data_dir: directory of the training data
- model_output_dir: directory where models are saved
- is_local: whether to train locally on a single machine (set to 0 when emulating distributed training on one machine)
- is_sparse: whether the embedding uses sparse updates; defaults to False if not set
- role: role of the process (pserver or trainer)
- endpoints: addresses and ports of all pservers
- current_endpoint: address and port of the current pserver (when role is pserver)
- trainers: number of trainers

Other parameters are documented in cluster_train.py.
Infer
```bash
python infer.py --model_output_dir cluster_model --test_epoch 10 --use_gpu=0
```
Notes:
- Local emulation requires proxies to be disabled, e.g. unset http_proxy, unset https_proxy
- Only trainer 0 saves the model parameters
- After each training run the pserver processes must be stopped manually; list them with the following command:
>ps -ef | grep python
- Data loading uses the Dataset API, which is currently supported on Linux only

An end-to-end local run that follows these notes is sketched below.
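Putting the notes together, a possible end-to-end local run; the kill pattern assumes all processes were started through cluster_train.py, which is what cluster_train.sh does:
```bash
# end-to-end sketch: disable proxies, launch the 2 x 2 run, monitor, then clean up
unset http_proxy https_proxy
sh cluster_train.sh
tail -f trainer0.log trainer1.log        # stop following once both trainers finish
ps -ef | grep cluster_train.py | grep -v grep | awk '{print $2}' | xargs kill
```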
......@@ -69,5 +69,11 @@ def parse_args():
help='The name of model (default: ctr_xdeepfm_model)')
parser.add_argument('--use_gpu', type=int, default=1)
parser.add_argument('--print_steps', type=int, default=50)
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
return parser.parse_args()
import argparse
import os
import sys
import time
import network_conf
import paddle.fluid as fluid
def parse_args():
parser = argparse.ArgumentParser("xdeepfm cluster train.")
parser.add_argument(
'--train_data_dir',
type=str,
default='data/train_data',
help='The path of train data (default: data/train_data)')
parser.add_argument(
'--test_data_dir',
type=str,
default='data/test_data',
help='The path of test data (default: models)')
parser.add_argument(
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:100)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_epoch',
type=int,
default=10,
help="The number of epochs to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
required=True,
help='The path for model to store (default: models)')
parser.add_argument(
'--num_thread',
type=int,
default=1,
help='The number of threads (default: 1)')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--layer_sizes_dnn',
nargs='+',
type=int,
default=[10, 10, 10],
        help='The size of each DNN layer')
parser.add_argument(
'--layer_sizes_cin',
nargs='+',
type=int,
default=[10, 10],
        help='The size of each CIN layer')
parser.add_argument(
'--act',
type=str,
default='relu',
help='The activation of each layers (default: relu)')
parser.add_argument(
        '--lr', type=float, default=1e-1, help='Learning rate (default: 1e-1)')
parser.add_argument(
        '--reg', type=float, default=1e-4, help='Regularization coefficient (default: 1e-4)')
parser.add_argument('--num_field', type=int, default=39)
parser.add_argument('--num_feat', type=int, default=28651)
parser.add_argument(
'--model_name',
type=str,
default='ctr_xdeepfm_model',
help='The name of model (default: ctr_xdeepfm_model)')
parser.add_argument('--use_gpu', type=int, default=1)
parser.add_argument('--print_steps', type=int, default=50)
parser.add_argument('--is_local', type=int, default=1, help='whether local')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
# dist params
parser.add_argument(
'--num_devices', type=int, default=1, help='Number of GPU devices')
parser.add_argument(
'--role', type=str, default='pserver', help='trainer or pserver')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000, 127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The current_endpoint')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
        help='trainer id; only trainer_id=0 saves the model')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')
args = parser.parse_args()
return args
def train():
""" do training """
args = parse_args()
print(args)
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, auc, data_list = eval('network_conf.' + args.model_name)(
args.embedding_size, args.num_field, args.num_feat,
args.layer_sizes_dnn, args.act, args.reg, args.layer_sizes_cin,
args.is_sparse)
optimizer = fluid.optimizer.SGD(
learning_rate=args.lr,
regularization=fluid.regularizer.L2DecayRegularizer(args.reg))
optimizer.minimize(loss)
def train_loop(main_program):
""" train network """
start_time = time.time()
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(data_list)
dataset.set_pipe_command('python criteo_reader.py')
dataset.set_batch_size(args.batch_size)
dataset.set_filelist([
args.train_data_dir + '/' + x
for x in os.listdir(args.train_data_dir)
])
if args.use_gpu == 1:
exe = fluid.Executor(fluid.CUDAPlace(0))
dataset.set_thread(1)
else:
exe = fluid.Executor(fluid.CPUPlace())
dataset.set_thread(args.num_thread)
exe.run(fluid.default_startup_program())
for epoch_id in range(args.num_epoch):
start = time.time()
sys.stderr.write('\nepoch%d start ...\n' % (epoch_id + 1))
exe.train_from_dataset(
program=main_program,
dataset=dataset,
fetch_list=[loss, auc],
fetch_info=['loss', 'auc'],
debug=False,
print_period=args.print_steps)
model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
sys.stderr.write('epoch%d is finished and takes %f s\n' % (
(epoch_id + 1), time.time() - start))
if args.trainer_id == 0: # only trainer 0 save model
print("save model in {}".format(model_dir))
fluid.io.save_persistables(
executor=exe, dirname=model_dir, main_program=main_program)
print("train time cost {:.4f}".format(time.time() - start_time))
print("finish training")
if args.is_local:
print("run local training")
train_loop(fluid.default_main_program())
else:
print("run distribute training")
t = fluid.DistributeTranspiler()
t.transpile(
args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
if args.role == "pserver":
print("run psever")
pserver_prog, pserver_startup = t.get_pserver_programs(
args.current_endpoint)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(pserver_startup)
exe.run(pserver_prog)
elif args.role == "trainer":
print("run trainer")
train_loop(t.get_trainer_program())
if __name__ == "__main__":
train()
#!/bin/bash
#export GLOG_v=30
#export GLOG_logtostderr=1
# start pserver0
python -u cluster_train.py \
--train_data_dir data/train_data \
--model_output_dir cluster_model \
--is_local 0 \
--is_sparse \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python -u cluster_train.py \
--train_data_dir data/train_data \
--model_output_dir cluster_model \
--is_local 0 \
--is_sparse \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
#CUDA_VISIBLE_DEVICES=1 python cluster_train.py \
python -u cluster_train.py \
--train_data_dir data/train_data \
--model_output_dir cluster_model \
--use_gpu 0 \
--is_local 0 \
--is_sparse \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
#CUDA_VISIBLE_DEVICES=2 python cluster_train.py \
python -u cluster_train.py \
--train_data_dir data/train_data \
--model_output_dir cluster_model \
--use_gpu 0 \
--is_local 0 \
--is_sparse \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
echo "2 pservers and 2 trainers started."
\ No newline at end of file
#!/bin/bash
if [ ! -d "train_data" ]; then
mkdir train_data
fi
if [ ! -d "test_data" ]; then
mkdir test_data
fi
wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Fev -O ./test_data/ev
wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr -O ./train_data/tr
......@@ -2,8 +2,14 @@ import paddle.fluid as fluid
import math
def ctr_xdeepfm_model(embedding_size, num_field, num_feat, layer_sizes_dnn, act,
reg, layer_sizes_cin):
def ctr_xdeepfm_model(embedding_size,
num_field,
num_feat,
layer_sizes_dnn,
act,
reg,
layer_sizes_cin,
is_sparse=False):
init_value_ = 0.1
initer = fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)
......@@ -15,27 +21,33 @@ def ctr_xdeepfm_model(embedding_size, num_field, num_feat, layer_sizes_dnn, act,
label = fluid.layers.data(
name='label', shape=[1], dtype='float32') # None * 1
feat_idx = fluid.layers.reshape(raw_feat_idx,
[-1, num_field, 1]) # None * num_field * 1
[-1, 1]) # (None * num_field) * 1
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
feat_embeddings = fluid.layers.embedding(
input=feat_idx,
is_sparse=is_sparse,
dtype='float32',
size=[num_feat + 1, embedding_size],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=initer)) # None * num_field * embedding_size
param_attr=fluid.ParamAttr(initializer=initer))
feat_embeddings = fluid.layers.reshape(
feat_embeddings,
[-1, num_field, embedding_size]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# -------------------- linear --------------------
weights_linear = fluid.layers.embedding(
input=feat_idx,
is_sparse=is_sparse,
dtype='float32',
size=[num_feat + 1, 1],
padding_idx=0,
param_attr=fluid.ParamAttr(initializer=initer)) # None * num_field * 1
param_attr=fluid.ParamAttr(initializer=initer))
weights_linear = fluid.layers.reshape(
weights_linear, [-1, num_field, 1]) # None * num_field * 1
b_linear = fluid.layers.create_parameter(
shape=[1],
dtype='float32',
......