Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
PaddleRec
提交
3061d467
P
PaddleRec
项目概览
BaiXuePrincess
/
PaddleRec
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleRec
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleRec
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
3061d467
编写于
9月 07, 2020
作者:
W
wuzhihua
提交者:
GitHub
9月 07, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #208 from MrChengmo/tdm_build_tree_v2
Tdm build tree & Online Learning
上级
c26b0e75
2aa4e664
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
282 addition
and
11 deletion
+282
-11
doc/train.md
doc/train.md
+1
-1
models/rank/dnn/README.md
models/rank/dnn/README.md
+130
-0
models/rank/dnn/config.yaml
models/rank/dnn/config.yaml
+13
-0
models/rank/dnn/model.py
models/rank/dnn/model.py
+30
-10
models/rank/dnn/online_learning_runner.py
models/rank/dnn/online_learning_runner.py
+89
-0
models/treebased/tdm/build_tree.md
models/treebased/tdm/build_tree.md
+19
-0
未找到文件。
doc/train.md
浏览文件 @
3061d467
...
...
@@ -20,7 +20,7 @@ python -m paddlerec.run -m paddlerec.models.xxx.yyy
例如启动
`recall`
下的
`word2vec`
模型的默认配置;
```
shell
python
-m
paddlerec.run
-m
models/recall/word2vec
python
-m
paddlerec.run
-m
models/recall/word2vec
/config.yaml
```
### 2. 启动内置模型的个性化配置训练
...
...
models/rank/dnn/README.md
浏览文件 @
3061d467
...
...
@@ -259,3 +259,133 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
```
完成上述组网后,我们最终可以通过训练拿到
`avg_cost`
与
`auc`
两个重要指标。
## 流式训练(OnlineLearning)任务启动及配置流程
### 流式训练简介
流式训练是按照一定顺序进行数据的接收和处理,每接收一个数据,模型会对它进行预测并对当前模型进行更新,然后处理下一个数据。 像信息流、小视频、电商等场景,每天都会新增大量的数据, 让每天(每一刻)新增的数据基于上一天(上一刻)的模型进行新的预测和模型更新。
在大规模流式训练场景下, 需要使用的深度学习框架有对应的能力支持, 即:
*
支持大规模分布式训练的能力, 数据量巨大, 需要有良好的分布式训练及扩展能力,才能满足训练的时效要求
*
支持超大规模的Embedding, 能够支持十亿甚至千亿级别的Embedding, 拥有合理的参数输出的能力,能够快速输出模型参数并和线上其他系统进行对接
*
Embedding的特征ID需要支持HASH映射,不要求ID的编码,能够自动增长及控制特征的准入(原先不存在的特征可以以适当的条件创建), 能够定期淘汰(能够以一定的策略进行过期的特征的清理) 并拥有准入及淘汰策略
*
最后就是要基于框架开发一套完备的流式训练的 trainer.py, 能够拥有完善的流式训练流程
### 使用ctr-dnn online learning 进行模型的训练
目前,PaddleRec基于飞桨分布式训练框架的能力,实现了这套流式训练的流程。 供大家参考和使用。我们基于
`models/rank/ctr-dnn`
修改了一个online_training的版本,供大家更好的理解和参考。
**注意**
1.
使用online learning 需要安装目前Paddle最新的开发者版本, 你可以从 https://www.paddlepaddle.org.cn/documentation/docs/zh/install/Tables.html#whl-dev 此处获得它,需要先卸载当前已经安装的飞桨版本,根据自己的Python环境下载相应的安装包。
2.
使用online learning 需要安装目前PaddleRec最新的开发者版本, 你可以通过 git clone https://github.com/PaddlePaddle/PaddleRec.git 得到最新版的PaddleRec并自行安装
### 启动方法
1.
修改config.yaml中的 hyper_parameters.distributed_embedding=1,表示打开大规模稀疏的模式
2.
修改config.yaml中的 mode: [single_cpu_train, single_cpu_infer] 中的
`single_cpu_train`
为online_learning_cluster,表示使用online learning对应的运行模式
3.
准备训练数据, ctr-dnn中使用的online learning对应的训练模式为 天级别训练, 每天又分为24个小时, 因此训练数据需要 天--小时的目录结构进行整理。
以 2020年08月10日 到 2020年08月11日 2天的训练数据举例, 用户需要准备的数据的目录结构如下:
```
train_data/
|-- 20200810
| |-- 00
| | `-- train.txt
| |-- 01
| | `-- train.txt
| |-- 02
| | `-- train.txt
| |-- 03
| | `-- train.txt
| |-- 04
| | `-- train.txt
| |-- 05
| | `-- train.txt
| |-- 06
| | `-- train.txt
| |-- 07
| | `-- train.txt
| |-- 08
| | `-- train.txt
| |-- 09
| | `-- train.txt
| |-- 10
| | `-- train.txt
| |-- 11
| | `-- train.txt
| |-- 12
| | `-- train.txt
| |-- 13
| | `-- train.txt
| |-- 14
| | `-- train.txt
| |-- 15
| | `-- train.txt
| |-- 16
| | `-- train.txt
| |-- 17
| | `-- train.txt
| |-- 18
| | `-- train.txt
| |-- 19
| | `-- train.txt
| |-- 20
| | `-- train.txt
| |-- 21
| | `-- train.txt
| |-- 22
| | `-- train.txt
| `-- 23
| `-- train.txt
`-- 20200811
|-- 00
| `-- train.txt
|-- 01
| `-- train.txt
|-- 02
| `-- train.txt
|-- 03
| `-- train.txt
|-- 04
| `-- train.txt
|-- 05
| `-- train.txt
|-- 06
| `-- train.txt
|-- 07
| `-- train.txt
|-- 08
| `-- train.txt
|-- 09
| `-- train.txt
|-- 10
| `-- train.txt
|-- 11
| `-- train.txt
|-- 12
| `-- train.txt
|-- 13
| `-- train.txt
|-- 14
| `-- train.txt
|-- 15
| `-- train.txt
|-- 16
| `-- train.txt
|-- 17
| `-- train.txt
|-- 18
| `-- train.txt
|-- 19
| `-- train.txt
|-- 20
| `-- train.txt
|-- 21
| `-- train.txt
|-- 22
| `-- train.txt
`-- 23
`-- train.txt
```
4.
准备好数据后, 即可按照标准的训练流程进行流式训练了
```
shell
python
-m
paddlerec.run
-m
models/rerank/ctr-dnn/config.yaml
```
models/rank/dnn/config.yaml
浏览文件 @
3061d467
...
...
@@ -49,6 +49,7 @@ hyper_parameters:
sparse_feature_dim
:
9
dense_input_dim
:
13
fc_sizes
:
[
512
,
256
,
128
,
32
]
distributed_embedding
:
0
# select runner by name
mode
:
[
single_cpu_train
,
single_cpu_infer
]
...
...
@@ -90,6 +91,18 @@ runner:
print_interval
:
1
phases
:
[
phase1
]
-
name
:
online_learning_cluster
class
:
cluster_train
runner_class_path
:
"
{workspace}/online_learning_runner.py"
epochs
:
2
device
:
cpu
fleet_mode
:
ps
save_checkpoint_interval
:
1
# save model interval of epochs
save_checkpoint_path
:
"
increment_dnn"
# save checkpoint path
init_model_path
:
"
"
# load model path
print_interval
:
1
phases
:
[
phase1
]
-
name
:
collective_cluster
class
:
cluster_train
epochs
:
2
...
...
models/rank/dnn/model.py
浏览文件 @
3061d467
...
...
@@ -25,8 +25,16 @@ class Model(ModelBase):
ModelBase
.
__init__
(
self
,
config
)
def
_init_hyper_parameters
(
self
):
self
.
is_distributed
=
True
if
envs
.
get_fleet_mode
().
upper
(
)
==
"PSLIB"
else
False
self
.
is_distributed
=
False
self
.
distributed_embedding
=
False
if
envs
.
get_fleet_mode
().
upper
()
==
"PSLIB"
:
self
.
is_distributed
=
True
if
envs
.
get_global_env
(
"hyper_parameters.distributed_embedding"
,
0
)
==
1
:
self
.
distributed_embedding
=
True
self
.
sparse_feature_number
=
envs
.
get_global_env
(
"hyper_parameters.sparse_feature_number"
)
self
.
sparse_feature_dim
=
envs
.
get_global_env
(
...
...
@@ -40,14 +48,26 @@ class Model(ModelBase):
self
.
label_input
=
self
.
_sparse_data_var
[
0
]
def
embedding_layer
(
input
):
emb
=
fluid
.
layers
.
embedding
(
input
=
input
,
is_sparse
=
True
,
is_distributed
=
self
.
is_distributed
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()),
)
if
self
.
distributed_embedding
:
emb
=
fluid
.
contrib
.
layers
.
sparse_embedding
(
input
=
input
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()))
else
:
emb
=
fluid
.
layers
.
embedding
(
input
=
input
,
is_sparse
=
True
,
is_distributed
=
self
.
is_distributed
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()))
emb_sum
=
fluid
.
layers
.
sequence_pool
(
input
=
emb
,
pool_type
=
'sum'
)
return
emb_sum
...
...
models/rank/dnn/online_learning_runner.py
0 → 100644
浏览文件 @
3061d467
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
os
import
time
import
warnings
import
numpy
as
np
import
logging
import
paddle.fluid
as
fluid
from
paddlerec.core.utils
import
envs
from
paddlerec.core.metric
import
Metric
from
paddlerec.core.trainers.framework.runner
import
RunnerBase
logging
.
basicConfig
(
format
=
'%(asctime)s - %(levelname)s: %(message)s'
,
level
=
logging
.
INFO
)
class
OnlineLearningRunner
(
RunnerBase
):
def
__init__
(
self
,
context
):
print
(
"Running OnlineLearningRunner."
)
def
run
(
self
,
context
):
epochs
=
int
(
envs
.
get_global_env
(
"runner."
+
context
[
"runner_name"
]
+
".epochs"
))
model_dict
=
context
[
"env"
][
"phase"
][
0
]
model_class
=
context
[
"model"
][
model_dict
[
"name"
]][
"model"
]
metrics
=
model_class
.
_metrics
dataset_list
=
[]
dataset_index
=
0
for
day_index
in
range
(
len
(
days
)):
day
=
days
[
day_index
]
cur_path
=
"%s/%s"
%
(
path
,
str
(
day
))
filelist
=
fleet
.
split_files
(
hdfs_ls
([
cur_path
]))
dataset
=
create_dataset
(
use_var
,
filelist
)
dataset_list
.
append
(
dataset
)
dataset_index
+=
1
dataset_index
=
0
for
epoch
in
range
(
len
(
days
)):
day
=
days
[
day_index
]
begin_time
=
time
.
time
()
result
=
self
.
_run
(
context
,
model_dict
)
end_time
=
time
.
time
()
seconds
=
end_time
-
begin_time
message
=
"epoch {} done, use time: {}"
.
format
(
epoch
,
seconds
)
# TODO, wait for PaddleCloudRoleMaker supports gloo
from
paddle.fluid.incubate.fleet.base.role_maker
import
GeneralRoleMaker
if
context
[
"fleet"
]
is
not
None
and
isinstance
(
context
[
"fleet"
],
GeneralRoleMaker
):
metrics_result
=
[]
for
key
in
metrics
:
if
isinstance
(
metrics
[
key
],
Metric
):
_str
=
metrics
[
key
].
calc_global_metrics
(
context
[
"fleet"
],
context
[
"model"
][
model_dict
[
"name"
]][
"scope"
])
metrics_result
.
append
(
_str
)
elif
result
is
not
None
:
_str
=
"{}={}"
.
format
(
key
,
result
[
key
])
metrics_result
.
append
(
_str
)
if
len
(
metrics_result
)
>
0
:
message
+=
", global metrics: "
+
", "
.
join
(
metrics_result
)
print
(
message
)
with
fluid
.
scope_guard
(
context
[
"model"
][
model_dict
[
"name"
]][
"scope"
]):
train_prog
=
context
[
"model"
][
model_dict
[
"name"
]][
"main_program"
]
startup_prog
=
context
[
"model"
][
model_dict
[
"name"
]][
"startup_program"
]
with
fluid
.
program_guard
(
train_prog
,
startup_prog
):
self
.
save
(
epoch
,
context
,
True
)
context
[
"status"
]
=
"terminal_pass"
models/treebased/tdm/build_tree.md
0 → 100644
浏览文件 @
3061d467
wget https://paddlerec.bj.bcebos.com/utils/tree_build_utils.tar.gz --no-check-certificate
# input_path: embedding的路径
# emb_shape: embedding中key-value,value的维度
# emb格式要求: embedding_id(int64),embedding(float),embedding(float),......,embedding(float)
# cluster_threads: 建树聚类所用线程
python_172_anytree/bin/python -u main.py --input_path=./gen_emb/item_emb.txt --output_path=./ --emb_shape=24 --cluster_threads=4
建树流程是:1、读取emb -> 2、kmeans聚类 -> 3、聚类结果整理为树 -> 4、基于树结构得到模型所需的4个文件
1 Layer_list:记录了每一层都有哪些节点。训练用
2 Travel_list:记录每个叶子节点的Travel路径。训练用
3 Tree_Info:记录了每个节点的信息,主要为:是否是item/item_id,所在层级,父节点,子节点。检索用
4 Tree_Embedding:记录所有节点的Embedding。训练及检索用
注意一下训练数据输入的item是建树之前用的item id,还是基于树的node id,还是基于叶子的leaf id,在tdm_reader.py中,可以加载字典,做映射。
用厂内版建树得到的输出文件夹里,有名为id2nodeid.txt的映射文件,格式是『hash值』+ 『树节点ID』+『叶子节点ID(表示第几个叶子节点,tdm_sampler op 所需的输入)』
在另一个id2bidword.txt中,也有映射关系,格式是『hash值』+『原始item ID』,这个文件中仅存储了叶子节点的信息。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录