Unverified commit 27378ed8, authored by wuzhihua, committed by GitHub

Merge branch 'master' into add_FNN

<p align="center">
<img align="center" src="doc/imgs/logo.png">
<p>
<p align="center">
<br>
<img alt="Release" src="https://img.shields.io/badge/Release-0.1.0-yellowgreen">
<img alt="License" src="https://img.shields.io/github/license/PaddlePaddle/Serving">
<img alt="Slack" src="https://img.shields.io/badge/Join-Slack-green">
<br>
<img align="center" src="doc/imgs/structure.png">
<p>
<p align="center">
<img align="center" src="doc/imgs/overview.png">
<p>
<h2 align="center">What is PaddleRec</h2>
<h2 align="center">What is a recommender system?</h2>
<p align="center">
<img align="center" src="doc/imgs/structure.png">
<img align="center" src="doc/imgs/rec-overview.png">
<p>
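- A **one-stop, out-of-the-box toolkit** for search and recommendation models, built on the PaddlePaddle ecosystem
- An end-to-end solution for beginners, developers, and researchers, covering research, training, and inference deployment
- An algorithm library for recommendation and search covering semantic understanding, recall, coarse ranking, fine ranking, multi-task learning, and fusion
- Configure custom options in **yaml** to get started quickly with single-machine training, large-scale distributed training, offline inference, and online deployment
- In an era of explosive growth of information on the Internet, recommender systems are the key to helping users efficiently find the information they care about;
- Recommender systems are also a silver bullet for helping products attract and retain users, increase user stickiness, and raise conversion rates.
<h2 align="center">PaddleRec Overview</h2>
- Countless excellent products have built a strong reputation on recommender systems that users can perceive, and countless companies have secured a place in their industries with recommender systems that address users' pain points.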
<p align="center">
<img align="center" src="doc/imgs/overview.png">
<p>
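> It is fair to say that whoever masters and makes good use of recommender systems gains the upper hand in the fierce competition of information distribution.
> At the same time, many problems trouble recommender system developers: huge data volumes, complex model structures, inefficient distributed training environments, unstable online/offline consistency, and demanding deployment requirements, to name only a few.
<h2 align="center">What is PaddleRec?</h2>
<h2 align="center">Recommender System: Pipeline Overview</h2>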
<p align="center">
<img align="center" src="doc/imgs/rec-overview.png">
<p>
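- A **one-stop, out-of-the-box toolkit** for search and recommendation models, built on the PaddlePaddle ecosystem
- A full-pipeline recommender system solution for beginners, developers, and researchers
- A complete recommendation and search algorithm library covering content understanding, matching, recall, ranking, multi-task learning, and re-ranking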
| Category | Model | Single-machine CPU training | Single-machine GPU training | Distributed CPU training |
| :------: | :---: | :---: | :---: | :---: |
| Content understanding | [Text-Classification](models/contentunderstanding/classification/model.py) | ✓ | x | ✓ |
| Content understanding | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | x | ✓ |
| Recall | [DSSM](models/match/dssm/model.py) | ✓ | x | ✓ |
| Recall | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | x | ✓ |
| Recall | [TDM](models/treebased/tdm/model.py) | ✓ | x | ✓ |
| Recall | [Word2Vec](models/recall/word2vec/model.py) | ✓ | x | ✓ |
| Recall | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ |
| Recall | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ |
| Recall | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ |
| Recall | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ |
| Ranking | [Logistic Regression](models/rank/logistic_regression/model.py) | ✓ | x | ✓ |
| Ranking | [Dnn](models/rank/dnn/model.py) | ✓ | x | ✓ |
| Ranking | [FM](models/rank/fm/model.py) | ✓ | x | ✓ |
| Ranking | [FFM](models/rank/ffm/model.py) | ✓ | x | ✓ |
| Ranking | [Pnn](models/rank/pnn/model.py) | ✓ | x | ✓ |
| Ranking | [DCN](models/rank/dcn/model.py) | ✓ | x | ✓ |
| Ranking | [NFM](models/rank/nfm/model.py) | ✓ | x | ✓ |
| Ranking | [AFM](models/rank/afm/model.py) | ✓ | x | ✓ |
| Ranking | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ |
| Ranking | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ |
| Ranking | [DIN](models/rank/din/model.py) | ✓ | x | ✓ |
| Ranking | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ |
| Multi-task | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ |
| Multi-task | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ |
| Multi-task | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ |
| Re-ranking | [Listwise](models/rerank/listwise/model.py) | ✓ | x | ✓ |
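<h2 align="center">Easy Installation</h2>
<h2 align="center">Quick Installation</h2>
### Requirements
* Python 2.7 / 3.5 / 3.6 / 3.7
* PaddlePaddle >= 1.7.2
* Operating system: Windows/Mac/Linux
> On Windows only single-machine training is currently available; Linux is recommended
> On Windows only single-machine training is currently available; Linux is recommended for distributed training
### Installation Commands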
......@@ -55,7 +83,7 @@
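- Installation method 2
Build and install from source
1. Install PaddlePaddle **Note: PaddlePaddle version >1.7.2 is required**
1. Install PaddlePaddle **Note: PaddlePaddle version >=1.7.2 is required**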
```shell
python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple
......@@ -70,139 +98,41 @@
```
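<h2 align="center">Quick Start</h2>
### Run a built-in model with its default configuration
<h2 align="center">One-Command Launch</h2>
The framework currently ships with a number of built-in models; a single command starts single-machine training or locally simulated distributed training with a built-in model.
> Locally simulated distributed training (`local_cluster`) runs in parameter-server mode with `1 server + 1 trainer`
We use the `dnn` ranking model as an example to show basic PaddleRec usage. The training data comes from the [Criteo dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/); we took 100 samples from it so that you can quickly try the complete PaddleRec workflow.
We use the `dnn` ranking model as an example to show the one-command launch of PaddleRec. The training data comes from the [Criteo dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/), from which we took 100 samples: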
```bash
# Single-machine training on CPU
python -m paddlerec.run -m paddlerec.models.rank.dnn
```
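### Run a built-in model with a custom configuration
If you reuse a built-in model and modify its **yaml** configuration file, for example to change hyperparameters or reconfigure the data, you can run that yaml file directly with paddlerec.
Taking the dnn model as an example, from the paddlerec code directory: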
```bash
cd paddlerec
```
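Modify the [hyperparameter configuration](./models/rank/dnn/config.yaml) of the dnn model, for example changing the number of training epochs from 10 to 5:
```yaml
runner:
- name: runner1
  class: single_train
  epochs: 5 # 10->5
```
On Linux, you can edit the yaml file with a text editor such as `vim`: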
```bash
vim ./models/rank/dnn/config.yaml
# Press i to enter insert mode
# Edit the yaml configuration
# When done, press Esc to leave insert mode
# Type :wq to save the file and quit
```
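After modifying the dnn model configuration in `models/rank/dnn/config.yaml`, run the `dnn` model:
```bash
# Train with the custom configuration
python -m paddlerec.run -m ./models/rank/dnn/config.yaml
```
### Distributed training
<h2 align="center">Help Documentation</h2>
Distributed training requires configuring `config.yaml`: add or set the `engine` option to `cluster` or `local_cluster` to run distributed training or locally simulated distributed training, respectively.
#### Locally simulated distributed training
Taking the dnn model as an example, from the paddlerec code directory, modify the `config.yaml` file of the dnn model:
```yaml
runner:
- name: runner1
  class: local_cluster_train # single_train -> local_cluster_train
```
Then start paddlerec training:
```bash
# Run locally simulated distributed training
python -m paddlerec.run -m ./models/rank/dnn/config.yaml
```
#### Cluster distributed training
Taking the dnn model as an example, from the paddlerec code directory, first modify the `config.yaml` file of the dnn model:
```yaml
runner:
- name: runner1
  class: cluster_train # single_train -> cluster_train
```
Then add the distributed launch configuration file `backend.yaml`; its configuration rules are described in the [Distributed training](doc/distributed_train.md) tutorial. Finally, start paddlerec training:
```bash
# After setting up the mpi/k8s/paddlecloud cluster environment
python -m paddlerec.run -m ./models/rank/dnn/config.yaml -b backend.yaml
```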
<h2 align="center">Supported Models</h2>
| Category | Model | Single-machine CPU training | Single-machine GPU training | Distributed CPU training |
| :------: | :---: | :---: | :---: | :---: |
| Content understanding | [Text-Classification](models/contentunderstanding/classification/model.py) | ✓ | x | ✓ |
| Content understanding | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | x | ✓ |
| Recall | [DSSM](models/match/dssm/model.py) | ✓ | x | ✓ |
| Recall | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | x | ✓ |
| Recall | [TDM](models/treebased/tdm/model.py) | ✓ | x | ✓ |
| Recall | [Word2Vec](models/recall/word2vec/model.py) | ✓ | x | ✓ |
| Recall | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ |
| Recall | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ |
| Recall | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ |
| Recall | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ |
| Ranking | [Dnn](models/rank/dnn/model.py) | ✓ | x | ✓ |
| Ranking | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ |
| Ranking | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ |
| Ranking | [DIN](models/rank/din/model.py) | ✓ | x | ✓ |
| Ranking | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ |
| Multi-task | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ |
| Multi-task | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ |
| Multi-task | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ |
| Re-ranking | [Listwise](models/rerank/listwise/model.py) | ✓ | x | ✓ |
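<h2 align="center">Documentation</h2>
### Background
### Project Background
* [Introduction to recommender systems](doc/rec_background.md)
* [Introduction to distributed deep learning](doc/ps_background.md)
### Beginner Tutorials
* [Requirements](#环境要求)
* [Installation commands](#安装命令)
* [Quick start](#启动内置模型的默认配置)
### Quick Start
* [PaddleRec in ten minutes](doc/quick_start.md)
### Getting-Started Tutorials
* [Data preparation](doc/slot_reader.md)
* [Model tuning](doc/model.md)
* [Start training](doc/train.md)
* [Start inference](doc/predict.md)
* [Quick deployment](doc/serving.md)
### Advanced Tutorials
* [Custom datasets and Reader](doc/custom_dataset_reader.md)
* [Distributed training](doc/distributed_train.md)
### Developer Tutorials
### Advanced Tutorials
* [Custom Reader](doc/custom_reader.md)
* [Custom models](doc/development.md)
* [Custom workflows](doc/development.md)
* [yaml configuration reference](doc/yaml.md)
* [PaddleRec design document](doc/design.md)
* [Extending PaddleRec](doc/development.md)
### PaddleRec Performance
### Benchmark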
* [Benchmark](doc/benchmark.md)
### FAQ
......@@ -211,12 +141,25 @@ python -m paddlerec.run -m ./models/rank/dnn/config.yaml -b backend.yaml
<h2 align="center">Community</h2>
### Feedback
Comments, suggestions, and bug reports are welcome as [GitHub Issues](https://github.com/PaddlePaddle/PaddleRec/issues)
<p align="center">
<br>
<img alt="Release" src="https://img.shields.io/badge/Release-0.1.0-yellowgreen">
<img alt="License" src="https://img.shields.io/github/license/PaddlePaddle/Serving">
<img alt="Slack" src="https://img.shields.io/badge/Join-Slack-green">
<br>
<p>
### Release History
- 2020.5.14 - PaddleRec v0.1
### License
This project is released under the [Apache 2.0 license](LICENSE).
### Contact Us
Comments, suggestions, and bug reports are welcome as [GitHub Issues](https://github.com/PaddlePaddle/PaddleRec/issues)
You can also reach us through the following channels:
<p align="center"><img width="200" height="200" src="doc/imgs/wechet.png"/>&#8194;&#8194;&#8194;&#8194;&#8194;<img width="200" height="200" margin="500" src="./doc/imgs/QQ_group.png"/>&#8194;&#8194;&#8194;&#8194;&#8194;<img width="200" height="200" src="doc/imgs/weixin_supporter.png"/></p>
<p align="center"> &#8194;&#8194;&#8194;&#8194;&#8194;&#8194;WeChat official account&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;PaddleRec QQ group&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;PaddleRec WeChat assistant</p>
......@@ -20,6 +20,8 @@ from __future__ import print_function
import time
import logging
import os
import json
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
......@@ -263,8 +265,10 @@ class SingleInfer(TranspileTrainer):
                envs.get_global_env("runner." + self._runner_name +
                                    ".print_interval", 20))
            metrics_format.append("{}: {{}}".format("batch"))
            metrics_indexes = dict()
            for name, var in metrics.items():
                metrics_varnames.append(var.name)
                metrics_indexes[var.name] = len(metrics_varnames) - 1
                metrics_format.append("{}: {{}}".format(name))
            metrics_format = ", ".join(metrics_format)
......@@ -272,19 +276,30 @@ class SingleInfer(TranspileTrainer):
        reader.start()
        batch_id = 0
        scope = self._model[model_name][2]
        infer_results = []
        with fluid.scope_guard(scope):
            try:
                while True:
                    metrics_rets = self._exe.run(program=program,
                                                 fetch_list=metrics_varnames)
                                                 fetch_list=metrics_varnames,
                                                 return_numpy=False)
                    metrics = [batch_id]
                    metrics.extend(metrics_rets)
                    batch_infer_result = {}
                    for k, v in metrics_indexes.items():
                        batch_infer_result[k] = np.array(metrics_rets[
                            v]).tolist()
                    infer_results.append(batch_infer_result)
                    if batch_id % fetch_period == 0 and batch_id != 0:
                        print(metrics_format.format(*metrics))
                    batch_id += 1
            except fluid.core.EOFException:
                reader.reset()
        with open(model_dict['save_path'], 'w') as fout:
            json.dump(infer_results, fout)

    def terminal(self, context):
        context['is_exit'] = True
......
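The change above makes `SingleInfer` collect one dictionary per batch, keyed by fetched variable name, and dump the whole list as JSON to `save_path`. A minimal sketch of writing and reading back a file with that shape follows. The variable names (`auc_0.tmp_0`, `predict`), the values, and the temporary path are made up for illustration and are not taken from a real run.

```python
import json
import os
import tempfile

# Hypothetical results: one dict per batch, keyed by fetched variable name.
infer_results = [
    {"auc_0.tmp_0": [0.5], "predict": [[0.2], [0.7]]},
    {"auc_0.tmp_0": [0.6], "predict": [[0.9], [0.1]]},
]

# Stand-in for model_dict['save_path'] in the trainer.
path = os.path.join(tempfile.mkdtemp(), "infer_result")
with open(path, "w") as fout:
    json.dump(infer_results, fout)

# A downstream script can load the file and flatten one variable over batches.
with open(path) as fin:
    loaded = json.load(fin)

predictions = [row[0] for batch in loaded for row in batch["predict"]]
print(len(loaded), predictions)
```

Because every fetched value is converted with `tolist()` before dumping, the file contains only nested lists and numbers and needs no Paddle import to read.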
# PaddleRec Recommendation Dataset Format
When your dataset is in the `[slot:feasign]*` format, or can be preprocessed into it, you can use PaddleRec's built-in Reader directly.
The benefit is that you do not need to write your own Reader, and the data format can be unified across models.
## Data format
Suppose your raw data has the following format:
```bash
<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26>
```
Here ```<label>``` indicates whether the ad was clicked: 1 for clicked, 0 for not clicked. ```<integer feature>``` denotes a numerical (continuous) feature; there are 13 continuous features,
each with a single value.
```<categorical feature>``` denotes a categorical (discrete) feature; there are 26 discrete features. Adjacent features are separated by ```\t```.
Suppose the 13 continuous features (dense slots) are named:
```
D1 D2 D3 D4 D5 D6 D7 D8 D9 D10 D11 D12 D13
```
and the 26 discrete features (sparse slots) are named:
```
S1 S2 S3 S4 S5 S6 S7 S8 S9 S10 S11 S12 S13 S14 S15 S16 S17 S18 S19 S20 S21 S22 S23 S24 S25 S26
```
Then the following sample (1 label + 13 dense values + 26 feasigns)
```
1 0.1 0.4 0.2 0.3 0.5 0.8 0.3 0.2 0.1 0.5 0.6 0.3 0.9 60 16 91 50 52 52 28 69 63 33 87 69 48 59 27 12 95 36 37 41 17 3 86 19 88 60
```
can be converted to:
```
label:1 D1:0.1 D2:0.4 D3:0.2 D4:0.3 D5:0.5 D6:0.8 D7:0.3 D8:0.2 D9:0.1 D10:0.5 D11:0.6 D12:0.3 D13:0.9 S1:60 S2:16 S3:91 S4:50 S5:52 S6:52 S7:28 S8:69 S9:63 S10:33 S11:87 S12:69 S13:48 S14:59 S15:27 S16:12 S17:95 S18:36 S19:37 S20:41 S21:17 S22:3 S23:86 S24:19 S25:88 S26:60
```
Note: the order of the slot:feasign fields does not matter; for example, ```D1:0.1 D2:0.4``` can be written as ```D2:0.4 D1:0.1```.
## Configuration
The reader section must configure ```sparse_slots``` and ```dense_slots```, for example:
```
workspace: xxxx
reader:
  batch_size: 2
  train_data_path: "{workspace}/data/train_data"
  sparse_slots: "label S1 S2 S3 S4 S5 S6 S7 S8 S9 S10 S11 S12 S13 S14 S15 S16 S17 S18 S19 S20 S21 S22 S23 S24 S25 S26"
  dense_slots: "D1:1 D2:1 D3:1 D4:1 D5:1 D6:1 D7:1 D8:1 D9:1 D10:1 D11:1 D12:1 D13:1"
model:
  xxxxx
```
sparse_slots is the list of sparse features, separated by spaces.
dense_slots is the list of dense features, separated by spaces. Each field has the form ```[dense_slot_name]:[dim1,dim2,dim3...]```, where ```dim1,dim2,dim3...``` is the shape.
Once configured, the variables for these slots are available in the model through the following attributes:
```
self._sparse_data_var
self._dense_data_var
```
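# PaddleRec Custom Datasets and Reader
To use a custom dataset and configure an asynchronous Reader, pay attention to the following steps:
......
# Extending PaddleRec
## How to add a custom model
# How to add a custom model
When you want to develop a custom model, inherit from the model template base class and implement the three required methods `init_hyper_parameter`, `input_data`, and `net`
......
# PaddleRec Single-Machine Training
> Placeholder
# Model Tuning
Tuning a PaddleRec model involves two parts:
1. model.py
2. the hyper_parameters section of config.yaml
We use `models/rank/dnn` as an example to show how the two correspond: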
```yaml
hyper_parameters:
  optimizer:
    class: Adam
    learning_rate: 0.001
  sparse_feature_number: 1000001
  sparse_feature_dim: 9
  fc_sizes: [512, 256, 128, 32]
```
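## optimizer
This parameter selects the optimizer used to train the network parameters. The available values for class are `SGD`/`Adam`/`AdaGrad`, and the learning rate is set with the learning_rate option.
In `PaddleRec/core/model.py` you can see how this option takes effect: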
```python
if name == "SGD":
    reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
                              self._namespace)
    optimizer_i = fluid.optimizer.SGD(
        lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
    optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
elif name == "ADAGRAD":
    optimizer_i = fluid.optimizer.Adagrad(lr)
```
## sparse_feature_number & sparse_feature_dim
These parameters set the shape of the Embedding table in the ctr-dnn network. In `PaddleRec/models/rank/dnn/model.py` you can see how they take effect:
```python
self.sparse_feature_number = envs.get_global_env(
    "hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env(
    "hyper_parameters.sparse_feature_dim")

def embedding_layer(input):
    emb = fluid.layers.embedding(
        input=input,
        is_sparse=True,
        is_distributed=self.is_distributed,
        size=[self.sparse_feature_number, self.sparse_feature_dim],
        param_attr=fluid.ParamAttr(
            name="SparseFeatFactors",
            initializer=fluid.initializer.Uniform()), )
    emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
    return emb_sum
```
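To make the two numbers concrete, the lookup-then-sum behaviour of the layer above can be imitated in plain Python. This is only an illustration under stated assumptions: the tiny table, the helper `embed_and_sum`, and the example ids are made up, and the real layer runs inside Paddle on a table of shape `[sparse_feature_number, sparse_feature_dim]`.

```python
# Toy sizes; the dnn config in this document uses 1000001 and 9.
sparse_feature_number = 5   # number of rows in the table (one per feature id)
sparse_feature_dim = 3      # width of each embedding vector

# Deterministic toy table: row r holds [10r, 10r + 1, 10r + 2].
table = [[float(r * 10 + c) for c in range(sparse_feature_dim)]
         for r in range(sparse_feature_number)]


def embed_and_sum(ids):
    """Look up one row per id, then sum the rows (sum pooling over the sequence)."""
    pooled = [0.0] * sparse_feature_dim
    for feature_id in ids:
        row = table[feature_id]
        for c in range(sparse_feature_dim):
            pooled[c] += row[c]
    return pooled


pooled = embed_and_sum([1, 3])  # a slot holding two feasigns
print(pooled)
```

However many ids a slot carries, the pooled output always has `sparse_feature_dim` entries, which is why variable-length sparse slots can feed fixed-size fully connected layers.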
## fc_sizes
This parameter sets how many layers the dnn in the ctr-dnn model has and the size of each layer. In `PaddleRec/models/rank/dnn/model.py` you can see how it takes effect:
```python
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes")
for size in hidden_layers:
    output = fluid.layers.fc(
        input=fcs[-1],
        size=size,
        act='relu',
        param_attr=fluid.ParamAttr(
            initializer=fluid.initializer.Normal(
                scale=1.0 / math.sqrt(fcs[-1].shape[1]))))
    fcs.append(output)
```
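The loop above chains the layers, so each layer's input width is the previous layer's output width, and the Normal initializer scale is `1 / sqrt(input width)`. A small sketch that only computes these numbers, without building any network, is shown below. The helper `fc_plan` and the first input width of 256 are assumptions for illustration; the real input width depends on the concatenated embedding and dense inputs.

```python
import math


def fc_plan(input_dim, fc_sizes):
    """Return (fan_in, fan_out, init_scale) for each fully connected layer."""
    plan = []
    fan_in = input_dim
    for size in fc_sizes:
        plan.append((fan_in, size, 1.0 / math.sqrt(fan_in)))
        fan_in = size  # the next layer consumes this layer's output
    return plan


layers = fc_plan(input_dim=256, fc_sizes=[512, 256, 128, 32])
for fan_in, fan_out, scale in layers:
    print("fc %4d -> %4d, normal init scale %.4f" % (fan_in, fan_out, scale))
```

Changing `fc_sizes` in the yaml therefore changes both the depth of the network and the initialization scale of every layer after the first.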
# Supported Models
| Category | Model | Single-machine CPU training | Single-machine GPU training | Distributed CPU training | Large-scale sparse | Distributed GPU training | Custom dataset |
| :------: | :---: | :---: | :---: | :---: | :---: | :---: | :---: |
| Content understanding | [Text-Classification]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Content understanding | [TagSpace]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [Word2Vec]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [TDM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Ranking | [CTR-Dnn]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Ranking | [DeepFm]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Ranking | [ListWise]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Multi-task | [MMOE]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Multi-task | [ESMM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Matching | [DSSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Matching | [Multiview-Simnet]() | ✓ | x | ✓ | x | ✓ | ✓ |
# PaddleRec Model Tuning
> Placeholder
# PaddleRec Offline Inference
## Single-machine offline inference configuration
Let us define a `runner` for single-machine inference:
```yaml
mode: runner_infer # run the runner named runner_infer
runner:
- name: runner_infer # define a runner named runner_infer
  class: single_infer # single-machine inference, class = single_infer
  device: cpu # run on cpu
  init_model_path: "init_model" # path of the model used for initialization
  print_interval: 10 # interval for printing inference info, in batches
```
Then define what is actually executed:
```yaml
phase:
- name: phase_infer # this phase is named phase_infer
  model: "{workspace}/model.py" # the model file is model.py under workspace
  dataset_name: dataset_infer # name of the reader
dataset:
- name: dataset_infer
  type: DataLoader # read data with DataLoader
  batch_size: 2
  data_path: "{workspace}/test_data" # data path
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" # positions of the sparse inputs
  dense_slots: "dense_var:13" # dimension of the dense parameter
```
# PaddleRec in Ten Minutes
......@@ -3,7 +3,7 @@
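The code for this article is in [book/recommender_system](https://github.com/PaddlePaddle/book/tree/develop/05.recommender_system). If this is your first time, please refer to the [Book usage instructions](https://github.com/PaddlePaddle/book/blob/develop/README.cn.md#运行这本书)
For more tutorials and background, see [Deep Learning in Practice: Personalized Recommendation](https://www.paddlepaddle.org.cn/tutorials/projectdetail/443958)
For more tutorials and background, see [Deep Learning in Practice: Personalized Recommendation](https://www.paddlepaddle.org.cn/tutorials/projectdetail/460300)
## Background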
......
# Online Deployment
PaddleRec is built on the PaddlePaddle framework, so the inference_model saved during training can be deployed online quickly using PaddlePaddle's deployment capabilities.
First, in the `yaml` configuration, specify the save interval and save path of the inference_model:
```yaml
mode: runner_train # run the runner named runner_train
runner:
- name: runner_train # define a runner named runner_train
  class: single_train # single-machine training, class = single_train
  device: cpu # run on cpu
  epochs: 10 # number of training epochs
  save_checkpoint_interval: 2 # save a checkpoint every 2 epochs
  save_inference_interval: 4 # save an inference model every 4 epochs
  save_checkpoint_path: "increment" # where checkpoints are saved
  save_inference_path: "inference" # where the inference model is saved
  save_inference_feed_varnames: [] # names of the feed variables of the inference model
  save_inference_fetch_varnames: [] # names of the fetch variables of the inference model
  init_model_path: "" # for a warm start from a saved model, the path of the model to load
  print_interval: 10 # interval for printing training info, in batches
```
After training completes, the saved models/parameters can be found in the `inference` and `increment` folders.
Refer to the links below to deploy the model in different scenarios.
### [Server-side deployment](https://www.paddlepaddle.org.cn/documentation/docs/zh/advanced_guide/inference_deployment/inference/index_cn.html)
### [Mobile deployment](https://www.paddlepaddle.org.cn/documentation/docs/zh/advanced_guide/inference_deployment/mobile/index_cn.html)
### [Online Serving](https://github.com/PaddlePaddle/Serving)
### [Model compression](https://www.paddlepaddle.org.cn/documentation/docs/zh/advanced_guide/inference_deployment/paddleslim/paddle_slim.html)
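# PaddleRec Recommendation Dataset Format
When your dataset is in the `slot:feasign` format, or can be preprocessed into it, you can use PaddleRec's built-in Reader directly.
> What is Slot : Feasign?
>
> Slot literally means a position. In recommendation engineering it refers to a broad feature category: user ID, gender, and age are Slots, while a Feasign is a concrete value, such as 12345, male, or 20 years old.
>
> In practice, many feature slots are not single-valued, or cannot be quantified and are discrete and sparse. For example, a user may have three interests (games, football, digital products), and each interest may itself have several feature dimensions; the interests Slot then holds multiple Feasign values.
>
> When reading data, PaddleRec supports sparse and variable-length features for each Slot ID, so it can flexibly support recommendation model training in a wide range of scenarios.
## Data format
Suppose your raw data has the following format:
```bash
<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26>
```
Here ```<label>``` indicates whether the ad was clicked: 1 for clicked, 0 for not clicked. ```<integer feature>``` denotes a numerical (continuous) feature; there are 13 continuous features,
each with a single value.
```<categorical feature>``` denotes a categorical (discrete) feature; there are 26 discrete features. Adjacent features are separated by ```\t```.
Suppose the 13 continuous features (dense slots) are named:
```
D1 D2 D3 D4 D5 D6 D7 D8 D9 D10 D11 D12 D13
```
and the 26 discrete features (sparse slots) are named:
```
S1 S2 S3 S4 S5 S6 S7 S8 S9 S10 S11 S12 S13 S14 S15 S16 S17 S18 S19 S20 S21 S22 S23 S24 S25 S26
```
Then the following sample (1 label + 13 dense values + 26 feasigns)
```
1 0.1 0.4 0.2 0.3 0.5 0.8 0.3 0.2 0.1 0.5 0.6 0.3 0.9 60 16 91 50 52 52 28 69 63 33 87 69 48 59 27 12 95 36 37 41 17 3 86 19 88 60
```
can be converted to:
```
label:1 D1:0.1 D2:0.4 D3:0.2 D4:0.3 D5:0.5 D6:0.8 D7:0.3 D8:0.2 D9:0.1 D10:0.5 D11:0.6 D12:0.3 D13:0.9 S1:60 S2:16 S3:91 S4:50 S5:52 S6:52 S7:28 S8:69 S9:63 S10:33 S11:87 S12:69 S13:48 S14:59 S15:27 S16:12 S17:95 S18:36 S19:37 S20:41 S21:17 S22:3 S23:86 S24:19 S25:88 S26:60
```
Note: the order of the slot:feasign fields does not matter; for example, ```D1:0.1 D2:0.4``` can be written as ```D2:0.4 D1:0.1```.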
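The conversion described in this section can be sketched in a few lines of Python. This is a minimal illustration rather than PaddleRec's own preprocessing code: the helper name `to_slot_feasign` is hypothetical, and it assumes the dense slots are named D1 to D13 and the sparse slots S1 to S26, in column order, as declared in this section.

```python
def to_slot_feasign(line, num_dense=13, num_sparse=26):
    """Turn '<label> <dense...> <sparse...>' into space-separated slot:feasign fields."""
    fields = line.split()  # accepts tabs or spaces between columns
    expected = 1 + num_dense + num_sparse
    if len(fields) != expected:
        raise ValueError("expected %d fields, got %d" % (expected, len(fields)))

    dense = fields[1:1 + num_dense]
    sparse = fields[1 + num_dense:]

    out = ["label:%s" % fields[0]]
    out += ["D%d:%s" % (i + 1, value) for i, value in enumerate(dense)]
    out += ["S%d:%s" % (i + 1, value) for i, value in enumerate(sparse)]
    return " ".join(out)


raw = ("1 0.1 0.4 0.2 0.3 0.5 0.8 0.3 0.2 0.1 0.5 0.6 0.3 0.9 "
       "60 16 91 50 52 52 28 69 63 33 87 69 48 59 27 12 95 36 37 41 17 3 86 19 88 60")
converted = to_slot_feasign(raw)
print(converted)
```

Since field order does not matter to the Reader, the only requirement on such a script is that every value is prefixed with the slot name it belongs to.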
## Configuration
The reader section must configure ```sparse_slots``` and ```dense_slots```, for example:
```
workspace: xxxx
reader:
  batch_size: 2
  train_data_path: "{workspace}/data/train_data"
  sparse_slots: "label S1 S2 S3 S4 S5 S6 S7 S8 S9 S10 S11 S12 S13 S14 S15 S16 S17 S18 S19 S20 S21 S22 S23 S24 S25 S26"
  dense_slots: "D1:1 D2:1 D3:1 D4:1 D5:1 D6:1 D7:1 D8:1 D9:1 D10:1 D11:1 D12:1 D13:1"
model:
  xxxxx
```
sparse_slots is the list of sparse features, separated by spaces. Variable-length data with lod_level=1 is supported; for the concept of variable-length data, see [LodTensor](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/basic_concept/lod_tensor.html#lodtensor)
dense_slots is the list of dense features, separated by spaces. Each field has the form ```[dense_slot_name]:[dim1,dim2,dim3...]```, where ```dim1,dim2,dim3...``` is the shape; for a multi-dimensional Dense parameter the shape is a list.
Once configured, the variables for these slots can be accessed in the model as follows:
```
self._sparse_data_var
self._dense_data_var
```
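As a rough illustration of how the two strings are structured, the sketch below splits them into Python values. It is not the parser PaddleRec uses internally: `parse_dense_slots` and `parse_sparse_slots` are hypothetical helpers, and the `img:3,32,32` slot is an invented example of a multi-dimensional shape.

```python
def parse_sparse_slots(spec):
    """Sparse slots are plain names separated by spaces."""
    return spec.split()


def parse_dense_slots(spec):
    """Dense slots are 'name:dim1,dim2,...' fields separated by spaces."""
    shapes = {}
    for field in spec.split():
        name, dims = field.split(":")
        shapes[name] = [int(d) for d in dims.split(",")]
    return shapes


sparse = parse_sparse_slots("label S1 S2 S3")
dense = parse_dense_slots("D1:1 D2:1 readlist:9 img:3,32,32")
print(sparse)
print(dense)
```

A one-dimensional slot such as `readlist:9` yields a single-element shape list, while a multi-dimensional slot yields one entry per dimension.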
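# PaddleRec Start Training
## How to start
### 1. Train a built-in model with its default configuration
After installing `paddlepaddle` and `paddlerec`, a single command starts training of a built-in model with its default configuration:
```shell
python -m paddlerec.run -m paddlerec.models.xxx.yyy
```
Notes:
1. Make sure you are using the `python` environment in which paddlerec is installed
2. `xxx` is one of the top-level categories under paddlerec.models, such as `recall`/`rank`/`rerank`
3. `yyy` is one of the models within a category; for example, `recall` contains `gnn`/`gru4rec`/`ncf`
For example, to start the `word2vec` model under `recall` with its default configuration:
```shell
python -m paddlerec.run -m paddlerec.models.recall.word2vec
```
### 2. Train a built-in model with a customized configuration
If we have modified the config.yaml of a default model, how do we run the modified model?
- **Model network not modified**
Suppose you cloned the paddlerec repository to `/home/PaddleRec` and modified `/home/PaddleRec/models/rank/dnn/config.yaml`; then start training as follows:
```shell
python -m paddlerec.run -m /home/PaddleRec/models/rank/dnn/config.yaml
```
paddlerec runs the network file (model.py) from the directory where the paddlerec library is installed
- **Model network modified**
Suppose you cloned the paddlerec repository to `/home/PaddleRec` and modified `/home/PaddleRec/models/rank/dnn/model.py` as well as `/home/PaddleRec/models/rank/dnn/config.yaml`; then first change the `workspace` setting in the `yaml`:
```yaml
workspace: /home/PaddleRec/models/rank/dnn/
```
Then run:
```shell
python -m paddlerec.run -m /home/PaddleRec/models/rank/dnn/config.yaml
```
paddlerec runs the network file (model.py) at the absolute path
### Training-related concepts in yaml
Two important logical concepts govern the training workflow in `config.yaml`: `runner` and `phase`.
- **`runner`**: the runner is the training engine. It defines the execution device (cpu, gpu), the execution mode (training, inference, single-machine, multi-machine, and so on), and run-level settings such as the number of training epochs and the model save path.
- **`phase`**: a phase is a stage of training and describes what the engine actually executes: which model file to run and which reader to use.
Each PaddleRec run executes only one runner, selected by setting `mode` to the runner's name. Each runner can execute multiple `phase`s, so PaddleRec supports launching multi-stage training with a single command.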
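The relationship between `mode`, `runner`, and `phase` can be sketched with a plain dictionary standing in for a parsed `config.yaml`. This is an illustration of the selection rule only, not PaddleRec's launcher code; the helpers `select_runner` and `plan` and the second runner entry are assumptions added for the example.

```python
# A dictionary shaped like a parsed config.yaml (values are illustrative).
config = {
    "mode": "runner_train",
    "runner": [
        {"name": "runner_train", "class": "single_train", "epochs": 10},
        {"name": "runner_infer", "class": "single_infer", "epochs": 1},
    ],
    "phase": [
        {"name": "phase_train", "model": "{workspace}/model.py",
         "dataset_name": "dataset_train"},
    ],
}


def select_runner(cfg):
    """Pick the single runner whose name equals the global `mode`."""
    matches = [r for r in cfg["runner"] if r["name"] == cfg["mode"]]
    if len(matches) != 1:
        raise ValueError("mode %r must match exactly one runner" % cfg["mode"])
    return matches[0]


def plan(cfg):
    """List (runner name, runner class, phase name) in execution order."""
    runner = select_runner(cfg)
    return [(runner["name"], runner["class"], p["name"]) for p in cfg["phase"]]


steps = plan(config)
print(steps)
```

Switching `mode` to `runner_infer` would leave the phase list untouched and change only the engine that executes it.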
### Single-machine training configuration
Let us define a `runner` for single-machine training:
```yaml
mode: runner_train # run the runner named runner_train
runner:
- name: runner_train # define a runner named runner_train
  class: single_train # single-machine training, class = single_train
  device: cpu # run on cpu
  epochs: 10 # number of training epochs
  save_checkpoint_interval: 2 # save a checkpoint every 2 epochs
  save_inference_interval: 4 # save an inference model every 4 epochs
  save_checkpoint_path: "increment" # where checkpoints are saved
  save_inference_path: "inference" # where the inference model is saved
  save_inference_feed_varnames: [] # names of the feed variables of the inference model
  save_inference_fetch_varnames: [] # names of the fetch variables of the inference model
  init_model_path: "" # for a warm start from a saved model, the path of the model to load
  print_interval: 10 # interval for printing training info, in batches
```
Then define what is actually executed:
```yaml
phase:
- name: phase_train # this phase is named phase_train
  model: "{workspace}/model.py" # the model file is model.py under workspace
  dataset_name: dataset_train # name of the reader
dataset:
- name: dataset_train
  type: DataLoader # read data with DataLoader
  batch_size: 2
  data_path: "{workspace}/train_data" # data path
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" # positions of the sparse inputs
  dense_slots: "dense_var:13" # dimension of the dense parameter
```
```yaml
# Global configuration
# Debug switch; in debug mode the time cost of each OP and the IO ratio are printed
debug: false
# Workspace directory
# If a folder path is given, the hyperparameter configuration, network, data and other required files are looked up under it
workspace: "/home/demo_model/"
# If workspace: paddlerec.models.rank.dnn
# the official default configuration and network are used
# Users can define multiple datasets (data reading configurations)
# Different phases of a run can use different datasets
dataset:
# DataLoader example
- name: dataset_1
  type: DataLoader
  batch_size: 5
  data_path: "{workspace}/data/train"
  # Path of the custom reader.py
  data_converter: "{workspace}/rsc15_reader.py"
# QueueDataset example
- name: dataset_2
  type: QueueDataset
  batch_size: 5
  data_path: "{workspace}/data/train"
  # Users can configure sparse_slots and dense_slots instead of defining data_converter; the default reader is then used
  sparse_slots: "click ins_weight 6001 6002 6003 6005 6006 6007 6008 6009"
  dense_slots: "readlist:9"
# Custom hyperparameters, mainly the model hyperparameters of the network and the optimizer
hyper_parameters:
  # Optimizer
  optimizer:
    class: Adam # configure the Optimizer directly; sgd/Adam/AdaGrad are currently supported
    learning_rate: 0.001
    strategy: "{workspace}/conf/config_fleet.py" # configuration specific to the large-scale sparse pslib mode
  # Model hyperparameters
  vocab_size: 1000
  hid_size: 100
# The global parameter mode selects the runner for the current run
mode: runner_1
# A runner mainly describes the execution environment: single-machine/distributed, CPU/GPU, number of epochs, model load and save paths
runner:
- name: runner_1 # a runner for single-machine training
  class: single_train # run mode; other options: single_infer/local_cluster_train/cluster_train
  epochs: 10
  device: cpu
  init_model_path: ""
  save_checkpoint_interval: 2
  save_inference_interval: 4
  # Model save paths
  save_checkpoint_path: "xxxx"
  save_inference_path: "xxxx"
- name: runner_2 # a runner for single-machine inference
  class: single_infer
  epochs: 1
  device: cpu
  init_model_path: "afs:/xxx/xxx"
# Training may consist of several phases, each with its own network and data reading
# Every runner runs all phases in full
# A phase specifies the model and reader loaded at run time
phase:
- name: phase1
  model: "{workspace}/model.py"
  dataset_name: dataset_1
  thread_num: 1
```
# PaddleRec yaml Configuration Reference
## Global variables
| Name | Type | Value | Required | Description |
| :-------: | :----: | :-----------------------------------: | :------: | :------------------------------------------------: |
| workspace | string | path / paddlerec.models.{category}.{model} | Yes | Location of model/reader/data |
| mode | string | runner name | Yes | Which runner to use for this run |
| debug | bool | True / False | No | When dataset.type=QueueDataset, enables op time-cost debugging |
## runner variables
| Name | Type | Value | Required | Description |
| :---------------------------: | :----------: | :-------------------------------: | :------: | :---------------------------------------------: |
| name | string | any | Yes | Name of the runner |
| class | string | single_train (default) / single_infer | Yes | Kind of runner (single-machine/distributed, training/inference) |
| device | string | cpu (default) / gpu | No | Execution device |
| epochs | int | >= 1 | No | Number of training epochs |
| init_model_path | string | path | No | Path of the model used for initialization |
| save_checkpoint_interval | int | >= 1 | No | Epoch interval between parameter saves |
| save_checkpoint_path | string | path | No | Where parameters are saved |
| save_inference_interval | int | >= 1 | No | Epoch interval between inference model saves |
| save_inference_path | string | path | No | Where the inference model is saved |
| save_inference_feed_varnames | list[string] | names of Variables defined in the network | No | Input variable names of the inference model |
| save_inference_fetch_varnames | list[string] | names of Variables defined in the network | No | Output variable names of the inference model |
| print_interval | int | >= 1 | No | Batch interval for printing training metrics |
## phase variables
| Name | Type | Value | Required | Description |
| :----------: | :----: | :----------: | :------: | :-----------------------------: |
| name | string | any | Yes | Name of the phase |
| model | string | path to model.py | Yes | Python file that contains Model() |
| dataset_name | string | dataset name | Yes | Which Reader to use |
| thread_num | int | >= 1 | No | Number of training threads |
## dataset variables
| Name | Type | Value | Required | Description |
| :------------: | :----: | :-----------------------: | :------: | :----------------------------: |
| name | string | any | Yes | Name of the dataset |
| type | string | DataLoader / QueueDataset | Yes | How data is read |
| batch_size | int | >= 1 | No | Number of samples per batch |
| data_path | string | path | Yes | Location of the data |
| data_converter | string | path to reader.py | Yes, unless sparse_slots/dense_slots are configured | Python file that contains Reader() |
| sparse_slots | string | string | No | Sparse slot option |
| dense_slots | string | string | No | Dense slot option |
## hyper_parameters variables
| Name | Type | Value | Required | Description |
| :---------------------: | :----: | :--------------: | :------: | :-------------------------: |
| optimizer.class | string | SGD/Adam/Adagrad | Yes | Optimizer type |
| optimizer.learning_rate | float | > 0 | No | Learning rate |
| reg | float | > 0 | No | L2 regularization coefficient; only takes effect with SGD |
| others | / | / | / | Defined independently by each model network |
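The dataset rules in the tables above can be turned into a small pre-flight check. The sketch below is a hypothetical helper, not part of PaddleRec: `check_dataset` and its messages are invented, and it assumes, following the example configuration in this document, that `data_converter` may be omitted when `sparse_slots` or `dense_slots` is configured.

```python
VALID_TYPES = ("DataLoader", "QueueDataset")


def check_dataset(entry):
    """Return a list of problems found in one `dataset` entry (empty if none)."""
    problems = []
    for key in ("name", "type", "data_path"):
        if key not in entry:
            problems.append("missing required key: %s" % key)
    if "type" in entry and entry["type"] not in VALID_TYPES:
        problems.append("unknown type: %s" % entry["type"])
    has_slots = "sparse_slots" in entry or "dense_slots" in entry
    if "data_converter" not in entry and not has_slots:
        problems.append("need data_converter or sparse_slots/dense_slots")
    if entry.get("batch_size", 1) < 1:
        problems.append("batch_size must be >= 1")
    return problems


good = {
    "name": "dataset_2",
    "type": "QueueDataset",
    "batch_size": 5,
    "data_path": "{workspace}/data/train",
    "sparse_slots": "click ins_weight 6001 6002",
    "dense_slots": "readlist:9",
}
bad = {"name": "broken", "type": "Unknown"}

print(check_dataset(good))
print(check_dataset(bad))
```

Running such a check before launching a job surfaces typos in the yaml earlier than a failed training run would.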
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#coding=utf8
# NOTE: this script targets Python 2. Feature ids are derived from the
# built-in hash(), which Python 3 randomizes per process for strings unless
# PYTHONHASHSEED is fixed, so ids would not match across separate runs there.
from __future__ import print_function

import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import random
import json

user_fea = ["userid", "gender", "age", "occupation"]
movie_fea = ["movieid", "title", "genres"]
rating_fea = ["userid", "movieid", "rating", "time"]
dict_size = 60000000
hash_dict = dict()

data_path = "ml-1m"
test_user_path = "online_user"


def process(path):
    """Join every rating line with its user and movie features."""
    user_dict = parse_data(data_path + "/users.dat", user_fea)
    movie_dict = parse_movie_data(data_path + "/movies.dat", movie_fea)

    for line in open(path):
        line = line.strip()
        arr = line.split("::")
        userid = arr[0]
        movieid = arr[1]
        out_str = "time:%s\t%s\t%s\tlabel:%s" % (arr[3], user_dict[userid],
                                                 movie_dict[movieid], arr[2])
        log_id = hash(out_str) % 1000000000
        print("%s\t%s" % (log_id, out_str))


def parse_data(file_name, feas):
    """Map the first column of a '::'-separated file to its 'name:value' fields."""
    fea_dict = {}
    for line in open(file_name):
        line = line.strip()
        arr = line.split("::")
        out_str = ""
        for i in range(0, len(feas)):
            out_str += "%s:%s\t" % (feas[i], arr[i])

        fea_dict[arr[0]] = out_str.strip()
    return fea_dict


def parse_movie_data(file_name, feas):
    """Like parse_data, but splits the title into words and genres on '|'."""
    fea_dict = {}
    for line in open(file_name):
        line = line.strip()
        arr = line.split("::")
        title_str = ""
        genres_str = ""

        for term in arr[1].split(" "):
            term = term.strip()
            if term != "":
                title_str += "%s " % (term)
        for term in arr[2].split("|"):
            term = term.strip()
            if term != "":
                genres_str += "%s " % (term)
        out_str = "movieid:%s\ttitle:%s\tgenres:%s" % (
            arr[0], title_str.strip(), genres_str.strip())
        fea_dict[arr[0]] = out_str.strip()
    return fea_dict


def to_hash(in_str):
    """Replace the value of a 'name:value' field with a hashed id."""
    feas = in_str.split(":")[0]
    arr = in_str.split(":")[1]
    out_str = "%s:%s" % (feas, (arr + arr[::-1] + arr[::-2] + arr[::-3]))
    hash_id = hash(out_str) % dict_size
    # NOTE: hash_dict is never filled in this script, so the conflict check
    # below never fires as written.
    if hash_id in hash_dict and hash_dict[hash_id] != out_str:
        print(hash_id, out_str, hash(out_str))
        print("conflict")
        sys.exit(-1)

    return "%s:%s" % (feas, hash_id)


def to_hash_list(in_str):
    """Hash every space-separated value of a multi-valued 'name:v1 v2 ...' field."""
    arr = in_str.split(":")
    tmp_arr = arr[1].split(" ")
    out_str = ""
    for item in tmp_arr:
        item = item.strip()
        if item != "":
            key = "%s:%s" % (arr[0], item)
            out_str += "%s " % (to_hash(key))
    return out_str.strip()


def get_hash(path):
    #0-34831 1-time:974673057 2-userid:2021 3-gender:M 4-age:25 5-occupation:0 6-movieid:1345 7-title:Carrie (1976) 8-genres:Horror 9-label:2
    for line in open(path):
        arr = line.strip().split("\t")
        out_str = "logid:%s %s %s %s %s %s %s %s %s %s" % \
            (arr[0], arr[1], to_hash(arr[2]), to_hash(arr[3]), to_hash(arr[4]), to_hash(arr[5]), \
            to_hash(arr[6]), to_hash_list(arr[7]), to_hash_list(arr[8]), arr[9])
        print(out_str)


def generate_online_user():
    """Pair every online user with every movie, using a dummy rating of 1."""
    movie_dict = parse_movie_data(data_path + "/movies.dat", movie_fea)

    with open(test_user_path + "/movies.dat", 'w') as f:
        for line in open(test_user_path + "/users.dat"):
            line = line.strip()
            arr = line.split("::")
            userid = arr[0]
            for item in movie_dict:
                f.write(userid + "::" + item + "::1")
                f.write("\n")


def generate_online_data(path):
    """Build hashed samples for online inference from user/movie pairs."""
    user_dict = parse_data(data_path + "/users.dat", user_fea)
    movie_dict = parse_movie_data(data_path + "/movies.dat", movie_fea)

    for line in open(path):
        line = line.strip()
        arr = line.split("::")
        userid = arr[0]
        movieid = arr[1]
        label = arr[2]

        out_str = "time:%s\t%s\t%s\tlabel:%s" % ("1", user_dict[userid],
                                                 movie_dict[movieid], label)
        log_id = hash(out_str) % 1000000000
        res = "%s\t%s" % (log_id, out_str)
        arr = res.strip().split("\t")
        out_str = "logid:%s %s %s %s %s %s %s %s %s %s" % \
            (arr[0], arr[1], to_hash(arr[2]), to_hash(arr[3]), to_hash(arr[4]), to_hash(arr[5]), \
            to_hash(arr[6]), to_hash_list(arr[7]), to_hash_list(arr[8]), arr[9])
        print(out_str)


if __name__ == "__main__":
    random.seed(1111111)
    if sys.argv[1] == "process_raw":
        process(sys.argv[2])
    elif sys.argv[1] == "hash":
        get_hash(sys.argv[2])
    elif sys.argv[1] == "data_recall":
        generate_online_user()
        generate_online_data(test_user_path + "/movies.dat")
    elif sys.argv[1] == "data_rank":
        generate_online_data(test_user_path + "/movies.dat")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
train = dict()
test = dict()
data_path = "ml-1m"
# The first rating seen for a user always goes to train; among the user's
# remaining ratings, the one with the latest timestamp is held out as test.
for line in open(data_path + "/ratings.dat"):
    fea = line.rstrip().split("::")
    if fea[0] not in train:
        train[fea[0]] = [line]
    elif fea[0] not in test:
        test[fea[0]] = dict()
        test[fea[0]]['time'] = int(fea[3])
        test[fea[0]]['content'] = line
    else:
        time = int(fea[3])
        if time <= test[fea[0]]['time']:
            train[fea[0]].append(line)
        else:
            train[fea[0]].append(test[fea[0]]['content'])
            test[fea[0]]['time'] = time
            test[fea[0]]['content'] = line
train_data = []
for key in train:
    for line in train[key]:
        train_data.append(line)
random.shuffle(train_data)
with open(data_path + "/train.dat", 'w') as f:
    for line in train_data:
        f.write(line)
with open(data_path + "/test.dat", 'w') as f:
    for key in test:
        f.write(test[key]['content'])
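The split above can also be written as a pure function, which makes it easy to try on a handful of rows. This is a sketch under one stated difference: here a user's first row is also eligible to be held out, whereas the script above always sends the first row to train. The name `split_latest_per_user` is hypothetical.

```python
def split_latest_per_user(lines):
    """Hold out each user's latest rating (by timestamp) as test data.

    Each line is expected in the ml-1m form "user::movie::rating::timestamp".
    On timestamp ties the earlier-seen row stays held out, as in split.py.
    """
    train = []
    latest = {}  # user id -> (timestamp, line) currently held out
    for line in lines:
        user, _movie, _rating, ts = line.rstrip("\n").split("::")
        ts = int(ts)
        held = latest.get(user)
        if held is None:
            latest[user] = (ts, line)
        elif ts > held[0]:
            # A newer rating replaces the held-out one, which moves to train.
            train.append(held[1])
            latest[user] = (ts, line)
        else:
            train.append(line)
    test = [line for _ts, line in latest.values()]
    return train, test


if __name__ == "__main__":
    rows = ["1::10::5::100\n", "1::11::4::300\n", "1::12::3::200\n"]
    train_rows, test_rows = split_latest_per_user(rows)
    print(len(train_rows), len(test_rows))
```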
cd data
wget http://files.grouplens.org/datasets/movielens/ml-1m.zip
unzip ml-1m.zip
python split.py
mkdir train/
mkdir test/
python process_ml_1m.py process_raw ./ml-1m/train.dat | sort -t $'\t' -k 9 -n > log.data.train
python process_ml_1m.py process_raw ./ml-1m/test.dat | sort -t $'\t' -k 9 -n > log.data.test
python process_ml_1m.py hash log.data.train > ./train/data.txt
python process_ml_1m.py hash log.data.test > ./test/data.txt
rm log.data.train
rm log.data.test
cd ../
## modify config.yaml to infer mode at first
cd recall
python -m paddlerec.run -m ./config.yaml
cd ../rank
python -m paddlerec.run -m ./config.yaml
cd ..
echo "recall offline test result:"
python parse.py recall_offline recall/infer_result
echo "rank offline test result:"
python parse.py rank_offline rank/infer_result
cd data
python process_ml_1m.py data_rank > online_user/test/data.txt
## modify rank/config.yaml to online_infer mode
cd ../rank
python -m paddlerec.run -m ./config.yaml
cd ../
python parse.py rank_online rank/infer_result
cd data
mkdir online_user/test
python process_ml_1m.py data_recall > online_user/test/data.txt
## modify recall/config.yaml to online_infer mode
cd ../recall
python -m paddlerec.run -m ./config.yaml
cd ../
python parse.py recall_online recall/infer_result
#coding=utf8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import random
import json
import numpy as np
import operator
user_fea = ["userid", "gender", "age", "occupation"]
movie_fea = ["movieid", "title", "genres"]
rating_fea = ["userid", "movieid", "rating", "time"]
dict_size = 60000000
hash_dict = dict()
data_path = "data/ml-1m"
test_user_path = "data/online_user"
topk = 100
def read_raw_data():
    user_dict = parse_data(data_path + "/users.dat", user_fea)
    movie_dict = parse_data(data_path + "/movies.dat", movie_fea)
    ratings_dict = dict()
    for line in open(data_path + "/ratings.dat"):
        arr = line.strip().split("::")
        if arr[0] not in ratings_dict:
            ratings_dict[arr[0]] = []
        tmp = dict()
        tmp["movieid"] = arr[1]
        tmp["score"] = arr[2]
        tmp["time"] = arr[3]
        ratings_dict[arr[0]].append(tmp)
    return user_dict, movie_dict, ratings_dict
def parse_data(file_name, feas):
    res = {}
    for line in open(file_name):
        line = line.strip()
        arr = line.split("::")
        res[arr[0]] = dict()
        # Called for its side effect: records the id in hash_dict for later lookup.
        _ = to_hash(feas[0], arr[0])
        for i in range(0, len(feas)):
            res[arr[0]][feas[i]] = arr[i]
    return res
def to_hash(feas, arr):
    out_str = "%s:%s" % (feas, (arr + arr[::-1] + arr[::-2] + arr[::-3]))
    hash_id = hash(out_str) % dict_size
    if hash_id in hash_dict and hash_dict[hash_id] != out_str:
        print(hash_id, out_str, hash(out_str), hash_dict[hash_id])
        print("conflict")
        exit(-1)
    hash_dict[hash_id] = out_str
    return hash_id
def load_ground_truth(user_dict, movie_dict, ratings_dict):
    for line in open(test_user_path + "/users.dat"):
        uid = line.strip().split("::")[0]
        display_user(user_dict[uid])
        ratings_dict[uid] = sorted(
            ratings_dict[uid],
            key=lambda i: (i["score"], i["time"]),
            reverse=True)
        ratings_dict[uid] = ratings_dict[uid][:topk]
        for i in range(len(ratings_dict[uid])):
            item = ratings_dict[uid][i]
            mid = item["movieid"]
            for key in movie_fea:
                item[key] = movie_dict[mid][key]
        display_movies(ratings_dict[uid])
def load_infer_results(path, feas, movie_dict):
    with open(path) as f:
        content = json.load(f)
    total = 0
    correct = 0
    mae = 0.0
    res = dict()
    for item in content:
        userid = reduce(operator.add, item[feas["userid"]])
        movieid = reduce(operator.add, item[feas["movieid"]])
        ratings = reduce(operator.add, item[feas["ratings"]])
        predict = map(int, ratings)
        label = reduce(operator.add, item[feas["label"]])
        # NOTE: this accumulates squared error, so the value printed as "mae"
        # below is a mean squared error.
        mae += sum(np.square(np.array(ratings) - np.array(label)))
        total += len(label)
        correct += sum(np.array(predict) == np.array(label))
        for i in range(len(userid)):
            hash_uid = userid[i]
            hash_mid = movieid[i]
            if hash_uid not in hash_dict or hash_mid not in hash_dict:
                continue
            # to_hash stored the id followed by three reversed/strided copies;
            # the leading third is the raw id (holds for ids of up to 5 characters).
            tmp = hash_dict[hash_uid].split(':')[1]
            uid = tmp[:len(tmp) / 3]
            tmp = hash_dict[hash_mid].split(':')[1]
            mid = tmp[:len(tmp) / 3]
            if uid not in res:
                res[uid] = []
            item = {"score": ratings[i]}
            for info in movie_dict[mid]:
                item[info] = movie_dict[mid][info]
            res[uid].append(item)
    # Keep the topk highest-scored distinct movies per user.
    for key in res:
        tmp = sorted(res[key], key=lambda i: i["score"], reverse=True)
        existed_movie = []
        res[key] = []
        for i in range(len(tmp)):
            if len(res[key]) >= topk:
                break
            if tmp[i]["movieid"] not in existed_movie:
                existed_movie.append(tmp[i]["movieid"])
                res[key].append(tmp[i])
    print("total: " + str(total) + "; correct: " + str(correct))
    print("accuracy: " + str(float(correct) / total))
    print("mae: " + str(mae / total))
    return res
def display_user(item):
    out_str = ""
    for key in user_fea:
        out_str += "%s:%s " % (key, item[key])
    print(out_str)
def display_movies(input):
    for item in input:
        print_str = ""
        for key in movie_fea:
            print_str += "%s:%s " % (key, item[key])
        print_str += "%s:%s" % ("score", item["score"])
        print(print_str)
def parse_infer(mode, path, user_dict, movie_dict):
    stage, online = mode.split('_')
    feas = {
        "userid": "userid",
        "movieid": "movieid",
        "ratings": "scale_0.tmp_0",
        "label": "label"
    }
    infer_results = load_infer_results(path, feas, movie_dict)
    if online.startswith("offline"):
        return
    for uid in infer_results:
        display_user(user_dict[uid])
        display_movies(infer_results[uid])
    # Write the selected movies back so the next stage can read them as input.
    with open(test_user_path + "/movies.dat", 'w') as fout:
        for uid in infer_results:
            for item in infer_results[uid]:
                str_ = uid + "::" + str(item["movieid"]) + "::" + str(
                    int(item["score"])) + "\n"
                fout.write(str_)
if __name__ == "__main__":
    user_dict, movie_dict, ratings_dict = read_raw_data()
    if sys.argv[1] == "ground_truth":
        load_ground_truth(user_dict, movie_dict, ratings_dict)
    else:
        parse_infer(sys.argv[1], sys.argv[2], user_dict, movie_dict)
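`load_infer_results` above keeps each user's `topk` best-scored movies and skips repeated movie ids with a list membership test. A small sketch of the same selection that tracks seen ids in a set instead; `top_k_unique` is a hypothetical name, and the items are assumed to be dicts with `"movieid"` and `"score"` keys as in the script.

```python
def top_k_unique(items, k):
    """Return up to k items with distinct "movieid", highest "score" first."""
    seen = set()
    selected = []
    for item in sorted(items, key=lambda x: x["score"], reverse=True):
        if len(selected) >= k:
            break
        if item["movieid"] in seen:
            continue  # a higher-scored entry for this movie was already kept
        seen.add(item["movieid"])
        selected.append(item)
    return selected


if __name__ == "__main__":
    candidates = [
        {"movieid": "a", "score": 3.0},
        {"movieid": "b", "score": 4.5},
        {"movieid": "a", "score": 4.9},
    ]
    print([c["movieid"] for c in top_k_unique(candidates, 2)])
```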
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
workspace: "paddlerec.models.demo.movie_recommand"
# list of dataset
dataset:
- name: dataset_train # name of dataset to distinguish different datasets
  batch_size: 128
  type: QueueDataset
  data_path: "{workspace}/data/train"
  sparse_slots: "logid time userid gender age occupation movieid title genres label"
  dense_slots: ""
- name: dataset_infer # name
  batch_size: 128
  type: DataLoader
  data_path: "{workspace}/data/test"
  sparse_slots: "logid time userid gender age occupation movieid title genres label"
  dense_slots: ""
- name: dataset_online_infer # name
  batch_size: 10
  type: DataLoader
  data_path: "{workspace}/data/online_user/test"
  sparse_slots: "logid time userid gender age occupation movieid title genres label"
  dense_slots: ""
# hyper parameters of user-defined network
hyper_parameters:
  # optimizer config
  optimizer:
    class: Adam
    learning_rate: 0.001
    strategy: async
  # user-defined <key, value> pairs
  sparse_feature_number: 60000000
  sparse_feature_dim: 9
  dense_input_dim: 13
  fc_sizes: [512, 256, 128, 32]
# train
mode: runner_train
## online or offline infer
#mode: runner_infer
runner:
- name: runner_train
  class: single_train
  save_checkpoint_interval: 1 # save model interval of epochs
  save_inference_interval: 1 # save inference
  save_checkpoint_path: "increment" # save checkpoint path
  save_inference_path: "inference" # save inference path
  epochs: 10
  device: cpu
- name: runner_infer
  epochs: 1
  class: single_infer
  print_interval: 10000
  init_model_path: "increment/9" # load model path
#train
phase:
- name: phase1
  model: "{workspace}/model.py" # user-defined model
  dataset_name: dataset_train # select dataset by name
  thread_num: 12
##offline infer
#phase:
#- name: phase1
#  model: "{workspace}/model.py" # user-defined model
#  dataset_name: dataset_infer # select dataset by name
#  save_path: "./infer_result"
#  thread_num: 1
##online infer
#phase:
#- name: phase1
#  model: "{workspace}/model.py" # user-defined model
#  dataset_name: dataset_online_infer # select dataset by name
#  save_path: "./infer_result"
#  thread_num: 1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
class Model(ModelBase):
    def __init__(self, config):
        ModelBase.__init__(self, config)

    def _init_hyper_parameters(self):
        self.is_distributed = True if envs.get_trainer(
        ) == "CtrTrainer" else False
        self.sparse_feature_number = envs.get_global_env(
            "hyper_parameters.sparse_feature_number")
        self.sparse_feature_dim = envs.get_global_env(
            "hyper_parameters.sparse_feature_dim")
        self.learning_rate = envs.get_global_env(
            "hyper_parameters.optimizer.learning_rate")
        self.hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes")

    def net(self, input, is_infer=False):
        # Slot order follows sparse_slots in config.yaml:
        # [2:6] are the user slots, [6:9] the movie slots, [-1] the label.
        self.user_sparse_inputs = self._sparse_data_var[2:6]
        self.mov_sparse_inputs = self._sparse_data_var[6:9]
        self.label_input = self._sparse_data_var[-1]

        def fc(input):
            fcs = [input]
            for size in self.hidden_layers:
                output = fluid.layers.fc(
                    input=fcs[-1],
                    size=size,
                    act='relu',
                    param_attr=fluid.ParamAttr(
                        initializer=fluid.initializer.Normal(
                            scale=1.0 / math.sqrt(fcs[-1].shape[1]))))
                fcs.append(output)
            return fcs[-1]

        def embedding_layer(input):
            emb = fluid.layers.embedding(
                input=input,
                is_sparse=True,
                is_distributed=self.is_distributed,
                size=[self.sparse_feature_number, self.sparse_feature_dim],
                param_attr=fluid.ParamAttr(
                    name="emb", initializer=fluid.initializer.Uniform()), )
            emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
            return emb_sum

        user_sparse_embed_seq = list(
            map(embedding_layer, self.user_sparse_inputs))
        mov_sparse_embed_seq = list(
            map(embedding_layer, self.mov_sparse_inputs))
        concated_user = fluid.layers.concat(user_sparse_embed_seq, axis=1)
        concated_mov = fluid.layers.concat(mov_sparse_embed_seq, axis=1)
        usr_combined_features = fc(concated_user)
        mov_combined_features = fc(concated_mov)
        fc_input = fluid.layers.concat(
            [usr_combined_features, mov_combined_features], axis=1)
        # Sigmoid output in (0, 1), scaled to the rating range.
        sim = fluid.layers.fc(
            input=fc_input,
            size=1,
            act='sigmoid',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1.0 / math.sqrt(fc_input.shape[1]))))
        predict = fluid.layers.scale(sim, scale=5)
        self.predict = predict
        #auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
        #                                     label=self.label_input,
        #                                     num_thresholds=10000,
        #                                     slide_steps=20)
        if is_infer:
            self._infer_results["user_feature"] = usr_combined_features
            self._infer_results["movie_feature"] = mov_combined_features
            self._infer_results["uid"] = self._sparse_data_var[2]
            self._infer_results["movieid"] = self._sparse_data_var[6]
            self._infer_results["label"] = self._sparse_data_var[-1]
            self._infer_results["predict"] = self.predict
            return
        #self._metrics["AUC"] = auc
        #self._metrics["BATCH_AUC"] = batch_auc
        #cost = fluid.layers.cross_entropy(
        #    input=self.predict, label=self.label_input)
        cost = fluid.layers.square_error_cost(
            self.predict,
            fluid.layers.cast(
                x=self.label_input, dtype='float32'))
        avg_cost = fluid.layers.reduce_mean(cost)
        self._cost = avg_cost
        self._metrics["LOSS"] = avg_cost

    def optimizer(self):
        optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True)
        return optimizer

    def infer_net(self):
        pass
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
workspace: "paddlerec.models.demo.movie_recommand"
# list of dataset
dataset:
- name: dataset_train # name of dataset to distinguish different datasets
  batch_size: 128
  type: QueueDataset
  data_path: "{workspace}/data/train"
  sparse_slots: "logid time userid gender age occupation movieid title genres label"
  dense_slots: ""
- name: dataset_infer # name
  batch_size: 128
  type: DataLoader
  data_path: "{workspace}/data/test"
  sparse_slots: "logid time userid gender age occupation movieid title genres label"
  dense_slots: ""
- name: dataset_online_infer # name
  batch_size: 128
  type: DataLoader
  data_path: "{workspace}/data/online_user/test"
  sparse_slots: "logid time userid gender age occupation movieid title genres label"
  dense_slots: ""
# hyper parameters of user-defined network
hyper_parameters:
  # optimizer config
  optimizer:
    class: Adam
    learning_rate: 0.001
    strategy: async
  # user-defined <key, value> pairs
  sparse_feature_number: 60000000
  sparse_feature_dim: 9
  dense_input_dim: 13
  fc_sizes: [512, 256, 128, 32]
# train
mode: runner_train
## online or offline infer
#mode: runner_infer
runner:
- name: runner_train
  class: single_train
  save_checkpoint_interval: 1 # save model interval of epochs
  save_inference_interval: 1 # save inference
  save_checkpoint_path: "increment" # save checkpoint path
  save_inference_path: "inference" # save inference path
  epochs: 10
  device: cpu
- name: runner_infer
  epochs: 1
  class: single_infer
  print_interval: 10000
  init_model_path: "increment/9" # load model path
#train
phase:
- name: phase1
  model: "{workspace}/model.py" # user-defined model
  dataset_name: dataset_train # select dataset by name
  thread_num: 12
##offline infer
#phase:
#- name: phase1
#  model: "{workspace}/model.py" # user-defined model
#  dataset_name: dataset_infer # select dataset by name
#  save_path: "./infer_result"
#  thread_num: 1
##online infer
#phase:
#- name: phase1
#  model: "{workspace}/model.py" # user-defined model
#  dataset_name: dataset_online_infer # select dataset by name
#  save_path: "./infer_result"
#  thread_num: 1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
class Model(ModelBase):
    def __init__(self, config):
        ModelBase.__init__(self, config)

    def _init_hyper_parameters(self):
        self.is_distributed = True if envs.get_trainer(
        ) == "CtrTrainer" else False
        self.sparse_feature_number = envs.get_global_env(
            "hyper_parameters.sparse_feature_number")
        self.sparse_feature_dim = envs.get_global_env(
            "hyper_parameters.sparse_feature_dim")
        self.learning_rate = envs.get_global_env(
            "hyper_parameters.optimizer.learning_rate")
        self.hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes")

    def net(self, input, is_infer=False):
        # Slot order follows sparse_slots in config.yaml:
        # [2:6] are the user slots, [6:9] the movie slots, [-1] the label.
        self.user_sparse_inputs = self._sparse_data_var[2:6]
        self.mov_sparse_inputs = self._sparse_data_var[6:9]
        self.label_input = self._sparse_data_var[-1]

        def fc(input):
            fcs = [input]
            for size in self.hidden_layers:
                output = fluid.layers.fc(
                    input=fcs[-1],
                    size=size,
                    act='relu',
                    param_attr=fluid.ParamAttr(
                        initializer=fluid.initializer.Normal(
                            scale=1.0 / math.sqrt(fcs[-1].shape[1]))))
                fcs.append(output)
            return fcs[-1]

        def embedding_layer(input):
            emb = fluid.layers.embedding(
                input=input,
                is_sparse=True,
                is_distributed=self.is_distributed,
                size=[self.sparse_feature_number, self.sparse_feature_dim],
                param_attr=fluid.ParamAttr(
                    name="emb", initializer=fluid.initializer.Uniform()), )
            emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
            return emb_sum

        user_sparse_embed_seq = list(
            map(embedding_layer, self.user_sparse_inputs))
        mov_sparse_embed_seq = list(
            map(embedding_layer, self.mov_sparse_inputs))
        concated_user = fluid.layers.concat(user_sparse_embed_seq, axis=1)
        concated_mov = fluid.layers.concat(mov_sparse_embed_seq, axis=1)
        usr_combined_features = fc(concated_user)
        mov_combined_features = fc(concated_mov)
        # Two-tower recall: cosine similarity of the tower outputs, scaled to the rating range.
        sim = fluid.layers.cos_sim(
            X=usr_combined_features, Y=mov_combined_features)
        predict = fluid.layers.scale(sim, scale=5)
        self.predict = predict
        if is_infer:
            self._infer_results["uid"] = self._sparse_data_var[2]
            self._infer_results["movieid"] = self._sparse_data_var[6]
            self._infer_results["label"] = self._sparse_data_var[-1]
            self._infer_results["predict"] = self.predict
            return
        cost = fluid.layers.square_error_cost(
            self.predict,
            fluid.layers.cast(
                x=self.label_input, dtype='float32'))
        avg_cost = fluid.layers.reduce_mean(cost)
        self._cost = avg_cost
        self._metrics["LOSS"] = avg_cost

    def optimizer(self):
        optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True)
        return optimizer
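The recall model above scores a user/movie pair as the cosine similarity of the two tower outputs, multiplied by 5. A plain-Python sketch of that arithmetic for a single pair, useful for checking a prediction by hand. `cosine_rating` is a hypothetical helper, and returning 0.0 for a zero-length vector is an assumption of this sketch, not documented behavior of `fluid.layers.cos_sim`.

```python
import math


def cosine_rating(user_vec, movie_vec, scale=5.0):
    """Cosine similarity of two equal-length vectors, multiplied by scale."""
    dot = sum(u * m for u, m in zip(user_vec, movie_vec))
    norm = math.sqrt(sum(u * u for u in user_vec)) * math.sqrt(
        sum(m * m for m in movie_vec))
    if norm == 0.0:
        # Assumption: treat an all-zero vector as "no similarity".
        return 0.0
    return scale * dot / norm


if __name__ == "__main__":
    # Parallel vectors reach the top of the rating range.
    print(cosine_rating([1.0, 2.0], [2.0, 4.0]))
```

Because every layer in `fc` uses a ReLU activation, both tower outputs are non-negative, so the cosine falls in [0, 1] and the scaled prediction in [0, 5].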
cd recall
python -m paddlerec.run -m ./config.yaml &> log &
cd ../rank
python -m paddlerec.run -m ./config.yaml &> log &
cd ..