提交 f81ecb77 编写于 作者: S seiriosPlus

fix online learnning on ctr-dnn

上级 9c0b0a06
# PaddleRec 流式训练(OnlineLearning)任务启动及配置流程
## 流式训练简介
流式训练是按照一定顺序进行数据的接收和处理,每接收一个数据,模型会对它进行预测并对当前模型进行更新,然后处理下一个数据。 像信息流、小视频、电商等场景,每天都会新增大量的数据, 让每天(每一刻)新增的数据基于上一天(上一刻)的模型进行新的预测和模型更新。
在大规模流式训练场景下, 需要使用的深度学习框架有对应的能力支持, 即:
* 支持大规模分布式训练的能力, 数据量巨大, 需要有良好的分布式训练及扩展能力,才能满足训练的时效要求
* 支持超大规模的Embedding, 能够支持十亿甚至千亿级别的Embedding, 拥有合理的参数输出的能力,能够快速输出模型参数并和线上其他系统进行对接
* Embedding的特征ID需要支持HASH映射,不要求ID的编码,能够自动增长及控制特征的准入(原先不存在的特征可以以适当的条件创建), 能够定期淘汰(能够以一定的策略进行过期的特征的清理) 并拥有准入及淘汰策略
* 最后就是要基于框架开发一套完备的流式训练的 trainer.py, 能够拥有完善的流式训练流程
## 使用PaddleRec内置的 online learning 进行模型的训练
目前,PaddleRec基于飞桨分布式训练框架的能力,实现了这套流式训练的流程。 供大家参考和使用。我们在`models/online_learning`目录下提供了一个ctr-dnn的online_training的版本,供大家更好的理解和参考。
**注意**
1. 使用online learning 需要安装目前Paddle最新的开发者版本, 你可以从 https://www.paddlepaddle.org.cn/documentation/docs/zh/install/Tables.html#whl-dev 此处获得它,需要先卸载当前已经安装的飞桨版本,根据自己的Python环境下载相应的安装包。
2. 使用流式训练及大规模稀疏的能力,需要对模型做一些微调, 因此需要你修改部分代码。
3. 当前只有参数服务器的分布式训练是支持带大规模稀疏的流式训练的,因此运行时,请直接选择参数服务器本地训练或集群训练方法。
## 启动方法
### 1. 启动内置模型的默认配置训练
在安装好`paddlepaddle``paddlerec`后,可以直接使用一行命令快速启动内置模型的默认配置训练,命令如下;
```shell
python -m paddlerec.run -m paddlerec.models.xxx.yyy
```
注意事项:
1. 请确保调用的是安装了paddlerec的`python`环境
2. `xxx`为paddlerec.models下有多个大类,如:`recall`/`rank`/`rerank`
3. `yyy`为每个类别下又有多个模型,如`recall`下有:`gnn`/`grup4rec`/`ncf`
例如启动`recall`下的`word2vec`模型的默认配置;
```shell
python -m paddlerec.run -m models/recall/word2vec
```
### 2. 启动内置模型的个性化配置训练
如果我们修改了默认模型的config.yaml文件,怎么运行修改后的模型呢?
- **没有改动模型组网**
假如你将paddlerec代码库克隆在了`/home/PaddleRec`,并修改了`/home/PaddleRec/models/rank/dnn/config.yaml`,则如下启动训练
```shell
python -m paddlerec.run -m /home/PaddleRec/models/rank/dnn/config.yaml
```
paddlerec 运行的是在paddlerec库安装目录下的组网文件(model.py),但个性化配置`config.yaml`是用的是指定路径下的yaml文件。
- **改动了模型组网**
假如你将paddlerec代码库克隆在了`/home/PaddleRec`,并修改了`/home/PaddleRec/models/rank/dnn/model.py`, 以及`/home/PaddleRec/models/rank/dnn/config.yaml`,则首先需要更改`yaml`中的`workspace`的设置:
```yaml
workspace: /home/PaddleRec/models/rank/dnn/
```
再执行:
```shell
python -m paddlerec.run -m /home/PaddleRec/models/rank/dnn/config.yaml
```
paddlerec 运行的是绝对路径下的组网文件(model.py)以及个性化配置文件(config.yaml)
## yaml训练配置
### yaml中训练相关的概念
`config.yaml`中训练流程相关有两个重要的逻辑概念,`runner``phase`
- **`runner`** : runner是训练的引擎,亦可称之为运行器,在runner中定义执行设备(cpu、gpu),执行的模式(训练、预测、单机、多机等),以及运行的超参,例如训练轮数,模型保存地址等。
- **`phase`** : phase是训练中的阶段的概念,是引擎具体执行的内容,该内容是指:具体运行哪个模型文件,使用哪个reader。
PaddleRec每次运行时,会执行一个或多个运行器,通过`mode`指定`runner`的名字。每个运行器可以执行一个或多个`phase`,所以PaddleRec支持一键启动多阶段的训练。
### 单机CPU训练
下面我们开始定义一个单机CPU训练的`runner`:
```yaml
mode: single_cpu_train # 执行名为 single_cpu_train 的运行器
# mode 也支持多个runner的执行,此处可以改为 mode: [single_cpu_train, single_cpu_infer]
runner:
- name: single_cpu_train # 定义 runner 名为 single_cpu_train
class: train # 执行单机训练
device: cpu # 执行在 cpu 上
epochs: 10 # 训练轮数
save_checkpoint_interval: 2 # 每隔2轮保存一次checkpoint
save_inference_interval: 4 # 每隔4轮保存一次inference model
save_checkpoint_path: "increment" # checkpoint 的保存地址
save_inference_path: "inference" # inference model 的保存地址
save_inference_feed_varnames: [] # inference model 的feed参数的名字
save_inference_fetch_varnames: [] # inference model 的fetch参数的名字
init_model_path: "" # 如果是加载模型热启,则可以指定初始化模型的地址
print_interval: 10 # 训练信息的打印间隔,以batch为单位
phases: [phase_train] # 若没有指定phases,则会默认运行所有phase
# phase 也支持自定多个phase的执行,此处可以改为 phases: [phase_train, phase_infer]
```
再定义具体的执行内容:
```yaml
phase:
- name: phase_train # 该阶段名为 phase1
model: "{workspace}/model.py" # 模型文件为workspace下的model.py
dataset_name: dataset_train # reader的名字
dataset:
- name: dataset_train
type: DataLoader # 使用DataLoader的数据读取方式
batch_size: 2
data_path: "{workspace}/train_data" # 数据地址
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" # sparse 输入的位置定义
dense_slots: "dense_var:13" # dense参数的维度定义
```
### 单机单卡GPU训练
具体执行内容与reader与前述相同,下面介绍需要改动的地方
```yaml
mode: single_gpu_train # 执行名为 single_gpu_train 的运行器
runner:
- name: single_gpu_train # 定义 runner 名为 single_gpu_train
class: train # 执行单机训练
device: gpu # 执行在 gpu 上
selected_gpus: "0" # 默认选择在id=0的卡上执行训练
epochs: 10 # 训练轮数
```
### 单机多卡GPU训练
具体执行内容与reader与前述相同,下面介绍需要改动的地方
```yaml
mode: single_multi_gpu_train # 执行名为 single_multi_gpu_train 的运行器
runner:
- name: single_multi_gpu_train # 定义 runner 名为 single_multi_gpu_train
class: train # 执行单机训练
device: gpu # 执行在 gpu 上
selected_gpus: "0,1,2,3" # 选择多卡执行训练
epochs: 10 # 训练轮数
```
### 本地模拟参数服务器训练
具体执行内容与reader与前述相同,下面介绍需要改动的地方
```yaml
mode: local_cluster_cpu_train # 执行名为 local_cluster_cpu_train 的运行器
runner:
- name: local_cluster_cpu_train # 定义 runner 名为 runner_train
class: local_cluster_train # 执行本地模拟分布式——参数服务器训练
device: cpu # 执行在 cpu 上(paddle后续版本会支持PS-GPU)
worker_num: 1 # (可选)worker进程数量,默认1
server_num: 1 # (可选)server进程数量,默认1
epochs: 10 # 训练轮数
```
...@@ -20,7 +20,7 @@ python -m paddlerec.run -m paddlerec.models.xxx.yyy ...@@ -20,7 +20,7 @@ python -m paddlerec.run -m paddlerec.models.xxx.yyy
例如启动`recall`下的`word2vec`模型的默认配置; 例如启动`recall`下的`word2vec`模型的默认配置;
```shell ```shell
python -m paddlerec.run -m models/recall/word2vec python -m paddlerec.run -m models/recall/word2vec/config.yaml
``` ```
### 2. 启动内置模型的个性化配置训练 ### 2. 启动内置模型的个性化配置训练
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# workspace
workspace: "models/demo/online_learning"
# list of dataset
dataset:
- name: dataloader_train # name of dataset to distinguish different datasets
batch_size: 2
type: DataLoader # or QueueDataset
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
- name: dataset_train # name of dataset to distinguish different datasets
batch_size: 2
type: QueueDataset # or DataLoader
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
- name: dataset_infer # name
batch_size: 2
type: DataLoader # or QueueDataset
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
# hyper parameters of user-defined network
hyper_parameters:
# optimizer config
optimizer:
class: Adam
learning_rate: 0.001
strategy: async
# user-defined <key, value> pairs
sparse_inputs_slots: 27
sparse_feature_dim: 9
dense_input_dim: 13
fc_sizes: [512, 256, 128, 32]
# select runner by name
mode: [ps_cluster, single_cpu_infer]
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: single_cpu_infer
class: infer
# num of epochs
epochs: 1
# device to run training or infer
device: cpu
init_model_path: "increment_dnn" # load model path
phases: [phase2]
- name: ps_cluster
class: cluster_train
runner_class_path: "{workspace}/online_learning_runner.py"
epochs: 2
device: cpu
fleet_mode: ps
save_checkpoint_interval: 1 # save model interval of epochs
save_checkpoint_path: "increment_dnn" # save checkpoint path
init_model_path: "" # load model path
print_interval: 1
phases: [phase1]
# runner will run all the phase in each epoch
phase:
- name: phase1
model: "{workspace}/model.py" # user-defined model
dataset_name: dataloader_train # select dataset by name
thread_num: 1
- name: phase2
model: "{workspace}/model.py" # user-defined model
dataset_name: dataset_infer # select dataset by name
thread_num: 1
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
mv ./raw_data ./train_data_full
mkdir train_data && cd train_data
cp ../train_data_full/part-0 ../train_data_full/part-1 ./ && cd ..
mv ./test_data ./test_data_full
mkdir test_data && cd test_data
cp ../test_data_full/part-220 ./ && cd ..
echo "Complete data download."
echo "Full Train data stored in ./train_data_full "
echo "Full Test data stored in ./test_data_full "
echo "Rapid Verification train data stored in ./train_data "
echo "Rapid Verification test data stored in ./test_data "
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
"""
DacDataset: inheritance MultiSlotDataGeneratior, Implement data reading
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in continuous_range_:
if features[idx] == "":
dense_feature.append(0.0)
else:
dense_feature.append(
(float(features[idx]) - cont_min_[idx - 1]) /
cont_diff_[idx - 1])
for idx in categorical_range_:
sparse_feature.append(
[hash(str(idx) + features[idx]) % hash_dim_])
label = [int(features[0])]
process_line = dense_feature, sparse_feature, label
feature_name = ["dense_feature"]
for idx in categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("label")
s = "click:" + str(label[0])
for i in dense_feature:
s += " dense_feature:" + str(i)
for i in range(1, 1 + len(categorical_range_)):
s += " " + str(i) + ":" + str(sparse_feature[i - 1][0])
print(s.strip())
yield None
return reader
d = CriteoDataset()
d.run_from_stdin()
sh download.sh
mkdir slot_train_data_full
for i in `ls ./train_data_full`
do
cat train_data_full/$i | python get_slot_data.py > slot_train_data_full/$i
done
mkdir slot_test_data_full
for i in `ls ./test_data_full`
do
cat test_data_full/$i | python get_slot_data.py > slot_test_data_full/$i
done
mkdir slot_train_data
for i in `ls ./train_data`
do
cat train_data/$i | python get_slot_data.py > slot_train_data/$i
done
mkdir slot_test_data
for i in `ls ./test_data`
do
cat test_data/$i | python get_slot_data.py > slot_test_data/$i
done
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import ModelBase
class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim")
self.learning_rate = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
def net(self, input, is_infer=False):
self.sparse_inputs = self._sparse_data_var[1:]
self.dense_input = self._dense_data_var[0]
self.label_input = self._sparse_data_var[0]
def embedding_layer(input):
emb = fluid.contrib.layers.sparse_embedding(
input=input,
is_test=False,
# for distributed sparse embedding, dim0 just fake.
size=[1024, self.sparse_feature_dim],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()), )
emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
return emb_sum
sparse_embed_seq = list(map(embedding_layer, self.sparse_inputs))
concated = fluid.layers.concat(
sparse_embed_seq + [self.dense_input], axis=1)
fcs = [concated]
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes")
for size in hidden_layers:
output = fluid.layers.fc(
input=fcs[-1],
size=size,
act='relu',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Normal(
scale=1.0 / math.sqrt(fcs[-1].shape[1]))))
fcs.append(output)
predict = fluid.layers.fc(
input=fcs[-1],
size=2,
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fcs[-1].shape[1]))))
self.predict = predict
auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
label=self.label_input,
num_thresholds=2**12,
slide_steps=20)
if is_infer:
self._infer_results["AUC"] = auc
self._infer_results["BATCH_AUC"] = batch_auc
return
self._metrics["AUC"] = auc
self._metrics["BATCH_AUC"] = batch_auc
cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
self._cost = avg_cost
def optimizer(self):
optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True)
return optimizer
def infer_net(self):
pass
...@@ -259,3 +259,133 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc( ...@@ -259,3 +259,133 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
``` ```
完成上述组网后,我们最终可以通过训练拿到`avg_cost``auc`两个重要指标。 完成上述组网后,我们最终可以通过训练拿到`avg_cost``auc`两个重要指标。
## 流式训练(OnlineLearning)任务启动及配置流程
### 流式训练简介
流式训练是按照一定顺序进行数据的接收和处理,每接收一个数据,模型会对它进行预测并对当前模型进行更新,然后处理下一个数据。 像信息流、小视频、电商等场景,每天都会新增大量的数据, 让每天(每一刻)新增的数据基于上一天(上一刻)的模型进行新的预测和模型更新。
在大规模流式训练场景下, 需要使用的深度学习框架有对应的能力支持, 即:
* 支持大规模分布式训练的能力, 数据量巨大, 需要有良好的分布式训练及扩展能力,才能满足训练的时效要求
* 支持超大规模的Embedding, 能够支持十亿甚至千亿级别的Embedding, 拥有合理的参数输出的能力,能够快速输出模型参数并和线上其他系统进行对接
* Embedding的特征ID需要支持HASH映射,不要求ID的编码,能够自动增长及控制特征的准入(原先不存在的特征可以以适当的条件创建), 能够定期淘汰(能够以一定的策略进行过期的特征的清理) 并拥有准入及淘汰策略
* 最后就是要基于框架开发一套完备的流式训练的 trainer.py, 能够拥有完善的流式训练流程
### 使用ctr-dnn online learning 进行模型的训练
目前,PaddleRec基于飞桨分布式训练框架的能力,实现了这套流式训练的流程。 供大家参考和使用。我们基于`models/rank/ctr-dnn`修改了一个online_training的版本,供大家更好的理解和参考。
**注意**
1. 使用online learning 需要安装目前Paddle最新的开发者版本, 你可以从 https://www.paddlepaddle.org.cn/documentation/docs/zh/install/Tables.html#whl-dev 此处获得它,需要先卸载当前已经安装的飞桨版本,根据自己的Python环境下载相应的安装包。
2. 使用online learning 需要安装目前PaddleRec最新的开发者版本, 你可以通过 git clone https://github.com/PaddlePaddle/PaddleRec.git 得到最新版的PaddleRec并自行安装
### 启动方法
1. 修改config.yaml中的 hyper_parameters.distributed_embedding=1,表示打开大规模稀疏的模式
2. 修改config.yaml中的 mode: [single_cpu_train, single_cpu_infer] 中的 `single_cpu_train` 为online_learning_cluster,表示使用online learning对应的运行模式
3. 准备训练数据, ctr-dnn中使用的online learning对应的训练模式为 天级别训练, 每天又分为24个小时, 因此训练数据需要 天--小时的目录结构进行整理。
以 2020年08月10日 到 2020年08月11日 2天的训练数据举例, 用户需要准备的数据的目录结构如下:
```
train_data/
|-- 20200810
| |-- 00
| | `-- train.txt
| |-- 01
| | `-- train.txt
| |-- 02
| | `-- train.txt
| |-- 03
| | `-- train.txt
| |-- 04
| | `-- train.txt
| |-- 05
| | `-- train.txt
| |-- 06
| | `-- train.txt
| |-- 07
| | `-- train.txt
| |-- 08
| | `-- train.txt
| |-- 09
| | `-- train.txt
| |-- 10
| | `-- train.txt
| |-- 11
| | `-- train.txt
| |-- 12
| | `-- train.txt
| |-- 13
| | `-- train.txt
| |-- 14
| | `-- train.txt
| |-- 15
| | `-- train.txt
| |-- 16
| | `-- train.txt
| |-- 17
| | `-- train.txt
| |-- 18
| | `-- train.txt
| |-- 19
| | `-- train.txt
| |-- 20
| | `-- train.txt
| |-- 21
| | `-- train.txt
| |-- 22
| | `-- train.txt
| `-- 23
| `-- train.txt
`-- 20200811
|-- 00
| `-- train.txt
|-- 01
| `-- train.txt
|-- 02
| `-- train.txt
|-- 03
| `-- train.txt
|-- 04
| `-- train.txt
|-- 05
| `-- train.txt
|-- 06
| `-- train.txt
|-- 07
| `-- train.txt
|-- 08
| `-- train.txt
|-- 09
| `-- train.txt
|-- 10
| `-- train.txt
|-- 11
| `-- train.txt
|-- 12
| `-- train.txt
|-- 13
| `-- train.txt
|-- 14
| `-- train.txt
|-- 15
| `-- train.txt
|-- 16
| `-- train.txt
|-- 17
| `-- train.txt
|-- 18
| `-- train.txt
|-- 19
| `-- train.txt
|-- 20
| `-- train.txt
|-- 21
| `-- train.txt
|-- 22
| `-- train.txt
`-- 23
`-- train.txt
```
4. 准备好数据后, 即可按照标准的训练流程进行流式训练了
```shell
python -m paddlerec.run -m models/rerank/ctr-dnn/config.yaml
```
...@@ -49,6 +49,7 @@ hyper_parameters: ...@@ -49,6 +49,7 @@ hyper_parameters:
sparse_feature_dim: 9 sparse_feature_dim: 9
dense_input_dim: 13 dense_input_dim: 13
fc_sizes: [512, 256, 128, 32] fc_sizes: [512, 256, 128, 32]
distributed_embedding: 0
# select runner by name # select runner by name
mode: [single_cpu_train, single_cpu_infer] mode: [single_cpu_train, single_cpu_infer]
...@@ -90,6 +91,18 @@ runner: ...@@ -90,6 +91,18 @@ runner:
print_interval: 1 print_interval: 1
phases: [phase1] phases: [phase1]
- name: online_learning_cluster
class: cluster_train
runner_class_path: "{workspace}/online_learning_runner.py"
epochs: 2
device: cpu
fleet_mode: ps
save_checkpoint_interval: 1 # save model interval of epochs
save_checkpoint_path: "increment_dnn" # save checkpoint path
init_model_path: "" # load model path
print_interval: 1
phases: [phase1]
- name: collective_cluster - name: collective_cluster
class: cluster_train class: cluster_train
epochs: 2 epochs: 2
......
...@@ -25,8 +25,16 @@ class Model(ModelBase): ...@@ -25,8 +25,16 @@ class Model(ModelBase):
ModelBase.__init__(self, config) ModelBase.__init__(self, config)
def _init_hyper_parameters(self): def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_fleet_mode().upper( self.is_distributed = False
) == "PSLIB" else False self.distributed_embedding = False
if envs.get_fleet_mode().upper() == "PSLIB":
self.is_distributed = True
if envs.get_global_env("hyper_parameters.distributed_embedding",
0) == 1:
self.distributed_embedding = True
self.sparse_feature_number = envs.get_global_env( self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number") "hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env( self.sparse_feature_dim = envs.get_global_env(
...@@ -40,14 +48,26 @@ class Model(ModelBase): ...@@ -40,14 +48,26 @@ class Model(ModelBase):
self.label_input = self._sparse_data_var[0] self.label_input = self._sparse_data_var[0]
def embedding_layer(input): def embedding_layer(input):
emb = fluid.layers.embedding( if self.distributed_embedding:
input=input, emb = fluid.contrib.layers.sparse_embedding(
is_sparse=True, input=input,
is_distributed=self.is_distributed, size=[
size=[self.sparse_feature_number, self.sparse_feature_dim], self.sparse_feature_number, self.sparse_feature_dim
param_attr=fluid.ParamAttr( ],
name="SparseFeatFactors", param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform()), ) name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()))
else:
emb = fluid.layers.embedding(
input=input,
is_sparse=True,
is_distributed=self.is_distributed,
size=[
self.sparse_feature_number, self.sparse_feature_dim
],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()))
emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum') emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
return emb_sum return emb_sum
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册