提交 84a9ad29 编写于 作者: S seiriosPlus

merge develop

......@@ -18,14 +18,6 @@ repos:
- id: detect-private-key
files: (?!.*third_party)^.*$ | (?!.*book)^.*$
- id: end-of-file-fixer
- repo: local
hooks:
- id: pylint-doc-string
name: pylint
description: Check python docstring style using docstring_checker.
entry: bash ./tools/codestyle/pylint_pre_commit.hook
language: system
files: \.(py)$
- repo: local
hooks:
- id: copyright_checker
......
......@@ -10,10 +10,22 @@ os:
env:
- JOB=check_style
- JOB=model_test
before_install:
# For pylint dockstring checker
- sudo apt-get update
- sudo apt-get install -y python-pip libpython-dev
- sudo pip install -U pip
- sudo pip install six --upgrade --ignore-installed six
- sudo pip install pillow
- sudo pip install PyYAML
- sudo pip install pylint pytest astroid isort pre-commit
- sudo pip install kiwisolver
- sudo pip install paddlepaddle==1.7.2 --ignore-installed urllib3
- sudo pip uninstall -y rarfile
- sudo pip install rarfile==3.0
- sudo python setup.py install
- |
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
......@@ -28,4 +40,3 @@ notifications:
email:
on_success: change
on_failure: always
此差异已折叠。
([简体中文](./README.md)|English)
<p align="center">
<img align="center" src="doc/imgs/logo.png">
<p>
<p align="center">
<img align="center" src="doc/imgs/overview_en.png">
<p>
<h2 align="center">What is recommendation system ?</h2>
<p align="center">
<img align="center" src="doc/imgs/rec-overview-en.png">
<p>
- Recommendation system helps users quickly find useful and interesting information from massive data.
- Recommendation system is also a silver bullet to attract users, retain users, increase users' stickness or conversionn.
> Who can better use the recommendation system, who can gain more advantage in the fierce competition.
>
> At the same time, there are many problems in the process of using the recommendation system, such as: huge data, complex model, inefficient distributed training, and so on.
<h2 align="center">What is PaddleRec ?</h2>
- A quick start tool of search & recommendation algorithm based on [PaddlePaddle](https://www.paddlepaddle.org.cn/documentation/docs/en/beginners_guide/index_en.html)
- A complete solution of recommendation system for beginners, developers and researchers.
- Recommendation algorithm library including content-understanding, match, recall, rank, multi-task, re-rank etc.
| Type | Algorithm | CPU | GPU | Parameter-Server | Multi-GPU | Paper |
| :-------------------: | :-----------------------------------------------------------------------: | :---: | :-----: | :--------------: | :-------: | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Content-Understanding | [Text-Classifcation](models/contentunderstanding/classification/model.py) | ✓ | ✓ | ✓ | x | [EMNLP 2014][Convolutional neural networks for sentence classication](https://www.aclweb.org/anthology/D14-1181.pdf) |
| Content-Understanding | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | ✓ | ✓ | x | [EMNLP 2014][TagSpace: Semantic Embeddings from Hashtags](https://www.aclweb.org/anthology/D14-1194.pdf) |
| Match | [DSSM](models/match/dssm/model.py) | ✓ | ✓ | ✓ | x | [CIKM 2013][Learning Deep Structured Semantic Models for Web Search using Clickthrough Data](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/cikm2013_DSSM_fullversion.pdf) |
| Match | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | ✓ | ✓ | x | [WWW 2015][A Multi-View Deep Learning Approach for Cross Domain User Modeling in Recommendation Systems](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/frp1159-songA.pdf) |
| Recall | [TDM](models/treebased/tdm/model.py) | ✓ | >=1.8.0 | ✓ | >=1.8.0 | [KDD 2018][Learning Tree-based Deep Model for Recommender Systems](https://arxiv.org/pdf/1801.02294.pdf) |
| Recall | [fasttext](models/recall/fasttext/model.py) | ✓ | ✓ | x | x | [EACL 2017][Bag of Tricks for Efficient Text Classification](https://www.aclweb.org/anthology/E17-2068.pdf) |
| Recall | [Word2Vec](models/recall/word2vec/model.py) | ✓ | ✓ | ✓ | x | [NIPS 2013][Distributed Representations of Words and Phrases and their Compositionality](https://papers.nips.cc/paper/5021-distributed-representations-of-words-and-phrases-and-their-compositionality.pdf) |
| Recall | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ | ✓ | [SIGIR 2016][Multi-Rate Deep Learning for Temporal Recommendation](http://sonyis.me/paperpdf/spr209-song_sigir16.pdf) |
| Recall | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ | ✓ | [2015][Session-based Recommendations with Recurrent Neural Networks](https://arxiv.org/abs/1511.06939) |
| Recall | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ | ✓ | [RecSys 2016][Deep Neural Networks for YouTube Recommendations](https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/45530.pdf) |
| Recall | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ | ✓ | [WWW 2017][Neural Collaborative Filtering](https://arxiv.org/pdf/1708.05031.pdf) |
| Recall | [GNN](models/recall/gnn/model.py) | ✓ | ✓ | ✓ | ✓ | [AAAI 2019][Session-based Recommendation with Graph Neural Networks](https://arxiv.org/abs/1811.00855) |
| Recall | [RALM](models/recall/look-alike_recall/model.py) | ✓ | ✓ | ✓ | ✓ | [KDD 2019][Real-time Attention Based Look-alike Model for Recommender System](https://arxiv.org/pdf/1906.05022.pdf) |
| Rank | [Logistic Regression](models/rank/logistic_regression/model.py) | ✓ | x | ✓ | x | / |
| Rank | [Dnn](models/rank/dnn/model.py) | ✓ | ✓ | ✓ | ✓ | / |
| Rank | [FM](models/rank/fm/model.py) | ✓ | x | ✓ | x | [IEEE Data Mining 2010][Factorization machines](https://analyticsconsultores.com.mx/wp-content/uploads/2019/03/Factorization-Machines-Steffen-Rendle-Osaka-University-2010.pdf) |
| Rank | [FFM](models/rank/ffm/model.py) | ✓ | x | ✓ | x | [RECSYS 2016][Field-aware Factorization Machines for CTR Prediction](https://dl.acm.org/doi/pdf/10.1145/2959100.2959134) |
| Rank | [FNN](models/rank/fnn/model.py) | ✓ | x | ✓ | x | [ECIR 2016][Deep Learning over Multi-field Categorical Data](https://arxiv.org/pdf/1601.02376.pdf) |
| Rank | [Deep Crossing](models/rank/deep_crossing/model.py) | ✓ | x | ✓ | x | [ACM 2016][Deep Crossing: Web-Scale Modeling without Manually Crafted Combinatorial Features](https://www.kdd.org/kdd2016/papers/files/adf0975-shanA.pdf) |
| Rank | [Pnn](models/rank/pnn/model.py) | ✓ | x | ✓ | x | [ICDM 2016][Product-based Neural Networks for User Response Prediction](https://arxiv.org/pdf/1611.00144.pdf) |
| Rank | [DCN](models/rank/dcn/model.py) | ✓ | x | ✓ | x | [KDD 2017][Deep & Cross Network for Ad Click Predictions](https://dl.acm.org/doi/pdf/10.1145/3124749.3124754) |
| Rank | [NFM](models/rank/nfm/model.py) | ✓ | x | ✓ | x | [SIGIR 2017][Neural Factorization Machines for Sparse Predictive Analytics](https://dl.acm.org/doi/pdf/10.1145/3077136.3080777) |
| Rank | [AFM](models/rank/afm/model.py) | ✓ | x | ✓ | x | [IJCAI 2017][Attentional Factorization Machines: Learning the Weight of Feature Interactions via Attention Networks](https://arxiv.org/pdf/1708.04617.pdf) |
| Rank | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ | x | [IJCAI 2017][DeepFM: A Factorization-Machine based Neural Network for CTR Prediction](https://arxiv.org/pdf/1703.04247.pdf) |
| Rank | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ | x | [KDD 2018][xDeepFM: Combining Explicit and Implicit Feature Interactions for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/3219819.3220023) |
| Rank | [DIN](models/rank/din/model.py) | ✓ | x | ✓ | x | [KDD 2018][Deep Interest Network for Click-Through Rate Prediction](https://dl.acm.org/doi/pdf/10.1145/3219819.3219823) |
| Rank | [DIEN](models/rank/dien/model.py) | ✓ | x | ✓ | x | [AAAI 2019][Deep Interest Evolution Network for Click-Through Rate Prediction](https://www.aaai.org/ojs/index.php/AAAI/article/view/4545/4423) |
| Rank | [BST](models/rank/BST/model.py) | ✓ | x | ✓ | x | [DLP-KDD 2019][Behavior Sequence Transformer for E-commerce Recommendation in Alibaba](https://arxiv.org/pdf/1905.06874v1.pdf) |
| Rank | [AutoInt](models/rank/AutoInt/model.py) | ✓ | x | ✓ | x | [CIKM 2019][AutoInt: Automatic Feature Interaction Learning via Self-Attentive Neural Networks](https://arxiv.org/pdf/1810.11921.pdf) |
| Rank | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ | x | [DLRS 2016][Wide & Deep Learning for Recommender Systems](https://dl.acm.org/doi/pdf/10.1145/2988450.2988454) |
| Rank | [FGCNN](models/rank/fgcnn/model.py) | ✓ | ✓ | ✓ | ✓ | [WWW 2019][Feature Generation by Convolutional Neural Network for Click-Through Rate Prediction](https://arxiv.org/pdf/1904.04447.pdf) |
| Rank | [Fibinet](models/rank/fibinet/model.py) | ✓ | ✓ | ✓ | ✓ | [RecSys19][FiBiNET: Combining Feature Importance and Bilinear feature Interaction for Click-Through Rate Prediction]( https://arxiv.org/pdf/1905.09433.pdf) |
| Rank | [Flen](models/rank/flen/model.py) | ✓ | ✓ | ✓ | ✓ | [2019][FLEN: Leveraging Field for Scalable CTR Prediction]( https://arxiv.org/pdf/1911.04690.pdf) |
| Multi-Task | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ | ✓ | [SIGIR 2018][Entire Space Multi-Task Model: An Effective Approach for Estimating Post-Click Conversion Rate](https://arxiv.org/abs/1804.07931) |
| Multi-Task | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ | ✓ | [KDD 2018][Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts](https://dl.acm.org/doi/abs/10.1145/3219819.3220007) |
| Multi-Task | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ | ✓ | [1998][Multitask learning](http://reports-archive.adm.cs.cmu.edu/anon/1997/CMU-CS-97-203.pdf) |
| Re-Rank | [Listwise](models/rerank/listwise/model.py) | ✓ | ✓ | ✓ | x | [2019][Sequential Evaluation and Generation Framework for Combinatorial Recommender System](https://arxiv.org/pdf/1902.00245.pdf) |
<h2 align="center">Getting Started</h2>
### Environmental requirements
* Python 2.7/ 3.5 / 3.6 / 3.7
* PaddlePaddle >= 1.7.2
* operating system: Windows/Mac/Linux
> Linux is recommended for distributed training
### Installation
1. **Install by pip**
```bash
python -m pip install paddle-rec
```
> This method will download and install `paddlepaddle-v1.7.2-cpu`. If `PaddlePaddle` can not be installed automatically,You need to install `PaddlePaddle` manually,and then install `PaddleRec` again:
> - Download [PaddlePaddle](https://pypi.org/project/paddlepaddle/1.7.2/#files) and install by pip.
> - Install `PaddlePaddle` by pip,`python -m pip install paddlepaddle==1.7.2 -i https://mirror.baidu.com/pypi/simple`
> - Other installation problems can be raised in [Paddle Issue](https://github.com/PaddlePaddle/Paddle/issues) or [PaddleRec Issue](https://github.com/PaddlePaddle/PaddleRec/issues)
2. **Install by source code**
- Install PaddlePaddle
```shell
python -m pip install paddlepaddle==1.7.2 -i https://mirror.baidu.com/pypi/simple
```
- Install PaddleRec by source code
```
git clone https://github.com/PaddlePaddle/PaddleRec/
cd PaddleRec
python setup.py install
```
- Install PaddleRec-GPU
After installing `PaddleRec`,please install the appropriate version of `paddlepaddle-gpu` according to your environment (CUDA / cudnn),refer to the installation tutorial [Installation Manuals](https://www.paddlepaddle.org.cn/documentation/docs/en/install/index_en.html)
<h2 align="center">Quick Start</h2>
We take the `dnn` algorithm as an example to get start of `PaddleRec`, and we take 100 pieces of training data from [Criteo Dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/):
```bash
# Training with cpu
python -m paddlerec.run -m paddlerec.models.rank.dnn
```
<h2 align="center">Documentation</h2>
### Background
* [Recommendation System](doc/rec_background.md)
* [Distributed deep learning](doc/ps_background.md)
### Introductory Project
* [Get start of PaddleRec in ten minutes](https://aistudio.baidu.com/aistudio/projectdetail/559336)
### Introductory tutorial
* [Data](doc/slot_reader.md)
* [Model](doc/model.md)
* [Loacl Train](doc/train.md)
* [Distributed Train](doc/distributed_train.md)
* [Predict](doc/predict.md)
* [Serving](doc/serving.md)
### Advanced tutorial
* [Custom Reader](doc/custom_reader.md)
* [Custom Model](doc/model_develop.md)
* [Custom Training Process](doc/trainer_develop.md)
* [Configuration description of yaml](doc/yaml.md)
* [Design document of PaddleRec](doc/design.md)
### Benchmark
* [Benchmark](doc/benchmark.md)
### FAQ
* [Common Problem FAQ](doc/faq.md)
<h2 align="center">Community</h2>
<p align="center">
<br>
<img alt="Release" src="https://img.shields.io/badge/Release-0.1.0-yellowgreen">
<img alt="License" src="https://img.shields.io/github/license/PaddlePaddle/PaddleRec">
<img alt="Slack" src="https://img.shields.io/badge/Join-Slack-green">
<br>
<p>
### Version history
- 2020.06.17 - PaddleRec v0.1.0
- 2020.06.03 - PaddleRec v0.0.2
- 2020.05.14 - PaddleRec v0.0.1
### License
[Apache 2.0 license](LICENSE)
### Contact us
For any feedback, please propose a [GitHub Issue](https://github.com/PaddlePaddle/PaddleRec/issues)
You can also communicate with us in the following ways:
- QQ group id:`861717190`
- Wechat account:`paddlerec2020`
<p align="center"><img width="200" height="200" margin="500" src="./doc/imgs/QQ_group.png"/>&#8194;&#8194;&#8194;&#8194;&#8194<img width="200" height="200" src="doc/imgs/weixin_supporter.png"/></p>
<p align="center">PaddleRec QQ Group&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;&#8194;PaddleRec Wechat account</p>
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz --no-check-certificate
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
pip uninstall -y paddlepaddle
pip install paddlepaddle==<$ PADDLEPADDLE_VERSION $> --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
echo "End before_hook.sh ..."
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz --no-check-certificate
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
pip uninstall -y paddlepaddle
pip install paddlepaddle-gpu==<$ PADDLEPADDLE_VERSION $>.post107 --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
echo "End before_hook.sh ..."
......@@ -16,23 +16,13 @@
###################################################
# Usage: submit.sh
# Description: run mpi submit client implement
# Description: run paddlecloud submit client implement
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
#-----------------------------------------------------------------------------------------------------------------
#fun : package
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package_hook() {
g_run_stage="package"
package
}
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
......@@ -40,17 +30,130 @@ function package_hook() {
#-----------------------------------------------------------------------------------------------------------------
function _before_submit() {
echo "before_submit"
before_submit_hook
if [ ${DISTRIBUTE_MODE} == "PS_CPU_MPI" ]; then
_gen_cpu_before_hook
_gen_mpi_config
_gen_mpi_job
_gen_end_hook
elif [ ${DISTRIBUTE_MODE} == "COLLECTIVE_GPU_K8S" ]; then
_gen_gpu_before_hook
_gen_k8s_config
_gen_k8s_gpu_job
_gen_end_hook
elif [ ${DISTRIBUTE_MODE} == "PS_CPU_K8S" ]; then
_gen_cpu_before_hook
_gen_k8s_config
_gen_k8s_cpu_job
_gen_end_hook
fi
}
function _gen_mpi_config() {
echo "gen mpi_config.ini"
sed -e "s#<$ FS_NAME $>#$FS_NAME#g" \
-e "s#<$ FS_UGI $>#$FS_UGI#g" \
-e "s#<$ TRAIN_DATA_PATH $>#$TRAIN_DATA_PATH#g" \
-e "s#<$ TEST_DATA_PATH $>#$TEST_DATA_PATH#g" \
-e "s#<$ OUTPUT_PATH $>#$OUTPUT_PATH#g" \
-e "s#<$ THIRDPARTY_PATH $>#$THIRDPARTY_PATH#g" \
-e "s#<$ CPU_NUM $>#$max_thread_num#g" \
-e "s#<$ USE_PYTHON3 $>#$USE_PYTHON3#g" \
-e "s#<$ FLAGS_communicator_is_sgd_optimizer $>#$FLAGS_communicator_is_sgd_optimizer#g" \
-e "s#<$ FLAGS_communicator_send_queue_size $>#$FLAGS_communicator_send_queue_size#g" \
-e "s#<$ FLAGS_communicator_thread_pool_size $>#$FLAGS_communicator_thread_pool_size#g" \
-e "s#<$ FLAGS_communicator_max_merge_var_num $>#$FLAGS_communicator_max_merge_var_num#g" \
-e "s#<$ FLAGS_communicator_max_send_grad_num_before_recv $>#$FLAGS_communicator_max_send_grad_num_before_recv#g" \
-e "s#<$ FLAGS_communicator_fake_rpc $>#$FLAGS_communicator_fake_rpc#g" \
-e "s#<$ FLAGS_rpc_retry_times $>#$FLAGS_rpc_retry_times#g" \
${abs_dir}/cloud/mpi_config.ini.template >${PWD}/config.ini
}
function _gen_k8s_config() {
echo "gen k8s_config.ini"
sed -e "s#<$ FS_NAME $>#$FS_NAME#g" \
-e "s#<$ FS_UGI $>#$FS_UGI#g" \
-e "s#<$ AFS_REMOTE_MOUNT_POINT $>#$AFS_REMOTE_MOUNT_POINT#g" \
-e "s#<$ OUTPUT_PATH $>#$OUTPUT_PATH#g" \
-e "s#<$ CPU_NUM $>#$max_thread_num#g" \
-e "s#<$ USE_PYTHON3 $>#$USE_PYTHON3#g" \
-e "s#<$ FLAGS_communicator_is_sgd_optimizer $>#$FLAGS_communicator_is_sgd_optimizer#g" \
-e "s#<$ FLAGS_communicator_send_queue_size $>#$FLAGS_communicator_send_queue_size#g" \
-e "s#<$ FLAGS_communicator_thread_pool_size $>#$FLAGS_communicator_thread_pool_size#g" \
-e "s#<$ FLAGS_communicator_max_merge_var_num $>#$FLAGS_communicator_max_merge_var_num#g" \
-e "s#<$ FLAGS_communicator_max_send_grad_num_before_recv $>#$FLAGS_communicator_max_send_grad_num_before_recv#g" \
-e "s#<$ FLAGS_communicator_fake_rpc $>#$FLAGS_communicator_fake_rpc#g" \
-e "s#<$ FLAGS_rpc_retry_times $>#$FLAGS_rpc_retry_times#g" \
${abs_dir}/cloud/k8s_config.ini.template >${PWD}/config.ini
}
function _gen_cpu_before_hook() {
echo "gen cpu before_hook.sh"
sed -e "s#<$ PADDLEPADDLE_VERSION $>#$PADDLE_VERSION#g" \
${abs_dir}/cloud/before_hook_cpu.sh.template >${PWD}/before_hook.sh
}
function _gen_gpu_before_hook() {
echo "gen gpu before_hook.sh"
sed -e "s#<$ PADDLEPADDLE_VERSION $>#$PADDLE_VERSION#g" \
${abs_dir}/cloud/before_hook_gpu.sh.template >${PWD}/before_hook.sh
}
function _gen_end_hook() {
echo "gen end_hook.sh"
cp ${abs_dir}/cloud/end_hook.sh.template ${PWD}/end_hook.sh
}
function _gen_mpi_job() {
echo "gen mpi_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ JOB_NAME $>#$OLD_JOB_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ MPI_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ MPI_NODES $>#$MPI_NODES#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/mpi_job.sh.template >${PWD}/job.sh
}
function _gen_k8s_gpu_job() {
echo "gen k8s_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ JOB_NAME $>#$OLD_JOB_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ K8S_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ K8S_TRAINERS $>#$K8S_TRAINERS#g" \
-e "s#<$ K8S_CPU_CORES $>#$K8S_CPU_CORES#g" \
-e "s#<$ K8S_GPU_CARD $>#$K8S_GPU_CARD#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/k8s_job.sh.template >${PWD}/job.sh
}
function _gen_k8s_cpu_job() {
echo "gen k8s_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ JOB_NAME $>#$OLD_JOB_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ K8S_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ K8S_TRAINERS $>#$K8S_TRAINERS#g" \
-e "s#<$ K8S_PS_NUM $>#$K8S_PS_NUM#g" \
-e "s#<$ K8S_PS_CORES $>#$K8S_PS_CORES#g" \
-e "s#<$ K8S_CPU_CORES $>#$K8S_CPU_CORES#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/k8s_cpu_job.sh.template >${PWD}/job.sh
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function _after_submit() {
echo "after_submit"
after_submit_hook
echo "end submit"
}
#-----------------------------------------------------------------------------------------------------------------
......@@ -60,23 +163,19 @@ function _after_submit() {
#-----------------------------------------------------------------------------------------------------------------
function _submit() {
g_run_stage="submit"
sh job.sh
}
cd ${engine_temp_path}
paddlecloud job --ak ${engine_submit_ak} --sk ${engine_submit_sk} train --cluster-name ${engine_submit_cluster} \
--job-version ${engine_submit_version} \
--mpi-priority ${engine_submit_priority} \
--mpi-wall-time 300:59:00 \
--mpi-nodes ${engine_submit_nodes} --is-standalone 0 \
--mpi-memory 110Gi \
--job-name ${engine_submit_jobname} \
--start-cmd "${g_run_cmd}" \
--group-name ${engine_submit_group} \
--job-conf ${engine_submit_config} \
--files ${g_submitfiles} \
--json
cd -
function package_hook() {
cur_time=`date +"%Y%m%d%H%M"`
new_job_name="${JOB_NAME}_${cur_time}"
export OLD_JOB_NAME=${JOB_NAME}
export JOB_NAME=${new_job_name}
export job_file_path="${PWD}/${new_job_name}"
mkdir ${job_file_path}
cp $FILES ${job_file_path}/
cd ${job_file_path}
echo "The task submission folder is generated at ${job_file_path}"
}
function submit_hook() {
......@@ -86,8 +185,6 @@ function submit_hook() {
}
function main() {
source ${engine_submit_scrpit}
package_hook
submit_hook
}
......
echo "Run before_hook.sh ..."
\ No newline at end of file
# 必须涵盖的参数
fs_name=<$ FS_NAME $>
fs_ugi=<$ FS_UGI $>
# 模型输出目录
output_path=<$ OUTPUT_PATH $>
# ===================
# 以下是新增参数
# ===================
# 挂载 afs 的开关
mount_afs="true"
# afs 路径的远端挂载点
AFS_REMOTE_MOUNT_POINT=<$ AFS_REMOTE_MOUNT_POINT $>
# 作业运行环境的本地挂载点,/root/paddlejob/workspace/env_run/是一个固定路径,是平台运行时workspace的路径
afs_local_mount_point="/root/paddlejob/workspace/env_run/afs/"
# 可以访问运行时默认文件夹下的 ./afs/ 目录拿到挂载目录的文件
# 新k8s afs挂载帮助文档: http://wiki.baidu.com/pages/viewpage.action?pageId=906443193
PADDLE_PADDLEREC_ROLE=WORKER
use_python3=<$ USE_PYTHON3 $>
CPU_NUM=<$ CPU_NUM $>
GLOG_v=0
FLAGS_communicator_is_sgd_optimizer=<$ FLAGS_communicator_is_sgd_optimizer $>
FLAGS_communicator_send_queue_size=<$ FLAGS_communicator_send_queue_size $>
FLAGS_communicator_thread_pool_size=<$ FLAGS_communicator_thread_pool_size $>
FLAGS_communicator_max_merge_var_num=<$ FLAGS_communicator_max_merge_var_num $>
FLAGS_communicator_max_send_grad_num_before_recv=<$ FLAGS_communicator_max_send_grad_num_before_recv $>
FLAGS_communicator_fake_rpc=<$ FLAGS_communicator_fake_rpc $>
FLAGS_rpc_retry_times=<$ FLAGS_rpc_retry_times $>
\ No newline at end of file
#!/bin/bash
###############################################################
## 注意-- 注意--注意 ##
## K8S PS-CPU多机作业作业示例 ##
###############################################################
job_name=<$ JOB_NAME $>
# 作业参数
group_name="<$ GROUP_NAME $>"
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="2000:00:00"
k8s_priority=<$ K8S_PRIORITY $>
k8s_trainers=<$ K8S_TRAINERS $>
k8s_cpu_cores=<$ K8S_CPU_CORES $>
k8s_ps_num=<$ K8S_PS_NUM $>
k8s_ps_cores=<$ K8S_PS_CORES $>
# 你的ak/sk(可在paddlecloud web页面【个人中心】处获取)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train --job-name ${job_name} \
--group-name ${group_name} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--job-version ${job_version} \
--k8s-priority ${k8s_priority} \
--wall-time ${wall_time} \
--k8s-trainers ${k8s_trainers} \
--k8s-cpu-cores ${k8s_cpu_cores} \
--k8s-ps-num ${k8s_ps_num} \
--k8s-ps-cores ${k8s_ps_cores} \
--is-standalone 0 \
--distribute-job-type "PSERVER" \
--json
\ No newline at end of file
#!/bin/bash
###############################################################
## 注意-- 注意--注意 ##
## K8S NCCL2多机作业作业示例 ##
###############################################################
job_name=<$ JOB_NAME $>
# 作业参数
group_name="<$ GROUP_NAME $>"
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="2000:00:00"
k8s_priority=<$ K8S_PRIORITY $>
k8s_trainers=<$ K8S_TRAINERS $>
k8s_cpu_cores=<$ K8S_CPU_CORES $>
k8s_gpu_cards=<$ K8S_GPU_CARD $>
is_stand_alone=0
nccl="--distribute-job-type "NCCL2""
if [ ${k8s_trainers} == 1 ];then
is_stand_alone=1
nccl="--job-remark single-trainer"
if [ ${k8s_gpu_cards} == 1];then
nccl="--job-remark single-gpu"
echo "Attention: Use single GPU card for PaddleRec distributed training, please set runner class from 'cluster_train' to 'train' in config.yaml."
fi
fi
# 你的ak/sk(可在paddlecloud web页面【个人中心】处获取)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train --job-name ${job_name} \
--group-name ${group_name} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--job-version ${job_version} \
--k8s-trainers ${k8s_trainers} \
--k8s-cpu-cores ${k8s_cpu_cores} \
--k8s-gpu-cards ${k8s_gpu_cards} \
--k8s-priority ${k8s_priority} \
--wall-time ${wall_time} \
--is-standalone ${is_stand_alone} \
--json \
${nccl}
\ No newline at end of file
#type of storage cluster
storage_type="hdfs"
#attention: files for training should be put on hdfs
force_reuse_output_path="True"
# 可以替换成自己的hdfs集群
fs_name=<$ FS_NAME $>
fs_ugi=<$ FS_UGI $>
FLAGS_rpc_deadline=300000
##train data path on hdfs
train_data_path=<$ TRAIN_DATA_PATH $>
test_data_path=<$ TEST_DATA_PATH $>
output_path=<$ OUTPUT_PATH $>
thirdparty_path=<$ THIRDPARTY_PATH $>
PADDLE_PADDLEREC_ROLE=WORKER
use_python3=<$ USE_PYTHON3 $>
CPU_NUM=<$ CPU_NUM $>
GLOG_v=0
FLAGS_communicator_is_sgd_optimizer=<$ FLAGS_communicator_is_sgd_optimizer $>
FLAGS_communicator_send_queue_size=<$ FLAGS_communicator_send_queue_size $>
FLAGS_communicator_thread_pool_size=<$ FLAGS_communicator_thread_pool_size $>
FLAGS_communicator_max_merge_var_num=<$ FLAGS_communicator_max_merge_var_num $>
FLAGS_communicator_max_send_grad_num_before_recv=<$ FLAGS_communicator_max_send_grad_num_before_recv $>
FLAGS_communicator_fake_rpc=<$ FLAGS_communicator_fake_rpc $>
FLAGS_rpc_retry_times=<$ FLAGS_rpc_retry_times $>
#!/bin/bash
###############################################################
## 注意--注意--注意 ##
## MPI 类型作业演示 ##
###############################################################
job_name=<$ JOB_NAME $>
# 作业参数
group_name=<$ GROUP_NAME $>
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="2000:00:00"
# 你的ak/sk(可在paddlecloud web页面【个人中心】处获取)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train \
--job-name ${job_name} \
--mpi-priority <$ MPI_PRIORITY $> \
--group-name ${group_name} \
--mpi-wall-time ${wall_time} \
--mpi-nodes <$ MPI_NODES $> \
--is-standalone 0 \
--permission group \
--job-version ${job_version} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--json
......@@ -18,6 +18,7 @@ from __future__ import unicode_literals
import copy
import os
import subprocess
import warnings
from paddlerec.core.engine.engine import Engine
from paddlerec.core.factory import TrainerFactory
......@@ -26,18 +27,35 @@ from paddlerec.core.utils import envs
class ClusterEngine(Engine):
def __init_impl__(self):
self.role = envs.get_runtime_environ("engine_role")
if self.role == "WORKER":
return
abs_dir = os.path.dirname(os.path.abspath(__file__))
backend = envs.get_runtime_environ("engine_backend")
if backend == "PaddleCloud":
os.environ["abs_dir"] = str(abs_dir)
self.backend = envs.get_runtime_environ("backend")
if not self.backend:
self.backend = ""
self.backend = self.backend.upper()
if self.backend == "PADDLECLOUD":
self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh")
elif self.backend == "KUBERNETES":
self.submit_script = os.path.join(abs_dir, "k8s/cluster.sh")
else:
raise ValueError("{} can not be supported now".format(backend))
raise ValueError("{} can not be supported now".format(
self.backend))
def start_worker_procs(self):
trainer = TrainerFactory.create(self.trainer)
trainer.run()
def start_master_procs(self):
if self.backend == "PADDLECLOUD":
self.paddlecloud_env_check()
elif self.backend == "KUBERNETES":
self.kubernetes_env_check()
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
......@@ -47,14 +65,254 @@ class ClusterEngine(Engine):
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
proc.wait()
def run(self):
role = envs.get_runtime_environ("engine_role")
@staticmethod
def workspace_replace():
remote_workspace = envs.get_runtime_environ("remote_workspace")
for k, v in os.environ.items():
v = v.replace("{workspace}", remote_workspace)
os.environ[k] = str(v)
if role == "MASTER":
def run(self):
if self.role == "MASTER":
self.start_master_procs()
elif role == "WORKER":
elif self.role == "WORKER":
self.start_worker_procs()
else:
raise ValueError("role {} error, must in MASTER/WORKER".format(role))
raise ValueError("role {} error, must in MASTER/WORKER".format(
self.role))
def paddlecloud_env_check(self):
# get fleet mode
fleet_mode = envs.get_runtime_environ("fleet_mode")
# get device
device = envs.get_runtime_environ("device")
# get cluster type
cluster_type = envs.get_runtime_environ("cluster_type")
cluster_env_check_tool = None
if cluster_type.upper() == "MPI":
if device == "CPU" and fleet_mode == "PS":
cluster_env_check_tool = PaddleCloudMpiEnv()
else:
raise ValueError(
"Paddlecloud with Mpi don't support GPU training, check your config.yaml & backend.yaml"
)
elif cluster_type.upper() == "K8S":
if fleet_mode == "PS":
if device == "CPU":
cluster_env_check_tool = CloudPsCpuEnv()
elif device == "GPU":
raise ValueError(
"PS-GPU on paddlecloud is not supported at this time, comming soon"
)
if fleet_mode == "COLLECTIVE":
if device == "GPU":
cluster_env_check_tool = CloudCollectiveEnv()
elif device == "CPU":
raise ValueError(
"Unexpected config -> device: CPU with fleet_mode: Collective, check your config.yaml"
)
else:
raise ValueError("cluster_type {} error, must in MPI/K8S".format(
cluster_type))
cluster_env_check_tool.env_check()
cluster_env_check_tool.env_set()
def kubernetes_env_check(self):
pass
class ClusterEnvBase(object):
def __init__(self):
# get backend env
backend_yaml = envs.get_runtime_environ("backend_yaml")
_env = envs.load_yaml(backend_yaml)
self.backend_env = envs.flatten_environs(_env, ".")
self.cluster_env = {}
def env_check(self):
# check common env
# fs_name & fs_ugi
self.cluster_env["FS_NAME"] = self.backend_env.get("config.fs_name",
"")
self.cluster_env["FS_UGI"] = self.backend_env.get("config.fs_ugi", "")
if self.cluster_env["FS_NAME"] == "" or self.cluster_env[
"FS_UGI"] == "":
raise ValueError(
"No -- FS_UGI or FS_NAME -- found in your backend.yaml, please check."
)
# output_path
self.cluster_env["OUTPUT_PATH"] = self.backend_env.get(
"config.output_path", "")
if self.cluster_env["OUTPUT_PATH"] == "":
warnings.warn(
"Job output_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# paddle_version
self.cluster_env["PADDLE_VERSION"] = self.backend_env.get(
"config.paddle_version", "1.7.2")
# python_version
self.cluster_env["USE_PYTHON3"] = self.backend_env.get(
"config.use_python3", "0")
# communicator
max_thread_num = int(envs.get_runtime_environ("max_thread_num"))
self.cluster_env[
"FLAGS_communicator_is_sgd_optimizer"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_is_sgd_optimizer", 0)
self.cluster_env[
"FLAGS_communicator_send_queue_size"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_send_queue_size",
max_thread_num)
self.cluster_env[
"FLAGS_communicator_thread_pool_size"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_thread_pool_size", 32)
self.cluster_env[
"FLAGS_communicator_max_merge_var_num"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_max_merge_var_num",
max_thread_num)
self.cluster_env[
"FLAGS_communicator_max_send_grad_num_before_recv"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_max_send_grad_num_before_recv",
max_thread_num)
self.cluster_env["FLAGS_communicator_fake_rpc"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_fake_rpc", 0)
self.cluster_env["FLAGS_rpc_retry_times"] = self.backend_env.get(
"config.communicator.FLAGS_rpc_retry_times", 3)
# ak & sk
self.cluster_env["AK"] = self.backend_env.get("submit.ak", "")
self.cluster_env["SK"] = self.backend_env.get("submit.sk", "")
if self.cluster_env["AK"] == "" or self.cluster_env["SK"] == "":
raise ValueError(
"No -- AK or SK -- found in your backend.yaml, please check.")
# priority
self.cluster_env["PRIORITY"] = self.backend_env.get("submit.priority",
"high")
# job name
self.cluster_env["JOB_NAME"] = self.backend_env.get(
"submit.job_name", "PaddleRecClusterJob")
# group
self.cluster_env["GROUP_NAME"] = self.backend_env.get("submit.group",
"paddle")
# start_cmd
self.cluster_env["START_CMD"] = self.backend_env.get(
"submit.start_cmd", "python -m paddlerec.run -m config.yaml")
# files
self.cluster_env["FILES"] = self.backend_env.get("submit.files", "")
if self.cluster_env["FILES"] == "":
raise ValueError(
"No -- files -- found in your backend.yaml, please check.")
def env_set(self):
envs.set_runtime_environs(self.cluster_env)
flattens = envs.flatten_environs(self.cluster_env)
print(envs.pretty_print_envs(flattens, ("Cluster Envs", "Value")))
class PaddleCloudMpiEnv(ClusterEnvBase):
def __init__(self):
super(PaddleCloudMpiEnv, self).__init__()
def env_check(self):
super(PaddleCloudMpiEnv, self).env_check()
# check mpi env
self.cluster_env["DISTRIBUTE_MODE"] = "PS_CPU_MPI"
# train_data_path
self.cluster_env["TRAIN_DATA_PATH"] = self.backend_env.get(
"config.train_data_path", "")
if self.cluster_env["TRAIN_DATA_PATH"] == "":
raise ValueError(
"No -- TRAIN_DATA_PATH -- found in your backend.yaml, please add train_data_path in your backend yaml."
)
# test_data_path
self.cluster_env["TEST_DATA_PATH"] = self.backend_env.get(
"config.test_data_path", "")
if self.cluster_env["TEST_DATA_PATH"] == "":
warnings.warn(
"Job test_data_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# thirdparty_path
self.cluster_env["THIRDPARTY_PATH"] = self.backend_env.get(
"config.thirdparty_path", "")
if self.cluster_env["THIRDPARTY_PATH"] == "":
warnings.warn(
"Job thirdparty_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# nodes
self.cluster_env["MPI_NODES"] = self.backend_env.get("submit.nodes", 1)
class PaddleCloudK8sEnv(ClusterEnvBase):
def __init__(self):
super(PaddleCloudK8sEnv, self).__init__()
def env_check(self):
super(PaddleCloudK8sEnv, self).env_check()
# check afs_remote_mount_point
self.cluster_env["AFS_REMOTE_MOUNT_POINT"] = self.backend_env.get(
"config.afs_remote_mount_point", "")
if self.cluster_env["AFS_REMOTE_MOUNT_POINT"] == "":
warnings.warn(
"Job afs_remote_mount_point not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
warnings.warn(
"The remote afs path will be mounted to the ./afs/",
category=UserWarning,
stacklevel=2)
class CloudCollectiveEnv(PaddleCloudK8sEnv):
def __init__(self):
super(CloudCollectiveEnv, self).__init__()
def env_check(self):
super(CloudCollectiveEnv, self).env_check()
self.cluster_env["DISTRIBUTE_MODE"] = "COLLECTIVE_GPU_K8S"
self.cluster_env["K8S_TRAINERS"] = self.backend_env.get(
"submit.k8s_trainers", 1)
self.cluster_env["K8S_GPU_CARD"] = self.backend_env.get(
"submit.k8s_gpu_card", 1)
self.cluster_env["K8S_CPU_CORES"] = self.backend_env.get(
"submit.k8s_cpu_cores", 1)
class CloudPsCpuEnv(PaddleCloudK8sEnv):
def __init__(self):
super(CloudPsCpuEnv, self).__init__()
def env_check(self):
super(CloudPsCpuEnv, self).env_check()
self.cluster_env["DISTRIBUTE_MODE"] = "PS_CPU_K8S"
self.cluster_env["K8S_TRAINERS"] = self.backend_env.get(
"submit.k8s_trainers", 1)
self.cluster_env["K8S_CPU_CORES"] = self.backend_env.get(
"submit.k8s_cpu_cores", 2)
self.cluster_env["K8S_PS_NUM"] = self.backend_env.get(
"submit.k8s_ps_num", 1)
self.cluster_env["K8S_PS_CORES"] = self.backend_env.get(
"submit.k8s_ps_cores", 2)
#! /bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -13,23 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
echo "begin to download data"
cd raw_data && python download.py
mkdir diginetica
python preprocess.py --dataset diginetica
echo "begin to convert data (binary -> txt)"
python convert_data.py --data_dir diginetica
cat diginetica/train.txt | wc -l >> diginetica/config.txt
mkdir train_data
mv diginetica/train.txt train_data
mkdir test_data
mv diginetica/test.txt test_data
#!/bin/bash
###################################################
# Usage: submit.sh
# Description: run k8s submit client implement
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
#-----------------------------------------------------------------------------------------------------------------
#fun : create ConfigMap for k8s pod
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function create_config_map() {
echo "Create configmap"
echo "Delete exist configmap which named 'modelconfig'"
kubectl delete configmap modelconfig
kubectl create configmap modelconfig --from-file=${abs_dir}/k8s/set_k8s_env.sh,${paddlerec_model_config}
}
#-----------------------------------------------------------------------------------------------------------------
#fun : create yaml config for k8s job
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function create_k8s_yaml() {
echo "Create k8s.yaml"
if [ -f ${PWD}/k8s.yaml ]; then
echo "Delete exist k8s.yaml at ${PWD}"
rm ${PWD}/k8s.yaml
fi
let total_pod_num=${engine_submit_trainer_num}+${engine_submit_server_num}
echo "--K8S ENV-- Job name: ${engine_job_name}"
echo "--K8S ENV-- Total pod nums: ${total_pod_num}"
echo "--K8S ENV-- Trainer nums: ${engine_submit_trainer_num}"
echo "--K8S ENV-- Pserver nums: ${engine_submit_server_num}"
echo "--K8S ENV-- Docker image: ${engine_submit_docker_image}"
echo "--K8S ENV-- Threads(cpu_num) ${CPU_NUM}"
echo "--K8S ENV-- Memory ${engine_submit_memory}"
echo "--K8S ENV-- Storage ${engine_submit_storage}"
echo "--K8S ENV-- Log level ${engine_submit_log_level}"
sed -e "s#<$ JOB_NAME $>#$engine_job_name#g" \
-e "s#<$ TOTAL_POD_NUM $>#$total_pod_num#g" \
-e "s#<$ TRAINER_NUM $>#$engine_submit_trainer_num#g" \
-e "s#<$ PSERVER_NUM $>#$engine_submit_server_num#g" \
-e "s#<$ IMAGE $>#$engine_submit_docker_image#g" \
-e "s#<$ CPU_NUM $>#$CPU_NUM#g" \
-e "s#<$ MEMORY $>#$engine_submit_memory#g" \
-e "s#<$ STORAGE $>#$engine_submit_storage#g" \
-e "s#<$ GLOG_V $>#$engine_submit_log_level#g" \
${abs_dir}/k8s/k8s.yaml.template >${PWD}/k8s.yaml
}
#-----------------------------------------------------------------------------------------------------------------
#fun : submit to k8s cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function submit() {
echo "Submit"
echo "Delete exist job which named ${engine_job_name}"
kubectl delete jobs.batch.volcano.sh $engine_job_name
kubectl apply -f ${PWD}/k8s.yaml
}
function main() {
create_config_map
create_k8s_yaml
submit
}
main
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: <$ JOB_NAME $>
spec:
minAvailable: <$ TOTAL_POD_NUM $>
schedulerName: volcano
policies:
- event: PodEvicted
action: RestartJob
- event: PodFailed
action: RestartJob
tasks:
- replicas: <$ PSERVER_NUM $>
name: pserver
template:
metadata:
labels:
paddle-job-pserver: paddle-rec
spec:
imagePullSecrets:
- name: default-secret
containers:
- image: <$ IMAGE $>
command:
- '/bin/bash'
args:
- "-c"
- |
set -ex
sh /usr/paddlerec/set_k8s_env.sh start_fluid
imagePullPolicy: Always
volumeMounts:
- name: model-config
mountPath: "/usr/paddlerec"
name: pserver
resources:
limits:
cpu: <$ CPU_NUM $>
memory: <$ MEMORY $>
ephemeral-storage: <$ STORAGE $>
requests:
cpu: 1
memory: 1Gi
ephemeral-storage: 1Gi
env:
- name: GLOG_v
value: "<$ GLOG_V $>"
- name: GLOG_logtostderr
value: "1"
- name: TOPOLOGY
value: ""
- name: TRAINER_PACKAGE
value: /root/paddlejob
- name: PADDLE_INIT_NICS
value: eth2
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: PADDLE_CURRENT_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: PADDLE_JOB_NAME
value: paddle-rec
- name: PADDLE_IS_LOCAL
value: "0"
- name: PADDLE_TRAINERS_NUM
value: "<$ TRAINER_NUM $>"
- name: PADDLE_PSERVERS_NUM
value: "<$ PSERVER_NUM $>"
- name: FLAGS_rpc_deadline
value: "100000"
- name: ENTRY
value: python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER
- name: PADDLE_PORT
value: "30240"
- name: PADDLE_TRAINING_ROLE
value: PSERVER
- name: TRAINING_ROLE
value: PSERVER
volumes:
- name: model-config
configMap:
name: modelconfig
defaultMode: 0777
restartPolicy: OnFailure
- replicas: <$ TRAINER_NUM $>
policies:
- event: TaskCompleted
action: CompleteJob
name: trainer
template:
metadata:
labels:
paddle-job: paddle-rec
spec:
imagePullSecrets:
- name: default-secret
containers:
- image: <$ IMAGE $>
command:
- '/bin/bash'
args:
- "-c"
- |
set -ex
sh /usr/paddlerec/set_k8s_env.sh start_fluid
imagePullPolicy: Always
volumeMounts:
- name: model-config
mountPath: "/usr/paddlerec"
name: trainer
resources:
limits:
cpu: <$ CPU_NUM $>
memory: <$ MEMORY $>
ephemeral-storage: <$ STORAGE $>
requests:
cpu: 1
memory: 1Gi
ephemeral-storage: 1Gi
env:
- name: GLOG_v
value: "<$ GLOG_V $>"
- name: GLOG_logtostderr
value: "1"
- name: TRAINER_PACKAGE
value: /root/paddlejob
- name: PADDLE_INIT_NICS
value: eth2
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: PADDLE_CURRENT_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: PADDLE_JOB_NAME
value: paddle-rec
- name: PADDLE_IS_LOCAL
value: "0"
- name: FLAGS_rpc_deadline
value: "3600"
- name: PADDLE_PORT
value: "30240"
- name: PADDLE_PSERVERS_NUM
value: "<$ PSERVER_NUM $>"
- name: PADDLE_TRAINERS_NUM
value: "<$ TRAINER_NUM $>"
- name: PADDLE_TRAINING_ROLE
value: TRAINER
- name: TRAINING_ROLE
value: TRAINER
- name: ENTRY
value: python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER
volumes:
- name: model-config
configMap:
name: modelconfig
defaultMode: 0777
restartPolicy: OnFailure
#!/bin/bash
set -x
check_failed_cnt() {
max_failed=$1
failed_count=$(python -m paddlerec.tools.k8s_tools count_pods_by_phase paddle-job=${PADDLE_JOB_NAME} Failed)
if [ $failed_count -gt $max_failed ]; then
stdbuf -oL echo "Failed trainer count beyond the threadhold: "$max_failed
echo "Failed trainer count beyond the threshold: " $max_failed >/dev/termination-log
exit 0
fi
}
check_trainer_ret() {
ret=$1
stdbuf -oL echo "job returned $ret...setting pod return message..."
stdbuf -oL echo "==============================="
if [ $ret -eq 136 ]; then
echo "Error Arithmetic Operation(Floating Point Exception)" >/dev/termination-log
elif [ $ret -eq 139 ]; then
echo "Segmentation Fault" >/dev/termination-log
elif [ $ret -eq 1 ]; then
echo "General Error" >/dev/termination-log
elif [ $ret -eq 134 ]; then
echo "Program Abort" >/dev/termination-log
fi
stdbuf -oL echo "termination log wroted..."
exit $ret
}
start_fluid_process() {
pserver_label="paddle-job-pserver=${PADDLE_JOB_NAME}"
trainer_label="paddle-job=${PADDLE_JOB_NAME}"
hostname=${HOSTNAME}
task_index=""
if [ "${PADDLE_TRAINING_ROLE}" == "TRAINER" ] || [ "${PADDLE_TRAINING_ROLE}" == "PSERVER" ]; then
stdbuf -oL python -m paddlerec.tools.k8s_tools wait_pods_running ${pserver_label} ${PADDLE_PSERVERS_NUM}
fi
export PADDLE_PSERVERS_IP_PORT_LIST=$(python -m paddlerec.tools.k8s_tools fetch_endpoints ${pserver_label} ${PADDLE_PORT})
export PADDLE_TRAINER_IPS=$(python -m paddlerec.tools.k8s_tools fetch_ips ${trainer_label})
if [ "${PADDLE_TRAINING_ROLE}" == "TRAINER" ]; then
check_failed_cnt 1
task_index=$(python -m paddlerec.tools.k8s_tools fetch_id ${trainer_label})
else
task_index=$(python -m paddlerec.tools.k8s_tools fetch_id ${pserver_label})
fi
export PADDLE_TRAINER_ID=${task_index}
export PADDLE_PSERVER_ID=${task_index}
stdbuf -oL sh -c "${ENTRY}"
check_trainer_ret $?
}
usage() {
echo "usage: paddle_k8s [<args>]:"
echo " start_fluid Start paddle fluid distributed training, set env"
}
case "$1" in
start_fluid)
start_fluid_process
;;
--help)
usage
;;
*)
usage
;;
esac
......@@ -26,6 +26,7 @@ from paddlerec.core.utils import envs
class LocalClusterEngine(Engine):
def start_procs(self):
fleet_mode = self.envs["fleet_mode"]
worker_num = self.envs["worker_num"]
server_num = self.envs["server_num"]
ports = [self.envs["start_port"]]
......@@ -39,51 +40,98 @@ class LocalClusterEngine(Engine):
procs = []
log_fns = []
for i in range(server_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
user_endpoints_ips = [x.split(":")[0]
for x in user_endpoints.split(",")]
user_endpoints_port = [x.split(":")[1]
for x in user_endpoints.split(",")]
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
if fleet_mode.upper() == "PS":
for i in range(server_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
for i in range(server_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_PORT": user_endpoints_port[i],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": user_endpoints_ips[i]
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/server.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
procs.append(proc)
for i in range(worker_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i)
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/worker.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
procs.append(proc)
user_endpoints_ips = [
x.split(":")[0] for x in user_endpoints.split(",")
]
user_endpoints_port = [
x.split(":")[1] for x in user_endpoints.split(",")
]
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
for i in range(server_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_PORT": user_endpoints_port[i],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": user_endpoints_ips[i]
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/server.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
for i in range(worker_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i)
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/worker.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
elif fleet_mode.upper() == "COLLECTIVE":
selected_gpus = self.envs["selected_gpus"].split(",")
selected_gpus_num = len(selected_gpus)
for i in range(selected_gpus_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
for i in range(selected_gpus_num):
current_env.update({
"PADDLE_TRAINER_ENDPOINTS": user_endpoints,
"PADDLE_CURRENT_ENDPOINTS": user_endpoints[i],
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i),
"FLAGS_selected_gpus": str(selected_gpus[i])
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/worker.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
# only wait worker to finish here
for i, proc in enumerate(procs):
......@@ -97,8 +145,10 @@ class LocalClusterEngine(Engine):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
print("all workers already completed, you can view logs under the `{}` directory".format(logs_dir),
file=sys.stderr)
print(
"all workers already completed, you can view logs under the `{}` directory".
format(logs_dir),
file=sys.stderr)
def run(self):
self.start_procs()
......@@ -26,7 +26,6 @@ from paddlerec.core.engine.engine import Engine
class LocalMPIEngine(Engine):
def start_procs(self):
logs_dir = self.envs["log_dir"]
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
......@@ -42,7 +41,8 @@ class LocalMPIEngine(Engine):
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/job.log" % logs_dir, "w")
log_fns.append(fn)
proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
else:
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
procs.append(proc)
......@@ -51,7 +51,9 @@ class LocalMPIEngine(Engine):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].wait()
print("all workers and parameter servers already completed", file=sys.stderr)
print(
"all workers and parameter servers already completed",
file=sys.stderr)
def run(self):
self.start_procs()
......@@ -14,31 +14,35 @@
import os
import sys
import yaml
from paddlerec.core.utils import envs
trainer_abs = os.path.join(os.path.dirname(
os.path.abspath(__file__)), "trainers")
trainer_abs = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "trainers")
trainers = {}
def trainer_registry():
trainers["SingleTrainer"] = os.path.join(
trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(
trainer_abs, "cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(
trainer_abs, "ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(
trainer_abs, "ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(
trainer_abs, "tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(
trainer_abs, "tdm_cluster_trainer.py")
trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(trainer_abs,
"cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
"ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(trainer_abs,
"ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(trainer_abs,
"tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
"tdm_cluster_trainer.py")
trainers["OnlineLearningTrainer"] = os.path.join(
trainer_abs, "online_learning_trainer.py")
# Definition of procedure execution process
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
"ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(trainer_abs,
"ctr_modul_trainer.py")
trainers["GeneralTrainer"] = os.path.join(trainer_abs,
"general_trainer.py")
trainer_registry()
......@@ -56,8 +60,8 @@ class TrainerFactory(object):
if trainer_abs is None:
if not os.path.isfile(train_mode):
raise IOError(
"trainer {} can not be recognized".format(train_mode))
raise IOError("trainer {} can not be recognized".format(
train_mode))
trainer_abs = train_mode
train_mode = "UserDefineTrainer"
......@@ -67,16 +71,8 @@ class TrainerFactory(object):
@staticmethod
def create(config):
_config = None
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("paddlerec's config only support yaml")
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
trainer = TrainerFactory._build_trainer(config)
return trainer
......
......@@ -13,6 +13,8 @@
# limitations under the License.
import abc
import paddle.fluid as fluid
import numpy as np
class Metric(object):
......@@ -21,27 +23,58 @@ class Metric(object):
__metaclass__ = abc.ABCMeta
def __init__(self, config):
""" """
"""R
"""
pass
@abc.abstractmethod
def clear(self, scope, params):
"""
clear current value
Args:
scope: value container
params: extend varilable for clear
def clear(self, scope=None):
"""R
"""
pass
if scope is None:
scope = fluid.global_scope()
@abc.abstractmethod
def calculate(self, scope, params):
place = fluid.CPUPlace()
for key in self._global_metric_state_vars:
varname, dtype = self._global_metric_state_vars[key]
var = scope.find_var(varname)
if not var:
continue
var = var.get_tensor()
data_array = np.zeros(var._get_dims()).astype(dtype)
var.set(data_array, place)
def _get_global_metric_state(self, fleet, scope, metric_name, mode="sum"):
"""R
"""
calculate result
Args:
scope: value container
params: extend varilable for clear
var = scope.find_var(metric_name)
if not var:
return None
input = np.array(var.get_tensor())
if fleet is None:
return input
fleet._role_maker._barrier_worker()
old_shape = np.array(input.shape)
input = input.reshape(-1)
output = np.copy(input) * 0
fleet._role_maker._all_reduce(input, output, mode=mode)
output = output.reshape(old_shape)
return output
def calc_global_metrics(self, fleet, scope=None):
"""R
"""
if scope is None:
scope = fluid.global_scope()
global_metrics = dict()
for key in self._global_metric_state_vars:
varname, dtype = self._global_metric_state_vars[key]
global_metrics[key] = self._get_global_metric_state(fleet, scope,
varname)
return self._calculate(global_metrics)
def _calculate(self, global_metrics):
pass
@abc.abstractmethod
......@@ -52,7 +85,6 @@ class Metric(object):
"""
pass
@abc.abstractmethod
def __str__(self):
"""
Return:
......
......@@ -11,3 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .recall_k import RecallK
from .pairwise_pn import PosNegRatio
from .precision_recall import PrecisionRecall
from .auc import AUC
__all__ = ['RecallK', 'PosNegRatio', 'AUC', 'PrecisionRecall']
......@@ -18,101 +18,60 @@ import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.layers.tensor import Variable
class AUCMetric(Metric):
class AUC(Metric):
"""
Metric For Paddle Model
Metric For Fluid Model
"""
def __init__(self, config, fleet):
def __init__(self,
input,
label,
curve='ROC',
num_thresholds=2**12 - 1,
topk=1,
slide_steps=1):
""" """
self.config = config
self.fleet = fleet
def clear(self, scope, params):
"""
Clear current metric value, usually set to zero
Args:
scope : paddle runtime var container
params(dict) :
label : a group name for metric
metric_dict : current metric_items in group
Return:
None
"""
self._label = params['label']
self._metric_dict = params['metric_dict']
self._result = {}
place = fluid.CPUPlace()
for metric_name in self._metric_dict:
metric_config = self._metric_dict[metric_name]
if scope.find_var(metric_config['var'].name) is None:
continue
metric_var = scope.var(metric_config['var'].name).get_tensor()
data_type = 'float32'
if 'data_type' in metric_config:
data_type = metric_config['data_type']
data_array = np.zeros(metric_var._get_dims()).astype(data_type)
metric_var.set(data_array, place)
def get_metric(self, scope, metric_name):
"""
reduce metric named metric_name from all worker
Return:
metric reduce result
"""
metric = np.array(scope.find_var(metric_name).get_tensor())
old_metric_shape = np.array(metric.shape)
metric = metric.reshape(-1)
global_metric = np.copy(metric) * 0
self.fleet._role_maker._node_type_comm.Allreduce(metric, global_metric)
global_metric = global_metric.reshape(old_metric_shape)
return global_metric[0]
def get_global_metrics(self, scope, metric_dict):
"""
reduce all metric in metric_dict from all worker
Return:
dict : {matric_name : metric_result}
"""
self.fleet._role_maker._barrier_worker()
result = {}
for metric_name in metric_dict:
metric_item = metric_dict[metric_name]
if scope.find_var(metric_item['var'].name) is None:
result[metric_name] = None
continue
result[metric_name] = self.get_metric(scope, metric_item['var'].name)
return result
def calculate_auc(self, global_pos, global_neg):
"""R
"""
num_bucket = len(global_pos)
area = 0.0
pos = 0.0
neg = 0.0
new_pos = 0.0
new_neg = 0.0
total_ins_num = 0
for i in range(num_bucket):
index = num_bucket - 1 - i
new_pos = pos + global_pos[index]
total_ins_num += global_pos[index]
new_neg = neg + global_neg[index]
total_ins_num += global_neg[index]
area += (new_neg - neg) * (pos + new_pos) / 2
pos = new_pos
neg = new_neg
auc_value = None
if pos * neg == 0 or total_ins_num == 0:
auc_value = 0.5
else:
auc_value = area / (pos * neg)
return auc_value
def calculate_bucket_error(self, global_pos, global_neg):
if not isinstance(input, Variable):
raise ValueError("input must be Variable, but received %s" %
type(input))
if not isinstance(label, Variable):
raise ValueError("label must be Variable, but received %s" %
type(label))
auc_out, batch_auc_out, [
batch_stat_pos, batch_stat_neg, stat_pos, stat_neg
] = fluid.layers.auc(input,
label,
curve=curve,
num_thresholds=num_thresholds,
topk=topk,
slide_steps=slide_steps)
prob = fluid.layers.slice(input, axes=[1], starts=[1], ends=[2])
label_cast = fluid.layers.cast(label, dtype="float32")
label_cast.stop_gradient = True
sqrerr, abserr, prob, q, pos, total = \
fluid.contrib.layers.ctr_metric_bundle(prob, label_cast)
self._global_metric_state_vars = dict()
self._global_metric_state_vars['stat_pos'] = (stat_pos.name, "float32")
self._global_metric_state_vars['stat_neg'] = (stat_neg.name, "float32")
self._global_metric_state_vars['total_ins_num'] = (total.name,
"float32")
self._global_metric_state_vars['pos_ins_num'] = (pos.name, "float32")
self._global_metric_state_vars['q'] = (q.name, "float32")
self._global_metric_state_vars['prob'] = (prob.name, "float32")
self._global_metric_state_vars['abserr'] = (abserr.name, "float32")
self._global_metric_state_vars['sqrerr'] = (sqrerr.name, "float32")
self.metrics = dict()
self.metrics["AUC"] = auc_out
self.metrics["BATCH_AUC"] = batch_auc_out
def _calculate_bucket_error(self, global_pos, global_neg):
"""R
"""
num_bucket = len(global_pos)
......@@ -160,52 +119,69 @@ class AUCMetric(Metric):
bucket_error = error_sum / error_count if error_count > 0 else 0.0
return bucket_error
def calculate(self, scope, params):
""" """
self._label = params['label']
self._metric_dict = params['metric_dict']
self.fleet._role_maker._barrier_worker()
result = self.get_global_metrics(scope, self._metric_dict)
def _calculate_auc(self, global_pos, global_neg):
"""R
"""
num_bucket = len(global_pos)
area = 0.0
pos = 0.0
neg = 0.0
new_pos = 0.0
new_neg = 0.0
total_ins_num = 0
for i in range(num_bucket):
index = num_bucket - 1 - i
new_pos = pos + global_pos[index]
total_ins_num += global_pos[index]
new_neg = neg + global_neg[index]
total_ins_num += global_neg[index]
area += (new_neg - neg) * (pos + new_pos) / 2
pos = new_pos
neg = new_neg
auc_value = None
if pos * neg == 0 or total_ins_num == 0:
auc_value = 0.5
else:
auc_value = area / (pos * neg)
return auc_value
def _calculate(self, global_metrics):
result = dict()
for key in self._global_metric_state_vars:
if key not in global_metrics:
raise ValueError("%s not existed" % key)
result[key] = global_metrics[key][0]
if result['total_ins_num'] == 0:
self._result = result
self._result['auc'] = 0
self._result['bucket_error'] = 0
self._result['actual_ctr'] = 0
self._result['predict_ctr'] = 0
self._result['mae'] = 0
self._result['rmse'] = 0
self._result['copc'] = 0
self._result['mean_q'] = 0
return self._result
if 'stat_pos' in result and 'stat_neg' in result:
result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
if 'pos_ins_num' in result:
result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num']
if 'abserr' in result:
result['auc'] = 0
result['bucket_error'] = 0
result['actual_ctr'] = 0
result['predict_ctr'] = 0
result['mae'] = 0
result['rmse'] = 0
result['copc'] = 0
result['mean_q'] = 0
else:
result['auc'] = self._calculate_auc(result['stat_pos'],
result['stat_neg'])
result['bucket_error'] = self._calculate_bucket_error(
result['stat_pos'], result['stat_neg'])
result['actual_ctr'] = result['pos_ins_num'] / result[
'total_ins_num']
result['mae'] = result['abserr'] / result['total_ins_num']
if 'sqrerr' in result:
result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num'])
if 'prob' in result:
result['rmse'] = math.sqrt(result['sqrerr'] /
result['total_ins_num'])
result['predict_ctr'] = result['prob'] / result['total_ins_num']
if abs(result['predict_ctr']) > 1e-6:
result['copc'] = result['actual_ctr'] / result['predict_ctr']
if 'q' in result:
result['mean_q'] = result['q'] / result['total_ins_num']
self._result = result
return result
def get_result(self):
""" """
return self._result
def __str__(self):
""" """
result = self.get_result()
result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f " \
result_str = "AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f " \
"Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \
(self._label, result['auc'], result['bucket_error'], result['mae'], result['rmse'],
(result['auc'], result['bucket_error'], result['mae'], result['rmse'],
result['actual_ctr'],
result['predict_ctr'], result['copc'], result['mean_q'], result['total_ins_num'])
return result_str
def get_result(self):
return self.metrics
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
class PosNegRatio(Metric):
"""
Metric For Fluid Model
"""
def __init__(self, pos_score, neg_score):
""" """
kwargs = locals()
del kwargs['self']
helper = LayerHelper("PaddleRec_PosNegRatio", **kwargs)
if "pos_score" not in kwargs or "neg_score" not in kwargs:
raise ValueError(
"PosNegRatio expect pos_score and neg_score as inputs.")
pos_score = kwargs.get('pos_score')
neg_score = kwargs.get('neg_score')
if not isinstance(pos_score, Variable):
raise ValueError("pos_score must be Variable, but received %s" %
type(pos_score))
if not isinstance(neg_score, Variable):
raise ValueError("neg_score must be Variable, but received %s" %
type(neg_score))
wrong = fluid.layers.cast(
fluid.layers.less_equal(pos_score, neg_score), dtype='float32')
wrong_cnt = fluid.layers.reduce_sum(wrong)
right = fluid.layers.cast(
fluid.layers.less_than(neg_score, pos_score), dtype='float32')
right_cnt = fluid.layers.reduce_sum(right)
global_right_cnt, _ = helper.create_or_get_global_variable(
name="right_cnt", persistable=True, dtype='float32', shape=[1])
global_wrong_cnt, _ = helper.create_or_get_global_variable(
name="wrong_cnt", persistable=True, dtype='float32', shape=[1])
for var in [global_right_cnt, global_wrong_cnt]:
helper.set_variable_initializer(
var, Constant(
value=0.0, force_cpu=True))
helper.append_op(
type="elementwise_add",
inputs={"X": [global_right_cnt],
"Y": [right_cnt]},
outputs={"Out": [global_right_cnt]})
helper.append_op(
type="elementwise_add",
inputs={"X": [global_wrong_cnt],
"Y": [wrong_cnt]},
outputs={"Out": [global_wrong_cnt]})
self.pn = (global_right_cnt + 1.0) / (global_wrong_cnt + 1.0)
self._global_metric_state_vars = dict()
self._global_metric_state_vars['right_cnt'] = (global_right_cnt.name,
"float32")
self._global_metric_state_vars['wrong_cnt'] = (global_wrong_cnt.name,
"float32")
self.metrics = dict()
self.metrics['WrongCnt'] = global_wrong_cnt
self.metrics['RightCnt'] = global_right_cnt
self.metrics['PN'] = self.pn
def _calculate(self, global_metrics):
for key in self._global_communicate_var:
if key not in global_metrics:
raise ValueError("%s not existed" % key)
pn = (global_metrics['right_cnt'][0] + 1.0) / (
global_metrics['wrong_cnt'][0] + 1.0)
return "RightCnt=%s WrongCnt=%s PN=%s" % (
str(global_metrics['right_cnt'][0]),
str(global_metrics['wrong_cnt'][0]), str(pn))
def get_result(self):
return self.metrics
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
class PrecisionRecall(Metric):
"""
Metric For Fluid Model
"""
def __init__(self, input, label, class_num):
"""R
"""
kwargs = locals()
del kwargs['self']
self.num_cls = class_num
if not isinstance(input, Variable):
raise ValueError("input must be Variable, but received %s" %
type(input))
if not isinstance(label, Variable):
raise ValueError("label must be Variable, but received %s" %
type(label))
helper = LayerHelper("PaddleRec_PrecisionRecall", **kwargs)
label = fluid.layers.cast(label, dtype="int32")
label.stop_gradient = True
max_probs, indices = fluid.layers.nn.topk(input, k=1)
indices = fluid.layers.cast(indices, dtype="int32")
indices.stop_gradient = True
states_info, _ = helper.create_or_get_global_variable(
name="states_info",
persistable=True,
dtype='float32',
shape=[self.num_cls, 4])
states_info.stop_gradient = True
helper.set_variable_initializer(
states_info, Constant(
value=0.0, force_cpu=True))
batch_metrics, _ = helper.create_or_get_global_variable(
name="batch_metrics",
persistable=False,
dtype='float32',
shape=[6])
accum_metrics, _ = helper.create_or_get_global_variable(
name="global_metrics",
persistable=False,
dtype='float32',
shape=[6])
batch_states = fluid.layers.fill_constant(
shape=[self.num_cls, 4], value=0.0, dtype="float32")
batch_states.stop_gradient = True
helper.append_op(
type="precision_recall",
attrs={'class_number': self.num_cls},
inputs={
'MaxProbs': [max_probs],
'Indices': [indices],
'Labels': [label],
'StatesInfo': [states_info]
},
outputs={
'BatchMetrics': [batch_metrics],
'AccumMetrics': [accum_metrics],
'AccumStatesInfo': [batch_states]
})
helper.append_op(
type="assign",
inputs={'X': [batch_states]},
outputs={'Out': [states_info]})
batch_states.stop_gradient = True
states_info.stop_gradient = True
self._global_metric_state_vars = dict()
self._global_metric_state_vars['states_info'] = (states_info.name,
"float32")
self.metrics = dict()
self.metrics["precision_recall_f1"] = accum_metrics
self.metrics["[TP FP TN FN]"] = states_info
def _calculate(self, global_metrics):
for key in self._global_metric_state_vars:
if key not in global_metrics:
raise ValueError("%s not existed" % key)
def calc_precision(tp_count, fp_count):
if tp_count > 0.0 or fp_count > 0.0:
return tp_count / (tp_count + fp_count)
return 1.0
def calc_recall(tp_count, fn_count):
if tp_count > 0.0 or fn_count > 0.0:
return tp_count / (tp_count + fn_count)
return 1.0
def calc_f1_score(precision, recall):
if precision > 0.0 or recall > 0.0:
return 2 * precision * recall / (precision + recall)
return 0.0
states = global_metrics["states_info"]
total_tp_count = 0.0
total_fp_count = 0.0
total_fn_count = 0.0
macro_avg_precision = 0.0
macro_avg_recall = 0.0
for i in range(self.num_cls):
total_tp_count += states[i][0]
total_fp_count += states[i][1]
total_fn_count += states[i][3]
macro_avg_precision += calc_precision(states[i][0], states[i][1])
macro_avg_recall += calc_recall(states[i][0], states[i][3])
metrics = []
macro_avg_precision /= self.num_cls
macro_avg_recall /= self.num_cls
metrics.append(macro_avg_precision)
metrics.append(macro_avg_recall)
metrics.append(calc_f1_score(macro_avg_precision, macro_avg_recall))
micro_avg_precision = calc_precision(total_tp_count, total_fp_count)
metrics.append(micro_avg_precision)
micro_avg_recall = calc_recall(total_tp_count, total_fn_count)
metrics.append(micro_avg_recall)
metrics.append(calc_f1_score(micro_avg_precision, micro_avg_recall))
return "total metrics: [TP, FP, TN, FN]=%s; precision_recall_f1=%s" % (
str(states), str(np.array(metrics).astype('float32')))
def get_result(self):
return self.metrics
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.layers import accuracy
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
class RecallK(Metric):
"""
Metric For Fluid Model
"""
def __init__(self, input, label, k=20):
""" """
kwargs = locals()
del kwargs['self']
self.k = k
if not isinstance(input, Variable):
raise ValueError("input must be Variable, but received %s" %
type(input))
if not isinstance(label, Variable):
raise ValueError("label must be Variable, but received %s" %
type(label))
helper = LayerHelper("PaddleRec_RecallK", **kwargs)
batch_accuracy = accuracy(input, label, self.k)
global_ins_cnt, _ = helper.create_or_get_global_variable(
name="ins_cnt", persistable=True, dtype='float32', shape=[1])
global_pos_cnt, _ = helper.create_or_get_global_variable(
name="pos_cnt", persistable=True, dtype='float32', shape=[1])
for var in [global_ins_cnt, global_pos_cnt]:
helper.set_variable_initializer(
var, Constant(
value=0.0, force_cpu=True))
tmp_ones = fluid.layers.fill_constant(
shape=fluid.layers.shape(label), dtype="float32", value=1.0)
batch_ins = fluid.layers.reduce_sum(tmp_ones)
batch_pos = batch_ins * batch_accuracy
helper.append_op(
type="elementwise_add",
inputs={"X": [global_ins_cnt],
"Y": [batch_ins]},
outputs={"Out": [global_ins_cnt]})
helper.append_op(
type="elementwise_add",
inputs={"X": [global_pos_cnt],
"Y": [batch_pos]},
outputs={"Out": [global_pos_cnt]})
self.acc = global_pos_cnt / global_ins_cnt
self._global_metric_state_vars = dict()
self._global_metric_state_vars['ins_cnt'] = (global_ins_cnt.name,
"float32")
self._global_metric_state_vars['pos_cnt'] = (global_pos_cnt.name,
"float32")
metric_name = "Acc(Recall@%d)" % self.k
self.metrics = dict()
self.metrics["InsCnt"] = global_ins_cnt
self.metrics["RecallCnt"] = global_pos_cnt
self.metrics[metric_name] = self.acc
# self.metrics["batch_metrics"] = batch_metrics
def _calculate(self, global_metrics):
for key in self._global_metric_state_vars:
if key not in global_metrics:
raise ValueError("%s not existed" % key)
ins_cnt = global_metrics['ins_cnt'][0]
pos_cnt = global_metrics['pos_cnt'][0]
if ins_cnt == 0:
acc = 0
else:
acc = float(pos_cnt) / ins_cnt
return "InsCnt=%s RecallCnt=%s Acc(Recall@%d)=%s" % (
str(ins_cnt), str(pos_cnt), self.k, str(acc))
def get_result(self):
return self.metrics
......@@ -13,14 +13,16 @@
# limitations under the License.
import abc
import os
import paddle.fluid as fluid
from paddle.fluid.framework import Variable
from paddlerec.core.metric import Metric
from paddlerec.core.utils import envs
class Model(object):
"""R
class ModelBase(object):
"""Base Model
"""
__metaclass__ = abc.ABCMeta
......@@ -35,36 +37,74 @@ class Model(object):
self._data_loader = None
self._infer_data_loader = None
self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform()
self._init_hyper_parameters()
self._env = config
self._slot_inited = False
self._clear_metrics = None
def _init_slots(self):
sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader")
dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
def _init_hyper_parameters(self):
pass
if sparse_slots is not None or dense_slots is not None:
sparse_slots = sparse_slots.strip().split(" ")
dense_slots = dense_slots.strip().split(" ")
dense_slots_shape = [[int(j) for j in i.split(":")[1].strip("[]").split(",")] for i in dense_slots]
def _init_slots(self, **kargs):
if self._slot_inited:
return
self._slot_inited = True
dataset = {}
model_dict = {}
for i in self._env["phase"]:
if i["name"] == kargs["name"]:
model_dict = i
break
for i in self._env["dataset"]:
if i["name"] == model_dict["dataset_name"]:
dataset = i
break
name = "dataset." + dataset["name"] + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots != "" or dense_slots != "":
if sparse_slots == "":
sparse_slots = []
else:
sparse_slots = sparse_slots.strip().split(" ")
if dense_slots == "":
dense_slots = []
else:
dense_slots = dense_slots.strip().split(" ")
dense_slots_shape = [[
int(j) for j in i.split(":")[1].strip("[]").split(",")
] for i in dense_slots]
dense_slots = [i.split(":")[0] for i in dense_slots]
self._dense_data_var = []
for i in range(len(dense_slots)):
l = fluid.layers.data(name=dense_slots[i], shape=dense_slots_shape[i], dtype="float32")
l = fluid.layers.data(
name=dense_slots[i],
shape=dense_slots_shape[i],
dtype="float32")
self._data_var.append(l)
self._dense_data_var.append(l)
self._sparse_data_var = []
for name in sparse_slots:
l = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64")
l = fluid.layers.data(
name=name, shape=[1], lod_level=1, dtype="int64")
self._data_var.append(l)
self._sparse_data_var.append(l)
dataset_class = envs.get_global_env("dataset_class", None, "train.reader")
dataset_class = envs.get_global_env(name + "type")
if dataset_class == "DataLoader":
self._init_dataloader()
def _init_dataloader(self):
def _init_dataloader(self, is_infer=False):
if is_infer:
data = self._infer_data_var
else:
data = self._data_var
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
feed_list=data,
capacity=64,
use_double_buffer=False,
iterable=False)
def get_inputs(self):
return self._data_var
......@@ -72,8 +112,23 @@ class Model(object):
def get_infer_inputs(self):
return self._infer_data_var
def get_clear_metrics(self):
if self._clear_metrics is not None:
return self._clear_metrics
self._clear_metrics = []
for key in self._infer_results:
if isinstance(self._infer_results[key], Metric):
self._clear_metrics.append(self._infer_results[key])
return self._clear_metrics
def get_infer_results(self):
return self._infer_results
res = dict()
for key in self._infer_results:
if isinstance(self._infer_results[key], Metric):
res.update(self._infer_results[key].get_result())
elif isinstance(self._infer_results[key], Variable):
res[key] = self._infer_results[key]
return res
def get_avg_cost(self):
"""R
......@@ -83,12 +138,18 @@ class Model(object):
def get_metrics(self):
"""R
"""
return self._metrics
res = dict()
for key in self._metrics:
if isinstance(self._metrics[key], Metric):
res.update(self._metrics[key].get_result())
elif isinstance(self._metrics[key], Variable):
res[key] = self._metrics[key]
return res
def get_fetch_period(self):
return self._fetch_interval
def _build_optimizer(self, name, lr):
def _build_optimizer(self, name, lr, strategy=None):
name = name.upper()
optimizers = ["SGD", "ADAM", "ADAGRAD"]
if name not in optimizers:
......@@ -96,10 +157,12 @@ class Model(object):
"configured optimizer can only supported SGD/Adam/Adagrad")
if name == "SGD":
reg = envs.get_global_env(
"hyper_parameters.reg", 0.0001, self._namespace)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '1'
else:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
if name == "SGD":
optimizer_i = fluid.optimizer.SGD(lr)
elif name == "ADAM":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
elif name == "ADAGRAD":
......@@ -111,19 +174,60 @@ class Model(object):
return optimizer_i
def optimizer(self):
learning_rate = envs.get_global_env(
"hyper_parameters.learning_rate", None, self._namespace)
optimizer = envs.get_global_env(
"hyper_parameters.optimizer", None, self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate)
@abc.abstractmethod
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env(
"hyper_parameters.optimizer.strategy")
return self._build_optimizer(opt_name, opt_lr, opt_strategy)
def input_data(self, is_infer=False, **kwargs):
name = "dataset." + kwargs.get("dataset_name") + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
self._sparse_data_var_map = {}
self._dense_data_var_map = {}
if sparse_slots != "" or dense_slots != "":
if sparse_slots == "":
sparse_slots = []
else:
sparse_slots = sparse_slots.strip().split(" ")
if dense_slots == "":
dense_slots = []
else:
dense_slots = dense_slots.strip().split(" ")
dense_slots_shape = [[
int(j) for j in i.split(":")[1].strip("[]").split(",")
] for i in dense_slots]
dense_slots = [i.split(":")[0] for i in dense_slots]
self._dense_data_var = []
data_var_ = []
for i in range(len(dense_slots)):
l = fluid.layers.data(
name=dense_slots[i],
shape=dense_slots_shape[i],
dtype="float32")
data_var_.append(l)
self._dense_data_var.append(l)
self._dense_data_var_map[dense_slots[i]] = l
self._sparse_data_var = []
for name in sparse_slots:
l = fluid.layers.data(
name=name, shape=[1], lod_level=1, dtype="int64")
data_var_.append(l)
self._sparse_data_var.append(l)
self._sparse_data_var_map[name] = l
return data_var_
else:
return None
def net(self, is_infer=False):
return None
def train_net(self):
"""R
"""
pass
@abc.abstractmethod
def infer_net(self):
pass
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -31,9 +31,15 @@ def create(config):
Model Instance
"""
model = None
if config['mode'] == 'fluid':
model = YamlModel(config)
model.train_net()
if config['layer_file'].endswith(".py"):
model_class = envs.lazy_instance_by_fliename(config['layer_file'],
"Model")
model = model_class(config)
else:
model = YamlModel(config)
model.train()
return model
......@@ -50,7 +56,12 @@ class YamlModel(Model):
f = open(config['layer_file'], 'r')
self._build_nodes = yaml.safe_load(f.read())
self._build_phase = ['input', 'param', 'summary', 'layer']
self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}}
self._build_param = {
'layer': {},
'inner_layer': {},
'layer_extend': {},
'model': {}
}
self._inference_meta = {'dependency': {}, 'params': {}}
def train_net(self):
......@@ -76,10 +87,12 @@ class YamlModel(Model):
if self._build_nodes[phase] is None:
continue
for node in self._build_nodes[phase]:
exec("""layer=layer.{}(node)""".format(node['class']))
layer_output, extend_output = layer.generate(self._config['mode'], self._build_param)
exec ("""layer=layer.{}(node)""".format(node['class']))
layer_output, extend_output = layer.generate(
self._config['mode'], self._build_param)
self._build_param['layer'][node['name']] = layer_output
self._build_param['layer_extend'][node['name']] = extend_output
self._build_param['layer_extend'][node[
'name']] = extend_output
if extend_output is None:
continue
if 'loss' in extend_output:
......@@ -89,17 +102,24 @@ class YamlModel(Model):
self._cost += extend_output['loss']
if 'data_var' in extend_output:
self._data_var += extend_output['data_var']
if 'metric_label' in extend_output and extend_output['metric_label'] is not None:
self._metrics[extend_output['metric_label']] = extend_output['metric_dict']
if 'metric_label' in extend_output and extend_output[
'metric_label'] is not None:
self._metrics[extend_output[
'metric_label']] = extend_output['metric_dict']
if 'inference_param' in extend_output:
inference_param = extend_output['inference_param']
param_name = inference_param['name']
if param_name not in self._build_param['table']:
self._build_param['table'][param_name] = {'params': []}
table_meta = table.TableMeta.alloc_new_table(inference_param['table_id'])
self._build_param['table'][param_name]['_meta'] = table_meta
self._build_param['table'][param_name]['params'] += inference_param['params']
self._build_param['table'][param_name] = {
'params': []
}
table_meta = table.TableMeta.alloc_new_table(
inference_param['table_id'])
self._build_param['table'][param_name][
'_meta'] = table_meta
self._build_param['table'][param_name][
'params'] += inference_param['params']
pass
@classmethod
......@@ -114,20 +134,25 @@ class YamlModel(Model):
metrics = params['metrics']
for name in metrics:
model_metrics = metrics[name]
stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics]
stat_var_names += [
model_metrics[metric]['var'].name
for metric in model_metrics
]
strategy['stat_var_names'] = list(set(stat_var_names))
optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \
'(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
exec(optimizer_generator)
exec (optimizer_generator)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
return optimizer
def dump_model_program(self, path):
"""R
"""
with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout:
with open(path + '/' + self._name + '_main_program.pbtxt',
"w") as fout:
print >> fout, self._build_param['model']['train_program']
with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout:
with open(path + '/' + self._name + '_startup_program.pbtxt',
"w") as fout:
print >> fout, self._build_param['model']['startup_program']
pass
......@@ -137,7 +162,8 @@ class YamlModel(Model):
scope = params['scope']
decay = params['decay']
for param_table in self._build_param['table']:
table_id = self._build_param['table'][param_table]['_meta']._table_id
table_id = self._build_param['table'][param_table][
'_meta']._table_id
fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)
def dump_inference_program(self, inference_layer, path):
......@@ -152,17 +178,25 @@ class YamlModel(Model):
executor = params['executor']
program = self._build_param['model']['train_program']
for table_name, table in self._build_param['table'].items():
fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id,
table['params'])
for infernce_item in params['inference_list']:
params_name_list = self.inference_params(infernce_item['layer_name'])
params_var_list = [program.global_block().var(i) for i in params_name_list]
params_name_list = self.inference_params(infernce_item[
'layer_name'])
params_var_list = [
program.global_block().var(i) for i in params_name_list
]
params_file_name = infernce_item['save_file_name']
with fluid.scope_guard(scope):
if params['save_combine']:
fluid.io.save_vars(executor, "./", \
program, vars=params_var_list, filename=params_file_name)
else:
fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list)
fluid.io.save_vars(
executor,
params_file_name,
program,
vars=params_var_list)
def inference_params(self, inference_layer):
"""
......@@ -177,11 +211,13 @@ class YamlModel(Model):
return self._inference_meta['params'][layer]
self._inference_meta['params'][layer] = []
self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer)
self._inference_meta['dependency'][layer] = self.get_dependency(
self._build_param['inner_layer'], layer)
for node in self._build_nodes['layer']:
if node['name'] not in self._inference_meta['dependency'][layer]:
continue
if 'inference_param' in self._build_param['layer_extend'][node['name']]:
if 'inference_param' in self._build_param['layer_extend'][node[
'name']]:
self._inference_meta['params'][layer] += \
self._build_param['layer_extend'][node['name']]['inference_param']['params']
return self._inference_meta['params'][layer]
......@@ -199,5 +235,6 @@ class YamlModel(Model):
dependencys = copy.deepcopy(layer_graph[dest_layer]['input'])
dependency_list = copy.deepcopy(dependencys)
for dependency in dependencys:
dependency_list = dependency_list + self.get_dependency(layer_graph, dependency)
dependency_list = dependency_list + self.get_dependency(
layer_graph, dependency)
return list(set(dependency_list))
......@@ -18,7 +18,7 @@ from paddlerec.core.layer import Layer
class EmbeddingFuseLayer(Layer):
"""R
"""embedding + sequence + concat
"""
def __init__(self, config):
......@@ -40,7 +40,8 @@ class EmbeddingFuseLayer(Layer):
show_clk.stop_gradient = True
data_var = []
for slot in self._slots:
l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1)
l = fluid.layers.data(
name=slot, shape=[1], dtype="int64", lod_level=1)
data_var.append(l)
emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \
is_sparse=True, is_distributed=True,
......@@ -48,7 +49,8 @@ class EmbeddingFuseLayer(Layer):
emb = fluid.layers.sequence_pool(input=emb, pool_type='sum')
emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm)
self._emb_layers.append(emb)
output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name)
output = fluid.layers.concat(
input=self._emb_layers, axis=1, name=self._name)
return output, {'data_var': data_var}
......@@ -111,7 +113,13 @@ class ParamLayer(Layer):
def generate(self, param):
"""R
"""
return self._config, {'inference_param': {'name': 'param', 'params': [], 'table_id': self._table_id}}
return self._config, {
'inference_param': {
'name': 'param',
'params': [],
'table_id': self._table_id
}
}
class SummaryLayer(Layer):
......@@ -129,7 +137,13 @@ class SummaryLayer(Layer):
def generate(self, param):
"""R
"""
return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}}
return self._config, {
'inference_param': {
'name': 'summary',
'params': [],
'table_id': self._table_id
}
}
class NormalizationLayer(Layer):
......@@ -152,9 +166,19 @@ class NormalizationLayer(Layer):
if len(self._input) > 0:
input_list = [param['layer'][i] for i in self._input]
input_layer = fluid.layers.concat(input=input_list, axis=1)
bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={
"batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4})
inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum']
bn = fluid.layers.data_norm(
input=input_layer,
name=self._name,
epsilon=1e-4,
param_attr={
"batch_size": 1e4,
"batch_sum_default": 0.0,
"batch_square": 1e4
})
inference_param = [
self._name + '.batch_size', self._name + '.batch_sum',
self._name + '.batch_square_sum'
]
return bn, {'inference_param': {'name': 'summary', \
'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}}
......@@ -181,11 +205,13 @@ class FCLayer(Layer):
input_list = [param['layer'][i] for i in self._input]
input_layer = fluid.layers.concat(input=input_list, axis=1)
input_coln = input_layer.shape[1]
scale = param_layer['init_range'] / (input_coln ** 0.5)
scale = param_layer['init_range'] / (input_coln**0.5)
bias = None
if self._bias:
bias = fluid.ParamAttr(learning_rate=1.0,
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale))
bias = fluid.ParamAttr(
learning_rate=1.0,
initializer=fluid.initializer.NormalInitializer(
loc=0.0, scale=scale))
fc = fluid.layers.fc(
name=self._name,
input=input_layer,
......@@ -216,18 +242,46 @@ class LogLossLayer(Layer):
self._extend_output = {
'metric_label': self._metric_label,
'metric_dict': {
'auc': {'var': None},
'batch_auc': {'var': None},
'stat_pos': {'var': None, 'data_type': 'int64'},
'stat_neg': {'var': None, 'data_type': 'int64'},
'batch_stat_pos': {'var': None, 'data_type': 'int64'},
'batch_stat_neg': {'var': None, 'data_type': 'int64'},
'pos_ins_num': {'var': None},
'abserr': {'var': None},
'sqrerr': {'var': None},
'prob': {'var': None},
'total_ins_num': {'var': None},
'q': {'var': None}
'auc': {
'var': None
},
'batch_auc': {
'var': None
},
'stat_pos': {
'var': None,
'data_type': 'int64'
},
'stat_neg': {
'var': None,
'data_type': 'int64'
},
'batch_stat_pos': {
'var': None,
'data_type': 'int64'
},
'batch_stat_neg': {
'var': None,
'data_type': 'int64'
},
'pos_ins_num': {
'var': None
},
'abserr': {
'var': None
},
'sqrerr': {
'var': None
},
'prob': {
'var': None
},
'total_ins_num': {
'var': None
},
'q': {
'var': None
}
}
}
......@@ -236,9 +290,12 @@ class LogLossLayer(Layer):
"""
input_layer = param['layer'][self._input[0]]
label_layer = param['layer'][self._label]
output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name)
output = fluid.layers.clip(
input_layer, self._bound[0], self._bound[1], name=self._name)
norm = fluid.layers.sigmoid(output, name=self._name)
output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32'))
output = fluid.layers.log_loss(
norm, fluid.layers.cast(
x=label_layer, dtype='float32'))
if self._weight:
weight_layer = param['layer'][self._weight]
output = fluid.layers.elementwise_mul(output, weight_layer)
......@@ -248,7 +305,11 @@ class LogLossLayer(Layer):
# For AUC Metric
metric = self._extend_output['metric_dict']
binary_predict = fluid.layers.concat(
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1)
input=[
fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm),
norm
],
axis=1)
metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \
metric['batch_stat_neg']['var'], metric['stat_pos']['var'],
metric['stat_neg']['var']] = \
......
......@@ -11,35 +11,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import sys
from __future__ import print_function
import abc
import os
from functools import reduce
import paddle.fluid.incubate.data_generator as dg
import yaml
from paddlerec.core.utils import envs
class Reader(dg.MultiSlotDataGenerator):
class ReaderBase(dg.MultiSlotDataGenerator):
__metaclass__ = abc.ABCMeta
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod
def init(self):
"""init """
pass
@abc.abstractmethod
......@@ -52,19 +43,24 @@ class SlotReader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
def init(self, sparse_slots, dense_slots, padding=0):
from operator import mul
self.sparse_slots = sparse_slots.strip().split(" ")
self.dense_slots = dense_slots.strip().split(" ")
self.dense_slots_shape = [reduce(mul, [int(j) for j in i.split(":")[1].strip("[]").split(",")]) for i in self.dense_slots]
self.sparse_slots = []
if sparse_slots.strip() != "#" and sparse_slots.strip(
) != "?" and sparse_slots.strip() != "":
self.sparse_slots = sparse_slots.strip().split(" ")
self.dense_slots = []
if dense_slots.strip() != "#" and dense_slots.strip(
) != "?" and dense_slots.strip() != "":
self.dense_slots = dense_slots.strip().split(" ")
self.dense_slots_shape = [
reduce(mul,
[int(j) for j in i.split(":")[1].strip("[]").split(",")])
for i in self.dense_slots
]
self.dense_slots = [i.split(":")[0] for i in self.dense_slots]
self.slots = self.dense_slots + self.sparse_slots
self.slot2index = {}
......@@ -93,10 +89,13 @@ class SlotReader(dg.MultiSlotDataGenerator):
slot = i
if not self.visit[slot]:
if i in self.dense_slots:
output[self.slot2index[i]][1].extend([self.padding] * self.dense_slots_shape[self.slot2index[i]])
output[self.slot2index[i]][1].extend(
[self.padding] *
self.dense_slots_shape[self.slot2index[i]])
else:
output[self.slot2index[i]][1].extend([self.padding])
else:
self.visit[slot] = False
yield output
return reader
......@@ -16,28 +16,177 @@ import abc
import os
import time
import sys
import yaml
import traceback
from paddle import fluid
from paddlerec.core.utils import envs
class EngineMode:
"""
There are various engine designed for different runing environment.
"""
SINGLE = 1
CLUSTER = 2
LOCAL_CLUSTER = 3
class FleetMode:
"""
Paddle Distributed train support: ParameterServer/Collective/PSlib
"""
PS = 1
COLLECTIVE = 2
PSLIB = 3
class Device:
"""
PaddleRec Support CPU/GPU, XPU will comming soon
"""
CPU = 1
GPU = 2
# XPU =3
class Trainer(object):
"""R
"""
Trainer Base
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config=None):
self._status_processor = {}
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.model = None
self.inference_models = []
self.increment_models = []
self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False}
self._config_yaml = config
self._context["config_yaml"] = config
self._model = {}
self._dataset = {}
with open(config, 'r') as rb:
self._config = yaml.load(rb.read(), Loader=yaml.FullLoader)
self._runner_name = envs.get_runtime_environ("mode")
self._context["runner_name"] = self._runner_name
phase_names = envs.get_global_env(
"runner." + self._runner_name + ".phases", None)
_config = envs.load_yaml(config)
self._context["env"] = _config
self._context["dataset"] = _config.get("dataset")
phases = []
if phase_names is None:
phases = _config.get("phase")
else:
for phase in _config.get("phase"):
if phase["name"] in phase_names:
phases.append(phase)
self._context["phases"] = phases
print("PaddleRec: Runner {} Begin".format(self._runner_name))
self.which_engine()
self.which_device()
self.which_fleet_mode()
self.which_executor_mode()
self.legality_check()
def which_device(self):
"""R
"""
device = envs.get_global_env(
"runner." + self._runner_name + ".device", default_value="CPU")
device = device.upper()
if device == 'GPU':
self.check_gpu()
self.device = Device.GPU
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
self._place = fluid.CUDAPlace(gpu_id)
self._exe = fluid.Executor(self._place)
elif device == "CPU":
self.device = Device.CPU
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
else:
raise ValueError("Not Support device {}".format(device))
self._context["device"] = device
self._context["exe"] = self._exe
self._context["place"] = self._place
def check_gpu(self):
"""
Log error and exit when set use_gpu=true in paddlepaddle
cpu version.
"""
err = "GPU cannot be set as true while you are " \
"using paddlepaddle cpu version ! \nPlease try: \n" \
"\t1. Install paddlepaddle-gpu to run model on GPU \n" \
"\t2. Set device as cpu in config file to run " \
"model on CPU"
try:
if not fluid.is_compiled_with_cuda():
raise RuntimeError(err)
except Exception as e:
pass
def which_engine(self):
engine = envs.get_runtime_environ("train.trainer.engine")
if engine.upper() == "SINGLE":
self.engine = EngineMode.SINGLE
self.is_fleet = False
elif engine.upper() == "LOCAL_CLUSTER":
self.engine = EngineMode.LOCAL_CLUSTER
self.is_fleet = True
elif engine.upper() == "CLUSTER":
self.engine = EngineMode.CLUSTER
self.is_fleet = True
else:
raise ValueError("Not Support Engine {}".format(engine))
self._context["is_fleet"] = self.is_fleet
self._context["engine"] = self.engine
def which_fleet_mode(self):
fleet_mode = envs.get_runtime_environ("fleet_mode")
if fleet_mode.upper() == "PS":
self.fleet_mode = FleetMode.PS
elif fleet_mode.upper() == "COLLECTIVE":
self.fleet_mode = FleetMode.COLLECTIVE
elif fleet_mode.upper() == "PSLIB":
self.fleet_mode = FleetMode.PSLIB
else:
raise ValueError("Not Support Fleet Mode {}".format(fleet_mode))
self._context["is_pslib"] = (fleet_mode.upper() == "PSLIB")
self._context["fleet_mode"] = fleet_mode
def which_executor_mode(self):
executor_mode = envs.get_runtime_environ("train.trainer.executor_mode")
if executor_mode.upper() not in ["TRAIN", "INFER"]:
raise ValueError("Not Support Executor Mode {}".format(
executor_mode))
if executor_mode.upper() == "TRAIN":
self.is_infer = False
else:
self.is_infer = True
print("Executor Mode: {}".format(executor_mode))
self._context["is_infer"] = self.is_infer
def legality_check(self):
if self.device == Device.CPU:
assert self.fleet_mode != FleetMode.COLLECTIVE, "Not Support CPU with Collective Mode"
if self.is_infer:
assert self.engine == EngineMode.SINGLE, "Not Support Distributed Infer "
@abc.abstractmethod
def processor_register(self):
pass
def regist_context_processor(self, status_name, processor):
"""
......@@ -53,7 +202,8 @@ class Trainer(object):
Return:
None : run a processor for this status
"""
if context['status'] in self._status_processor:
status = context['status']
if status in self._status_processor:
self._status_processor[context['status']](context)
else:
self.other_status_processor(context)
......@@ -67,6 +217,19 @@ class Trainer(object):
print('unknow context_status:%s, do nothing' % context['status'])
time.sleep(60)
def handle_processor_exception(self, context, exception):
"""
when exception throwed from processor, will call this func to handle it
Return:
bool exit_app or not
"""
print("\n--------------------------------\nPaddleRec Error Message "
"Summary:\n--------------------------------\n")
print(
'Exit PaddleRec. catch exception in precoss status: [%s], except: %s'
% (context['status'], str(exception)))
return True
def reload_train_context(self):
"""
context maybe update timely, reload for update
......@@ -78,23 +241,14 @@ class Trainer(object):
keep running by statu context.
"""
while True:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
def user_define_engine(engine_yaml):
with open(engine_yaml, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
assert _config is not None
envs.set_runtime_environs(_config)
train_location = envs.get_global_env("engine.file")
train_dirname = os.path.dirname(train_location)
base_name = os.path.splitext(os.path.basename(train_location))[0]
sys.path.append(train_dirname)
trainer_class = envs.lazy_instance_by_fliename(
base_name, "UserDefineTraining")
return trainer_class
try:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
except Exception as err:
traceback.print_exc()
print('Catch Exception:%s' % str(err))
sys.stdout.flush()
self.handle_processor_exception(self._context, err)
sys.exit(type(err).__name__)
......@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
trainer implement.
......@@ -22,5 +21,3 @@ Trainer
↘ (for online learning training) OnlineLearningTrainer
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import os
import time
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
class ClusterTrainer(TranspileTrainer):
def processor_register(self):
role = PaddleCloudRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor(
'train_pass', self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
self.strategy = strategy
return strategy
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env(
"hyper_parameters.optimizer", None, "train.model")
if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
fleet.init_worker()
reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs")
program = fluid.compiler.CompiledProgram(
fleet.main_program).with_data_parallel(
loss_name=self.model.get_avg_cost().name,
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
for epoch in range(epochs):
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(
program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
self.save(epoch, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def dataset_train(self, context):
fleet.init_worker()
dataset = self._get_dataset()
ins = self._get_dataset_ins()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
begin_time = time.time()
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time-begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times))
self.save(i, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer
class CtrTrainer(Trainer):
"""R
"""
def __init__(self, config):
"""R
"""
Trainer.__init__(self, config)
self.global_config = config
self._metrics = {}
self.processor_register()
def processor_register(self):
role = MPISymetricRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('train_pass', self.train)
self.regist_context_processor('terminal_pass', self.terminal)
def _get_dataset(self):
namespace = "train.reader"
inputs = self.model.get_inputs()
threads = envs.get_global_env("train.threads", None)
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN", self._config_yaml)
train_data_path = envs.get_global_env("train_data_path", None, namespace)
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
return dataset
def instance(self, context):
models = envs.get_global_env("train.model.models")
model_class = envs.lazy_instance_by_fliename(models, "Model")
self.model = model_class(None)
context['status'] = 'init_pass'
def init(self, context):
"""R
"""
self.model.train_net()
optimizer = self.model.optimizer()
optimizer = fleet.distributed_optimizer(optimizer, strategy={"use_cvm": False})
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass'
def server(self, context):
fleet.run_server()
fleet.stop_worker()
context['is_exit'] = True
def train(self, context):
self._exe.run(fluid.default_startup_program())
fleet.init_worker()
dataset = self._get_dataset()
shuf = np.array([fleet.worker_index()])
gs = shuf * 0
fleet._role_maker._node_type_comm.Allreduce(shuf, gs)
print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(), fleet.worker_num(), gs))
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
context['status'] = 'terminal_pass'
fleet.stop_worker()
def terminal(self, context):
print("terminal ended.")
context['is_exit'] = True
此差异已折叠。
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader
from paddlerec.core.trainer import EngineMode
from paddlerec.core.utils.util import split_files
__all__ = ["DatasetBase", "DataLoader", "QueueDataset"]
class DatasetBase(object):
"""R
"""
def __init__(self, context):
pass
def get_dataset(self, context):
pass
class DataLoader(DatasetBase):
def __init__(self, context):
pass
def get_dataloader(self, context, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
reader_class_name = envs.get_global_env(name + "reader_class_name",
"Reader")
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class,
dataset_name,
context["config_yaml"],
context,
reader_class_name=reader_class_name)
reader_class = envs.lazy_instance_by_fliename(reader_class,
reader_class_name)
reader_ins = reader_class(context["config_yaml"])
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, context["config_yaml"], context)
reader_ins = SlotReader(context["config_yaml"])
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
class QueueDataset(DatasetBase):
def __init__(self, context):
pass
def create_dataset(self, dataset_name, context):
name = "dataset." + dataset_name + "."
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), "Reader To Dataloader")
type_name = "DataLoader"
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name, context)
def _get_dataset(self, dataset_name, context):
name = "dataset." + dataset_name + "."
reader_class = envs.get_global_env(name + "data_converter")
reader_class_name = envs.get_global_env(name + "reader_class_name",
"Reader")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
reader_class_name,
context["config_yaml"])
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", context["config_yaml"], "fake",
sparse_slots.replace(" ", "?"),
dense_slots.replace(" ", "?"), str(padding))
batch_size = envs.get_global_env(name + "batch_size")
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(batch_size)
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
if context["engine"] == EngineMode.LOCAL_CLUSTER:
file_list = split_files(file_list, context["fleet"].worker_index(),
context["fleet"].worker_num())
print("File_list: {}".format(file_list))
dataset.set_filelist(file_list)
for model_dict in context["phases"]:
if model_dict["dataset_name"] == dataset_name:
model = context["model"][model_dict["name"]]["model"]
thread_num = int(model_dict["thread_num"])
dataset.set_thread(thread_num)
if context["is_infer"]:
inputs = model._infer_data_var
else:
inputs = model._data_var
dataset.set_use_var(inputs)
break
return dataset
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = [
"InstanceBase", "SingleInstance", "PSInstance", "PslibInstance",
"CollectiveInstance"
]
class InstanceBase(object):
"""R
"""
def __init__(self, context):
pass
def instance(self, context):
pass
class SingleInstance(InstanceBase):
def __init__(self, context):
print("Running SingleInstance.")
pass
def instance(self, context):
context['status'] = 'network_pass'
class PSInstance(InstanceBase):
def __init__(self, context):
print("Running PSInstance.")
pass
def instance(self, context):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
role = PaddleCloudRoleMaker()
fleet.init(role)
context['fleet'] = fleet
context['status'] = 'network_pass'
class PslibInstance(InstanceBase):
def __init__(self, context):
print("Running PslibInstance.")
pass
def instance(self, context):
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
fleet.init()
context['fleet'] = fleet
context['status'] = 'network_pass'
class CollectiveInstance(InstanceBase):
def __init__(self, context):
print("Running CollectiveInstance.")
pass
def instance(self, context):
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
role = PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
context['fleet'] = fleet
context['status'] = 'network_pass'
此差异已折叠。
此差异已折叠。
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = ["StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup"]
class StartupBase(object):
"""R
"""
def __init__(self, context):
pass
def startup(self, context):
pass
def load(self, context, is_fleet=False, main_program=None):
dirname = envs.get_global_env(
"runner." + context["runner_name"] + ".init_model_path", None)
if dirname is None or dirname == "":
return
print("going to load ", dirname)
fluid.io.load_persistables(
context["exe"], dirname, main_program=main_program)
print("load from {} success".format(dirname))
class SingleStartup(StartupBase):
"""R
"""
def __init__(self, context):
print("Running SingleStartup.")
pass
def startup(self, context):
for model_dict in context["phases"]:
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
self.load(context, main_program=train_prog)
context["status"] = "train_pass"
class PSStartup(StartupBase):
def __init__(self, context):
print("Running PSStartup.")
pass
def startup(self, context):
model_dict = context["env"]["phase"][0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
train_prog = context["model"][model_dict["name"]]["main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
context["status"] = "train_pass"
class CollectiveStartup(StartupBase):
def __init__(self, context):
print("Running CollectiveStartup.")
pass
def startup(self, context):
model_dict = context["env"]["phase"][0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
train_prog = context["model"][model_dict["name"]][
"default_main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
self.load(context, main_program=train_prog)
context["status"] = "train_pass"
class SingleInferStartup(StartupBase):
def __init__(self, context):
print("Running SingleInferStartup.")
pass
def startup(self, context):
for model_dict in context["phases"]:
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
context["status"] = "train_pass"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = ["TerminalBase", "PSTerminalBase"]
class TerminalBase(object):
"""R
"""
def __init__(self, context):
pass
def terminal(self, context):
print("PaddleRec Finish")
class PSTerminal(TerminalBase):
"""R
"""
def __init__(self, context):
pass
def terminal(self, context):
context["fleet"].stop_worker()
print("PaddleRec Finish")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + PS/COLLECTIVE
"""
from __future__ import print_function
import os
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer, EngineMode, FleetMode
class GeneralTrainer(Trainer):
"""
Trainer for various situations.
"""
def __init__(self, config=None):
Trainer.__init__(self, config)
self.processor_register()
self.abs_dir = os.path.dirname(os.path.abspath(__file__))
self.runner_env_name = "runner." + self._context["runner_name"]
def processor_register(self):
print("processor_register begin")
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('network_pass', self.network)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.runner)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
instance_class_path = envs.get_global_env(
self.runner_env_name + ".instance_class_path", default_value=None)
if instance_class_path:
instance_class = envs.lazy_instance_by_fliename(
instance_class_path, "Instance")(context)
else:
if self.engine == EngineMode.SINGLE:
instance_class_name = "SingleInstance"
elif self.fleet_mode == FleetMode.PSLIB:
instance_class_name = "PslibInstance"
elif self.fleet_mode == FleetMode.PS:
instance_class_name = "PSInstance"
elif self.fleet_mode == FleetMode.COLLECTIVE:
instance_class_name = "CollectiveInstance"
else:
raise ValueError("Instance Init Error")
instance_path = os.path.join(self.abs_dir, "framework",
"instance.py")
instance_class = envs.lazy_instance_by_fliename(
instance_path, instance_class_name)(context)
instance_class.instance(context)
def network(self, context):
network_class_path = envs.get_global_env(
self.runner_env_name + ".network_class_path", default_value=None)
if network_class_path:
network_class = envs.lazy_instance_by_fliename(network_class_path,
"Network")(context)
else:
if self.engine == EngineMode.SINGLE:
network_class_name = "SingleNetwork"
elif self.fleet_mode == FleetMode.PSLIB:
network_class_name = "PslibNetwork"
elif self.fleet_mode == FleetMode.PS:
network_class_name = "PSNetwork"
elif self.fleet_mode == FleetMode.COLLECTIVE:
network_class_name = "CollectiveNetwork"
else:
raise ValueError("NetWork Init Error")
network_path = os.path.join(self.abs_dir, "framework",
"network.py")
network_class = envs.lazy_instance_by_fliename(
network_path, network_class_name)(context)
network_class.build_network(context)
def startup(self, context):
startup_class_path = envs.get_global_env(
self.runner_env_name + ".startup_class_path", default_value=None)
if startup_class_path:
startup_class = envs.lazy_instance_by_fliename(startup_class_path,
"Startup")(context)
else:
if self.engine == EngineMode.SINGLE and context["is_infer"]:
startup_class_name = "SingleInferStartup"
elif self.engine == EngineMode.SINGLE and not context["is_infer"]:
startup_class_name = "SingleStartup"
elif self.fleet_mode == FleetMode.PS or self.fleet_mode == FleetMode.PSLIB:
startup_class_name = "PSStartup"
elif self.fleet_mode == FleetMode.COLLECTIVE:
startup_class_name = "CollectiveStartup"
else:
raise ValueError("Startup Init Error")
startup_path = os.path.join(self.abs_dir, "framework",
"startup.py")
startup_class = envs.lazy_instance_by_fliename(
startup_path, startup_class_name)(context)
startup_class.startup(context)
def runner(self, context):
runner_class_path = envs.get_global_env(
self.runner_env_name + ".runner_class_path", default_value=None)
if runner_class_path:
runner_class = envs.lazy_instance_by_fliename(runner_class_path,
"Runner")(context)
else:
if self.engine == EngineMode.SINGLE and context["is_infer"]:
runner_class_name = "SingleInferRunner"
elif self.engine == EngineMode.SINGLE and not context["is_infer"]:
runner_class_name = "SingleRunner"
elif self.fleet_mode == FleetMode.PSLIB:
runner_class_name = "PslibRunner"
elif self.fleet_mode == FleetMode.PS:
runner_class_name = "PSRunner"
elif self.fleet_mode == FleetMode.COLLECTIVE:
runner_class_name = "CollectiveRunner"
else:
raise ValueError("Runner Init Error")
runner_path = os.path.join(self.abs_dir, "framework", "runner.py")
runner_class = envs.lazy_instance_by_fliename(
runner_path, runner_class_name)(context)
runner_class.run(context)
def terminal(self, context):
terminal_class_path = envs.get_global_env(
self.runner_env_name + ".terminal_class_path", default_value=None)
if terminal_class_path:
terminal_class = envs.lazy_instance_by_fliename(
terminal_class_path, "Terminal")(context)
terminal_class.terminal(context)
else:
terminal_class_name = "TerminalBase"
if self.engine != EngineMode.SINGLE and self.fleet_mode != FleetMode.COLLECTIVE:
terminal_class_name = "PSTerminal"
terminal_path = os.path.join(self.abs_dir, "framework",
"terminal.py")
terminal_class = envs.lazy_instance_by_fliename(
terminal_path, terminal_class_name)(context)
terminal_class.terminal(context)
context['is_exit'] = True
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
# PaddleRec 贡献代码
> 占位
\ No newline at end of file
> 占位
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册