未验证 提交 c5353f2e 编写于 作者: T tangwei12 提交者: GitHub

Merge branch 'master' into modify_yaml

......@@ -96,9 +96,10 @@ cd paddlerec
修改dnn模型的[超参配置](./models/rank/dnn/config.yaml),例如将迭代训练轮数从10轮修改为5轮:
```yaml
train:
# epochs: 10
epochs: 5
runner:
- name: runner1
class: single_train
epochs: 5 # 10->5
```
在Linux环境下,可以使用`vim`等文本编辑工具修改yaml文件:
......@@ -126,9 +127,9 @@ python -m paddlerec.run -m ./models/rank/dnn/config.yaml
我们以dnn模型为例,在paddlerec代码目录下,修改dnn模型的`config.yaml`文件:
```yaml
train:
#engine: single
engine: local_cluster
runner:
- name: runner1
class: local_cluster_train # single_train -> local_cluster_train
```
然后启动paddlerec训练:
......@@ -142,9 +143,9 @@ python -m paddlerec.run -m ./models/rank/dnn/config.yaml
我们以dnn模型为例,在paddlerec代码目录下,首先修改dnn模型`config.yaml`文件:
```yaml
train:
#engine: single
engine: cluster
runner:
- name: runner1
class: cluster_train # single_train -> cluster_train
```
再添加分布式启动配置文件`backend.yaml`,具体配置规则在[分布式训练](doc/distributed_train.md)教程中介绍。最后启动paddlerec训练:
......@@ -177,7 +178,7 @@ python -m paddlerec.run -m ./models/rank/dnn/config.yaml -b backend.yaml
| 多任务 | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ |
| 多任务 | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ |
| 多任务 | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ |
| 重排序 | [Listwise](models/rerank/listwise/model.py) | ✓ | x | ✓ |
| 重排序 | [Listwise](models/rerank/listwise/model.py) | ✓ | x | ✓ |
......@@ -203,6 +204,13 @@ python -m paddlerec.run -m ./models/rank/dnn/config.yaml -b backend.yaml
### 关于PaddleRec性能
* [Benchmark](doc/benchmark.md)
### 开发者教程
* [PaddleRec设计文档](doc/design.md)
* [二次开发](doc/development.md)
### 关于PaddleRec性能
* [Benchmark](doc/benchmark.md)
### FAQ
* [常见问题FAQ](doc/faq.md)
......
# 二次开发
## 如何添加自定义模型
当您希望开发自定义模型时,需要继承模型的模板基类,并实现三个必要的方法`init_hyper_parameter`,`intput_data`,`net`
并按照以下规范添加代码。
### 基类的继承
继承`paddlerec.core.model`的ModelBase,命名为`Class Model`
```python
from paddlerec.core.model import ModelBase
class Model(ModelBase):
# 构造函数无需显式指定
# 若继承,务必调用基类的__init__方法
def __init__(self, config):
ModelBase.__init__(self, config)
# ModelBase的__init__方法会调用_init_hyper_parameter()
```
### 超参的初始化
继承并实现`_init_hyper_parameter`方法(必要),可以在该方法中,从`yaml`文件获取超参或进行自定义操作。如下面的示例:
所有的envs调用接口在_init_hyper_parameters()方法中实现,同时类成员也推荐在此做声明及初始化。
```python
def _init_hyper_parameters(self):
self.feature_size = envs.get_global_env(
"hyper_parameters.feature_size")
self.expert_num = envs.get_global_env("hyper_parameters.expert_num")
self.gate_num = envs.get_global_env("hyper_parameters.gate_num")
self.expert_size = envs.get_global_env("hyper_parameters.expert_size")
self.tower_size = envs.get_global_env("hyper_parameters.tower_size")
```
### 数据输入的定义
继承并实现`input_data`方法(非必要)
#### 直接使用基类的数据读取方法
`ModelBase`中的input_data默认实现为slot_reader,在`config.yaml`中分别配置`reader.sparse_slot``reader.dense_slot`选项实现`slog:feasign`模式的数据读取。
> Slot : Feasign 是什么?
>
> Slot直译是槽位,在Rec工程中,是指某一个宽泛的特征类别,比如用户ID、性别、年龄就是Slot,Feasign则是具体值,比如:12345,男,20岁。
>
> 在实践过程中,很多特征槽位不是单一属性,或无法量化并且离散稀疏的,比如某用户兴趣爱好有三个:游戏/足球/数码,且每个具体兴趣又有多个特征维度,则在兴趣爱好这个Slot兴趣槽位中,就会有多个Feasign值。
>
> PaddleRec在读取数据时,每个Slot ID对应的特征,支持稀疏,且支持变长,可以非常灵活的支持各种场景的推荐模型训练。
使用示例请参考`rank.dnn`模型。
#### 自定义数据输入
如果您不想使用`slot:feasign`模式,则需继承并实现`input_data`接口,接口定义:`def input_data(self, is_infer=False, **kwargs)`
使用示例如下:
```python
def input_data(self, is_infer=False, **kwargs):
ser_slot_names = fluid.data(
name='user_slot_names',
shape=[None, 1],
dtype='int64',
lod_level=1)
item_slot_names = fluid.data(
name='item_slot_names',
shape=[None, self.item_len],
dtype='int64',
lod_level=1)
lens = fluid.data(name='lens', shape=[None], dtype='int64')
labels = fluid.data(
name='labels',
shape=[None, self.item_len],
dtype='int64',
lod_level=1)
train_inputs = [user_slot_names] + [item_slot_names] + [lens] + [labels]
infer_inputs = [user_slot_names] + [item_slot_names] + [lens]
if is_infer:
return infer_inputs
else:
return train_inputs
```
更多数据读取教程,请参考[自定义数据集及Reader](custom_dataset_reader.md)
### 组网的定义
继承并实现`net`方法(必要)
- 接口定义`def net(self, inputs, is_infer=False)`
- 自定义网络需在该函数中使用paddle组网,实现前向逻辑,定义网络的Loss及Metrics,通过`is_infer`判断是否为infer网络。
- 我们强烈建议`train``infer`尽量复用相同代码,
- `net`中调用的其他函数以下划线为头进行命名,封装网络中的结构模块,如`_sparse_embedding_layer(self)`
- `inputs``def input_data()`的输出,若使用`slot_reader`方式,inputs为占位符,无实际意义,通过以下方法拿到dense及sparse的输入
```python
self.sparse_inputs = self._sparse_data_var[1:]
self.dense_input = self._dense_data_var[0]
self.label_input = self._sparse_data_var[0]
```
可以参考官方模型的示例学习net的构造方法。
## 如何运行自定义模型
记录`model.py`,`config.yaml`及数据读取`reader.py`的文件路径,建议置于同一文件夹下,如`/home/custom_model`下,更改`config.yaml`中的配置选项
1. 更改 workerspace为模型文件所在文件夹
```yaml
workspace: "/home/custom_model"
```
2. 更改数据地址及读取reader地址
```yaml
dataset:
- name: custom_model_train
- data_path: "{workspace}/data/train" # or "/home/custom_model/data/train"
- data_converter: "{workspace}/reader.py" # or "/home/custom_model/reader.py"
```
3. 更改执行器的路径配置
```yaml
mode: train_runner
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 10
save_checkpoint_interval: 2
save_inference_interval: 5
save_checkpoint_path: "{workspace}/increment" # or "/home/custom_model/increment"
save_inference_path: "{workspace}/inference" # or "/home/custom_model/inference"
print_interval: 10
phase:
- name: train
model: "{workspace}/model.py" # or "/home/custom_model/model"
dataset_name: custom_model_train
thread_num: 1
```
4. 使用paddlerec.run方法运行自定义模型
```shell
python -m paddlerec.run -m /home/custom_model/config.yaml
```
以上~请开始享受你的推荐算法高效开发流程。如有任何问题,欢迎在[issue](https://github.com/PaddlePaddle/PaddleRec/issues)提出,我们会第一时间跟进解决。
doc/imgs/overview.png

698.6 KB | W: | H:

doc/imgs/overview.png

217.7 KB | W: | H:

doc/imgs/overview.png
doc/imgs/overview.png
doc/imgs/overview.png
doc/imgs/overview.png
  • 2-up
  • Swipe
  • Onion skin
```
```yaml
# 全局配置
# Debug 模式开关,Debug模式下,会打印OP的耗时及IO占比
debug: false
workspace: "."
# 工作区目录
# 使用文件夹路径,则会在该目录下寻找超参配置,组网,数据等必须文件
workspace: "/home/demo_model/"
# 若 workspace: paddlerec.models.rank.dnn
# 则会使用官方默认配置与组网
# 用户可以配多个dataset,exector里不同阶段可以用不同的dataset
# 用户可以指定多个dataset(数据读取配置)
# 运行的不同阶段可以使用不同的dataset
dataset:
- name: sample_1
type: DataLoader #或者QueueDataset
# dataloader 示例
- name: dataset_1
type: DataLoader
batch_size: 5
data_path: "{workspace}/data/train"
# 用户自定义reader
# 指定自定义的reader.py所在路径
data_converter: "{workspace}/rsc15_reader.py"
- name: sample_2
type: QueueDataset #或者DataLoader
# QueueDataset 示例
- name: dataset_2
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/train"
# 用户可以配置sparse_slots和dense_slots,无需再定义data_converter
# 用户可以配置sparse_slots和dense_slots,无需再定义data_converter,使用默认reader
sparse_slots: "click ins_weight 6001 6002 6003 6005 6006 6007 6008 6009"
dense_slots: "readlist:9"
#示例一,用户自定义参数,用于组网配置
# 自定义超参数,主要涉及网络中的模型超参及优化器
hyper_parameters:
#优化器
optimizer
class: Adam
learning_rate: 0.001
strategy: "{workspace}/conf/config_fleet.py"
# 用户自定义配置
optimizer:
class: Adam # 直接配置Optimizer,目前支持sgd/Adam/AdaGrad
learning_rate: 0.001
strategy: "{workspace}/conf/config_fleet.py" # 使用大规模稀疏pslib模式的特有配置
# 模型超参
vocab_size: 1000
hid_size: 100
my_key1: 233
my_key2: 0.1
mode: runner1
# 通过全局参数mode指定当前运行的runner
mode: runner_1
# runner主要涉及模型的执行环境,如:单机/分布式,CPU/GPU,迭代轮次,模型加载与保存地址
runner:
- name: runner1 # 示例一,train
trainer_class: single_train
- name: runner_1 # 配置一个runner,进行单机的训练
class: single_train # 配置运行模式的选择,还可以选择:single_infer/local_cluster_train/cluster_train
epochs: 10
device: cpu
init_model_path: ""
......@@ -50,14 +59,16 @@ runner:
save_checkpoint_path: "xxxx"
save_inference_path: "xxxx"
- name: runner2 # 示例二,infer
trainer_class: single_train
- name: runner_2 # 配置一个runner,进行单机的预测
class: single_infer
epochs: 1
device: cpu
init_model_path: "afs:/xxx/xxx"
# 模型在训练时,可能存在多个阶段,每个阶段的组网与数据读取都可能不尽相同
# 每个runner都会完整的运行所有阶段
# phase指定运行时加载的模型及reader
phase:
- name: phase1
model: "{workspace}/model.py"
......
......@@ -12,40 +12,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.
evaluate:
reader:
batch_size: 1
class: "{workspace}/esmm_infer_reader.py"
test_data_path: "{workspace}/data/train"
train:
trainer:
# for cluster training
strategy: "async"
workspace: "paddlerec.models.multitask.esmm"
epochs: 3
workspace: "paddlerec.models.multitask.esmm"
device: cpu
dataset:
- name: dataset_train
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/esmm_reader.py"
- name: dataset_infer
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/test"
data_converter: "{workspace}/esmm_reader.py"
reader:
batch_size: 2
class: "{workspace}/esmm_reader.py"
train_data_path: "{workspace}/data/train"
hyper_parameters:
vocab_size: 10000
embed_size: 128
optimizer:
class: adam
learning_rate: 0.001
strategy: async
model:
models: "{workspace}/model.py"
hyper_parameters:
vocab_size: 10000
embed_size: 128
learning_rate: 0.001
optimizer: adam
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 10
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from collections import defaultdict
from paddlerec.core.reader import Reader
class EvaluateReader(Reader):
def init(self):
all_field_id = [
'101', '109_14', '110_14', '127_14', '150_14', '121', '122', '124',
'125', '126', '127', '128', '129', '205', '206', '207', '210',
'216', '508', '509', '702', '853', '301'
]
self.all_field_id_dict = defaultdict(int)
for i, field_id in enumerate(all_field_id):
self.all_field_id_dict[field_id] = [False, i]
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = line.strip().split(',')
ctr = int(features[1])
cvr = int(features[2])
padding = 0
output = [(field_id, []) for field_id in self.all_field_id_dict]
for elem in features[4:]:
field_id, feat_id = elem.strip().split(':')
if field_id not in self.all_field_id_dict:
continue
self.all_field_id_dict[field_id][0] = True
index = self.all_field_id_dict[field_id][1]
output[index][1].append(int(feat_id))
for field_id in self.all_field_id_dict:
visited, index = self.all_field_id_dict[field_id]
if visited:
self.all_field_id_dict[field_id][0] = False
else:
output[index][1].append(padding)
output.append(('ctr', [ctr]))
output.append(('cvr', [cvr]))
yield output
return reader
......@@ -40,8 +40,6 @@ class TrainReader(Reader):
This function needs to be implemented by the user, based on data format
"""
features = line.strip().split(',')
# ctr = list(map(int, features[1]))
# cvr = list(map(int, features[2]))
ctr = int(features[1])
cvr = int(features[2])
......@@ -54,7 +52,6 @@ class TrainReader(Reader):
continue
self.all_field_id_dict[field_id][0] = True
index = self.all_field_id_dict[field_id][1]
# feat_id = list(map(int, feat_id))
output[index][1].append(int(feat_id))
for field_id in self.all_field_id_dict:
......
......@@ -23,28 +23,11 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def fc(self, tag, data, out_dim, active='prelu'):
def _init_hyper_parameters(self):
self.vocab_size = envs.get_global_env("hyper_parameters.vocab_size")
self.embed_size = envs.get_global_env("hyper_parameters.embed_size")
init_stddev = 1.0
scales = 1.0 / np.sqrt(data.shape[1])
p_attr = fluid.param_attr.ParamAttr(
name='%s_weight' % tag,
initializer=fluid.initializer.NormalInitializer(
loc=0.0, scale=init_stddev * scales))
b_attr = fluid.ParamAttr(
name='%s_bias' % tag, initializer=fluid.initializer.Constant(0.1))
out = fluid.layers.fc(input=data,
size=out_dim,
act=active,
param_attr=p_attr,
bias_attr=b_attr,
name=tag)
return out
def input_data(self):
def input_data(self, is_infer=False, **kwargs):
sparse_input_ids = [
fluid.data(
name="field_" + str(i),
......@@ -55,26 +38,24 @@ class Model(ModelBase):
label_ctr = fluid.data(name="ctr", shape=[-1, 1], dtype="int64")
label_cvr = fluid.data(name="cvr", shape=[-1, 1], dtype="int64")
inputs = sparse_input_ids + [label_ctr] + [label_cvr]
self._data_var.extend(inputs)
return inputs
if is_infer:
return inputs
else:
return inputs
def net(self, inputs, is_infer=False):
vocab_size = envs.get_global_env("hyper_parameters.vocab_size", None,
self._namespace)
embed_size = envs.get_global_env("hyper_parameters.embed_size", None,
self._namespace)
emb = []
# input feature data
for data in inputs[0:-2]:
feat_emb = fluid.embedding(
input=data,
size=[vocab_size, embed_size],
size=[self.vocab_size, self.embed_size],
param_attr=fluid.ParamAttr(
name='dis_emb',
learning_rate=5,
initializer=fluid.initializer.Xavier(
fan_in=embed_size, fan_out=embed_size)),
fan_in=self.embed_size, fan_out=self.embed_size)),
is_sparse=True)
field_emb = fluid.layers.sequence_pool(
input=feat_emb, pool_type='sum')
......@@ -83,14 +64,14 @@ class Model(ModelBase):
# ctr
active = 'relu'
ctr_fc1 = self.fc('ctr_fc1', concat_emb, 200, active)
ctr_fc2 = self.fc('ctr_fc2', ctr_fc1, 80, active)
ctr_out = self.fc('ctr_out', ctr_fc2, 2, 'softmax')
ctr_fc1 = self._fc('ctr_fc1', concat_emb, 200, active)
ctr_fc2 = self._fc('ctr_fc2', ctr_fc1, 80, active)
ctr_out = self._fc('ctr_out', ctr_fc2, 2, 'softmax')
# cvr
cvr_fc1 = self.fc('cvr_fc1', concat_emb, 200, active)
cvr_fc2 = self.fc('cvr_fc2', cvr_fc1, 80, active)
cvr_out = self.fc('cvr_out', cvr_fc2, 2, 'softmax')
cvr_fc1 = self._fc('cvr_fc1', concat_emb, 200, active)
cvr_fc2 = self._fc('cvr_fc2', cvr_fc1, 80, active)
cvr_out = self._fc('cvr_out', cvr_fc2, 2, 'softmax')
ctr_clk = inputs[-2]
ctcvr_buy = inputs[-1]
......@@ -127,15 +108,23 @@ class Model(ModelBase):
self._metrics["AUC_ctcvr"] = auc_ctcvr
self._metrics["BATCH_AUC_ctcvr"] = batch_auc_ctcvr
def train_net(self):
input_data = self.input_data()
self.net(input_data)
def infer_net(self):
self._infer_data_var = self.input_data()
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
self.net(self._infer_data_var, is_infer=True)
def _fc(self, tag, data, out_dim, active='prelu'):
init_stddev = 1.0
scales = 1.0 / np.sqrt(data.shape[1])
p_attr = fluid.param_attr.ParamAttr(
name='%s_weight' % tag,
initializer=fluid.initializer.NormalInitializer(
loc=0.0, scale=init_stddev * scales))
b_attr = fluid.ParamAttr(
name='%s_bias' % tag, initializer=fluid.initializer.Constant(0.1))
out = fluid.layers.fc(input=data,
size=out_dim,
act=active,
param_attr=p_attr,
bias_attr=b_attr,
name=tag)
return out
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddlerec.core.reader import Reader
class EvaluateReader(Reader):
def init(self):
pass
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
l = line.strip().split(',')
l = list(map(float, l))
label_income = []
label_marital = []
data = l[2:]
if int(l[1]) == 0:
label_income = [1, 0]
elif int(l[1]) == 1:
label_income = [0, 1]
if int(l[0]) == 0:
label_marital = [1, 0]
elif int(l[0]) == 1:
label_marital = [0, 1]
feature_name = ["input", "label_income", "label_marital"]
yield zip(feature_name, [data] + [label_income] + [label_marital])
return reader
......@@ -12,43 +12,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.
evaluate:
reader:
batch_size: 1
class: "{workspace}/census_infer_reader.py"
test_data_path: "{workspace}/data/train"
workspace: "paddlerec.models.multitask.mmoe"
train:
trainer:
# for cluster training
strategy: "async"
dataset:
- name: dataset_train
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/census_reader.py"
- name: dataset_infer
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/census_reader.py"
epochs: 3
workspace: "paddlerec.models.multitask.mmoe"
device: cpu
hyper_parameters:
feature_size: 499
expert_num: 8
gate_num: 2
expert_size: 16
tower_size: 8
optimizer:
class: adam
learning_rate: 0.001
strategy: async
reader:
batch_size: 1
class: "{workspace}/census_reader.py"
train_data_path: "{workspace}/data/train"
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
model:
models: "{workspace}/model.py"
hyper_parameters:
feature_size: 499
expert_num: 8
gate_num: 2
expert_size: 16
tower_size: 8
learning_rate: 0.001
optimizer: adam
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 10
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
mkdir train_data
mkdir test_data
mkdir data
train_path="data/census-income.data"
test_path="data/census-income.test"
train_data_path="train_data/"
test_data_path="test_data/"
pip install -r requirements.txt
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census.tar.gz
tar -zxvf data/census.tar.gz -C data/
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}
......@@ -22,53 +22,51 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def MMOE(self, is_infer=False):
feature_size = envs.get_global_env("hyper_parameters.feature_size",
None, self._namespace)
expert_num = envs.get_global_env("hyper_parameters.expert_num", None,
self._namespace)
gate_num = envs.get_global_env("hyper_parameters.gate_num", None,
self._namespace)
expert_size = envs.get_global_env("hyper_parameters.expert_size", None,
self._namespace)
tower_size = envs.get_global_env("hyper_parameters.tower_size", None,
self._namespace)
input_data = fluid.data(
name="input", shape=[-1, feature_size], dtype="float32")
def _init_hyper_parameters(self):
self.feature_size = envs.get_global_env(
"hyper_parameters.feature_size")
self.expert_num = envs.get_global_env("hyper_parameters.expert_num")
self.gate_num = envs.get_global_env("hyper_parameters.gate_num")
self.expert_size = envs.get_global_env("hyper_parameters.expert_size")
self.tower_size = envs.get_global_env("hyper_parameters.tower_size")
def input_data(self, is_infer=False, **kwargs):
inputs = fluid.data(
name="input", shape=[-1, self.feature_size], dtype="float32")
label_income = fluid.data(
name="label_income", shape=[-1, 2], dtype="float32", lod_level=0)
label_marital = fluid.data(
name="label_marital", shape=[-1, 2], dtype="float32", lod_level=0)
if is_infer:
self._infer_data_var = [input_data, label_income, label_marital]
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
self._data_var.extend([input_data, label_income, label_marital])
return [inputs, label_income, label_marital]
else:
return [inputs, label_income, label_marital]
def net(self, inputs, is_infer=False):
input_data = inputs[0]
label_income = inputs[1]
label_marital = inputs[2]
# f_{i}(x) = activation(W_{i} * x + b), where activation is ReLU according to the paper
expert_outputs = []
for i in range(0, expert_num):
for i in range(0, self.expert_num):
expert_output = fluid.layers.fc(
input=input_data,
size=expert_size,
size=self.expert_size,
act='relu',
bias_attr=fluid.ParamAttr(learning_rate=1.0),
name='expert_' + str(i))
expert_outputs.append(expert_output)
expert_concat = fluid.layers.concat(expert_outputs, axis=1)
expert_concat = fluid.layers.reshape(expert_concat,
[-1, expert_num, expert_size])
expert_concat = fluid.layers.reshape(
expert_concat, [-1, self.expert_num, self.expert_size])
# g^{k}(x) = activation(W_{gk} * x + b), where activation is softmax according to the paper
output_layers = []
for i in range(0, gate_num):
for i in range(0, self.gate_num):
cur_gate = fluid.layers.fc(
input=input_data,
size=expert_num,
size=self.expert_num,
act='softmax',
bias_attr=fluid.ParamAttr(learning_rate=1.0),
name='gate_' + str(i))
......@@ -78,7 +76,7 @@ class Model(ModelBase):
cur_gate_expert = fluid.layers.reduce_sum(cur_gate_expert, dim=1)
# Build tower layer
cur_tower = fluid.layers.fc(input=cur_gate_expert,
size=tower_size,
size=self.tower_size,
act='relu',
name='task_layer_' + str(i))
out = fluid.layers.fc(input=cur_tower,
......@@ -127,8 +125,5 @@ class Model(ModelBase):
self._metrics["AUC_marital"] = auc_marital
self._metrics["BATCH_AUC_marital"] = batch_auc_2
def train_net(self):
self.MMOE()
def infer_net(self):
self.MMOE(is_infer=True)
pass
......@@ -9,7 +9,9 @@
* [整体介绍](#整体介绍)
* [多任务模型列表](#多任务模型列表)
* [使用教程](#使用教程)
* [训练&预测](#训练&预测)
* [数据处理](#数据处理)
* [训练](#训练)
* [预测](#预测)
* [效果对比](#效果对比)
* [模型效果列表](#模型效果列表)
......@@ -40,14 +42,49 @@
<img align="center" src="../../doc/imgs/mmoe.png">
<p>
## 使用教程
### 训练&预测
## 使用教程(快速开始)
```shell
python -m paddlerec.run -m paddlerec.models.multitask.mmoe # mmoe
python -m paddlerec.run -m paddlerec.models.multitask.share-bottom # share-bottom
python -m paddlerec.run -m paddlerec.models.multitask.esmm # esmm
```
## 使用教程(复现论文)
### 注意
为了方便使用者能够快速的跑通每一个模型,我们在每个模型下都提供了样例数据,并且调整了batch_size等超参以便在样例数据上更加友好的显示训练&测试日志。如果需要复现readme中的效果请按照如下表格调整batch_size等超参,并使用提供的脚本下载对应数据集以及数据预处理。
| 模型 | batch_size | thread_num | epoch_num |
| :------------------: | :--------------------: | :--------------------: | :--------------------: |
| Share-Bottom | 32 | 1 | 400 |
| MMoE | 32 | 1 | 400 |
| ESMM | 64 | 2 | 100 |
### 数据处理
参考每个模型目录数据下载&预处理脚本
```
sh run.sh
```
### 训练
```
cd modles/multitask/mmoe # 进入选定好的排序模型的目录 以MMoE为例
python -m paddlerec.run -m ./config.yaml # 自定义修改超参后,指定配置文件,使用自定义配置
```
### 预测
```
# 修改对应模型的config.yaml, workspace配置为当前目录的绝对路径
# 修改对应模型的config.yaml,mode配置infer_runner
# 示例: mode: train_runner -> mode: infer_runner
# infer_runner中 class配置为 class: single_infer
# 修改phase阶段为infer的配置,参照config注释
# 修改完config.yaml后 执行:
python -m paddlerec.run -m ./config.yaml # 以MMoE为例
```
## 效果对比
### 模型效果列表
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddlerec.core.reader import Reader
class EvaluateReader(Reader):
def init(self):
pass
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
l = line.strip().split(',')
l = list(map(float, l))
label_income = []
label_marital = []
data = l[2:]
if int(l[1]) == 0:
label_income = [1, 0]
elif int(l[1]) == 1:
label_income = [0, 1]
if int(l[0]) == 0:
label_marital = [1, 0]
elif int(l[0]) == 1:
label_marital = [0, 1]
feature_name = ["input", "label_income", "label_marital"]
yield zip(feature_name, [data] + [label_income] + [label_marital])
return reader
......@@ -12,42 +12,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
evaluate:
reader:
batch_size: 1
class: "{workspace}/census_infer_reader.py"
test_data_path: "{workspace}/data/train"
workspace: "paddlerec.models.multitask.share-bottom"
train:
trainer:
# for cluster training
strategy: "async"
dataset:
- name: dataset_train
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/census_reader.py"
- name: dataset_infer
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/census_reader.py"
epochs: 3
workspace: "paddlerec.models.multitask.share-bottom"
device: cpu
hyper_parameters:
feature_size: 499
bottom_size: 117
tower_nums: 2
tower_size: 8
optimizer:
class: adam
learning_rate: 0.001
strategy: async
reader:
batch_size: 2
class: "{workspace}/census_reader.py"
train_data_path: "{workspace}/data/train"
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
model:
models: "{workspace}/model.py"
hyper_parameters:
feature_size: 499
bottom_size: 117
tower_nums: 2
tower_size: 8
learning_rate: 0.001
optimizer: adam
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 5
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
......@@ -22,46 +22,42 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def model(self, is_infer=False):
feature_size = envs.get_global_env("hyper_parameters.feature_size",
None, self._namespace)
bottom_size = envs.get_global_env("hyper_parameters.bottom_size", None,
self._namespace)
tower_size = envs.get_global_env("hyper_parameters.tower_size", None,
self._namespace)
tower_nums = envs.get_global_env("hyper_parameters.tower_nums", None,
self._namespace)
input_data = fluid.data(
name="input", shape=[-1, feature_size], dtype="float32")
def _init_hyper_parameters(self):
self.feature_size = envs.get_global_env(
"hyper_parameters.feature_size")
self.bottom_size = envs.get_global_env("hyper_parameters.bottom_size")
self.tower_size = envs.get_global_env("hyper_parameters.tower_size")
self.tower_nums = envs.get_global_env("hyper_parameters.tower_nums")
def input_data(self, is_infer=False, **kwargs):
inputs = fluid.data(
name="input", shape=[-1, self.feature_size], dtype="float32")
label_income = fluid.data(
name="label_income", shape=[-1, 2], dtype="float32", lod_level=0)
label_marital = fluid.data(
name="label_marital", shape=[-1, 2], dtype="float32", lod_level=0)
if is_infer:
self._infer_data_var = [input_data, label_income, label_marital]
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
return [inputs, label_income, label_marital]
else:
return [inputs, label_income, label_marital]
self._data_var.extend([input_data, label_income, label_marital])
def net(self, inputs, is_infer=False):
input_data = inputs[0]
label_income = inputs[1]
label_marital = inputs[2]
bottom_output = fluid.layers.fc(
input=input_data,
size=bottom_size,
size=self.bottom_size,
act='relu',
bias_attr=fluid.ParamAttr(learning_rate=1.0),
name='bottom_output')
# Build tower layer from bottom layer
output_layers = []
for index in range(tower_nums):
for index in range(self.tower_nums):
tower_layer = fluid.layers.fc(input=bottom_output,
size=tower_size,
size=self.tower_size,
act='relu',
name='task_layer_' + str(index))
output_layer = fluid.layers.fc(input=tower_layer,
......@@ -107,9 +103,3 @@ class Model(ModelBase):
self._metrics["BATCH_AUC_income"] = batch_auc_1
self._metrics["AUC_marital"] = auc_marital
self._metrics["BATCH_AUC_marital"] = batch_auc_2
def train_net(self):
self.model()
def infer_net(self):
self.model(is_infer=True)
......@@ -12,43 +12,66 @@
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "paddlerec.models.rank.dcn"
reader:
batch_size: 2
train_data_path: "{workspace}/data/sample_data/train"
feat_dict_name: "{workspace}/data/vocab"
# global settings
debug: false
workspace: "paddlerec.models.rank.dcn"
dataset:
- name: train_sample
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label C1 C2 C3 C4 C5 C6 C7 C8 C9 C10 C11 C12 C13 C14 C15 C16 C17 C18 C19 C20 C21 C22 C23 C24 C25 C26"
dense_slots: "I1:1 I2:1 I3:1 I4:1 I5:1 I6:1 I7:1 I8:1 I9:1 I10:1 I11:1 I12:1 I13:1"
- name: infer_sample
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/sample_data/infer"
sparse_slots: "label C1 C2 C3 C4 C5 C6 C7 C8 C9 C10 C11 C12 C13 C14 C15 C16 C17 C18 C19 C20 C21 C22 C23 C24 C25 C26"
dense_slots: "I1:1 I2:1 I3:1 I4:1 I5:1 I6:1 I7:1 I8:1 I9:1 I10:1 I11:1 I12:1 I13:1"
model:
models: "{workspace}/model.py"
hyper_parameters:
cross_num: 2
dnn_hidden_units: [128, 128]
l2_reg_cross: 0.00005
dnn_use_bn: False
clip_by_norm: 100.0
cat_feat_num: "{workspace}/data/sample_data/cat_feature_num.txt"
is_sparse: False
is_test: False
num_field: 39
learning_rate: 0.0001
act: "relu"
optimizer: adam
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
hyper_parameters:
optimizer:
class: Adam
learning_rate: 0.0001
# 用户自定义配置
cross_num: 2
dnn_hidden_units: [128, 128]
l2_reg_cross: 0.00005
dnn_use_bn: False
clip_by_norm: 100.0
cat_feat_num: "{workspace}/data/sample_data/cat_feature_num.txt"
is_sparse: False
mode: train_runner
# if infer, change mode to "infer_runner" and change phase to "infer_phase"
runner:
- name: train_runner
trainer_class: single_train
epochs: 1
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
print_interval: 1
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: train_sample
thread_num: 1
#- name: infer_phase
# model: "{workspace}/model.py"
# dataset_name: infer_sample
# thread_num: 1
label:0 I1:0.69314718056 I2:1.60943791243 I3:1.79175946923 I4:0.0 I5:7.23201033166 I6:1.60943791243 I7:2.77258872224 I8:1.09861228867 I9:5.20400668708 I10:0.69314718056 I11:1.09861228867 I12:0 I13:1.09861228867 C1:95 C2:398 C3:0 C4:0 C5:53 C6:1 C7:73 C8:71 C9:3 C10:1974 C11:832 C12:0 C13:875 C14:8 C15:1764 C16:0 C17:5 C18:390 C19:226 C20:1 C21:0 C22:0 C23:8 C24:1759 C25:1 C26:862
label:0 I1:1.09861228867 I2:1.38629436112 I3:3.80666248977 I4:0.69314718056 I5:4.63472898823 I6:2.19722457734 I7:1.09861228867 I8:1.09861228867 I9:1.60943791243 I10:0.69314718056 I11:0.69314718056 I12:0 I13:1.60943791243 C1:95 C2:200 C3:1184 C4:1929 C5:53 C6:4 C7:1477 C8:2 C9:3 C10:1283 C11:1567 C12:1048 C13:271 C14:6 C15:1551 C16:899 C17:1 C18:162 C19:226 C20:2 C21:575 C22:0 C23:8 C24:1615 C25:1 C26:659
label:0 I1:1.09861228867 I2:1.38629436112 I3:0.69314718056 I4:2.7080502011 I5:6.64378973315 I6:4.49980967033 I7:1.60943791243 I8:1.09861228867 I9:5.50533153593 I10:0.69314718056 I11:1.38629436112 I12:1.38629436112 I13:3.82864139649 C1:123 C2:378 C3:991 C4:197 C5:53 C6:1 C7:689 C8:2 C9:3 C10:245 C11:623 C12:1482 C13:887 C14:21 C15:106 C16:720 C17:3 C18:768 C19:0 C20:0 C21:1010 C22:1 C23:8 C24:720 C25:0 C26:0
label:0 I1:0 I2:6.79905586206 I3:0 I4:0 I5:8.38776764398 I6:0 I7:0.0 I8:0.0 I9:0.0 I10:0 I11:0.0 I12:0 I13:0 C1:95 C2:227 C3:0 C4:219 C5:53 C6:4 C7:3174 C8:2 C9:3 C10:569 C11:1963 C12:0 C13:1150 C14:21 C15:1656 C16:0 C17:6 C18:584 C19:0 C20:0 C21:0 C22:0 C23:8 C24:954 C25:0 C26:0
label:0 I1:1.38629436112 I2:1.09861228867 I3:0 I4:0.0 I5:1.09861228867 I6:0.0 I7:1.38629436112 I8:0.0 I9:0.0 I10:0.69314718056 I11:0.69314718056 I12:0 I13:0.0 C1:121 C2:147 C3:0 C4:1356 C5:53 C6:7 C7:2120 C8:2 C9:3 C10:703 C11:1678 C12:1210 C13:1455 C14:8 C15:538 C16:1276 C17:6 C18:346 C19:0 C20:0 C21:944 C22:0 C23:10 C24:355 C25:0 C26:0
label:0 I1:0 I2:1.09861228867 I3:0 I4:0 I5:9.45915167004 I6:0 I7:0.0 I8:0.0 I9:1.94591014906 I10:0 I11:0.0 I12:0 I13:0 C1:14 C2:75 C3:993 C4:480 C5:50 C6:6 C7:1188 C8:2 C9:3 C10:245 C11:1037 C12:1365 C13:1421 C14:21 C15:786 C16:5 C17:2 C18:555 C19:0 C20:0 C21:1408 C22:6 C23:7 C24:753 C25:0 C26:0
label:0 I1:0 I2:1.60943791243 I3:1.09861228867 I4:0 I5:8.06117135969 I6:0 I7:0.0 I8:0.69314718056 I9:1.09861228867 I10:0 I11:0.0 I12:0 I13:0 C1:139 C2:343 C3:553 C4:828 C5:50 C6:4 C7:0 C8:2 C9:3 C10:245 C11:2081 C12:260 C13:455 C14:21 C15:122 C16:1159 C17:2 C18:612 C19:0 C20:0 C21:1137 C22:0 C23:1 C24:1583 C25:0 C26:0
label:1 I1:0.69314718056 I2:2.07944154168 I3:1.09861228867 I4:0.0 I5:0.0 I6:0.0 I7:0.69314718056 I8:0.0 I9:0.0 I10:0.69314718056 I11:0.69314718056 I12:0 I13:0.0 C1:95 C2:227 C3:0 C4:1567 C5:21 C6:7 C7:2496 C8:71 C9:3 C10:1913 C11:2212 C12:0 C13:673 C14:21 C15:1656 C16:0 C17:5 C18:584 C19:0 C20:0 C21:0 C22:0 C23:10 C24:954 C25:0 C26:0
label:0 I1:0 I2:3.87120101091 I3:1.60943791243 I4:2.19722457734 I5:9.85277303799 I6:5.52146091786 I7:3.36729582999 I8:3.4657359028 I9:4.9558270576 I10:0 I11:0.69314718056 I12:0 I13:2.19722457734 C1:14 C2:14 C3:454 C4:197 C5:53 C6:1 C7:1386 C8:2 C9:3 C10:0 C11:1979 C12:205 C13:214 C14:6 C15:1837 C16:638 C17:5 C18:6 C19:0 C20:0 C21:70 C22:0 C23:10 C24:720 C25:0 C26:0
label:0 I1:0 I2:3.66356164613 I3:0 I4:0.69314718056 I5:10.4263800775 I6:3.09104245336 I7:0.69314718056 I8:1.09861228867 I9:1.38629436112 I10:0 I11:0.69314718056 I12:0 I13:0.69314718056 C1:14 C2:179 C3:120 C4:746 C5:53 C6:0 C7:1312 C8:2 C9:3 C10:1337 C11:1963 C12:905 C13:1150 C14:21 C15:1820 C16:328 C17:9 C18:77 C19:0 C20:0 C21:311 C22:0 C23:10 C24:89 C25:0 C26:0
......@@ -24,44 +24,21 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def init_network(self):
def _init_hyper_parameters(self):
self.cross_num = envs.get_global_env("hyper_parameters.cross_num",
None, self._namespace)
None)
self.dnn_hidden_units = envs.get_global_env(
"hyper_parameters.dnn_hidden_units", None, self._namespace)
"hyper_parameters.dnn_hidden_units", None)
self.l2_reg_cross = envs.get_global_env(
"hyper_parameters.l2_reg_cross", None, self._namespace)
"hyper_parameters.l2_reg_cross", None)
self.dnn_use_bn = envs.get_global_env("hyper_parameters.dnn_use_bn",
None, self._namespace)
None)
self.clip_by_norm = envs.get_global_env(
"hyper_parameters.clip_by_norm", None, self._namespace)
cat_feat_num = envs.get_global_env("hyper_parameters.cat_feat_num",
None, self._namespace)
self.sparse_inputs = self._sparse_data_var[1:]
self.dense_inputs = self._dense_data_var
self.target_input = self._sparse_data_var[0]
cat_feat_dims_dict = OrderedDict()
for line in open(cat_feat_num):
spls = line.strip().split()
assert len(spls) == 2
cat_feat_dims_dict[spls[0]] = int(spls[1])
self.cat_feat_dims_dict = cat_feat_dims_dict if cat_feat_dims_dict else OrderedDict(
)
"hyper_parameters.clip_by_norm", None)
self.cat_feat_num = envs.get_global_env(
"hyper_parameters.cat_feat_num", None)
self.is_sparse = envs.get_global_env("hyper_parameters.is_sparse",
None, self._namespace)
self.dense_feat_names = [i.name for i in self.dense_inputs]
self.sparse_feat_names = [i.name for i in self.sparse_inputs]
# {feat_name: dims}
self.feat_dims_dict = OrderedDict(
[(feat_name, 1) for feat_name in self.dense_feat_names])
self.feat_dims_dict.update(self.cat_feat_dims_dict)
self.net_input = None
self.loss = None
None)
def _create_embedding_input(self):
# sparse embedding
......@@ -121,9 +98,29 @@ class Model(ModelBase):
def _l2_loss(self, w):
return fluid.layers.reduce_sum(fluid.layers.square(w))
def train_net(self):
self._init_slots()
self.init_network()
def net(self, inputs, is_infer=False):
self.sparse_inputs = self._sparse_data_var[1:]
self.dense_inputs = self._dense_data_var
self.target_input = self._sparse_data_var[0]
cat_feat_dims_dict = OrderedDict()
for line in open(self.cat_feat_num):
spls = line.strip().split()
assert len(spls) == 2
cat_feat_dims_dict[spls[0]] = int(spls[1])
self.cat_feat_dims_dict = cat_feat_dims_dict if cat_feat_dims_dict else OrderedDict(
)
self.dense_feat_names = [i.name for i in self.dense_inputs]
self.sparse_feat_names = [i.name for i in self.sparse_inputs]
# {feat_name: dims}
self.feat_dims_dict = OrderedDict(
[(feat_name, 1) for feat_name in self.dense_feat_names])
self.feat_dims_dict.update(self.cat_feat_dims_dict)
self.net_input = None
self.loss = None
self.net_input = self._create_embedding_input()
......@@ -146,6 +143,9 @@ class Model(ModelBase):
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc_var
if is_infer:
self._infer_results["AUC"] = auc_var
# logloss
logloss = fluid.layers.log_loss(
self.prob, fluid.layers.cast(
......@@ -157,11 +157,7 @@ class Model(ModelBase):
self.loss = self.avg_logloss + l2_reg_cross_loss
self._cost = self.loss
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate",
None, self._namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
def infer_net(self):
self.train_net()
#def optimizer(self):
#
# optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True)
# return optimizer
......@@ -12,39 +12,65 @@
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "paddlerec.models.rank.deepfm"
reader:
batch_size: 2
train_data_path: "{workspace}/data/sample_data/train"
feat_dict_name: "{workspace}/data/sample_data/feat_dict_10.pkl2"
# global settings
debug: false
workspace: "paddlerec.models.rank.deepfm"
dataset:
- name: train_sample
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label feat_idx"
dense_slots: "feat_value:39"
- name: infer_sample
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label feat_idx"
dense_slots: "feat_value:39"
model:
models: "{workspace}/model.py"
hyper_parameters:
sparse_feature_number: 1086460
sparse_feature_dim: 9
num_field: 39
fc_sizes: [400, 400, 400]
learning_rate: 0.0001
reg: 0.001
act: "relu"
optimizer: SGD
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
hyper_parameters:
optimizer:
class: SGD
learning_rate: 0.0001
sparse_feature_number: 1086460
sparse_feature_dim: 9
num_field: 39
fc_sizes: [400, 400, 400]
reg: 0.001
act: "relu"
mode: train_runner
# if infer, change mode to "infer_runner" and change phase to "infer_phase"
runner:
- name: train_runner
trainer_class: single_train
epochs: 2
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
print_interval: 1
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: train_sample
thread_num: 1
#- name: infer_phase
# model: "{workspace}/model.py"
# dataset_name: infer_sample
# thread_num: 1
......@@ -24,42 +24,46 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def deepfm_net(self):
def _init_hyper_parameters(self):
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None)
self.num_field = envs.get_global_env("hyper_parameters.num_field",
None)
self.reg = envs.get_global_env("hyper_parameters.reg", 1e-4)
self.layer_sizes = envs.get_global_env("hyper_parameters.fc_sizes",
None)
self.act = envs.get_global_env("hyper_parameters.act", None)
def net(self, inputs, is_infer=False):
init_value_ = 0.1
is_distributed = True if envs.get_trainer() == "CtrTrainer" else False
sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None, self._namespace)
sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None, self._namespace)
# ------------------------- network input --------------------------
num_field = envs.get_global_env("hyper_parameters.num_field", None,
self._namespace)
raw_feat_idx = self._sparse_data_var[1]
raw_feat_value = self._dense_data_var[0]
self.label = self._sparse_data_var[0]
feat_idx = raw_feat_idx
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
raw_feat_value, [-1, self.num_field, 1]) # None * num_field * 1
reg = envs.get_global_env("hyper_parameters.reg", 1e-4,
self._namespace)
first_weights_re = fluid.embedding(
input=feat_idx,
is_sparse=True,
is_distributed=is_distributed,
dtype='float32',
size=[sparse_feature_number + 1, 1],
size=[self.sparse_feature_number + 1, 1],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_),
regularizer=fluid.regularizer.L1DecayRegularizer(reg)))
regularizer=fluid.regularizer.L1DecayRegularizer(self.reg)))
first_weights = fluid.layers.reshape(
first_weights_re, shape=[-1, num_field, 1]) # None * num_field * 1
first_weights_re,
shape=[-1, self.num_field, 1]) # None * num_field * 1
y_first_order = fluid.layers.reduce_sum((first_weights * feat_value),
1)
......@@ -70,16 +74,17 @@ class Model(ModelBase):
is_sparse=True,
is_distributed=is_distributed,
dtype='float32',
size=[sparse_feature_number + 1, sparse_feature_dim],
size=[self.sparse_feature_number + 1, self.sparse_feature_dim],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0,
scale=init_value_ / math.sqrt(float(sparse_feature_dim)))))
scale=init_value_ /
math.sqrt(float(self.sparse_feature_dim)))))
feat_embeddings = fluid.layers.reshape(
feat_embeddings_re,
shape=[-1, num_field,
sparse_feature_dim]) # None * num_field * embedding_size
shape=[-1, self.num_field, self.sparse_feature_dim
]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# sum_square part
......@@ -101,17 +106,13 @@ class Model(ModelBase):
# ------------------------- DNN --------------------------
layer_sizes = envs.get_global_env("hyper_parameters.fc_sizes", None,
self._namespace)
act = envs.get_global_env("hyper_parameters.act", None,
self._namespace)
y_dnn = fluid.layers.reshape(feat_embeddings,
[-1, num_field * sparse_feature_dim])
for s in layer_sizes:
y_dnn = fluid.layers.reshape(
feat_embeddings, [-1, self.num_field * self.sparse_feature_dim])
for s in self.layer_sizes:
y_dnn = fluid.layers.fc(
input=y_dnn,
size=s,
act=act,
act=self.act,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_ / math.sqrt(float(10)))),
......@@ -133,21 +134,12 @@ class Model(ModelBase):
self.predict = fluid.layers.sigmoid(y_first_order + y_second_order +
y_dnn)
def train_net(self):
self._init_slots()
self.deepfm_net()
# ------------------------- Cost(logloss) --------------------------
cost = fluid.layers.log_loss(
input=self.predict, label=fluid.layers.cast(self.label, "float32"))
avg_cost = fluid.layers.reduce_sum(cost)
self._cost = avg_cost
# ------------------------- Metric(Auc) --------------------------
predict_2d = fluid.layers.concat([1 - self.predict, self.predict], 1)
label_int = fluid.layers.cast(self.label, 'int64')
auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict_2d,
......@@ -155,12 +147,5 @@ class Model(ModelBase):
slide_steps=0)
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc_var
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate",
None, self._namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
def infer_net(self):
self.train_net()
if is_infer:
self._infer_results["AUC"] = auc_var
......@@ -12,40 +12,60 @@
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
# global settings
debug: false
workspace: "paddlerec.models.rank.din"
epochs: 10
workspace: "paddlerec.models.rank.din"
dataset:
- name: sample_1
type: DataLoader
batch_size: 5
data_path: "{workspace}/data/train_data"
data_converter: "{workspace}/reader.py"
- name: infer_sample
type: DataLoader
batch_size: 5
data_path: "{workspace}/data/train_data"
data_converter: "{workspace}/reader.py"
reader:
batch_size: 2
class: "{workspace}/reader.py"
train_data_path: "{workspace}/data/train_data"
dataset_class: "DataLoader"
hyper_parameters:
optimizer:
class: SGD
learning_rate: 0.0001
use_DataLoader: True
item_emb_size: 64
cat_emb_size: 64
is_sparse: False
item_count: 63001
cat_count: 801
model:
models: "{workspace}/model.py"
hyper_parameters:
use_DataLoader: True
item_emb_size: 64
cat_emb_size: 64
is_sparse: False
config_path: "data/config.txt"
fc_sizes: [400, 400, 400]
learning_rate: 0.0001
reg: 0.001
act: "sigmoid"
optimizer: SGD
act: "sigmoid"
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
epochs: 1
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: sample_1
thread_num: 1
#- name: infer_phase
# model: "{workspace}/model.py"
# dataset_name: infer_sample
# thread_num: 1
......@@ -22,12 +22,58 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def config_read(self, config_path):
with open(config_path, "r") as fin:
user_count = int(fin.readline().strip())
item_count = int(fin.readline().strip())
cat_count = int(fin.readline().strip())
return user_count, item_count, cat_count
def _init_hyper_parameters(self):
self.item_emb_size = envs.get_global_env(
"hyper_parameters.item_emb_size", 64)
self.cat_emb_size = envs.get_global_env(
"hyper_parameters.cat_emb_size", 64)
self.act = envs.get_global_env("hyper_parameters.act", "sigmoid")
self.is_sparse = envs.get_global_env("hyper_parameters.is_sparse",
False)
#significant for speeding up the training process
self.use_DataLoader = envs.get_global_env(
"hyper_parameters.use_DataLoader", False)
self.item_count = envs.get_global_env("hyper_parameters.item_count",
63001)
self.cat_count = envs.get_global_env("hyper_parameters.cat_count", 801)
def input_data(self, is_infer=False, **kwargs):
seq_len = -1
self.data_var = []
hist_item_seq = fluid.data(
name="hist_item_seq", shape=[None, seq_len], dtype="int64")
self.data_var.append(hist_item_seq)
hist_cat_seq = fluid.data(
name="hist_cat_seq", shape=[None, seq_len], dtype="int64")
self.data_var.append(hist_cat_seq)
target_item = fluid.data(
name="target_item", shape=[None], dtype="int64")
self.data_var.append(target_item)
target_cat = fluid.data(name="target_cat", shape=[None], dtype="int64")
self.data_var.append(target_cat)
label = fluid.data(name="label", shape=[None, 1], dtype="float32")
self.data_var.append(label)
mask = fluid.data(
name="mask", shape=[None, seq_len, 1], dtype="float32")
self.data_var.append(mask)
target_item_seq = fluid.data(
name="target_item_seq", shape=[None, seq_len], dtype="int64")
self.data_var.append(target_item_seq)
target_cat_seq = fluid.data(
name="target_cat_seq", shape=[None, seq_len], dtype="int64")
self.data_var.append(target_cat_seq)
train_inputs = [hist_item_seq] + [hist_cat_seq] + [target_item] + [
target_cat
] + [label] + [mask] + [target_item_seq] + [target_cat_seq]
return train_inputs
def din_attention(self, hist, target_expand, mask):
"""activation weight"""
......@@ -59,104 +105,58 @@ class Model(ModelBase):
out = fluid.layers.reshape(x=out, shape=[0, hidden_size])
return out
def train_net(self):
seq_len = -1
self.item_emb_size = envs.get_global_env(
"hyper_parameters.item_emb_size", 64, self._namespace)
self.cat_emb_size = envs.get_global_env(
"hyper_parameters.cat_emb_size", 64, self._namespace)
self.act = envs.get_global_env("hyper_parameters.act", "sigmoid",
self._namespace)
#item_emb_size = 64
#cat_emb_size = 64
self.is_sparse = envs.get_global_env("hyper_parameters.is_sparse",
False, self._namespace)
#significant for speeding up the training process
self.config_path = envs.get_global_env(
"hyper_parameters.config_path", "data/config.txt", self._namespace)
self.use_DataLoader = envs.get_global_env(
"hyper_parameters.use_DataLoader", False, self._namespace)
user_count, item_count, cat_count = self.config_read(self.config_path)
def net(self, inputs, is_infer=False):
hist_item_seq = inputs[0]
hist_cat_seq = inputs[1]
target_item = inputs[2]
target_cat = inputs[3]
label = inputs[4]
mask = inputs[5]
target_item_seq = inputs[6]
target_cat_seq = inputs[7]
item_emb_attr = fluid.ParamAttr(name="item_emb")
cat_emb_attr = fluid.ParamAttr(name="cat_emb")
hist_item_seq = fluid.data(
name="hist_item_seq", shape=[None, seq_len], dtype="int64")
self._data_var.append(hist_item_seq)
hist_cat_seq = fluid.data(
name="hist_cat_seq", shape=[None, seq_len], dtype="int64")
self._data_var.append(hist_cat_seq)
target_item = fluid.data(
name="target_item", shape=[None], dtype="int64")
self._data_var.append(target_item)
target_cat = fluid.data(name="target_cat", shape=[None], dtype="int64")
self._data_var.append(target_cat)
label = fluid.data(name="label", shape=[None, 1], dtype="float32")
self._data_var.append(label)
mask = fluid.data(
name="mask", shape=[None, seq_len, 1], dtype="float32")
self._data_var.append(mask)
target_item_seq = fluid.data(
name="target_item_seq", shape=[None, seq_len], dtype="int64")
self._data_var.append(target_item_seq)
target_cat_seq = fluid.data(
name="target_cat_seq", shape=[None, seq_len], dtype="int64")
self._data_var.append(target_cat_seq)
if self.use_DataLoader:
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var,
capacity=10000,
use_double_buffer=False,
iterable=False)
hist_item_emb = fluid.embedding(
input=hist_item_seq,
size=[item_count, self.item_emb_size],
size=[self.item_count, self.item_emb_size],
param_attr=item_emb_attr,
is_sparse=self.is_sparse)
hist_cat_emb = fluid.embedding(
input=hist_cat_seq,
size=[cat_count, self.cat_emb_size],
size=[self.cat_count, self.cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=self.is_sparse)
target_item_emb = fluid.embedding(
input=target_item,
size=[item_count, self.item_emb_size],
size=[self.item_count, self.item_emb_size],
param_attr=item_emb_attr,
is_sparse=self.is_sparse)
target_cat_emb = fluid.embedding(
input=target_cat,
size=[cat_count, self.cat_emb_size],
size=[self.cat_count, self.cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=self.is_sparse)
target_item_seq_emb = fluid.embedding(
input=target_item_seq,
size=[item_count, self.item_emb_size],
size=[self.item_count, self.item_emb_size],
param_attr=item_emb_attr,
is_sparse=self.is_sparse)
target_cat_seq_emb = fluid.embedding(
input=target_cat_seq,
size=[cat_count, self.cat_emb_size],
size=[self.cat_count, self.cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=self.is_sparse)
item_b = fluid.embedding(
input=target_item,
size=[item_count, 1],
size=[self.item_count, 1],
param_attr=fluid.initializer.Constant(value=0.0))
hist_seq_concat = fluid.layers.concat(
......@@ -195,12 +195,5 @@ class Model(ModelBase):
slide_steps=0)
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc_var
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate",
None, self._namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
def infer_net(self, parameter_list):
self.deepfm_net()
if is_infer:
self._infer_results["AUC"] = auc_var
......@@ -29,8 +29,8 @@ from paddlerec.core.utils import envs
class TrainReader(Reader):
def init(self):
self.train_data_path = envs.get_global_env("train_data_path", None,
"train.reader")
self.train_data_path = envs.get_global_env(
"dataset.sample_1.data_path", None)
self.res = []
self.max_len = 0
......@@ -46,7 +46,8 @@ class TrainReader(Reader):
fo = open("tmp.txt", "w")
fo.write(str(self.max_len))
fo.close()
self.batch_size = envs.get_global_env("batch_size", 32, "train.reader")
self.batch_size = envs.get_global_env("dataset.sample_1.batch_size",
32, "train.reader")
self.group_size = self.batch_size * 20
def _process_line(self, line):
......
......@@ -56,7 +56,18 @@
<img align="center" src="../../doc/imgs/din.png">
<p>
## 使用教程
## 使用教程(快速开始)
使用样例数据快速开始,参考[训练](###训练) & [预测](###预测)
## 使用教程(复现论文)
为了方便使用者能够快速的跑通每一个模型,我们在每个模型下都提供了样例数据,并且调整了batch_size等超参以便在样例数据上更加友好的显示训练&测试日志。如果需要复现readme中的效果请按照如下表格调整batch_size等超参,并使用提供的脚本下载对应数据集以及数据预处理。
| 模型 | batch_size | thread_num | epoch_num |
| :------------------: | :--------------------: | :--------------------: | :--------------------: |
| DNN | 1000 | 10 | 1 |
| DCN | 512 | 20 | 2 |
| DeepFM | 100 | 10 | 30 |
| DIN | 32 | 10 | 100 |
| Wide&Deep | 40 | 1 | 40 |
| xDeepFM | 100 | 1 | 10 |
### 数据处理
参考每个模型目录数据下载&预处理脚本
......@@ -68,11 +79,21 @@ sh run.sh
### 训练
```
python -m paddlerec.run -m paddlerec.models.rank.dnn # 以DNN为例
cd modles/rank/dnn # 进入选定好的排序模型的目录 以DNN为例
python -m paddlerec.run -m paddlerec.models.rank.dnn # 使用内置配置
# 如果需要使用自定义配置,config.yaml中workspace需要使用改模型目录的绝对路径
# 自定义修改超参后,指定配置文件,使用自定义配置
python -m paddlerec.run -m ./config.yaml
```
### 预测
```
python -m paddlerec.run -m paddlerec.models.rank.dnn # 以DNN为例
# 修改对应模型的config.yaml,mode配置infer_runner
# 示例: mode: runner1 -> mode: infer_runner
# infer_runner中 class配置为 class: single_infer
# 如果训练阶段和预测阶段的模型输入一致,phase不需要改动,复用train的即可
# 修改完config.yaml后 执行:
python -m paddlerec.run -m ./config.yaml # 以DNN为例
```
## 效果对比
......@@ -87,6 +108,7 @@ python -m paddlerec.run -m paddlerec.models.rank.dnn # 以DNN为例
| Census-income Data | Wide&Deep | 0.76195 | 0.90577 | -- | -- |
| Amazon Product | DIN | 0.47005 | 0.86379 | -- | -- |
## 分布式
### 模型训练性能 (样本/s)
| 数据集 | 模型 | 单机 | 同步 (4节点) | 同步 (8节点) | 同步 (16节点) | 同步 (32节点) |
......
......@@ -12,37 +12,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
# global settings
debug: false
workspace: "paddlerec.models.rank.wide_deep"
epochs: 10
workspace: "paddlerec.models.rank.wide_deep"
reader:
batch_size: 2
train_data_path: "{workspace}/data/sample_data/train"
dataset:
- name: sample_1
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label"
dense_slots: "wide_input:8 deep_input:58"
- name: infer_sample
type: QueueDataset
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label"
dense_slots: "wide_input:8 deep_input:58"
hyper_parameters:
optimizer:
class: SGD
learning_rate: 0.0001
hidden1_units: 75
hidden2_units: 50
hidden3_units: 25
mode: train_runner
# if infer, change mode to "infer_runner" and change phase to "infer_phase"
runner:
- name: train_runner
trainer_class: single_train
epochs: 1
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment"
save_inference_path: "inference"
- name: infer_runner
trainer_class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
model:
models: "{workspace}/model.py"
hyper_parameters:
hidden1_units: 75
hidden2_units: 50
hidden3_units: 25
learning_rate: 0.0001
reg: 0.001
act: "relu"
optimizer: SGD
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: sample_1
thread_num: 1
#- name: infer_phase
# model: "{workspace}/model.py"
# dataset_name: infer_sample
# thread_num: 1
......@@ -24,6 +24,14 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.hidden1_units = envs.get_global_env(
"hyper_parameters.hidden1_units", 75)
self.hidden2_units = envs.get_global_env(
"hyper_parameters.hidden2_units", 50)
self.hidden3_units = envs.get_global_env(
"hyper_parameters.hidden3_units", 25)
def wide_part(self, data):
out = fluid.layers.fc(
input=data,
......@@ -56,21 +64,14 @@ class Model(ModelBase):
return l3
def train_net(self):
self._init_slots()
def net(self, inputs, is_infer=False):
wide_input = self._dense_data_var[0]
deep_input = self._dense_data_var[1]
label = self._sparse_data_var[0]
hidden1_units = envs.get_global_env("hyper_parameters.hidden1_units",
75, self._namespace)
hidden2_units = envs.get_global_env("hyper_parameters.hidden2_units",
50, self._namespace)
hidden3_units = envs.get_global_env("hyper_parameters.hidden3_units",
25, self._namespace)
wide_output = self.wide_part(wide_input)
deep_output = self.deep_part(deep_input, hidden1_units, hidden2_units,
hidden3_units)
deep_output = self.deep_part(deep_input, self.hidden1_units,
self.hidden2_units, self.hidden3_units)
wide_model = fluid.layers.fc(
input=wide_output,
......@@ -109,18 +110,12 @@ class Model(ModelBase):
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc
self._metrics["ACC"] = acc
if is_infer:
self._infer_results["AUC"] = auc_var
self._infer_results["ACC"] = acc
cost = fluid.layers.sigmoid_cross_entropy_with_logits(
x=prediction, label=fluid.layers.cast(
label, dtype='float32'))
avg_cost = fluid.layers.mean(cost)
self._cost = avg_cost
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate",
None, self._namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
def infer_net(self):
self.train_net()
......@@ -11,41 +11,61 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
debug: false
workspace: "paddlerec.models.rank.xdeepfm"
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "paddlerec.models.rank.xdeepfm"
reader:
batch_size: 2
train_data_path: "{workspace}/data/sample_data/train"
dataset:
- name: sample_1
type: QueueDataset #或者DataLoader
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label feat_idx"
dense_slots: "feat_value:39"
- name: infer_sample
type: QueueDataset #或者DataLoader
batch_size: 5
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "label feat_idx"
dense_slots: "feat_value:39"
model:
models: "{workspace}/model.py"
hyper_parameters:
layer_sizes_dnn: [10, 10, 10]
layer_sizes_cin: [10, 10]
sparse_feature_number: 1086460
sparse_feature_dim: 9
num_field: 39
fc_sizes: [400, 400, 400]
learning_rate: 0.0001
reg: 0.0001
act: "relu"
optimizer: SGD
hyper_parameters:
optimizer:
class: SGD
learning_rate: 0.0001
layer_sizes_dnn: [10, 10, 10]
layer_sizes_cin: [10, 10]
sparse_feature_number: 1086460
sparse_feature_dim: 9
num_field: 39
fc_sizes: [400, 400, 400]
act: "relu"
mode: train_runner
# if infer, change mode to "infer_runner" and change phase to "infer_phase"
runner:
- name: train_runner
trainer_class: single_train
epochs: 1
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment"
save_inference_path: "inference"
- name: infer_runner
trainer_class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: sample_1
thread_num: 1
#- name: infer_phase
# model: "{workspace}/model.py"
# dataset_name: infer_sample
# thread_num: 1
......@@ -22,38 +22,45 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def xdeepfm_net(self):
def _init_hyper_parameters(self):
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None)
self.num_field = envs.get_global_env("hyper_parameters.num_field",
None)
self.layer_sizes_cin = envs.get_global_env(
"hyper_parameters.layer_sizes_cin", None)
self.layer_sizes_dnn = envs.get_global_env(
"hyper_parameters.layer_sizes_dnn", None)
self.act = envs.get_global_env("hyper_parameters.act", None)
def net(self, inputs, is_infer=False):
raw_feat_idx = self._sparse_data_var[1]
raw_feat_value = self._dense_data_var[0]
self.label = self._sparse_data_var[0]
init_value_ = 0.1
initer = fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)
is_distributed = True if envs.get_trainer() == "CtrTrainer" else False
sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None, self._namespace)
sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None, self._namespace)
# ------------------------- network input --------------------------
num_field = envs.get_global_env("hyper_parameters.num_field", None,
self._namespace)
raw_feat_idx = self._sparse_data_var[1]
raw_feat_value = self._dense_data_var[0]
self.label = self._sparse_data_var[0]
feat_idx = raw_feat_idx
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
raw_feat_value, [-1, self.num_field, 1]) # None * num_field * 1
feat_embeddings = fluid.embedding(
input=feat_idx,
is_sparse=True,
dtype='float32',
size=[sparse_feature_number + 1, sparse_feature_dim],
size=[self.sparse_feature_number + 1, self.sparse_feature_dim],
padding_idx=0,
param_attr=fluid.ParamAttr(initializer=initer))
feat_embeddings = fluid.layers.reshape(feat_embeddings, [
-1, num_field, sparse_feature_dim
-1, self.num_field, self.sparse_feature_dim
]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
......@@ -63,11 +70,11 @@ class Model(ModelBase):
input=feat_idx,
is_sparse=True,
dtype='float32',
size=[sparse_feature_number + 1, 1],
size=[self.sparse_feature_number + 1, 1],
padding_idx=0,
param_attr=fluid.ParamAttr(initializer=initer))
weights_linear = fluid.layers.reshape(
weights_linear, [-1, num_field, 1]) # None * num_field * 1
weights_linear, [-1, self.num_field, 1]) # None * num_field * 1
b_linear = fluid.layers.create_parameter(
shape=[1],
dtype='float32',
......@@ -77,31 +84,30 @@ class Model(ModelBase):
# -------------------- CIN --------------------
layer_sizes_cin = envs.get_global_env(
"hyper_parameters.layer_sizes_cin", None, self._namespace)
Xs = [feat_embeddings]
last_s = num_field
for s in layer_sizes_cin:
last_s = self.num_field
for s in self.layer_sizes_cin:
# calculate Z^(k+1) with X^k and X^0
X_0 = fluid.layers.reshape(
fluid.layers.transpose(Xs[0], [0, 2, 1]),
[-1, sparse_feature_dim, num_field,
[-1, self.sparse_feature_dim, self.num_field,
1]) # None, embedding_size, num_field, 1
X_k = fluid.layers.reshape(
fluid.layers.transpose(Xs[-1], [0, 2, 1]),
[-1, sparse_feature_dim, 1,
[-1, self.sparse_feature_dim, 1,
last_s]) # None, embedding_size, 1, last_s
Z_k_1 = fluid.layers.matmul(
X_0, X_k) # None, embedding_size, num_field, last_s
# compresses Z^(k+1) to X^(k+1)
Z_k_1 = fluid.layers.reshape(Z_k_1, [
-1, sparse_feature_dim, last_s * num_field
-1, self.sparse_feature_dim, last_s * self.num_field
]) # None, embedding_size, last_s*num_field
Z_k_1 = fluid.layers.transpose(
Z_k_1, [0, 2, 1]) # None, s*num_field, embedding_size
Z_k_1 = fluid.layers.reshape(
Z_k_1, [-1, last_s * num_field, 1, sparse_feature_dim]
Z_k_1,
[-1, last_s * self.num_field, 1, self.sparse_feature_dim]
) # None, last_s*num_field, 1, embedding_size (None, channal_in, h, w)
X_k_1 = fluid.layers.conv2d(
Z_k_1,
......@@ -112,7 +118,8 @@ class Model(ModelBase):
param_attr=fluid.ParamAttr(
initializer=initer)) # None, s, 1, embedding_size
X_k_1 = fluid.layers.reshape(
X_k_1, [-1, s, sparse_feature_dim]) # None, s, embedding_size
X_k_1,
[-1, s, self.sparse_feature_dim]) # None, s, embedding_size
Xs.append(X_k_1)
last_s = s
......@@ -130,17 +137,13 @@ class Model(ModelBase):
# -------------------- DNN --------------------
layer_sizes_dnn = envs.get_global_env(
"hyper_parameters.layer_sizes_dnn", None, self._namespace)
act = envs.get_global_env("hyper_parameters.act", None,
self._namespace)
y_dnn = fluid.layers.reshape(feat_embeddings,
[-1, num_field * sparse_feature_dim])
for s in layer_sizes_dnn:
y_dnn = fluid.layers.reshape(
feat_embeddings, [-1, self.num_field * self.sparse_feature_dim])
for s in self.layer_sizes_dnn:
y_dnn = fluid.layers.fc(
input=y_dnn,
size=s,
act=act,
act=self.act,
param_attr=fluid.ParamAttr(initializer=initer),
bias_attr=None)
y_dnn = fluid.layers.fc(input=y_dnn,
......@@ -152,11 +155,6 @@ class Model(ModelBase):
# ------------------- xDeepFM ------------------
self.predict = fluid.layers.sigmoid(y_linear + y_cin + y_dnn)
def train_net(self):
self._init_slots()
self.xdeepfm_net()
cost = fluid.layers.log_loss(
input=self.predict,
label=fluid.layers.cast(self.label, "float32"),
......@@ -172,12 +170,5 @@ class Model(ModelBase):
slide_steps=0)
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc_var
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate",
None, self._namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
def infer_net(self):
self.train_net()
if is_infer:
self._infer_results["AUC"] = auc_var
......@@ -12,47 +12,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
evaluate:
reader:
batch_size: 1
class: "{workspace}/rsc15_infer_reader.py"
test_data_path: "{workspace}/data/train"
is_return_numpy: False
workspace: "paddlerec.models.recall.gru4rec"
dataset:
- name: dataset_train
batch_size: 5
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/rsc15_reader.py"
- name: dataset_infer
batch_size: 5
type: QueueDataset
data_path: "{workspace}/data/test"
data_converter: "{workspace}/rsc15_reader.py"
train:
trainer:
# for cluster training
strategy: "async"
hyper_parameters:
vocab_size: 1000
hid_size: 100
emb_lr_x: 10.0
gru_lr_x: 1.0
fc_lr_x: 1.0
init_low_bound: -0.04
init_high_bound: 0.04
optimizer:
class: adagrad
learning_rate: 0.01
strategy: async
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
workspace: "paddlerec.models.recall.gru4rec"
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 10
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
reader:
batch_size: 5
class: "{workspace}/rsc15_reader.py"
train_data_path: "{workspace}/data/train"
model:
models: "{workspace}/model.py"
hyper_parameters:
vocab_size: 1000
hid_size: 100
emb_lr_x: 10.0
gru_lr_x: 1.0
fc_lr_x: 1.0
init_low_bound: -0.04
init_high_bound: 0.04
learning_rate: 0.01
optimizer: adagrad
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
......@@ -22,84 +22,72 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def all_vocab_network(self, is_infer=False):
""" network definition """
recall_k = envs.get_global_env("hyper_parameters.recall_k", None,
self._namespace)
vocab_size = envs.get_global_env("hyper_parameters.vocab_size", None,
self._namespace)
hid_size = envs.get_global_env("hyper_parameters.hid_size", None,
self._namespace)
init_low_bound = envs.get_global_env("hyper_parameters.init_low_bound",
None, self._namespace)
init_high_bound = envs.get_global_env(
"hyper_parameters.init_high_bound", None, self._namespace)
emb_lr_x = envs.get_global_env("hyper_parameters.emb_lr_x", None,
self._namespace)
gru_lr_x = envs.get_global_env("hyper_parameters.gru_lr_x", None,
self._namespace)
fc_lr_x = envs.get_global_env("hyper_parameters.fc_lr_x", None,
self._namespace)
def _init_hyper_parameters(self):
self.recall_k = envs.get_global_env("hyper_parameters.recall_k")
self.vocab_size = envs.get_global_env("hyper_parameters.vocab_size")
self.hid_size = envs.get_global_env("hyper_parameters.hid_size")
self.init_low_bound = envs.get_global_env(
"hyper_parameters.init_low_bound")
self.init_high_bound = envs.get_global_env(
"hyper_parameters.init_high_bound")
self.emb_lr_x = envs.get_global_env("hyper_parameters.emb_lr_x")
self.gru_lr_x = envs.get_global_env("hyper_parameters.gru_lr_x")
self.fc_lr_x = envs.get_global_env("hyper_parameters.fc_lr_x")
def input_data(self, is_infer=False, **kwargs):
# Input data
src_wordseq = fluid.data(
name="src_wordseq", shape=[None, 1], dtype="int64", lod_level=1)
dst_wordseq = fluid.data(
name="dst_wordseq", shape=[None, 1], dtype="int64", lod_level=1)
if is_infer:
self._infer_data_var = [src_wordseq, dst_wordseq]
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
return [src_wordseq, dst_wordseq]
def net(self, inputs, is_infer=False):
src_wordseq = inputs[0]
dst_wordseq = inputs[1]
emb = fluid.embedding(
input=src_wordseq,
size=[vocab_size, hid_size],
size=[self.vocab_size, self.hid_size],
param_attr=fluid.ParamAttr(
name="emb",
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=emb_lr_x),
low=self.init_low_bound, high=self.init_high_bound),
learning_rate=self.emb_lr_x),
is_sparse=True)
fc0 = fluid.layers.fc(input=emb,
size=hid_size * 3,
size=self.hid_size * 3,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound,
high=init_high_bound),
learning_rate=gru_lr_x))
low=self.init_low_bound,
high=self.init_high_bound),
learning_rate=self.gru_lr_x))
gru_h0 = fluid.layers.dynamic_gru(
input=fc0,
size=hid_size,
size=self.hid_size,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=gru_lr_x))
low=self.init_low_bound, high=self.init_high_bound),
learning_rate=self.gru_lr_x))
fc = fluid.layers.fc(input=gru_h0,
size=vocab_size,
size=self.vocab_size,
act='softmax',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=fc_lr_x))
low=self.init_low_bound,
high=self.init_high_bound),
learning_rate=self.fc_lr_x))
cost = fluid.layers.cross_entropy(input=fc, label=dst_wordseq)
acc = fluid.layers.accuracy(input=fc, label=dst_wordseq, k=recall_k)
acc = fluid.layers.accuracy(
input=fc, label=dst_wordseq, k=self.recall_k)
if is_infer:
self._infer_results['recall20'] = acc
return
avg_cost = fluid.layers.mean(x=cost)
self._data_var.append(src_wordseq)
self._data_var.append(dst_wordseq)
self._cost = avg_cost
self._metrics["cost"] = avg_cost
self._metrics["acc"] = acc
def train_net(self):
self.all_vocab_network()
def infer_net(self):
self.all_vocab_network(is_infer=True)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddlerec.core.reader import Reader
class EvaluateReader(Reader):
def init(self):
pass
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
l = line.strip().split()
l = [w for w in l]
src_seq = l[:len(l) - 1]
src_seq = [int(e) for e in src_seq]
trg_seq = l[1:]
trg_seq = [int(e) for e in trg_seq]
feature_name = ["src_wordseq", "dst_wordseq"]
yield zip(feature_name, [src_seq] + [trg_seq])
return reader
......@@ -12,42 +12,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
evaluate:
reader:
batch_size: 1
class: "{workspace}/movielens_infer_reader.py"
test_data_path: "{workspace}/data/test"
workspace: "paddlerec.models.recall.ncf"
train:
trainer:
# for cluster training
strategy: "async"
dataset:
- name: dataset_train
batch_size: 5
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/movielens_reader.py"
- name: dataset_infer
batch_size: 5
type: QueueDataset
data_path: "{workspace}/data/test"
data_converter: "{workspace}/movielens_infer_reader.py"
epochs: 3
workspace: "paddlerec.models.recall.ncf"
device: cpu
hyper_parameters:
num_users: 6040
num_items: 3706
latent_dim: 8
fc_layers: [64, 32, 16, 8]
optimizer:
class: adam
learning_rate: 0.001
strategy: async
reader:
batch_size: 2
class: "{workspace}/movielens_reader.py"
train_data_path: "{workspace}/data/train"
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
model:
models: "{workspace}/model.py"
hyper_parameters:
num_users: 6040
num_items: 3706
latent_dim: 8
layers: [64, 32, 16, 8]
learning_rate: 0.001
optimizer: adam
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 10
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
......@@ -24,7 +24,13 @@ class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def input_data(self, is_infer=False):
def _init_hyper_parameters(self):
self.num_users = envs.get_global_env("hyper_parameters.num_users")
self.num_items = envs.get_global_env("hyper_parameters.num_items")
self.latent_dim = envs.get_global_env("hyper_parameters.latent_dim")
self.layers = envs.get_global_env("hyper_parameters.fc_layers")
def input_data(self, is_infer=False, **kwargs):
user_input = fluid.data(
name="user_input", shape=[-1, 1], dtype="int64", lod_level=0)
item_input = fluid.data(
......@@ -35,45 +41,35 @@ class Model(ModelBase):
inputs = [user_input] + [item_input]
else:
inputs = [user_input] + [item_input] + [label]
self._data_var = inputs
return inputs
def net(self, inputs, is_infer=False):
num_users = envs.get_global_env("hyper_parameters.num_users", None,
self._namespace)
num_items = envs.get_global_env("hyper_parameters.num_items", None,
self._namespace)
latent_dim = envs.get_global_env("hyper_parameters.latent_dim", None,
self._namespace)
layers = envs.get_global_env("hyper_parameters.layers", None,
self._namespace)
num_layer = len(layers) #Number of layers in the MLP
num_layer = len(self.layers) #Number of layers in the MLP
MF_Embedding_User = fluid.embedding(
input=inputs[0],
size=[num_users, latent_dim],
size=[self.num_users, self.latent_dim],
param_attr=fluid.initializer.Normal(
loc=0.0, scale=0.01),
is_sparse=True)
MF_Embedding_Item = fluid.embedding(
input=inputs[1],
size=[num_items, latent_dim],
size=[self.num_items, self.latent_dim],
param_attr=fluid.initializer.Normal(
loc=0.0, scale=0.01),
is_sparse=True)
MLP_Embedding_User = fluid.embedding(
input=inputs[0],
size=[num_users, int(layers[0] / 2)],
size=[self.num_users, int(self.layers[0] / 2)],
param_attr=fluid.initializer.Normal(
loc=0.0, scale=0.01),
is_sparse=True)
MLP_Embedding_Item = fluid.embedding(
input=inputs[1],
size=[num_items, int(layers[0] / 2)],
size=[self.num_items, int(self.layers[0] / 2)],
param_attr=fluid.initializer.Normal(
loc=0.0, scale=0.01),
is_sparse=True)
......@@ -94,7 +90,7 @@ class Model(ModelBase):
for i in range(1, num_layer):
mlp_vector = fluid.layers.fc(
input=mlp_vector,
size=layers[i],
size=self.layers[i],
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormal(
......@@ -126,16 +122,3 @@ class Model(ModelBase):
self._cost = avg_cost
self._metrics["cost"] = avg_cost
def train_net(self):
input_data = self.input_data()
self.net(input_data)
def infer_net(self):
self._infer_data_var = self.input_data(is_infer=True)
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
self.net(self._infer_data_var, is_infer=True)
......@@ -19,7 +19,7 @@ from collections import defaultdict
import numpy as np
class EvaluateReader(Reader):
class TrainReader(Reader):
def init(self):
pass
......
......@@ -12,43 +12,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.
workspace: "paddlerec.models.recall.ssr"
evaluate:
reader:
batch_size: 1
class: "{workspace}/ssr_infer_reader.py"
test_data_path: "{workspace}/data/train"
is_return_numpy: True
dataset:
- name: dataset_train
batch_size: 5
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/ssr_reader.py"
- name: dataset_infer
batch_size: 5
type: QueueDataset
data_path: "{workspace}/data/test"
data_converter: "{workspace}/ssr_infer_reader.py"
train:
trainer:
# for cluster training
strategy: "async"
hyper_parameters:
vocab_size: 1000
emb_dim: 128
hidden_size: 100
optimizer:
class: adagrad
learning_rate: 0.01
strategy: async
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
workspace: "paddlerec.models.recall.ssr"
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 10
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
reader:
batch_size: 5
class: "{workspace}/ssr_reader.py"
train_data_path: "{workspace}/data/train"
model:
models: "{workspace}/model.py"
hyper_parameters:
vocab_size: 1000
emb_dim: 128
hidden_size: 100
learning_rate: 0.01
optimizer: adagrad
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
......@@ -20,85 +20,45 @@ from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
class BowEncoder(object):
""" bow-encoder """
def __init__(self):
self.param_name = ""
def forward(self, emb):
return fluid.layers.sequence_pool(input=emb, pool_type='sum')
class GrnnEncoder(object):
""" grnn-encoder """
def __init__(self, param_name="grnn", hidden_size=128):
self.param_name = param_name
self.hidden_size = hidden_size
def forward(self, emb):
fc0 = fluid.layers.fc(input=emb,
size=self.hidden_size * 3,
param_attr=self.param_name + "_fc.w",
bias_attr=False)
gru_h = fluid.layers.dynamic_gru(
input=fc0,
size=self.hidden_size,
is_reverse=False,
param_attr=self.param_name + ".param",
bias_attr=self.param_name + ".bias")
return fluid.layers.sequence_pool(input=gru_h, pool_type='max')
class PairwiseHingeLoss(object):
def __init__(self, margin=0.8):
self.margin = margin
def forward(self, pos, neg):
loss_part1 = fluid.layers.elementwise_sub(
tensor.fill_constant_batch_size_like(
input=pos, shape=[-1, 1], value=self.margin, dtype='float32'),
pos)
loss_part2 = fluid.layers.elementwise_add(loss_part1, neg)
loss_part3 = fluid.layers.elementwise_max(
tensor.fill_constant_batch_size_like(
input=loss_part2, shape=[-1, 1], value=0.0, dtype='float32'),
loss_part2)
return loss_part3
class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def get_correct(self, x, y):
less = tensor.cast(cf.less_than(x, y), dtype='float32')
correct = fluid.layers.reduce_sum(less)
return correct
def train(self):
vocab_size = envs.get_global_env("hyper_parameters.vocab_size", None,
self._namespace)
emb_dim = envs.get_global_env("hyper_parameters.emb_dim", None,
self._namespace)
hidden_size = envs.get_global_env("hyper_parameters.hidden_size", None,
self._namespace)
emb_shape = [vocab_size, emb_dim]
def _init_hyper_parameters(self):
self.vocab_size = envs.get_global_env("hyper_parameters.vocab_size")
self.emb_dim = envs.get_global_env("hyper_parameters.emb_dim")
self.hidden_size = envs.get_global_env("hyper_parameters.hidden_size")
def input_data(self, is_infer=False, **kwargs):
if is_infer:
user_data = fluid.data(
name="user", shape=[None, 1], dtype="int64", lod_level=1)
all_item_data = fluid.data(
name="all_item", shape=[None, self.vocab_size], dtype="int64")
pos_label = fluid.data(
name="pos_label", shape=[None, 1], dtype="int64")
return [user_data, all_item_data, pos_label]
else:
user_data = fluid.data(
name="user", shape=[None, 1], dtype="int64", lod_level=1)
pos_item_data = fluid.data(
name="p_item", shape=[None, 1], dtype="int64", lod_level=1)
neg_item_data = fluid.data(
name="n_item", shape=[None, 1], dtype="int64", lod_level=1)
return [user_data, pos_item_data, neg_item_data]
def net(self, inputs, is_infer=False):
if is_infer:
self._infer_net(inputs)
return
user_data = inputs[0]
pos_item_data = inputs[1]
neg_item_data = inputs[2]
emb_shape = [self.vocab_size, self.emb_dim]
self.user_encoder = GrnnEncoder()
self.item_encoder = BowEncoder()
self.pairwise_hinge_loss = PairwiseHingeLoss()
user_data = fluid.data(
name="user", shape=[None, 1], dtype="int64", lod_level=1)
pos_item_data = fluid.data(
name="p_item", shape=[None, 1], dtype="int64", lod_level=1)
neg_item_data = fluid.data(
name="n_item", shape=[None, 1], dtype="int64", lod_level=1)
self._data_var.extend([user_data, pos_item_data, neg_item_data])
user_emb = fluid.embedding(
input=user_data, size=emb_shape, param_attr="emb.item")
pos_item_emb = fluid.embedding(
......@@ -109,79 +69,115 @@ class Model(ModelBase):
pos_item_enc = self.item_encoder.forward(pos_item_emb)
neg_item_enc = self.item_encoder.forward(neg_item_emb)
user_hid = fluid.layers.fc(input=user_enc,
size=hidden_size,
size=self.hidden_size,
param_attr='user.w',
bias_attr="user.b")
pos_item_hid = fluid.layers.fc(input=pos_item_enc,
size=hidden_size,
size=self.hidden_size,
param_attr='item.w',
bias_attr="item.b")
neg_item_hid = fluid.layers.fc(input=neg_item_enc,
size=hidden_size,
size=self.hidden_size,
param_attr='item.w',
bias_attr="item.b")
cos_pos = fluid.layers.cos_sim(user_hid, pos_item_hid)
cos_neg = fluid.layers.cos_sim(user_hid, neg_item_hid)
hinge_loss = self.pairwise_hinge_loss.forward(cos_pos, cos_neg)
avg_cost = fluid.layers.mean(hinge_loss)
correct = self.get_correct(cos_neg, cos_pos)
correct = self._get_correct(cos_neg, cos_pos)
self._cost = avg_cost
self._metrics["correct"] = correct
self._metrics["hinge_loss"] = hinge_loss
def train_net(self):
self.train()
def infer(self):
vocab_size = envs.get_global_env("hyper_parameters.vocab_size", None,
self._namespace)
emb_dim = envs.get_global_env("hyper_parameters.emb_dim", None,
self._namespace)
hidden_size = envs.get_global_env("hyper_parameters.hidden_size", None,
self._namespace)
user_data = fluid.data(
name="user", shape=[None, 1], dtype="int64", lod_level=1)
all_item_data = fluid.data(
name="all_item", shape=[None, vocab_size], dtype="int64")
pos_label = fluid.data(
name="pos_label", shape=[None, 1], dtype="int64")
self._infer_data_var = [user_data, all_item_data, pos_label]
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
def _infer_net(self, inputs):
user_data = inputs[0]
all_item_data = inputs[1]
pos_label = inputs[2]
user_emb = fluid.embedding(
input=user_data, size=[vocab_size, emb_dim], param_attr="emb.item")
input=user_data,
size=[self.vocab_size, self.emb_dim],
param_attr="emb.item")
all_item_emb = fluid.embedding(
input=all_item_data,
size=[vocab_size, emb_dim],
size=[self.vocab_size, self.emb_dim],
param_attr="emb.item")
all_item_emb_re = fluid.layers.reshape(
x=all_item_emb, shape=[-1, emb_dim])
x=all_item_emb, shape=[-1, self.emb_dim])
user_encoder = GrnnEncoder()
user_enc = user_encoder.forward(user_emb)
user_hid = fluid.layers.fc(input=user_enc,
size=hidden_size,
size=self.hidden_size,
param_attr='user.w',
bias_attr="user.b")
user_exp = fluid.layers.expand(
x=user_hid, expand_times=[1, vocab_size])
user_re = fluid.layers.reshape(x=user_exp, shape=[-1, hidden_size])
x=user_hid, expand_times=[1, self.vocab_size])
user_re = fluid.layers.reshape(
x=user_exp, shape=[-1, self.hidden_size])
all_item_hid = fluid.layers.fc(input=all_item_emb_re,
size=hidden_size,
size=self.hidden_size,
param_attr='item.w',
bias_attr="item.b")
cos_item = fluid.layers.cos_sim(X=all_item_hid, Y=user_re)
all_pre_ = fluid.layers.reshape(x=cos_item, shape=[-1, vocab_size])
all_pre_ = fluid.layers.reshape(
x=cos_item, shape=[-1, self.vocab_size])
acc = fluid.layers.accuracy(input=all_pre_, label=pos_label, k=20)
self._infer_results['recall20'] = acc
def infer_net(self):
self.infer()
def _get_correct(self, x, y):
less = tensor.cast(cf.less_than(x, y), dtype='float32')
correct = fluid.layers.reduce_sum(less)
return correct
class BowEncoder(object):
""" bow-encoder """
def __init__(self):
self.param_name = ""
def forward(self, emb):
return fluid.layers.sequence_pool(input=emb, pool_type='sum')
class GrnnEncoder(object):
""" grnn-encoder """
def __init__(self, param_name="grnn", hidden_size=128):
self.param_name = param_name
self.hidden_size = hidden_size
def forward(self, emb):
fc0 = fluid.layers.fc(input=emb,
size=self.hidden_size * 3,
param_attr=self.param_name + "_fc.w",
bias_attr=False)
gru_h = fluid.layers.dynamic_gru(
input=fc0,
size=self.hidden_size,
is_reverse=False,
param_attr=self.param_name + ".param",
bias_attr=self.param_name + ".bias")
return fluid.layers.sequence_pool(input=gru_h, pool_type='max')
class PairwiseHingeLoss(object):
def __init__(self, margin=0.8):
self.margin = margin
def forward(self, pos, neg):
loss_part1 = fluid.layers.elementwise_sub(
tensor.fill_constant_batch_size_like(
input=pos, shape=[-1, 1], value=self.margin, dtype='float32'),
pos)
loss_part2 = fluid.layers.elementwise_add(loss_part1, neg)
loss_part3 = fluid.layers.elementwise_max(
tensor.fill_constant_batch_size_like(
input=loss_part2, shape=[-1, 1], value=0.0, dtype='float32'),
loss_part2)
return loss_part3
......@@ -13,37 +13,42 @@
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
workspace: "paddlerec.models.recall.youtube_dnn"
epochs: 3
workspace: "paddlerec.models.recall.youtube_dnn"
device: cpu
dataset:
- name: dataset_train
batch_size: 5
type: DataLoader
#type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/random_reader.py"
hyper_parameters:
watch_vec_size: 64
search_vec_size: 64
other_feat_size: 64
output_size: 100
layers: [128, 64, 32]
optimizer:
class: adam
learning_rate: 0.001
strategy: async
reader:
batch_size: 2
class: "{workspace}/random_reader.py"
train_data_path: "{workspace}/data/train"
mode: train_runner
model:
models: "{workspace}/model.py"
hyper_parameters:
watch_vec_size: 64
search_vec_size: 64
other_feat_size: 64
output_size: 100
layers: [128, 64, 32]
learning_rate: 0.01
optimizer: sgd
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
print_interval: 10
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
......@@ -13,39 +13,64 @@
# limitations under the License.
import math
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
import numpy as np
class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def input_data(self, is_infer=False):
def _init_hyper_parameters(self):
self.watch_vec_size = envs.get_global_env(
"hyper_parameters.watch_vec_size")
self.search_vec_size = envs.get_global_env(
"hyper_parameters.search_vec_size")
self.other_feat_size = envs.get_global_env(
"hyper_parameters.other_feat_size")
self.output_size = envs.get_global_env("hyper_parameters.output_size")
self.layers = envs.get_global_env("hyper_parameters.layers")
watch_vec_size = envs.get_global_env("hyper_parameters.watch_vec_size",
None, self._namespace)
search_vec_size = envs.get_global_env(
"hyper_parameters.search_vec_size", None, self._namespace)
other_feat_size = envs.get_global_env(
"hyper_parameters.other_feat_size", None, self._namespace)
def input_data(self, is_infer=False, **kwargs):
watch_vec = fluid.data(
name="watch_vec", shape=[None, watch_vec_size], dtype="float32")
name="watch_vec",
shape=[None, self.watch_vec_size],
dtype="float32")
search_vec = fluid.data(
name="search_vec", shape=[None, search_vec_size], dtype="float32")
name="search_vec",
shape=[None, self.search_vec_size],
dtype="float32")
other_feat = fluid.data(
name="other_feat", shape=[None, other_feat_size], dtype="float32")
name="other_feat",
shape=[None, self.other_feat_size],
dtype="float32")
label = fluid.data(name="label", shape=[None, 1], dtype="int64")
inputs = [watch_vec] + [search_vec] + [other_feat] + [label]
self._data_var = inputs
return inputs
def fc(self, tag, data, out_dim, active='relu'):
def net(self, inputs, is_infer=False):
concat_feats = fluid.layers.concat(input=inputs[:-1], axis=-1)
l1 = self._fc('l1', concat_feats, self.layers[0], 'relu')
l2 = self._fc('l2', l1, self.layers[1], 'relu')
l3 = self._fc('l3', l2, self.layers[2], 'relu')
l4 = self._fc('l4', l3, self.output_size, 'softmax')
num_seqs = fluid.layers.create_tensor(dtype='int64')
acc = fluid.layers.accuracy(input=l4, label=inputs[-1], total=num_seqs)
cost = fluid.layers.cross_entropy(input=l4, label=inputs[-1])
avg_cost = fluid.layers.mean(cost)
self._cost = avg_cost
self._metrics["acc"] = acc
def _fc(self, tag, data, out_dim, active='relu'):
init_stddev = 1.0
scales = 1.0 / np.sqrt(data.shape[1])
......@@ -67,31 +92,3 @@ class Model(ModelBase):
bias_attr=b_attr,
name=tag)
return out
def net(self, inputs):
output_size = envs.get_global_env("hyper_parameters.output_size", None,
self._namespace)
layers = envs.get_global_env("hyper_parameters.layers", None,
self._namespace)
concat_feats = fluid.layers.concat(input=inputs[:-1], axis=-1)
l1 = self.fc('l1', concat_feats, layers[0], 'relu')
l2 = self.fc('l2', l1, layers[1], 'relu')
l3 = self.fc('l3', l2, layers[2], 'relu')
l4 = self.fc('l4', l3, output_size, 'softmax')
num_seqs = fluid.layers.create_tensor(dtype='int64')
acc = fluid.layers.accuracy(input=l4, label=inputs[-1], total=num_seqs)
cost = fluid.layers.cross_entropy(input=l4, label=inputs[-1])
avg_cost = fluid.layers.mean(cost)
self._cost = avg_cost
self._metrics["acc"] = acc
def train_net(self):
input_data = self.input_data()
self.net(input_data)
def infer_net(self):
pass
......@@ -13,22 +13,22 @@
# limitations under the License.
from __future__ import print_function
import numpy as np
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
from collections import defaultdict
import numpy as np
class TrainReader(Reader):
def init(self):
self.watch_vec_size = envs.get_global_env(
"hyper_parameters.watch_vec_size", None, "train.model")
"hyper_parameters.watch_vec_size")
self.search_vec_size = envs.get_global_env(
"hyper_parameters.search_vec_size", None, "train.model")
"hyper_parameters.search_vec_size")
self.other_feat_size = envs.get_global_env(
"hyper_parameters.other_feat_size", None, "train.model")
self.output_size = envs.get_global_env("hyper_parameters.output_size",
None, "train.model")
"hyper_parameters.other_feat_size")
self.output_size = envs.get_global_env("hyper_parameters.output_size")
def generate_sample(self, line):
"""
......
......@@ -12,44 +12,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
evaluate:
reader:
batch_size: 1
class: "{workspace}/random_infer_reader.py"
test_data_path: "{workspace}/data/train"
train:
trainer:
# for cluster training
strategy: "async"
workspace: "paddlerec.models.rerank.listwise"
epochs: 3
workspace: "paddlerec.models.rerank.listwise"
device: cpu
dataset:
- name: dataset_train
type: DataLoader
data_path: "{workspace}/data/train"
data_converter: "{workspace}/random_reader.py"
- name: dataset_infer
type: DataLoader
data_path: "{workspace}/data/test"
data_converter: "{workspace}/random_reader.py"
reader:
batch_size: 2
class: "{workspace}/random_reader.py"
train_data_path: "{workspace}/data/train"
dataset_class: "DataLoader"
hyper_parameters:
hidden_size: 128
user_vocab: 200
item_vocab: 1000
item_len: 5
embed_size: 16
batch_size: 1
optimizer:
class: sgd
learning_rate: 0.01
strategy: async
model:
models: "{workspace}/model.py"
hyper_parameters:
hidden_size: 128
user_vocab: 200
item_vocab: 1000
item_len: 5
embed_size: 16
learning_rate: 0.01
optimizer: sgd
#use infer_runner mode and modify 'phase' below if infer
mode: train_runner
#mode: infer_runner
runner:
- name: train_runner
class: single_train
device: cpu
epochs: 3
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
save_inference_path: "inference"
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
phase:
- name: train
model: "{workspace}/model.py"
dataset_name: dataset_train
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
......@@ -25,18 +25,13 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.item_len = envs.get_global_env("hyper_parameters.self.item_len",
None, self._namespace)
self.hidden_size = envs.get_global_env("hyper_parameters.hidden_size",
None, self._namespace)
self.user_vocab = envs.get_global_env("hyper_parameters.user_vocab",
None, self._namespace)
self.item_vocab = envs.get_global_env("hyper_parameters.item_vocab",
None, self._namespace)
self.embed_size = envs.get_global_env("hyper_parameters.embed_size",
None, self._namespace)
def input_data(self, is_infer=False):
self.item_len = envs.get_global_env("hyper_parameters.self.item_len")
self.hidden_size = envs.get_global_env("hyper_parameters.hidden_size")
self.user_vocab = envs.get_global_env("hyper_parameters.user_vocab")
self.item_vocab = envs.get_global_env("hyper_parameters.item_vocab")
self.embed_size = envs.get_global_env("hyper_parameters.embed_size")
def input_data(self, is_infer=False, **kwargs):
user_slot_names = fluid.data(
name='user_slot_names',
shape=[None, 1],
......
......@@ -23,14 +23,10 @@ from collections import defaultdict
class TrainReader(Reader):
def init(self):
self.user_vocab = envs.get_global_env("hyper_parameters.user_vocab",
None, "train.model")
self.item_vocab = envs.get_global_env("hyper_parameters.item_vocab",
None, "train.model")
self.item_len = envs.get_global_env("hyper_parameters.item_len", None,
"train.model")
self.batch_size = envs.get_global_env("batch_size", None,
"train.reader")
self.user_vocab = envs.get_global_env("hyper_parameters.user_vocab")
self.item_vocab = envs.get_global_env("hyper_parameters.item_vocab")
self.item_len = envs.get_global_env("hyper_parameters.item_len")
self.batch_size = envs.get_global_env("hyper_parameters.batch_size")
def reader_creator(self):
def reader():
......
......@@ -9,9 +9,6 @@
* [整体介绍](#整体介绍)
* [重排序模型列表](#重排序模型列表)
* [使用教程](#使用教程)
* [训练 预测](#训练 预测)
* [效果对比](#效果对比)
* [模型效果列表](#模型效果列表)
## 整体介绍
### 融合模型列表
......@@ -29,15 +26,11 @@
<p>
## 使用教程
### 训练 预测
## 使用教程(快速开始)
```shell
python -m paddlerec.run -m paddlerec.models.rerank.listwise # listwise
```
## 效果对比
### 模型效果列表
## 使用教程(复现论文)
| 数据集 | 模型 | loss | auc |
| :------------------: | :--------------------: | :---------: |:---------: |
| -- | Listwise | -- | -- |
listwise原论文没有给出训练数据,我们使用了随机的数据,可参考快速开始
......@@ -62,7 +62,8 @@ def build(dirname):
models_copy = [
'data/*.txt', 'data/*/*.txt', '*.yaml', '*.sh', 'tree/*.npy',
'tree/*.txt', 'data/sample_data/*', 'data/sample_data/train/*'
'tree/*.txt', 'data/sample_data/*', 'data/sample_data/train/*',
'data/sample_data/infer/*'
]
engine_copy = ['*/*.sh']
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册