diff --git a/doc/train.md b/doc/train.md
index 16fad1b23783b5fe0c2a785f5500ba88c42ae356..b275b66f3b0ec7c88424c4c18afd855182136c41 100644
--- a/doc/train.md
+++ b/doc/train.md
@@ -20,7 +20,7 @@ python -m paddlerec.run -m paddlerec.models.xxx.yyy
 
 For example, to launch the default configuration of the `word2vec` model under `recall`:
 ```shell
-python -m paddlerec.run -m models/recall/word2vec
+python -m paddlerec.run -m models/recall/word2vec/config.yaml
 ```
 
 ### 2. Launch a built-in model with a customized training configuration
diff --git a/models/rank/dnn/README.md b/models/rank/dnn/README.md
index 9656adc655d6e2d8931861a492af3583906f98f5..d4167777220a12fc3d59c87a01cdf8dcac7dae4d 100644
--- a/models/rank/dnn/README.md
+++ b/models/rank/dnn/README.md
@@ -259,3 +259,133 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
 ```
 After the network above is built, training finally gives us the two key metrics, `avg_cost` and `auc`.
+
+
+## Streaming training (online learning): launch and configuration
+
+### What streaming training is
+Streaming training receives and processes data in arrival order: each time a sample arrives, the model first makes a prediction on it, then updates itself with it, and then moves on to the next sample. Scenarios such as news feeds, short video and e-commerce produce large amounts of new data every day, and the data added each day (or each moment) is predicted on, and used to update the model, starting from the model of the previous day (or the previous moment).
+
+Large-scale streaming training therefore needs matching support from the deep learning framework, namely:
+* Support for large-scale distributed training: the data volume is huge, so solid distributed training and scaling capabilities are required to finish training within the time budget
+* Support for extremely large embeddings: embedding tables with billions or even hundreds of billions of entries, together with a practical way of exporting parameters, so that model parameters can be dumped quickly and handed over to other online systems
+* Hash-mapped embedding feature IDs: IDs need no pre-assigned encoding; the table can grow automatically, control feature admission (features that did not exist before can be created under suitable conditions) and periodically evict features (expired features are cleaned up according to some policy), i.e. it comes with both admission and eviction strategies
+* Finally, a complete streaming-training trainer.py has to be developed on top of the framework, providing the full streaming-training workflow
+
+### Training a model with ctr-dnn online learning
+PaddleRec now implements this streaming-training workflow on top of the PaddlePaddle distributed training framework, for reference and use. We adapted `models/rank/dnn` (ctr-dnn) into an online_training version to make it easier to understand and follow.
+
+**Note**
+1. Online learning requires the latest develop build of Paddle. You can get it from https://www.paddlepaddle.org.cn/documentation/docs/zh/install/Tables.html#whl-dev ; uninstall the currently installed Paddle first, then download the wheel that matches your Python environment.
+2. Online learning also requires the latest develop version of PaddleRec. You can get it via git clone https://github.com/PaddlePaddle/PaddleRec.git and install it yourself.
+
+### How to launch
+1. In config.yaml, set hyper_parameters.distributed_embedding=1 to turn on the large-scale sparse mode
+2. In config.yaml, replace `single_cpu_train` in mode: [single_cpu_train, single_cpu_infer] with online_learning_cluster to select the run mode used for online learning (a sketch of both edits is shown right after the config.yaml diff below)
+3. Prepare the training data. The online learning mode used by ctr-dnn trains at day granularity, with each day further split into 24 hours, so the training data has to be organized in a day/hour directory layout.
+   Taking the two days 2020-08-10 and 2020-08-11 as an example, the directory structure to prepare looks like this:
+   ```
+   train_data/
+   |-- 20200810
+   |   |-- 00
+   |   |   `-- train.txt
+   |   |-- 01
+   |   |   `-- train.txt
+   |   |-- 02
+   |   |   `-- train.txt
+   |   |-- 03
+   |   |   `-- train.txt
+   |   |-- 04
+   |   |   `-- train.txt
+   |   |-- 05
+   |   |   `-- train.txt
+   |   |-- 06
+   |   |   `-- train.txt
+   |   |-- 07
+   |   |   `-- train.txt
+   |   |-- 08
+   |   |   `-- train.txt
+   |   |-- 09
+   |   |   `-- train.txt
+   |   |-- 10
+   |   |   `-- train.txt
+   |   |-- 11
+   |   |   `-- train.txt
+   |   |-- 12
+   |   |   `-- train.txt
+   |   |-- 13
+   |   |   `-- train.txt
+   |   |-- 14
+   |   |   `-- train.txt
+   |   |-- 15
+   |   |   `-- train.txt
+   |   |-- 16
+   |   |   `-- train.txt
+   |   |-- 17
+   |   |   `-- train.txt
+   |   |-- 18
+   |   |   `-- train.txt
+   |   |-- 19
+   |   |   `-- train.txt
+   |   |-- 20
+   |   |   `-- train.txt
+   |   |-- 21
+   |   |   `-- train.txt
+   |   |-- 22
+   |   |   `-- train.txt
+   |   `-- 23
+   |       `-- train.txt
+   `-- 20200811
+       |-- 00
+       |   `-- train.txt
+       |-- 01
+       |   `-- train.txt
+       |-- 02
+       |   `-- train.txt
+       |-- 03
+       |   `-- train.txt
+       |-- 04
+       |   `-- train.txt
+       |-- 05
+       |   `-- train.txt
+       |-- 06
+       |   `-- train.txt
+       |-- 07
+       |   `-- train.txt
+       |-- 08
+       |   `-- train.txt
+       |-- 09
+       |   `-- train.txt
+       |-- 10
+       |   `-- train.txt
+       |-- 11
+       |   `-- train.txt
+       |-- 12
+       |   `-- train.txt
+       |-- 13
+       |   `-- train.txt
+       |-- 14
+       |   `-- train.txt
+       |-- 15
+       |   `-- train.txt
+       |-- 16
+       |   `-- train.txt
+       |-- 17
+       |   `-- train.txt
+       |-- 18
+       |   `-- train.txt
+       |-- 19
+       |   `-- train.txt
+       |-- 20
+       |   `-- train.txt
+       |-- 21
+       |   `-- train.txt
+       |-- 22
+       |   `-- train.txt
+       `-- 23
+           `-- train.txt
+   ```
+4. Once the data is in place, streaming training is started through the standard training entry point:
+   ```shell
+   python -m paddlerec.run -m models/rank/dnn/config.yaml
+   ```
diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml
index aa84a5070470cba750f7832644a9ce676c1d4ddd..fdb4470b492ce1b1aece958ca20ac78903f84b46 100755
--- a/models/rank/dnn/config.yaml
+++ b/models/rank/dnn/config.yaml
@@ -49,6 +49,7 @@ hyper_parameters:
   sparse_feature_dim: 9
   dense_input_dim: 13
   fc_sizes: [512, 256, 128, 32]
+  distributed_embedding: 0
 
 # select runner by name
 mode: [single_cpu_train, single_cpu_infer]
@@ -90,6 +91,18 @@ runner:
   print_interval: 1
   phases: [phase1]
 
+- name: online_learning_cluster
+  class: cluster_train
+  runner_class_path: "{workspace}/online_learning_runner.py"
+  epochs: 2
+  device: cpu
+  fleet_mode: ps
+  save_checkpoint_interval: 1 # save model interval of epochs
+  save_checkpoint_path: "increment_dnn" # save checkpoint path
+  init_model_path: "" # load model path
+  print_interval: 1
+  phases: [phase1]
+
 - name: collective_cluster
   class: cluster_train
   epochs: 2
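The diff above adds the `online_learning_cluster` runner and a `distributed_embedding` switch that defaults to 0. For reference, a minimal sketch of the two user edits described in steps 1 and 2 of the README section, assuming the default layout of `models/rank/dnn/config.yaml`; only the touched keys are shown:

```yaml
hyper_parameters:
  # step 1: switch the sparse feature table to the large-scale (distributed) embedding
  distributed_embedding: 1

# step 2: run the online learning runner instead of single-machine CPU training
mode: [online_learning_cluster, single_cpu_infer]
```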
diff --git a/models/rank/dnn/model.py b/models/rank/dnn/model.py
index ac6f0b946ef4dfd753b79d50a6ec34d099298698..b614934bebcbec342ecd6e711d3864d6ad506faa 100755
--- a/models/rank/dnn/model.py
+++ b/models/rank/dnn/model.py
@@ -25,8 +25,16 @@ class Model(ModelBase):
         ModelBase.__init__(self, config)
 
     def _init_hyper_parameters(self):
-        self.is_distributed = True if envs.get_fleet_mode().upper(
-        ) == "PSLIB" else False
+        self.is_distributed = False
+        self.distributed_embedding = False
+
+        if envs.get_fleet_mode().upper() == "PSLIB":
+            self.is_distributed = True
+
+        if envs.get_global_env("hyper_parameters.distributed_embedding",
+                               0) == 1:
+            self.distributed_embedding = True
+
         self.sparse_feature_number = envs.get_global_env(
             "hyper_parameters.sparse_feature_number")
         self.sparse_feature_dim = envs.get_global_env(
@@ -40,14 +48,26 @@
         self.label_input = self._sparse_data_var[0]
 
         def embedding_layer(input):
-            emb = fluid.layers.embedding(
-                input=input,
-                is_sparse=True,
-                is_distributed=self.is_distributed,
-                size=[self.sparse_feature_number, self.sparse_feature_dim],
-                param_attr=fluid.ParamAttr(
-                    name="SparseFeatFactors",
-                    initializer=fluid.initializer.Uniform()), )
+            if self.distributed_embedding:
+                emb = fluid.contrib.layers.sparse_embedding(
+                    input=input,
+                    size=[
+                        self.sparse_feature_number, self.sparse_feature_dim
+                    ],
+                    param_attr=fluid.ParamAttr(
+                        name="SparseFeatFactors",
+                        initializer=fluid.initializer.Uniform()))
+            else:
+                emb = fluid.layers.embedding(
+                    input=input,
+                    is_sparse=True,
+                    is_distributed=self.is_distributed,
+                    size=[
+                        self.sparse_feature_number, self.sparse_feature_dim
+                    ],
+                    param_attr=fluid.ParamAttr(
+                        name="SparseFeatFactors",
+                        initializer=fluid.initializer.Uniform()))
             emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
             return emb_sum
diff --git a/models/rank/dnn/online_learning_runner.py b/models/rank/dnn/online_learning_runner.py
new file mode 100644
index 0000000000000000000000000000000000000000..fa4e505ba5ff8b77c95a05c0a4f190ccb8b1909b
--- /dev/null
+++ b/models/rank/dnn/online_learning_runner.py
@@ -0,0 +1,89 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import time
+import warnings
+import numpy as np
+import logging
+import paddle.fluid as fluid
+
+from paddlerec.core.utils import envs
+from paddlerec.core.metric import Metric
+from paddlerec.core.trainers.framework.runner import RunnerBase
+
+logging.basicConfig(
+    format='%(asctime)s - %(levelname)s: %(message)s', level=logging.INFO)
+
+
+class OnlineLearningRunner(RunnerBase):
+    def __init__(self, context):
+        print("Running OnlineLearningRunner.")
+
+    def run(self, context):
+        epochs = int(
+            envs.get_global_env("runner." + context["runner_name"] +
+                                ".epochs"))
+        model_dict = context["env"]["phase"][0]
+        model_class = context["model"][model_dict["name"]]["model"]
+        metrics = model_class._metrics
+
+        # Build one dataset per day of training data.
+        # NOTE: `days`, `path`, `fleet`, `hdfs_ls`, `use_var` and
+        # `create_dataset` are expected to be supplied by the surrounding
+        # trainer / dataset utilities (for example, derived from the runner
+        # configuration); they are not defined in this file. How each day's
+        # dataset is handed to `self._run` depends on the trainer
+        # implementation.
+        dataset_list = []
+        for day in days:
+            cur_path = "%s/%s" % (path, str(day))
+            filelist = fleet.split_files(hdfs_ls([cur_path]))
+            dataset = create_dataset(use_var, filelist)
+            dataset_list.append(dataset)
+
+        # Each day of data is treated as one "epoch" of streaming training.
+        for epoch in range(len(days)):
+            day = days[epoch]  # the day whose data this pass trains on
+            begin_time = time.time()
+            result = self._run(context, model_dict)
+            end_time = time.time()
+            seconds = end_time - begin_time
+            message = "epoch {} done, use time: {}".format(epoch, seconds)
+
+            # TODO: switch to PaddleCloudRoleMaker once it supports gloo
+            from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
+            if context["fleet"] is not None and isinstance(
+                    context["fleet"], GeneralRoleMaker):
+                metrics_result = []
+                for key in metrics:
+                    if isinstance(metrics[key], Metric):
+                        _str = metrics[key].calc_global_metrics(
+                            context["fleet"],
+                            context["model"][model_dict["name"]]["scope"])
+                        metrics_result.append(_str)
+                    elif result is not None:
+                        _str = "{}={}".format(key, result[key])
+                        metrics_result.append(_str)
+                if len(metrics_result) > 0:
+                    message += ", global metrics: " + ", ".join(
+                        metrics_result)
+            print(message)
+
+            # Save a checkpoint for this pass under the model's scope and
+            # programs.
+            with fluid.scope_guard(context["model"][model_dict["name"]][
+                    "scope"]):
+                train_prog = context["model"][model_dict["name"]][
+                    "main_program"]
+                startup_prog = context["model"][model_dict["name"]][
+                    "startup_program"]
+                with fluid.program_guard(train_prog, startup_prog):
+                    self.save(epoch, context, True)
+
+        context["status"] = "terminal_pass"
+ context["runner_name"] + + ".epochs")) + model_dict = context["env"]["phase"][0] + model_class = context["model"][model_dict["name"]]["model"] + metrics = model_class._metrics + + dataset_list = [] + dataset_index = 0 + for day_index in range(len(days)): + day = days[day_index] + cur_path = "%s/%s" % (path, str(day)) + filelist = fleet.split_files(hdfs_ls([cur_path])) + dataset = create_dataset(use_var, filelist) + dataset_list.append(dataset) + dataset_index += 1 + + dataset_index = 0 + for epoch in range(len(days)): + day = days[day_index] + begin_time = time.time() + result = self._run(context, model_dict) + end_time = time.time() + seconds = end_time - begin_time + message = "epoch {} done, use time: {}".format(epoch, seconds) + + # TODO, wait for PaddleCloudRoleMaker supports gloo + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + if context["fleet"] is not None and isinstance(context["fleet"], + GeneralRoleMaker): + metrics_result = [] + for key in metrics: + if isinstance(metrics[key], Metric): + _str = metrics[key].calc_global_metrics( + context["fleet"], + context["model"][model_dict["name"]]["scope"]) + metrics_result.append(_str) + elif result is not None: + _str = "{}={}".format(key, result[key]) + metrics_result.append(_str) + if len(metrics_result) > 0: + message += ", global metrics: " + ", ".join(metrics_result) + print(message) + with fluid.scope_guard(context["model"][model_dict["name"]][ + "scope"]): + train_prog = context["model"][model_dict["name"]][ + "main_program"] + startup_prog = context["model"][model_dict["name"]][ + "startup_program"] + with fluid.program_guard(train_prog, startup_prog): + self.save(epoch, context, True) + + context["status"] = "terminal_pass" diff --git a/models/treebased/tdm/build_tree.md b/models/treebased/tdm/build_tree.md new file mode 100644 index 0000000000000000000000000000000000000000..37ecb68f8bd48e3583a843b68c64ee8c1bd08f38 --- /dev/null +++ b/models/treebased/tdm/build_tree.md @@ -0,0 +1,19 @@ + + +wget https://paddlerec.bj.bcebos.com/utils/tree_build_utils.tar.gz --no-check-certificate + +# input_path: embedding的路径 +# emb_shape: embedding中key-value,value的维度 +# emb格式要求: embedding_id(int64),embedding(float),embedding(float),......,embedding(float) +# cluster_threads: 建树聚类所用线程 +python_172_anytree/bin/python -u main.py --input_path=./gen_emb/item_emb.txt --output_path=./ --emb_shape=24 --cluster_threads=4 + +建树流程是:1、读取emb -> 2、kmeans聚类 -> 3、聚类结果整理为树 -> 4、基于树结构得到模型所需的4个文件 + 1 Layer_list:记录了每一层都有哪些节点。训练用 + 2 Travel_list:记录每个叶子节点的Travel路径。训练用 + 3 Tree_Info:记录了每个节点的信息,主要为:是否是item/item_id,所在层级,父节点,子节点。检索用 + 4 Tree_Embedding:记录所有节点的Embedding。训练及检索用 + +注意一下训练数据输入的item是建树之前用的item id,还是基于树的node id,还是基于叶子的leaf id,在tdm_reader.py中,可以加载字典,做映射。 +用厂内版建树得到的输出文件夹里,有名为id2nodeid.txt的映射文件,格式是『hash值』+ 『树节点ID』+『叶子节点ID(表示第几个叶子节点,tdm_sampler op 所需的输入)』 +在另一个id2bidword.txt中,也有映射关系,格式是『hash值』+『原始item ID』,这个文件中仅存储了叶子节点的信息。