Unverified commit ab2a43a4, authored by C Chengmo, committed by GitHub

add tdm_demo (#4546)

* add tdm_demo
Parent 6c87d487
# Paddle-TDM
This repository provides the TDM recall algorithm implemented with PaddlePaddle. It contains the following components:
- paddle-tdm-demo: a model built on a fake dataset, suitable for fast debugging. It is mainly meant to help you understand how paddle-tdm is designed and quickly build a model that fits your own scenario.
- paddle-tdm-dnn: reproduces the model of the original TDM paper on the UserBehavior dataset.
- paddle-tdm-attention: reproduces the attention model of the original TDM paper on the UserBehavior dataset.
- paddle-jtm: implements the design ideas of the original JTM paper on the UserBehavior dataset.
- Paddle-TDM-Serving: deploys paddle-tdm models quickly for efficient retrieval.
The above will keep evolving with each Paddle release; stay tuned to this repository.
## Design
### Basics
TDM is a recommendation solution designed for large-scale recommender systems: it can carry arbitrarily advanced models to retrieve user interests efficiently. Built on a tree structure, TDM proposes a methodology for hierarchical modeling and retrieval of user interest, which lets the system use advanced deep models directly to search for user interests over the full corpus. The basic idea is to index every item in the corpus with a tree and train a deep model that supports layer-by-layer retrieval over the tree, cutting the complexity of full-corpus retrieval in large-scale recommendation from O(n) (n being the number of items) down to O(log n).
### Core questions
1. How is the tree built?
2. How is the deep model trained on top of the tree?
3. How is retrieval performed efficiently with the tree and the model?
### The PaddlePaddle TDM solution
1. The tree data comes from each business's own scenario and can be built in many ways, so the first release of paddle-TDM does not provide a unified tree-building pipeline; it does, however, standardize the data layout in which a finished tree is fed into the paddle network. You can build your tree with any tool, produce the specified data format, and use it for tdm network training.
2. Three core questions arise during network training:
- How to build the network? Paddle wraps a large number of deep-learning OPs, so you can design a network structure that fits your needs.
- How to organize the training data? The tdm training data consists of positive samples pairing a `user/query emb` with an `item`, where each `item` maps to a leaf node of the tree. You only need to prepare data in this form. Negative samples are generated efficiently by the paddle `tdm-sampler op` from your tree structure; the matching labels are added automatically and used to train the deep model inside tdm.
- How to train large-scale data and models? Paddle's large-scale parameter-server capability enables efficient distributed training. With the paddle-fleet api the learning curve is low, and incremental and streaming training are supported flexibly.
3. Once the model is trained, the retrieval and scoring steps can be folded into the paddle network itself; you then export an inference_model and parameter files and use the PaddlePaddle inference library or PaddleLite for fast deployment and efficient retrieval.
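The layer-wise retrieval described above can be sketched in a few lines of plain Python. The complete-binary-tree layout (heap indexing) and the stand-in scorer below are illustrative assumptions, not the actual paddle ops; the point is that each layer scores at most K * child_nums candidates, which is where the O(log n) claim comes from:

```python
def beam_retrieve(score, depth, k, child_nums=2):
    """Layer-wise top-k retrieval over a complete binary tree.

    Heap indexing: the root is node 1, the children of node n are
    2n and 2n + 1; the nodes on the last layer are the leaves.
    `score(node)` stands in for the trained per-layer deep model.
    """
    beam = [1]
    for _ in range(depth):
        # expand only the kept nodes' children, never the whole layer
        children = [n * child_nums + i for n in beam for i in range(child_nums)]
        beam = sorted(children, key=score, reverse=True)[:k]
    return beam  # the surviving top-k leaf nodes

# Dummy scorer that simply prefers larger node ids (a real system
# scores user-interest probability with the trained model).
leaves = beam_retrieve(score=lambda n: n, depth=3, k=2)
print(leaves)  # -> [15, 14]
```

With a tree of depth log2(n), the loop runs log2(n) times and scores 2k nodes per iteration, so retrieval needs O(k log n) model calls instead of scoring all n items.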
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import six
import argparse
def str2bool(v):
"""
str2bool
"""
# argparse cannot parse the strings "True"/"False" as Python
# booleans directly, so compare against truthy string values
return v.lower() in ("true", "t", "1")
class ArgumentGroup(object):
"""
ArgumentGroup
"""
def __init__(self, parser, title, des):
"""
init
"""
self._group = parser.add_argument_group(title=title, description=des)
def add_arg(self, name, type, default, help, **kwargs):
"""
add_arg
"""
type = str2bool if type == bool else type
if type == list:
# argparse cannot parse a list type directly; accept space-separated
# ints instead, e.g. --layer_node_num_list 2 4 7 12
self._group.add_argument(
"--" + name,
nargs='+',
type=int,
default=default,
help=help + ' Default: %(default)s.',
**kwargs)
else:
self._group.add_argument(
"--" + name,
default=default,
type=type,
help=help + ' Default: %(default)s.',
**kwargs)
def parse_args():
"""
parse_args
"""
# global
parser = argparse.ArgumentParser("main")
main_g = ArgumentGroup(parser, "main", "global conf")
main_g.add_arg("random_seed", int, 0, "random_seed")
main_g.add_arg("cpu_num", int, 1, "cpu_num")
main_g.add_arg("is_local", bool, False,
"whether to perform local training")
main_g.add_arg("is_cloud", bool, False, "")
main_g.add_arg("is_test", bool, False, "")
main_g.add_arg("sync_mode", str, "async", "distributed training mode")
main_g.add_arg("need_trace", bool, False, "")
main_g.add_arg("need_detail", bool, False, "")
# model
model_g = ArgumentGroup(
parser, "model", "options to init, resume and save model.")
model_g.add_arg("epoch_num", int, 3, "number of epochs for train")
model_g.add_arg("batch_size", int, 16, "batch size for train")
model_g.add_arg("learning_rate", float, 5e-5,
"learning rate for global training")
model_g.add_arg("layer_size", int, 4, "layer size")
model_g.add_arg("node_nums", int, 26, "tree node nums")
model_g.add_arg("node_emb_size", int, 64, "node embedding size")
model_g.add_arg("query_emb_size", int, 768, "input query embedding size")
model_g.add_arg("neg_sampling_list", list, [
1, 2, 3, 4], "nce sample nums at every layer")
model_g.add_arg("layer_node_num_list", list, [
2, 4, 7, 12], "node nums at every layer")
model_g.add_arg("leaf_node_num", int, 13, "leaf node nums")
# for infer
model_g.add_arg("child_nums", int, 2, "child node of ancestor node")
model_g.add_arg("topK", int, 1, "best recall result nums")
model_g = ArgumentGroup(
parser, "path", "files path of data & model.")
model_g.add_arg("train_files_path", str, "./data/train", "train data path")
model_g.add_arg("test_files_path", str, "./data/test", "test data path")
model_g.add_arg("model_files_path", str, "./models", "model data path")
# build tree and warm up
model_g.add_arg("build_tree_init_path", str,
"./data/gen_tree/demo_fake_input.txt", "build tree embedding path")
model_g.add_arg("warm-up", bool, False,
"warm up: build a new tree.")
model_g.add_arg("rebuild_tree_per_epochs", int, -1,
"re-build the tree every N epochs; -1 means never re-build")
model_g.add_arg("tree_info_init_path", str,
"./thirdparty/tree_info.txt", "TDM tree info file path")
model_g.add_arg("tree_travel_init_path", str,
"./thirdparty/travel_list.txt", "TDM tree travel file path")
model_g.add_arg("tree_layer_init_path", str,
"./thirdparty/layer_list.txt", "TDM tree layer file path")
model_g.add_arg("tree_emb_init_path", str,
"./thirdparty/tree_emb.txt", "TDM tree emb file path")
model_g.add_arg("load_model", bool, False,
"whether to load a model (paddle persistables)")
model_g.add_arg("save_init_model", bool, False,
"whether to save the init model (paddle persistables)")
model_g.add_arg("init_model_files_path", str, "./models/init_model",
"paddle model files used to init params for training")
model_g.add_arg("infer_model_files_path", str, "./models/init_model",
"model files path for infer")
args = parser.parse_args()
return args
def print_arguments(args):
"""
print arguments
"""
print('----------- Configuration Arguments -----------')
for arg, value in sorted(six.iteritems(vars(args))):
print('%s: %s' % (arg, value))
print('------------------------------------------------')
DIRS=`pwd`
data_path="${DIRS}/data"
train_files_path="${data_path}/train"
test_files_path="${data_path}/test"
model_files_path="${DIRS}/model"
# init_model will be downloaded into the thirdparty folder when training on paddlecloud
init_model_files_path="${model_files_path}/init_model"
thirdparty_path="${DIRS}/thirdparty"
tree_travel_init_path="${thirdparty_path}/travel_list.txt"
tree_layer_init_path="${thirdparty_path}/layer_list.txt"
tree_info_init_path="${thirdparty_path}/tree_info.txt"
python_bin="python"
echo `pwd`
function main() {
cmd="${python_bin} distributed_train.py \
--is_cloud=0 \
--sync_mode=async \
--cpu_num=1 \
--random_seed=0 \
--epoch_num=1 \
--batch_size=32 \
--learning_rate=3e-4 \
--train_files_path=${train_files_path} \
--test_files_path=${test_files_path} \
--model_files_path=${model_files_path} \
--init_model_files_path=${init_model_files_path} \
--tree_travel_init_path=${tree_travel_init_path} \
--tree_info_init_path=${tree_info_init_path} \
--tree_layer_init_path=${tree_layer_init_path} "
echo ${cmd}
${cmd}
}
main "$@"
# -*- coding=utf8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import json
import pickle
import time
import random
import os
import numpy as np
import sys
import paddle.fluid.incubate.data_generator as dg
class TDMDataset(dg.MultiSlotStringDataGenerator):
"""
TDMDataset: inherits MultiSlotStringDataGenerator and implements data reading
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def infer_reader(self, infer_file_list, batch):
"""
Read test_data line by line & yield batch
"""
def local_iter():
"""Read file line by line"""
for fname in infer_file_list:
with open(fname, "r") as fin:
for line in fin:
one_data = (line.strip('\n')).split('\t')
input_emb = one_data[0].split(' ')
yield [input_emb]
import paddle
batch_iter = paddle.batch(local_iter, batch)
return batch_iter
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def iterator():
"""
This function needs to be implemented by the user, based on data format
"""
features = (line.strip('\n')).split('\t')
input_emb = features[0].split(' ')
item_label = [features[1]]
feature_name = ["input_emb", "item_label"]
yield zip(feature_name, [input_emb] + [item_label])
return iterator
if __name__ == "__main__":
d = TDMDataset()
d.run_from_stdin()
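Based on `generate_sample` above, each training line is assumed to hold space-separated floats for the query embedding, a tab, then the leaf item id. The concrete values below are made up for illustration; the parsing is the same as in the generator:

```python
# Hypothetical training line: "<query emb floats>\t<leaf item id>"
line = "0.1 0.2 0.3\t42"

# Same per-line parsing as TDMDataset.generate_sample:
features = line.strip('\n').split('\t')
input_emb = features[0].split(' ')   # the raw embedding tokens
item_label = [features[1]]           # the leaf item id, as a 1-element list
sample = list(zip(["input_emb", "item_label"], [input_emb, item_label]))
print(sample)
```

Note that the generator yields string slots; conversion to float tensors happens later inside the paddle dataset pipeline.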
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import print_function
import os
import time
import numpy as np
import logging
import random
from shutil import copyfile
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from args import print_arguments, parse_args
from utils import tdm_sampler_prepare, tdm_child_prepare, tdm_emb_prepare
from train_network import TdmTrainNet
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
hadoop_home = os.getenv("HADOOP_HOME")
configs = {
"fs.default.name": os.getenv("FS_NAME"),
"hadoop.job.ugi": os.getenv("FS_UGI")
}
client = HDFSClient(hadoop_home, configs)
def get_dataset(inputs, args):
"""get dataset"""
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command("python ./dataset_generator.py")
dataset.set_batch_size(args.batch_size)
dataset.set_thread(int(args.cpu_num))
file_list = [
str(args.train_files_path) + "/%s" % x
for x in os.listdir(args.train_files_path)
]
# Make sure every training node holds different training files.
# When simulating distributed training with local multi-processes,
# each process needs to get different files; fleet.split_files
# conveniently shards the training samples at file granularity.
if not int(args.is_cloud):
file_list = fleet.split_files(file_list)
logger.info("file list: {}".format(file_list))
total_example_num = get_example_num(file_list)
return dataset, file_list, total_example_num
def train(args):
"""run train"""
# set random
program = fluid.default_main_program()
program.random_seed = args.random_seed
# Decide, from environment variables, the role this machine/process
# plays in distributed training, then initialize the node with the
# fleet api's init() method.
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
# The distributed run mode can be further specified through
# DistributeTranspilerConfig. Below we pick the run mode (e.g. async)
# and split the parameters across different nodes.
if args.sync_mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif args.sync_mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
else:
# default: fully asynchronous training
strategy = StrategyFactory.create_async_strategy()
# set model
logger.info("TDM Begin build network.")
tdm_model = TdmTrainNet(args)
inputs = tdm_model.input_data()
logger.info("TDM Begin load tree travel & layer.")
avg_cost, acc = tdm_model.tdm(inputs)
logger.info("TDM End build network.")
# Configure the distributed optimizer with the chosen strategy and build the program
optimizer = fluid.optimizer.AdamOptimizer(
learning_rate=args.learning_rate, lazy_mode=True)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
logger.info("TDM End append backward.")
# Run different logic depending on the node role
if fleet.is_server():
logger.info("TDM Run server ...")
# Initialize and run the parameter server node
logger.info("TDM init model path: {}".format(
args.init_model_files_path))
# Every variable except the tdm tree structure should be initialized here
fleet.init_server(args.init_model_files_path)
lr = fluid.global_scope().find_var("learning_rate_0")
if lr:
lr.get_tensor().set(np.array(args.learning_rate).astype('float32'),
fluid.CPUPlace())
logger.info("TDM Set learning rate {}".format(args.learning_rate))
else:
logger.info("TDM Didn't find learning_rate_0 param")
logger.info("TDM load End")
fleet.run_server()
logger.info("TDM Run server success!")
elif fleet.is_worker():
logger.info("TDM Run worker ...")
# Initialize the worker node
fleet.init_worker()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
logger.info("TDM Run Startup Begin")
# Run fleet.startup_program, which contains the distributed startup flow
exe.run(fleet.startup_program)
# Set Learning Rate
lr = fluid.global_scope().find_var("learning_rate_0")
if lr:
lr.get_tensor().set(np.array(args.learning_rate).astype('float32'),
place)
logger.info("TDM Set learning rate {}".format(args.learning_rate))
# Set TDM Variable
logger.info("TDM Begin load parameter.")
# Set TDM_Tree_Info
# Tree-structure variables are not updated by the network and are not stored
# on the parameter server, so they must be set locally by hand
tdm_param_prepare_dict = tdm_sampler_prepare(args)
tdm_param_prepare_dict['info_array'] = tdm_child_prepare(args)
Numpy_model = {}
Numpy_model['TDM_Tree_Travel'] = tdm_param_prepare_dict['travel_array']
Numpy_model['TDM_Tree_Layer'] = tdm_param_prepare_dict['layer_array']
Numpy_model['TDM_Tree_Info'] = tdm_param_prepare_dict['info_array']
# Numpy_model['TDM_Tree_Emb'] = tdm_emb_prepare(args)
# In distributed training the embedding lives on the parameter server,
# so there is no need to set it locally
for param_name in Numpy_model:
param_t = fluid.global_scope().find_var(param_name).get_tensor()
param_t.set(Numpy_model[str(param_name)].astype('int32'), place)
logger.info("TDM Run Startup End")
# Train loop
dataset, file_list, example_num = get_dataset(inputs, args)
logger.info("TDM Distributed training begin ...")
for epoch in range(args.epoch_num):
# local shuffle
random.shuffle(file_list)
dataset.set_filelist(file_list)
# Workers run fleet.main_program, which has been pruned for distribution
start_time = time.time()
exe.train_from_dataset(program=fleet.main_program,
dataset=dataset,
fetch_list=[acc, avg_cost],
fetch_info=["Epoch {} acc ".format(
epoch), "Epoch {} loss ".format(epoch)],
print_period=1,
debug=False)
end_time = time.time()
logger.info("Epoch {} finished, use time {} second, speed {} example/s".format(
epoch, end_time - start_time, example_num * 1.0 / (end_time - start_time)))
# By default, worker 0 saves the model
if fleet.is_first_worker():
model_path = os.path.join(
args.model_files_path, "epoch_" + str(epoch))
fleet.save_persistables(executor=exe, dirname=model_path)
logger.info("Begin upload files")
# upload_files(model_path, warm_up=False)
# Uploading the model to hdfs is supported in a distributed environment
logger.info("TDM Before stop worker")
fleet.stop_worker()
logger.info("TDM Distributed training success!")
def upload_files(local_path, warm_up=False):
"""
upload files to hdfs
"""
remote = os.getenv("OUTPUT_PATH")
job_id = os.getenv("SYS_JOB_ID")
local = local_path.split('/')[-1]
remote_path = "{}/{}/{}/{}".format(remote, job_id, "model", local)
client.makedirs(remote_path)
hadoop_path = "{}/".format(remote_path)
def is_adam_param(name):
adam_name = ['bias_beta', 'bias_moment',
'moment1_0', 'moment2_0', 'pow_acc']
for i in adam_name:
if i in name:
return True
return False
if not warm_up:
infer_model_path = os.path.join(os.getcwd(), 'infer_model_'+local)
if not os.path.exists(infer_model_path):
os.makedirs(infer_model_path)
for root, _, files in os.walk(local_path):
for f in files:
if not is_adam_param(f):
copyfile(os.path.join(root, f),
os.path.join(infer_model_path, f))
local_path = infer_model_path
client.upload(hdfs_path=hadoop_path, local_path=local_path,
multi_processes=5, overwrite=False,
retry_times=3)
def get_example_num(file_list):
"""
Count the number of samples in the file
"""
count = 0
for f in file_list:
last_count = count
for index, line in enumerate(open(f, 'r')):
count += 1
logger.info("file : %s has %s examples" % (f, count - last_count))
logger.info("Total examples : %s" % count)
return count
if __name__ == "__main__":
print(os.getcwd())
args = parse_args()
print_arguments(args)
train(args)
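The training file sharding in `get_dataset` above relies on `fleet.split_files`, which hands whole files to trainers so each local process gets a disjoint shard. A round-robin split by trainer id captures the idea; the helper below is an illustrative stand-in, not fleet's actual implementation:

```python
def split_files_round_robin(file_list, trainer_id, trainer_num):
    """Give each trainer every trainer_num-th file, starting at its own id."""
    return file_list[trainer_id::trainer_num]

files = ["part-0", "part-1", "part-2", "part-3", "part-4"]
shards = [split_files_round_robin(files, i, 2) for i in range(2)]
# shards[0] holds part-0/2/4, shards[1] holds part-1/3: every file
# goes to exactly one trainer, none is trained twice.
```

Each file belonging to exactly one trainer is why the comment in `get_dataset` stresses that every training node must hold different training files.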
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import math
import argparse
import numpy as np
import paddle.fluid as fluid
from utils import tdm_sampler_prepare, tdm_child_prepare, trace_var
from train_network import DnnLayerClassifierNet, InputTransNet
class TdmInferNet(object):
def __init__(self, args):
self.input_embed_size = args.query_emb_size
self.node_embed_size = args.node_emb_size
self.label_nums = 2  # two label classes: positive and negative
self.node_nums = args.node_nums
self.max_layers = args.layer_size
self.batch_size = args.batch_size
self.topK = args.topK  # how many items to recall in the end
self.child_nums = args.child_nums  # for a binary tree, child_nums=2
self.layer_list = self.get_layer_list(args)
self.first_layer_idx = 0
self.first_layer_node = self.create_first_layer(args)
self.layer_classifier = DnnLayerClassifierNet(args)
self.input_trans_net = InputTransNet(args)
def input_data(self):
input_emb = fluid.layers.data(
name="input_emb",
shape=[self.input_embed_size],
dtype="float32",
)
# first_layer and first_layer_mask correspond to the nodes of the starting infer layer
first_layer = fluid.layers.data(
name="first_layer_node",
shape=[1],
dtype="int64",
lod_level=1,
)
first_layer_mask = fluid.layers.data(
name="first_layer_node_mask",
shape=[1],
dtype="int64",
lod_level=1,
)
inputs = [input_emb] + [first_layer] + [first_layer_mask]
return inputs
def get_layer_list(self, args):
"""get layer list from layer_list.txt"""
layer_list = []
with open(args.tree_layer_init_path, 'r') as fin:
for line in fin.readlines():
l = []
layer = (line.split('\n'))[0].split(',')
for node in layer:
if node:
l.append(node)
layer_list.append(l)
return layer_list
def create_first_layer(self, args):
"""decide which layer to start infer"""
first_layer_id = 0
for idx, layer_node in enumerate(args.layer_node_num_list):
if layer_node >= self.topK:
first_layer_id = idx
break
first_layer_node = self.layer_list[first_layer_id]
self.first_layer_idx = first_layer_id
return first_layer_node
def infer_net(self, inputs):
"""
Main infer flow.
Basic logic: start from an upper layer (the exact layer idx is decided
by the tree structure and the topK value), then
1. Run the current layer's classifier to get the prob of each of the layer's input nodes.
2. Take the topK nodes by prob; their children become the next layer's input.
3. Repeat steps 1-2 through every layer, collecting each layer's selections.
4. Run a final topK over the leaf nodes among the collected selections to get the recall output.
"""
input_emb = inputs[0]
current_layer_node = inputs[1]
current_layer_child_mask = inputs[2]
node_score = []
node_list = []
input_trans_emb = self.input_trans_net.input_fc_infer(input_emb)
for layer_idx in range(self.first_layer_idx, self.max_layers):
# determine how many nodes need to be scored on the current layer
if layer_idx == self.first_layer_idx:
current_layer_node_num = len(self.first_layer_node)
else:
current_layer_node_num = current_layer_node.shape[1] * \
current_layer_node.shape[2]
current_layer_node = fluid.layers.reshape(
current_layer_node, [-1, current_layer_node_num])
current_layer_child_mask = fluid.layers.reshape(
current_layer_child_mask, [-1, current_layer_node_num])
node_emb = fluid.embedding(
input=current_layer_node,
size=[self.node_nums, self.node_embed_size],
param_attr=fluid.ParamAttr(name="TDM_Tree_Emb"))
input_fc_out = self.input_trans_net.layer_fc_infer(
input_trans_emb, layer_idx)
# run the current layer's classifier
layer_classifier_res = self.layer_classifier.classifier_layer_infer(input_fc_out,
node_emb,
layer_idx)
# run the final discriminative classifier
tdm_fc = fluid.layers.fc(input=layer_classifier_res,
size=self.label_nums,
act=None,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name="tdm.cls_fc.weight"),
bias_attr=fluid.ParamAttr(name="tdm.cls_fc.bias"))
prob = fluid.layers.softmax(tdm_fc)
positive_prob = fluid.layers.slice(
prob, axes=[2], starts=[1], ends=[2])
prob_re = fluid.layers.reshape(
positive_prob, [-1, current_layer_node_num])
# filter out invalid nodes introduced by padding (node_id=0)
node_zero_mask = fluid.layers.cast(current_layer_node, 'bool')
node_zero_mask = fluid.layers.cast(node_zero_mask, 'float')
prob_re = prob_re * node_zero_mask
# take topK of this layer's scores and keep the corresponding score and node_id
k = self.topK
if current_layer_node_num < self.topK:
k = current_layer_node_num
_, topk_i = fluid.layers.topk(prob_re, k)
# the index_sample op gathers values from a tensor by index;
# for paddle versions > 2.0, call paddle.index_sample instead
top_node = fluid.contrib.layers.index_sample(
current_layer_node, topk_i)
prob_re_mask = prob_re * current_layer_child_mask  # filter out non-leaf nodes
topk_value = fluid.contrib.layers.index_sample(
prob_re_mask, topk_i)
node_score.append(topk_value)
node_list.append(top_node)
# the children of this layer's topK nodes become the next layer's input
if layer_idx < self.max_layers - 1:
# the tdm_child op returns each input node's children and a child_mask:
# child_mask=1 if the child is a leaf node, otherwise 0
current_layer_node, current_layer_child_mask = \
fluid.contrib.layers.tdm_child(x=top_node,
node_nums=self.node_nums,
child_nums=self.child_nums,
param_attr=fluid.ParamAttr(
name="TDM_Tree_Info"),
dtype='int64')
total_node_score = fluid.layers.concat(node_score, axis=1)
total_node = fluid.layers.concat(node_list, axis=1)
# the tree may be unbalanced, so take topK over the leaf nodes gathered from all layers
res_score, res_i = fluid.layers.topk(total_node_score, self.topK)
res_layer_node = fluid.contrib.layers.index_sample(total_node, res_i)
res_node = fluid.layers.reshape(res_layer_node, [-1, self.topK, 1])
# use the Tree_info data to map node_id to item_id
tree_info = fluid.default_main_program().global_block().var("TDM_Tree_Info")
res_node_emb = fluid.layers.gather_nd(tree_info, res_node)
res_item = fluid.layers.slice(
res_node_emb, axes=[2], starts=[0], ends=[1])
res_item_re = fluid.layers.reshape(res_item, [-1, self.topK])
return res_item_re
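`get_layer_list` above implies that layer_list.txt stores one tree layer per line, with node ids comma-separated; that format is inferred from the parser, not from a spec. Its parsing reduces to:

```python
# Assumed layer_list.txt content: one layer per line, ids comma-separated.
text = "1,2\n3,4,5,6\n"

layer_list = []
for line in text.splitlines():
    # drop empty fields left by trailing commas, as the parser does
    layer_list.append([node for node in line.split(',') if node])
```

The node ids stay as strings here, matching `get_layer_list`; they are only converted when fed into the paddle network.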
#!/bin/bash
echo "WARNING: This script only runs PaddlePaddle Fluid on one node"
CLUSTER_DIRS="./"
if [ ! -d "${CLUSTER_DIRS}/model" ]; then
mkdir "${CLUSTER_DIRS}/model"
echo "Create model folder to store the infer model"
fi
if [ ! -d "${CLUSTER_DIRS}/log" ]; then
mkdir "${CLUSTER_DIRS}/log"
echo "Create log folder to store running logs"
fi
if [ ! -d "${CLUSTER_DIRS}/output" ]; then
mkdir "${CLUSTER_DIRS}/output"
echo "Create output folder"
fi
# environment variables for fleet distribute training
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=1
export OUTPUT_PATH="output"
export SYS_JOB_ID="test"
export FLAGS_communicator_thread_pool_size=5
export FLAGS_communicator_fake_rpc=0
export FLAGS_communicator_is_sgd_optimizer=0
export FLAGS_communicator_send_queue_size=1
export FLAGS_communicator_max_merge_var_num=1
export FLAGS_communicator_max_send_grad_num_before_recv=1
export FLAGS_communicator_min_send_grad_num_before_recv=1
export FLAGS_rpc_retry_times=3
export PADDLE_PSERVERS_IP_PORT_LIST="127.0.0.1:36001"
export PADDLE_PSERVER_PORT_ARRAY=(36001)
export PADDLE_PSERVER_NUMS=1
export PADDLE_TRAINERS=1
export TRAINING_ROLE=PSERVER
export GLOG_v=0
export GLOG_logtostderr=0
ps -ef|grep python|awk '{print $2}'|xargs kill -9
train_mode=$1
for((i=0;i<$PADDLE_PSERVER_NUMS;i++))
do
cur_port=${PADDLE_PSERVER_PORT_ARRAY[$i]}
echo "PADDLE WILL START PSERVER "$cur_port
export PADDLE_PORT=${cur_port}
export POD_IP=127.0.0.1
sh ./async_train.sh &> ./log/pserver.$i.log &
done
export TRAINING_ROLE=TRAINER
export GLOG_v=0
export GLOG_logtostderr=0
for((i=0;i<$PADDLE_TRAINERS;i++))
do
echo "PADDLE WILL START Trainer "$i
PADDLE_TRAINER_ID=$i
sh ./async_train.sh &> ./log/trainer.$i.log &
done
echo "Training log stored in ./log/"
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import print_function
import os
import time
import numpy as np
import logging
import argparse
import paddle
import paddle.fluid as fluid
from paddle.fluid import profiler
from args import print_arguments, parse_args
from infer_network import TdmInferNet
from dataset_generator import TDMDataset
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def to_tensor(data, place):
"""
Convert data to paddle tensor
"""
flattened_data = np.concatenate(data, axis=0).astype("float32")
flattened_data = flattened_data.reshape([-1, 768])
res = fluid.Tensor()
res.set(flattened_data, place)
return res
def data2tensor(data, place):
"""
Dataset prepare
"""
input_emb = to_tensor([x[0] for x in data], place)
return input_emb
def run_infer(args, model_path):
"""run infer"""
logger.info("Infer Begin")
file_list = [
str(args.test_files_path) + "/%s" % x
for x in os.listdir(args.test_files_path)
]
tdm_model = TdmInferNet(args)
inputs = tdm_model.input_data()
res_item = tdm_model.infer_net(inputs)
test_reader = TDMDataset().infer_reader(file_list, args.batch_size)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
path = os.path.join(args.model_files_path, model_path)
fluid.io.load_persistables(
executor=exe,
dirname=path,
main_program=fluid.default_main_program())
logger.info("Load persistables from \"{}\"".format(path))
if args.save_init_model:
logger.info("Begin Save infer model.")
model_path = (str(args.model_files_path) + "/" + "infer_model")
fluid.io.save_inference_model(executor=exe, dirname=model_path,
feeded_var_names=[
'input_emb', 'first_layer_node', 'first_layer_node_mask'],
target_vars=[res_item])
logger.info("End Save infer model.")
first_layer_node = tdm_model.first_layer_node
first_layer_nums = len(first_layer_node)
first_layer_node = np.array(first_layer_node)
first_layer_node = first_layer_node.reshape((1, -1)).astype('int64')
first_layer_node = first_layer_node.repeat(args.batch_size, axis=0)
# In this demo we assume no node on the starting infer layer is a leaf, so mask=0.
# If the real starting layer contains leaf nodes, set mask=1 at those positions.
first_layer_mask = (
np.zeros((args.batch_size, first_layer_nums))).astype('int64')
for batch_id, data in enumerate(test_reader()):
input_emb = data2tensor(data, place)
item_res = exe.run(fluid.default_main_program(),
feed={"input_emb": input_emb,
"first_layer_node": first_layer_node,
"first_layer_node_mask": first_layer_mask},
fetch_list=[res_item])
logger.info("TEST --> batch: {} infer_item {}".format(
batch_id, item_res))
logger.info("Inference complete!")
if __name__ == "__main__":
args = parse_args()
print_arguments(args)
# specify the folder holding the infer model here
path = "epoch_0"
run_infer(args, path)
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import print_function
import os
import time
import numpy as np
import logging
import argparse
import paddle
import paddle.fluid as fluid
from args import print_arguments, parse_args
from utils import tdm_sampler_prepare, tdm_child_prepare
from train_network import TdmTrainNet
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def get_dataset(inputs, args):
"""
get dataset
"""
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command("python ./dataset_generator.py")
dataset.set_batch_size(args.batch_size)
dataset.set_thread(int(args.cpu_num))
file_list = [
str(args.train_files_path) + "/%s" % x
for x in os.listdir(args.train_files_path)
]
dataset.set_filelist(file_list)
logger.info("file list: {}".format(file_list))
return dataset
def run_train(args):
"""
run train
"""
logger.info("TDM Begin build network.")
tdm_model = TdmTrainNet(args)
inputs = tdm_model.input_data()
avg_cost, acc = tdm_model.tdm(inputs)
logger.info("TDM End build network.")
dataset = get_dataset(inputs, args)
optimizer = fluid.optimizer.AdamOptimizer(
learning_rate=args.learning_rate,
lazy_mode=True)
optimizer.minimize(avg_cost)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
if args.load_model:
# load parameters from paddle binary model files
path = args.init_model_files_path
fluid.io.load_persistables(
executor=exe,
dirname=path,
main_program=fluid.default_main_program())
lr = fluid.global_scope().find_var("learning_rate_0").get_tensor()
lr.set(np.array(args.learning_rate).astype('float32'), place)
logger.info("Load persistables from \"{}\"".format(path))
else:
# Set the plain-text tree structure data into the network's Variables.
# NumpyInitialize is avoided because the tree data is large enough to
# pose a performance risk.
Numpy_model = {}
Numpy_model['TDM_Tree_Travel'] = tdm_model.tdm_param_prepare_dict['travel_array']
Numpy_model['TDM_Tree_Layer'] = tdm_model.tdm_param_prepare_dict['layer_array']
Numpy_model['TDM_Tree_Info'] = tdm_model.tdm_param_prepare_dict['info_array']
Numpy_model['TDM_Tree_Emb'] = tdm_model.tdm_param_prepare_dict['emb_array']
for param_name in Numpy_model:
param_t = fluid.global_scope().find_var(param_name).get_tensor()
if param_name == 'TDM_Tree_Emb':
param_t.set(Numpy_model[str(param_name)
].astype('float32'), place)
else:
param_t.set(Numpy_model[str(param_name)
].astype('int32'), place)
if args.save_init_model or not args.load_model:
logger.info("Begin Save Init model.")
model_path = os.path.join(args.model_files_path, "init_model")
fluid.io.save_persistables(executor=exe, dirname=model_path)
logger.info("End Save Init model.")
logger.info("TDM Local training begin ...")
for epoch in range(args.epoch_num):
start_time = time.time()
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[acc, avg_cost],
fetch_info=["Epoch {} acc".format(
epoch), "Epoch {} loss".format(epoch)],
print_period=1,
debug=False,
)
end_time = time.time()
logger.info("Epoch %d finished, use time=%d sec\n" %
(epoch, end_time - start_time))
model_path = os.path.join(args.model_files_path, "epoch_" + str(epoch))
fluid.io.save_persistables(executor=exe, dirname=model_path)
logger.info("Local training success!")
if __name__ == "__main__":
args = parse_args()
print_arguments(args)
run_train(args)
# -*- coding: utf-8 -*-
"""
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import os
import argparse
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.core import PaddleTensor
from paddle.fluid.core import AnalysisConfig
from paddle.fluid.core import create_paddle_predictor
from dataset_generator import TDMDataset
from infer_network import TdmInferNet
from args import print_arguments, parse_args
import logging
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def to_tensor(data):
"""
    Flatten a batch of embedding data into a float32 numpy array
    (wrapped into a PaddleTensor later)
"""
flattened_data = np.concatenate(data, axis=0).astype("float32")
flattened_data = flattened_data.reshape([-1, 768])
return flattened_data
def data2tensor(data):
"""
Dataset prepare
"""
input_emb = to_tensor([x[0] for x in data])
return input_emb
def tdm_input(input_emb, first_layer_node, first_layer_mask):
"""
Create input of tdm pred
"""
input_emb = PaddleTensor(input_emb)
first_layer_node = PaddleTensor(first_layer_node)
first_layer_mask = PaddleTensor(first_layer_mask)
return [input_emb, first_layer_node, first_layer_mask]
def main():
"""Predictor main"""
args = parse_args()
config = AnalysisConfig(args.model_files_path)
config.disable_gpu()
config.enable_profile()
# config.enable_mkldnn()
config.set_cpu_math_library_num_threads(args.cpu_num)
predictor = create_paddle_predictor(config)
tdm_model = TdmInferNet(args)
first_layer_node = tdm_model.first_layer_node
first_layer_nums = len(first_layer_node)
first_layer_node = np.array(first_layer_node)
first_layer_node = first_layer_node.reshape((1, -1)).astype('int64')
first_layer_node = first_layer_node.repeat(args.batch_size, axis=0)
first_layer_mask = (
np.zeros((args.batch_size, first_layer_nums))).astype('int64')
file_list = [
str(args.test_files_path) + "/%s" % x
for x in os.listdir(args.test_files_path)
]
test_reader = TDMDataset().infer_reader(file_list, args.batch_size)
for batch_id, data in enumerate(test_reader()):
input_emb = data2tensor(data)
inputs = tdm_input(input_emb, first_layer_node, first_layer_mask)
outputs = predictor.run(inputs)
output = outputs[0]
output_data = output.as_ndarray()
logger.info("TEST --> batch: {} infer_item {}".format(
batch_id, output_data))
if __name__ == "__main__":
main()
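The setup of the beam-search inputs in `main()` above can be sketched standalone: the node ids of the first retrieval layer are tiled once per batch row, and the mask starts as all zeros so no candidate is filtered out at the first layer. The helper name below is illustrative, not part of the repo:

```python
import numpy as np

def build_first_layer_inputs(first_layer_node, batch_size):
    """Tile the first-layer node ids per batch row and build an all-zero mask."""
    nodes = np.array(first_layer_node).reshape((1, -1)).astype('int64')
    nodes = nodes.repeat(batch_size, axis=0)  # shape: [batch_size, num_nodes]
    mask = np.zeros((batch_size, nodes.shape[1]), dtype='int64')
    return nodes, mask

nodes, mask = build_first_layer_inputs([3, 4, 5, 6], 2)
# nodes.shape == (2, 4); every row is [3, 4, 5, 6]; mask is all zeros
```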
DIRS=`pwd`
data_path="${DIRS}/data"
train_files_path="${data_path}/train"
test_files_path="${data_path}/test"
model_files_path="${DIRS}/model"
thirdparty_path="${DIRS}/thirdparty"
tree_travel_init_path="${thirdparty_path}/travel_list.txt"
tree_layer_init_path="${thirdparty_path}/layer_list.txt"
tree_info_init_path="${thirdparty_path}/tree_info.txt"
export GLOG_v=0
function main() {
cmd="python local_infer.py \
--is_local=1 \
--cpu_num=1 \
--batch_size=1 \
--topK=1 \
--is_test=1 \
--save_init_model=1 \
--train_files_path=${train_files_path} \
--test_files_path=${test_files_path} \
--model_files_path=${model_files_path} \
--tree_travel_init_path=${tree_travel_init_path} \
--tree_info_init_path=${tree_info_init_path} \
--tree_layer_init_path=${tree_layer_init_path} "
${cmd}
}
main "$@"
DIRS=`pwd`
data_path="${DIRS}/data"
train_files_path="${data_path}/train"
test_files_path="${data_path}/test"
model_files_path="${DIRS}/model"
thirdparty_path="${DIRS}/thirdparty"
tree_travel_init_path="${thirdparty_path}/travel_list.txt"
tree_layer_init_path="${thirdparty_path}/layer_list.txt"
tree_info_init_path="${thirdparty_path}/tree_info.txt"
export GLOG_v=0
function main() {
cmd="python predict.py \
--is_local=1 \
--cpu_num=1 \
--batch_size=1 \
--topK=1 \
--is_test=1 \
--save_init_model=1 \
--train_files_path=${train_files_path} \
--test_files_path=${test_files_path} \
--model_files_path=${model_files_path} \
--tree_travel_init_path=${tree_travel_init_path} \
--tree_info_init_path=${tree_info_init_path} \
--tree_layer_init_path=${tree_layer_init_path} "
${cmd}
}
main "$@"
DIRS=`pwd`
data_path="${DIRS}/data"
train_files_path="${data_path}/train"
test_files_path="${data_path}/test"
model_files_path="${DIRS}/model"
thirdparty_path="${DIRS}/thirdparty"
tree_travel_init_path="${thirdparty_path}/travel_list.txt"
tree_layer_init_path="${thirdparty_path}/layer_list.txt"
tree_info_init_path="${thirdparty_path}/tree_info.txt"
export GLOG_v=0
function main() {
cmd="python local_train.py \
--is_local=1 \
--save_init_model=1 \
--load_model=0 \
--cpu_num=1 \
--random_seed=0 \
--epoch_num=1 \
--batch_size=32 \
--learning_rate=3e-4 \
--train_files_path=${train_files_path} \
--model_files_path=${model_files_path} \
--tree_travel_init_path=${tree_travel_init_path} \
--tree_info_init_path=${tree_info_init_path} \
--tree_layer_init_path=${tree_layer_init_path} "
${cmd}
}
main "$@"
1,2
3,4,5,6
7,8,9,10,11,12,13
14,15,16,17,18,19,20,21,22,23,24,25
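For reference, each line of the layer list above holds the node ids of one tree layer, top to bottom; `read_layer_list` in `utils.py` also drops 0-valued padding entries. A minimal parsing sketch (the helper name is illustrative):

```python
def parse_layer_list(lines):
    """Line i holds the node ids of tree layer i; zero entries are padding."""
    return [[int(x) for x in ln.strip().split(',') if int(x) != 0]
            for ln in lines if ln.strip()]

layers = parse_layer_list(["1,2", "3,4,5,6"])
# -> [[1, 2], [3, 4, 5, 6]]
```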
1,3,7,14
1,3,7,15
1,3,8,16
1,3,8,17
1,4,9,18
1,4,9,19
1,4,10,20
1,4,10,21
2,5,11,22
2,5,11,23
2,5,12,24
2,5,12,25
2,6,13,0
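Each line of the travel list above appears to be one leaf item's root-to-leaf path of node ids, padded with 0 when the leaf sits above the deepest layer (as in the last line). A minimal parsing sketch under that assumption (helper name illustrative):

```python
def parse_travel_list(lines):
    """One root-to-leaf node-id path per leaf item; trailing 0s are padding."""
    return [[int(x) for x in ln.strip().split(',')] for ln in lines if ln.strip()]

paths = parse_travel_list(["1,3,7,14", "2,6,13,0"])
# -> [[1, 3, 7, 14], [2, 6, 13, 0]]
```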
0,0,0,1,2
0,1,0,3,4
0,1,0,5,6
0,2,1,7,8
0,2,1,9,10
0,2,2,11,12
0,2,2,13,0
0,3,3,14,15
0,3,3,16,17
0,3,4,18,19
0,3,4,20,21
0,3,5,22,23
0,3,5,24,25
12,3,6,0,0
0,4,7,0,0
1,4,7,0,0
2,4,8,0,0
3,4,8,0,0
4,4,9,0,0
5,4,9,0,0
6,4,10,0,0
7,4,10,0,0
8,4,11,0,0
9,4,11,0,0
10,4,12,0,0
11,4,12,0,0
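Each row of the tree info file above has `3 + child_nums` columns, matching the `TDM_Tree_Info` shape declared in the training network. From the data, the layout appears to be item_id, layer_id, parent node id, then the child node ids (0 where a child is missing); this interpretation and the helper below are illustrative, not confirmed by the repo:

```python
def parse_tree_info_row(line, child_nums=2):
    """Split one CSV row into (item_id, layer_id, parent, children)."""
    fields = [int(x) for x in line.strip().split(',')]
    item_id, layer_id, parent = fields[:3]
    children = fields[3:3 + child_nums]
    return item_id, layer_id, parent, children

parse_tree_info_row("0,2,1,7,8")
# -> (0, 2, 1, [7, 8])
```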
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import math
import argparse
import numpy as np
import logging
import paddle.fluid as fluid
from utils import tdm_sampler_prepare, tdm_child_prepare, tdm_emb_prepare, trace_var
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class TdmTrainNet(object):
"""
    Main flow of the TDM demo network
"""
def __init__(self, args):
self.input_embed_size = args.query_emb_size
self.node_emb_size = args.node_emb_size
self.label_nums = 2
self.node_nums = args.node_nums
self.max_layers = args.layer_size
self.neg_sampling_list = args.neg_sampling_list
self.output_positive = True
self.get_tree_info(args)
        # Whether to enable numeric debugging of intermediate values
self.need_trace = args.need_trace
self.need_detail = args.need_detail
if not args.load_model:
            # Only needs to run once per model: ingest the generated plaintext
            # tree files, obtain the tree structure, and save it as a paddle
            # binary init_model. Later debugging/training runs can load
            # init_model for a fast start and a smaller memory footprint.
self.tdm_param_prepare_dict = tdm_sampler_prepare(args)
logger.info("--Layer node num list--: {}".format(
self.tdm_param_prepare_dict['layer_node_num_list']))
self.layer_node_num_list = self.tdm_param_prepare_dict['layer_node_num_list']
logger.info("--Leaf node num--: {}".format(
self.tdm_param_prepare_dict['leaf_node_num']))
self.leaf_node_num = self.tdm_param_prepare_dict['leaf_node_num']
self.tdm_param_prepare_dict['info_array'] = tdm_child_prepare(
args)
logger.info(
"--Tree Info array shape {}--".format(self.tdm_param_prepare_dict['info_array'].shape))
self.tdm_param_prepare_dict['emb_array'] = tdm_emb_prepare(args)
logger.info(
"--Tree Emb array shape {}--".format(self.tdm_param_prepare_dict['emb_array'].shape))
else:
self.layer_node_num_list = args.layer_node_num_list
self.leaf_node_num = args.leaf_node_num
self.input_trans_layer = InputTransNet(args)
self.layer_classifier = DnnLayerClassifierNet(args)
def get_tree_info(self, args):
"""
        TDM_Tree_Info is not used during training, but the inference network needs it.
        To make the saved model directly usable for inference, without an extra step
        to generate the tree_info parameter, add the tree_info parameter to the
        training network so it is saved along with the trained model.
"""
fluid.default_startup_program().global_block().create_var(
name="TDM_Tree_Info", dtype=fluid.core.VarDesc.VarType.INT32, shape=[args.node_nums, 3 + args.child_nums],
persistable=True,
initializer=fluid.initializer.ConstantInitializer(0))
tdm_tree_info = fluid.default_main_program().global_block().create_var(
name="TDM_Tree_Info", dtype=fluid.core.VarDesc.VarType.INT32, shape=[args.node_nums, 3 + args.child_nums],
persistable=True)
def input_data(self):
"""
        Define the input variables of the TDM training network
"""
input_emb = fluid.data(
name="input_emb",
shape=[None, self.input_embed_size],
dtype="float32",
)
item_label = fluid.data(
name="item_label",
shape=[None, 1],
dtype="int64",
)
inputs = [input_emb] + [item_label]
return inputs
def tdm(self, inputs):
"""
        Main flow of the TDM training network
"""
input_emb = inputs[0]
item_label = inputs[1]
        # trace_var prints parameter details when debugging the static graph:
        # set need_trace=True to log forward values (first 20 elements by default)
        # set need_detail=True to log all forward values
trace_var(input_emb, "[TDM][inputs]", "input_emb",
self.need_trace, self.need_detail)
trace_var(item_label, "[TDM][inputs]",
"item_label", self.need_trace, self.need_detail)
        # Negative sampling on the given tree, based on the input items' positive samples
        # sample_nodes: the sampled node_ids, containing positive and negative samples
        # sample_label: the positive/negative label of each sampled node_id
        # sample_mask: labels for the padded part, kept so tensor shapes stay aligned;
        #              0 marks a padded, virtual node_id
sample_nodes, sample_label, sample_mask = fluid.contrib.layers.tdm_sampler(
x=item_label,
neg_samples_num_list=self.neg_sampling_list,
layer_node_num_list=self.layer_node_num_list,
leaf_node_num=self.leaf_node_num,
tree_travel_attr=fluid.ParamAttr(name="TDM_Tree_Travel"),
tree_layer_attr=fluid.ParamAttr(name="TDM_Tree_Layer"),
output_positive=self.output_positive,
output_list=True,
seed=0,
tree_dtype='int64',
dtype='int64'
)
trace_var(sample_nodes, "[TDM][tdm_sample]",
"sample_nodes", self.need_trace, self.need_detail)
trace_var(sample_label, "[TDM][tdm_sample]",
"sample_label", self.need_trace, self.need_detail)
trace_var(sample_mask, "[TDM][tdm_sample]",
"sample_mask", self.need_trace, self.need_detail)
        # Look up the embedding of each sampled node
sample_nodes_emb = [
fluid.embedding(
input=sample_nodes[i],
is_sparse=True,
size=[self.node_nums, self.node_emb_size],
param_attr=fluid.ParamAttr(
name="TDM_Tree_Emb")
) for i in range(self.max_layers)
]
        # Reshape here to prepare for the layer-wise classifier training below
sample_nodes_emb = [
fluid.layers.reshape(sample_nodes_emb[i],
[-1, self.neg_sampling_list[i] +
self.output_positive, self.node_emb_size]
) for i in range(self.max_layers)
]
trace_var(sample_nodes_emb, "[TDM][tdm_sample]",
"sample_nodes_emb", self.need_trace, self.need_detail)
        # Transform input_emb so its dimension matches node_emb
input_trans_emb = self.input_trans_layer.input_trans_layer(input_emb)
trace_var(input_trans_emb, "[TDM][input_trans]",
"input_trans_emb", self.need_trace, self.need_detail)
        # Main body of the classifier: train a separate classifier for each tree layer
layer_classifier_res = self.layer_classifier.classifier_layer(
input_trans_emb, sample_nodes_emb)
trace_var(layer_classifier_res, "[TDM][classifier_layer]",
"layer_classifier_res", self.need_trace, self.need_detail)
        # Final discriminative FC: judge the node classification results of all layers
        # together by the same standard. The tree is very likely unbalanced and some
        # items are not on the last layer, so this mechanism guarantees every item
        # has a chance to be retrieved.
tdm_fc = fluid.layers.fc(input=layer_classifier_res,
size=self.label_nums,
act=None,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name="tdm.cls_fc.weight"),
bias_attr=fluid.ParamAttr(name="tdm.cls_fc.bias"))
trace_var(tdm_fc, "[TDM][cls_fc]", "tdm_fc",
self.need_trace, self.need_detail)
        # Flatten the logits and compute the loss of the whole network together
tdm_fc_re = fluid.layers.reshape(tdm_fc, [-1, 2])
        # To weight each layer's loss differently, skip the concat here;
        # each layer's loss can be computed separately and multiplied by its weight
sample_label = fluid.layers.concat(sample_label, axis=1)
sample_label.stop_gradient = True
labels_reshape = fluid.layers.reshape(sample_label, [-1, 1])
        # Compute the overall loss and obtain the softmax output
cost, softmax_prob = fluid.layers.softmax_with_cross_entropy(
logits=tdm_fc_re, label=labels_reshape, return_softmax=True)
        # Filter out the loss of virtual (padded) nodes via the mask
sample_mask = fluid.layers.concat(sample_mask, axis=1)
sample_mask.stop_gradient = True
mask_reshape = fluid.layers.reshape(sample_mask, [-1, 1])
mask_index = fluid.layers.where(mask_reshape != 0)
mask_cost = fluid.layers.gather_nd(cost, mask_index)
        # Compute the mean loss and accuracy of the batch; AUC could also be computed here
avg_cost = fluid.layers.reduce_mean(mask_cost)
acc = fluid.layers.accuracy(input=softmax_prob, label=labels_reshape)
return avg_cost, acc
class InputTransNet(object):
"""
    Main part of the input-side network
"""
def __init__(self, args):
self.node_emb_size = args.node_emb_size
self.max_layers = args.layer_size
self.is_test = args.is_test
def input_trans_layer(self, input_emb):
"""
        Input-side training network
"""
        # Map the input to the same dimension as the node embedding
input_fc_out = fluid.layers.fc(
input=input_emb,
size=self.node_emb_size,
act=None,
param_attr=fluid.ParamAttr(name="trans.input_fc.weight"),
bias_attr=fluid.ParamAttr(name="trans.input_fc.bias"),
)
        # Map input_emb into the representation space of each tree layer
input_layer_fc_out = [
fluid.layers.fc(
input=input_fc_out,
size=self.node_emb_size,
act="tanh",
param_attr=fluid.ParamAttr(
name="trans.layer_fc.weight." + str(i)),
bias_attr=fluid.ParamAttr(name="trans.layer_fc.bias."+str(i)),
) for i in range(self.max_layers)
]
return input_layer_fc_out
def input_fc_infer(self, input_emb):
"""
        First part of the input-side inference network: map the input to the node dimension
"""
        # Same network definition as in training
input_fc_out = fluid.layers.fc(
input=input_emb,
size=self.node_emb_size,
act=None,
param_attr=fluid.ParamAttr(name="trans.input_fc.weight"),
bias_attr=fluid.ParamAttr(name="trans.input_fc.bias"),
)
return input_fc_out
def layer_fc_infer(self, input_fc_out, layer_idx):
"""
        Second part of the input-side inference network: map the input into each layer's vector space
"""
        # Same as training; layer_idx selects the FC of the given layer
input_layer_fc_out = fluid.layers.fc(
input=input_fc_out,
size=self.node_emb_size,
act="tanh",
param_attr=fluid.ParamAttr(
name="trans.layer_fc.weight." + str(layer_idx)),
bias_attr=fluid.ParamAttr(
name="trans.layer_fc.bias."+str(layer_idx)),
)
return input_layer_fc_out
class DnnLayerClassifierNet(object):
"""
    Main part of the layer-wise classifier
"""
def __init__(self, args):
self.node_emb_size = args.node_emb_size
self.max_layers = args.layer_size
self.neg_sampling_list = args.neg_sampling_list
self.output_positive = True
self.is_test = args.is_test
self.child_nums = args.child_nums
def _expand_layer(self, input_layer, node, layer_idx):
        # Expand the input so its count matches the nodes;
        # other broadcast-style ops could be used instead.
        # Compatible with both the training and the inference network
input_layer_unsequeeze = fluid.layers.unsqueeze(
input=input_layer, axes=[1])
if self.is_test:
input_layer_expand = fluid.layers.expand(
input_layer_unsequeeze, expand_times=[1, node.shape[1], 1])
else:
input_layer_expand = fluid.layers.expand(
input_layer_unsequeeze, expand_times=[1, node[layer_idx].shape[1], 1])
return input_layer_expand
def classifier_layer(self, input, node):
        # Expand the input so its dimension matches the nodes
input_expand = [
self._expand_layer(input[i], node, i) for i in range(self.max_layers)
]
        # Concat input_emb with node_emb and feed them through the classifier FC
input_node_concat = [
fluid.layers.concat(
input=[input_expand[i], node[i]],
axis=2) for i in range(self.max_layers)
]
hidden_states_fc = [
fluid.layers.fc(
input=input_node_concat[i],
size=self.node_emb_size,
num_flatten_dims=2,
act="tanh",
param_attr=fluid.ParamAttr(
name="cls.concat_fc.weight."+str(i)),
bias_attr=fluid.ParamAttr(name="cls.concat_fc.bias."+str(i))
) for i in range(self.max_layers)
]
        # To compute the loss over all layers' nodes together, concat here:
        # the classifier results are concatenated per batch rather than per layer,
        # giving a shape like [batch_size, total_node_num, node_emb_size]
hidden_states_concat = fluid.layers.concat(hidden_states_fc, axis=1)
return hidden_states_concat
def classifier_layer_infer(self, input, node, layer_idx):
        # Simplified classifier for the inference network; layer_idx selects
        # the classifier of the given layer.
        # The input and node dimensions still need to match
input_expand = self._expand_layer(input, node, layer_idx)
        # Same concat logic as the training network
input_node_concat = fluid.layers.concat(
input=[input_expand, node], axis=2)
        # param_attr names select the FC of the corresponding layer
hidden_states_fc = fluid.layers.fc(
input=input_node_concat,
size=self.node_emb_size,
num_flatten_dims=2,
act="tanh",
param_attr=fluid.ParamAttr(
name="cls.concat_fc.weight."+str(layer_idx)),
bias_attr=fluid.ParamAttr(name="cls.concat_fc.bias."+str(layer_idx)))
return hidden_states_fc
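The mask-based loss filtering in `tdm()` above (`where` over the flattened mask, then `gather_nd` over the per-sample costs) can be illustrated with a small NumPy sketch; the helper name is illustrative:

```python
import numpy as np

def masked_mean_loss(cost, mask):
    """Drop losses of padded (mask == 0) virtual nodes, then average the rest."""
    cost = np.asarray(cost, dtype='float64').reshape(-1)
    mask = np.asarray(mask).reshape(-1)
    return cost[mask != 0].mean()

masked_mean_loss([0.5, 2.0, 1.5], [1, 0, 1])
# -> 1.0: the padded sample's loss (2.0) is excluded from the mean
```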
# -*- coding=utf-8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import os
import json
import argparse
import logging
import numpy as np
import paddle.fluid as fluid
from args import print_arguments, parse_args
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def read_list(path):
    res = []
    with open(path, 'r') as fin:
        for line in fin.readlines():
            data = line.strip('\n').split(',')
            data = [int(i) for i in data]
            res.append(data)
    return res
def read_list_float(path):
    res = []
    with open(path, 'r') as fin:
        for line in fin.readlines():
            data = line.strip('\n').split(',')
            data = [float(i) for i in data]
            res.append(data)
    return res
def read_layer_list(path):
layer_list = []
layer_list_flat = []
with open(path, 'r') as fin:
for line in fin.readlines():
l = []
layer = (line.split('\n'))[0].split(',')
layer = [int(i) for i in layer]
for node in layer:
if node:
l.append(node)
layer_list_flat.append(node)
layer_list.append(l)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
return layer_list, layer_array
def tdm_sampler_prepare(args):
"""load tdm tree param from list file"""
prepare_dict = {}
travel_list = read_list(args.tree_travel_init_path)
travel_array = np.array(travel_list)
prepare_dict['travel_array'] = travel_array
leaf_num = len(travel_list)
prepare_dict['leaf_node_num'] = leaf_num
layer_list, layer_array = read_layer_list(args.tree_layer_init_path)
prepare_dict['layer_array'] = layer_array
layer_node_num_list = [len(i) for i in layer_list]
prepare_dict['layer_node_num_list'] = layer_node_num_list
node_num = int(np.sum(layer_node_num_list))
prepare_dict['node_num'] = node_num
return prepare_dict
def tdm_child_prepare(args):
"""load tdm tree param from list file"""
info_list = read_list(args.tree_info_init_path)
info_array = np.array(info_list)
return info_array
def tdm_emb_prepare(args):
"""load tdm tree emb from list file"""
emb_list = read_list_float(args.tree_emb_init_path)
emb_array = np.array(emb_list)
return emb_array
def trace_var(var, msg_prefix, var_name, need_trace=False, need_detail=False):
"""trace var and its value detail"""
summarize_level = 20
if need_detail:
summarize_level = -1
if need_trace:
if isinstance(var, list):
for i, v in enumerate(var):
fluid.layers.Print(v,
message="{}[{}.{}]".format(
msg_prefix, var_name, i),
summarize=summarize_level)
else:
fluid.layers.Print(var, message="{}[{}]".format(
msg_prefix, var_name), summarize=summarize_level)
if __name__ == "__main__":
args = parse_args()
print_arguments(args)