Unverified commit ce0538fd · Author: Dong Daxiang · Committer: GitHub

Merge pull request #2 from seiriosPlus/master

PaddleRec Milestone
*.o
output
.idea/
paddlerec.egg-info/
*~
*.pyc
# PaddleRec
Recommendation algorithms with support for large-scale parallel training
<p align="center">
<img align="center" src="doc/imgs/logo.png">
<p>
<p align="center">
<br>
<img alt="Release" src="https://img.shields.io/badge/Release-0.1.0-yellowgreen">
<img alt="License" src="https://img.shields.io/github/license/PaddlePaddle/Serving">
<img alt="Slack" src="https://img.shields.io/badge/Join-Slack-green">
<br>
<p>
<h2 align="center">What is PaddleRec</h2>
<p align="center">
<img align="center" src="doc/imgs/structure.png">
<p>
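- A **one-stop, out-of-the-box toolkit** for search and recommendation models, built on the PaddlePaddle ecosystem
- An end-to-end solution for beginners, developers, and researchers, covering everything from investigation and training to inference and deployment
- An algorithm library for recommendation and search tasks, including semantic understanding, recall, pre-ranking, ranking, multi-task learning, and fusion
- Customize options in a **yaml** file to get started quickly with single-machine training, large-scale distributed training, offline inference, and online deployment
<h2 align="center">PaddleRec Overview</h2>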
<p align="center">
<img align="center" src="doc/imgs/overview.png">
<p>
<h2 align="center">Recommender System Pipeline Overview</h2>
<p align="center">
<img align="center" src="doc/imgs/rec-overview.png">
<p>
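<h2 align="center">Easy Installation</h2>
### Requirements
* Python 2.7 / 3.5 / 3.6 / 3.7
* PaddlePaddle >= 1.7.2
* Operating system: Windows/Mac/Linux
> On Windows, only single-machine training is currently available; Linux is recommended.
### Installation
- Method 1: install directly from PyPI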
```bash
python -m pip install paddle-rec
```
- Method 2: build and install from source
1. Install PaddlePaddle. **Note: PaddlePaddle version >= 1.7.2 is required**
```shell
python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple
```
2. Install PaddleRec from source
```
git clone https://github.com/PaddlePaddle/PaddleRec/
cd PaddleRec
python setup.py install
```
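<h2 align="center">Quick Start</h2>
### Running a built-in model with its default configuration
The framework currently ships with several built-in models. A single command starts single-machine training or locally simulated distributed training with a built-in model.
> Locally simulated distributed training (`local_cluster`) runs in parameter-server mode with `1 server + 1 trainer`.
We use the `dnn` ranking model as an example to show basic PaddleRec usage. The training data comes from the [Criteo dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/), from which we extracted 100 records so that you can quickly try the complete PaddleRec workflow.
```bash
# Single-machine training on CPU
python -m paddlerec.run -m paddlerec.models.rank.dnn
```
### Running a built-in model with a custom configuration
If you reuse a built-in model and modify its **yaml** configuration file, for example to change hyperparameters or reconfigure the data, you can run that yaml file directly with paddlerec.
Taking the dnn model as an example, from the paddlerec source directory:
```bash
cd paddlerec
```
Edit the dnn model's [hyperparameter configuration](./models/rank/dnn/config.yaml), for example to reduce the number of training epochs from 10 to 5: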
```yaml
train:
# epochs: 10
epochs: 5
```
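On Linux, you can edit the yaml file with a text editor such as `vim`:
```bash
vim ./models/rank/dnn/config.yaml
# Press i to enter insert mode
# Edit the yaml configuration
# When finished, press Esc to leave insert mode
# Type :wq to save the file and quit
```
After editing `models/rank/dnn/config.yaml`, run the `dnn` model:
```bash
# Train with the custom configuration
python -m paddlerec.run -m ./models/rank/dnn/config.yaml
```
### Distributed training
Distributed training requires adding or changing the `engine` option in `config.yaml`: set it to `cluster` for distributed training, or to `local_cluster` for locally simulated distributed training.
#### Locally simulated distributed training
Taking the dnn model as an example, from the paddlerec source directory, edit the dnn model's `config.yaml`: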
```yaml
train:
#engine: single
engine: local_cluster
```
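Then start paddlerec training:
```bash
# Run locally simulated distributed training
python -m paddlerec.run -m ./models/rank/dnn/config.yaml
```
#### Cluster distributed training
Taking the dnn model as an example, from the paddlerec source directory, first edit the dnn model's `config.yaml`: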
```yaml
train:
#engine: single
engine: cluster
```
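Then add the distributed launch configuration file `backend.yaml`; its configuration rules are described in the [Distributed Training](doc/distributed_train.md) tutorial. Finally, start paddlerec training:
```bash
# After the mpi/k8s/paddlecloud cluster environment is configured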
python -m paddlerec.run -m ./models/rank/dnn/config.yaml -b backend.yaml
```
<h2 align="center">Supported Models</h2>
| Category | Model | Single-machine CPU training | Single-machine GPU training | Distributed CPU training |
| :------: | :-----------------------------------------------------------------------: | :---------: | :---------: | :-----------: |
| Content understanding | [Text-Classification](models/contentunderstanding/classification/model.py) | ✓ | x | ✓ |
| Content understanding | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | x | ✓ |
| Recall | [DSSM](models/match/dssm/model.py) | ✓ | x | ✓ |
| Recall | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | x | ✓ |
| Recall | [TDM](models/treebased/tdm/model.py) | ✓ | x | ✓ |
| Recall | [Word2Vec](models/recall/word2vec/model.py) | ✓ | x | ✓ |
| Recall | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ |
| Recall | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ |
| Recall | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ |
| Recall | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ |
| Ranking | [Dnn](models/rank/dnn/model.py) | ✓ | x | ✓ |
| Ranking | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ |
| Ranking | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ |
| Ranking | [DIN](models/rank/din/model.py) | ✓ | x | ✓ |
| Ranking | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ |
| Multi-task | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ |
| Multi-task | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ |
| Multi-task | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ |
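<h2 align="center">Documentation</h2>
### Background
* [Introduction to recommender systems](doc/rec_background.md)
* [Introduction to distributed deep learning](doc/ps_background.md)
### Getting started
* [Requirements](#环境要求)
* [Installation](#安装命令)
* [Quick start](#启动内置模型的默认配置)
### Advanced tutorials
* [Custom datasets and Readers](doc/custom_dataset_reader.md)
* [Distributed training](doc/distributed_train.md)
### Developer tutorials
* [PaddleRec design document](doc/design.md)
### PaddleRec performance
* [Benchmark](doc/benchmark.md)
### FAQ
* [Frequently asked questions](doc/faq.md)
<h2 align="center">Community</h2>
### Feedback
Comments, suggestions, and bug reports are welcome via `GitHub Issue`.
### Release history
- 2020.5.14 - PaddleRec v0.1
### License
This project is released under the [Apache 2.0 license](LICENSE).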
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################
# Usage: submit.sh
# Description: run mpi submit client implement
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
#-----------------------------------------------------------------------------------------------------------------
#fun : package
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package_hook() {
    g_run_stage="package"
    package
}

#-----------------------------------------------------------------------------------------------------------------
#fun : hook run before submitting to the cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function _before_submit() {
    echo "before_submit"
    before_submit_hook
}

#-----------------------------------------------------------------------------------------------------------------
#fun : hook run after submitting to the cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function _after_submit() {
    echo "after_submit"
    after_submit_hook
}

#-----------------------------------------------------------------------------------------------------------------
#fun : submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function _submit() {
    g_run_stage="submit"

    cd ${engine_temp_path}

    paddlecloud job --ak ${engine_submit_ak} --sk ${engine_submit_sk} train --cluster-name ${engine_submit_cluster} \
        --job-version ${engine_submit_version} \
        --mpi-priority ${engine_submit_priority} \
        --mpi-wall-time 300:59:00 \
        --mpi-nodes ${engine_submit_nodes} --is-standalone 0 \
        --mpi-memory 110Gi \
        --job-name ${engine_submit_jobname} \
        --start-cmd "${g_run_cmd}" \
        --group-name ${engine_submit_group} \
        --job-conf ${engine_submit_config} \
        --files ${g_submitfiles} \
        --json

    cd -
}

function submit_hook() {
    _before_submit
    _submit
    _after_submit
}

function main() {
    # The user-supplied script defines package, before_submit_hook and after_submit_hook.
    source ${engine_submit_scrpit}

    package_hook
    submit_hook
}

main
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
import copy
import os
import subprocess
from paddlerec.core.engine.engine import Engine
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs


class ClusterEngine(Engine):
    def __init_impl__(self):
        abs_dir = os.path.dirname(os.path.abspath(__file__))
        backend = envs.get_runtime_environ("engine_backend")
        if backend == "PaddleCloud":
            self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh")
        else:
            raise ValueError("{} can not be supported now".format(backend))

    def start_worker_procs(self):
        # Workers build the trainer in-process and run it directly.
        trainer = TrainerFactory.create(self.trainer)
        trainer.run()

    def start_master_procs(self):
        # The master submits the job from an environment without proxy settings.
        default_env = os.environ.copy()
        current_env = copy.copy(default_env)
        current_env.pop("http_proxy", None)
        current_env.pop("https_proxy", None)

        # Pass an argument list so a script path containing spaces stays intact.
        cmd = ["bash", self.submit_script]
        proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
        proc.wait()

    def run(self):
        role = envs.get_runtime_environ("engine_role")

        if role == "MASTER":
            self.start_master_procs()
        elif role == "WORKER":
            self.start_worker_procs()
        else:
            raise ValueError("role {} error, must in MASTER/WORKER".format(role))
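
The master branch of `ClusterEngine` launches the submit script from a copy of the current environment with the proxy variables removed. The following is a minimal sketch of that pattern only. The helper `run_without_proxy` and the placeholder proxy URL are assumptions made for illustration, and a small Python child process stands in for the real submit script.

```python
import os
import subprocess
import sys


def run_without_proxy(cmd):
    """Run cmd in a copy of the environment with proxy variables removed.

    Hypothetical helper for illustration; it is not part of PaddleRec.
    """
    env = os.environ.copy()
    # pop() with a default does not raise when the variable is unset.
    env.pop("http_proxy", None)
    env.pop("https_proxy", None)
    return subprocess.check_output(cmd, env=env, cwd=os.getcwd())


# The child reports whether it can still see http_proxy.
child = [sys.executable, "-c",
         "import os; print('http_proxy' in os.environ)"]
os.environ["http_proxy"] = "http://proxy.example:8080"  # placeholder value
output = run_without_proxy(child).decode().strip()
print(output)
```

Because the variables are removed from a copy, the parent process keeps its own proxy settings.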
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc


class Engine:
    __metaclass__ = abc.ABCMeta

    def __init__(self, envs, trainer):
        self.envs = envs
        self.trainer = trainer
        self.__init_impl__()

    def __init_impl__(self):
        pass

    @abc.abstractmethod
    def run(self):
        pass
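
A concrete engine overrides `__init_impl__` for its own setup and implements `run`. The sketch below illustrates that contract with a stand-in base class written in Python 3 syntax (`abc.ABC`), since the `__metaclass__` attribute used above is only honored by Python 2. `EchoEngine` and its `log` attribute are hypothetical names, not PaddleRec classes.

```python
import abc


class Engine(abc.ABC):
    """Stand-in mirroring the Engine base class above (Python 3 syntax)."""

    def __init__(self, envs, trainer):
        self.envs = envs
        self.trainer = trainer
        self.__init_impl__()

    def __init_impl__(self):
        pass

    @abc.abstractmethod
    def run(self):
        pass


class EchoEngine(Engine):
    """Hypothetical engine that only records what it would run."""

    def __init_impl__(self):
        # Called at the end of Engine.__init__, so envs/trainer are already set.
        self.log = []

    def run(self):
        self.log.append("run {} with {} worker(s)".format(
            self.trainer, self.envs["worker_num"]))
        return self.log[-1]


engine = EchoEngine({"worker_num": 1}, "config.yaml")
message = engine.run()
print(message)
```

With `abc.ABC` as the base, instantiating `Engine` directly raises `TypeError` because `run` is abstract.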
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
import copy
import os
import sys
import subprocess
from paddlerec.core.engine.engine import Engine
from paddlerec.core.utils import envs


class LocalClusterEngine(Engine):
    def start_procs(self):
        worker_num = self.envs["worker_num"]
        server_num = self.envs["server_num"]
        ports = [self.envs["start_port"]]
        logs_dir = self.envs["log_dir"]

        default_env = os.environ.copy()
        current_env = copy.copy(default_env)
        current_env["CLUSTER_INSTANCE"] = "1"
        current_env.pop("http_proxy", None)
        current_env.pop("https_proxy", None)
        procs = []
        log_fns = []

        # The first server uses start_port; every other server gets a distinct free port.
        for i in range(server_num - 1):
            while True:
                new_port = envs.find_free_port()
                if new_port not in ports:
                    ports.append(new_port)
                    break
        user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
        user_endpoints_ips = [x.split(":")[0]
                              for x in user_endpoints.split(",")]
        user_endpoints_port = [x.split(":")[1]
                               for x in user_endpoints.split(",")]

        factory = "paddlerec.core.factory"
        cmd = [sys.executable, "-u", "-m", factory, self.trainer]

        # Launch the parameter servers.
        for i in range(server_num):
            current_env.update({
                "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
                "PADDLE_PORT": user_endpoints_port[i],
                "TRAINING_ROLE": "PSERVER",
                "PADDLE_TRAINERS_NUM": str(worker_num),
                "POD_IP": user_endpoints_ips[i]
            })

            os.system("mkdir -p {}".format(logs_dir))
            fn = open("%s/server.%d" % (logs_dir, i), "w")
            log_fns.append(fn)
            proc = subprocess.Popen(
                cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
            procs.append(proc)

        # Launch the trainers.
        for i in range(worker_num):
            current_env.update({
                "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
                "PADDLE_TRAINERS_NUM": str(worker_num),
                "TRAINING_ROLE": "TRAINER",
                "PADDLE_TRAINER_ID": str(i)
            })

            os.system("mkdir -p {}".format(logs_dir))
            fn = open("%s/worker.%d" % (logs_dir, i), "w")
            log_fns.append(fn)
            proc = subprocess.Popen(
                cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
            procs.append(proc)

        # only wait worker to finish here
        for i, proc in enumerate(procs):
            if i < server_num:
                continue
            procs[i].wait()
            if len(log_fns) > 0:
                log_fns[i].close()

        # Servers never exit on their own, so terminate them once the workers finish.
        for i in range(server_num):
            if len(log_fns) > 0:
                log_fns[i].close()
            procs[i].terminate()
        print("all workers already completed, you can view logs under the `{}` directory".format(logs_dir),
              file=sys.stderr)

    def run(self):
        self.start_procs()
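
The endpoint list built in `start_procs` keeps the configured start port for the first server and asks for a distinct free port for each additional one. The sketch below reproduces that step in isolation. The implementation of `envs.find_free_port` is not shown above, so the `find_free_port` here is an assumed stand-in that asks the operating system for an unused port, and `build_endpoints` and the port `36001` are hypothetical.

```python
import socket


def find_free_port():
    """Ask the OS for an unused TCP port.

    Assumed stand-in for envs.find_free_port, which is not shown above.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))  # port 0 lets the OS choose
        return s.getsockname()[1]


def build_endpoints(start_port, server_num):
    """Return a comma-separated list of distinct 127.0.0.1 endpoints."""
    ports = [start_port]
    while len(ports) < server_num:
        port = find_free_port()
        if port not in ports:  # skip duplicates, as the loop above does
            ports.append(port)
    return ",".join("127.0.0.1:{}".format(p) for p in ports)


endpoints = build_endpoints(36001, 3)
print(endpoints)
```

A port reported as free can still be taken by another process before the server binds it, so this approach is best suited to local simulation.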
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
import copy
import os
import sys
import subprocess
from paddlerec.core.engine.engine import Engine


class LocalMPIEngine(Engine):
    def start_procs(self):
        logs_dir = self.envs["log_dir"]

        default_env = os.environ.copy()
        current_env = copy.copy(default_env)
        current_env.pop("http_proxy", None)
        current_env.pop("https_proxy", None)
        procs = []
        log_fns = []

        factory = "paddlerec.core.factory"
        cmd = "mpirun -npernode 2 -timestamp-output -tag-output".split(" ")
        cmd.extend([sys.executable, "-u", "-m", factory, self.trainer])

        # Redirect output to job.log when a log directory is configured.
        if logs_dir is not None:
            os.system("mkdir -p {}".format(logs_dir))
            fn = open("%s/job.log" % logs_dir, "w")
            log_fns.append(fn)
            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
        else:
            proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
        procs.append(proc)

        for i in range(len(procs)):
            if len(log_fns) > 0:
                log_fns[i].close()
            procs[i].wait()
        print("all workers and parameter servers already completed", file=sys.stderr)

    def run(self):
        self.start_procs()
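
The command assembled by `LocalMPIEngine` is the `mpirun` prefix followed by the same factory invocation that the local cluster engine uses. As a rough sketch, the assembly can be separated from process launching so it is easy to inspect. The function `build_mpirun_cmd` and its `procs_per_node` parameter are hypothetical; the source hard-codes the value 2.

```python
import sys


def build_mpirun_cmd(config_path, procs_per_node=2):
    """Assemble the argument list without launching anything.

    procs_per_node is a hypothetical parameter; the engine above fixes it at 2.
    """
    cmd = ["mpirun", "-npernode", str(procs_per_node),
           "-timestamp-output", "-tag-output"]
    # Each MPI process runs the trainer factory on the given yaml file.
    cmd.extend([sys.executable, "-u", "-m", "paddlerec.core.factory", config_path])
    return cmd


cmd = build_mpirun_cmd("config.yaml")
print(" ".join(cmd))
```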
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import yaml
from paddlerec.core.utils import envs

trainer_abs = os.path.join(os.path.dirname(
    os.path.abspath(__file__)), "trainers")
trainers = {}


def trainer_registry():
    trainers["SingleTrainer"] = os.path.join(
        trainer_abs, "single_trainer.py")
    trainers["ClusterTrainer"] = os.path.join(
        trainer_abs, "cluster_trainer.py")
    trainers["CtrCodingTrainer"] = os.path.join(
        trainer_abs, "ctr_coding_trainer.py")
    trainers["CtrModulTrainer"] = os.path.join(
        trainer_abs, "ctr_modul_trainer.py")
    trainers["TDMSingleTrainer"] = os.path.join(
        trainer_abs, "tdm_single_trainer.py")
    trainers["TDMClusterTrainer"] = os.path.join(
        trainer_abs, "tdm_cluster_trainer.py")


trainer_registry()


class TrainerFactory(object):
    def __init__(self):
        pass

    @staticmethod
    def _build_trainer(yaml_path):
        print(envs.pretty_print_envs(envs.get_global_envs()))

        train_mode = envs.get_trainer()
        trainer_abs = trainers.get(train_mode, None)

        # An unregistered name must be a path to a user-defined trainer file.
        if trainer_abs is None:
            if not os.path.isfile(train_mode):
                raise IOError(
                    "trainer {} can not be recognized".format(train_mode))
            trainer_abs = train_mode
            train_mode = "UserDefineTrainer"

        trainer_class = envs.lazy_instance_by_fliename(trainer_abs, train_mode)
        trainer = trainer_class(yaml_path)
        return trainer

    @staticmethod
    def create(config):
        _config = None
        if os.path.isfile(config):
            with open(config, 'r') as rb:
                _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
        else:
            raise ValueError("paddlerec's config only support yaml")

        envs.set_global_envs(_config)
        envs.update_workspace()

        trainer = TrainerFactory._build_trainer(config)
        return trainer


# server num, worker num
if __name__ == "__main__":
    if len(sys.argv) != 2:
        raise ValueError("need a yaml file path argv")
    trainer = TrainerFactory.create(sys.argv[1])
    trainer.run()
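
`_build_trainer` resolves the configured trainer in two steps: a registered name maps to a bundled file, and anything else must be an existing file, which is then loaded under the class name `UserDefineTrainer`. The sketch below shows only that lookup. `REGISTRY`, its path, and `resolve_trainer` are hypothetical, and the class loading done by `envs.lazy_instance_by_fliename` is left out.

```python
import os
import tempfile

# Hypothetical registry; the real one maps names to files under trainers/.
REGISTRY = {"SingleTrainer": "/opt/trainers/single_trainer.py"}


def resolve_trainer(train_mode, registry=REGISTRY):
    """Return (path, class_name) for a registered name or a user file."""
    path = registry.get(train_mode)
    if path is not None:
        return path, train_mode
    if not os.path.isfile(train_mode):
        raise IOError("trainer {} can not be recognized".format(train_mode))
    # A user-supplied file is looked up under a fixed class name.
    return train_mode, "UserDefineTrainer"


builtin = resolve_trainer("SingleTrainer")

# An empty temporary file stands in for a user-defined trainer module.
with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as f:
    user_file = f.name
custom = resolve_trainer(user_file)
os.remove(user_file)
print(builtin, custom[1])
```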
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
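

class Layer(object):
    """Abstract base class for layers."""
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """Store nothing by default; subclasses read what they need from config."""
        pass

    @abc.abstractmethod
    def generate(self, param):
        """Build this layer; must be implemented by subclasses."""
        pass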
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc


class Metric(object):
    """Abstract base class for metrics."""
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """ """
        pass

    @abc.abstractmethod
    def clear(self, scope, params):
        """
        clear current value
        Args:
            scope: value container
            params: extended variables for clear
        """
        pass

    @abc.abstractmethod
    def calculate(self, scope, params):
        """
        calculate result
        Args:
            scope: value container
            params: extended variables for calculate
        """
        pass

    @abc.abstractmethod
    def get_result(self):
        """
        Return:
            result(dict) : calculate result
        """
        pass

    @abc.abstractmethod
    def __str__(self):
        """
        Return:
            result(string) : calculate result with string format, for output
        """
        pass
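
A concrete metric implements all four methods: `clear` resets state, `calculate` computes and caches the result, `get_result` returns it, and `__str__` formats it. The sketch below is a deliberately small illustration under several assumptions: the base class is a Python 3 stand-in, the scope is a plain `dict` rather than a Paddle scope, and `MeanMetric` with its `var` parameter is hypothetical.

```python
import abc


class Metric(abc.ABC):
    """Stand-in with the same four abstract methods as the class above."""

    @abc.abstractmethod
    def clear(self, scope, params):
        pass

    @abc.abstractmethod
    def calculate(self, scope, params):
        pass

    @abc.abstractmethod
    def get_result(self):
        pass

    @abc.abstractmethod
    def __str__(self):
        pass


class MeanMetric(Metric):
    """Hypothetical metric: mean of the values stored under one scope key."""

    def __init__(self):
        self._result = {}

    def clear(self, scope, params):
        scope[params["var"]] = []
        self._result = {}

    def calculate(self, scope, params):
        values = scope[params["var"]]
        mean = sum(values) / len(values) if values else 0.0
        self._result = {"mean": mean, "count": len(values)}
        return self._result

    def get_result(self):
        return self._result

    def __str__(self):
        return "MEAN=%.6f Ins number=%s" % (
            self._result["mean"], self._result["count"])


scope = {}  # a plain dict stands in for the Paddle scope
metric = MeanMetric()
metric.clear(scope, {"var": "score"})
scope["score"].extend([0.25, 0.75])
metric.calculate(scope, {"var": "score"})
text = str(metric)
print(text)
```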
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric


class AUCMetric(Metric):
    """
    Metric For Paddle Model
    """

    def __init__(self, config, fleet):
        """ """
        self.config = config
        self.fleet = fleet

    def clear(self, scope, params):
        """
        Clear current metric value, usually set to zero
        Args:
            scope : paddle runtime var container
            params(dict) :
                label : a group name for metric
                metric_dict : current metric_items in group
        Return:
            None
        """
        self._label = params['label']
        self._metric_dict = params['metric_dict']
        self._result = {}
        place = fluid.CPUPlace()
        for metric_name in self._metric_dict:
            metric_config = self._metric_dict[metric_name]
            if scope.find_var(metric_config['var'].name) is None:
                continue
            metric_var = scope.var(metric_config['var'].name).get_tensor()
            data_type = 'float32'
            if 'data_type' in metric_config:
                data_type = metric_config['data_type']
            data_array = np.zeros(metric_var._get_dims()).astype(data_type)
            metric_var.set(data_array, place)

    def get_metric(self, scope, metric_name):
        """
        reduce metric named metric_name from all worker
        Return:
            metric reduce result
        """
        metric = np.array(scope.find_var(metric_name).get_tensor())
        old_metric_shape = np.array(metric.shape)
        metric = metric.reshape(-1)
        global_metric = np.copy(metric) * 0
        self.fleet._role_maker._node_type_comm.Allreduce(metric, global_metric)
        global_metric = global_metric.reshape(old_metric_shape)
        return global_metric[0]

    def get_global_metrics(self, scope, metric_dict):
        """
        reduce all metric in metric_dict from all worker
        Return:
            dict : {metric_name : metric_result}
        """
        self.fleet._role_maker._barrier_worker()
        result = {}
        for metric_name in metric_dict:
            metric_item = metric_dict[metric_name]
            if scope.find_var(metric_item['var'].name) is None:
                result[metric_name] = None
                continue
            result[metric_name] = self.get_metric(scope, metric_item['var'].name)
        return result

    def calculate_auc(self, global_pos, global_neg):
        """Compute AUC from per-bucket positive and negative counts."""
        num_bucket = len(global_pos)
        area = 0.0
        pos = 0.0
        neg = 0.0
        new_pos = 0.0
        new_neg = 0.0
        total_ins_num = 0
        # Walk from the highest-score bucket down, adding one trapezoid per bucket.
        for i in range(num_bucket):
            index = num_bucket - 1 - i
            new_pos = pos + global_pos[index]
            total_ins_num += global_pos[index]
            new_neg = neg + global_neg[index]
            total_ins_num += global_neg[index]
            area += (new_neg - neg) * (pos + new_pos) / 2
            pos = new_pos
            neg = new_neg
        auc_value = None
        if pos * neg == 0 or total_ins_num == 0:
            auc_value = 0.5
        else:
            auc_value = area / (pos * neg)
        return auc_value

    def calculate_bucket_error(self, global_pos, global_neg):
        """Compute the impression-weighted relative CTR error over bucket spans."""
        num_bucket = len(global_pos)
        last_ctr = -1.0
        impression_sum = 0.0
        ctr_sum = 0.0
        click_sum = 0.0
        error_sum = 0.0
        error_count = 0.0
        click = 0.0
        show = 0.0
        ctr = 0.0
        adjust_ctr = 0.0
        relative_error = 0.0
        actual_ctr = 0.0
        relative_ctr_error = 0.0
        k_max_span = 0.01
        k_relative_error_bound = 0.05
        for i in range(num_bucket):
            click = global_pos[i]
            show = global_pos[i] + global_neg[i]
            ctr = float(i) / num_bucket
            # Start a new span once the predicted CTR has moved more than k_max_span.
            if abs(ctr - last_ctr) > k_max_span:
                last_ctr = ctr
                impression_sum = 0.0
                ctr_sum = 0.0
                click_sum = 0.0
            impression_sum += show
            ctr_sum += ctr * show
            click_sum += click
            if impression_sum == 0:
                continue
            adjust_ctr = ctr_sum / impression_sum
            if adjust_ctr == 0:
                continue
            relative_error = \
                math.sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum))
            # Only spans with enough impressions contribute to the error.
            if relative_error < k_relative_error_bound:
                actual_ctr = click_sum / impression_sum
                relative_ctr_error = abs(actual_ctr / adjust_ctr - 1)
                error_sum += relative_ctr_error * impression_sum
                error_count += impression_sum
                last_ctr = -1
        bucket_error = error_sum / error_count if error_count > 0 else 0.0
        return bucket_error

    def calculate(self, scope, params):
        """ """
        self._label = params['label']
        self._metric_dict = params['metric_dict']
        self.fleet._role_maker._barrier_worker()
        result = self.get_global_metrics(scope, self._metric_dict)
        if result['total_ins_num'] == 0:
            self._result = result
            self._result['auc'] = 0
            self._result['bucket_error'] = 0
            self._result['actual_ctr'] = 0
            self._result['predict_ctr'] = 0
            self._result['mae'] = 0
            self._result['rmse'] = 0
            self._result['copc'] = 0
            self._result['mean_q'] = 0
            return self._result
        if 'stat_pos' in result and 'stat_neg' in result:
            result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
            # Fixed: this previously called calculate_auc a second time.
            result['bucket_error'] = self.calculate_bucket_error(result['stat_pos'], result['stat_neg'])
        if 'pos_ins_num' in result:
            result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num']
        if 'abserr' in result:
            result['mae'] = result['abserr'] / result['total_ins_num']
        if 'sqrerr' in result:
            result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num'])
        if 'prob' in result:
            result['predict_ctr'] = result['prob'] / result['total_ins_num']
            if abs(result['predict_ctr']) > 1e-6:
                result['copc'] = result['actual_ctr'] / result['predict_ctr']

        if 'q' in result:
            result['mean_q'] = result['q'] / result['total_ins_num']
        self._result = result
        return result

    def get_result(self):
        """ """
        return self._result

    def __str__(self):
        """ """
        result = self.get_result()
        result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f " \
                     "Actual_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \
                     (self._label, result['auc'], result['bucket_error'], result['mae'], result['rmse'],
                      result['actual_ctr'],
                      result['predict_ctr'], result['copc'], result['mean_q'], result['total_ins_num'])
        return result_str
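
`calculate_auc` works on histograms: index `i` of each list holds the number of positive or negative instances whose predicted score fell into bucket `i`. Walking from the highest bucket down, each bucket adds a trapezoid whose width is the negatives it contains and whose height is the average of the positives seen before and after it, so ties inside a bucket count as half. The standalone function below restates that loop for a few hand-checkable inputs; `bucket_auc` and the sample histograms are illustrative and not part of PaddleRec.

```python
def bucket_auc(pos, neg):
    """AUC from per-bucket positive/negative counts (index = score bucket)."""
    area = 0.0
    cum_pos = 0.0
    cum_neg = 0.0
    # Walk from the highest-score bucket down, adding one trapezoid per bucket.
    for p, n in zip(reversed(pos), reversed(neg)):
        new_pos = cum_pos + p
        new_neg = cum_neg + n
        area += (new_neg - cum_neg) * (cum_pos + new_pos) / 2
        cum_pos, cum_neg = new_pos, new_neg
    if cum_pos * cum_neg == 0:
        return 0.5  # undefined without both classes; same fallback as above
    return area / (cum_pos * cum_neg)


perfect = bucket_auc([0, 1], [1, 0])       # the positive outranks the negative
inverted = bucket_auc([1, 0], [0, 1])      # the negative outranks the positive
tied = bucket_auc([1], [1])                # both land in the same bucket
mixed = bucket_auc([0, 1, 3], [3, 1, 0])   # one tied pair out of 16
print(perfect, inverted, tied, mixed)
```

In the last case, 15 of the 16 positive/negative pairs are ordered correctly and one pair is tied, giving (15 + 0.5) / 16.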
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import paddle.fluid as fluid
from paddlerec.core.utils import envs


class Model(object):
    """Abstract base class for models."""
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """Initialize the containers that train_net and infer_net fill in."""
        self._cost = None
        self._metrics = {}
        self._data_var = []
        self._infer_data_var = []
        self._infer_results = {}
        self._data_loader = None
        self._infer_data_loader = None
        self._fetch_interval = 20
        self._namespace = "train.model"
        self._platform = envs.get_platform()

    def _init_slots(self):
        sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader")
        dense_slots = envs.get_global_env("dense_slots", None, "train.reader")

        if sparse_slots is not None or dense_slots is not None:
            # Either option may be unset; treat a missing one as an empty list
            # instead of calling strip() on None.
            sparse_slots = sparse_slots.strip().split() if sparse_slots else []
            dense_slots = dense_slots.strip().split() if dense_slots else []
            # Dense slots are written as "name:[d1,d2,...]".
            dense_slots_shape = [[int(j) for j in i.split(":")[1].strip("[]").split(",")] for i in dense_slots]
            dense_slots = [i.split(":")[0] for i in dense_slots]
            self._dense_data_var = []
            for i in range(len(dense_slots)):
                l = fluid.layers.data(name=dense_slots[i], shape=dense_slots_shape[i], dtype="float32")
                self._data_var.append(l)
                self._dense_data_var.append(l)
            self._sparse_data_var = []
            for name in sparse_slots:
                l = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64")
                self._data_var.append(l)
                self._sparse_data_var.append(l)

        dataset_class = envs.get_global_env("dataset_class", None, "train.reader")
        if dataset_class == "DataLoader":
            self._init_dataloader()

    def _init_dataloader(self):
        self._data_loader = fluid.io.DataLoader.from_generator(
            feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)

    def get_inputs(self):
        return self._data_var

    def get_infer_inputs(self):
        return self._infer_data_var

    def get_infer_results(self):
        return self._infer_results

    def get_avg_cost(self):
        """Return the training cost variable."""
        return self._cost

    def get_metrics(self):
        """Return the metrics dict."""
        return self._metrics

    def get_fetch_period(self):
        return self._fetch_interval

    def _build_optimizer(self, name, lr):
        name = name.upper()
        optimizers = ["SGD", "ADAM", "ADAGRAD"]
        if name not in optimizers:
            raise ValueError(
                "configured optimizer only supports SGD/Adam/Adagrad")

        if name == "SGD":
            reg = envs.get_global_env(
                "hyper_parameters.reg", 0.0001, self._namespace)
            optimizer_i = fluid.optimizer.SGD(
                lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
        elif name == "ADAM":
            optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
        elif name == "ADAGRAD":
            optimizer_i = fluid.optimizer.Adagrad(lr)
        else:
            raise ValueError(
                "configured optimizer only supports SGD/Adam/Adagrad")

        return optimizer_i

    def optimizer(self):
        learning_rate = envs.get_global_env(
            "hyper_parameters.learning_rate", None, self._namespace)
        optimizer = envs.get_global_env(
            "hyper_parameters.optimizer", None, self._namespace)
        print(">>>>>>>>>>>.learning rate: %s" % learning_rate)
        return self._build_optimizer(optimizer, learning_rate)

    @abc.abstractmethod
    def train_net(self):
        """Build the training network; must be implemented by subclasses."""
        pass

    @abc.abstractmethod
    def infer_net(self):
        pass
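
`_init_slots` reads two space-separated strings from the reader configuration: sparse slots are bare names, and dense slots carry a shape in the form `name:[d1,d2]`. The sketch below isolates that parsing so the expected format is easy to see. The helper `parse_dense_slots` and the slot names are hypothetical examples, not values taken from a PaddleRec config.

```python
def parse_dense_slots(spec):
    """Split 'name:[d1,d2] ...' into parallel name and shape lists."""
    names, shapes = [], []
    for item in spec.strip().split(" "):
        name, dims = item.split(":")
        names.append(name)
        # "[3,32]" -> [3, 32]; shapes must not contain spaces.
        shapes.append([int(d) for d in dims.strip("[]").split(",")])
    return names, shapes


# Hypothetical slot definitions for illustration.
names, shapes = parse_dense_slots("dense_feature:[13] image:[3,32]")
sparse = "click C1 C2".strip().split(" ")
print(names, shapes, sparse)
```

Because entries are separated by spaces, a shape written as `[3, 32]` with a space inside would be split into two malformed entries.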
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
import yaml
from paddlerec.core.model import Model
from paddlerec.core.utils import table


def create(config):
    """
    Create a model instance by config
    Args:
        config(dict) : desc model type and net
    Return:
        Model Instance
    """
    model = None
    if config['mode'] == 'fluid':
        model = YamlModel(config)
        model.train_net()
    return model


class YamlModel(Model):
    """Model whose network is described by a yaml layer file."""

    def __init__(self, config):
        """Load the layer description named by config['layer_file']."""
        Model.__init__(self, config)
        self._config = config
        self._name = config['name']
        # Close the layer file once it has been parsed.
        with open(config['layer_file'], 'r') as f:
            self._build_nodes = yaml.safe_load(f.read())
        self._build_phase = ['input', 'param', 'summary', 'layer']
        self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}}
        self._inference_meta = {'dependency': {}, 'params': {}}

    def train_net(self):
        """
        build a fluid model with config
        Return:
            model_instance(dict)
                train_program
                startup_program
                inference_param : all params name list
                table: table-meta to ps-server
        """
        for layer in self._build_nodes['layer']:
            self._build_param['inner_layer'][layer['name']] = layer

        self._build_param['table'] = {}
        self._build_param['model']['train_program'] = fluid.Program()
        self._build_param['model']['startup_program'] = fluid.Program()
        with fluid.program_guard(self._build_param['model']['train_program'], \
                                 self._build_param['model']['startup_program']):
            with fluid.unique_name.guard():
                for phase in self._build_phase:
                    if self._build_nodes[phase] is None:
                        continue
                    for node in self._build_nodes[phase]:
                        exec("""layer=layer.{}(node)""".format(node['class']))
                        layer_output, extend_output = layer.generate(self._config['mode'], self._build_param)
                        self._build_param['layer'][node['name']] = layer_output
                        self._build_param['layer_extend'][node['name']] = extend_output
                        if extend_output is None:
                            continue
                        # Losses from all layers are summed into one cost.
                        if 'loss' in extend_output:
                            if self._cost is None:
                                self._cost = extend_output['loss']
                            else:
                                self._cost += extend_output['loss']
                        if 'data_var' in extend_output:
                            self._data_var += extend_output['data_var']
                        if 'metric_label' in extend_output and extend_output['metric_label'] is not None:
                            self._metrics[extend_output['metric_label']] = extend_output['metric_dict']

                        if 'inference_param' in extend_output:
                            inference_param = extend_output['inference_param']
                            param_name = inference_param['name']
                            if param_name not in self._build_param['table']:
                                self._build_param['table'][param_name] = {'params': []}
                                table_meta = table.TableMeta.alloc_new_table(inference_param['table_id'])
                                self._build_param['table'][param_name]['_meta'] = table_meta
                            self._build_param['table'][param_name]['params'] += inference_param['params']
        pass
@classmethod
def build_optimizer(self, params):
"""R
"""
optimizer_conf = params['optimizer_conf']
strategy = None
if 'strategy' in optimizer_conf:
strategy = optimizer_conf['strategy']
stat_var_names = []
metrics = params['metrics']
for name in metrics:
model_metrics = metrics[name]
stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics]
strategy['stat_var_names'] = list(set(stat_var_names))
# Look the optimizer class up by name instead of building a string for exec(),
# which cannot rebind the local `optimizer` under Python 3.
optimizer = getattr(fluid.optimizer, optimizer_conf['class'])(
learning_rate=optimizer_conf['learning_rate'])
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
return optimizer
def dump_model_program(self, path):
"""R
"""
with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout:
fout.write(str(self._build_param['model']['train_program']) + '\n')
with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout:
fout.write(str(self._build_param['model']['startup_program']) + '\n')
pass
def shrink(self, params):
"""R
"""
scope = params['scope']
decay = params['decay']
for param_table in self._build_param['table']:
table_id = self._build_param['table'][param_table]['_meta']._table_id
fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)
def dump_inference_program(self, inference_layer, path):
"""R
"""
pass
def dump_inference_param(self, params):
"""R
"""
scope = params['scope']
executor = params['executor']
program = self._build_param['model']['train_program']
for table_name, table in self._build_param['table'].items():
fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
for inference_item in params['inference_list']:
params_name_list = self.inference_params(inference_item['layer_name'])
params_var_list = [program.global_block().var(i) for i in params_name_list]
params_file_name = inference_item['save_file_name']
with fluid.scope_guard(scope):
if params['save_combine']:
fluid.io.save_vars(executor, "./", \
program, vars=params_var_list, filename=params_file_name)
else:
fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list)
def inference_params(self, inference_layer):
"""
get the parameter names needed by inference_layer
Args:
inference_layer(str): layer for inference
Return:
params(list): names of the parameters the inference layer depends on
"""
layer = inference_layer
if layer in self._inference_meta['params']:
return self._inference_meta['params'][layer]
self._inference_meta['params'][layer] = []
self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer)
for node in self._build_nodes['layer']:
if node['name'] not in self._inference_meta['dependency'][layer]:
continue
if 'inference_param' in self._build_param['layer_extend'][node['name']]:
self._inference_meta['params'][layer] += \
self._build_param['layer_extend'][node['name']]['inference_param']['params']
return self._inference_meta['params'][layer]
def get_dependency(self, layer_graph, dest_layer):
"""
get the layers that dest_layer depends on
Args:
layer_graph(dict) : all layers in the graph
dest_layer(str) : layer whose dependencies are resolved
Return:
depend_layers(list) : sub-graph layers needed to compute dest_layer
"""
dependency_list = []
if dest_layer in layer_graph:
dependencys = copy.deepcopy(layer_graph[dest_layer]['input'])
dependency_list = copy.deepcopy(dependencys)
for dependency in dependencys:
dependency_list = dependency_list + self.get_dependency(layer_graph, dependency)
return list(set(dependency_list))
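The recursive `get_dependency` above collects every layer that `dest_layer` transitively depends on. A minimal, framework-free sketch of the same idea follows. It assumes a hypothetical `layer_graph` dict whose entries carry an `input` list; the layer names are invented for illustration, and the result is sorted only to make the output deterministic.

```python
def get_dependency(layer_graph, dest_layer):
    """Return every layer name that dest_layer transitively depends on."""
    dependencies = set()
    # Names absent from the graph (e.g. raw inputs) contribute no further edges.
    for parent in layer_graph.get(dest_layer, {}).get("input", []):
        dependencies.add(parent)
        dependencies.update(get_dependency(layer_graph, parent))
    return sorted(dependencies)


# Hypothetical three-layer network: embedding -> fc1 -> ctr_output.
layer_graph = {
    "fc1": {"input": ["embedding"]},
    "ctr_output": {"input": ["fc1", "label"]},
}
print(get_dependency(layer_graph, "ctr_output"))
```

Like the original, this sketch assumes the layer graph is acyclic; a cycle would recurse without end.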
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
from paddlerec.core.layer import Layer
class EmbeddingFuseLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._cvm = config['cvm']
self._name = config['name']
self._slots = [str(slot) for slot in config['slots']]
self._mf_dim = config['mf_dim']
self._backward = config['backward']
self._emb_dim = self._mf_dim + 3 # append show ctr lr
self._emb_layers = []
def generate(self, param):
"""R
"""
show_clk = fluid.layers.concat(
[param['layer']['show'], param['layer']['click']], axis=1)
show_clk.stop_gradient = True
data_var = []
for slot in self._slots:
l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1)
data_var.append(l)
emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \
is_sparse=True, is_distributed=True,
param_attr=fluid.ParamAttr(name="embedding"))
emb = fluid.layers.sequence_pool(input=emb, pool_type='sum')
emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm)
self._emb_layers.append(emb)
output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name)
return output, {'data_var': data_var}
class LabelInputLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._dim = config.get('dim', 1)
self._data_type = config.get('data_type', "int64")
self._label_idx = config['label_idx']
def generate(self, param):
"""R
"""
label = fluid.layers.data(name=self._name, shape=[-1, self._dim], \
dtype=self._data_type, lod_level=0, append_batch_size=False)
cast_label = fluid.layers.cast(label, dtype='float32')
cast_label.stop_gradient = True
return cast_label, {'data_var': [label]}
class TagInputLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._tag = config['tag']
self._dim = config.get('dim', 1)
self._data_type = config['data_type']
def generate(self, param):
"""R
"""
output = fluid.layers.data(name=self._name, shape=[-1, self._dim], \
dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True)
return output, {'data_var': [output]}
class ParamLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._coln = config['coln']
self._table_id = config.get('table_id', -1)
self._init_range = config.get('init_range', 1)
self._data_type = config.get('data_type', 'float32')
self._config = config
def generate(self, param):
"""R
"""
return self._config, {'inference_param': {'name': 'param', 'params': [], 'table_id': self._table_id}}
class SummaryLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._table_id = config.get('table_id', -1)
self._data_type = config.get('data_type', 'float32')
self._config = config
def generate(self, param):
"""R
"""
return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}}
class NormalizationLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._input = config['input']
self._summary = config['summary']
self._table_id = config.get('table_id', -1)
def generate(self, param):
"""R
"""
input_layer = param['layer'][self._input[0]]
summary_layer = param['layer'][self._summary]
if len(self._input) > 0:
input_list = [param['layer'][i] for i in self._input]
input_layer = fluid.layers.concat(input=input_list, axis=1)
bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={
"batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4})
inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum']
return bn, {'inference_param': {'name': 'summary', \
'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}}
class FCLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._param = config['param']
self._input = config['input']
self._bias = config.get('bias', True)
self._act_func = config.get('act_func', None)
def generate(self, param):
"""R
"""
param_layer = param['layer'][self._param]
input_layer = param['layer'][self._input[0]]
if len(self._input) > 0:
input_list = [param['layer'][i] for i in self._input]
input_layer = fluid.layers.concat(input=input_list, axis=1)
input_coln = input_layer.shape[1]
scale = param_layer['init_range'] / (input_coln ** 0.5)
bias = None
if self._bias:
bias = fluid.ParamAttr(learning_rate=1.0,
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale))
fc = fluid.layers.fc(
name=self._name,
input=input_layer,
size=param_layer['coln'],
act=self._act_func,
param_attr= \
fluid.ParamAttr(learning_rate=1.0, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)),
bias_attr=bias)
inference_param = [self._name + '.w_0', self._name + '.b_0']
return fc, {'inference_param': {'name': 'param', 'params': inference_param, \
'table_id': param_layer.get('table_id', -1)}}
class LogLossLayer(Layer):
"""R
"""
def __init__(self, config):
"""R
"""
self._name = config['name']
self._label = config['label']
self._input = config['input']
self._weight = config.get('weight', None)
self._metric_label = config.get('metric_label', None)
self._bound = config.get('bound', [-15.0, 15.0])
self._extend_output = {
'metric_label': self._metric_label,
'metric_dict': {
'auc': {'var': None},
'batch_auc': {'var': None},
'stat_pos': {'var': None, 'data_type': 'int64'},
'stat_neg': {'var': None, 'data_type': 'int64'},
'batch_stat_pos': {'var': None, 'data_type': 'int64'},
'batch_stat_neg': {'var': None, 'data_type': 'int64'},
'pos_ins_num': {'var': None},
'abserr': {'var': None},
'sqrerr': {'var': None},
'prob': {'var': None},
'total_ins_num': {'var': None},
'q': {'var': None}
}
}
def generate(self, param):
"""R
"""
input_layer = param['layer'][self._input[0]]
label_layer = param['layer'][self._label]
output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name)
norm = fluid.layers.sigmoid(output, name=self._name)
output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32'))
if self._weight:
weight_layer = param['layer'][self._weight]
output = fluid.layers.elementwise_mul(output, weight_layer)
output = fluid.layers.mean(x=output)
self._extend_output['loss'] = output
# For AUC Metric
metric = self._extend_output['metric_dict']
binary_predict = fluid.layers.concat(
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1)
metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \
metric['batch_stat_neg']['var'], metric['stat_pos']['var'],
metric['stat_neg']['var']] = \
fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), \
curve='ROC', num_thresholds=32)
metric['sqrerr']['var'], metric['abserr']['var'], metric['prob']['var'], metric['q']['var'], \
metric['pos_ins_num']['var'], metric['total_ins_num']['var'] = \
fluid.contrib.layers.ctr_metric_bundle(norm, fluid.layers.cast(x=label_layer, dtype='float32'))
return norm, self._extend_output
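`LogLossLayer.generate` clips the raw score to `bound`, applies a sigmoid, and then computes a log loss against the label. The scalar sketch below mirrors those three steps in plain Python. The `epsilon` default of `1e-4` is an assumption about the smoothing term used by the framework's log loss, and `clipped_log_loss` is a hypothetical helper name, not part of PaddleRec.

```python
import math


def clipped_log_loss(logit, label, bound=(-15.0, 15.0), epsilon=1e-4):
    """Clip the logit, squash it with a sigmoid, and score it against a 0/1 label."""
    clipped = max(bound[0], min(bound[1], logit))
    prob = 1.0 / (1.0 + math.exp(-clipped))
    # epsilon (assumed value) keeps log() finite when prob approaches 0 or 1.
    loss = -label * math.log(prob + epsilon) - (1.0 - label) * math.log(1.0 - prob + epsilon)
    return prob, loss


prob, loss = clipped_log_loss(0.0, 1.0)
print(prob, loss)
```

Clipping before the sigmoid bounds the predicted probability away from exactly 0 or 1, so a single extreme score cannot dominate the averaged loss.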
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import sys
import abc
import os
import paddle.fluid.incubate.data_generator as dg
import yaml
from paddlerec.core.utils import envs
class Reader(dg.MultiSlotDataGenerator):
__metaclass__ = abc.ABCMeta
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only supports yaml files")
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod
def init(self):
pass
@abc.abstractmethod
def generate_sample(self, line):
pass
class SlotReader(dg.MultiSlotDataGenerator):
__metaclass__ = abc.ABCMeta
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only supports yaml files")
envs.set_global_envs(_config)
envs.update_workspace()
def init(self, sparse_slots, dense_slots, padding=0):
from functools import reduce  # reduce is a builtin in Python 2 only
from operator import mul
self.sparse_slots = sparse_slots.strip().split(" ")
self.dense_slots = dense_slots.strip().split(" ")
self.dense_slots_shape = [reduce(mul, [int(j) for j in i.split(":")[1].strip("[]").split(",")]) for i in self.dense_slots]
self.dense_slots = [i.split(":")[0] for i in self.dense_slots]
self.slots = self.dense_slots + self.sparse_slots
self.slot2index = {}
self.visit = {}
for i in range(len(self.slots)):
self.slot2index[self.slots[i]] = i
self.visit[self.slots[i]] = False
self.padding = padding
def generate_sample(self, l):
def reader():
line = l.strip().split(" ")
output = [(i, []) for i in self.slots]
for i in line:
slot_feasign = i.split(":")
slot = slot_feasign[0]
if slot not in self.slots:
continue
if slot in self.sparse_slots:
feasign = int(slot_feasign[1])
else:
feasign = float(slot_feasign[1])
output[self.slot2index[slot]][1].append(feasign)
self.visit[slot] = True
for i in self.visit:
slot = i
if not self.visit[slot]:
if i in self.dense_slots:
output[self.slot2index[i]][1].extend([self.padding] * self.dense_slots_shape[self.slot2index[i]])
else:
output[self.slot2index[i]][1].extend([self.padding])
else:
self.visit[slot] = False
yield output
return reader
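`SlotReader.generate_sample` splits each input line into `slot:feasign` tokens, groups the values by slot, and pads slots that did not appear. The standalone sketch below illustrates that parsing. `parse_slot_line` and the `dense_widths` mapping (slot name to flattened width) are hypothetical names introduced here, and the sample slot names are invented.

```python
def parse_slot_line(line, sparse_slots, dense_widths, padding=0):
    """Group the `slot:feasign` tokens of one line by slot, padding absent slots."""
    slots = list(dense_widths) + list(sparse_slots)  # dense slots come first
    values = {slot: [] for slot in slots}
    for token in line.strip().split(" "):
        slot, _, raw = token.partition(":")
        if slot not in values:
            continue  # ignore slots the model does not use
        # Sparse feasigns are integer ids, dense features are floats.
        values[slot].append(int(raw) if slot in sparse_slots else float(raw))
    for slot in slots:
        if not values[slot]:
            # An absent dense slot is padded to its full width, a sparse slot to one id.
            values[slot] = [padding] * dense_widths.get(slot, 1)
    return [(slot, values[slot]) for slot in slots]


sample = parse_slot_line(
    "click:1 101:7 101:9 dense_feature:0.5 dense_feature:0.25",
    sparse_slots=["click", "101", "102"],
    dense_widths={"dense_feature": 2},
)
print(sample)
```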
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import os
import time
import sys
import yaml
from paddle import fluid
from paddlerec.core.utils import envs
class Trainer(object):
"""R
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config=None):
self._status_processor = {}
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False}
self._config_yaml = config
with open(config, 'r') as rb:
self._config = yaml.load(rb.read(), Loader=yaml.FullLoader)
def regist_context_processor(self, status_name, processor):
"""
register a processor for the specified status
"""
self._status_processor[status_name] = processor
def context_process(self, context):
"""
select a processor to handle the given context
Args:
context : context with status
Return:
None : run a processor for this status
"""
if context['status'] in self._status_processor:
self._status_processor[context['status']](context)
else:
self.other_status_processor(context)
def other_status_processor(self, context):
"""
if no processor matches context.status, use the default processor
Return:
None, just sleep in base
"""
print('unknown context_status:%s, do nothing' % context['status'])
time.sleep(60)
def reload_train_context(self):
"""
context may be updated over time, reload to pick up changes
"""
pass
def run(self):
"""
keep running, driven by the status stored in context.
"""
while True:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
def user_define_engine(engine_yaml):
with open(engine_yaml, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
assert _config is not None
envs.set_runtime_environs(_config)
train_location = envs.get_global_env("engine.file")
train_dirname = os.path.dirname(train_location)
base_name = os.path.splitext(os.path.basename(train_location))[0]
sys.path.append(train_dirname)
trainer_class = envs.lazy_instance_by_fliename(
base_name, "UserDefineTraining")
return trainer_class
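The `Trainer` base class is a small status-driven state machine: processors are registered per status name, and `run` keeps dispatching on `context['status']` until a processor sets `context['is_exit']`. The sketch below shows that control flow without any Paddle dependency. `StatusMachine` and the two processors are hypothetical stand-ins, and unlike the original, which sleeps on an unknown status, this version raises.

```python
class StatusMachine(object):
    """Run registered processors until one of them sets context['is_exit']."""

    def __init__(self):
        self._processors = {}
        self.context = {"status": "uninit", "is_exit": False}

    def register(self, status, processor):
        self._processors[status] = processor

    def run(self):
        while not self.context["is_exit"]:
            processor = self._processors.get(self.context["status"])
            if processor is None:
                # Design choice for the sketch: fail fast instead of sleeping.
                raise KeyError("no processor for status %r" % self.context["status"])
            processor(self.context)


visited = []


def instance(context):
    visited.append("uninit")
    context["status"] = "train_pass"  # hand over to the next stage


def train(context):
    visited.append("train_pass")
    context["is_exit"] = True  # last stage ends the loop


machine = StatusMachine()
machine.register("uninit", instance)
machine.register("train_pass", train)
machine.run()
print(visited)
```

Each processor decides the next status itself, which is how the trainers in this commit chain `uninit`, `init_pass`, `train_pass`, and the later passes.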
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
trainer implement.

                              ↗ (single/cluster) CtrTrainer
Trainer
                              ↗ (for single training) SingleTrainer/TDMSingleTrainer
        ↘ TranspilerTrainer → (for cluster training) ClusterTrainer/TDMClusterTrainer
                              ↘ (for online learning training) OnlineLearningTrainer
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cluster training with fluid in parameter server mode.
"""
from __future__ import print_function
import os
import time
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
class ClusterTrainer(TranspileTrainer):
def processor_register(self):
role = PaddleCloudRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor(
'train_pass', self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
self.strategy = strategy
return strategy
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env(
"hyper_parameters.optimizer", None, "train.model")
if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
fleet.init_worker()
reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs")
program = fluid.compiler.CompiledProgram(
fleet.main_program).with_data_parallel(
loss_name=self.model.get_avg_cost().name,
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
for epoch in range(epochs):
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(
program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
self.save(epoch, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def dataset_train(self, context):
fleet.init_worker()
dataset = self._get_dataset()
ins = self._get_dataset_ins()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
begin_time = time.time()
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time-begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times))
self.save(i, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
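`dataloader_train` builds its log line in two formatting passes: the first pass turns each metric name into a `name: {}` slot, and the second fills those slots with the epoch, batch id, and fetched values. A minimal sketch of that trick follows; `build_metrics_format` is a hypothetical helper name and the metric names and values are invented.

```python
def build_metrics_format(metric_names):
    """Build a template such as 'epoch: {}, batch: {}, auc: {}'."""
    fields = ["epoch", "batch"] + list(metric_names)
    # The doubled braces survive the first format() call as a literal '{}' slot.
    return ", ".join("{}: {{}}".format(name) for name in fields)


template = build_metrics_format(["auc", "loss"])
print(template.format(3, 200, 0.78, 0.45))
```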
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer
class CtrTrainer(Trainer):
"""R
"""
def __init__(self, config):
"""R
"""
Trainer.__init__(self, config)
self.global_config = config
self._metrics = {}
self.processor_register()
def processor_register(self):
role = MPISymetricRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('train_pass', self.train)
self.regist_context_processor('terminal_pass', self.terminal)
def _get_dataset(self):
namespace = "train.reader"
inputs = self.model.get_inputs()
threads = envs.get_global_env("train.threads", None)
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN", self._config_yaml)
train_data_path = envs.get_global_env("train_data_path", None, namespace)
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
return dataset
def instance(self, context):
models = envs.get_global_env("train.model.models")
model_class = envs.lazy_instance_by_fliename(models, "Model")
self.model = model_class(None)
context['status'] = 'init_pass'
def init(self, context):
"""R
"""
self.model.train_net()
optimizer = self.model.optimizer()
optimizer = fleet.distributed_optimizer(optimizer, strategy={"use_cvm": False})
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass'
def server(self, context):
fleet.run_server()
fleet.stop_worker()
context['is_exit'] = True
def train(self, context):
self._exe.run(fluid.default_startup_program())
fleet.init_worker()
dataset = self._get_dataset()
shuf = np.array([fleet.worker_index()])
gs = shuf * 0
fleet._role_maker._node_type_comm.Allreduce(shuf, gs)
print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(), fleet.worker_num(), gs))
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
context['status'] = 'terminal_pass'
fleet.stop_worker()
def terminal(self, context):
print("terminal ended.")
context['is_exit'] = True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Online learning training with fluid in parameter server mode.
"""
from __future__ import print_function
import datetime
import os
import time
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
class OnlineLearningTrainer(TranspileTrainer):
def processor_register(self):
role = PaddleCloudRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor(
'train_pass', self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
self.strategy = strategy
return strategy
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
print("online learning is only supported on LINUX")
context['status'] = 'terminal_pass'
def _get_dataset(self, state="TRAIN", hour=None):
if state == "TRAIN":
inputs = self.model.get_inputs()
namespace = "train.reader"
train_data_path = envs.get_global_env(
"train_data_path", None, namespace)
else:
inputs = self.model.get_infer_inputs()
namespace = "evaluate.reader"
train_data_path = envs.get_global_env(
"test_data_path", None, namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads"))
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
pipe_cmd = "python {} {} {} {}".format(
reader, reader_class, state, self._config_yaml)
if train_data_path.startswith("paddlerec::"):
package_base = envs.get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
train_data_path = os.path.join(
package_base, train_data_path.split("::")[1])
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
if hour is not None:
train_data_path = os.path.join(train_data_path, hour)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
self.files = file_list
dataset.set_filelist(self.files)
return dataset
def dataset_train(self, context):
fleet.init_worker()
days = envs.get_global_env("train.days")
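# NOTE: "begin_day_d" is a literal placeholder that does not match '%Y%m%d';
# strptime raises ValueError until a real start date string is supplied here.
begin_day = datetime.datetime.strptime("begin_day_d", '%Y%m%d')
for day in range(days):
for hour in range(24):
# Use a separate name so the loop counter `day` is not overwritten.
current = begin_day + datetime.timedelta(days=day, hours=hour)
day_s = current.strftime('%Y%m%d/%H')
i = current.strftime('%Y%m%d_%H')
dataset = self._get_dataset(hour=day_s)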
ins = self._get_dataset_ins()
begin_time = time.time()
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time-begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times))
self.save(i, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
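The hourly loop in `dataset_train` above walks one `YYYYMMDD/HH` directory per hour. The enumeration can be sketched on its own as follows; `hourly_partitions` is a hypothetical helper name (not part of PaddleRec), and the start date is assumed to be passed in explicitly as a `YYYYMMDD` string.

```python
import datetime


def hourly_partitions(begin_day, days):
    """Yield (relative_dir, epoch_tag) for every hour of `days` consecutive days.

    `begin_day` is a 'YYYYMMDD' string. The helper name and signature are
    illustrative assumptions, not PaddleRec API.
    """
    start = datetime.datetime.strptime(begin_day, "%Y%m%d")
    for day in range(days):
        for hour in range(24):
            # Keep the loop counters intact; build the timestamp separately.
            current = start + datetime.timedelta(days=day, hours=hour)
            yield current.strftime("%Y%m%d/%H"), current.strftime("%Y%m%d_%H")


if __name__ == "__main__":
    for rel_dir, tag in hourly_partitions("20200101", 1):
        print(rel_dir, tag)
```

Keeping the timestamp in its own variable avoids overwriting the integer loop counter that `datetime.timedelta(days=...)` needs on the next iteration.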
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training with fluid on a single node.
"""
from __future__ import print_function
import time
import logging
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer):
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None,
"train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass', self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer.minimize((self.model.get_avg_cost()))
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
evaluate_only = envs.get_global_env(
'evaluate_only', False, namespace='evaluate')
if evaluate_only:
context['status'] = 'infer_pass'
else:
context['status'] = 'startup_pass'
def startup(self, context):
self._exe.run(fluid.default_startup_program())
context['status'] = 'train_pass'
def dataloader_train(self, context):
reader = self._get_dataloader("TRAIN")
epochs = envs.get_global_env("train.epochs")
program = fluid.compiler.CompiledProgram(
fluid.default_main_program()).with_data_parallel(
loss_name=self.model.get_avg_cost().name)
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
for epoch in range(epochs):
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(
program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
self.save(epoch, "train", is_fleet=False)
context['status'] = 'infer_pass'
def dataset_train(self, context):
dataset = self._get_dataset("TRAIN")
ins = self._get_dataset_ins()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
begin_time = time.time()
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time - begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins / times))
self.save(i, "train", is_fleet=False)
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
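Each trainer above registers one handler per status string, and every handler advances `context['status']` or sets `context['is_exit']`. The base `Trainer` driver loop is not shown in this section, so the following is only a sketch of how such a dispatch loop could work; `MiniTrainer` and its `run` method are hypothetical.

```python
class MiniTrainer(object):
    """Toy status machine; handler names mirror the trainers above."""

    def __init__(self):
        self._processors = {}
        self.visited = []
        self.regist_context_processor("uninit", self.instance)
        self.regist_context_processor("init_pass", self.init)
        self.regist_context_processor("train_pass", self.train)
        self.regist_context_processor("terminal_pass", self.terminal)

    def regist_context_processor(self, status, handler):
        # Map a status string to the handler that processes it.
        self._processors[status] = handler

    def instance(self, context):
        context["status"] = "init_pass"

    def init(self, context):
        context["status"] = "train_pass"

    def train(self, context):
        context["status"] = "terminal_pass"

    def terminal(self, context):
        context["is_exit"] = True

    def run(self):
        # Assumed driver loop: dispatch on context['status'] until is_exit is set.
        context = {"status": "uninit", "is_exit": False}
        while not context["is_exit"]:
            self.visited.append(context["status"])
            self._processors[context["status"]](context)
        return self.visited


if __name__ == "__main__":
    print(MiniTrainer().run())
```

Under this reading, a subclass changes behavior by re-registering a status (as `SingleTrainer` does for `train_pass`) rather than by overriding the loop.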
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TDM training with fluid in cluster mode.
"""
from __future__ import print_function
import logging
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddlerec.core.utils import envs
from paddlerec.core.trainers.cluster_trainer import ClusterTrainer
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"]
class TDMClusterTrainer(ClusterTrainer):
def server(self, context):
namespace = "train.startup"
init_model_path = envs.get_global_env(
"cluster.init_model_path", "", namespace)
assert init_model_path != "", "Cluster training must have init_model for TDM"
fleet.init_server(init_model_path)
logger.info("TDM: load model from {}".format(init_model_path))
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
namespace = "train.startup"
load_tree = envs.get_global_env(
"tree.load_tree", True, namespace)
self.tree_layer_path = envs.get_global_env(
"tree.tree_layer_path", "", namespace)
self.tree_travel_path = envs.get_global_env(
"tree.tree_travel_path", "", namespace)
self.tree_info_path = envs.get_global_env(
"tree.tree_info_path", "", namespace)
save_init_model = envs.get_global_env(
"cluster.save_init_model", False, namespace)
init_model_path = envs.get_global_env(
"cluster.init_model_path", "", namespace)
if load_tree:
# Convert the tree to tensors and set them into Fluid's variables.
for param_name in special_param:
param_t = fluid.global_scope().find_var(param_name).get_tensor()
param_array = self._tdm_prepare(param_name)
param_t.set(param_array.astype('int32'), self._place)
if save_init_model:
logger.info("Begin Save Init model.")
fluid.io.save_persistables(
executor=self._exe, dirname=init_model_path)
logger.info("End Save Init model.")
context['status'] = 'train_pass'
def _tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self._tdm_travel_prepare()
return travel_array
elif param_name == "TDM_Tree_Layer":
layer_array, _ = self._tdm_layer_prepare()
return layer_array
elif param_name == "TDM_Tree_Info":
info_array = self._tdm_info_prepare()
return info_array
else:
raise ValueError("{} is not a special tdm param name".format(param_name))
def _tdm_travel_prepare(self):
"""load tdm tree param from npy/list file"""
travel_array = np.load(self.tree_travel_path)
logger.info("TDM Tree leaf node nums: {}".format(
travel_array.shape[0]))
return travel_array
def _tdm_layer_prepare(self):
"""load tdm tree param from npy/list file"""
layer_list = []
layer_list_flat = []
with open(self.tree_layer_path, 'r') as fin:
for line in fin.readlines():
l = []
layer = (line.split('\n'))[0].split(',')
for node in layer:
if node:
layer_list_flat.append(node)
l.append(node)
layer_list.append(l)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
logger.info("TDM Tree max layer: {}".format(len(layer_list)))
logger.info("TDM Tree layer_node_num_list: {}".format(
[len(i) for i in layer_list]))
return layer_array, layer_list
def _tdm_info_prepare(self):
"""load tdm tree param from list file"""
info_array = np.load(self.tree_info_path)
return info_array
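`_tdm_layer_prepare` above reads one tree layer per line, with node ids separated by commas, and returns both a flat one-column array and the per-layer lists. The parsing step can be sketched without NumPy as below; `parse_tree_layers` is a hypothetical name, and nested Python lists stand in for the `reshape([-1, 1])` column.

```python
import os
import tempfile


def parse_tree_layers(path):
    """Return (flat_column, per_layer) from a comma-separated layer file.

    flat_column is a list of single-element rows, a plain-list stand-in
    for an (N, 1) integer array.
    """
    per_layer = []
    flat_column = []
    with open(path, "r") as fin:
        for line in fin:
            # Drop the newline and ignore empty fields such as a trailing comma.
            nodes = [n for n in line.rstrip("\n").split(",") if n]
            per_layer.append(nodes)
            for node in nodes:
                flat_column.append([int(node)])
    return flat_column, per_layer


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write("1,2\n3,4,5,6\n")
    try:
        column, layers = parse_tree_layers(tmp.name)
        print(column)
        print([len(layer) for layer in layers])
    finally:
        os.remove(tmp.name)
```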
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TDM training with fluid on a single node.
"""
from __future__ import print_function
import logging
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.trainers.single_trainer import SingleTrainer
from paddlerec.core.utils import envs
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer",
"TDM_Tree_Info", "TDM_Tree_Emb"]
class TDMSingleTrainer(SingleTrainer):
def startup(self, context):
namespace = "train.startup"
load_persistables = envs.get_global_env(
"single.load_persistables", False, namespace)
persistables_model_path = envs.get_global_env(
"single.persistables_model_path", "", namespace)
load_tree = envs.get_global_env(
"tree.load_tree", False, namespace)
self.tree_layer_path = envs.get_global_env(
"tree.tree_layer_path", "", namespace)
self.tree_travel_path = envs.get_global_env(
"tree.tree_travel_path", "", namespace)
self.tree_info_path = envs.get_global_env(
"tree.tree_info_path", "", namespace)
self.tree_emb_path = envs.get_global_env(
"tree.tree_emb_path", "", namespace)
save_init_model = envs.get_global_env(
"single.save_init_model", False, namespace)
init_model_path = envs.get_global_env(
"single.init_model_path", "", namespace)
self._exe.run(fluid.default_startup_program())
if load_persistables:
# Load parameters from a Paddle binary model
fluid.io.load_persistables(
executor=self._exe,
dirname=persistables_model_path,
main_program=fluid.default_main_program())
logger.info("Load persistables from \"{}\"".format(
persistables_model_path))
if load_tree:
# Convert the tree to tensors and set them into Fluid's variables.
for param_name in special_param:
param_t = fluid.global_scope().find_var(param_name).get_tensor()
param_array = self._tdm_prepare(param_name)
if param_name == 'TDM_Tree_Emb':
param_t.set(param_array.astype('float32'), self._place)
else:
param_t.set(param_array.astype('int32'), self._place)
if save_init_model:
logger.info("Begin Save Init model.")
fluid.io.save_persistables(
executor=self._exe, dirname=init_model_path)
logger.info("End Save Init model.")
context['status'] = 'train_pass'
def _tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self._tdm_travel_prepare()
return travel_array
elif param_name == "TDM_Tree_Layer":
layer_array, _ = self._tdm_layer_prepare()
return layer_array
elif param_name == "TDM_Tree_Info":
info_array = self._tdm_info_prepare()
return info_array
elif param_name == "TDM_Tree_Emb":
emb_array = self._tdm_emb_prepare()
return emb_array
else:
raise ValueError("{} is not a special tdm param name".format(param_name))
def _tdm_travel_prepare(self):
"""load tdm tree param from npy/list file"""
travel_array = np.load(self.tree_travel_path)
logger.info("TDM Tree leaf node nums: {}".format(
travel_array.shape[0]))
return travel_array
def _tdm_emb_prepare(self):
"""load tdm tree param from npy/list file"""
emb_array = np.load(self.tree_emb_path)
logger.info("TDM Tree node nums from emb: {}".format(
emb_array.shape[0]))
return emb_array
def _tdm_layer_prepare(self):
"""load tdm tree param from npy/list file"""
layer_list = []
layer_list_flat = []
with open(self.tree_layer_path, 'r') as fin:
for line in fin.readlines():
l = []
layer = (line.split('\n'))[0].split(',')
for node in layer:
if node:
layer_list_flat.append(node)
l.append(node)
layer_list.append(l)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
logger.info("TDM Tree max layer: {}".format(len(layer_list)))
logger.info("TDM Tree layer_node_num_list: {}".format(
[len(i) for i in layer_list]))
return layer_array, layer_list
def _tdm_info_prepare(self):
"""load tdm tree param from list file"""
info_array = np.load(self.tree_info_path)
return info_array
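The `_tdm_prepare` if/elif chain maps a parameter name to the loader that produces its array. The same mapping can be sketched as a dictionary lookup that raises `ValueError` for unknown names; `make_param_loader` and the toy loaders are hypothetical and only illustrate the dispatch, not the real file loading.

```python
def make_param_loader(loaders):
    """Return a function that dispatches a parameter name to its loader."""

    def load(param_name):
        try:
            loader = loaders[param_name]
        except KeyError:
            raise ValueError(
                "{} is not a special tdm param name".format(param_name))
        return loader()

    return load


if __name__ == "__main__":
    # Toy loaders returning plain lists instead of arrays read from disk.
    load = make_param_loader({
        "TDM_Tree_Travel": lambda: [[1, 2]],
        "TDM_Tree_Info": lambda: [[0, 0, 0]],
    })
    print(load("TDM_Tree_Info"))
```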
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training with fluid using DistributeTranspiler
"""
import os
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddlerec.core.trainer import Trainer
from paddlerec.core.utils import envs
from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader
class TranspileTrainer(Trainer):
def __init__(self, config=None):
Trainer.__init__(self, config)
device = envs.get_global_env("train.device", "cpu")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
self._exe = fluid.Executor(self._place)
self.processor_register()
self.model = None
self.inference_models = []
self.increment_models = []
def processor_register(self):
print("Needs to be implemented by the trainer; `self.regist_context_processor('uninit', self.instance)` must be registered first")
def _get_dataloader(self, state="TRAIN"):
if state == "TRAIN":
dataloader = self.model._data_loader
namespace = "train.reader"
class_name = "TrainReader"
else:
dataloader = self.model._infer_data_loader
namespace = "evaluate.reader"
class_name = "EvaluateReader"
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
batch_size = envs.get_global_env("batch_size", None, namespace)
print("batch_size: {}".format(batch_size))
if sparse_slots is None and dense_slots is None:
reader_class = envs.get_global_env("class", None, namespace)
reader = dataloader_instance.dataloader(
reader_class, state, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class, class_name)
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader("", state, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
if debug_mode:
print("--- DataLoader Debug Mode Begin, showing the first 10 records ---")
for idx, line in enumerate(reader()):
print(line)
if idx >= 9:
break
print("--- DataLoader Debug Mode End, showed the first 10 records ---")
exit(0)
return dataloader
def _get_dataset_ins(self):
count = 0
for f in self.files:
# Close each file once its lines have been counted.
with open(f, 'r') as fin:
for _ in fin:
count += 1
return count
def _get_dataset(self, state="TRAIN"):
if state == "TRAIN":
inputs = self.model.get_inputs()
namespace = "train.reader"
train_data_path = envs.get_global_env(
"train_data_path", None, namespace)
else:
inputs = self.model.get_infer_inputs()
namespace = "evaluate.reader"
train_data_path = envs.get_global_env(
"test_data_path", None, namespace)
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads"))
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
if sparse_slots is None and dense_slots is None:
pipe_cmd = "python {} {} {} {}".format(
reader, reader_class, state, self._config_yaml)
else:
padding = envs.get_global_env("padding", 0, namespace)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, namespace,
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
if train_data_path.startswith("paddlerec::"):
package_base = envs.get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
train_data_path = os.path.join(
package_base, train_data_path.split("::")[1])
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
self.files = file_list
dataset.set_filelist(self.files)
debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
if debug_mode:
print(
"--- Dataset Debug Mode Begin, showing the first 10 records of {} ---".format(file_list[0]))
os.system("cat {} | {} | head -10".format(file_list[0], pipe_cmd))
print(
"--- Dataset Debug Mode End, showed the first 10 records of {} ---".format(file_list[0]))
exit(0)
return dataset
def save(self, epoch_id, namespace, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
save_interval = envs.get_global_env(
"save.inference.epoch_interval", -1, namespace)
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
"save.inference.feed_varnames", None, namespace)
fetch_varnames = envs.get_global_env(
"save.inference.fetch_varnames", None, namespace)
if feed_varnames is None or fetch_varnames is None:
return
fetch_vars = [fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames]
dirname = envs.get_global_env(
"save.inference.dirname", None, namespace)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(
self._exe, dirname, feed_varnames, fetch_vars)
else:
fluid.io.save_inference_model(
dirname, feed_varnames, fetch_vars, self._exe)
self.inference_models.append((epoch_id, dirname))
def save_persistables():
save_interval = envs.get_global_env(
"save.increment.epoch_interval", -1, namespace)
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(
"save.increment.dirname", None, namespace)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
self.increment_models.append((epoch_id, dirname))
save_persistables()
save_inference_model()
def instance(self, context):
models = envs.get_global_env("train.model.models")
model_class = envs.lazy_instance_by_fliename(models, "Model")
self.model = model_class(None)
context['status'] = 'init_pass'
def init(self, context):
print("Needs to be implemented")
context['is_exit'] = True
def dataloader_train(self, context):
print("Needs to be implemented")
context['is_exit'] = True
def dataset_train(self, context):
print("Needs to be implemented")
context['is_exit'] = True
def infer(self, context):
infer_program = fluid.Program()
startup_program = fluid.Program()
with fluid.unique_name.guard():
with fluid.program_guard(infer_program, startup_program):
self.model.infer_net()
if self.model._infer_data_loader is None:
context['status'] = 'terminal_pass'
return
reader = self._get_dataloader("Evaluate")
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_infer_results().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
self._exe.run(startup_program)
model_list = self.increment_models
evaluate_only = envs.get_global_env(
'evaluate_only', False, namespace='evaluate')
if evaluate_only:
model_list = [(0, envs.get_global_env(
'evaluate_model_path', "", namespace='evaluate'))]
is_return_numpy = envs.get_global_env(
'is_return_numpy', True, namespace='evaluate')
for (epoch, model_dir) in model_list:
print("Begin to infer No.{} model, model_dir: {}".format(
epoch, model_dir))
program = infer_program.clone()
fluid.io.load_persistables(self._exe, model_dir, program)
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(
program=program,
fetch_list=metrics_varnames,
return_numpy=is_return_numpy)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % 2 == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
context['status'] = 'terminal_pass'
def terminal(self, context):
print("clean up and exit")
context['is_exit'] = True
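The nested `need_save` helper in `save` decides per epoch whether a checkpoint is written. The standalone copy below makes its behavior easy to try out. One detail worth knowing: in Python, any integer modulo `-1` is `0`, so the default interval of `-1` passes this check for every non-negative epoch; whether anything is then actually saved depends on the other configuration lookups in `save`.

```python
def need_save(epoch_id, epoch_interval, is_last=False):
    """Same decision rule as the nested helper in `save` above."""
    if is_last:
        return True
    if epoch_id == -1:
        return False
    return epoch_id % epoch_interval == 0


if __name__ == "__main__":
    # Epochs that pass the check with an interval of 2.
    print([e for e in range(6) if need_save(e, 2)])
    # With an interval of -1, `epoch_id % -1` is always 0.
    print(all(need_save(e, -1) for e in range(6)))
```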
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ
from paddlerec.core.reader import SlotReader
def dataloader(readerclass, train, yaml_file):
if train == "TRAIN":
reader_name = "TrainReader"
namespace = "train.reader"
data_path = get_global_env("train_data_path", None, namespace)
else:
reader_name = "EvaluateReader"
namespace = "evaluate.reader"
data_path = get_global_env("test_data_path", None, namespace)
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
reader_class = lazy_instance_by_fliename(readerclass, reader_name)
reader = reader_class(yaml_file)
reader.init()
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
# generate_sample returns a callable that yields parsed samples
sample_iter = reader.generate_sample(line)
for parsed_line in sample_iter():
if parsed_line is None:
continue
else:
values = []
for parsed in parsed_line:
values.append(parsed[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def slotdataloader(readerclass, train, yaml_file):
if train == "TRAIN":
reader_name = "SlotReader"
namespace = "train.reader"
data_path = get_global_env("train_data_path", None, namespace)
else:
reader_name = "SlotReader"
namespace = "evaluate.reader"
data_path = get_global_env("test_data_path", None, namespace)
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
sparse = get_global_env("sparse_slots", None, namespace)
dense = get_global_env("dense_slots", None, namespace)
padding = get_global_env("padding", 0, namespace)
reader = SlotReader(yaml_file)
reader.init(sparse, dense, int(padding))
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
# generate_sample returns a callable that yields parsed samples
sample_iter = reader.generate_sample(line)
for parsed_line in sample_iter():
if parsed_line is None:
continue
else:
values = []
for parsed in parsed_line:
values.append(parsed[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
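Both `gen_reader` closures above follow the same contract: `reader.generate_sample(line)` returns a callable, calling it yields either `None` (skip) or a list of `(name, values)` pairs, and only the `values` part is passed on. A small sketch of that contract follows; `ToyReader`, its `label,feat` line format, and `make_sample_generator` are hypothetical stand-ins for a real reader class.

```python
class ToyReader(object):
    """Hypothetical reader that parses 'label,feat' lines."""

    def generate_sample(self, line):
        def sample_iter():
            if not line:
                yield None  # signals a line that should be skipped
                return
            label, feat = line.split(",")
            yield [("label", [int(label)]), ("feat", [int(feat)])]

        return sample_iter


def make_sample_generator(reader, lines):
    """Build a generator function that yields only the values of each pair."""

    def gen_reader():
        for line in lines:
            sample_iter = reader.generate_sample(line.rstrip("\n"))
            for parsed_line in sample_iter():
                if parsed_line is None:
                    continue
                yield [pair[1] for pair in parsed_line]

    return gen_reader


if __name__ == "__main__":
    gen = make_sample_generator(ToyReader(), ["1,7\n", "\n", "0,9\n"])
    for sample in gen():
        print(sample)
```

Returning the generator function rather than a generator object matches how `set_sample_generator` is handed `reader` above and how the debug branch calls `reader()` itself.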
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import datetime
import time
import paddle.fluid as fluid
from paddlerec.core.utils import fs as fs
from paddlerec.core.utils import util as util
class DatasetHolder(object):
"""
Dataset Base
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config):
"""
"""
self._datasets = {}
self._config = config
@abc.abstractmethod
def check_ready(self, params):
"""
check data ready or not
Return:
True/False
"""
pass
@abc.abstractmethod
def load_dataset(self, params):
"""
load the dataset described by params and return it
"""
pass
@abc.abstractmethod
def preload_dataset(self, params):
"""
start loading the dataset described by params ahead of time
"""
pass
@abc.abstractmethod
def release_dataset(self, params):
"""
release the dataset described by params
"""
pass
class TimeSplitDatasetHolder(DatasetHolder):
"""
Dataset with time split dir. root_path/$DAY/$HOUR
"""
def __init__(self, config):
"""
init data root_path, time_split_interval, data_path_format
"""
DatasetHolder.__init__(self, config)
if 'data_donefile' not in config or config['data_donefile'] is None:
config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
self._path_generator = util.PathGenerator({'templates': [
{'name': 'data_path', 'template': config['data_path']},
{'name': 'donefile_path', 'template': config['data_donefile']}
]})
self._split_interval = config['split_interval'] # data split N mins per dir
self._data_file_handler = fs.FileHandler(config)
def _format_data_time(self, daytime_str, time_window_mins):
""" """
data_time = util.make_datetime(daytime_str)
mins_of_day = data_time.hour * 60 + data_time.minute
# Floor division keeps the stage index an integer on both Python 2 and 3.
begin_stage = mins_of_day // self._split_interval
end_stage = (mins_of_day + time_window_mins) // self._split_interval
if begin_stage == end_stage and mins_of_day % self._split_interval != 0:
return None, 0
if mins_of_day % self._split_interval != 0:
skip_mins = self._split_interval - (mins_of_day % self._split_interval)
data_time = data_time + datetime.timedelta(minutes=skip_mins)
time_window_mins = time_window_mins - skip_mins
return data_time, time_window_mins
def check_ready(self, daytime_str, time_window_mins):
"""
check whether the data in [daytime_str, daytime_str + time_window_mins] is ready
Args:
daytime_str: datetime in string format, e.g. "202001122200" means "2020-01-12 22:00"
time_window_mins(int): length of the window in minutes, starting at daytime_str
Return:
True/False
"""
is_ready = True
# Use the aligned window returned by _format_data_time for the loop below.
data_time, time_window_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0:
file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time})
if not self._data_file_handler.is_exist(file_path):
is_ready = False
break
time_window_mins = time_window_mins - self._split_interval
data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return is_ready
def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0):
"""
shard the data files in [daytime_str, daytime_str + time_window_mins] across node_num nodes by filename hash and return shard[node_idx]
Args:
daytime_str: datetime in string format, e.g. "202001122200" means "2020-01-12 22:00"
time_window_mins(int): length of the window in minutes, starting at daytime_str
node_num(int): data split shard num
node_idx(int): shard_idx
Return:
list, data_shard[node_idx]
"""
data_file_list = []
# Use the aligned window returned by _format_data_time for the loop below.
data_time, time_window_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0:
file_path = self._path_generator.generate_path('data_path', {'time_format': data_time})
sub_file_list = self._data_file_handler.ls(file_path)
for sub_file in sub_file_list:
sub_file_name = self._data_file_handler.get_file_name(sub_file)
if not sub_file_name.startswith(self._config['filename_prefix']):
continue
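# NOTE: on Python 3, hash() of a str differs between processes unless
# PYTHONHASHSEED is fixed, so nodes may disagree on shard membership.
if hash(sub_file_name) % node_num == node_idx: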
data_file_list.append(sub_file)
time_window_mins = time_window_mins - self._split_interval
data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return data_file_list

    def _alloc_dataset(self, file_list):
        """Create a fluid dataset over file_list, configured from self._config."""
        dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type'])
        dataset.set_batch_size(self._config['batch_size'])
        dataset.set_thread(self._config['load_thread'])
        dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi'])
        dataset.set_pipe_command(self._config['data_converter'])
        dataset.set_filelist(file_list)
        dataset.set_use_var(self._config['data_vars'])
        # dataset.set_fleet_send_sleep_seconds(2)
        # dataset.set_fleet_send_batch_size(80000)
        return dataset

    def load_dataset(self, params):
        """Wait until the data for params['begin_time'] is ready, then load it into memory."""
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            while not self.check_ready(begin_time, window_min):
                print("dataset not ready, time:" + begin_time)
                time.sleep(30)
            file_list = self.get_file_list(begin_time, window_min, params['node_num'], params['node_idx'])
            self._datasets[begin_time] = self._alloc_dataset(file_list)
            self._datasets[begin_time].load_into_memory()
        else:
            self._datasets[begin_time].wait_preload_done()
        return self._datasets[begin_time]

    def preload_dataset(self, params):
        """Start preloading the data for params['begin_time']; return True if preloading was started."""
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            if self.check_ready(begin_time, window_min):
                file_list = self.get_file_list(begin_time, window_min, params['node_num'], params['node_idx'])
                self._datasets[begin_time] = self._alloc_dataset(file_list)
                self._datasets[begin_time].preload_into_memory(self._config['preload_thread'])
                return True
        return False

    def release_dataset(self, params):
        """Release the memory held by the dataset for params['begin_time'], if it was loaded."""
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time in self._datasets:
            self._datasets[begin_time].release_memory()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import sys

from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import envs

if len(sys.argv) < 4:
    raise ValueError(
        "reader expects at least 3 arguments: 1. reader_class 2. train/evaluate/slotreader 3. yaml_abs_path")

reader_package = sys.argv[1]

if sys.argv[2].upper() == "TRAIN":
    reader_name = "TrainReader"
elif sys.argv[2].upper() == "EVALUATE":
    reader_name = "EvaluateReader"
else:
    # SlotReader takes four extra arguments: namespace, sparse slots, dense slots, padding
    reader_name = "SlotReader"
    namespace = sys.argv[4]
    sparse_slots = sys.argv[5].replace("#", " ")
    dense_slots = sys.argv[6].replace("#", " ")
    padding = int(sys.argv[7])

yaml_abs_path = sys.argv[3]

if reader_name != "SlotReader":
    reader_class = lazy_instance_by_fliename(reader_package, reader_name)
    reader = reader_class(yaml_abs_path)
    reader.init()
    reader.run_from_stdin()
else:
    reader = SlotReader(yaml_abs_path)
    reader.init(sparse_slots, dense_slots, padding)
    reader.run_from_stdin()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import closing
import copy
import os
import socket
import sys

global_envs = {}


def flatten_environs(envs, separator="."):
    flatten_dict = {}
    assert isinstance(envs, dict)

    def fatten_env_namespace(namespace_nests, local_envs):
        if not isinstance(local_envs, dict):
            global_k = separator.join(namespace_nests)
            flatten_dict[global_k] = str(local_envs)
        else:
            for k, v in local_envs.items():
                if isinstance(v, dict):
                    nests = copy.deepcopy(namespace_nests)
                    nests.append(k)
                    fatten_env_namespace(nests, v)
                else:
                    global_k = separator.join(namespace_nests + [k])
                    flatten_dict[global_k] = str(v)

    for k, v in envs.items():
        fatten_env_namespace([k], v)

    return flatten_dict


def set_runtime_environs(environs):
    for k, v in environs.items():
        os.environ[k] = str(v)


def get_runtime_environ(key):
    return os.getenv(key, None)


def get_trainer():
    train_mode = get_runtime_environ("train.trainer.trainer")
    return train_mode


def set_global_envs(envs):
    assert isinstance(envs, dict)

    def fatten_env_namespace(namespace_nests, local_envs):
        for k, v in local_envs.items():
            if isinstance(v, dict):
                nests = copy.deepcopy(namespace_nests)
                nests.append(k)
                fatten_env_namespace(nests, v)
            else:
                global_k = ".".join(namespace_nests + [k])
                global_envs[global_k] = v

    for k, v in envs.items():
        fatten_env_namespace([k], v)


def get_global_env(env_name, default_value=None, namespace=None):
    """
    get a value from global_envs, optionally prefixed with a namespace
    """
    _env_name = env_name if namespace is None else ".".join(
        [namespace, env_name])
    return global_envs.get(_env_name, default_value)


def get_global_envs():
    return global_envs


def path_adapter(path):
    if path.startswith("paddlerec."):
        package = get_runtime_environ("PACKAGE_BASE")
        l_p = path.split("paddlerec.")[1].replace(".", "/")
        return os.path.join(package, l_p)
    else:
        return path


def windows_path_converter(path):
    if get_platform() == "WINDOWS":
        return path.replace("/", "\\")
    else:
        return path.replace("\\", "/")


def update_workspace():
    workspace = global_envs.get("train.workspace", None)
    if not workspace:
        return
    workspace = path_adapter(workspace)

    for name, value in global_envs.items():
        if isinstance(value, str):
            value = value.replace("{workspace}", workspace)
            value = windows_path_converter(value)
            global_envs[name] = value


def pretty_print_envs(envs, header=None):
    spacing = 5
    max_k = 45
    max_v = 50

    for k, v in envs.items():
        max_k = max(max_k, len(k))

    h_format = "{{:^{}s}}{}{{:<{}s}}\n".format(max_k, " " * spacing, max_v)
    l_format = "{{:<{}s}}{{}}{{:<{}s}}\n".format(max_k, max_v)
    length = max_k + max_v + spacing

    border = "".join(["="] * length)
    line = "".join(["-"] * length)

    draws = ""
    draws += border + "\n"

    if header:
        draws += h_format.format(header[0], header[1])
    else:
        draws += h_format.format("paddlerec Global Envs", "Value")

    draws += line + "\n"

    for k, v in envs.items():
        if isinstance(v, str) and len(v) >= max_v:
            str_v = "... " + v[-46:]
        else:
            str_v = v

        draws += l_format.format(k, " " * spacing, str(str_v))

    draws += border

    _str = "\n{}\n".format(draws)
    return _str


def lazy_instance_by_package(package, class_name):
    models = get_global_env("train.model.models")
    model_package = __import__(
        package, globals(), locals(), package.split("."))
    instance = getattr(model_package, class_name)
    return instance


def lazy_instance_by_fliename(abs, class_name):
    dirname = os.path.dirname(abs)
    sys.path.append(dirname)
    package = os.path.splitext(os.path.basename(abs))[0]

    model_package = __import__(
        package, globals(), locals(), package.split("."))
    instance = getattr(model_package, class_name)
    return instance


def get_platform():
    import platform
    plats = platform.platform()
    if 'Linux' in plats:
        return "LINUX"
    if 'Darwin' in plats:
        return "DARWIN"
    if 'Windows' in plats:
        return "WINDOWS"


def find_free_port():
    def __free_port():
        with closing(socket.socket(socket.AF_INET,
                                   socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    new_port = __free_port()
    return new_port
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient


def is_afs_path(path):
    """Return True if path starts with "afs" or "hdfs"."""
    if path.startswith("afs") or path.startswith("hdfs"):
        return True
    return False


class LocalFSClient(object):
    """
    Util for local disk file_system io
    """

    def __init__(self):
        pass

    def write(self, content, path, mode):
        """
        write to file
        Args:
            content(string)
            path(string)
            mode(string): w/a    w:clear_write a:append_write
        """
        temp_dir = os.path.dirname(path)
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        with open(path, mode) as f:
            f.write(content)
            f.flush()

    def cp(self, org_path, dest_path):
        """Copy org_path to dest_path recursively, creating the parent directory if needed."""
        temp_dir = os.path.dirname(dest_path)
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        return os.system("cp -r " + org_path + " " + dest_path)

    def cat(self, file_path):
        """Return the whole content of file_path."""
        with open(file_path) as f:
            content = f.read()
        return content

    def mkdir(self, dir_name):
        """Create dir_name, including missing parent directories."""
        os.makedirs(dir_name)

    def remove(self, path):
        """Remove path recursively."""
        os.system("rm -rf " + path)

    def is_exist(self, path):
        """Return True if `ls path` succeeds."""
        if os.system("ls " + path) == 0:
            return True
        return False

    def ls(self, path):
        """List the entries of directory path."""
        files = os.listdir(path)
        return files


class FileHandler(object):
    """
    A smart file handler that chooses local or afs/hdfs io based on the path
    """

    def __init__(self, config):
        """Create an HDFS client when config contains 'fs_name', plus a local client."""
        if 'fs_name' in config:
            hadoop_home = "$HADOOP_HOME"
            hdfs_configs = {
                "hadoop.job.ugi": config['fs_ugi'],
                "fs.default.name": config['fs_name']
            }
            self._hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
        self._local_fs_client = LocalFSClient()

    def is_exist(self, path):
        """Return True if path exists on the matching file system."""
        if is_afs_path(path):
            return self._hdfs_client.is_exist(path)
        else:
            return self._local_fs_client.is_exist(path)

    def get_file_name(self, path):
        """Return the last component of a '/'-separated path."""
        sub_paths = path.split('/')
        return sub_paths[-1]

    def write(self, content, dest_path, mode='w'):
        """Write content to dest_path; afs/hdfs writes go through a local temp file."""
        if is_afs_path(dest_path):
            file_name = self.get_file_name(dest_path)
            temp_local_file = "./tmp/" + file_name
            self._local_fs_client.remove(temp_local_file)
            org_content = ""
            if mode.find('a') >= 0:
                org_content = self._hdfs_client.cat(dest_path)
            # NOTE: the new content is placed before the existing content
            content = content + org_content
            self._local_fs_client.write(content, temp_local_file,
                                        mode)  # fleet hdfs_client only supports upload, so write a tmp file
            self._hdfs_client.delete(dest_path + ".tmp")
            self._hdfs_client.upload(dest_path + ".tmp", temp_local_file)
            self._hdfs_client.delete(dest_path + ".bak")
            self._hdfs_client.rename(dest_path, dest_path + '.bak')
            self._hdfs_client.rename(dest_path + ".tmp", dest_path)
        else:
            self._local_fs_client.write(content, dest_path, mode)

    def cat(self, path):
        """Return the content of path from the matching file system."""
        if is_afs_path(path):
            hdfs_cat = self._hdfs_client.cat(path)
            return hdfs_cat
        else:
            return self._local_fs_client.cat(path)

    def ls(self, path):
        """List the entries under path as absolute paths."""
        files = []
        if is_afs_path(path):
            files = self._hdfs_client.ls(path)
            files = [path + '/' + self.get_file_name(fi) for fi in files]  # absolute path
        else:
            files = self._local_fs_client.ls(path)
            files = [path + '/' + fi for fi in files]  # absolute path
        return files

    def cp(self, org_path, dest_path):
        """Copy between local and afs/hdfs; hdfs-to-hdfs copy is not supported."""
        org_is_afs = is_afs_path(org_path)
        dest_is_afs = is_afs_path(dest_path)
        if not org_is_afs and not dest_is_afs:
            return self._local_fs_client.cp(org_path, dest_path)
        if not org_is_afs and dest_is_afs:
            return self._hdfs_client.upload(dest_path, org_path)
        if org_is_afs and not dest_is_afs:
            return self._hdfs_client.download(org_path, dest_path)
        print("hdfs-to-hdfs cp is not supported currently")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class TableMeta(object):
    """
    Simple ParamTable Meta, Contain table_id
    """
    TableId = 1

    @staticmethod
    def alloc_new_table(table_id):
        """
        create table with table_id
        Args:
            table_id(int)
        Return:
            table(TableMeta) : a TableMeta instance with table_id
        """
        if table_id < 0:
            table_id = TableMeta.TableId
        if table_id >= TableMeta.TableId:
            TableMeta.TableId += 1
        table = TableMeta(table_id)
        return table

    def __init__(self, table_id):
        """ """
        self._table_id = table_id
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
import time

from paddle import fluid

from paddlerec.core.utils import fs as fs


def save_program_proto(path, program=None):
    if program is None:
        _program = fluid.default_main_program()
    else:
        _program = program

    with open(path, "wb") as f:
        f.write(_program.desc.serialize_to_string())


def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise ValueError('Boolean value expected.')


def run_which(command):
    regex = "/usr/bin/which: no {} in"
    ret = run_shell_cmd("which {}".format(command))
    if ret.startswith(regex.format(command)):
        return None
    else:
        return ret


def run_shell_cmd(command):
    assert command is not None and isinstance(command, str)
    return os.popen(command).read().strip()


def get_env_value(env_name):
    """
    get os environment value
    """
    return os.popen("echo -n ${" + env_name + "}").read().strip()


def now_time_str():
    """
    get the current time as a formatted string
    """
    return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:"


def get_absolute_path(path, params):
    """Prefix an afs/hdfs path with params['fs_name'] when it carries no host:port part."""
    if path.startswith('afs:') or path.startswith('hdfs:'):
        sub_path = path.split('fs:')[1]
        if ':' in sub_path:  # such as afs://xxx:port/xxxx
            return path
        elif 'fs_name' in params:
            return params['fs_name'] + sub_path
    else:
        return path


def make_datetime(date_str, fmt=None):
    """
    create a datetime instance by date_string
    Args:
        date_str: such as "20200114" or "202001140000" when fmt is None
        fmt: strptime format such as "%Y-%m-%d"; inferred from the length of date_str when None
    Return:
        datetime
    """
    if fmt is None:
        if len(date_str) == 8:  # %Y%m%d
            return datetime.datetime.strptime(date_str, '%Y%m%d')
        if len(date_str) == 12:  # %Y%m%d%H%M
            return datetime.datetime.strptime(date_str, '%Y%m%d%H%M')
    return datetime.datetime.strptime(date_str, fmt)


def rank0_print(log_str):
    """Print log_str as the master node."""
    print_log(log_str, {'master': True})


def print_cost(cost, params):
    """Format cost with params['log_format'], print it, and return the log string."""
    log_str = params['log_format'] % cost
    print_log(log_str, params)
    return log_str


class CostPrinter(object):
    """
    Measure elapsed time and print a cost log
    """

    def __init__(self, callback, callback_params):
        """Start timing; callback(cost, callback_params) is invoked by done()."""
        self.reset(callback, callback_params)

    def __del__(self):
        """Report the cost on destruction if done() was never called."""
        if not self._done:
            self.done()

    def reset(self, callback, callback_params):
        """Restart the timer with a new callback."""
        self._done = False
        self._callback = callback
        self._callback_params = callback_params
        self._begin_time = time.time()

    def done(self):
        """Stop timing and return (cost in seconds, log string from the callback)."""
        cost = time.time() - self._begin_time
        log_str = self._callback(cost, self._callback_params)  # cost(s)
        self._done = True
        return cost, log_str


class PathGenerator(object):
    """
    generate path with template & runtime variables
    """

    def __init__(self, config):
        """Load the path templates from config."""
        self._templates = {}
        self.add_path_template(config)

    def add_path_template(self, config):
        """Register every entry of config['templates'] by its name."""
        if 'templates' in config:
            for template in config['templates']:
                self._templates[template['name']] = template['template']

    def generate_path(self, template_name, param):
        """Fill the named template with param; return "" for an unknown template."""
        if template_name in self._templates:
            if 'time_format' in param:
                path = param['time_format'].strftime(self._templates[template_name])
                return path.format(**param)
            return self._templates[template_name].format(**param)
        else:
            return ""


class TimeTrainPass(object):
    """
    Time-based training pass.
    Defines the pass time_interval, start_time and end_time.
    """

    def __init__(self, global_config):
        """Parse the training day range and restore progress from the pass donefile, if any."""
        self._config = global_config['epoch']
        if '+' in self._config['days']:
            day_str = self._config['days'].replace(' ', '')
            day_fields = day_str.split('+')
            self._begin_day = make_datetime(day_fields[0].strip())
            if len(day_fields) == 1 or len(day_fields[1]) == 0:
                # 100 years, meaning run continuously
                self._end_day = self._begin_day + datetime.timedelta(days=36500)
            else:
                # format: "<begin day>+<number of days>", e.g. 20200212+10
                run_day = int(day_fields[1].strip())
                self._end_day = self._begin_day + datetime.timedelta(days=run_day)
        else:
            # example: {20191001..20191031}
            days = os.popen("echo -n " + self._config['days']).read().split(" ")
            self._begin_day = make_datetime(days[0])
            self._end_day = make_datetime(days[len(days) - 1])
        self._checkpoint_interval = self._config['checkpoint_interval']
        self._dump_inference_interval = self._config['dump_inference_interval']
        self._interval_per_pass = self._config['train_time_interval']  # train N min data per pass

        self._pass_id = 0
        self._inference_pass_id = 0
        self._pass_donefile_handler = None
        if 'pass_donefile_name' in self._config:
            self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name']
            if fs.is_afs_path(self._train_pass_donefile):
                self._pass_donefile_handler = fs.FileHandler(global_config['io']['afs'])
            else:
                self._pass_donefile_handler = fs.FileHandler(global_config['io']['local_fs'])

            last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1]
            done_fileds = last_done.split('\t')
            if len(done_fileds) > 4:
                self._base_key = done_fileds[1]
                self._checkpoint_model_path = done_fileds[2]
                self._checkpoint_pass_id = int(done_fileds[3])
                self._inference_pass_id = int(done_fileds[4])
                self.init_pass_by_id(done_fileds[0], self._checkpoint_pass_id)

    def max_pass_num_day(self):
        """Return the number of passes per day."""
        # integer division keeps pass ids integral on both Python 2 and Python 3
        return 24 * 60 // self._interval_per_pass

    def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint):
        """Append one progress record to the pass donefile."""
        if is_checkpoint:
            self._checkpoint_pass_id = pass_id
            self._checkpoint_model_path = model_path
        done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key,
                                                 self._checkpoint_model_path, self._checkpoint_pass_id, pass_id)
        self._pass_donefile_handler.write(done_content, self._train_pass_donefile, 'a')

    def init_pass_by_id(self, date_str, pass_id):
        """
        init pass context with pass_id
        Args:
            date_str: example "20200110"
            pass_id(int): pass_id of date
        """
        date_time = make_datetime(date_str)
        if pass_id < 1:
            pass_id = 0
        if (date_time - self._begin_day).total_seconds() > 0:
            self._begin_day = date_time
        self._pass_id = pass_id
        mins = self._interval_per_pass * (pass_id - 1)
        self._current_train_time = date_time + datetime.timedelta(minutes=mins)

    def init_pass_by_time(self, datetime_str):
        """
        init pass context with datetime
        Args:
            datetime_str: example "202001100000" -> "%Y%m%d%H%M"
        """
        self._current_train_time = make_datetime(datetime_str)
        minus = self._current_train_time.hour * 60 + self._current_train_time.minute
        self._pass_id = minus // self._interval_per_pass + 1

    def current_pass(self):
        """Return the current pass id."""
        return self._pass_id

    def next(self):
        """Advance to the next pass; return False once the end day has been passed."""
        has_next = True
        old_pass_id = self._pass_id
        if self._pass_id < 1:
            self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M"))
        else:
            next_time = self._current_train_time + datetime.timedelta(minutes=self._interval_per_pass)
            if (next_time - self._end_day).total_seconds() > 0:
                has_next = False
            else:
                self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M"))
        if has_next and (self._inference_pass_id < self._pass_id or self._pass_id < old_pass_id):
            self._inference_pass_id = self._pass_id - 1
        return has_next

    def is_checkpoint_pass(self, pass_id):
        """Return True if a checkpoint should be saved at pass_id."""
        if pass_id < 1:
            return True
        if pass_id == self.max_pass_num_day():
            return False
        if pass_id % self._checkpoint_interval == 0:
            return True
        return False

    def need_dump_inference(self, pass_id):
        """Return True if an inference model should be dumped at pass_id."""
        return self._inference_pass_id < pass_id and pass_id % self._dump_inference_interval == 0

    def date(self, delta_day=0):
        """
        get train date
        Args:
            delta_day(int): n days after current_train_date
        Return:
            date(current_train_time + delta_day)
        """
        return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d")

    def timestamp(self, delta_day=0):
        """Return the timestamp of current_train_time + delta_day."""
        # NOTE: datetime.timestamp() requires Python 3.3+
        return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp()
# PaddleRec Benchmark
> Placeholder
# Contributing to PaddleRec
> Placeholder
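# PaddleRec Design
## Overview of the PaddleRec Design
PaddleRec abstracts the training and inference workflow of a recommendation model into five major modules:
* [Engine: workflow execution engine](#engine)
* [Trainer: concrete workflow definition](#trainer)
* [Model: network definition](#model)
* [Reader: data reading definition](#reader)
* [Metric: metric printing](#metric)
The figure below shows the layering and the call relationships when training is launched with a single command:
<p align="center">
<img align="center" src="imgs/design.png">
<p>
The file structure of `core` is listed below; each module is introduced in turn afterwards.
```
.core
├── engine/     execution engine implementations
├── metrics/    global metric implementations
├── modules/    custom op implementations
├── trainers/   workflow implementations
├── utils/      helper utilities
├── factory.py  registration of run workflows
├── layer.py    base class for custom ops
├── metric.py   Metric base class
├── model.py    Model base class
├── reader.py   Reader base class
└── trainer.py  Trainer base class
```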
## Engine
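The Engine is the execution engine for the whole training run. It is independent of the network logic and the data, and depends only on the current run mode, runtime environment, and device.
Run modes:
- Single machine
- Distributed
- Local simulated distributed
Runtime environments:
- Linux
- Windows
- Mac
Devices:
- CPU
- GPU
- AI chips
When the user invokes `python -m paddlerec.run`, PaddleRec first selects a suitable execution engine based on the configuration in the `yaml` file. The following code is in [run.py](../run.py):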
```python
engine_registry()
which_engine = get_engine(args)
engine = which_engine(args)
engine.run()
```
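The registry-and-lookup pattern behind these four lines can be sketched as follows. This is only an illustration under stated assumptions: `register_engine`, the `(device, mode)` key, and the string return values are hypothetical and are not taken from PaddleRec's `run.py`.

```python
# Minimal sketch of an engine registry; all names here are hypothetical.
engines = {}


def register_engine(device, mode, factory):
    """Store an engine factory under a normalized (device, mode) key."""
    engines[(device.upper(), mode.upper())] = factory


def get_engine(device, mode):
    """Look up the engine factory for the requested device and run mode."""
    key = (device.upper(), mode.upper())
    if key not in engines:
        raise ValueError("no engine registered for {}".format(key))
    return engines[key]


def single_engine(model):
    # A real engine would set environment variables and build a trainer.
    return "SingleTrainer({})".format(model)


def local_cluster_engine(model):
    return "ClusterTrainer({})".format(model)


register_engine("cpu", "single", single_engine)
register_engine("cpu", "local_cluster", local_cluster_engine)

which_engine = get_engine("CPU", "single")
result = which_engine("paddlerec.models.rank.dnn")
```

Keeping the lookup in a dictionary means a new run mode only needs one extra `register_engine` call, with no change to the launch code.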
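Taking the `single engine` as an example, here is an overview of what an engine does:
```python
def single_engine(args):
    trainer = get_trainer_prefix(args) + "SingleTrainer"
    single_envs = {}
    single_envs["train.trainer.trainer"] = trainer
    single_envs["train.trainer.threads"] = "2"
    single_envs["train.trainer.engine"] = "single"
    single_envs["train.trainer.device"] = args.device
    single_envs["train.trainer.platform"] = envs.get_platform()
    print("use {} engine to run model: {}".format(trainer, args.model))
    set_runtime_envs(single_envs, args.model)
    trainer = TrainerFactory.create(args.model)
    return trainer
```
After `single_engine` is called, it does two main things:
1. Based on the `yaml` configuration file, it sets the **environment variables of the current process**; all subsequent steps depend on these environment variables.
2. Based on the model and the environment, it selects and initializes the `Trainer` used to run the workflow.
In more detail, for the first step:
- The local simulated distributed engine sets, on top of the single-machine environment variables, the extra variables needed for local simulation, for example a distinct communication port and an ID for each process. It then launches multiple `Trainer`s to carry out the simulated distributed run.
- The distributed engine, on top of the single-machine environment variables, uses the script or configuration file given by the `-b --backend` argument to package, upload, and submit the files of the distributed job. The format of that script depends on the cluster the job runs on, such as MPI, K8S, or PaddleCloud, and users can customize the distributed run logic.
For a custom Engine implementation, see [local_cluster.py](../core/engine/local_cluster.py).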
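The first step, turning a nested `yaml` configuration into process environment variables, can be sketched as below. This is a simplified stand-in for the flattening helpers in `core/utils/envs.py`; the function name `flatten` and the sample configuration are assumptions made for illustration.

```python
import os


def flatten(config, separator="."):
    """Flatten a nested dict into {"a.b.c": "value"} with string values."""
    flat = {}

    def walk(prefix, node):
        if isinstance(node, dict):
            for key, value in node.items():
                walk(prefix + [key], value)
        else:
            flat[separator.join(prefix)] = str(node)

    walk([], config)
    return flat


# Hypothetical configuration, shaped like a parsed yaml file.
config = {"train": {"trainer": {"engine": "single", "threads": 2}}}

flat = flatten(config)
for key, value in flat.items():
    # Later stages read their settings back from the process environment.
    os.environ[key] = value
```

Values are converted to strings because environment variables can only hold strings; consumers have to parse numbers back out themselves.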
## Trainer
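`Trainer` is the concrete implementation of the training and inference workflow. It runs each stage defined for the model and is tightly coupled with the model, reader, and metric. PaddleRec defines the training stages as a finite state machine, and each Trainer subclass implements the stage-specific requirements. The stages of the state machine are registered in `def processor_register()`.
Taking SingleTrainer as an example, here is an overview of what a Trainer does:
```python
class SingleTrainer(TranspileTrainer):
    def processor_register(self):
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)
        if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
            self.regist_context_processor('train_pass', self.dataset_train)
        else:
            self.regist_context_processor('train_pass', self.dataloader_train)

        self.regist_context_processor('infer_pass', self.infer)
        self.regist_context_processor('terminal_pass', self.terminal)
```
SingleTrainer first registers the steps needed to complete the task. The steps are added, in registration order, to a dictionary named `status_processor` in the `Trainer` base class. The execution order can be changed inside any step by setting `context['status']`, which names the step to run next.
SingleTrainer defines the following six steps:
1. uninit: runs first by default; determines the model object from environment variables.
2. init_pass: calls the model's interfaces to build the network and initialize the fetch and metric variables.
3. startup_pass: initializes the parameters of the network via run(fluid.default_startup_program).
4. train_pass: runs training with either `dataset` or `dataloader`, depending on the environment.
5. infer_pass: after training ends, evaluates the saved model on the test set.
6. terminal_pass: prints custom information such as global variables and prediction results.
For a custom Trainer implementation, see [single_trainer.py](../core/trainers/single_trainer.py).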
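The state-machine dispatch described above can be sketched without any Paddle dependency. `MiniTrainer`, the `is_exit` flag, and the `log` list are hypothetical names chosen for this sketch; only `regist_context_processor`, `status_processor`, and `context['status']` come from the text.

```python
class MiniTrainer(object):
    """Toy state machine: each processor names the next status to run."""

    def __init__(self):
        self._status_processor = {}
        # 'is_exit' and 'log' are assumptions made for this illustration.
        self._context = {"status": "uninit", "is_exit": False, "log": []}

    def regist_context_processor(self, status_name, processor):
        self._status_processor[status_name] = processor

    def run(self):
        # Keep dispatching on context['status'] until a step asks to exit.
        while not self._context["is_exit"]:
            self._status_processor[self._context["status"]](self._context)
        return self._context["log"]


def instance(context):
    context["log"].append("uninit")
    context["status"] = "train_pass"


def train(context):
    context["log"].append("train_pass")
    context["status"] = "terminal_pass"


def terminal(context):
    context["log"].append("terminal_pass")
    context["is_exit"] = True


trainer = MiniTrainer()
trainer.regist_context_processor("uninit", instance)
trainer.regist_context_processor("train_pass", train)
trainer.regist_context_processor("terminal_pass", terminal)
log = trainer.run()
```

Because each step chooses its successor, a subclass can skip or repeat a stage by writing a different value into `context['status']`, without touching the dispatch loop.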
## Model
Model defines the pattern that every model implementation follows. As long as a model inherits from the base class, implements its functions, and assigns a few members, the Trainer can call it correctly.
Let us first look at some important definitions in the Model base class to get a first idea of how a model is implemented.
```python
class Model(object):
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        self._cost = None
        self._metrics = {}
        self._data_var = []
        self._infer_data_var = []
        self._infer_results = {}
        self._data_loader = None
        self._infer_data_loader = None
        self._fetch_interval = 20
        self._namespace = "train.model"
        self._platform = envs.get_platform()

    def get_inputs(self):
        return self._data_var

    @abc.abstractmethod
    def train_net(self):
        pass

    @abc.abstractmethod
    def infer_net(self):
        pass

    def get_avg_cost(self):
        return self._cost
```
Every model must implement `def train_net` and `def infer_net`, and assign the `self._data_var` and `self._cost` members, which specify the model's inputs and implement the overall network logic. For additional or more complex needs, override the functions below and implement the required behavior:
```python
def get_infer_inputs(self):
    return self._infer_data_var

def get_infer_results(self):
    return self._infer_results

def get_metrics(self):
    return self._metrics

def get_fetch_period(self):
    return self._fetch_interval
```
For a concrete model implementation, see the dnn example [model.py](../../models/rank/dnn/../../../paddlerec/core/model.py).
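The contract between a model and the Trainer can be illustrated with a stripped-down base class. This sketch is an assumption-laden stand-in: it uses `abc.ABC` (Python 3) instead of the `__metaclass__` form above, plain strings in place of fluid variables, and a hypothetical `ToyModel` subclass.

```python
import abc


class Model(abc.ABC):
    """Simplified stand-in for the Model base class (Python 3 syntax)."""

    def __init__(self, config):
        self._cost = None
        self._data_var = []

    def get_inputs(self):
        return self._data_var

    def get_avg_cost(self):
        return self._cost

    @abc.abstractmethod
    def train_net(self):
        pass

    @abc.abstractmethod
    def infer_net(self):
        pass


class ToyModel(Model):
    """Hypothetical model; strings stand in for fluid variables."""

    def train_net(self):
        self._data_var = ["label", "feature"]  # model inputs
        self._cost = "avg_cost"                # loss handed to the optimizer

    def infer_net(self):
        self.train_net()


model = ToyModel(config=None)
model.train_net()
```

Instantiating `Model` directly raises `TypeError`, which is how the abstract methods enforce that every model supplies both `train_net` and `infer_net`.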
## Reader
PaddleRec selects a data IO method based on the runtime environment. On Linux it prefers `Dataset`; on Windows and Mac it prefers `Dataloader`.
For an introduction to Dataset, see [DatasetFactory](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/dataset_cn/DatasetFactory_cn.html).
For an introduction to Dataloader, see [Asynchronous Data Reading](https://www.paddlepaddle.org.cn/documentation/docs/zh/advanced_guide/data_preparing/use_py_reader.html).
Because these two efficient data IO methods still have a steep learning curve, PaddleRec wraps both at a higher level. Users only implement the per-line processing logic, and the PaddleRec Reader base class handles the rest.
First, skim the definition of the Reader base class to get a first impression:
```python
class Reader(dg.MultiSlotDataGenerator):
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        dg.MultiSlotDataGenerator.__init__(self)

        if os.path.isfile(config):
            with open(config, 'r') as rb:
                _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
        else:
            raise ValueError("reader config only support yaml")

        envs.set_global_envs(_config)
        envs.update_workspace()

    @abc.abstractmethod
    def init(self):
        pass

    @abc.abstractmethod
    def generate_sample(self, line):
        pass
```
Users need to implement `def init(self)` and `def generate_sample(self, line)`. The former initializes the variables needed for preprocessing during data reading; the latter splits and processes each input line, which arrives as a string.
Once these two functions are defined and the Reader is complete, PaddleRec builds the reader with the following two scripts:
- [dataset_instance.py](../core/utils/dataset_instance.py)
- [dataloader_instance.py](../core/utils/dataloader_instance.py)
For the Reader's data-processing logic, see [criteo_reader.py](../../models/rank/../../paddlerec/models/rank/criteo_reader.py).
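A minimal sketch of the two user-supplied functions follows. It assumes a hypothetical input format of `label id id ...` separated by spaces, a hypothetical `hash_dim`, and a standalone `ToyReader` that does not inherit from the real base class; the shape of the return value (a generator function yielding a list of `(slot_name, values)` pairs) follows the convention of Paddle's `MultiSlotDataGenerator`.

```python
class ToyReader(object):
    """Standalone illustration; a real reader would subclass Reader."""

    def init(self):
        # Hypothetical preprocessing state: size of the feature hash space.
        self.hash_dim = 1000

    def generate_sample(self, line):
        def reader():
            # Assumed line format: "<label> <feature id> <feature id> ..."
            fields = line.strip().split(" ")
            label = [int(fields[0])]
            feature_ids = [int(f) % self.hash_dim for f in fields[1:]]
            yield [("label", label), ("feature", feature_ids)]

        return reader


toy = ToyReader()
toy.init()
samples = list(toy.generate_sample("1 1005 42 2003\n")())
```

Returning a generator function rather than the sample itself lets one input line expand into zero, one, or several samples.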
## Metric
Training always comes with printing training metrics. On a single machine this is straightforward. In distributed training, however, per-machine metrics often differ substantially from the global ones, for example `auc` and the positive-negative pair order metric `pn`. PaddleRec targets large-scale distributed training, so it factors metric printing out into a separate abstraction to handle global metrics in distributed training.
The Metric base class defines the basic interface:
```python
class Metric(object):
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """ init """
        pass

    @abc.abstractmethod
    def clear(self, scope, params):
        """
        clear current value
        Args:
            scope: value container
            params: extended variables for clear
        """
        pass

    @abc.abstractmethod
    def calculate(self, scope, params):
        """
        calculate result
        Args:
            scope: value container
            params: extended variables for calculate
        """
        pass

    @abc.abstractmethod
    def get_result(self):
        """
        Return:
            result(dict) : calculate result
        """
        pass

    @abc.abstractmethod
    def get_result_to_string(self):
        """
        Return:
            result(string) : calculate result with string format, for output
        """
        pass
```
To compute and output a global metric, inherit from the base class and implement the four member functions above. For a concrete example, see [auc_metrics.py](../core/metrics/auc_metrics.py).
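Why a global metric differs from the average of per-machine metrics can be shown with a simple ratio. The sketch below is not PaddleRec's AUC implementation: `CtrMetric`, the `workers` parameter, and the use of a plain `sum` in place of a cross-node all-reduce are all assumptions made for illustration.

```python
class CtrMetric(object):
    """Toy global metric: click-through rate over all workers."""

    def __init__(self):
        self._result = {}

    def clear(self, scope, params):
        self._result = {}

    def calculate(self, scope, params):
        # params["workers"] holds per-worker (clicks, shows) counts; summing
        # them here stands in for an all-reduce across training nodes.
        clicks = sum(c for c, _ in params["workers"])
        shows = sum(s for _, s in params["workers"])
        self._result = {"clicks": clicks, "shows": shows,
                        "ctr": clicks / float(shows)}

    def get_result(self):
        return self._result

    def get_result_to_string(self):
        return "ctr=%.4f" % self._result["ctr"]


workers = [(1, 10), (30, 90)]
metric = CtrMetric()
metric.clear(scope=None, params=None)
metric.calculate(scope=None, params={"workers": workers})

global_ctr = metric.get_result()["ctr"]
mean_of_local = sum(c / float(s) for c, s in workers) / len(workers)
```

Here the global rate is 31/100, while the mean of the two local rates is about 0.217, so aggregating the raw counts before computing the metric matters.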
# PaddleRec Distributed Training
## Running PaddleRec in Distributed Mode
> Placeholder
### Local Simulated Distributed Training
> Placeholder
### Distributed Training on a K8S Cluster
> Placeholder
# FAQ
> Placeholder
# PaddleRec Single-Machine Training
> Placeholder
# Supported Models
| Category | Model | Single-machine CPU training | Single-machine GPU training | Distributed CPU training | Large-scale sparse | Distributed GPU training | Custom dataset |
| :------: | :--------------------: | :---------: | :---------: | :-----------: | :--------: | :-----------: | :----------: |
| Content understanding | [Text-Classification]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Content understanding | [TagSpace]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [Word2Vec]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [TDM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Ranking | [CTR-Dnn]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Ranking | [DeepFm]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Ranking | [ListWise]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Multi-task | [MMOE]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Multi-task | [ESMM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Matching | [DSSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Matching | [Multiview-Simnet]() | ✓ | x | ✓ | x | ✓ | ✓ |
# PaddleRec Model Tuning
> Placeholder
# PaddleRec Offline Inference
## [Overview of Distributed Training](https://www.paddlepaddle.org.cn/tutorials/projectdetail/459124)
## [Multi-Machine Multi-GPU Training](https://www.paddlepaddle.org.cn/tutorials/projectdetail/459127)
## [Parameter Server Training](https://www.paddlepaddle.org.cn/tutorials/projectdetail/464839)
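# Background on Recommender Systems
This article is excerpted from [Personalized Recommendation](https://github.com/PaddlePaddle/book/blob/develop/05.recommender_system/README.cn.md).
The code for this article is in [book/recommender_system](https://github.com/PaddlePaddle/book/tree/develop/05.recommender_system). For first-time use, please refer to the [Book documentation instructions](https://github.com/PaddlePaddle/book/blob/develop/README.cn.md#运行这本书).
More tutorials and background can be found in [Deep Learning in Practice: Personalized Recommendation](https://www.paddlepaddle.org.cn/tutorials/projectdetail/443958).
## Background
As network technology develops and e-commerce keeps growing, the number and variety of goods increase rapidly, and users must spend a great deal of time to find what they want to buy. This is the information overload problem. Personalized recommender systems emerged to solve it.
A personalized recommender system is a subset of information filtering systems and can be used in many domains, such as movies, music, e-commerce, and feed recommendation. By analyzing and mining user behavior, it discovers each user's individual needs and interests and recommends information or products the user may be interested in. Unlike a search engine, a recommender system does not require users to describe their needs precisely; it models their historical behavior and proactively provides information that matches their interests and needs.
The GroupLens system[[1](#参考文献)], launched by the University of Minnesota in 1994, is generally regarded as the point at which personalized recommendation became a relatively independent research area. It was the first to propose completing the recommendation task through collaborative filtering, and collaborative filtering based on this model led the development of recommender systems for more than a decade afterwards.
Traditional approaches to personalized recommendation include:
- Collaborative Filtering Recommendation: one of the most widely used techniques. It requires collecting and analyzing users' historical behavior, activities, and preferences, and is usually divided into two subclasses: User-Based recommendation[[1](#参考文献)] and Item-Based recommendation[[2](#参考文献)]. A key advantage is that it does not rely on machines to analyze the content features of items, so it can accurately recommend complex items such as movies without understanding the items themselves. Its drawbacks are the cold-start problem for new users with no behavior at all, and the sparsity problem caused by too few interactions between users and items. Contextual information such as social networks[[3](#参考文献)] or geographic location can also be incorporated into collaborative filtering.
- Content-based Filtering Recommendation[[4](#参考文献)]: this approach uses the content description of items to extract meaningful features, and recommends by computing the similarity between a user's interests and item descriptions. Its advantage is that it is simple and direct: it does not depend on other users' ratings of items, but measures item similarity through item attributes and recommends items similar to those the user is interested in. Its drawback is that it also suffers from the cold-start problem for new users with no behavior.
- Hybrid Recommendation[[5](#参考文献)]: combines different inputs and techniques to make recommendations, compensating for the weaknesses of each individual technique.
In recent years, deep learning has achieved great success in many fields, and both academia and industry are trying to apply it to personalized recommendation. Deep learning is good at automatic feature extraction, can learn multi-level abstract feature representations, and can learn from heterogeneous or cross-domain content, which helps to some extent with the cold-start problem of recommender systems[[6](#参考文献)].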
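### YouTube's Deep Neural Network Recommender System
YouTube is the world's largest site for uploading, sharing, and discovering videos. Its recommender system recommends personalized content to more than one billion users from an ever-growing video corpus. The whole system consists of two neural networks: a candidate generation network and a ranking network. The candidate generation network produces hundreds of candidates from a corpus of millions of videos; the ranking network scores and ranks the candidates and outputs the few dozen highest-ranked results. Figure 1 shows the system structure:
<p align="center">
<img src="https://github.com/PaddlePaddle/book/blob/develop/05.recommender_system/image/YouTube_Overview.png?raw=true" width="70%" ><br/>
Figure 1. Structure of the YouTube personalized recommender system
</p>
#### Candidate Generation Network
The candidate generation network models recommendation as a multi-class classification problem with an extremely large number of classes. For a YouTube user, it uses the watch history (video IDs), search tokens, demographic information (such as geographic location and login device), binary features (such as gender and whether the user is logged in), and continuous features (such as user age) to classify over all videos in the corpus. This yields a classification result for every class, that is, a recommendation probability for every video, and the network finally outputs the few hundred videos with the highest probabilities.
First, historical information such as the watch history and search tokens is mapped to vectors and averaged to obtain a fixed-length representation. Demographic features are added to improve recommendations for new users, and binary and continuous features are normalized to the range [0, 1]. Next, all feature representations are concatenated into one vector and fed to a nonlinear multilayer perceptron (MLP, see the [Recognize Digits](https://github.com/PaddlePaddle/book/blob/develop/02.recognize_digits/README.cn.md) tutorial). Finally, during training the MLP output is passed to a softmax for classification; during inference, the similarity between the user's combined feature (the MLP output) and every video is computed, and the K highest-scoring videos are taken as the output of the candidate generation network.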
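The averaging and top-K retrieval steps can be sketched in plain Python. This is a deliberately reduced illustration: the two-dimensional embeddings are made up, the MLP is skipped so the averaged watch-history vector is used directly as the user vector, and the dot product is assumed as the similarity measure.

```python
def average(vectors):
    """Average a list of equal-length vectors into one fixed-length vector."""
    n = float(len(vectors))
    return [sum(column) / n for column in zip(*vectors)]


def top_k(user_vec, video_vecs, k):
    """Return the ids of the k videos with the highest dot-product score."""
    scores = {
        vid: sum(u * v for u, v in zip(user_vec, vec))
        for vid, vec in video_vecs.items()
    }
    return sorted(scores, key=lambda vid: scores[vid], reverse=True)[:k]


# Made-up video embeddings, for illustration only.
video_vecs = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}

# The user watched videos "a" and "c"; average their embeddings.
watched = [video_vecs["a"], video_vecs["c"]]
user_vec = average(watched)

candidates = top_k(user_vec, video_vecs, 2)
```

Averaging gives a fixed-length input regardless of how many videos a user has watched, which is what allows the history to be concatenated with the other features.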
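#### Ranking Network
The ranking network has a structure similar to the candidate generation network, but its goal is to score and rank the candidates more finely. As with feature extraction in traditional ad ranking, a large number of features relevant to video ranking are constructed here (such as video ID and time since last watch). These features are processed in much the same way as in the candidate generation network. The difference is that the top of the ranking network is a weighted logistic regression, which scores all candidate videos, sorts them from high to low, and returns the higher-scoring videos to the user.
### Hybrid Recommendation Model
This section uses Convolutional Neural Networks to learn representations of movie titles. The text convolutional neural network and the hybrid recommendation model are introduced in turn below.
#### Text Convolutional Neural Network (CNN)
Convolutional neural networks are often used to process data with a grid-like topology. For example, an image can be viewed as a two-dimensional grid of pixels, and natural language can be viewed as a one-dimensional sequence of words. A convolutional neural network can extract many kinds of local features and combine and abstract them into higher-level feature representations. Experiments show that convolutional neural networks model image and text problems efficiently.
A convolutional neural network consists mainly of convolution and pooling operations, which can be applied and combined flexibly in many variants.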
<a name="参考文献"></a>
## References
1. P. Resnick, N. Iacovou, etc. “[GroupLens: An Open Architecture for Collaborative Filtering of Netnews](http://ccs.mit.edu/papers/CCSWP165.html)”, Proceedings of ACM Conference on Computer Supported Cooperative Work, CSCW 1994. pp.175-186.
2. Sarwar, Badrul, et al. "[Item-based collaborative filtering recommendation algorithms.](http://files.grouplens.org/papers/www10_sarwar.pdf)" *Proceedings of the 10th international conference on World Wide Web*. ACM, 2001.
3. Kautz, Henry, Bart Selman, and Mehul Shah. "[Referral Web: combining social networks and collaborative filtering.](http://www.cs.cornell.edu/selman/papers/pdf/97.cacm.refweb.pdf)" Communications of the ACM 40.3 (1997): 63-65.
4. [Peter Brusilovsky](https://en.wikipedia.org/wiki/Peter_Brusilovsky) (2007). *The Adaptive Web*. p. 325.
5. Robin Burke , [Hybrid Web Recommender Systems](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.435.7538&rep=rep1&type=pdf), pp. 377-408, The Adaptive Web, Peter Brusilovsky, Alfred Kobsa, Wolfgang Nejdl (Ed.), Lecture Notes in Computer Science, Springer-Verlag, Berlin, Germany, Lecture Notes in Computer Science, Vol. 4321, May 2007, 978-3-540-72078-2.
6. Yuan, Jianbo, et al. ["Solving Cold-Start Problem in Large-scale Recommendation Engines: A Deep Learning Approach."](https://arxiv.org/pdf/1611.05480v1.pdf) *arXiv preprint arXiv:1611.05480* (2016).
<br/>
<a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://paddlepaddleimage.cdn.bcebos.com/bookimage/camo.png" /></a><br /><span xmlns:dct="http://purl.org/dc/terms/" href="http://purl.org/dc/dcmitype/Text" property="dct:title" rel="dct:type">This tutorial</span> is created by <a xmlns:cc="http://creativecommons.org/ns#" href="http://book.paddlepaddle.org" property="cc:attributionName" rel="cc:attributionURL">PaddlePaddle</a> and licensed under the <a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/">Creative Commons Attribution-ShareAlike 4.0 International License</a>.