Unverified commit 269b1767, authored by overlordmax and committed by GitHub

Test mmoe (#4521)

* add models

* add ESMM

* fix some bug
Parent 42d56927
# ESMM
The brief directory structure of this example is as follows:
```
├── README.md             # documentation
├── net.py                # ESMM network structure
├── train.py              # ESMM training script
├── infer.py              # ESMM inference script
├── reader.py             # data preprocessing
├── utils.py              # common utility functions
├── args.py               # command-line argument definitions
├── get_data.sh           # script to download and generate the training data
├── dataset_generator.py  # dataset generator script
├── gpu_train.sh          # GPU training shell script
├── cpu_train.sh          # CPU training shell script
├── gpu_infer.sh          # GPU inference shell script
├── cpu_infer.sh          # CPU inference shell script
```
## Introduction
Unlike CTR estimation, CVR estimation faces two key problems:
1. **Sample Selection Bias (SSB)**: a conversion can only happen after a click. Conventional CVR models are trained on click data, treating clicks without conversion as negatives and clicks with conversion as positives. At serving time, however, the model scores samples from the entire impression space, not only the clicked ones. The training data and the data to be predicted therefore come from different distributions, which poses a serious challenge to the model's generalization ability.
2. **Data Sparsity (DS)**: the click samples available for CVR training are far fewer than the impression samples used for CTR training.
ESMM was proposed in the SIGIR'2018 paper [Entire Space Multi-Task Model: An Effective Approach for Estimating Post-Click Conversion Rate](https://arxiv.org/abs/1804.07931). Built on the idea of multi-task learning, it effectively addresses the data sparsity and sample selection bias that CVR estimation faces in real-world scenarios.
This project implements the ESMM network in PaddlePaddle and validates it on the public dataset from the paper, [Ali-CCP: Alibaba Click and Conversion Prediction](https://tianchi.aliyun.com/datalab/dataSet.html?dataId=408) (currently only a subset of the data is used to verify the correctness of the model).
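The key idea is that ESMM supervises pCTR and pCTCVR over the entire impression space and only derives pCVR implicitly through pCTCVR = pCTR × pCVR. The sketch below is a minimal NumPy illustration of that factorization and the joint loss, not the training code in net.py; names such as `esmm_loss`, `p_ctr` and `p_cvr` are illustrative only.
```python
import numpy as np

def esmm_loss(p_ctr, p_cvr, click, buy, eps=1e-7):
    """Joint ESMM objective: both cross-entropy terms are defined on every
    impression, so pCVR is never fit directly on the biased click-only space."""
    p_ctcvr = p_ctr * p_cvr  # pCTCVR(x) = pCTR(x) * pCVR(x)
    loss_ctr = -(click * np.log(p_ctr + eps) + (1 - click) * np.log(1 - p_ctr + eps))
    loss_ctcvr = -(buy * np.log(p_ctcvr + eps) + (1 - buy) * np.log(1 - p_ctcvr + eps))
    return (loss_ctr + loss_ctcvr).mean()

# Example: 4 impressions, 2 clicked, 1 converted.
p_ctr = np.array([0.9, 0.8, 0.2, 0.1])
p_cvr = np.array([0.5, 0.1, 0.3, 0.2])
click = np.array([1, 1, 0, 0])
buy = np.array([1, 0, 0, 0])
print(esmm_loss(p_ctr, p_cvr, click, buy))
```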
## Requirements
PaddlePaddle 1.7.0
Python 3.7
## Data download and preprocessing
Run get_data.sh to download and preprocess the data:
```shell
./get_data.sh
```
## Single-node training
GPU environment
Set the data paths and parameters in gpu_train.sh:
```shell
# --use_gpu: whether to use GPU; --epochs: number of training epochs; --batch_size: batch size
# --embed_size: embedding dimension of each feature sign; --cpu_num: number of CPU threads
# --model_dir: directory to save the model; --train_data_path: training data path; --vocab_path: path to the vocabulary size file
CUDA_VISIBLE_DEVICES=0 python train.py --use_gpu True \
--epochs 100 \
--batch_size 64 \
--embed_size 12 \
--cpu_num 2 \
--model_dir ./model_dir \
--train_data_path ./train_data \
--vocab_path ./vocab/vocab_size.txt
```
Make the script executable and run it:
```shell
./gpu_train.sh
```
CPU environment
Set the data paths and parameters in cpu_train.sh:
```shell
# --use_gpu: whether to use GPU; --epochs: number of training epochs; --batch_size: batch size
# --embed_size: embedding dimension of each feature sign; --cpu_num: number of CPU threads
# --model_dir: directory to save the model; --train_data_path: training data path; --vocab_path: path to the vocabulary size file
python train.py --use_gpu False \
--epochs 100 \
--batch_size 64 \
--embed_size 12 \
--cpu_num 2 \
--model_dir ./model_dir \
--train_data_path ./train_data \
--vocab_path ./vocab/vocab_size.txt
```
Make the script executable and run it:
```shell
./cpu_train.sh
```
## Inference
GPU environment
Set the data paths and parameters in gpu_infer.sh:
```shell
# --use_gpu: whether to use GPU; --batch_size: batch size
# --test_data_path: test data path; --vocab_path: path to the vocabulary size file
python infer.py --use_gpu True \
--batch_size 64 \
--test_data_path ./test_data \
--vocab_path ./vocab/vocab_size.txt
```
Make the script executable and run it:
```shell
./gpu_infer.sh
```
CPU environment
Set the data paths and parameters in cpu_infer.sh:
```shell
# --use_gpu: whether to use GPU; --batch_size: batch size; --cpu_num: number of CPU threads
# --test_data_path: test data path; --vocab_path: path to the vocabulary size file
python infer.py --use_gpu False \
--batch_size 64 \
--cpu_num 2 \
--test_data_path ./test_data \
--vocab_path ./vocab/vocab_size.txt
```
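Make the script executable and run `./cpu_infer.sh`.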
## Results
Currently only a subset of the data is used to verify the correctness of the model. A sample of the prediction output looks like this:
> auc_ctr auc_0.tmp_0 lod: {}
> dim: 1
> layout: NCHW
> dtype: double
> data: [0.971812]
>
> auc_ctcvr auc_1.tmp_0 lod: {}
> dim: 1
> layout: NCHW
> dtype: double
> data: [0.499668]
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import distutils.util
import sys
def parse_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--epochs", type=int, default=100, help="epochs")
parser.add_argument("--batch_size", type=int, default=64, help="batch_size")
parser.add_argument("--embed_size", type=int, default=12, help="embed_size")
parser.add_argument("--cpu_num", type=int, default=2, help="cpu_num")
# argparse's type=bool treats any non-empty string (including "False") as True,
# so use strtobool to make "--use_gpu False" behave as expected.
parser.add_argument('--use_gpu', type=distutils.util.strtobool, default=False, help='whether to use gpu')
parser.add_argument('--model_dir', type=str, default='./model_dir', help='model_dir')
parser.add_argument('--train_data_path', type=str, default='./train_data', help='train_data_path')
parser.add_argument('--test_data_path', type=str, default='./test_data', help='test_data_path')
parser.add_argument('--vocab_path', type=str, default='./vocab/vocab_size.txt', help='vocab_path')
parser.add_argument("--train_sample_size", type=int, default=sys.maxsize, help="train_sample_size")
parser.add_argument("--test_sample_size", type=int, default=sys.maxsize, help="test_sample_size")
args = parser.parse_args()
return args
# --use_gpu: whether to use GPU; --batch_size: batch size; --cpu_num: number of CPU threads
# --test_data_path: test data path; --vocab_path: path to the vocabulary size file
python infer.py --use_gpu False \
--batch_size 64 \
--cpu_num 2 \
--test_data_path ./test_data \
--vocab_path ./vocab/vocab_size.txt
\ No newline at end of file
# --use_gpu: whether to use GPU; --epochs: number of training epochs; --batch_size: batch size
# --embed_size: embedding dimension of each feature sign; --cpu_num: number of CPU threads
# --model_dir: directory to save the model; --train_data_path: training data path; --vocab_path: path to the vocabulary size file
python train.py --use_gpu False \
--epochs 100 \
--batch_size 64 \
--embed_size 12 \
--cpu_num 2 \
--model_dir ./model_dir \
--train_data_path ./train_data \
--vocab_path ./vocab/vocab_size.txt
\ No newline at end of file
import paddle.fluid.incubate.data_generator as dg
import numpy as np
import paddle.fluid as fluid
from collections import defaultdict
all_field_id = ['101', '109_14', '110_14', '127_14', '150_14', '121', '122', '124', '125', '126', '127', '128', '129',
'205', '206', '207', '210', '216', '508', '509', '702', '853', '301']
all_field_id_dict = defaultdict(int)
for i,field_id in enumerate(all_field_id):
all_field_id_dict[field_id] = [False,i]
class CriteoDataset(dg.MultiSlotStringDataGenerator):
def generate_sample(self, line):
def reader():
features = line.strip().split(',')
#ctr = list(map(int, features[1]))
#cvr = list(map(int, features[2]))
ctr = features[1]
cvr = features[2]
padding = '0'
output = [(field_id,[]) for field_id in all_field_id_dict]
for elem in features[4:]:
field_id,feat_id = elem.strip().split(':')
if field_id not in all_field_id_dict:
continue
all_field_id_dict[field_id][0] = True
index = all_field_id_dict[field_id][1]
#feat_id = list(map(int, feat_id))
output[index][1].append(feat_id)
for field_id in all_field_id_dict:
visited,index = all_field_id_dict[field_id]
if visited:
all_field_id_dict[field_id][0] = False
else:
output[index][1].append(padding)
output.append(('ctr',ctr))
output.append(('cvr',cvr))
yield output
return reader
d = CriteoDataset()
d.run_from_stdin()
\ No newline at end of file
mkdir train_data
mkdir test_data
mkdir vocab
mkdir data
train_source_path="./data/sample_train.tar.gz"
train_target_path="train_data"
test_source_path="./data/sample_test.tar.gz"
test_target_path="test_data"
cd data
echo "downloading sample_train.tar.gz......"
curl -# 'http://jupter-oss.oss-cn-hangzhou.aliyuncs.com/file/opensearch/documents/408/sample_train.tar.gz?Expires=1586435769&OSSAccessKeyId=LTAIGx40tjZWxj6q&Signature=ahUDqhvKT1cGjC4%2FIER2EWtq7o4%3D&response-content-disposition=attachment%3B%20' -H 'Proxy-Connection: keep-alive' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9' -H 'Accept-Language: zh-CN,zh;q=0.9' --compressed --insecure -o sample_train.tar.gz
cd ..
echo "unzipping sample_train.tar.gz......"
tar -xzvf ${train_source_path} -C ${train_target_path} && rm -rf ${train_source_path}
cd data
echo "downloading sample_test.tar.gz......"
curl -# 'http://jupter-oss.oss-cn-hangzhou.aliyuncs.com/file/opensearch/documents/408/sample_test.tar.gz?Expires=1586435821&OSSAccessKeyId=LTAIGx40tjZWxj6q&Signature=OwLMPjt1agByQtRVi8pazsAliNk%3D&response-content-disposition=attachment%3B%20' -H 'Proxy-Connection: keep-alive' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9' -H 'Accept-Language: zh-CN,zh;q=0.9' --compressed --insecure -o sample_test.tar.gz
cd ..
echo "unzipping sample_test.tar.gz......"
tar -xzvf ${test_source_path} -C ${test_target_path} && rm -rf ${test_source_path}
echo "preprocessing data......"
python reader.py --train_data_path ${train_target_path} \
--test_data_path ${test_target_path} \
--vocab_path vocab/vocab_size.txt \
--train_sample_size 6400 \
--test_sample_size 6400
# --use_gpu: whether to use GPU; --batch_size: batch size
# --test_data_path: test data path; --vocab_path: path to the vocabulary size file
python infer.py --use_gpu True \
--batch_size 64 \
--test_data_path ./test_data \
--vocab_path ./vocab/vocab_size.txt
\ No newline at end of file
# --use_gpu: whether to use GPU; --epochs: number of training epochs; --batch_size: batch size
# --embed_size: embedding dimension of each feature sign; --cpu_num: number of CPU threads
# --model_dir: directory to save the model; --train_data_path: training data path; --vocab_path: path to the vocabulary size file
CUDA_VISIBLE_DEVICES=0 python train.py --use_gpu True \
--epochs 100 \
--batch_size 64 \
--embed_size 12 \
--cpu_num 2 \
--model_dir ./model_dir \
--train_data_path ./train_data \
--vocab_path ./vocab/vocab_size.txt
\ No newline at end of file
import os
import numpy as np
import paddle
import paddle.fluid as fluid
from net import ESMM
import args
import logging
import utils
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
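# Reset the accumulated AUC statistics (auc_0.tmp_0 / auc_1.tmp_0) so each checkpoint is evaluated from a clean state.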
def set_zero(place):
auc_states_names = [
'auc_1.tmp_0', 'auc_0.tmp_0'
]
for name in auc_states_names:
param = fluid.global_scope().var(name).get_tensor()
if param:
param_array = np.zeros(param._get_dims()).astype("int64")
param.set(param_array, place)
def run_infer(args,model_path,test_data_path,vocab_size):
place = fluid.CPUPlace()
esmm_model = ESMM()
test_data_generator = utils.CriteoDataset()
test_reader = paddle.batch(test_data_generator.test(test_data_path),batch_size=args.batch_size)
startup_program = fluid.framework.Program()
test_program = fluid.framework.Program()
with fluid.framework.program_guard(test_program, startup_program):
with fluid.unique_name.guard():
inputs = esmm_model.input_data()
avg_cost,auc_ctr,auc_ctcvr= esmm_model.net(inputs, vocab_size, args.embed_size)
dataset, file_list = utils.get_dataset(inputs, test_data_path,args.batch_size,args.cpu_num)
exe = fluid.Executor(place)
# load the saved model parameters
fluid.load(fluid.default_main_program(),os.path.join(model_path, "checkpoint"), exe)
set_zero(place)
dataset.set_filelist(file_list)
exe.infer_from_dataset(program=test_program,
dataset=dataset,
fetch_list=[auc_ctr,auc_ctcvr],
fetch_info=["auc_ctr","auc_ctcvr"],
print_period=20,
debug=False)
if __name__ == "__main__":
args = args.parse_args()
model_list = []
for _, dir, _ in os.walk(args.model_dir):
for model in dir:
if "epoch" in model:
path = os.path.join(args.model_dir, model)
model_list.append(path)
vocab_size =utils.get_vocab_size(args.vocab_path)
for model in model_list:
logger.info("Test model {}".format(model))
run_infer(args, model, args.test_data_path, vocab_size)
\ No newline at end of file
import numpy as np
import os
import paddle.fluid as fluid
import paddle
import utils
import args
class ESMM(object):
def fc(self,tag, data, out_dim, active='prelu'):
init_stddev = 1.0
scales = 1.0 / np.sqrt(data.shape[1])
p_attr = fluid.param_attr.ParamAttr(name='%s_weight' % tag,
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=init_stddev * scales))
b_attr = fluid.ParamAttr(name='%s_bias' % tag, initializer=fluid.initializer.Constant(0.1))
out = fluid.layers.fc(input=data,
size=out_dim,
act=active,
param_attr=p_attr,
bias_attr =b_attr,
name=tag)
return out
def input_data(self):
sparse_input_ids = [
fluid.data(name="field_" + str(i), shape=[-1, 1], dtype="int64", lod_level=1) for i in range(0,23)
]
label_ctr = fluid.data(name="ctr", shape=[-1, 1], dtype="int64")
label_cvr = fluid.data(name="cvr", shape=[-1, 1], dtype="int64")
inputs = sparse_input_ids + [label_ctr] + [label_cvr]
return inputs
def net(self,inputs,vocab_size,embed_size):
emb = []
for data in inputs[0:-2]:
feat_emb = fluid.embedding(input=data,
size=[vocab_size, embed_size],
param_attr=fluid.ParamAttr(name='dis_emb',
learning_rate=5,
initializer=fluid.initializer.Xavier(fan_in=embed_size,fan_out=embed_size)
),
is_sparse=True)
#fluid.layers.Print(feat_emb, message="feat_emb")
field_emb = fluid.layers.sequence_pool(input=feat_emb,pool_type='sum')
emb.append(field_emb)
concat_emb = fluid.layers.concat(emb, axis=1)
# ctr
active = 'relu'
ctr_fc1 = self.fc('ctr_fc1', concat_emb, 200, active)
ctr_fc2 = self.fc('ctr_fc2', ctr_fc1, 80, active)
ctr_out = self.fc('ctr_out', ctr_fc2, 2, 'softmax')
# cvr
cvr_fc1 = self.fc('cvr_fc1', concat_emb, 200, active)
cvr_fc2 = self.fc('cvr_fc2', cvr_fc1, 80, active)
cvr_out = self.fc('cvr_out', cvr_fc2, 2,'softmax')
ctr_clk = inputs[-2]
ctcvr_buy = inputs[-1]
#ctr_label = fluid.layers.concat(input=[ctr_clk,1-ctr_clk],axis=1)
#ctcvr_label = fluid.layers.concat(input=[ctcvr_buy,1-ctcvr_buy],axis=1)
#ctr_label = fluid.layers.cast(x=ctr_label, dtype='float32')
#ctcvr_label = fluid.layers.cast(x=ctcvr_label, dtype='float32')
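# Take the positive-class probability from each softmax head and combine them: pCTCVR = pCTR * pCVR over the entire impression space.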
ctr_prop_one = fluid.layers.slice(ctr_out, axes=[1], starts=[1], ends=[2])
cvr_prop_one = fluid.layers.slice(cvr_out, axes=[1], starts=[1], ends=[2])
ctcvr_prop_one = fluid.layers.elementwise_mul(ctr_prop_one, cvr_prop_one)
ctcvr_prop = fluid.layers.concat(input=[1-ctcvr_prop_one,ctcvr_prop_one], axis = 1)
loss_ctr = paddle.fluid.layers.cross_entropy(input=ctr_out, label=ctr_clk)
loss_ctcvr = paddle.fluid.layers.cross_entropy(input=ctcvr_prop, label=ctcvr_buy)
cost = loss_ctr + loss_ctcvr
avg_cost = fluid.layers.mean(cost)
#fluid.layers.Print(ctr_clk, message="ctr_clk")
auc_ctr, batch_auc_ctr, auc_states_ctr = fluid.layers.auc(input=ctr_out, label=ctr_clk)
auc_ctcvr, batch_auc_ctcvr, auc_states_ctcvr = fluid.layers.auc(input=ctcvr_prop, label=ctcvr_buy)
return avg_cost,auc_ctr,auc_ctcvr
\ No newline at end of file
import numpy as np
import pandas as pd
from collections import defaultdict
import args
import os
def join_data(file1,file2,write_file,sample_size):
sample_list = []
common_logs = defaultdict(lambda: '')
file = open(write_file, 'w',encoding='utf-8')
print("begin push sample_list!")
with open(file1,'r') as f:
for i, line in enumerate(f):
try:
sample_list.append(line)
except:
continue
print("begin push common_logs!")
with open(file2,'r') as f:
for i, line in enumerate(f):
try:
common_feature_index,sample_str = line.strip().split('\t')
common_logs[common_feature_index] = sample_str
except:
continue
print("begin join data!")
for i, sample in enumerate(sample_list):
try:
common_feature_index,sample_str = sample.strip().split('\t')
common_str = common_logs.get(common_feature_index)
if common_str:
sample = "{0},{1}".format(sample_str, common_str)
else:
sample = "{0}".format(sample_str)
file.write(sample + "\n")
except:
continue
if(i == sample_size):
break
print("join data successfully!")
def read_data(file_name,write_file):
file = open(write_file, 'w',encoding='utf-8')
print("begin to write!")
with open(file_name,'r') as f:
for i, line in enumerate(f):
try:
line = line.strip().split(',')
feat_len = len(line)
feat_lists = []
#common_feature_index|feat_num|feat_list
if(feat_len == 3):
feat_strs = line[2]
for fstr in feat_strs.split('\x01'):
field, feat_val = fstr.split('\x02')
feat, val = feat_val.split('\x03')
feat_lists.append('%s:%s' % (field,feat))
common_feature = "{0}\t{1}".format(line[0], ','.join(feat_lists)) + "\n"
file.write(common_feature)
#sample_id|y|z|common_feature_index|feat_num|feat_list
elif(feat_len == 6):
# drop invalid samples with y=0 (no click) and z=1 (converted)
if(line[1] == '0' and line[2] == '1'):
continue
feat_strs = line[5]
for fstr in feat_strs.split('\x01'):
field, feat_val = fstr.split('\x02')
feat, val = feat_val.split('\x03')
feat_lists.append('%s:%s' % (field,feat))
sample = "{0}\t{1},{2},{3},{4}".format(line[3], line[0], line[1], line[2], ','.join(feat_lists)) + "\n"
file.write(sample)
except:
continue
file.close()
## re-encode feature ids into a contiguous vocabulary and record the vocabulary size
def recode(file_path,write_file,vocab_path):
all_feat_id_dict = defaultdict(int)
file1 = open(write_file[0], 'w',encoding='utf-8')
file2 = open(write_file[1], 'w',encoding='utf-8')
vocab_file = open(vocab_path, 'w',encoding='utf-8')
id = 0
with open(file_path[0], "r", encoding='utf-8') as f:
for i, line in enumerate(f):
line = line.strip().split(',')
feat_lists = []
for elem in line[3:]:
field_id,feat_id = elem.strip().split(':')
if feat_id not in all_feat_id_dict:
id += 1
all_feat_id_dict[feat_id] = id
feat_lists.append('%s:%s' % (field_id,all_feat_id_dict[feat_id]))
sample = "{0},{1},{2},{3}".format(line[0], line[1], line[2], ','.join(feat_lists)) + "\n"
file1.write(sample)
with open(file_path[1], "r", encoding='utf-8') as f:
for i, line in enumerate(f):
line = line.strip().split(',')
feat_lists = []
for elem in line[3:]:
field_id,feat_id = elem.strip().split(':')
if feat_id not in all_feat_id_dict:
id += 1
all_feat_id_dict[feat_id] = id
feat_lists.append('%s:%s' % (field_id,all_feat_id_dict[feat_id]))
sample = "{0},{1},{2},{3}".format(line[0], line[1], line[2], ','.join(feat_lists)) + "\n"
file2.write(sample)
vocab_size =len(all_feat_id_dict)
vocab_file.write(str(vocab_size))
file1.close()
file2.close()
vocab_file.close()
if __name__ == "__main__":
args = args.parse_args()
read_data(args.train_data_path + '/sample_skeleton_train.csv',args.train_data_path + '/skeleton_train.csv')
print("write skeleton_train.csv successfully")
read_data(args.train_data_path + '/common_features_train.csv',args.train_data_path + '/features_train.csv')
print("write features_train.csv successfully")
skeleton_train_path = args.train_data_path + '/skeleton_train.csv'
features_train_path = args.train_data_path + '/features_train.csv'
write_file = args.train_data_path + '/train_data.csv'
join_data(skeleton_train_path,features_train_path,write_file,args.train_sample_size)
## remove the intermediate files
os.system('rm -rf ' + skeleton_train_path)
os.system('rm -rf ' + features_train_path)
read_data(args.test_data_path + '/sample_skeleton_test.csv',args.test_data_path + '/skeleton_test.csv')
print("write skeleton_est.csv successfully")
read_data(args.test_data_path + '/common_features_test.csv',args.test_data_path + '/features_test.csv')
print("write features_test.csv successfully")
skeleton_test_path = args.test_data_path + '/skeleton_test.csv'
features_test_path = args.test_data_path + '/features_test.csv'
write_file = args.test_data_path + '/test_data.csv'
join_data(skeleton_test_path,features_test_path,write_file,args.test_sample_size)
## remove the intermediate files
os.system('rm -rf ' + skeleton_test_path)
os.system('rm -rf ' + features_test_path)
file_path = [args.train_data_path + '/train_data.csv', args.test_data_path + '/test_data.csv']
write_file = [args.train_data_path + '/traindata.csv',args.test_data_path + '/testdata.csv']
recode(file_path,write_file,args.vocab_path)
## remove the intermediate files
for file in file_path:
os.system('rm -rf ' + file)
import numpy as np
import os
import paddle.fluid as fluid
from net import ESMM
import paddle
import utils
import args
def train(args, vocab_size, train_data_path):
esmm_model = ESMM()
inputs = esmm_model.input_data()
dataset, file_list = utils.get_dataset(inputs, train_data_path,args.batch_size,args.cpu_num)
avg_cost,auc_ctr,auc_ctcvr= esmm_model.net(inputs, vocab_size, args.embed_size)
# optimizer used for the backward updates
optimizer = fluid.optimizer.Adam()
optimizer.minimize(avg_cost)
if args.use_gpu == True:
exe = fluid.Executor(fluid.CUDAPlace(0))
dataset.set_thread(1)
else:
exe = fluid.Executor(fluid.CPUPlace())
dataset.set_thread(args.cpu_num)
exe.run(fluid.default_startup_program())
for epoch in range(args.epochs):
dataset.set_filelist(file_list)
exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[avg_cost,auc_ctr,auc_ctcvr],
fetch_info=['epoch %d batch loss' % (epoch), "auc_ctr","auc_ctcvr"],
print_period=20,
debug=False)
model_dir = os.path.join(args.model_dir,'epoch_' + str(epoch + 1), "checkpoint")
main_program = fluid.default_main_program()
fluid.io.save(main_program,model_dir)
if __name__ == "__main__":
args = args.parse_args()
vocab_size =utils.get_vocab_size(args.vocab_path)
train(args, vocab_size, args.train_data_path)
import numpy as np
import os
import paddle.fluid as fluid
import logging
from collections import defaultdict
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
all_field_id = ['101', '109_14', '110_14', '127_14', '150_14', '121', '122', '124', '125', '126', '127', '128', '129',
'205', '206', '207', '210', '216', '508', '509', '702', '853', '301']
all_field_id_dict = defaultdict(int)
for i,field_id in enumerate(all_field_id):
all_field_id_dict[field_id] = [False,i]
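# Build a fluid Dataset whose input files are streamed through dataset_generator.py (see set_pipe_command below).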
def get_dataset(inputs,files,batch_size,cpu_num):
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command("python dataset_generator.py")
dataset.set_batch_size(batch_size)
dataset.set_thread(int(cpu_num))
file_list = [
os.path.join(files, x) for x in os.listdir(files)
]
logger.info("file list: {}".format(file_list))
return dataset, file_list
def get_vocab_size(vocab_path):
with open(vocab_path, "r") as rf:
line = rf.readline()
return int(line.strip()) + 1
class CriteoDataset(object):
def _reader_creator(self, file):
def reader():
with open(file, 'r') as f:
for line in f:
features = line.strip().split(',')
ctr = features[1]
cvr = features[2]
padding = '0'
output = [(field_id,[]) for field_id in all_field_id_dict]
for elem in features[4:]:
field_id,feat_id = elem.strip().split(':')
if field_id not in all_field_id_dict:
continue
all_field_id_dict[field_id][0] = True
index = all_field_id_dict[field_id][1]
output[index][1].append(feat_id)
for field_id in all_field_id_dict:
visited,index = all_field_id_dict[field_id]
if visited:
all_field_id_dict[field_id][0] = False
else:
output[index][1].append(padding)
output.append(('ctr',ctr))
output.append(('cvr',cvr))
yield output
return reader
def train(self, file):
return self._reader_creator(file)
def test(self, file):
return self._reader_creator(file)
\ No newline at end of file
......@@ -37,26 +37,27 @@
## Data download and preprocessing
Data source: [Census-income Data](https://archive.ics.uci.edu/ml/datasets/Census-Income+(KDD) )
Data source: [Census-income Data](https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census.tar.gz )
After the data is extracted, set the file paths in create_data.sh and run the script.
```sh
mkdir data/data24913/train_data # create the training data directory
mkdir data/data24913/test_data # create the test data directory
mkdir data/data24913/validation_data # create the validation data directory
mkdir train_data
mkdir test_data
mkdir data
train_path="data/census-income.data"
test_path="data/census-income.test"
train_data_path="train_data/"
test_data_path="test_data/"
train_path="data/data24913/census-income.data" #原始训练数据路径
test_path="data/data24913/census-income.test" #原始测试数据路径
train_data_path="data/data24913/train_data/" #处理后训练数据路径
test_data_path="data/data24913/test_data/" #处理后测试数据路径
validation_data_path="data/data24913/validation_data/" #处理后验证数据路径
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census.tar.gz
tar -zxvf data/census.tar.gz -C data/
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}\
--validation_data_path ${validation_data_path}
--test_data_path ${test_data_path}
```
## Requirements
......@@ -72,7 +73,7 @@ GPU environment
Set the data paths and parameters in train_gpu.sh.
```sh
python train_mmoe.py --use_gpu True \ # train on GPU
CUDA_VISIBLE_DEVICES=0 python train_mmoe.py --use_gpu 1 \ # train on GPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--feature_size 499\ # feature dimension
......@@ -95,7 +96,7 @@ CPU environment
Set the data paths and parameters in train_cpu.sh.
```sh
python train_mmoe.py --use_gpu False \ # train on CPU
python train_mmoe.py --use_gpu 0 \ # train on CPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--feature_size 499\ # feature dimension
......
......@@ -29,7 +29,7 @@ def parse_args():
parser.add_argument("--gate_num", type=int, default=2, help="gate_num")
parser.add_argument("--epochs", type=int, default=400, help="epochs")
parser.add_argument("--batch_size", type=int, default=32, help="batch_size")
parser.add_argument('--use_gpu', type=bool, default=False, help='whether using gpu')
parser.add_argument('--use_gpu', type=int, default=0, help='whether using gpu')
parser.add_argument('--train_data_path',type=str, default='./data/data24913/train_data/', help="train_data_path")
parser.add_argument('--test_data_path',type=str, default='./data/data24913/test_data/', help="test_data_path")
args = parser.parse_args()
......
mkdir data/data24913/train_data
mkdir data/data24913/test_data
mkdir data/data24913/validation_data
mkdir train_data
mkdir test_data
mkdir data
train_path="data/census-income.data"
test_path="data/census-income.test"
train_data_path="train_data/"
test_data_path="test_data/"
train_path="data/data24913/census-income.data"
test_path="data/data24913/census-income.test"
train_data_path="data/data24913/train_data/"
test_data_path="data/data24913/test_data/"
validation_data_path="data/data24913/validation_data/"
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census.tar.gz
tar -zxvf data/census.tar.gz -C data/
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}\
--validation_data_path ${validation_data_path}
\ No newline at end of file
--test_data_path ${test_data_path}
......@@ -99,8 +99,6 @@ def data_preparation(train_path, test_path, train_data_path, test_data_path,
validation_data.shape, test_data.shape)
transformed_train.to_csv(train_data_path + 'train_data.csv', index=False)
test_data.to_csv(test_data_path + 'test_data.csv', index=False)
validation_data.to_csv(
validation_data_path + 'validation_data.csv', index=False)
args = data_preparation_args()
......
python train_mmoe.py --use_gpu False \ # train on CPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--feature_size 499\ # feature dimension
--batch_size 32\ # batch size
--expert_num 8\ # number of experts
--gate_num 2\ # number of gates
--expert_size 16\ # expert layer size
--tower_size 8\ # tower layer size
--epochs 400 # number of epochs
\ No newline at end of file
python mmoe_train.py --use_gpu 0\
--train_data_path 'train_data'\
--test_data_path 'test_data'\
--feature_size 499\
--batch_size 32\
--expert_num 8\
--gate_num 2\
--expert_size 16\
--tower_size 8\
--epochs 100
python train_mmoe.py --use_gpu True \ # train on GPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--feature_size 499\ # feature dimension
--batch_size 32\ # batch size
--expert_num 8\ # number of experts
--gate_num 2\ # number of gates
--expert_size 16\ # expert layer size
--tower_size 8\ # tower layer size
--epochs 400 # number of epochs
\ No newline at end of file
CUDA_VISIBLE_DEVICES=0 python mmoe_train.py --use_gpu 1\
--train_data_path 'train_data'\
--test_data_path 'test_data'\
--feature_size 499\
--batch_size 32\
--expert_num 8\
--gate_num 2\
--expert_size 16\
--tower_size 8\
--epochs 100
......@@ -42,21 +42,22 @@ share_bottom is the basic framework of multi-task learning; its characteristic is that for different tasks
After the data is extracted, set the file paths in create_data.sh and run the script.
```sh
mkdir data/data24913/train_data # create the training data directory
mkdir data/data24913/test_data # create the test data directory
mkdir data/data24913/validation_data # create the validation data directory
mkdir train_data
mkdir test_data
mkdir data
train_path="data/census-income.data"
test_path="data/census-income.test"
train_data_path="train_data/"
test_data_path="test_data/"
train_path="data/data24913/census-income.data" #原始训练数据路径
test_path="data/data24913/census-income.test" #原始测试数据路径
train_data_path="data/data24913/train_data/" #处理后训练数据路径
test_data_path="data/data24913/test_data/" #处理后测试数据路径
validation_data_path="data/data24913/validation_data/" #处理后验证数据路径
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census.tar.gz
tar -zxvf data/census.tar.gz -C data/
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}\
--validation_data_path ${validation_data_path}
--test_data_path ${test_data_path}
```
## Requirements
......@@ -72,7 +73,7 @@ GPU environment
Set the data paths and parameters in train_gpu.sh.
```sh
python share_bottom.py --use_gpu True\ # train on GPU
python share_bottom.py --use_gpu 1\ # train on GPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--batch_size 32\ # batch size
......@@ -94,7 +95,7 @@ CPU environment
Set the data paths and parameters in train_cpu.sh.
```sh
python share_bottom.py --use_gpu False\ # train on CPU
python share_bottom.py --use_gpu 0\ # train on CPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--batch_size 32\ # batch size
......
......@@ -28,16 +28,16 @@ def parse_args():
parser.add_argument("--tower_size", type=int, default=8, help="tower_size")
parser.add_argument("--epochs", type=int, default=400, help="epochs")
parser.add_argument("--batch_size", type=int, default=32, help="batch_size")
parser.add_argument('--use_gpu', type=bool, default=False, help='whether using gpu')
parser.add_argument('--use_gpu', type=int, default=0, help='whether using gpu')
parser.add_argument(
'--train_data_path',
type=str,
default='./data/data24913/train_data/',
default=' ',
help="train_data_path")
parser.add_argument(
'--test_data_path',
type=str,
default='./data/data24913/test_data/',
default=' ',
help="test_data_path")
args = parser.parse_args()
return args
......
mkdir data/data24913/train_data
mkdir data/data24913/test_data
mkdir data/data24913/validation_data
mkdir train_data
mkdir test_data
mkdir data
train_path="data/census-income.data"
test_path="data/census-income.test"
train_data_path="train_data/"
test_data_path="test_data/"
train_path="data/data24913/census-income.data"
test_path="data/data24913/census-income.test"
train_data_path="data/data24913/train_data/"
test_data_path="data/data24913/test_data/"
validation_data_path="data/data24913/validation_data/"
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census.tar.gz
tar -zxvf data/census.tar.gz -C data/
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}\
--validation_data_path ${validation_data_path}
\ No newline at end of file
--test_data_path ${test_data_path}
......@@ -99,8 +99,6 @@ def data_preparation(train_path, test_path, train_data_path, test_data_path,
validation_data.shape, test_data.shape)
transformed_train.to_csv(train_data_path + 'train_data.csv', index=False)
test_data.to_csv(test_data_path + 'test_data.csv', index=False)
validation_data.to_csv(
validation_data_path + 'validation_data.csv', index=False)
args = data_preparation_args()
......
python share_bottom.py --use_gpu False\ # train on CPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--batch_size 32\ # batch size
--feature_size 499\ # feature dimension
--bottom_size 117\ # bottom layer size
--tower_nums 2\ # number of towers
--tower_size 8\ # tower layer size
--epochs 400 # number of epochs
\ No newline at end of file
python share_bottom.py --use_gpu 0 \
--epochs 100 \
--train_data_path '../train_data' \
--test_data_path '../test_data' \
--batch_size 16 \
--feature_size 499 \
--bottom_size 117 \
--tower_nums 2 \
--tower_size 8
python share_bottom.py --use_gpu True\ # train on GPU
--train_path data/data24913/train_data/\ # training data path
--test_path data/data24913/test_data/\ # test data path
--batch_size 32\ # batch size
--feature_size 499\ # feature dimension
--bottom_size 117\ # bottom layer size
--tower_nums 2\ # number of towers
--tower_size 8\ # tower layer size
--epochs 400 # number of epochs
\ No newline at end of file
python share_bottom.py --use_gpu 1 \
--epochs 100 \
--train_data_path '../train_data' \
--test_data_path '../test_data' \
--batch_size 16 \
--feature_size 499 \
--bottom_size 117 \
--tower_nums 2 \
--tower_size 8