Unverified commit 42074469, authored by tangwei12, committed by GitHub

Merge branch 'master' into rarfile

(Simplified Chinese|[English](./README.md))
<p align="center">
<img align="center" src="doc/imgs/logo.png">
</p>
<p align="center">
<img align="center" src="doc/imgs/structure.png">
</p>
<p align="center">
<img align="center" src="doc/imgs/overview.png">
</p>
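<h2 align="center">What is a recommendation system?</h2>
<p align="center">
<img align="center" src="doc/imgs/rec-overview.png">
</p>
- In an era of explosive growth in online information, recommendation systems are the key to helping users efficiently find the information they care about.
- Recommendation systems are also a silver bullet for helping products attract and retain users, increase user stickiness, and raise conversion rates.
- Countless excellent products have built a strong reputation on recommendation systems that users can perceive, and countless companies have secured a place in their industries with recommendation systems that address users' pain points directly.
> Arguably, whoever masters and makes good use of recommendation systems gains the upper hand in the fierce competition over information distribution.
>
> At the same time, many problems trouble recommendation system developers: huge data volumes, complex model structures, inefficient distributed training environments, unstable consistency between online and offline results, demanding deployment requirements, and more.
<h2 align="center">What is PaddleRec?</h2>
- A **one-stop, out-of-the-box tool** for search and recommendation models, built on the PaddlePaddle ecosystem
- An end-to-end recommendation system solution for beginners, developers, and researchers
- A complete recommendation and search algorithm library covering content understanding, match, recall, rank, multi-task, re-rank, and other tasks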
| Type | Model | Single-node CPU | Single-node GPU | Distributed CPU | Distributed GPU | Paper |
| :------: | :-----------------------------------------------------------------------: | :-----: | :-----: | :-------: | :-------: | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Content-Understanding | [Text-Classification](models/contentunderstanding/classification/model.py) | ✓ | ✓ | ✓ | x | [EMNLP 2014][Convolutional neural networks for sentence classification](https://www.aclweb.org/anthology/D14-1181.pdf) |
| Content-Understanding | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | ✓ | ✓ | x | [EMNLP 2014][TagSpace: Semantic Embeddings from Hashtags](https://www.aclweb.org/anthology/D14-1194.pdf) |
| Match | [DSSM](models/match/dssm/model.py) | ✓ | ✓ | ✓ | x | [CIKM 2013][Learning Deep Structured Semantic Models for Web Search using Clickthrough Data](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/cikm2013_DSSM_fullversion.pdf) |
| Match | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | ✓ | ✓ | x | [WWW 2015][A Multi-View Deep Learning Approach for Cross Domain User Modeling in Recommendation Systems](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/frp1159-songA.pdf) |
| Recall | [TDM](models/treebased/tdm/model.py) | ✓ | >=1.8.0 | ✓ | >=1.8.0 | [KDD 2018][Learning Tree-based Deep Model for Recommender Systems](https://arxiv.org/pdf/1801.02294.pdf) |
| Recall | [fasttext](models/recall/fasttext/model.py) | ✓ | ✓ | x | x | [EACL 2017][Bag of Tricks for Efficient Text Classification](https://www.aclweb.org/anthology/E17-2068.pdf) |
| Recall | [Word2Vec](models/recall/word2vec/model.py) | ✓ | ✓ | ✓ | x | [NIPS 2013][Distributed Representations of Words and Phrases and their Compositionality](https://papers.nips.cc/paper/5021-distributed-representations-of-words-and-phrases-and-their-compositionality.pdf) |
| Recall | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ | ✓ | [SIGIR 2016][Multi-Rate Deep Learning for Temporal Recommendation](http://sonyis.me/paperpdf/spr209-song_sigir16.pdf) |
| Recall | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ | ✓ | [2015][Session-based Recommendations with Recurrent Neural Networks](https://arxiv.org/abs/1511.06939) |
| Recall | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ | ✓ | [RecSys 2016][Deep Neural Networks for YouTube Recommendations](https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/45530.pdf) |
| Recall | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ | ✓ | [WWW 2017][Neural Collaborative Filtering](https://arxiv.org/pdf/1708.05031.pdf) |
| Recall | [GNN](models/recall/gnn/model.py) | ✓ | ✓ | ✓ | ✓ | [AAAI 2019][Session-based Recommendation with Graph Neural Networks](https://arxiv.org/abs/1811.00855) |
| Rank | [Logistic Regression](models/rank/logistic_regression/model.py) | ✓ | x | ✓ | x | / |
| Rank | [Dnn](models/rank/dnn/model.py) | ✓ | ✓ | ✓ | ✓ | / |
| Rank | [FM](models/rank/fm/model.py) | ✓ | x | ✓ | x | [IEEE Data Mining 2010][Factorization machines](https://analyticsconsultores.com.mx/wp-content/uploads/2019/03/Factorization-Machines-Steffen-Rendle-Osaka-University-2010.pdf) |
| Rank | [FFM](models/rank/ffm/model.py) | ✓ | x | ✓ | x | [RECSYS 2016][Field-aware Factorization Machines for CTR Prediction](https://dl.acm.org/doi/pdf/10.1145/2959100.2959134) |
| Rank | [FNN](models/rank/fnn/model.py) | ✓ | x | ✓ | x | [ECIR 2016][Deep Learning over Multi-field Categorical Data](https://arxiv.org/pdf/1601.02376.pdf) |
| Rank | [Deep Crossing](models/rank/deep_crossing/model.py) | ✓ | x | ✓ | x | [ACM 2016][Deep Crossing: Web-Scale Modeling without Manually Crafted Combinatorial Features](https://www.kdd.org/kdd2016/papers/files/adf0975-shanA.pdf) |
| Rank | [Pnn](models/rank/pnn/model.py) | ✓ | x | ✓ | x | [ICDM 2016][Product-based Neural Networks for User Response Prediction](https://arxiv.org/pdf/1611.00144.pdf) |
| Rank | [DCN](models/rank/dcn/model.py) | ✓ | x | ✓ | x | [KDD 2017][Deep & Cross Network for Ad Click Predictions](https://dl.acm.org/doi/pdf/10.1145/3124749.3124754) |
| Rank | [NFM](models/rank/nfm/model.py) | ✓ | x | ✓ | x | [SIGIR 2017][Neural Factorization Machines for Sparse Predictive Analytics](https://dl.acm.org/doi/pdf/10.1145/3077136.3080777) |
| Rank | [AFM](models/rank/afm/model.py) | ✓ | x | ✓ | x | [IJCAI 2017][Attentional Factorization Machines: Learning the Weight of Feature Interactions via Attention Networks](https://arxiv.org/pdf/1708.04617.pdf) |
| Rank | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ | x | [IJCAI 2017][DeepFM: A Factorization-Machine based Neural Network for CTR Prediction](https://arxiv.org/pdf/1703.04247.pdf) |
| Rank | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ | x | [KDD 2018][xDeepFM: Combining Explicit and Implicit Feature Interactions for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/3219819.3220023) |
| Rank | [DIN](models/rank/din/model.py) | ✓ | x | ✓ | x | [KDD 2018][Deep Interest Network for Click-Through Rate Prediction](https://dl.acm.org/doi/pdf/10.1145/3219819.3219823) |
| Rank | [DIEN](models/rank/dien/model.py) | ✓ | x | ✓ | x | [AAAI 2019][Deep Interest Evolution Network for Click-Through Rate Prediction](https://www.aaai.org/ojs/index.php/AAAI/article/view/4545/4423) |
| Rank | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ | x | [DLRS 2016][Wide & Deep Learning for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/2988450.2988454) |
| Rank | [FGCNN](models/rank/fgcnn/model.py) | ✓ | ✓ | ✓ | ✓ | [WWW 2019][Feature Generation by Convolutional Neural Network for Click-Through Rate Prediction](https://arxiv.org/pdf/1904.04447.pdf) |
| Rank | [Fibinet](models/rank/fibinet/model.py) | ✓ | ✓ | ✓ | ✓ | [RecSys 2019][FiBiNET: Combining Feature Importance and Bilinear feature Interaction for Click-Through Rate Prediction](https://arxiv.org/pdf/1905.09433.pdf) |
| Multi-Task | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ | ✓ | [SIGIR 2018][Entire Space Multi-Task Model: An Effective Approach for Estimating Post-Click Conversion Rate](https://arxiv.org/abs/1804.07931) |
| Multi-Task | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ | ✓ | [KDD 2018][Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts](https://dl.acm.org/doi/abs/10.1145/3219819.3220007) |
| Multi-Task | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ | ✓ | [1998][Multitask learning](http://reports-archive.adm.cs.cmu.edu/anon/1997/CMU-CS-97-203.pdf) |
| Re-Rank | [Listwise](models/rerank/listwise/model.py) | ✓ | ✓ | ✓ | x | [2019][Sequential Evaluation and Generation Framework for Combinatorial Recommender System](https://arxiv.org/pdf/1902.00245.pdf) |
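<h2 align="center">Quick Installation</h2>
### Requirements
* Python 2.7 / 3.5 / 3.6 / 3.7
* PaddlePaddle >= 1.7.2
* Operating system: Windows/Mac/Linux
> On Windows, PaddleRec currently supports single-machine training only; Linux is recommended for distributed training.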
### Installation
- Method 1: **Install directly from PyPI with pip**
```bash
python -m pip install paddle-rec
```
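> By default this method downloads and installs the CPU build of `paddlepaddle` v1.7.2. If the installer reports that `PaddlePaddle` cannot be installed, install `PaddlePaddle` first using one of the methods below, then install `PaddleRec`:
> - Download PaddlePaddle from [this page](https://pypi.org/project/paddlepaddle/1.7.2/#files) and install the whl package manually.
> - Install `PaddlePaddle` with pip first: `python -m pip install paddlepaddle==1.7.2 -i https://mirror.baidu.com/pypi/simple`
> - For other installation problems, open a [Paddle Issue](https://github.com/PaddlePaddle/Paddle/issues) or a [PaddleRec Issue](https://github.com/PaddlePaddle/PaddleRec/issues); an engineer will respond promptly.
- Method 2: **Build and install from source**
- Install PaddlePaddle. **Note: PaddlePaddle version == 1.7.2 is required.**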
```shell
python -m pip install paddlepaddle==1.7.2 -i https://mirror.baidu.com/pypi/simple
```
- Install PaddleRec from source
```
git clone https://github.com/PaddlePaddle/PaddleRec/
cd PaddleRec
python setup.py install
```
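- Installing PaddleRec for GPU
After installing PaddleRec with method 1 or method 2, manually install `paddlepaddle-gpu`, choosing the version that matches your environment (CUDA/cuDNN). For installation instructions, see [PaddlePaddle: Getting Started](https://www.paddlepaddle.org.cn/install/quick).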
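<h2 align="center">One-Command Start</h2>
We use the `dnn` model from the ranking models as an example to show how to start PaddleRec with a single command. The training data comes from the [Criteo dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/), from which we took 100 samples: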
```bash
# Single-machine training on CPU
python -m paddlerec.run -m paddlerec.models.rank.dnn
```
<h2 align="center">Documentation</h2>
### Background
* [Introduction to recommendation systems](doc/rec_background.md)
* [Introduction to distributed deep learning](doc/ps_background.md)
### Quick start
* [Get started with PaddleRec in ten minutes](https://aistudio.baidu.com/aistudio/projectdetail/559336)
### Introductory tutorials
* [Data preparation](doc/slot_reader.md)
* [Model tuning](doc/model.md)
* [Start training](doc/train.md)
* [Start prediction](doc/predict.md)
* [Quick deployment](doc/serving.md)
### Advanced tutorials
* [Custom Reader](doc/custom_reader.md)
* [Custom model](doc/model_develop.md)
* [Custom training process](doc/trainer_develop.md)
* [YAML configuration reference](doc/yaml.md)
* [PaddleRec design document](doc/design.md)
### Benchmark
* [Benchmark](doc/benchmark.md)
### FAQ
* [Frequently asked questions](doc/faq.md)
<h2 align="center">Community</h2>
<p align="center">
<br>
<img alt="Release" src="https://img.shields.io/badge/Release-0.1.0-yellowgreen">
<img alt="License" src="https://img.shields.io/github/license/PaddlePaddle/PaddleRec">
<img alt="Slack" src="https://img.shields.io/badge/Join-Slack-green">
<br>
</p>
### Version history
- 2020.06.17 - PaddleRec v0.1.0
- 2020.06.03 - PaddleRec v0.0.2
- 2020.05.14 - PaddleRec v0.0.1
### License
This project is released under the [Apache 2.0 license](LICENSE).
### Contact us
For feedback, suggestions, or bugs you encounter, please open a [GitHub Issue](https://github.com/PaddlePaddle/PaddleRec/issues).
You can also reach us through the following channels:
- QQ group number: `861717190`
- WeChat assistant account: `paddlerec2020`
<p align="center"><img width="200" height="200" margin="500" src="./doc/imgs/QQ_group.png"/>&#8194;&#8194;&#8194;&#8194;&#8194;<img width="200" height="200" src="doc/imgs/weixin_supporter.png"/></p>
<p align="center">PaddleRec QQ group&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;PaddleRec WeChat assistant</p>
([简体中文](./README.md)|English)
<p align="center">
<img align="center" src="doc/imgs/logo.png">
</p>
<p align="center">
<img align="center" src="doc/imgs/overview_en.png">
</p>
<h2 align="center">What is a recommendation system?</h2>
<p align="center">
<img align="center" src="doc/imgs/rec-overview-en.png">
</p>
- A recommendation system helps users quickly find useful and interesting information in massive amounts of data.
- A recommendation system is also a silver bullet for attracting and retaining users and for increasing user stickiness and conversion.
> Whoever makes better use of recommendation systems gains an advantage in the fierce competition.
>
> At the same time, using a recommendation system raises many problems, such as huge data volumes, complex models, and inefficient distributed training.
<h2 align="center">What is PaddleRec?</h2>
- A quick-start tool for search and recommendation algorithms, based on [PaddlePaddle](https://www.paddlepaddle.org.cn/documentation/docs/en/beginners_guide/index_en.html)
- A complete recommendation system solution for beginners, developers, and researchers.
- A recommendation algorithm library covering content understanding, match, recall, rank, multi-task, re-rank, and more.
| Type | Algorithm | CPU | GPU | Parameter-Server | Multi-GPU | Paper |
| :-------------------: | :-----------------------------------------------------------------------: | :---: | :-----: | :--------------: | :-------: | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Content-Understanding | [Text-Classification](models/contentunderstanding/classification/model.py) | ✓ | ✓ | ✓ | x | [EMNLP 2014][Convolutional neural networks for sentence classification](https://www.aclweb.org/anthology/D14-1181.pdf) |
| Content-Understanding | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | ✓ | ✓ | x | [EMNLP 2014][TagSpace: Semantic Embeddings from Hashtags](https://www.aclweb.org/anthology/D14-1194.pdf) |
| Match | [DSSM](models/match/dssm/model.py) | ✓ | ✓ | ✓ | x | [CIKM 2013][Learning Deep Structured Semantic Models for Web Search using Clickthrough Data](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/cikm2013_DSSM_fullversion.pdf) |
| Match | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | ✓ | ✓ | x | [WWW 2015][A Multi-View Deep Learning Approach for Cross Domain User Modeling in Recommendation Systems](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/frp1159-songA.pdf) |
| Recall | [TDM](models/treebased/tdm/model.py) | ✓ | >=1.8.0 | ✓ | >=1.8.0 | [KDD 2018][Learning Tree-based Deep Model for Recommender Systems](https://arxiv.org/pdf/1801.02294.pdf) |
| Recall | [fasttext](models/recall/fasttext/model.py) | ✓ | ✓ | x | x | [EACL 2017][Bag of Tricks for Efficient Text Classification](https://www.aclweb.org/anthology/E17-2068.pdf) |
| Recall | [Word2Vec](models/recall/word2vec/model.py) | ✓ | ✓ | ✓ | x | [NIPS 2013][Distributed Representations of Words and Phrases and their Compositionality](https://papers.nips.cc/paper/5021-distributed-representations-of-words-and-phrases-and-their-compositionality.pdf) |
| Recall | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ | ✓ | [SIGIR 2016][Multi-Rate Deep Learning for Temporal Recommendation](http://sonyis.me/paperpdf/spr209-song_sigir16.pdf) |
| Recall | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ | ✓ | [2015][Session-based Recommendations with Recurrent Neural Networks](https://arxiv.org/abs/1511.06939) |
| Recall | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ | ✓ | [RecSys 2016][Deep Neural Networks for YouTube Recommendations](https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/45530.pdf) |
| Recall | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ | ✓ | [WWW 2017][Neural Collaborative Filtering](https://arxiv.org/pdf/1708.05031.pdf) |
| Recall | [GNN](models/recall/gnn/model.py) | ✓ | ✓ | ✓ | ✓ | [AAAI 2019][Session-based Recommendation with Graph Neural Networks](https://arxiv.org/abs/1811.00855) |
| Rank | [Logistic Regression](models/rank/logistic_regression/model.py) | ✓ | x | ✓ | x | / |
| Rank | [Dnn](models/rank/dnn/model.py) | ✓ | ✓ | ✓ | ✓ | / |
| Rank | [FM](models/rank/fm/model.py) | ✓ | x | ✓ | x | [IEEE Data Mining 2010][Factorization machines](https://analyticsconsultores.com.mx/wp-content/uploads/2019/03/Factorization-Machines-Steffen-Rendle-Osaka-University-2010.pdf) |
| Rank | [FFM](models/rank/ffm/model.py) | ✓ | x | ✓ | x | [RECSYS 2016][Field-aware Factorization Machines for CTR Prediction](https://dl.acm.org/doi/pdf/10.1145/2959100.2959134) |
| Rank | [FNN](models/rank/fnn/model.py) | ✓ | x | ✓ | x | [ECIR 2016][Deep Learning over Multi-field Categorical Data](https://arxiv.org/pdf/1601.02376.pdf) |
| Rank | [Deep Crossing](models/rank/deep_crossing/model.py) | ✓ | x | ✓ | x | [ACM 2016][Deep Crossing: Web-Scale Modeling without Manually Crafted Combinatorial Features](https://www.kdd.org/kdd2016/papers/files/adf0975-shanA.pdf) |
| Rank | [Pnn](models/rank/pnn/model.py) | ✓ | x | ✓ | x | [ICDM 2016][Product-based Neural Networks for User Response Prediction](https://arxiv.org/pdf/1611.00144.pdf) |
| Rank | [DCN](models/rank/dcn/model.py) | ✓ | x | ✓ | x | [KDD 2017][Deep & Cross Network for Ad Click Predictions](https://dl.acm.org/doi/pdf/10.1145/3124749.3124754) |
| Rank | [NFM](models/rank/nfm/model.py) | ✓ | x | ✓ | x | [SIGIR 2017][Neural Factorization Machines for Sparse Predictive Analytics](https://dl.acm.org/doi/pdf/10.1145/3077136.3080777) |
| Rank | [AFM](models/rank/afm/model.py) | ✓ | x | ✓ | x | [IJCAI 2017][Attentional Factorization Machines: Learning the Weight of Feature Interactions via Attention Networks](https://arxiv.org/pdf/1708.04617.pdf) |
| Rank | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ | x | [IJCAI 2017][DeepFM: A Factorization-Machine based Neural Network for CTR Prediction](https://arxiv.org/pdf/1703.04247.pdf) |
| Rank | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ | x | [KDD 2018][xDeepFM: Combining Explicit and Implicit Feature Interactions for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/3219819.3220023) |
| Rank | [DIN](models/rank/din/model.py) | ✓ | x | ✓ | x | [KDD 2018][Deep Interest Network for Click-Through Rate Prediction](https://dl.acm.org/doi/pdf/10.1145/3219819.3219823) |
| Rank | [DIEN](models/rank/dien/model.py) | ✓ | x | ✓ | x | [AAAI 2019][Deep Interest Evolution Network for Click-Through Rate Prediction](https://www.aaai.org/ojs/index.php/AAAI/article/view/4545/4423) |
| Rank | [BST](models/rank/BST/model.py) | ✓ | x | ✓ | x | [DLP-KDD 2019][Behavior Sequence Transformer for E-commerce Recommendation in Alibaba](https://arxiv.org/pdf/1905.06874v1.pdf) |
| Rank | [AutoInt](models/rank/AutoInt/model.py) | ✓ | x | ✓ | x | [CIKM 2019][AutoInt: Automatic Feature Interaction Learning via Self-Attentive Neural Networks](https://arxiv.org/pdf/1810.11921.pdf) |
| Rank | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ | x | [DLRS 2016][Wide & Deep Learning for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/2988450.2988454) |
| Rank | [FGCNN](models/rank/fgcnn/model.py) | ✓ | ✓ | ✓ | ✓ | [WWW 2019][Feature Generation by Convolutional Neural Network for Click-Through Rate Prediction](https://arxiv.org/pdf/1904.04447.pdf) |
| Rank | [Fibinet](models/rank/fibinet/model.py) | ✓ | ✓ | ✓ | ✓ | [RecSys 2019][FiBiNET: Combining Feature Importance and Bilinear feature Interaction for Click-Through Rate Prediction](https://arxiv.org/pdf/1905.09433.pdf) |
| Rank | [Flen](models/rank/flen/model.py) | ✓ | ✓ | ✓ | ✓ | [2019][FLEN: Leveraging Field for Scalable CTR Prediction](https://arxiv.org/pdf/1911.04690.pdf) |
| Multi-Task | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ | ✓ | [SIGIR 2018][Entire Space Multi-Task Model: An Effective Approach for Estimating Post-Click Conversion Rate](https://arxiv.org/abs/1804.07931) |
| Multi-Task | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ | ✓ | [KDD 2018][Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts](https://dl.acm.org/doi/abs/10.1145/3219819.3220007) |
| Multi-Task | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ | ✓ | [1998][Multitask learning](http://reports-archive.adm.cs.cmu.edu/anon/1997/CMU-CS-97-203.pdf) |
| Re-Rank | [Listwise](models/rerank/listwise/model.py) | ✓ | ✓ | ✓ | x | [2019][Sequential Evaluation and Generation Framework for Combinatorial Recommender System](https://arxiv.org/pdf/1902.00245.pdf) |
<h2 align="center">Getting Started</h2>
### Environmental requirements
* Python 2.7 / 3.5 / 3.6 / 3.7
* PaddlePaddle >= 1.7.2
* Operating system: Windows/Mac/Linux
> Linux is recommended for distributed training
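The `PaddlePaddle >= 1.7.2` requirement above can be checked from a shell before installing. The following is a minimal sketch, not part of PaddleRec: `version_ge` is a hypothetical helper name, and it assumes a `sort` that supports `-V` (version sort), as GNU coreutils does.

```shell
#!/bin/bash
# version_ge A B: succeed when version A is greater than or equal to version B.
# Hypothetical helper for illustration; assumes `sort -V` is available.
version_ge() {
  # After a version sort, the smaller version comes first.
  # A >= B holds exactly when B is that first line.
  [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

if version_ge "1.8.0" "1.7.2"; then
  echo "1.8.0 satisfies >= 1.7.2"
fi

if ! version_ge "1.7.1" "1.7.2"; then
  echo "1.7.1 does not satisfy >= 1.7.2"
fi
```

In practice the first argument would be the installed version, which can probably be read with `python -c "import paddle; print(paddle.__version__)"`. The same check applies to the `>=1.8.0` entries in the TDM row of the table.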
### Installation
1. **Install with pip**
```bash
python -m pip install paddle-rec
```
> This method downloads and installs `paddlepaddle-v1.7.2-cpu`. If `PaddlePaddle` cannot be installed automatically, install `PaddlePaddle` manually and then install `PaddleRec` again:
> - Download [PaddlePaddle](https://pypi.org/project/paddlepaddle/1.7.2/#files) and install it with pip.
> - Install `PaddlePaddle` with pip: `python -m pip install paddlepaddle==1.7.2 -i https://mirror.baidu.com/pypi/simple`
> - Other installation problems can be raised in [Paddle Issue](https://github.com/PaddlePaddle/Paddle/issues) or [PaddleRec Issue](https://github.com/PaddlePaddle/PaddleRec/issues)
2. **Install from source**
- Install PaddlePaddle
```shell
python -m pip install paddlepaddle==1.7.2 -i https://mirror.baidu.com/pypi/simple
```
- Install PaddleRec from source
```
git clone https://github.com/PaddlePaddle/PaddleRec/
cd PaddleRec
python setup.py install
```
- Install PaddleRec-GPU
After installing `PaddleRec`, install the version of `paddlepaddle-gpu` that matches your environment (CUDA/cuDNN); see the [Installation Manuals](https://www.paddlepaddle.org.cn/documentation/docs/en/install/index_en.html).
<h2 align="center">Quick Start</h2>
We use the `dnn` algorithm as an example of getting started with `PaddleRec`, with 100 training samples taken from the [Criteo Dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/):
```bash
# Training with CPU
python -m paddlerec.run -m paddlerec.models.rank.dnn
```
<h2 align="center">Documentation</h2>
### Background
* [Recommendation System](doc/rec_background.md)
* [Distributed deep learning](doc/ps_background.md)
### Introductory Project
* [Get started with PaddleRec in ten minutes](https://aistudio.baidu.com/aistudio/projectdetail/559336)
### Introductory tutorial
* [Data](doc/slot_reader.md)
* [Model](doc/model.md)
* [Local Train](doc/train.md)
* [Distributed Train](doc/distributed_train.md)
* [Predict](doc/predict.md)
* [Serving](doc/serving.md)
### Advanced tutorial
* [Custom Reader](doc/custom_reader.md)
* [Custom Model](doc/model_develop.md)
* [Custom Training Process](doc/trainer_develop.md)
* [Configuration description of yaml](doc/yaml.md)
* [Design document of PaddleRec](doc/design.md)
### Benchmark
* [Benchmark](doc/benchmark.md)
### FAQ
* [Frequently asked questions](doc/faq.md)
<h2 align="center">Community</h2>
<p align="center">
<br>
<img alt="Release" src="https://img.shields.io/badge/Release-0.1.0-yellowgreen">
<img alt="License" src="https://img.shields.io/github/license/PaddlePaddle/PaddleRec">
<img alt="Slack" src="https://img.shields.io/badge/Join-Slack-green">
<br>
</p>
### Version history
- 2020.06.17 - PaddleRec v0.1.0
- 2020.06.03 - PaddleRec v0.0.2
- 2020.05.14 - PaddleRec v0.0.1
### License
[Apache 2.0 license](LICENSE)
### Contact us
For any feedback, please open a [GitHub Issue](https://github.com/PaddlePaddle/PaddleRec/issues).
You can also reach us in the following ways:
- QQ group ID: `861717190`
- WeChat account: `paddlerec2020`
<p align="center"><img width="200" height="200" margin="500" src="./doc/imgs/QQ_group.png"/>&#8194;&#8194;&#8194;&#8194;&#8194;<img width="200" height="200" src="doc/imgs/weixin_supporter.png"/></p>
<p align="center">PaddleRec QQ Group&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;PaddleRec WeChat account</p>
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz --no-check-certificate
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
pip uninstall -y paddlepaddle
pip install paddlepaddle==<$ PADDLEPADDLE_VERSION $> --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
echo "End before_hook.sh ..."
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz --no-check-certificate
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
pip uninstall -y paddlepaddle
pip install paddlepaddle-gpu==<$ PADDLEPADDLE_VERSION $>.post107 --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
echo "End before_hook.sh ..."
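The two `before_hook.sh` templates above leave the PaddlePaddle version as a `<$ PADDLEPADDLE_VERSION $>` placeholder, which `submit.sh` fills in with `sed`. The following is a minimal sketch of that substitution on a single line. `render_template` is a hypothetical helper name, and the version `1.7.2` is only an example value.

```shell
#!/bin/bash
# render_template VERSION: read a template on stdin and replace every
# "<$ PADDLEPADDLE_VERSION $>" placeholder with VERSION.
# Hypothetical helper for illustration, not part of PaddleRec.
render_template() {
  local version="$1"
  # "#" is the sed delimiter, as in submit.sh, so "/" in values needs no escaping.
  # "\$" keeps the dollar sign literal inside the double-quoted shell string.
  sed -e "s#<\$ PADDLEPADDLE_VERSION \$>#${version}#g"
}

rendered=$(printf '%s\n' 'pip install paddlepaddle==<$ PADDLEPADDLE_VERSION $>' | render_template "1.7.2")
echo "${rendered}"
```

Because the replacement text is pasted into the `sed` expression unescaped, a value containing `#` or `&` would change the result. Values such as version numbers and paths without those characters are safe.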
......@@ -16,23 +16,13 @@
###################################################
# Usage: submit.sh
# Description: run mpi submit client implement
# Description: run paddlecloud submit client implement
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
#-----------------------------------------------------------------------------------------------------------------
#fun : package
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package_hook() {
g_run_stage="package"
package
}
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
......@@ -40,17 +30,128 @@ function package_hook() {
#-----------------------------------------------------------------------------------------------------------------
function _before_submit() {
echo "before_submit"
before_submit_hook
# Quote the variable so that an unset or empty DISTRIBUTE_MODE does not break the test
if [ "${DISTRIBUTE_MODE}" == "PS_CPU_MPI" ]; then
_gen_cpu_before_hook
_gen_mpi_config
_gen_mpi_job
_gen_end_hook
elif [ "${DISTRIBUTE_MODE}" == "COLLECTIVE_GPU_K8S" ]; then
_gen_gpu_before_hook
_gen_k8s_config
_gen_k8s_gpu_job
_gen_end_hook
elif [ "${DISTRIBUTE_MODE}" == "PS_CPU_K8S" ]; then
_gen_cpu_before_hook
_gen_k8s_config
_gen_k8s_cpu_job
_gen_end_hook
fi
}
function _gen_mpi_config() {
echo "gen mpi_config.ini"
sed -e "s#<$ FS_NAME $>#$FS_NAME#g" \
-e "s#<$ FS_UGI $>#$FS_UGI#g" \
-e "s#<$ TRAIN_DATA_PATH $>#$TRAIN_DATA_PATH#g" \
-e "s#<$ TEST_DATA_PATH $>#$TEST_DATA_PATH#g" \
-e "s#<$ OUTPUT_PATH $>#$OUTPUT_PATH#g" \
-e "s#<$ THIRDPARTY_PATH $>#$THIRDPARTY_PATH#g" \
-e "s#<$ CPU_NUM $>#$max_thread_num#g" \
-e "s#<$ FLAGS_communicator_is_sgd_optimizer $>#$FLAGS_communicator_is_sgd_optimizer#g" \
-e "s#<$ FLAGS_communicator_send_queue_size $>#$FLAGS_communicator_send_queue_size#g" \
-e "s#<$ FLAGS_communicator_thread_pool_size $>#$FLAGS_communicator_thread_pool_size#g" \
-e "s#<$ FLAGS_communicator_max_merge_var_num $>#$FLAGS_communicator_max_merge_var_num#g" \
-e "s#<$ FLAGS_communicator_max_send_grad_num_before_recv $>#$FLAGS_communicator_max_send_grad_num_before_recv#g" \
-e "s#<$ FLAGS_communicator_fake_rpc $>#$FLAGS_communicator_fake_rpc#g" \
-e "s#<$ FLAGS_rpc_retry_times $>#$FLAGS_rpc_retry_times#g" \
${abs_dir}/cloud/mpi_config.ini.template >${PWD}/config.ini
}
function _gen_k8s_config() {
echo "gen k8s_config.ini"
sed -e "s#<$ FS_NAME $>#$FS_NAME#g" \
-e "s#<$ FS_UGI $>#$FS_UGI#g" \
-e "s#<$ AFS_REMOTE_MOUNT_POINT $>#$AFS_REMOTE_MOUNT_POINT#g" \
-e "s#<$ OUTPUT_PATH $>#$OUTPUT_PATH#g" \
-e "s#<$ CPU_NUM $>#$max_thread_num#g" \
-e "s#<$ FLAGS_communicator_is_sgd_optimizer $>#$FLAGS_communicator_is_sgd_optimizer#g" \
-e "s#<$ FLAGS_communicator_send_queue_size $>#$FLAGS_communicator_send_queue_size#g" \
-e "s#<$ FLAGS_communicator_thread_pool_size $>#$FLAGS_communicator_thread_pool_size#g" \
-e "s#<$ FLAGS_communicator_max_merge_var_num $>#$FLAGS_communicator_max_merge_var_num#g" \
-e "s#<$ FLAGS_communicator_max_send_grad_num_before_recv $>#$FLAGS_communicator_max_send_grad_num_before_recv#g" \
-e "s#<$ FLAGS_communicator_fake_rpc $>#$FLAGS_communicator_fake_rpc#g" \
-e "s#<$ FLAGS_rpc_retry_times $>#$FLAGS_rpc_retry_times#g" \
${abs_dir}/cloud/k8s_config.ini.template >${PWD}/config.ini
}
function _gen_cpu_before_hook() {
echo "gen cpu before_hook.sh"
sed -e "s#<$ PADDLEPADDLE_VERSION $>#$PADDLE_VERSION#g" \
${abs_dir}/cloud/before_hook_cpu.sh.template >${PWD}/before_hook.sh
}
function _gen_gpu_before_hook() {
echo "gen gpu before_hook.sh"
sed -e "s#<$ PADDLEPADDLE_VERSION $>#$PADDLE_VERSION#g" \
${abs_dir}/cloud/before_hook_gpu.sh.template >${PWD}/before_hook.sh
}
function _gen_end_hook() {
echo "gen end_hook.sh"
cp ${abs_dir}/cloud/end_hook.sh.template ${PWD}/end_hook.sh
}
function _gen_mpi_job() {
echo "gen mpi_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ JOB_NAME $>#$OLD_JOB_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ MPI_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ MPI_NODES $>#$MPI_NODES#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/mpi_job.sh.template >${PWD}/job.sh
}
function _gen_k8s_gpu_job() {
echo "gen k8s_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ JOB_NAME $>#$OLD_JOB_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ K8S_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ K8S_TRAINERS $>#$K8S_TRAINERS#g" \
-e "s#<$ K8S_CPU_CORES $>#$K8S_CPU_CORES#g" \
-e "s#<$ K8S_GPU_CARD $>#$K8S_GPU_CARD#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/k8s_job.sh.template >${PWD}/job.sh
}
function _gen_k8s_cpu_job() {
echo "gen k8s_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ JOB_NAME $>#$OLD_JOB_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ K8S_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ K8S_TRAINERS $>#$K8S_TRAINERS#g" \
-e "s#<$ K8S_PS_NUM $>#$K8S_PS_NUM#g" \
-e "s#<$ K8S_PS_CORES $>#$K8S_PS_CORES#g" \
-e "s#<$ K8S_CPU_CORES $>#$K8S_CPU_CORES#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/k8s_cpu_job.sh.template >${PWD}/job.sh
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function _after_submit() {
echo "after_submit"
after_submit_hook
echo "end submit"
}
#-----------------------------------------------------------------------------------------------------------------
......@@ -60,23 +161,19 @@ function _after_submit() {
#-----------------------------------------------------------------------------------------------------------------
function _submit() {
g_run_stage="submit"
sh job.sh
}
cd ${engine_temp_path}
paddlecloud job --ak ${engine_submit_ak} --sk ${engine_submit_sk} train --cluster-name ${engine_submit_cluster} \
--job-version ${engine_submit_version} \
--mpi-priority ${engine_submit_priority} \
--mpi-wall-time 300:59:00 \
--mpi-nodes ${engine_submit_nodes} --is-standalone 0 \
--mpi-memory 110Gi \
--job-name ${engine_submit_jobname} \
--start-cmd "${g_run_cmd}" \
--group-name ${engine_submit_group} \
--job-conf ${engine_submit_config} \
--files ${g_submitfiles} \
--json
cd -
function package_hook() {
# Append a timestamp so that each submission gets its own folder
cur_time=$(date +"%Y%m%d%H%M")
new_job_name="${JOB_NAME}_${cur_time}"
export OLD_JOB_NAME="${JOB_NAME}"
export JOB_NAME="${new_job_name}"
export job_file_path="${PWD}/${new_job_name}"
mkdir "${job_file_path}"
# $FILES is left unquoted on purpose: it expands to a list of files
cp $FILES "${job_file_path}/"
cd "${job_file_path}"
echo "The task submission folder is generated at ${job_file_path}"
}
function submit_hook() {
......@@ -86,8 +183,6 @@ function submit_hook() {
}
function main() {
source ${engine_submit_scrpit}
package_hook
submit_hook
}
......
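The `if`/`elif` chain in `_before_submit` (in the `submit.sh` diff above) does nothing when `DISTRIBUTE_MODE` holds an unexpected value. The following is a minimal sketch of the same dispatch written with `case` and an explicit failure branch. `plan_for_mode` is a hypothetical helper that only prints the generator steps instead of running them, and the step names mirror the `_gen_*` functions in the diff.

```shell
#!/bin/bash
# plan_for_mode MODE: print the generator steps that MODE would run.
# Hypothetical helper for illustration; it does not call the real _gen_* functions.
plan_for_mode() {
  case "$1" in
    PS_CPU_MPI)
      echo "_gen_cpu_before_hook _gen_mpi_config _gen_mpi_job _gen_end_hook" ;;
    COLLECTIVE_GPU_K8S)
      echo "_gen_gpu_before_hook _gen_k8s_config _gen_k8s_gpu_job _gen_end_hook" ;;
    PS_CPU_K8S)
      echo "_gen_cpu_before_hook _gen_k8s_config _gen_k8s_cpu_job _gen_end_hook" ;;
    *)
      # Fail loudly instead of silently generating nothing.
      echo "unsupported DISTRIBUTE_MODE: $1" >&2
      return 1 ;;
  esac
}

plan_for_mode "PS_CPU_K8S"
```

Whether an unknown mode should abort the submission is a design choice the source does not state; the sketch assumes that it should.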
echo "Run before_hook.sh ..."
\ No newline at end of file
# Required parameters
fs_name=<$ FS_NAME $>
fs_ugi=<$ FS_UGI $>
# Model output directory
output_path=<$ OUTPUT_PATH $>
# ===================
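# Newly added parameters below
# ===================
# Switch for mounting AFS
mount_afs="true"
# Remote mount point of the AFS path
AFS_REMOTE_MOUNT_POINT=<$ AFS_REMOTE_MOUNT_POINT $>
# Local mount point in the job runtime environment; /root/paddlejob/workspace/env_run/ is a fixed path, the platform's runtime workspace
afs_local_mount_point="/root/paddlejob/workspace/env_run/afs/"
# Files in the mounted directory are available under ./afs/ in the default runtime folder
# Help document for AFS mounting on the new k8s: http://wiki.baidu.com/pages/viewpage.action?pageId=906443193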
PADDLE_PADDLEREC_ROLE=WORKER
CPU_NUM=<$ CPU_NUM $>
GLOG_v=0
FLAGS_communicator_is_sgd_optimizer=<$ FLAGS_communicator_is_sgd_optimizer $>
FLAGS_communicator_send_queue_size=<$ FLAGS_communicator_send_queue_size $>
FLAGS_communicator_thread_pool_size=<$ FLAGS_communicator_thread_pool_size $>
FLAGS_communicator_max_merge_var_num=<$ FLAGS_communicator_max_merge_var_num $>
FLAGS_communicator_max_send_grad_num_before_recv=<$ FLAGS_communicator_max_send_grad_num_before_recv $>
FLAGS_communicator_fake_rpc=<$ FLAGS_communicator_fake_rpc $>
FLAGS_rpc_retry_times=<$ FLAGS_rpc_retry_times $>
\ No newline at end of file
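Every `<$ NAME $>` placeholder in the config template above has to be matched by a `sed` expression in `submit.sh`; a placeholder that is missed stays in the generated `config.ini` verbatim. The following is a minimal sketch of a check for such leftovers. `has_unrendered_placeholders` is a hypothetical helper name, and the file contents, including the `afs://example` value, are made up for the demonstration.

```shell
#!/bin/bash
# has_unrendered_placeholders FILE: succeed if FILE still contains a
# "<$ NAME $>" placeholder. Hypothetical helper, not part of PaddleRec.
has_unrendered_placeholders() {
  # "\$" matches a literal dollar sign in a basic regular expression.
  grep -q '<\$ [A-Za-z_][A-Za-z0-9_]* \$>' "$1"
}

# Build two small sample files in a temporary directory.
tmp_dir=$(mktemp -d)
trap 'rm -rf "${tmp_dir}"' EXIT
printf 'fs_name=<$ FS_NAME $>\n' > "${tmp_dir}/template.ini"
printf 'fs_name=afs://example\n' > "${tmp_dir}/config.ini"

for f in "${tmp_dir}/template.ini" "${tmp_dir}/config.ini"; do
  if has_unrendered_placeholders "${f}"; then
    echo "$(basename "${f}"): unrendered placeholders remain"
  else
    echo "$(basename "${f}"): fully rendered"
  fi
done
```

A check like this could run after the `_gen_*` steps and before `sh job.sh`, so that a missing substitution is caught locally rather than on the cluster.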
#!/bin/bash
###############################################################
## NOTE -- NOTE -- NOTE ##
## Example K8S PS-CPU multi-node job ##
###############################################################
job_name=<$ JOB_NAME $>
# Job parameters
group_name="<$ GROUP_NAME $>"
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="10:00:00"
k8s_priority=<$ K8S_PRIORITY $>
k8s_trainers=<$ K8S_TRAINERS $>
k8s_cpu_cores=<$ K8S_CPU_CORES $>
k8s_ps_num=<$ K8S_PS_NUM $>
k8s_ps_cores=<$ K8S_PS_CORES $>
# Your ak/sk (available in the Personal Center of the paddlecloud web page)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train --job-name ${job_name} \
--group-name ${group_name} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--job-version ${job_version} \
--k8s-priority ${k8s_priority} \
--wall-time ${wall_time} \
--k8s-trainers ${k8s_trainers} \
--k8s-cpu-cores ${k8s_cpu_cores} \
--k8s-ps-num ${k8s_ps_num} \
--k8s-ps-cores ${k8s_ps_cores} \
--is-standalone 0 \
--distribute-job-type "PSERVER" \
--json
\ No newline at end of file
#!/bin/bash
###############################################################
## NOTE -- NOTE -- NOTE                                      ##
## Example of a K8S NCCL2 multi-node job                     ##
###############################################################
job_name=<$ JOB_NAME $>
# Job parameters
group_name="<$ GROUP_NAME $>"
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="10:00:00"
k8s_priority=<$ K8S_PRIORITY $>
k8s_trainers=<$ K8S_TRAINERS $>
k8s_cpu_cores=<$ K8S_CPU_CORES $>
k8s_gpu_cards=<$ K8S_GPU_CARD $>
is_stand_alone=0
nccl="--distribute-job-type NCCL2"
if [ ${k8s_trainers} == 1 ]; then
    is_stand_alone=1
    nccl="--job-remark single-trainer"
    # Note the space before the closing bracket; without it the test fails to parse
    if [ ${k8s_gpu_cards} == 1 ]; then
        nccl="--job-remark single-gpu"
        echo "Attention: Use single GPU card for PaddleRec distributed training, please set runner class from 'cluster_train' to 'train' in config.yaml."
    fi
fi
# Your ak/sk (available in the Personal Center of the paddlecloud web page)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train --job-name ${job_name} \
--group-name ${group_name} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--job-version ${job_version} \
--k8s-trainers ${k8s_trainers} \
--k8s-cpu-cores ${k8s_cpu_cores} \
--k8s-gpu-cards ${k8s_gpu_cards} \
--k8s-priority ${k8s_priority} \
--wall-time ${wall_time} \
--is-standalone ${is_stand_alone} \
--json \
${nccl}
\ No newline at end of file
#type of storage cluster
storage_type="hdfs"
#attention: files for training should be put on hdfs
force_reuse_output_path="True"
# Can be replaced with your own hdfs cluster
fs_name=<$ FS_NAME $>
fs_ugi=<$ FS_UGI $>
FLAGS_rpc_deadline=300000
##train data path on hdfs
train_data_path=<$ TRAIN_DATA_PATH $>
test_data_path=<$ TEST_DATA_PATH $>
output_path=<$ OUTPUT_PATH $>
thirdparty_path=<$ THIRDPARTY_PATH $>
PADDLE_PADDLEREC_ROLE=WORKER
CPU_NUM=<$ CPU_NUM $>
GLOG_v=0
FLAGS_communicator_is_sgd_optimizer=<$ FLAGS_communicator_is_sgd_optimizer $>
FLAGS_communicator_send_queue_size=<$ FLAGS_communicator_send_queue_size $>
FLAGS_communicator_thread_pool_size=<$ FLAGS_communicator_thread_pool_size $>
FLAGS_communicator_max_merge_var_num=<$ FLAGS_communicator_max_merge_var_num $>
FLAGS_communicator_max_send_grad_num_before_recv=<$ FLAGS_communicator_max_send_grad_num_before_recv $>
FLAGS_communicator_fake_rpc=<$ FLAGS_communicator_fake_rpc $>
FLAGS_rpc_retry_times=<$ FLAGS_rpc_retry_times $>
#!/bin/bash
###############################################################
## NOTE -- NOTE -- NOTE                                      ##
## Example of an MPI job                                     ##
###############################################################
job_name=<$ JOB_NAME $>
# Job parameters
group_name=<$ GROUP_NAME $>
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="2:00:00"
# Your ak/sk (available in the Personal Center of the paddlecloud web page)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train \
--job-name ${job_name} \
--mpi-priority <$ MPI_PRIORITY $> \
--group-name ${group_name} \
--mpi-wall-time ${wall_time} \
--mpi-nodes <$ MPI_NODES $> \
--is-standalone 0 \
--permission group \
--job-version ${job_version} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--json
......@@ -18,6 +18,7 @@ from __future__ import unicode_literals
import copy
import os
import subprocess
import warnings
from paddlerec.core.engine.engine import Engine
from paddlerec.core.factory import TrainerFactory
......@@ -26,24 +27,35 @@ from paddlerec.core.utils import envs
class ClusterEngine(Engine):
def __init_impl__(self):
self.role = envs.get_runtime_environ("engine_role")
if self.role == "WORKER":
return
abs_dir = os.path.dirname(os.path.abspath(__file__))
os.environ["abs_dir"] = str(abs_dir)
backend = envs.get_runtime_environ("engine_backend")
if not backend:
backend = ""
backend = backend.upper()
if backend == "PADDLECLOUD":
self.backend = envs.get_runtime_environ("backend")
if not self.backend:
self.backend = ""
self.backend = self.backend.upper()
if self.backend == "PADDLECLOUD":
self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh")
elif backend == "KUBERNETES":
elif self.backend == "KUBERNETES":
self.submit_script = os.path.join(abs_dir, "k8s/cluster.sh")
else:
raise ValueError("{} can not be supported now".format(backend))
raise ValueError("{} can not be supported now".format(
self.backend))
def start_worker_procs(self):
trainer = TrainerFactory.create(self.trainer)
trainer.run()
def start_master_procs(self):
if self.backend == "PADDLECLOUD":
self.paddlecloud_env_check()
elif self.backend == "KUBERNETES":
self.kubernetes_env_check()
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
......@@ -55,21 +67,245 @@ class ClusterEngine(Engine):
@staticmethod
def workspace_replace():
workspace = envs.get_runtime_environ("engine_workspace")
remote_workspace = envs.get_runtime_environ("remote_workspace")
for k, v in os.environ.items():
v = v.replace("{workspace}", workspace)
v = v.replace("{workspace}", remote_workspace)
os.environ[k] = str(v)
def run(self):
role = envs.get_runtime_environ("engine_role")
if role == "MASTER":
if self.role == "MASTER":
self.start_master_procs()
elif role == "WORKER":
elif self.role == "WORKER":
self.start_worker_procs()
else:
raise ValueError("role {} error, must in MASTER/WORKER".format(
role))
self.role))
    def paddlecloud_env_check(self):
        # get fleet mode
        fleet_mode = envs.get_runtime_environ("fleet_mode")
        # get device
        device = envs.get_runtime_environ("device")
        # get cluster type
        cluster_type = envs.get_runtime_environ("cluster_type")
        cluster_env_check_tool = None
        if cluster_type.upper() == "MPI":
            if device == "CPU" and fleet_mode == "PS":
                cluster_env_check_tool = PaddleCloudMpiEnv()
            else:
                raise ValueError(
                    "PaddleCloud with MPI doesn't support GPU training, check your config.yaml & backend.yaml"
                )
        elif cluster_type.upper() == "K8S":
            if fleet_mode == "PS":
                if device == "CPU":
                    cluster_env_check_tool = CloudPsCpuEnv()
                elif device == "GPU":
                    raise ValueError(
                        "PS-GPU on paddlecloud is not supported at this time, coming soon"
                    )
            if fleet_mode == "COLLECTIVE":
                if device == "GPU":
                    cluster_env_check_tool = CloudCollectiveEnv()
                elif device == "CPU":
                    raise ValueError(
                        "Unexpected config -> device: CPU with fleet_mode: Collective, check your config.yaml"
                    )
        else:
            raise ValueError("cluster_type {} error, must in MPI/K8S".format(
                cluster_type))
        # Guard: a combination that matched no branch above would otherwise
        # fail with an AttributeError on None.
        if cluster_env_check_tool is None:
            raise ValueError(
                "Unsupported config -> cluster_type: {}, fleet_mode: {}, device: {}".
                format(cluster_type, fleet_mode, device))
        cluster_env_check_tool.env_check()
        cluster_env_check_tool.env_set()
def kubernetes_env_check(self):
pass
class ClusterEnvBase(object):
def __init__(self):
# get backend env
backend_yaml = envs.get_runtime_environ("backend_yaml")
_env = envs.load_yaml(backend_yaml)
self.backend_env = envs.flatten_environs(_env, ".")
self.cluster_env = {}
def env_check(self):
# check common env
# fs_name & fs_ugi
self.cluster_env["FS_NAME"] = self.backend_env.get("config.fs_name",
"")
self.cluster_env["FS_UGI"] = self.backend_env.get("config.fs_ugi", "")
if self.cluster_env["FS_NAME"] == "" or self.cluster_env[
"FS_UGI"] == "":
raise ValueError(
"No -- FS_UGI or FS_NAME -- found in your backend.yaml, please check."
)
# output_path
self.cluster_env["OUTPUT_PATH"] = self.backend_env.get(
"config.output_path", "")
if self.cluster_env["OUTPUT_PATH"] == "":
warnings.warn(
"Job output_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# paddle_version
self.cluster_env["PADDLE_VERSION"] = self.backend_env.get(
"config.paddle_version", "1.7.2")
# communicator
self.cluster_env[
"FLAGS_communicator_is_sgd_optimizer"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_is_sgd_optimizer", 0)
self.cluster_env[
"FLAGS_communicator_send_queue_size"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_send_queue_size", 5)
self.cluster_env[
"FLAGS_communicator_thread_pool_size"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_thread_pool_size", 32)
self.cluster_env[
"FLAGS_communicator_max_merge_var_num"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_max_merge_var_num", 5)
self.cluster_env[
"FLAGS_communicator_max_send_grad_num_before_recv"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_max_send_grad_num_before_recv",
5)
self.cluster_env["FLAGS_communicator_fake_rpc"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_fake_rpc", 0)
self.cluster_env["FLAGS_rpc_retry_times"] = self.backend_env.get(
"config.communicator.FLAGS_rpc_retry_times", 3)
# ak & sk
self.cluster_env["AK"] = self.backend_env.get("submit.ak", "")
self.cluster_env["SK"] = self.backend_env.get("submit.sk", "")
if self.cluster_env["AK"] == "" or self.cluster_env["SK"] == "":
raise ValueError(
"No -- AK or SK -- found in your backend.yaml, please check.")
# priority
self.cluster_env["PRIORITY"] = self.backend_env.get("submit.priority",
"high")
# job name
self.cluster_env["JOB_NAME"] = self.backend_env.get(
"submit.job_name", "PaddleRecClusterJob")
# group
self.cluster_env["GROUP_NAME"] = self.backend_env.get("submit.group",
"paddle")
# start_cmd
self.cluster_env["START_CMD"] = self.backend_env.get(
"submit.start_cmd", "python -m paddlerec.run -m config.yaml")
# files
self.cluster_env["FILES"] = self.backend_env.get("submit.files", "")
if self.cluster_env["FILES"] == "":
raise ValueError(
"No -- files -- found in your backend.yaml, please check.")
def env_set(self):
envs.set_runtime_environs(self.cluster_env)
flattens = envs.flatten_environs(self.cluster_env)
print(envs.pretty_print_envs(flattens, ("Cluster Envs", "Value")))
class PaddleCloudMpiEnv(ClusterEnvBase):
def __init__(self):
super(PaddleCloudMpiEnv, self).__init__()
def env_check(self):
super(PaddleCloudMpiEnv, self).env_check()
# check mpi env
self.cluster_env["DISTRIBUTE_MODE"] = "PS_CPU_MPI"
# train_data_path
self.cluster_env["TRAIN_DATA_PATH"] = self.backend_env.get(
"config.train_data_path", "")
if self.cluster_env["TRAIN_DATA_PATH"] == "":
raise ValueError(
"No -- TRAIN_DATA_PATH -- found in your backend.yaml, please add train_data_path in your backend yaml."
)
# test_data_path
self.cluster_env["TEST_DATA_PATH"] = self.backend_env.get(
"config.test_data_path", "")
if self.cluster_env["TEST_DATA_PATH"] == "":
warnings.warn(
"Job test_data_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# thirdparty_path
self.cluster_env["THIRDPARTY_PATH"] = self.backend_env.get(
"config.thirdparty_path", "")
if self.cluster_env["THIRDPARTY_PATH"] == "":
warnings.warn(
"Job thirdparty_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# nodes
self.cluster_env["MPI_NODES"] = self.backend_env.get("submit.nodes", 1)
class PaddleCloudK8sEnv(ClusterEnvBase):
def __init__(self):
super(PaddleCloudK8sEnv, self).__init__()
def env_check(self):
super(PaddleCloudK8sEnv, self).env_check()
# check afs_remote_mount_point
self.cluster_env["AFS_REMOTE_MOUNT_POINT"] = self.backend_env.get(
"config.afs_remote_mount_point", "")
if self.cluster_env["AFS_REMOTE_MOUNT_POINT"] == "":
warnings.warn(
"Job afs_remote_mount_point not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
warnings.warn(
"The remote afs path will be mounted to the ./afs/",
category=UserWarning,
stacklevel=2)
class CloudCollectiveEnv(PaddleCloudK8sEnv):
def __init__(self):
super(CloudCollectiveEnv, self).__init__()
def env_check(self):
super(CloudCollectiveEnv, self).env_check()
self.cluster_env["DISTRIBUTE_MODE"] = "COLLECTIVE_GPU_K8S"
self.cluster_env["K8S_TRAINERS"] = self.backend_env.get(
"submit.k8s_trainers", 1)
self.cluster_env["K8S_GPU_CARD"] = self.backend_env.get(
"submit.k8s_gpu_card", 1)
self.cluster_env["K8S_CPU_CORES"] = self.backend_env.get(
"submit.k8s_cpu_cores", 1)
class CloudPsCpuEnv(PaddleCloudK8sEnv):
def __init__(self):
super(CloudPsCpuEnv, self).__init__()
def env_check(self):
super(CloudPsCpuEnv, self).env_check()
self.cluster_env["DISTRIBUTE_MODE"] = "PS_CPU_K8S"
self.cluster_env["K8S_TRAINERS"] = self.backend_env.get(
"submit.k8s_trainers", 1)
self.cluster_env["K8S_CPU_CORES"] = self.backend_env.get(
"submit.k8s_cpu_cores", 2)
self.cluster_env["K8S_PS_NUM"] = self.backend_env.get(
"submit.k8s_ps_num", 1)
self.cluster_env["K8S_PS_CORES"] = self.backend_env.get(
"submit.k8s_ps_cores", 2)
......@@ -118,6 +118,7 @@ class QueueDataset(DatasetBase):
dataset.set_batch_size(batch_size)
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
......@@ -125,7 +126,7 @@ class QueueDataset(DatasetBase):
if context["engine"] == EngineMode.LOCAL_CLUSTER:
file_list = split_files(file_list, context["fleet"].worker_index(),
context["fleet"].worker_num())
print("File_list: {}".format(file_list))
dataset.set_filelist(file_list)
for model_dict in context["phases"]:
if model_dict["dataset_name"] == dataset_name:
......
......@@ -42,7 +42,7 @@ def dataloader_by_name(readerclass,
if context["engine"] == EngineMode.LOCAL_CLUSTER:
files = split_files(files, context["fleet"].worker_index(),
context["fleet"].worker_num())
print("file_list : {}".format(files))
print("file_list : {}".format(files))
reader = reader_class(yaml_file)
reader.init()
......
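# PaddleRec Distributed Training
Contents
=================
## Running PaddleRec in Distributed Mode
> Placeholder
### Simulating Distributed Training Locally
> Placeholder
- [Contents](#contents)
- [Launching Distributed Training with PaddleCloud](#launching-distributed-training-with-paddlecloud)
  - [Submitting with PaddleRec](#submitting-with-paddlerec)
    - [Step 1: Install the PaddleCloud Client in the Runtime Environment](#step-1-install-the-paddlecloud-client-in-the-runtime-environment)
    - [Step 2: Modify the `config.yaml` Model Configuration](#step-2-modify-the-configyaml-model-configuration)
    - [Step 3: Add the `backend.yaml` Cluster Configuration](#step-3-add-the-backendyaml-cluster-configuration)
      - [Parameter Server Mode on an MPI Cluster](#parameter-server-mode-on-an-mpi-cluster)
      - [Collective Mode on a K8S Cluster](#collective-mode-on-a-k8s-cluster)
      - [PS-CPU Mode on a K8S Cluster](#ps-cpu-mode-on-a-k8s-cluster)
    - [Step 4: Submit the Job](#step-4-submit-the-job)
  - [Submitting with the PaddleCloud Client](#submitting-with-the-paddlecloud-client)
    - [Step 1: Install PaddleRec Manually in `before_hook.sh`](#step-1-install-paddlerec-manually-in-before_hooksh)
    - [Step 2: Adjust Hyperparameters in `config.ini`](#step-2-adjust-hyperparameters-in-configini)
    - [Step 3: Upload Files and Change the Start Command in `job.sh`](#step-3-upload-files-and-change-the-start-command-in-jobsh)
    - [Step 4: Run the Submission](#step-4-run-the-submission)
### Running Distributed Training on a K8S Cluster
> Placeholder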
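# Launching Distributed Training with PaddleCloud
> PaddleCloud is currently in internal testing and rollout at Baidu. A public cloud version for general users will be released in due course; stay tuned.
## Submitting with PaddleRec
### Step 1: Install the PaddleCloud Client in the Runtime Environment
- Requirement: python > 2.7.5
- First, apply for `group` permissions on the PaddleCloud platform to obtain compute resources
- Then download and install `PaddleCloud-Cli` by following the [PaddleCloud client manual](http://wiki.baidu.com/pages/viewpage.action?pageId=1017488941#1.%20安装PaddleCloud客户端)
- Get your `AK` and `SK` from the PaddleCloud personal center
### Step 2: Modify the `config.yaml` Model Configuration
Distributed training first requires changes to `config.yaml`, mainly the following:
- workspace: set to the working directory on the remote node; `"./"` is usually sufficient
- runner_class: change from "train" (single machine) to "cluster_train" (distributed). The exception is single-node, single-card training on k8s, which still uses "train"
- fleet_mode: choose parameter server mode (ps) or the GPU all-reduce mode (collective)
- distribute_strategy: optional; selects the distributed training strategy. It currently takes effect only in parameter server mode. Options: `sync`, `async`, `half_async`, `geo`

For details on each option, see the [yaml configuration guide](./yaml.md)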
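
The runner changes listed above can be checked mechanically before submitting. The following is a minimal sketch, not PaddleRec code: `check_cluster_runner` is a hypothetical helper, and it deliberately ignores the single-card k8s exception where the class stays `train`.

```python
# Hypothetical helper for illustration; not part of PaddleRec.
VALID_FLEET_MODES = {"ps", "collective"}
VALID_STRATEGIES = {"sync", "async", "half_async", "geo"}


def check_cluster_runner(runner):
    """Return a list of problems found in one runner entry of config.yaml."""
    problems = []
    if runner.get("class") != "cluster_train":
        problems.append(
            "class should be 'cluster_train' for distributed training")
    fleet_mode = str(runner.get("fleet_mode", "")).lower()
    if fleet_mode not in VALID_FLEET_MODES:
        problems.append("fleet_mode must be one of: ps, collective")
    strategy = runner.get("distribute_strategy")
    if strategy is not None:
        if strategy not in VALID_STRATEGIES:
            problems.append(
                "unknown distribute_strategy: {}".format(strategy))
        elif fleet_mode != "ps":
            # The strategy only takes effect in parameter server mode.
            problems.append(
                "distribute_strategy only takes effect when fleet_mode is ps")
    return problems


runner = {
    "name": "ps_cluster",
    "class": "cluster_train",
    "device": "cpu",
    "fleet_mode": "ps",
    "distribute_strategy": "async",
}
print(check_cluster_runner(runner))  # prints []
```

Returning a list instead of raising on the first problem lets a caller report every misconfiguration in one pass.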
Take the Rank/dnn model as an example.
Single-machine training configuration:
```yaml
# workspace
workspace: "paddlerec.models.rank.dnn"
mode: [single_cpu_train]
runner:
- name: single_cpu_train
  class: train
  epochs: 4
  device: cpu
  save_checkpoint_interval: 2
  save_checkpoint_path: "increment_dnn"
  init_model_path: ""
  print_interval: 10
  phases: [phase1]
dataset:
- name: dataloader_train
  batch_size: 2
  type: DataLoader
  data_path: "{workspace}/data/sample_data/train"
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
  dense_slots: "dense_var:13"
```
The distributed training configuration can be changed to:
```yaml
# Change 1: after upload, the code sits in the default directory on the node
workspace: "./"
mode: [ps_cluster]
runner:
- name: ps_cluster
  # Change 2: switch the runner class
  class: cluster_train
  epochs: 4
  device: cpu
  # Changes 3 & 4: specify fleet_mode and distribute_strategy
  fleet_mode: ps
  distribute_strategy: async
  save_checkpoint_interval: 2
  save_checkpoint_path: "increment_dnn"
  init_model_path: ""
  print_interval: 10
  phases: [phase1]
dataset:
- name: dataloader_train
  batch_size: 2
  type: DataLoader
  # Change 5: point data_path at the new data directory
  # Typically, in mpi mode the data is downloaded to './train_data' under the working directory on the remote node; on k8s it depends on the mount location
  data_path: "{workspace}/train_data"
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
  dense_slots: "dense_var:13"
```
Beyond that, pay attention to the paths used to load data and models. In general:
- On a PaddleCloud MPI cluster, training data is downloaded to `./train_data/` under the node's working directory and test data to `./test_data/`. Other data and files can be uploaded to the hdfs location configured as `thirdparty`, after which they are downloaded automatically to `./thirdparty/` under the node's working directory.
- On a PaddleCloud K8S cluster, the specified hdfs directory is mounted at `./afs/` under the node's working directory
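
To make the path rules above concrete, here is a small sketch of how a `{workspace}` placeholder could be expanded into a path on the node. `resolve_workspace` is a hypothetical helper, the path normalization is an addition for readability rather than something the source describes, and `my_dataset` is an invented directory name.

```python
import posixpath


def resolve_workspace(value, workspace):
    """Expand the {workspace} placeholder and tidy up the resulting path."""
    return posixpath.normpath(value.replace("{workspace}", workspace))


# "./" is the workspace on the remote node, as configured above.
mpi_train_dir = resolve_workspace("{workspace}/train_data", "./")
# Hypothetical dataset folder below the k8s mount point ./afs/
k8s_train_dir = resolve_workspace("{workspace}/afs/my_dataset", "./")

print(mpi_train_dir)  # train_data
print(k8s_train_dir)  # afs/my_dataset
```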
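### Step 3: Add the `backend.yaml` Cluster Configuration
Besides the model-side adjustments, distributed training mainly needs cluster configuration options. These go in a separate yaml file, which keeps the distributed configuration decoupled from the model hyperparameters.
A complete `backend.yaml` example follows:
```yaml
backend: "PaddleCloud"
cluster_type: mpi # or k8s
config:
  # Official Paddle version used to run the job, >= 1.7.2 (default: 1.7.2)
  paddle_version: "1.7.2"
  # hdfs/afs settings
  fs_name: "afs://xxx.com"
  fs_ugi: "usr,pwd"
  # Remote output directory of the job; for afs:/user/your/path/ enter /user/your/path
  output_path: ""
  # for mpi
  # Remote data paths; for afs:/user/your/path/ enter /user/your/path
  train_data_path: ""
  test_data_path: ""
  thirdparty_path: ""
  # for k8s
  # Remote mount path; for afs:/user/your/path/ enter /user/your/path
  afs_remote_mount_point: ""
  # Low-level settings of the Paddle parameter server; leave unchanged unless you have special requirements
  communicator:
    FLAGS_communicator_is_sgd_optimizer: 0
    FLAGS_communicator_send_queue_size: 5
    FLAGS_communicator_thread_pool_size: 32
    FLAGS_communicator_max_merge_var_num: 5
    FLAGS_communicator_max_send_grad_num_before_recv: 5
    FLAGS_communicator_fake_rpc: 0
    FLAGS_rpc_retry_times: 3
submit:
  # PaddleCloud credentials: AK and SK
  ak: ""
  sk: ""
  # Job priority, default high
  priority: "high"
  # Job name
  job_name: "PaddleRec_CTR"
  # Group that owns the training resources
  group: ""
  # Start command executed on the node
  start_cmd: "python -m paddlerec.run -m ./config.yaml"
  # Local files to upload to the node's working directory
  files: ./*.py ./*.yaml
  # for mpi ps-cpu
  # Number of nodes for the job in mpi parameter server mode
  nodes: 2
  # for k8s gpu
  # In k8s gpu mode: number of trainer nodes and GPU cards per node
  k8s_trainers: 2
  k8s_cpu_cores: 4
  k8s_gpu_card: 1
  # for k8s ps-cpu (keep only the group that matches your job)
  k8s_trainers: 2
  k8s_cpu_cores: 4
  k8s_ps_num: 2
  k8s_ps_cores: 4
```
For more backend.yaml options, see the [yaml configuration guide](./yaml.md)
Also check the paths of the files uploaded to the working directory (the `files` option). The example uses `./*.py`, which assumes the job is submitted from the directory that contains those py files. If you submit from elsewhere, adjust the `files` paths or use absolute paths.
Uploading large data files through `files` is not recommended. Instead, set `train_data_path` to have the data downloaded automatically, or in k8s mode set `afs_remote_mount_point` to mount the data on the node.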
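
The engine code in this change reads `backend.yaml` through dotted keys such as `config.fs_name` and `submit.ak`. A minimal sketch of that lookup style follows, assuming the yaml has already been loaded into a nested dict; `flatten` is a hypothetical stand-in and not the PaddleRec `envs.flatten_environs` implementation.

```python
def flatten(config, separator=".", prefix=""):
    """Flatten a nested dict into one level keyed by dotted paths."""
    flat = {}
    for key, value in config.items():
        full_key = prefix + separator + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, separator, full_key))
        else:
            flat[full_key] = value
    return flat


# A trimmed-down backend.yaml, written as the dict a yaml loader would return.
backend = {
    "backend": "PaddleCloud",
    "cluster_type": "mpi",
    "config": {
        "fs_name": "afs://xxx.com",
        "fs_ugi": "usr,pwd",
        "communicator": {"FLAGS_rpc_retry_times": 3},
    },
    "submit": {"ak": "", "sk": "", "nodes": 2},
}

flat = flatten(backend)
print(flat["config.fs_name"])               # afs://xxx.com
print(flat.get("submit.priority", "high"))  # high (default used when unset)

# Empty credentials are treated as missing, as in the checks of this change.
missing = [k for k in ("submit.ak", "submit.sk") if flat.get(k, "") == ""]
print(missing)                              # ['submit.ak', 'submit.sk']
```

Flattening once up front keeps every later lookup a plain `dict.get` with a default, which is how optional settings such as `priority` fall back to their documented values.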
#### Parameter Server Mode on an MPI Cluster
Below is an example `backend.yaml` for submitting an MPI parameter server job through PaddleCloud.
First adjust `config.yaml`:
```yaml
workspace: "./"
mode: [ps_cluster]
dataset:
- name: dataloader_train
  batch_size: 2
  type: DataLoader
  data_path: "{workspace}/train_data"
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
  dense_slots: "dense_var:13"
runner:
- name: ps_cluster
  class: cluster_train
  epochs: 2
  device: cpu
  fleet_mode: ps
  save_checkpoint_interval: 1
  save_checkpoint_path: "increment_dnn"
  init_model_path: ""
  print_interval: 1
  phases: [phase1]
phase:
- name: phase1
  model: "{workspace}/model.py"
  dataset_name: dataloader_train
  thread_num: 1
```
Then add `backend.yaml`:
```yaml
backend: "PaddleCloud"
cluster_type: mpi
config:
  paddle_version: "1.7.2"
  # hdfs/afs settings
  fs_name: "afs://xxx.com"
  fs_ugi: "usr,pwd"
  # Remote output directory of the job; for afs:/user/your/path/ enter /user/your/path
  output_path: ""
  # for mpi
  # Remote data paths; for afs:/user/your/path/ enter /user/your/path
  train_data_path: ""
  test_data_path: ""
  thirdparty_path: ""
submit:
  # PaddleCloud credentials: AK and SK
  ak: ""
  sk: ""
  # Job priority, default high
  priority: "high"
  # Job name
  job_name: "PaddleRec_CTR"
  # Group that owns the training resources
  group: ""
  # Start command executed on the node
  start_cmd: "python -m paddlerec.run -m ./config.yaml"
  # Local files to upload to the node's working directory
  files: ./*.py ./*.yaml
  # for mpi ps-cpu
  # Number of nodes for the job in mpi parameter server mode
  nodes: 2
```
#### Collective Mode on a K8S Cluster
Below is an example `backend.yaml` for submitting a GPU training job to a K8S cluster through PaddleCloud.
First adjust `config.yaml`:
```yaml
workspace: "./"
mode: [collective_cluster]
dataset:
- name: dataloader_train
  batch_size: 2
  type: DataLoader
  data_path: "{workspace}/train_data"
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
  dense_slots: "dense_var:13"
runner:
- name: collective_cluster
  class: cluster_train
  epochs: 2
  device: gpu
  fleet_mode: collective
  save_checkpoint_interval: 1 # save model interval of epochs
  save_checkpoint_path: "increment_dnn" # save checkpoint path
  init_model_path: "" # load model path
  print_interval: 1
  phases: [phase1]
phase:
- name: phase1
  model: "{workspace}/model.py"
  dataset_name: dataloader_train
  thread_num: 1
```
Then add `backend.yaml`:
```yaml
backend: "PaddleCloud"
cluster_type: k8s # or mpi
config:
  # Official Paddle version used to run the job, >= 1.7.2 (default: 1.7.2)
  paddle_version: "1.7.2"
  # hdfs/afs settings
  fs_name: "afs://xxx.com"
  fs_ugi: "usr,pwd"
  # Remote output directory of the job; for afs:/user/your/path/ enter /user/your/path
  output_path: ""
  # for k8s
  # Remote mount path; for afs:/user/your/path/ enter /user/your/path
  afs_remote_mount_point: ""
submit:
  # PaddleCloud credentials: AK and SK
  ak: ""
  sk: ""
  # Job priority, default high
  priority: "high"
  # Job name
  job_name: "PaddleRec_CTR"
  # Group that owns the training resources
  group: ""
  # Start command executed on the node
  start_cmd: "python -m paddlerec.run -m ./config.yaml"
  # Local files to upload to the node's working directory
  files: ./*.py ./*.yaml
  # for k8s gpu
  # In k8s gpu mode: number of trainer nodes and GPU cards per node
  k8s_trainers: 2
  k8s_cpu_cores: 4
  k8s_gpu_card: 1
```
#### PS-CPU Mode on a K8S Cluster
Below is an example `backend.yaml` for submitting a parameter server CPU training job to a K8S cluster through PaddleCloud.
First adjust `config.yaml`:
```yaml
workspace: "./"
mode: [ps_cluster]
dataset:
- name: dataloader_train
  batch_size: 2
  type: DataLoader
  data_path: "{workspace}/train_data"
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
  dense_slots: "dense_var:13"
runner:
- name: ps_cluster
  class: cluster_train
  epochs: 2
  device: cpu
  fleet_mode: ps
  save_checkpoint_interval: 1
  save_checkpoint_path: "increment_dnn"
  init_model_path: ""
  print_interval: 1
  phases: [phase1]
phase:
- name: phase1
  model: "{workspace}/model.py"
  dataset_name: dataloader_train
  thread_num: 1
```
Then add `backend.yaml`:
```yaml
backend: "PaddleCloud"
cluster_type: k8s # or mpi
config:
  # Official Paddle version used to run the job, >= 1.7.2 (default: 1.7.2)
  paddle_version: "1.7.2"
  # hdfs/afs settings
  fs_name: "afs://xxx.com"
  fs_ugi: "usr,pwd"
  # Remote output directory of the job; for afs:/user/your/path/ enter /user/your/path
  output_path: ""
  # for k8s
  # Remote mount path; for afs:/user/your/path/ enter /user/your/path
  afs_remote_mount_point: ""
submit:
  # PaddleCloud credentials: AK and SK
  ak: ""
  sk: ""
  # Job priority, default high
  priority: "high"
  # Job name
  job_name: "PaddleRec_CTR"
  # Group that owns the training resources
  group: ""
  # Start command executed on the node
  start_cmd: "python -m paddlerec.run -m ./config.yaml"
  # Local files to upload to the node's working directory
  files: ./*.py ./*.yaml
  # for k8s ps-cpu
  # In k8s ps-cpu mode: number of trainer nodes, number of parameter server nodes, and CPU cores per node
  k8s_trainers: 2
  k8s_cpu_cores: 4
  k8s_ps_num: 2
  k8s_ps_cores: 4
```
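
Each of the three setups above corresponds to one combination of `cluster_type`, `fleet_mode`, and `device`. The sketch below shows that mapping as a lookup table. The mode names (`PS_CPU_MPI`, `COLLECTIVE_GPU_K8S`, `PS_CPU_K8S`) come from the engine code in this change; `select_distribute_mode` itself is a hypothetical helper, and upper-casing all three inputs is an assumption of this sketch.

```python
# Supported (cluster_type, fleet_mode, device) combinations and their mode names.
SUPPORTED_MODES = {
    ("MPI", "PS", "CPU"): "PS_CPU_MPI",
    ("K8S", "COLLECTIVE", "GPU"): "COLLECTIVE_GPU_K8S",
    ("K8S", "PS", "CPU"): "PS_CPU_K8S",
}


def select_distribute_mode(cluster_type, fleet_mode, device):
    """Map a configuration triple to its distribute mode, or raise ValueError."""
    key = (cluster_type.upper(), fleet_mode.upper(), device.upper())
    if key not in SUPPORTED_MODES:
        raise ValueError(
            "unsupported combination -> cluster_type: {}, fleet_mode: {}, "
            "device: {}".format(*key))
    return SUPPORTED_MODES[key]


print(select_distribute_mode("mpi", "ps", "cpu"))          # PS_CPU_MPI
print(select_distribute_mode("k8s", "collective", "gpu"))  # COLLECTIVE_GPU_K8S
print(select_distribute_mode("k8s", "ps", "cpu"))          # PS_CPU_K8S
```

A table makes the unsupported cases explicit: anything not listed, such as PS mode on GPU, fails with a single clear error.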
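### Step 4: Submit the Job
Once `config.yaml` and `backend.yaml` are ready, the job can be submitted with a single command:
```shell
python -m paddlerec.run -m config.yaml -b backend.yaml
```
Several configuration checks run during execution and report errors when they fail. After you enter the submit command, output like the following is printed to the screen: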
```shell
The task submission folder is generated at /home/PaddleRec/models/rank/dnn/PaddleRec_CTR_202007091308
before_submit
gen gpu before_hook.sh
gen k8s_config.ini
gen k8s_job.sh
gen end_hook.sh
Start checking your job configuration, please be patient.
Congratulations! Job configuration check passed!
Congratulations! The new job is ready for training.
{
"groupName": "xxxxxxx",
"jobId": "job-xxxxxx",
"userId": "x-x-x-x-x"
}
end submit
```
This output means the job was submitted to PaddleCloud successfully.
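
If the submission is driven from a script, the job id can be pulled out of that output. The following is a sketch under the assumption that the client prints one JSON object shaped like the one in the log above, and that no earlier log line contains a `{`; `extract_job_info` is a hypothetical helper.

```python
import json


def extract_job_info(output):
    """Return the first JSON object found in the submission output, or None."""
    start = output.find("{")
    if start < 0:
        return None
    try:
        # raw_decode parses one JSON value and ignores any trailing text.
        info, _ = json.JSONDecoder().raw_decode(output[start:])
    except ValueError:
        return None
    return info


log = """Congratulations! The new job is ready for training.
{
    "groupName": "xxxxxxx",
    "jobId": "job-xxxxxx",
    "userId": "x-x-x-x-x"
}
end submit
"""

info = extract_job_info(log)
print(info["jobId"])  # job-xxxxxx
```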
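You can also enter the directory `/home/PaddleRec/models/rank/dnn/PaddleRec_CTR_202007091308` to inspect the submission environment. It contains the following files:
```shell
.
├── backend.yaml # User-defined distributed configuration backend.yaml
├── config.yaml # User-defined model configuration config.yaml
├── before_hook.sh # Script generated by PaddleRec, run before training
├── config.ini # PaddleCloud environment configuration generated by PaddleRec
├── end_hook.sh # Script generated by PaddleRec, run after training
├── job.sh # PaddleCloud job submission script generated by PaddleRec
└── model.py # Network definition .py file of the CTR model
```
The files in this directory are uploaded flat (without subdirectories) to the node's working directory. You can review whether the configuration files generated by PaddleRec match your expectations. If they do not, either adjust backend.yaml or edit the generated files directly, then run:
```shell
sh job.sh
```
to submit the job again.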
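
Before resubmitting from an edited folder, it can help to confirm that nothing was deleted by accident. A minimal sketch, assuming the file names from the listing above; `missing_files` is a hypothetical helper and the demo runs against a throwaway temporary folder rather than a real submission directory.

```python
import os
import tempfile

# File names taken from the directory listing above (model.py is job specific).
EXPECTED_FILES = ("backend.yaml", "config.yaml", "before_hook.sh",
                  "config.ini", "end_hook.sh", "job.sh")


def missing_files(folder, expected=EXPECTED_FILES):
    """Return the expected file names that are absent from folder."""
    present = set(os.listdir(folder))
    return [name for name in expected if name not in present]


# Demo: a temporary folder that only holds two of the expected files.
demo_dir = tempfile.mkdtemp()
for name in ("config.yaml", "job.sh"):
    open(os.path.join(demo_dir, name), "w").close()

print(missing_files(demo_dir))
# ['backend.yaml', 'before_hook.sh', 'config.ini', 'end_hook.sh']
```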
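## Submitting with the PaddleCloud Client
If you are already familiar with PaddleCloud, have submitted jobs with PaddleCloud-Client before, know `before_hook.sh`, `config.ini`, and `job.sh`, and want to keep submitting PaddleCloud jobs that way, PaddleRec supports this as well.
You can skip `backend.yaml` and submit according to PaddleCloud-Client's own requirements. Besides [modifying config.yaml](#step-2-modify-the-configyaml-model-configuration) for distributed training, a few extra steps are needed:
### Step 1: Install PaddleRec Manually in `before_hook.sh`
```shell
# before_hook.sh
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
echo "End before_hook.sh ..."
```
### Step 2: Adjust Hyperparameters in `config.ini`
```shell
# config.ini
# Set the PADDLE_PADDLEREC_ROLE environment variable to WORKER
# This tells PaddleRec that it is running on a node, so it skips the submission flow and runs distributed training directly
PADDLE_PADDLEREC_ROLE=WORKER
```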
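
The effect of this variable can be pictured as a simple role switch. The sketch below is an illustration only: `resolve_role` and `describe_action` are hypothetical helpers, and treating an unset variable as `MASTER` is an assumption rather than documented behavior.

```python
import os


def resolve_role(environ=None):
    """Read the role from PADDLE_PADDLEREC_ROLE (assumed default: MASTER)."""
    environ = os.environ if environ is None else environ
    role = environ.get("PADDLE_PADDLEREC_ROLE", "MASTER").upper()
    if role not in ("MASTER", "WORKER"):
        raise ValueError(
            "role {} error, must in MASTER/WORKER".format(role))
    return role


def describe_action(role):
    """Describe what each role does in the flow outlined above."""
    if role == "MASTER":
        return "package the files and submit the job to the cluster"
    return "skip submission and start distributed training on this node"


print(describe_action(resolve_role({"PADDLE_PADDLEREC_ROLE": "WORKER"})))
# skip submission and start distributed training on this node
```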
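### Step 3: Upload Files and Change the Start Command in `job.sh`
In `job.sh`, upload the files needed to run PaddleRec, such as the model's `model.py`, `config.yaml`, and `reader.py`. The PaddleRec framework code does not need to be uploaded because it is already installed in before_hook.
Also change the start command (start_cmd) to:
```shell
python -m paddlerec.run -m config.yaml
```
### Step 4: Run the Submission
Run directly:
```shell
sh job.sh
```
This reuses your existing submission script to submit the job.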
doc/imgs/flen.png

36.1 KB

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# global settings
debug: false
workspace: "paddlerec.models.rank.AutoInt"
dataset:
- name: train_sample
  type: QueueDataset
  batch_size: 5
  data_path: "{workspace}/../dataset/Criteo_data/sample_data/train"
  sparse_slots: "label feat_idx"
  dense_slots: "feat_value:39"
- name: infer_sample
  type: QueueDataset
  batch_size: 5
  data_path: "{workspace}/../dataset/Criteo_data/sample_data/train"
  sparse_slots: "label feat_idx"
  dense_slots: "feat_value:39"
hyper_parameters:
  optimizer:
    class: SGD
    learning_rate: 0.0001
  sparse_feature_number: 1086460
  sparse_feature_dim: 96
  num_field: 39
  d_model: 96
  d_key: 16
  d_value: 16
  n_head: 6
  dropout_rate: 0
  n_interacting_layers: 1
mode: train_runner
# if infer, change mode to "infer_runner" and change phase to "infer_phase"
runner:
- name: train_runner
  class: train
  epochs: 2
  device: cpu
  init_model_path: ""
  save_checkpoint_interval: 1
  save_inference_interval: 1
  save_checkpoint_path: "increment"
  save_inference_path: "inference"
  print_interval: 1
- name: infer_runner
  class: infer
  device: cpu
  init_model_path: "increment/0"
  print_interval: 1
phase:
- name: phase1
  model: "{workspace}/model.py"
  dataset_name: train_sample
  thread_num: 1
#- name: infer_phase
#  model: "{workspace}/model.py"
#  dataset_name: infer_sample
#  thread_num: 1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import ModelBase
class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None)
self.num_field = envs.get_global_env("hyper_parameters.num_field",
None)
self.d_model = envs.get_global_env("hyper_parameters.d_model", None)
self.d_key = envs.get_global_env("hyper_parameters.d_key", None)
self.d_value = envs.get_global_env("hyper_parameters.d_value", None)
self.n_head = envs.get_global_env("hyper_parameters.n_head", None)
self.dropout_rate = envs.get_global_env(
"hyper_parameters.dropout_rate", 0)
self.n_interacting_layers = envs.get_global_env(
"hyper_parameters.n_interacting_layers", 1)
def multi_head_attention(self, queries, keys, values, d_key, d_value,
d_model, n_head, dropout_rate):
keys = queries if keys is None else keys
values = keys if values is None else values
if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3
):
raise ValueError(
"Inputs: queries, keys and values should all be 3-D tensors.")
def __compute_qkv(queries, keys, values, n_head, d_key, d_value):
"""
Add linear projection to queries, keys, and values.
"""
q = fluid.layers.fc(input=queries,
size=d_key * n_head,
bias_attr=False,
num_flatten_dims=2)
k = fluid.layers.fc(input=keys,
size=d_key * n_head,
bias_attr=False,
num_flatten_dims=2)
v = fluid.layers.fc(input=values,
size=d_value * n_head,
bias_attr=False,
num_flatten_dims=2)
return q, k, v
def __split_heads_qkv(queries, keys, values, n_head, d_key, d_value):
"""
Reshape input tensors at the last dimension to split multi-heads
and then transpose. Specifically, transform the input tensor with shape
[bs, max_sequence_length, n_head * hidden_dim] to the output tensor
with shape [bs, n_head, max_sequence_length, hidden_dim].
"""
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
reshaped_q = fluid.layers.reshape(
x=queries, shape=[0, 0, n_head, d_key], inplace=True)
# permute the dimensions into:
# [batch_size, n_head, max_sequence_len, hidden_size_per_head]
q = fluid.layers.transpose(x=reshaped_q, perm=[0, 2, 1, 3])
# For encoder-decoder attention in inference, insert the ops and vars
# into global block to use as cache among beam search.
reshaped_k = fluid.layers.reshape(
x=keys, shape=[0, 0, n_head, d_key], inplace=True)
k = fluid.layers.transpose(x=reshaped_k, perm=[0, 2, 1, 3])
reshaped_v = fluid.layers.reshape(
x=values, shape=[0, 0, n_head, d_value], inplace=True)
v = fluid.layers.transpose(x=reshaped_v, perm=[0, 2, 1, 3])
return q, k, v
def scaled_dot_product_attention(q, k, v, d_key, dropout_rate):
"""
Scaled Dot-Product Attention
"""
product = fluid.layers.matmul(
x=q, y=k, transpose_y=True, alpha=d_key**-0.5)
weights = fluid.layers.softmax(product)
if dropout_rate:
weights = fluid.layers.dropout(
weights,
dropout_prob=dropout_rate,
seed=None,
is_test=False)
out = fluid.layers.matmul(weights, v)
return out
def __combine_heads(x):
"""
Transpose and then reshape the last two dimensions of input tensor x
so that it becomes one dimension, which is reverse to __split_heads.
"""
if len(x.shape) != 4:
raise ValueError("Input(x) should be a 4-D Tensor.")
trans_x = fluid.layers.transpose(x, perm=[0, 2, 1, 3])
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
return fluid.layers.reshape(
x=trans_x,
shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]],
inplace=True)
q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value)
q, k, v = __split_heads_qkv(q, k, v, n_head, d_key, d_value)
ctx_multiheads = scaled_dot_product_attention(q, k, v, self.d_model,
dropout_rate)
out = __combine_heads(ctx_multiheads)
return out
def interacting_layer(self, x):
attention_out = self.multi_head_attention(
x, None, None, self.d_key, self.d_value, self.d_model, self.n_head,
self.dropout_rate)
W_0_x = fluid.layers.fc(input=x,
size=self.d_model,
bias_attr=False,
num_flatten_dims=2)
res_out = fluid.layers.relu(attention_out + W_0_x)
return res_out
def net(self, inputs, is_infer=False):
init_value_ = 0.1
is_distributed = True if envs.get_trainer() == "CtrTrainer" else False
# ------------------------- network input --------------------------
raw_feat_idx = self._sparse_data_var[1]
raw_feat_value = self._dense_data_var[0]
self.label = self._sparse_data_var[0]
feat_idx = raw_feat_idx
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, self.num_field, 1]) # None * num_field * 1
# ------------------------- Embedding --------------------------
feat_embeddings_re = fluid.embedding(
input=feat_idx,
is_sparse=True,
is_distributed=is_distributed,
dtype='float32',
size=[self.sparse_feature_number + 1, self.sparse_feature_dim],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0,
scale=init_value_ /
math.sqrt(float(self.sparse_feature_dim)))))
feat_embeddings = fluid.layers.reshape(
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # None * num_field * embedding_size
# None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
inter_input = feat_embeddings
# ------------------------- interacting layer --------------------------
for _ in range(self.n_interacting_layers):
interacting_layer_out = self.interacting_layer(inter_input)
inter_input = interacting_layer_out
# ------------------------- DNN --------------------------
dnn_input = fluid.layers.flatten(interacting_layer_out, axis=1)
y_dnn = fluid.layers.fc(
input=dnn_input,
size=1,
act=None,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)),
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)))
self.predict = fluid.layers.sigmoid(y_dnn)
cost = fluid.layers.log_loss(
input=self.predict, label=fluid.layers.cast(self.label, "float32"))
avg_cost = fluid.layers.reduce_sum(cost)
self._cost = avg_cost
predict_2d = fluid.layers.concat([1 - self.predict, self.predict], 1)
label_int = fluid.layers.cast(self.label, 'int64')
auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict_2d,
label=label_int,
slide_steps=0)
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc_var
if is_infer:
self._infer_results["AUC"] = auc_var
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# global settings
debug: false
workspace: "paddlerec.models.rank.BST"
dataset:
- name: sample_1
type: DataLoader
batch_size: 5
data_path: "{workspace}/data/train_data"
sparse_slots: "label history cate position target target_cate target_position"
- name: infer_sample
type: DataLoader
batch_size: 5
data_path: "{workspace}/data/train_data"
sparse_slots: "label history cate position target target_cate target_position"
hyper_parameters:
optimizer:
class: SGD
learning_rate: 0.0001
use_DataLoader: True
item_emb_size: 96
cat_emb_size: 96
position_emb_size: 96
is_sparse: False
item_count: 63001
cat_count: 801
position_count: 5001
n_encoder_layers: 1
d_model: 288
d_key: 48
d_value: 48
n_head: 6
dropout_rate: 0
postprocess_cmd: "da"
prepostprocess_dropout: 0
d_inner_hid: 512
relu_dropout: 0.0
act: "relu"
fc_sizes: [1024, 512, 256]
mode: train_runner
runner:
- name: train_runner
class: train
epochs: 1
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment_BST"
save_inference_path: "inference_BST"
print_interval: 1
- name: infer_runner
class: infer
device: cpu
init_model_path: "increment_BST/0"
print_interval: 1
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: sample_1
thread_num: 1
#- name: infer_phase
# model: "{workspace}/model.py"
# dataset_name: infer_sample
# thread_num: 1
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import random
import pickle
random.seed(1234)
print("read and process data")
with open('./raw_data/remap.pkl', 'rb') as f:
reviews_df = pickle.load(f)
cate_list = pickle.load(f)
user_count, item_count, cate_count, example_count = pickle.load(f)
train_set = []
test_set = []
for reviewerID, hist in reviews_df.groupby('reviewerID'):
pos_list = hist['asin'].tolist()
time_list = hist['unixReviewTime'].tolist()
def gen_neg():
neg = pos_list[0]
while neg in pos_list:
neg = random.randint(0, item_count - 1)
return neg
neg_list = [gen_neg() for i in range(len(pos_list))]
for i in range(1, len(pos_list)):
hist = pos_list[:i]
# set maximum position value
time_seq = [
min(int((time_list[i] - time_list[j]) / (3600 * 24)), 5000)
for j in range(i)
]
if i != len(pos_list) - 1:
train_set.append((reviewerID, hist, pos_list[i], 1, time_seq))
train_set.append((reviewerID, hist, neg_list[i], 0, time_seq))
else:
label = (pos_list[i], neg_list[i])
test_set.append((reviewerID, hist, label, time_seq))
random.shuffle(train_set)
random.shuffle(test_set)
assert len(test_set) == user_count
def print_to_file(data, fout, slot):
if not isinstance(data, list):
data = [data]
for i in range(len(data)):
fout.write(slot + ":" + str(data[i]))
fout.write(' ')
print("make train data")
with open("paddle_train.txt", "w") as fout:
for line in train_set:
history = line[1]
target = line[2]
label = line[3]
position = line[4]
cate = [cate_list[x] for x in history]
print_to_file(history, fout, "history")
print_to_file(cate, fout, "cate")
print_to_file(position, fout, "position")
print_to_file(target, fout, "target")
print_to_file(cate_list[target], fout, "target_cate")
print_to_file(0, fout, "target_position")
print_to_file(label, fout, "label")
fout.write("\n")
print("make test data")
with open("paddle_test.txt", "w") as fout:
for line in test_set:
history = line[1]
target = line[2]
position = line[3]
cate = [cate_list[x] for x in history]
print_to_file(history, fout, "history")
print_to_file(cate, fout, "cate")
print_to_file(position, fout, "position")
print_to_file(target[0], fout, "target")
print_to_file(cate_list[target[0]], fout, "target_cate")
print_to_file(0, fout, "target_position")
fout.write("label:1\n")
print_to_file(history, fout, "history")
print_to_file(cate, fout, "cate")
print_to_file(position, fout, "position")
print_to_file(target[1], fout, "target")
print_to_file(cate_list[target[1]], fout, "target_cate")
print_to_file(0, fout, "target_position")
fout.write("label:0\n")
print("make config data")
with open('config.txt', 'w') as f:
f.write(str(user_count) + "\n")
f.write(str(item_count) + "\n")
f.write(str(cate_count) + "\n")
f.write(str(50000) + "\n")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
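import ast
import pickle
import pandas as pd
def to_df(file_path):
with open(file_path, 'r') as fin:
df = {}
i = 0
for line in fin:
# Each line is a Python dict literal; literal_eval parses it without executing code.
df[i] = ast.literal_eval(line)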
i += 1
df = pd.DataFrame.from_dict(df, orient='index')
return df
print("start to analyse reviews_Electronics_5.json")
reviews_df = to_df('./raw_data/reviews_Electronics_5.json')
with open('./raw_data/reviews.pkl', 'wb') as f:
pickle.dump(reviews_df, f, pickle.HIGHEST_PROTOCOL)
print("start to analyse meta_Electronics.json")
meta_df = to_df('./raw_data/meta_Electronics.json')
meta_df = meta_df[meta_df['asin'].isin(reviews_df['asin'].unique())]
meta_df = meta_df.reset_index(drop=True)
with open('./raw_data/meta.pkl', 'wb') as f:
pickle.dump(meta_df, f, pickle.HIGHEST_PROTOCOL)
#! /bin/bash
set -e
echo "begin download data"
mkdir -p raw_data
cd raw_data
wget -c http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Electronics_5.json.gz
gzip -d reviews_Electronics_5.json.gz
wget -c http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Electronics.json.gz
gzip -d meta_Electronics.json.gz
echo "download data successfully"
cd ..
python convert_pd.py
python remap_id.py
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import random
import pickle
import numpy as np
random.seed(1234)
with open('./raw_data/reviews.pkl', 'rb') as f:
reviews_df = pickle.load(f)
reviews_df = reviews_df[['reviewerID', 'asin', 'unixReviewTime']]
with open('./raw_data/meta.pkl', 'rb') as f:
meta_df = pickle.load(f)
meta_df = meta_df[['asin', 'categories']]
meta_df['categories'] = meta_df['categories'].map(lambda x: x[-1][-1])
def build_map(df, col_name):
key = sorted(df[col_name].unique().tolist())
m = dict(zip(key, range(len(key))))
df[col_name] = df[col_name].map(lambda x: m[x])
return m, key
asin_map, asin_key = build_map(meta_df, 'asin')
cate_map, cate_key = build_map(meta_df, 'categories')
revi_map, revi_key = build_map(reviews_df, 'reviewerID')
user_count, item_count, cate_count, example_count =\
len(revi_map), len(asin_map), len(cate_map), reviews_df.shape[0]
print('user_count: %d\titem_count: %d\tcate_count: %d\texample_count: %d' %
(user_count, item_count, cate_count, example_count))
meta_df = meta_df.sort_values('asin')
meta_df = meta_df.reset_index(drop=True)
reviews_df['asin'] = reviews_df['asin'].map(lambda x: asin_map[x])
reviews_df = reviews_df.sort_values(['reviewerID', 'unixReviewTime'])
reviews_df = reviews_df.reset_index(drop=True)
reviews_df = reviews_df[['reviewerID', 'asin', 'unixReviewTime']]
cate_list = [meta_df['categories'][i] for i in range(len(asin_map))]
cate_list = np.array(cate_list, dtype=np.int32)
with open('./raw_data/remap.pkl', 'wb') as f:
pickle.dump(reviews_df, f, pickle.HIGHEST_PROTOCOL) # uid, iid
pickle.dump(cate_list, f, pickle.HIGHEST_PROTOCOL) # cid of iid line
pickle.dump((user_count, item_count, cate_count, example_count), f,
pickle.HIGHEST_PROTOCOL)
pickle.dump((asin_key, cate_key, revi_key), f, pickle.HIGHEST_PROTOCOL)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from functools import partial
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from paddlerec.core.utils import envs
from paddlerec.core.model import ModelBase
def positionwise_feed_forward(x, d_inner_hid, d_hid, dropout_rate):
"""
Position-wise Feed-Forward Networks.
This module consists of two linear transformations with a ReLU activation
in between, which is applied to each position separately and identically.
"""
hidden = layers.fc(input=x,
size=d_inner_hid,
num_flatten_dims=2,
act="relu")
if dropout_rate:
hidden = layers.dropout(
hidden,
dropout_prob=dropout_rate,
seed=None,
is_test=False)
out = layers.fc(input=hidden, size=d_hid, num_flatten_dims=2)
return out
def pre_post_process_layer(prev_out, out, process_cmd, dropout_rate=0.):
"""
Add residual connection, layer normalization and dropout to the out tensor
optionally according to the value of process_cmd.
This will be used before or after multi-head attention and position-wise
feed-forward networks.
"""
for cmd in process_cmd:
if cmd == "a": # add residual connection
out = out + prev_out if prev_out is not None else out
elif cmd == "n": # add layer normalization
out = layers.layer_norm(
out,
begin_norm_axis=len(out.shape) - 1,
param_attr=fluid.initializer.Constant(1.),
bias_attr=fluid.initializer.Constant(0.))
elif cmd == "d": # add dropout
if dropout_rate:
out = layers.dropout(
out,
dropout_prob=dropout_rate,
seed=None,
is_test=False)
return out
pre_process_layer = partial(pre_post_process_layer, None)
post_process_layer = pre_post_process_layer
class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.item_emb_size = envs.get_global_env(
"hyper_parameters.item_emb_size", 64)
self.cat_emb_size = envs.get_global_env(
"hyper_parameters.cat_emb_size", 64)
self.position_emb_size = envs.get_global_env(
"hyper_parameters.position_emb_size", 64)
self.act = envs.get_global_env("hyper_parameters.act", "sigmoid")
self.is_sparse = envs.get_global_env("hyper_parameters.is_sparse",
False)
# significant for speeding up the training process
self.use_DataLoader = envs.get_global_env(
"hyper_parameters.use_DataLoader", False)
self.item_count = envs.get_global_env("hyper_parameters.item_count",
63001)
self.cat_count = envs.get_global_env("hyper_parameters.cat_count", 801)
self.position_count = envs.get_global_env(
"hyper_parameters.position_count", 5001)
self.n_encoder_layers = envs.get_global_env(
"hyper_parameters.n_encoder_layers", 1)
self.d_model = envs.get_global_env("hyper_parameters.d_model", 96)
self.d_key = envs.get_global_env("hyper_parameters.d_key", None)
self.d_value = envs.get_global_env("hyper_parameters.d_value", None)
self.n_head = envs.get_global_env("hyper_parameters.n_head", None)
self.dropout_rate = envs.get_global_env(
"hyper_parameters.dropout_rate", 0.0)
self.postprocess_cmd = envs.get_global_env(
"hyper_parameters.postprocess_cmd", "da")
self.preprocess_cmd = envs.get_global_env(
"hyper_parameters.preprocess_cmd", "n")
self.prepostprocess_dropout = envs.get_global_env(
"hyper_parameters.prepostprocess_dropout", 0.0)
self.d_inner_hid = envs.get_global_env("hyper_parameters.d_inner_hid",
512)
self.relu_dropout = envs.get_global_env(
"hyper_parameters.relu_dropout", 0.0)
self.layer_sizes = envs.get_global_env("hyper_parameters.fc_sizes",
None)
def multi_head_attention(self, queries, keys, values, d_key, d_value,
d_model, n_head, dropout_rate):
keys = queries if keys is None else keys
values = keys if values is None else values
if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3
):
raise ValueError(
"Inputs: queries, keys and values should all be 3-D tensors.")
def __compute_qkv(queries, keys, values, n_head, d_key, d_value):
"""
Add linear projection to queries, keys, and values.
"""
q = fluid.layers.fc(input=queries,
size=d_key * n_head,
bias_attr=False,
num_flatten_dims=2)
k = fluid.layers.fc(input=keys,
size=d_key * n_head,
bias_attr=False,
num_flatten_dims=2)
v = fluid.layers.fc(input=values,
size=d_value * n_head,
bias_attr=False,
num_flatten_dims=2)
return q, k, v
def __split_heads_qkv(queries, keys, values, n_head, d_key, d_value):
"""
Reshape input tensors at the last dimension to split multi-heads
and then transpose. Specifically, transform the input tensor with shape
[bs, max_sequence_length, n_head * hidden_dim] to the output tensor
with shape [bs, n_head, max_sequence_length, hidden_dim].
"""
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
reshaped_q = fluid.layers.reshape(
x=queries, shape=[0, 0, n_head, d_key], inplace=True)
# permute the dimensions into:
# [batch_size, n_head, max_sequence_len, hidden_size_per_head]
q = fluid.layers.transpose(x=reshaped_q, perm=[0, 2, 1, 3])
# For encoder-decoder attention in inference, insert the ops and vars
# into global block to use as cache among beam search.
reshaped_k = fluid.layers.reshape(
x=keys, shape=[0, 0, n_head, d_key], inplace=True)
k = fluid.layers.transpose(x=reshaped_k, perm=[0, 2, 1, 3])
reshaped_v = fluid.layers.reshape(
x=values, shape=[0, 0, n_head, d_value], inplace=True)
v = fluid.layers.transpose(x=reshaped_v, perm=[0, 2, 1, 3])
return q, k, v
def scaled_dot_product_attention(q, k, v, d_key, dropout_rate):
"""
Scaled Dot-Product Attention
"""
product = fluid.layers.matmul(
x=q, y=k, transpose_y=True, alpha=d_key**-0.5)
weights = fluid.layers.softmax(product)
if dropout_rate:
weights = fluid.layers.dropout(
weights,
dropout_prob=dropout_rate,
seed=None,
is_test=False)
out = fluid.layers.matmul(weights, v)
return out
def __combine_heads(x):
"""
Transpose and then reshape the last two dimensions of input tensor x
so that it becomes one dimension, which is reverse to __split_heads.
"""
if len(x.shape) != 4:
raise ValueError("Input(x) should be a 4-D Tensor.")
trans_x = fluid.layers.transpose(x, perm=[0, 2, 1, 3])
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
return fluid.layers.reshape(
x=trans_x,
shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]],
inplace=True)
q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value)
q, k, v = __split_heads_qkv(q, k, v, n_head, d_key, d_value)
ctx_multiheads = scaled_dot_product_attention(q, k, v, d_key,
dropout_rate)
out = __combine_heads(ctx_multiheads)
proj_out = fluid.layers.fc(input=out,
size=d_model,
bias_attr=False,
num_flatten_dims=2)
return proj_out
def encoder_layer(self, x):
attention_out = self.multi_head_attention(
pre_process_layer(x, self.preprocess_cmd,
self.prepostprocess_dropout), None, None,
self.d_key, self.d_value, self.d_model, self.n_head,
self.dropout_rate)
attn_output = post_process_layer(x, attention_out,
self.postprocess_cmd,
self.prepostprocess_dropout)
ffd_output = positionwise_feed_forward(
pre_process_layer(attn_output, self.preprocess_cmd,
self.prepostprocess_dropout), self.d_inner_hid,
self.d_model, self.relu_dropout)
return post_process_layer(attn_output, ffd_output,
self.postprocess_cmd,
self.prepostprocess_dropout)
def net(self, inputs, is_infer=False):
init_value_ = 0.1
hist_item_seq = self._sparse_data_var[1]
hist_cat_seq = self._sparse_data_var[2]
position_seq = self._sparse_data_var[3]
target_item = self._sparse_data_var[4]
target_cat = self._sparse_data_var[5]
target_position = self._sparse_data_var[6]
self.label = self._sparse_data_var[0]
item_emb_attr = fluid.ParamAttr(name="item_emb")
cat_emb_attr = fluid.ParamAttr(name="cat_emb")
position_emb_attr = fluid.ParamAttr(name="position_emb")
hist_item_emb = fluid.embedding(
input=hist_item_seq,
size=[self.item_count, self.item_emb_size],
param_attr=item_emb_attr,
is_sparse=self.is_sparse)
hist_cat_emb = fluid.embedding(
input=hist_cat_seq,
size=[self.cat_count, self.cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=self.is_sparse)
hist_position_emb = fluid.embedding(
input=position_seq,
size=[self.position_count, self.position_emb_size],
param_attr=position_emb_attr,
is_sparse=self.is_sparse)
target_item_emb = fluid.embedding(
input=target_item,
size=[self.item_count, self.item_emb_size],
param_attr=item_emb_attr,
is_sparse=self.is_sparse)
target_cat_emb = fluid.embedding(
input=target_cat,
size=[self.cat_count, self.cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=self.is_sparse)
target_position_emb = fluid.embedding(
input=target_position,
size=[self.position_count, self.position_emb_size],
param_attr=position_emb_attr,
is_sparse=self.is_sparse)
item_sequence_target = fluid.layers.reduce_sum(
fluid.layers.sequence_concat([hist_item_emb, target_item_emb]),
dim=1)
cat_sequence_target = fluid.layers.reduce_sum(
fluid.layers.sequence_concat([hist_cat_emb, target_cat_emb]),
dim=1)
position_sequence_target = fluid.layers.reduce_sum(
fluid.layers.sequence_concat(
[hist_position_emb, target_position_emb]),
dim=1)
whole_embedding_withlod = fluid.layers.concat(
[
item_sequence_target, cat_sequence_target,
position_sequence_target
],
axis=1)
pad_value = fluid.layers.assign(input=np.array(
[0.0], dtype=np.float32))
whole_embedding, _ = fluid.layers.sequence_pad(whole_embedding_withlod,
pad_value)
# feed each encoder layer with the output of the previous one
enc_input = whole_embedding
for _ in range(self.n_encoder_layers):
enc_output = self.encoder_layer(enc_input)
enc_input = enc_output
enc_output = pre_process_layer(enc_output, self.preprocess_cmd,
self.prepostprocess_dropout)
dnn_input = fluid.layers.reduce_sum(enc_output, dim=1)
for s in self.layer_sizes:
dnn_input = fluid.layers.fc(
input=dnn_input,
size=s,
act=self.act,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_ / math.sqrt(float(10)))),
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)))
y_dnn = fluid.layers.fc(input=dnn_input, size=1, act=None)
self.predict = fluid.layers.sigmoid(y_dnn)
cost = fluid.layers.log_loss(
input=self.predict, label=fluid.layers.cast(self.label, "float32"))
avg_cost = fluid.layers.reduce_sum(cost)
self._cost = avg_cost
predict_2d = fluid.layers.concat([1 - self.predict, self.predict], 1)
label_int = fluid.layers.cast(self.label, 'int64')
auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict_2d,
label=label_int,
slide_steps=0)
self._metrics["AUC"] = auc_var
self._metrics["BATCH_AUC"] = batch_auc_var
if is_infer:
self._infer_results["AUC"] = auc_var
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
backend: "PaddleCloud"
cluster_type: k8s # mpi is also supported
config:
fs_name: "afs://xxx.com"
fs_ugi: "usr,pwd"
output_path: "" # remote path; for afs:/user/your/path/ enter /user/your/path here
# for mpi
train_data_path: "" # remote path; for afs:/user/your/path/ enter /user/your/path here
test_data_path: "" # remote path; for afs:/user/your/path/ enter /user/your/path here
thirdparty_path: "" # remote path; for afs:/user/your/path/ enter /user/your/path here
paddle_version: "1.7.2" # official Paddle release version, >= 1.7.2
# for k8s
afs_remote_mount_point: "" # remote path; for afs:/user/your/path/ enter /user/your/path here
# low-level Paddle distributed hyperparameters; leave unchanged unless you have specific needs
communicator:
FLAGS_communicator_is_sgd_optimizer: 0
FLAGS_communicator_send_queue_size: 5
FLAGS_communicator_thread_pool_size: 32
FLAGS_communicator_max_merge_var_num: 5
FLAGS_communicator_max_send_grad_num_before_recv: 5
FLAGS_communicator_fake_rpc: 0
FLAGS_rpc_retry_times: 3
submit:
ak: ""
sk: ""
priority: "high"
job_name: "PaddleRec_CTR"
group: ""
start_cmd: "python -m paddlerec.run -m ./config.yaml"
files: ./*.py ./*.yaml
# for mpi ps-cpu
nodes: 2
# for k8s gpu
k8s_trainers: 2
k8s_cpu_cores: 2
k8s_gpu_card: 1
# for k8s ps-cpu
k8s_trainers: 2
k8s_cpu_cores: 4
k8s_ps_num: 2
k8s_ps_cores: 4
......@@ -80,6 +80,28 @@ runner:
init_model_path: "increment_dnn" # load model path
phases: [phase2]
- name: ps_cluster
class: cluster_train
epochs: 2
device: cpu
fleet_mode: ps
save_checkpoint_interval: 1 # save model interval of epochs
save_checkpoint_path: "increment_dnn" # save checkpoint path
init_model_path: "" # load model path
print_interval: 1
phases: [phase1]
- name: collective_cluster
class: cluster_train
epochs: 2
device: gpu
fleet_mode: collective
save_checkpoint_interval: 1 # save model interval of epochs
save_checkpoint_path: "increment_dnn" # save checkpoint path
init_model_path: "" # load model path
print_interval: 1
phases: [phase1]
# runner will run all the phase in each epoch
phase:
- name: phase1
......
......@@ -59,8 +59,8 @@ runner:
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
-save_checkpoint_path: "increment_model" # save checkpoint path
-save_inference_path: "inference" # save inference path
+save_checkpoint_path: "increment_model_fibinet" # save checkpoint path
+save_inference_path: "inference_fibinet" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
......@@ -75,8 +75,8 @@ runner:
device: gpu
save_checkpoint_interval: 1 # save model interval of epochs
save_inference_interval: 4 # save inference
-save_checkpoint_path: "increment_model" # save checkpoint path
-save_inference_path: "inference" # save inference path
+save_checkpoint_path: "increment_model_fibinet" # save checkpoint path
+save_inference_path: "inference_fibinet" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
......@@ -87,14 +87,14 @@ runner:
class: infer
# device to run training or infer
device: cpu
-init_model_path: "increment_model" # load model path
+init_model_path: "increment_model_fibinet" # load model path
phases: [phase2]
- name: single_gpu_infer
class: infer
# device to run training or infer
device: gpu
-init_model_path: "increment_model" # load model path
+init_model_path: "increment_model_fibinet" # load model path
phases: [phase2]
# runner will run all the phase in each epoch
......
# FLEN
The directory structure of this example is outlined below:
```
├── data # sample data
├── sample_data
├── train
├── sample_train.txt
├── run.sh
├── get_slot_data.py
├── __init__.py
├── README.md # documentation
├── model.py # model file
├── config.yaml # configuration file
```
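## Introduction
The paper [FLEN: Leveraging Field for Scalable CTR Prediction](https://arxiv.org/pdf/1911.04690.pdf) proposes field-wise bi-interaction pooling, which addresses the high time and space complexity of exploiting feature field information at scale. It also proposes Dicefactor, a method for alleviating the gradient coupling problem. The model has been deployed in Meitu's large-scale recommender system, where it has delivered steady improvements across business metrics.
This project validates the model on the Avazu dataset.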
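As a rough illustration of the pooling idea mentioned above, the sketch below implements plain bi-interaction pooling (the sum of element-wise products over all pairs of embeddings) in pure Python, using the identity 0.5 * ((sum of e)^2 - sum of e^2). It is not the FLEN code in model.py: the field-wise variant described in the paper applies this kind of pooling within and across fields, which this sketch does not attempt to reproduce. The function names `bi_interaction_pooling` and `pairwise_reference` are hypothetical.

```python
from itertools import combinations


def bi_interaction_pooling(embeddings):
    """Sum of element-wise products over all pairs, in O(n * d) time."""
    dim = len(embeddings[0])
    sum_vec = [sum(e[k] for e in embeddings) for k in range(dim)]
    sum_sq = [sum(e[k] * e[k] for e in embeddings) for k in range(dim)]
    # 0.5 * ((sum e)^2 - sum e^2) equals the sum over pairs i < j of e_i * e_j
    return [0.5 * (sum_vec[k] ** 2 - sum_sq[k]) for k in range(dim)]


def pairwise_reference(embeddings):
    """Same quantity computed naively over all pairs, in O(n^2 * d) time."""
    dim = len(embeddings[0])
    out = [0.0] * dim
    for a, b in combinations(embeddings, 2):
        for k in range(dim):
            out[k] += a[k] * b[k]
    return out


# Toy input: three embeddings of dimension two (values are made up).
embs = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
fast = bi_interaction_pooling(embs)
slow = pairwise_reference(embs)
```

Both functions return the same vector; the first avoids enumerating pairs, which is why this form scales to many features.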
## Data download and preprocessing
## Environment
PaddlePaddle 1.7.2
Python 3.7
PaddleRec
## Single-machine training
CPU environment
Set the device, epochs, and other parameters in config.yaml.
```
# select runner by name
mode: [single_cpu_train, single_cpu_infer]
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: single_cpu_train
class: train
# num of epochs
epochs: 4
# device to run training or infer
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment_model" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
phases: [phase1]
```
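The relationship between `mode` and the `runner` list can be pictured as a lookup by name. The sketch below is only an illustration on a plain dictionary that mirrors the YAML above; `select_runners` is a hypothetical helper, not part of the PaddleRec API, and the actual loading logic may differ.

```python
def select_runners(config):
    """Return the runner entries named in config['mode'], in order."""
    modes = config["mode"]
    if isinstance(modes, str):  # assumption: a single runner name is also accepted
        modes = [modes]
    by_name = {runner["name"]: runner for runner in config["runner"]}
    missing = [name for name in modes if name not in by_name]
    if missing:
        raise KeyError("no runner defined for mode(s): %s" % ", ".join(missing))
    return [by_name[name] for name in modes]


# Dictionary form of the YAML fragment (only a few keys are reproduced).
config = {
    "mode": ["single_cpu_train", "single_cpu_infer"],
    "runner": [
        {"name": "single_cpu_train", "class": "train", "epochs": 4, "device": "cpu"},
        {"name": "single_cpu_infer", "class": "infer", "device": "cpu"},
    ],
}
selected = select_runners(config)
```

Listing both names in `mode` runs training first and inference second, so the checkpoint path written by the training runner should match the `init_model_path` read by the inference runner.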
## Single-machine inference
CPU environment
Set epochs, device, and other parameters in config.yaml.
```
- name: single_cpu_infer
class: infer
# num of epochs
epochs: 1
# device to run training or infer
device: cpu # device used for inference
init_model_path: "increment_model" # load model path
phases: [phase2]
```
## Run
```
python -m paddlerec.run -m paddlerec.models.rank.flen
```
## Model performance
The model is tested on the sample data.
Training:
```
0702 13:38:20.903220 7368 parallel_executor.cc:440] The Program will be executed on CPU using ParallelExecutor, 2 cards are used, so 2 programs are executed in parallel.
I0702 13:38:20.925912 7368 parallel_executor.cc:307] Inplace strategy is enabled, when build_strategy.enable_inplace = True
I0702 13:38:20.933356 7368 parallel_executor.cc:375] Garbage collection strategy is enabled, when FLAGS_eager_delete_tensor_gb = 0
batch: 2, AUC: [0.09090909 0. ], BATCH_AUC: [0.09090909 0. ]
batch: 4, AUC: [0.31578947 0.29411765], BATCH_AUC: [0.31578947 0.29411765]
batch: 6, AUC: [0.41333333 0.33333333], BATCH_AUC: [0.41333333 0.33333333]
batch: 8, AUC: [0.4453125 0.44166667], BATCH_AUC: [0.4453125 0.44166667]
batch: 10, AUC: [0.39473684 0.38888889], BATCH_AUC: [0.44117647 0.41176471]
batch: 12, AUC: [0.41860465 0.45535714], BATCH_AUC: [0.5078125 0.54545455]
batch: 14, AUC: [0.43413729 0.42746615], BATCH_AUC: [0.56666667 0.56 ]
batch: 16, AUC: [0.46433566 0.47460087], BATCH_AUC: [0.53 0.59247649]
batch: 18, AUC: [0.44009217 0.44642857], BATCH_AUC: [0.46 0.47]
batch: 20, AUC: [0.42705314 0.43781095], BATCH_AUC: [0.45878136 0.4874552 ]
batch: 22, AUC: [0.45176471 0.46011281], BATCH_AUC: [0.48046875 0.45878136]
batch: 24, AUC: [0.48375 0.48910256], BATCH_AUC: [0.56630824 0.59856631]
epoch 0 done, use time: 0.21532440185546875
PaddleRec Finish
```
Inference:
```
PaddleRec: Runner single_cpu_infer Begin
Executor Mode: infer
processor_register begin
Running SingleInstance.
Running SingleNetwork.
QueueDataset can not support PY3, change to DataLoader
QueueDataset can not support PY3, change to DataLoader
Running SingleInferStartup.
Running SingleInferRunner.
load persistables from increment_model/0
batch: 20, AUC: [0.49121353], BATCH_AUC: [0.66176471]
batch: 40, AUC: [0.51156463], BATCH_AUC: [0.55197133]
Infer phase2 of 0 done, use time: 0.3941819667816162
PaddleRec Finish
```
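The AUC values in the logs above can be read as the probability that a randomly chosen positive sample is scored higher than a randomly chosen negative one. The sketch below computes that quantity directly on toy data. The function name `pairwise_auc` and the sample scores are made up for illustration, and Paddle's `fluid.layers.auc` is understood to use a thresholded (bucketed) approximation rather than this exact pairwise count, so its numbers may differ slightly.

```python
def pairwise_auc(labels, scores):
    """Exact AUC: fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative sample")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(pos) * len(neg))


# Toy click labels and predicted click probabilities (made-up values).
labels = [1, 0, 1, 0]
scores = [0.9, 0.3, 0.4, 0.6]
auc = pairwise_auc(labels, scores)
```

On a handful of sample rows, values near 0.5 such as those logged above are expected and say little about model quality.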
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# workspace
workspace: "paddlerec.models.rank.flen"
# list of dataset
dataset:
- name: dataloader_train # name of dataset to distinguish different datasets
  batch_size: 2
  type: QueueDataset
  data_path: "{workspace}/data/sample_data/train"
  sparse_slots: "click user_0 user_1 user_2 user_3 user_4 user_5 user_6 user_7 user_8 user_9 user_10 user_11 item_0 item_1 item_2 contex_0 contex_1 contex_2 contex_3 contex_4 contex_5"
  dense_slots: ""
- name: dataset_infer # name
  batch_size: 2
  type: QueueDataset
  data_path: "{workspace}/data/sample_data/train"
  sparse_slots: "click user_0 user_1 user_2 user_3 user_4 user_5 user_6 user_7 user_8 user_9 user_10 user_11 item_0 item_1 item_2 contex_0 contex_1 contex_2 contex_3 contex_4 contex_5"
  dense_slots: ""
# hyper parameters of user-defined network
hyper_parameters:
  # optimizer config
  optimizer:
    class: Adam
    learning_rate: 0.001
    strategy: async
  # user-defined <key, value> pairs
  sparse_inputs_slots: 21
  sparse_feature_number: 100
  sparse_feature_dim: 8
  dense_input_dim: 1
  dropout_rate: 0.5
# select runner by name
mode: [single_cpu_train, single_cpu_infer]
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: single_cpu_train
  class: train
  # num of epochs
  epochs: 1
  # device to run training or infer
  device: cpu
  save_checkpoint_interval: 1 # save model interval of epochs
  save_inference_interval: 4 # save inference
  save_checkpoint_path: "increment_model_flen" # save checkpoint path
  save_inference_path: "inference_flen" # save inference path
  save_inference_feed_varnames: [] # feed vars of save inference
  save_inference_fetch_varnames: [] # fetch vars of save inference
  init_model_path: "" # load model path
  print_interval: 2
  phases: [phase1]
- name: single_gpu_train
  class: train
  # num of epochs
  epochs: 1
  # device to run training or infer
  device: gpu
  save_checkpoint_interval: 1 # save model interval of epochs
  save_inference_interval: 4 # save inference
  save_checkpoint_path: "increment_model_flen" # save checkpoint path
  save_inference_path: "inference_flen" # save inference path
  save_inference_feed_varnames: [] # feed vars of save inference
  save_inference_fetch_varnames: [] # fetch vars of save inference
  init_model_path: "" # load model path
  print_interval: 2
  phases: [phase1]
- name: single_cpu_infer
  class: infer
  # device to run training or infer
  device: cpu
  init_model_path: "increment_model_flen" # load model path
  phases: [phase2]
- name: single_gpu_infer
  class: infer
  # device to run training or infer
  device: gpu
  init_model_path: "increment_model_flen" # load model path
  phases: [phase2]
# runner will run all the phase in each epoch
phase:
- name: phase1
  model: "{workspace}/model.py" # user-defined model
  dataset_name: dataloader_train # select dataset by name
  thread_num: 2
- name: phase2
  model: "{workspace}/model.py" # user-defined model
  dataset_name: dataset_infer # select dataset by name
  thread_num: 2
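The `sparse_slots` string in the config lists one label slot followed by the feature slots, and `model.py` in this change slices the resulting variables as `[0]`, `[1:13]`, `[13:16]` and `[16:]`. The sketch below only illustrates how those slices line up with the slot names and with `sparse_inputs_slots: 21`; the helper `group_slots` is hypothetical and not part of PaddleRec.

```python
# Slot names copied from the config above ("contex" is the spelling used there).
SPARSE_SLOTS = (
    "click user_0 user_1 user_2 user_3 user_4 user_5 user_6 user_7 user_8 "
    "user_9 user_10 user_11 item_0 item_1 item_2 contex_0 contex_1 contex_2 "
    "contex_3 contex_4 contex_5")


def group_slots(slot_string):
    """Split the slot string with the same index ranges that model.py uses (hypothetical helper)."""
    names = slot_string.split()
    return {
        "label": names[0],
        "user": names[1:13],
        "item": names[13:16],
        "contex": names[16:],
    }


groups = group_slots(SPARSE_SLOTS)
# Feature slots exclude the "click" label slot.
feature_slots = len(groups["user"]) + len(groups["item"]) + len(groups["contex"])
print(groups["label"], feature_slots)
```

Under this reading, the 21 feature slots (12 user, 3 item, 6 context) appear to correspond to `sparse_inputs_slots: 21`, with `click` held out as the label.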
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg


class CriteoDataset(dg.MultiSlotDataGenerator):
    """
    CriteoDataset: inherits from MultiSlotDataGenerator and implements data reading
    Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
    """

    def generate_sample(self, line):
        """
        Read the data line by line and process it as a dictionary
        """

        def reader():
            """
            This function needs to be implemented by the user, based on data format
            """
            features = line.strip().split(',')
            label = [int(features[0])]
            s = "click:" + str(label[0])
            for i, elem in enumerate(features[1:13]):
                s += " user_" + str(i) + ":" + str(elem)
            for i, elem in enumerate(features[13:16]):
                s += " item_" + str(i) + ":" + str(elem)
            for i, elem in enumerate(features[16:]):
                s += " contex_" + str(i) + ":" + str(elem)
            print(s.strip())
            yield None

        return reader


d = CriteoDataset()
d.run_from_stdin()
mkdir train
for i in `ls ./train_data`
do
    cat train_data/$i | python get_slot_data.py > train/$i
done
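The conversion performed by `get_slot_data.py` can be tried without Paddle installed. The sketch below reproduces only the string formatting from `generate_sample` as a plain function; the name `to_slot_line` and the sample input are made up for illustration, and the real script prints each line through `MultiSlotDataGenerator` rather than returning it.

```python
def to_slot_line(line):
    """Format one comma-separated record as 'slot:value' pairs (hypothetical stand-alone helper)."""
    features = line.strip().split(",")
    parts = ["click:" + str(int(features[0]))]  # field 0 is the click label
    for i, elem in enumerate(features[1:13]):   # fields 1..12 -> user_0..user_11
        parts.append("user_" + str(i) + ":" + elem)
    for i, elem in enumerate(features[13:16]):  # fields 13..15 -> item_0..item_2
        parts.append("item_" + str(i) + ":" + elem)
    for i, elem in enumerate(features[16:]):    # remaining fields -> contex_*
        parts.append("contex_" + str(i) + ":" + elem)
    return " ".join(parts)


# Made-up record: label 1 followed by 21 integer feature ids (10..30).
sample = "1," + ",".join(str(10 + i) for i in range(21))
out = to_slot_line(sample)
print(out)
```

Each output token names the slot it belongs to, which matches the slot names listed under `sparse_slots` in the config.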
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import itertools
from paddlerec.core.utils import envs
from paddlerec.core.model import ModelBase


class Model(ModelBase):
    def __init__(self, config):
        ModelBase.__init__(self, config)

    def _init_hyper_parameters(self):
        self.is_distributed = True if envs.get_fleet_mode().upper(
        ) == "PSLIB" else False
        self.sparse_feature_number = envs.get_global_env(
            "hyper_parameters.sparse_feature_number")
        self.sparse_feature_dim = envs.get_global_env(
            "hyper_parameters.sparse_feature_dim")
        self.learning_rate = envs.get_global_env(
            "hyper_parameters.optimizer.learning_rate")

    def _FieldWiseBiInteraction(self, inputs):
        # MF module
        field_wise_embeds_list = inputs
        field_wise_vectors = [
            fluid.layers.reduce_sum(
                field_i_vectors, dim=1, keep_dim=True)
            for field_i_vectors in field_wise_embeds_list
        ]
        num_fields = len(field_wise_vectors)
        h_mf_list = []
        for emb_left, emb_right in itertools.combinations(field_wise_vectors,
                                                          2):
            embeddings_prod = fluid.layers.elementwise_mul(emb_left, emb_right)
            field_weighted_embedding = fluid.layers.fc(
                input=embeddings_prod,
                size=self.sparse_feature_dim,
                param_attr=fluid.initializer.ConstantInitializer(value=1),
                name='kernel_mf')
            h_mf_list.append(field_weighted_embedding)
        h_mf = fluid.layers.concat(h_mf_list, axis=1)
        h_mf = fluid.layers.reshape(
            x=h_mf, shape=[-1, num_fields, self.sparse_feature_dim])
        h_mf = fluid.layers.reduce_sum(h_mf, dim=1)
        square_of_sum_list = [
            fluid.layers.square(
                fluid.layers.reduce_sum(
                    field_i_vectors, dim=1, keep_dim=True))
            for field_i_vectors in field_wise_embeds_list
        ]
        sum_of_square_list = [
            fluid.layers.reduce_sum(
                fluid.layers.elementwise_mul(field_i_vectors, field_i_vectors),
                dim=1,
                keep_dim=True) for field_i_vectors in field_wise_embeds_list
        ]
        field_fm_list = []
        for square_of_sum, sum_of_square in zip(square_of_sum_list,
                                                sum_of_square_list):
            field_fm = fluid.layers.reshape(
                fluid.layers.elementwise_sub(square_of_sum, sum_of_square),
                shape=[-1, self.sparse_feature_dim])
            field_fm = fluid.layers.fc(
                input=field_fm,
                size=self.sparse_feature_dim,
                param_attr=fluid.initializer.ConstantInitializer(value=0.5),
                name='kernel_fm')
            field_fm_list.append(field_fm)
        h_fm = fluid.layers.concat(field_fm_list, axis=1)
        h_fm = fluid.layers.reshape(
            x=h_fm, shape=[-1, num_fields, self.sparse_feature_dim])
        h_fm = fluid.layers.reduce_sum(h_fm, dim=1)
        return fluid.layers.elementwise_add(h_mf, h_fm)

    def _DNNLayer(self, inputs, dropout_rate=0.2):
        deep_input = inputs
        for i, hidden_unit in enumerate([64, 32]):
            fc_out = fluid.layers.fc(
                input=deep_input,
                size=hidden_unit,
                param_attr=fluid.initializer.Xavier(uniform=False),
                act='relu',
                name='d_' + str(i))
            fc_out = fluid.layers.dropout(fc_out, dropout_prob=dropout_rate)
            deep_input = fc_out
        return deep_input

    def _embeddingLayer(self, inputs):
        emb_list = []
        in_len = len(inputs)
        for data in inputs:
            feat_emb = fluid.embedding(
                input=data,
                size=[self.sparse_feature_number, self.sparse_feature_dim],
                param_attr=fluid.ParamAttr(
                    name='item_emb',
                    learning_rate=5,
                    initializer=fluid.initializer.Xavier(
                        fan_in=self.sparse_feature_dim,
                        fan_out=self.sparse_feature_dim)),
                is_sparse=True)
            emb_list.append(feat_emb)
        concat_emb = fluid.layers.concat(emb_list, axis=1)
        field_emb = fluid.layers.reshape(
            x=concat_emb, shape=[-1, in_len, self.sparse_feature_dim])
        return field_emb

    def net(self, input, is_infer=False):
        self.user_inputs = self._sparse_data_var[1:13]
        self.item_inputs = self._sparse_data_var[13:16]
        self.contex_inputs = self._sparse_data_var[16:]
        self.label_input = self._sparse_data_var[0]
        dropout_rate = envs.get_global_env("hyper_parameters.dropout_rate")
        field_wise_embeds_list = []
        for inputs in [self.user_inputs, self.item_inputs, self.contex_inputs]:
            field_emb = self._embeddingLayer(inputs)
            field_wise_embeds_list.append(field_emb)
        dnn_input = fluid.layers.concat(
            [
                fluid.layers.flatten(
                    x=field_i_vectors, axis=1)
                for field_i_vectors in field_wise_embeds_list
            ],
            axis=1)
        # mlp part
        dnn_output = self._DNNLayer(dnn_input, dropout_rate)
        # field-weighted embedding
        fm_mf_out = self._FieldWiseBiInteraction(field_wise_embeds_list)
        logits = fluid.layers.concat([fm_mf_out, dnn_output], axis=1)
        y_pred = fluid.layers.fc(
            input=logits,
            size=1,
            param_attr=fluid.initializer.Xavier(uniform=False),
            act='sigmoid',
            name='logit')
        self.predict = y_pred
        auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
                                             label=self.label_input,
                                             num_thresholds=2**12,
                                             slide_steps=20)
        if is_infer:
            self._infer_results["AUC"] = auc
            self._infer_results["BATCH_AUC"] = batch_auc
            return
        self._metrics["AUC"] = auc
        self._metrics["BATCH_AUC"] = batch_auc
        cost = fluid.layers.log_loss(
            input=self.predict,
            label=fluid.layers.cast(
                x=self.label_input, dtype='float32'))
        avg_cost = fluid.layers.reduce_mean(cost)
        self._cost = avg_cost
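The FM part of `_FieldWiseBiInteraction` computes "square of sum minus sum of squares" over the embeddings within each field. That expression relies on the standard factorization-machine identity: half of it equals the sum of element-wise products over all pairs of embeddings. The sketch below checks the identity on plain Python lists; the function names and the sample vectors are made up for illustration, and in the model the result additionally passes through an `fc` layer initialised to 0.5, which is a learned layer and not a fixed scalar.

```python
import itertools


def pairwise_product_sum(vectors):
    """Sum of element-wise products over all unordered pairs of vectors."""
    dim = len(vectors[0])
    total = [0.0] * dim
    for left, right in itertools.combinations(vectors, 2):
        for k in range(dim):
            total[k] += left[k] * right[k]
    return total


def fm_shortcut(vectors):
    """0.5 * ((sum of vectors)^2 - sum of squared vectors), element-wise."""
    dim = len(vectors[0])
    sums = [sum(v[k] for v in vectors) for k in range(dim)]
    squares = [sum(v[k] * v[k] for v in vectors) for k in range(dim)]
    return [0.5 * (sums[k] ** 2 - squares[k]) for k in range(dim)]


# Three made-up 2-dimensional embeddings belonging to one field.
embeds = [[1.0, 2.0], [3.0, -1.0], [0.5, 4.0]]
print(pairwise_product_sum(embeds), fm_shortcut(embeds))
```

The shortcut needs one pass over the embeddings instead of one pass per pair, which is why FM-style layers are usually written this way.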
......@@ -37,8 +37,10 @@
| xDeepFM | xDeepFM | [xDeepFM: Combining Explicit and Implicit Feature Interactions for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/3219819.3220023)(2018) |
| DIN | Deep Interest Network | [Deep Interest Network for Click-Through Rate Prediction](https://dl.acm.org/doi/pdf/10.1145/3219819.3219823)(2018) |
| DIEN | Deep Interest Evolution Network | [Deep Interest Evolution Network for Click-Through Rate Prediction](https://www.aaai.org/ojs/index.php/AAAI/article/view/4545/4423)(2019) |
| BST | transformer in user behavior sequence for rank | [Behavior Sequence Transformer for E-commerce Recommendation in Alibaba](https://arxiv.org/pdf/1905.06874v1.pdf)(2019) |
| FGCNN | Feature Generation by CNN | [Feature Generation by Convolutional Neural Network for Click-Through Rate Prediction](https://arxiv.org/pdf/1904.04447.pdf)(2019) |
| FIBINET | Combining Feature Importance and Bilinear feature Interaction | [《FiBiNET: Combining Feature Importance and Bilinear feature Interaction for Click-Through Rate Prediction》]( https://arxiv.org/pdf/1905.09433.pdf)(2019) |
| FLEN | Leveraging Field for Scalable CTR Prediction | [《FLEN: Leveraging Field for Scalable CTR Prediction》]( https://arxiv.org/pdf/1911.04690.pdf)(2019) |
Below is a brief introduction to each model (note: the images are taken from the linked papers)
......@@ -72,6 +74,11 @@
<p align="center">
<img align="center" src="../../doc/imgs/fibinet.png">
<p>
[FLEN](https://arxiv.org/pdf/1911.04690.pdf):
<p align="center">
<img align="center" src="../../doc/imgs/flen.png">
<p>
## Tutorial (Quick Start)
......@@ -87,6 +94,7 @@
| Wide&Deep | 40 | 1 | 40 |
| xDeepFM | 100 | 1 | 10 |
| Fibinet | 1000 | 8 | 4 |
| Flen | 512 | 8 | 1 |
### Data Processing
See the data download and preprocessing scripts in each model's directory
......@@ -127,6 +135,7 @@ python -m paddlerec.run -m ./config.yaml # using DNN as an example
| Census-income Data | Wide&Deep | 0.76195 | 0.90577 | -- | -- |
| Amazon Product | DIN | 0.47005 | 0.86379 | -- | -- |
| Criteo | Fibinet | -- | 0.86662 | -- | -- |
| Avazu | Flen | -- | -- | -- | -- |
## Distributed
......
......@@ -38,7 +38,7 @@ def engine_registry():
engines["TRANSPILER"]["TRAIN"] = single_train_engine
engines["TRANSPILER"]["INFER"] = single_infer_engine
engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER"] = cluster_engine
engines["TRANSPILER"]["CLUSTER_TRAIN"] = cluster_engine
engines["PSLIB"]["TRAIN"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine
engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine
......@@ -111,8 +111,8 @@ def get_engine(args, running_config, mode):
engine = running_config.get(engine_class, None)
if engine is None:
raise ValueError("not find {} in yaml, please check".format(
mode, engine_class))
raise ValueError("not find {} in engine_class , please check".format(
engine))
device = running_config.get(engine_device, None)
engine = engine.upper()
......@@ -262,15 +262,48 @@ def single_infer_engine(args):
def cluster_engine(args):
def master():
from paddlerec.core.engine.cluster.cluster import ClusterEngine
_envs = envs.load_yaml(args.backend)
flattens = envs.flatten_environs(_envs, "_")
# Get fleet_mode & device
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
device = device.upper()
fleet_mode = fleet_mode.upper()
if fleet_mode == "COLLECTIVE" and device != "GPU":
raise ValueError("COLLECTIVE can not be used without GPU")
# Get Thread nums
model_envs = envs.load_yaml(args.model)
phases_class = ".".join(["runner", mode, "phases"])
phase_names = run_extras.get(phases_class)
phases = []
all_phases = model_envs.get("phase")
if phase_names is None:
phases = all_phases
else:
for phase in all_phases:
if phase["name"] in phase_names:
phases.append(phase)
thread_num = []
for phase in phases:
thread_num.append(int(phase["thread_num"]))
max_thread_num = max(thread_num)
backend_envs = envs.load_yaml(args.backend)
flattens = envs.flatten_environs(backend_envs, "_")
flattens["engine_role"] = "MASTER"
flattens["engine_mode"] = envs.get_runtime_environ("mode")
flattens["engine_run_config"] = args.model
flattens["engine_temp_path"] = tempfile.mkdtemp()
flattens["max_thread_num"] = max_thread_num
flattens["fleet_mode"] = fleet_mode
flattens["device"] = device
flattens["backend_yaml"] = args.backend
envs.set_runtime_environs(flattens)
ClusterEngine.workspace_replace()
print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))
launch = ClusterEngine(None, args.model)
return launch
......@@ -278,40 +311,29 @@ def cluster_engine(args):
def worker(mode):
if not mode:
raise ValueError("mode: {} can not be recognized".format(mode))
from paddlerec.core.engine.cluster.cluster import ClusterEngine
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
strategy_class = ".".join(["runner", mode, "distribute_strategy"])
worker_class = ".".join(["runner", mode, "worker_num"])
server_class = ".".join(["runner", mode, "server_num"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
distributed_strategy = run_extras.get(strategy_class, "async")
worker_num = run_extras.get(worker_class, 1)
server_num = run_extras.get(server_class, 1)
executor_mode = "train"
device = device.upper()
fleet_mode = fleet_mode.upper()
if fleet_mode == "COLLECTIVE" and device != "GPU":
raise ValueError("COLLECTIVE can not be used with GPU")
raise ValueError("COLLECTIVE can not be used without GPU")
cluster_envs = {}
if device == "GPU":
cluster_envs["selected_gpus"] = selected_gpus
cluster_envs["server_num"] = server_num
cluster_envs["worker_num"] = worker_num
cluster_envs["fleet_mode"] = fleet_mode
cluster_envs["engine_role"] = "WORKER"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.executor_mode"] = executor_mode
......@@ -321,15 +343,15 @@ def cluster_engine(args):
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to with model: {}".format(
trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
set_runtime_envs(cluster_envs, args.model)
launch = ClusterEngine(None, args.model)
return launch
role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")
if role == "WORKER":
mode = os.getenv("PADDLE_PADDLEREC_MODE", None)
mode = os.getenv("mode", None)
return worker(mode)
else:
return master()
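The new `master()` logic in this hunk does three small things before submitting: it rejects `COLLECTIVE` mode on a non-GPU device, it selects the phases named by the runner (or all phases when none are named), and it takes the largest `thread_num` among them. The sketch below restates those steps as plain functions on dictionaries; the function names are hypothetical, the sample `thread_num` values are invented, and none of it calls PaddleRec.

```python
def check_fleet_device(fleet_mode, device):
    """Mirror of the guard in the diff: COLLECTIVE requires a GPU device."""
    if fleet_mode.upper() == "COLLECTIVE" and device.upper() != "GPU":
        raise ValueError("COLLECTIVE can not be used without GPU")


def select_phases(all_phases, phase_names=None):
    """Keep only the phases listed by the runner; keep all when the runner lists none."""
    if phase_names is None:
        return list(all_phases)
    return [phase for phase in all_phases if phase["name"] in phase_names]


def max_thread_num(phases):
    """Largest thread_num among the selected phases."""
    return max(int(phase["thread_num"]) for phase in phases)


# Invented phase list for illustration; the thread_num values are not from the source.
all_phases = [
    {"name": "phase1", "thread_num": 2},
    {"name": "phase2", "thread_num": 4},
]
selected = select_phases(all_phases, ["phase1"])
print(max_thread_num(selected), max_thread_num(select_phases(all_phases)))
```

As in the diff, an empty selection would make `max()` raise, so a runner whose `phases` names match no entry under `phase` fails early rather than submitting with an undefined thread count.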
......
[easy_install]
index_url=http://pip.baidu.com/pypi/simple
\ No newline at end of file
# coding=utf8
# -*- coding: utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -69,7 +70,7 @@ def build(dirname):
'Criteo_data/sample_data/train/*'
]
engine_copy = ['*/*.sh']
engine_copy = ['*/*.sh', '*/*.template']
for package in packages:
if package.startswith("paddlerec.models."):
package_data[package] = models_copy
......