提交 05b1b890 编写于 作者: M malin10

Merge branch 'develop' of ssh://gitlab.baidu.com:8022/tangwei12/paddlerec into infer_dssm_w2v

文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
......@@ -20,13 +20,13 @@ import os
import copy
from fleetrec.core.engine.engine import Engine
from fleetrec.core.utils import envs
class LocalClusterEngine(Engine):
def start_procs(self):
worker_num = self.envs["worker_num"]
server_num = self.envs["server_num"]
start_port = self.envs["start_port"]
ports = [self.envs["start_port"]]
logs_dir = self.envs["log_dir"]
default_env = os.environ.copy()
......@@ -36,7 +36,13 @@ class LocalClusterEngine(Engine):
current_env.pop("https_proxy", None)
procs = []
log_fns = []
ports = range(start_port, start_port + server_num, 1)
for i in range(server_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
user_endpoints_ips = [x.split(":")[0] for x in user_endpoints.split(",")]
user_endpoints_port = [x.split(":")[1] for x in user_endpoints.split(",")]
......
文件模式从 100644 更改为 100755
......@@ -19,15 +19,24 @@ import yaml
from fleetrec.core.utils import envs
trainer_abs = os.path.join(os.path.dirname(os.path.abspath(__file__)), "trainers")
trainer_abs = os.path.join(os.path.dirname(
os.path.abspath(__file__)), "trainers")
trainers = {}
def trainer_registry():
    """Register built-in trainers: maps a trainer class name to the
    absolute path of the script implementing it (under ``trainer_abs``).

    Note: the diff render contained each assignment twice (stale pre-wrap
    lines); the duplicates are dead stores and have been removed.
    """
    trainers["SingleTrainer"] = os.path.join(
        trainer_abs, "single_trainer.py")
    trainers["ClusterTrainer"] = os.path.join(
        trainer_abs, "cluster_trainer.py")
    trainers["CtrCodingTrainer"] = os.path.join(
        trainer_abs, "ctr_coding_trainer.py")
    trainers["CtrModulTrainer"] = os.path.join(
        trainer_abs, "ctr_modul_trainer.py")
    trainers["TDMSingleTrainer"] = os.path.join(
        trainer_abs, "tdm_single_trainer.py")
    trainers["TDMClusterTrainer"] = os.path.join(
        trainer_abs, "tdm_cluster_trainer.py")


trainer_registry()
......@@ -46,7 +55,8 @@ class TrainerFactory(object):
if trainer_abs is None:
if not os.path.isfile(train_mode):
raise IOError("trainer {} can not be recognized".format(train_mode))
raise IOError(
"trainer {} can not be recognized".format(train_mode))
trainer_abs = train_mode
train_mode = "UserDefineTrainer"
......
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
......@@ -43,6 +43,12 @@ class Model(object):
"""
return self._metrics
def custom_preprocess(self):
    """
    Hook for subclasses: do something after exe.run(startup_program)
    and before run(). The base implementation is intentionally a no-op.
    """
    pass
def get_fetch_period(self):
    # Interval (in steps) at which the fetch variables are evaluated.
    return self._fetch_interval
......@@ -50,23 +56,30 @@ class Model(object):
name = name.upper()
optimizers = ["SGD", "ADAM", "ADAGRAD"]
if name not in optimizers:
raise ValueError("configured optimizer can only supported SGD/Adam/Adagrad")
raise ValueError(
"configured optimizer can only supported SGD/Adam/Adagrad")
if name == "SGD":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
reg = envs.get_global_env(
"hyper_parameters.reg", 0.0001, self._namespace)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
elif name == "ADAGRAD":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
optimizer_i = fluid.optimizer.Adagrad(lr)
else:
raise ValueError("configured optimizer can only supported SGD/Adam/Adagrad")
raise ValueError(
"configured optimizer can only supported SGD/Adam/Adagrad")
return optimizer_i
def optimizer(self):
    """Build the optimizer configured for this model.

    Reads ``hyper_parameters.learning_rate`` and
    ``hyper_parameters.optimizer`` from the global config under this
    model's namespace and delegates to ``self._build_optimizer``.

    The diff render duplicated both ``envs.get_global_env`` reads; the
    stale first pair were dead stores and have been removed.
    """
    learning_rate = envs.get_global_env(
        "hyper_parameters.learning_rate", None, self._namespace)
    optimizer = envs.get_global_env(
        "hyper_parameters.optimizer", None, self._namespace)
    print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
    return self._build_optimizer(optimizer, learning_rate)
@abc.abstractmethod
......
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
......@@ -95,5 +95,6 @@ def user_define_engine(engine_yaml):
train_dirname = os.path.dirname(train_location)
base_name = os.path.splitext(os.path.basename(train_location))[0]
sys.path.append(train_dirname)
trainer_class = envs.lazy_instance_by_fliename(base_name, "UserDefineTraining")
trainer_class = envs.lazy_instance_by_fliename(
base_name, "UserDefineTraining")
return trainer_class
......@@ -18,6 +18,7 @@ Training use fluid with one node only.
from __future__ import print_function
import os
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
......@@ -39,11 +40,12 @@ class ClusterTrainer(TranspileTrainer):
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
if envs.get_platform() == "LINUX":
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass', self.dataloader_train)
self.regist_context_processor(
'train_pass', self.dataloader_train)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
......@@ -70,6 +72,11 @@ class ClusterTrainer(TranspileTrainer):
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env(
"hyper_parameters.optimizer", None, "train.model")
if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_cost_op())
......@@ -85,16 +92,18 @@ class ClusterTrainer(TranspileTrainer):
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass'
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def dataloader_train(self, context):
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
fleet.init_worker()
reader = self._get_dataloader()
......@@ -103,8 +112,8 @@ class ClusterTrainer(TranspileTrainer):
program = fluid.compiler.CompiledProgram(
fleet.main_program).with_data_parallel(
loss_name=self.model.get_cost_op().name,
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
metrics_varnames = []
metrics_format = []
......@@ -140,7 +149,6 @@ class ClusterTrainer(TranspileTrainer):
context['status'] = 'terminal_pass'
def dataset_train(self, context):
self._exe.run(fleet.startup_program)
fleet.init_worker()
dataset = self._get_dataset()
......
......@@ -33,8 +33,8 @@ class SingleTrainer(TranspileTrainer):
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
if envs.get_platform() == "LINUX":
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass', self.dataloader_train)
......@@ -55,10 +55,13 @@ class SingleTrainer(TranspileTrainer):
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def startup(self, context):
self._exe.run(fluid.default_startup_program())
context['status'] = 'train_pass'
def dataloader_train(self, context):
self._exe.run(fluid.default_startup_program())
reader = self._get_dataloader("TRAIN")
epochs = envs.get_global_env("train.epochs")
......@@ -100,8 +103,6 @@ class SingleTrainer(TranspileTrainer):
context['status'] = 'infer_pass'
def dataset_train(self, context):
# run startup program at once
self._exe.run(fluid.default_startup_program())
dataset = self._get_dataset("TRAIN")
epochs = envs.get_global_env("train.epochs")
......
# -*- coding=utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import logging
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from fleetrec.core.utils import envs
from fleetrec.core.trainers.cluster_trainer import ClusterTrainer
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"]
class TDMClusterTrainer(ClusterTrainer):
    """Cluster trainer for TDM models.

    Extends ClusterTrainer: the parameter server warm-starts from a
    pre-trained init model, and workers load the plaintext tree structure
    (travel / layer / info arrays) into the network's special Variables
    during startup.
    """

    def server(self, context):
        """Parameter-server pass: init from ``cluster.init_model_path``
        (required for TDM) and serve until shutdown."""
        namespace = "train.startup"
        init_model_path = envs.get_global_env(
            "cluster.init_model_path", "", namespace)
        assert init_model_path != "", "Cluster train must has init_model for TDM"
        fleet.init_server(init_model_path)
        logger.info("TDM: load model from {}".format(init_model_path))
        fleet.run_server()
        context['is_exit'] = True

    def startup(self, context):
        """Run the fleet startup program, optionally load the tree data
        into the TDM variables and save an init model, then move the
        state machine to 'train_pass'."""
        self._exe.run(fleet.startup_program)
        namespace = "train.startup"
        load_tree = envs.get_global_env(
            "tree.load_tree", True, namespace)
        self.tree_layer_path = envs.get_global_env(
            "tree.tree_layer_path", "", namespace)
        self.tree_travel_path = envs.get_global_env(
            "tree.tree_travel_path", "", namespace)
        self.tree_info_path = envs.get_global_env(
            "tree.tree_info_path", "", namespace)

        save_init_model = envs.get_global_env(
            "cluster.save_init_model", False, namespace)
        init_model_path = envs.get_global_env(
            "cluster.init_model_path", "", namespace)

        if load_tree:
            # Set the plaintext tree structure/data directly into the
            # network's Variables. NumpyInitializer is avoided here because
            # the tree-related data can be very large (performance risk).
            for param_name in special_param:
                param_t = fluid.global_scope().find_var(
                    param_name).get_tensor()
                param_array = self.tdm_prepare(param_name)
                param_t.set(param_array.astype('int32'), self._place)

        if save_init_model:
            logger.info("Begin Save Init model.")
            fluid.io.save_persistables(
                executor=self._exe, dirname=init_model_path)
            logger.info("End Save Init model.")

        context['status'] = 'train_pass'

    def tdm_prepare(self, param_name):
        """Return the numpy array for one of the special TDM tree params.

        Raises:
            ValueError: if ``param_name`` is not a known TDM parameter.
        """
        if param_name == "TDM_Tree_Travel":
            return self.tdm_travel_prepare()
        elif param_name == "TDM_Tree_Layer":
            layer_array, _ = self.tdm_layer_prepare()
            return layer_array
        elif param_name == "TDM_Tree_Info":
            return self.tdm_info_prepare()
        # BUG FIX: `raise <str>` is a TypeError in Python 3 -- raise a real
        # exception type instead.
        raise ValueError(
            " {} is not a special tdm param name".format(param_name))

    def tdm_travel_prepare(self):
        """Load the TDM travel array (leaf-node paths) from an .npy file."""
        travel_array = np.load(self.tree_travel_path)
        logger.info("TDM Tree leaf node nums: {}".format(
            travel_array.shape[0]))
        return travel_array

    def tdm_layer_prepare(self):
        """Load the per-layer node ids from a comma-separated text file.

        Returns:
            (layer_array, layer_list): a flat (N, 1) array of node ids and
            the nested per-layer list of node ids.
        """
        layer_list = []
        layer_list_flat = []
        with open(self.tree_layer_path, 'r') as fin:
            for line in fin.readlines():
                l = []
                layer = (line.split('\n'))[0].split(',')
                for node in layer:
                    if node:
                        layer_list_flat.append(node)
                        l.append(node)
                layer_list.append(l)
        layer_array = np.array(layer_list_flat)
        layer_array = layer_array.reshape([-1, 1])
        logger.info("TDM Tree max layer: {}".format(len(layer_list)))
        logger.info("TDM Tree layer_node_num_list: {}".format(
            [len(i) for i in layer_list]))
        return layer_array, layer_list

    def tdm_info_prepare(self):
        """Load the TDM tree info array from an .npy file."""
        info_array = np.load(self.tree_info_path)
        return info_array
# -*- coding=utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import logging
import paddle.fluid as fluid
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
from fleetrec.core.trainers.single_trainer import SingleTrainer
from fleetrec.core.utils import envs
import numpy as np
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer",
"TDM_Tree_Info", "TDM_Tree_Emb"]
class TDMSingleTrainer(SingleTrainer):
    """Single-node trainer for TDM models.

    Extends SingleTrainer: during startup it can load persistable
    parameters from a saved model, load the plaintext tree structure
    (travel / layer / info / embedding arrays) into the network's special
    Variables, and save an init model for later cluster warm-start.
    """

    def startup(self, context):
        """Run the startup program, optionally load persistables and the
        tree data, optionally save an init model, then move the state
        machine to 'train_pass'."""
        namespace = "train.startup"
        load_persistables = envs.get_global_env(
            "single.load_persistables", False, namespace)
        persistables_model_path = envs.get_global_env(
            "single.persistables_model_path", "", namespace)

        load_tree = envs.get_global_env(
            "tree.load_tree", False, namespace)
        self.tree_layer_path = envs.get_global_env(
            "tree.tree_layer_path", "", namespace)
        self.tree_travel_path = envs.get_global_env(
            "tree.tree_travel_path", "", namespace)
        self.tree_info_path = envs.get_global_env(
            "tree.tree_info_path", "", namespace)
        self.tree_emb_path = envs.get_global_env(
            "tree.tree_emb_path", "", namespace)

        save_init_model = envs.get_global_env(
            "single.save_init_model", False, namespace)
        init_model_path = envs.get_global_env(
            "single.init_model_path", "", namespace)
        self._exe.run(fluid.default_startup_program())

        if load_persistables:
            # Load parameters from a saved Paddle binary model.
            fluid.io.load_persistables(
                executor=self._exe,
                dirname=persistables_model_path,
                main_program=fluid.default_main_program())
            logger.info("Load persistables from \"{}\"".format(
                persistables_model_path))

        if load_tree:
            # Set the plaintext tree structure/data directly into the
            # network's Variables. NumpyInitializer is avoided here because
            # the tree-related data can be very large (performance risk).
            for param_name in special_param:
                param_t = fluid.global_scope().find_var(
                    param_name).get_tensor()
                param_array = self.tdm_prepare(param_name)
                if param_name == 'TDM_Tree_Emb':
                    param_t.set(param_array.astype('float32'), self._place)
                else:
                    param_t.set(param_array.astype('int32'), self._place)

        if save_init_model:
            logger.info("Begin Save Init model.")
            fluid.io.save_persistables(
                executor=self._exe, dirname=init_model_path)
            logger.info("End Save Init model.")

        context['status'] = 'train_pass'

    def tdm_prepare(self, param_name):
        """Return the numpy array for one of the special TDM tree params.

        Raises:
            ValueError: if ``param_name`` is not a known TDM parameter.
        """
        if param_name == "TDM_Tree_Travel":
            return self.tdm_travel_prepare()
        elif param_name == "TDM_Tree_Layer":
            layer_array, _ = self.tdm_layer_prepare()
            return layer_array
        elif param_name == "TDM_Tree_Info":
            return self.tdm_info_prepare()
        elif param_name == "TDM_Tree_Emb":
            return self.tdm_emb_prepare()
        # BUG FIX: `raise <str>` is a TypeError in Python 3 -- raise a real
        # exception type instead.
        raise ValueError(
            " {} is not a special tdm param name".format(param_name))

    def tdm_travel_prepare(self):
        """Load the TDM travel array (leaf-node paths) from an .npy file."""
        travel_array = np.load(self.tree_travel_path)
        logger.info("TDM Tree leaf node nums: {}".format(
            travel_array.shape[0]))
        return travel_array

    def tdm_emb_prepare(self):
        """Load the TDM node-embedding array from an .npy file."""
        emb_array = np.load(self.tree_emb_path)
        logger.info("TDM Tree node nums from emb: {}".format(
            emb_array.shape[0]))
        return emb_array

    def tdm_layer_prepare(self):
        """Load the per-layer node ids from a comma-separated text file.

        Returns:
            (layer_array, layer_list): a flat (N, 1) array of node ids and
            the nested per-layer list of node ids.
        """
        layer_list = []
        layer_list_flat = []
        with open(self.tree_layer_path, 'r') as fin:
            for line in fin.readlines():
                l = []
                layer = (line.split('\n'))[0].split(',')
                for node in layer:
                    if node:
                        layer_list_flat.append(node)
                        l.append(node)
                layer_list.append(l)
        layer_array = np.array(layer_list_flat)
        layer_array = layer_array.reshape([-1, 1])
        logger.info("TDM Tree max layer: {}".format(len(layer_list)))
        logger.info("TDM Tree layer_node_num_list: {}".format(
            [len(i) for i in layer_list]))
        return layer_array, layer_list

    def tdm_info_prepare(self):
        """Load the TDM tree info array from an .npy file."""
        info_array = np.load(self.tree_info_path)
        return info_array
......@@ -48,7 +48,13 @@ class TranspileTrainer(Trainer):
reader_class = envs.get_global_env("class", None, namespace)
reader = dataloader_instance.dataloader(reader_class, state, self._config_yaml)
dataloader.set_sample_generator(reader, batch_size)
reader_class = envs.lazy_instance_by_fliename(reader_class, "TrainReader")
reader_ins = reader_class(self._config_yaml)
if hasattr(reader_ins,'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
def _get_dataset(self, state):
......
文件模式从 100644 更改为 100755
......@@ -57,4 +57,9 @@ def dataloader(readerclass, train, yaml_file):
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
文件模式从 100644 更改为 100755
......@@ -15,7 +15,8 @@
import os
import copy
import sys
import socket
from contextlib import closing
global_envs = {}
......@@ -170,3 +171,12 @@ def get_platform():
return "DARWIN"
if 'Windows' in plats:
return "WINDOWS"
def find_free_port():
    """Ask the OS for a TCP port that is currently free and return it.

    Binding to port 0 makes the kernel pick an unused ephemeral port;
    the socket is closed immediately, so the port is only *likely* to
    still be free when the caller binds it (inherent race).
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(('', 0))
        return sock.getsockname()[1]
......@@ -10,6 +10,8 @@ from fleetrec.core.utils import util
engines = {}
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
custom_model = ['tdm']
model_name = ""
def engine_registry():
......@@ -28,13 +30,17 @@ def engine_registry():
engines["GPU"] = gpu
def get_engine(engine, device):
def get_engine(args):
device = args.device
d_engine = engines[device]
transpiler = get_transpiler()
engine = args.engine
run_engine = d_engine[transpiler].get(engine, None)
if run_engine is None:
raise ValueError("engine {} can not be supported on device: {}".format(engine, device))
raise ValueError(
"engine {} can not be supported on device: {}".format(engine, device))
return run_engine
......@@ -77,15 +83,21 @@ def set_runtime_envs(cluster_envs, engine_yaml):
print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
def single_engine(args):
print("use single engine to run model: {}".format(args.model))
def get_trainer_prefix(args):
    """Return the trainer-class prefix (e.g. "TDM") for custom models,
    or an empty string for standard ones."""
    if model_name not in custom_model:
        return ""
    return model_name.upper()
def single_engine(args):
trainer = get_trainer_prefix(args) + "SingleTrainer"
single_envs = {}
single_envs["train.trainer.trainer"] = "SingleTrainer"
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single"
single_envs["train.trainer.device"] = args.device
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
......@@ -93,16 +105,15 @@ def single_engine(args):
def cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
......@@ -122,15 +133,15 @@ def cluster_mpi_engine(args):
def local_cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["server_num"] = 1
cluster_envs["worker_num"] = 1
cluster_envs["start_port"] = 36001
cluster_envs["start_port"] = envs.find_free_port()
cluster_envs["log_dir"] = "logs"
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.strategy"] = "async"
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["train.trainer.engine"] = "local_cluster"
......@@ -139,9 +150,9 @@ def local_cluster_engine(args):
cluster_envs["train.trainer.platform"] = envs.get_platform()
cluster_envs["CPU_NUM"] = "2"
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
launch = LocalClusterEngine(cluster_envs, args.model)
return launch
......@@ -184,8 +195,11 @@ def get_abs_model(model):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='fleet-rec run')
parser.add_argument("-m", "--model", type=str)
parser.add_argument("-e", "--engine", type=str, choices=["single", "local_cluster", "cluster"])
parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu")
parser.add_argument("-e", "--engine", type=str,
choices=["single", "local_cluster", "cluster",
"tdm_single", "tdm_local_cluster", "tdm_cluster"])
parser.add_argument("-d", "--device", type=str,
choices=["cpu", "gpu"], default="cpu")
abs_dir = os.path.dirname(os.path.abspath(__file__))
envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
......@@ -193,10 +207,11 @@ if __name__ == "__main__":
args = parser.parse_args()
args.engine = args.engine.upper()
args.device = args.device.upper()
model_name = args.model.split('.')[-1]
args.model = get_abs_model(args.model)
engine_registry()
which_engine = get_engine(args.engine, args.device)
which_engine = get_engine(args)
engine = which_engine(args)
engine.run()
文件模式从 100644 更改为 100755
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "fleetrec.models.rank.dcn"
reader:
batch_size: 2
class: "{workspace}/criteo_reader.py"
train_data_path: "{workspace}/data/train"
feat_dict_name: "{workspace}/data/vocab"
model:
models: "{workspace}/model.py"
hyper_parameters:
cross_num: 2
dnn_hidden_units: [128, 128]
l2_reg_cross: 0.00005
dnn_use_bn: False
clip_by_norm: 100.0
cat_feat_num: "{workspace}/data/cat_feature_num.txt"
is_sparse: False
is_test: False
num_field: 39
learning_rate: 0.0001
act: "relu"
optimizer: adam
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import math
import sys
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
from collections import Counter
import os
class TrainReader(Reader):
    """Criteo dataset reader.

    The 13 continuous features (columns 1-13) are log-transformed; the 26
    categorical features (columns 14-39) are mapped through per-column
    vocabulary files, with index 0 reserved for missing/unknown values.
    """

    def init(self):
        """Build the normalization constants and categorical vocabularies."""
        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        self.cont_max_ = [
            5775, 257675, 65535, 969, 23159456, 431037, 56311, 6047, 29019, 11,
            231, 4008, 7393
        ]
        self.cont_diff_ = [
            hi - lo for hi, lo in zip(self.cont_max_, self.cont_min_)
        ]
        self.cont_idx_ = list(range(1, 14))
        self.cat_idx_ = list(range(14, 40))

        dense_feat_names = ['I' + str(i) for i in range(1, 14)]
        sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
        target = ['label']
        self.label_feat_names = target + dense_feat_names + sparse_feat_names

        self.cat_feat_idx_dict_list = [{} for _ in range(26)]

        # TODO: set vocabulary dictionary
        vocab_dir = envs.get_global_env("feat_dict_name", None, "train.reader")
        for col in range(26):
            vocab_path = os.path.join(vocab_dir, 'C' + str(col + 1) + '.txt')
            lookup_idx = 1  # index 0 is reserved as the default value
            for line in open(vocab_path):
                self.cat_feat_idx_dict_list[col][line.strip()] = lookup_idx
                lookup_idx += 1

    def _process_line(self, line):
        """Turn one tab-separated record into a 40-slot feature list:
        slot 0 is the label, 1-13 dense values, 14-39 vocab indices."""
        fields = line.rstrip('\n').split('\t')
        label_feat_list = [[] for _ in range(40)]
        for idx in self.cont_idx_:
            raw = fields[idx]
            if raw == '':
                label_feat_list[idx].append(0)
            else:
                # Log transform; I2 can be as low as -3, so shift by 4
                # there to keep the argument positive.
                shift = 4 if idx == 2 else 1
                label_feat_list[idx].append(math.log(shift + float(raw)))
        for idx in self.cat_idx_:
            vocab = self.cat_feat_idx_dict_list[idx - 14]
            raw = fields[idx]
            if raw == '' or raw not in vocab:
                label_feat_list[idx].append(0)
            else:
                label_feat_list[idx].append(vocab[raw])
        label_feat_list[0].append(int(fields[0]))
        return label_feat_list

    def generate_sample(self, line):
        """
        Read the data line by line and process it as a dictionary
        """

        def data_iter():
            yield list(zip(self.label_feat_names, self._process_line(line)))

        return data_iter
\ No newline at end of file
import os
import sys
import io
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from fleetrec.tools.tools import download_file_and_uncompress
if __name__ == '__main__':
    train_file = 'train.txt'
    url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"

    print("download and extract starting...")
    download_file_and_uncompress(url)
    print("download and extract finished")

    # Count the records in the extracted training file.
    record_count = sum(1 for _ in io.open(train_file, 'r', encoding='utf-8'))
    print("total records: %d" % record_count)
    print("done")
from __future__ import print_function, absolute_import, division
import os
import sys
from collections import Counter
import numpy as np
"""
preprocess Criteo train data, generate extra statistic files for model input.
"""
# input filename
FILENAME = 'train.200000.txt'
# global vars
CAT_FEATURE_NUM = 'cat_feature_num.txt'
INT_FEATURE_MINMAX = 'int_feature_minmax.txt'
VOCAB_DIR = 'vocab'
TRAIN_DIR = 'train'
TEST_VALID_DIR = 'test_valid'
SPLIT_RATIO = 0.9
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
CAT_COLUMN_NAMES = ['C' + str(i) for i in range(1, 27)]
def check_statfiles():
    """
    check if statistic files of Criteo exists
    :return: True when the count, minmax, and every vocab file exist
    """
    expected = [CAT_FEATURE_NUM, INT_FEATURE_MINMAX]
    expected += [
        os.path.join(VOCAB_DIR, name + '.txt') for name in CAT_COLUMN_NAMES
    ]
    return all(os.path.exists(path) for path in expected)
def create_statfiles():
    """
    create statistic files of Criteo, including:
        min/max of interger features
        counts of categorical features
        vocabs of each categorical features
    :return:
    """
    # Running min/max per integer feature and value counts per
    # categorical feature, accumulated in a single pass over FILENAME.
    int_minmax_list = [[sys.maxsize, -sys.maxsize] for _ in range(13)]
    cat_ct_list = [Counter() for _ in range(26)]

    for line in open(FILENAME):
        spls = line.rstrip('\n').split('\t')
        assert len(spls) == 40
        for i in range(13):
            raw = spls[1 + i]
            if not raw:
                continue
            int_val = int(raw)
            minmax = int_minmax_list[i]
            if int_val < minmax[0]:
                minmax[0] = int_val
            if int_val > minmax[1]:
                minmax[1] = int_val
        for i in range(26):
            cat_ct_list[i].update([spls[14 + i]])

    # save min max of integer features
    with open(INT_FEATURE_MINMAX, 'w') as f:
        for name, minmax in zip(INT_COLUMN_NAMES, int_minmax_list):
            print("{} {} {}".format(name, minmax[0], minmax[1]), file=f)

    # drop '' and keep only categorical values seen at least FREQ_THR times
    cat_set_list = [set() for _ in cat_ct_list]
    for i, ct in enumerate(cat_ct_list):
        if '' in ct:
            del ct['']
        for key, cnt in list(ct.items()):
            if cnt >= FREQ_THR:
                cat_set_list[i].add(key)
    del cat_ct_list

    # create vocab dir
    if not os.path.exists(VOCAB_DIR):
        os.makedirs(VOCAB_DIR)

    # write vocab file of categorical features
    with open(CAT_FEATURE_NUM, 'w') as cat_feat_count_file:
        for name, s in zip(CAT_COLUMN_NAMES, cat_set_list):
            print('{} {}'.format(name, len(s)), file=cat_feat_count_file)
            vocabfile = os.path.join(VOCAB_DIR, name + '.txt')
            with open(vocabfile, 'w') as f:
                for vocab_val in s:
                    print(vocab_val, file=f)
def split_data():
    """
    split train.txt into train and test_valid files.

    Lines before ``split_idx`` go to TRAIN_DIR, the rest to
    TEST_VALID_DIR; output is chunked into part files of 200000 lines.
    :return:
    """
    if not os.path.exists(TRAIN_DIR):
        os.makedirs(TRAIN_DIR)
    if not os.path.exists(TEST_VALID_DIR):
        os.makedirs(TEST_VALID_DIR)

    part_size = 200000
    # NOTE(review): this split index assumes the full 45,840,617-line
    # Criteo dataset; with the sampled FILENAME the test_valid split is
    # never reached -- confirm whether that is intended.
    split_idx = int(45840617 * SPLIT_RATIO)

    # FIX: use the FILENAME constant instead of a hard-coded duplicate of
    # its value, and guarantee the handles are closed on error.
    with open(FILENAME, 'r') as fin:
        data_dir = TRAIN_DIR
        fout = open(os.path.join(data_dir, 'part-0'), 'w')
        try:
            for line_idx, line in enumerate(fin):
                if line_idx == split_idx:
                    fout.close()
                    data_dir = TEST_VALID_DIR
                    cur_part_idx = int(line_idx / part_size)
                    fout = open(
                        os.path.join(data_dir, 'part-' + str(cur_part_idx)),
                        'w')
                if line_idx % part_size == 0 and line_idx != 0:
                    fout.close()
                    cur_part_idx = int(line_idx / part_size)
                    fout = open(
                        os.path.join(data_dir, 'part-' + str(cur_part_idx)),
                        'w')
                fout.write(line)
        finally:
            fout.close()
if __name__ == '__main__':
    # Build the statistic files only when any of them is missing, then
    # always perform the train/test_valid split.
    stats_ready = check_statfiles()
    if not stats_ready:
        print('create statstic files of Criteo...')
        create_statfiles()
    print('split train.200000.txt...')
    split_data()
    print('done')
import paddle.fluid as fluid
import math
from fleetrec.core.utils import envs
from fleetrec.core.model import Model as ModelBase
from collections import OrderedDict
class Model(ModelBase):
    """Deep & Cross Network (DCN) for CTR prediction.

    A cross network (explicit bounded-degree feature interactions) and a DNN
    (implicit interactions) share one embedding/dense input; their outputs
    are concatenated and fed to a final FC layer that produces the click
    probability.
    """

    def __init__(self, config):
        ModelBase.__init__(self, config)

    def init_network(self):
        """Read hyper-parameters from the global config and prepare the
        feature-dimension metadata used when building the graph."""
        self.cross_num = envs.get_global_env(
            "hyper_parameters.cross_num", None, self._namespace)
        self.dnn_hidden_units = envs.get_global_env(
            "hyper_parameters.dnn_hidden_units", None, self._namespace)
        self.l2_reg_cross = envs.get_global_env(
            "hyper_parameters.l2_reg_cross", None, self._namespace)
        self.dnn_use_bn = envs.get_global_env(
            "hyper_parameters.dnn_use_bn", None, self._namespace)
        self.clip_by_norm = envs.get_global_env(
            "hyper_parameters.clip_by_norm", None, self._namespace)
        cat_feat_num = envs.get_global_env(
            "hyper_parameters.cat_feat_num", None, self._namespace)
        # cat_feat_num is the path of a text file with one
        # "<feat_name> <cardinality>" pair per line; a context manager closes
        # the handle (the original `for line in open(...)` leaked it).
        cat_feat_dims_dict = OrderedDict()
        with open(cat_feat_num) as fin:
            for line in fin:
                spls = line.strip().split()
                assert len(spls) == 2
                cat_feat_dims_dict[spls[0]] = int(spls[1])
        self.cat_feat_dims_dict = cat_feat_dims_dict
        self.is_sparse = envs.get_global_env(
            "hyper_parameters.is_sparse", None, self._namespace)
        # Criteo naming convention: I1-I13 dense (integer) features,
        # C1-C26 sparse (categorical) features.
        self.dense_feat_names = ['I' + str(i) for i in range(1, 14)]
        self.sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
        # {feat_name: dims}; every dense feature is a scalar.
        self.feat_dims_dict = OrderedDict(
            [(feat_name, 1) for feat_name in self.dense_feat_names])
        self.feat_dims_dict.update(self.cat_feat_dims_dict)
        self.net_input = None
        self.loss = None

    def _create_embedding_input(self, data_dict):
        """Embed the sparse features and concatenate them with the dense ones.

        Embedding width per feature follows the 6 * cardinality**0.25
        heuristic; the vocabulary size gets "+ 1" so index 0 can serve as
        padding/unknown.
        """
        sparse_emb_dict = OrderedDict((name, fluid.embedding(
            input=fluid.layers.cast(
                data_dict[name], dtype='int64'),
            size=[
                self.feat_dims_dict[name] + 1,
                6 * int(pow(self.feat_dims_dict[name], 0.25))
            ],
            is_sparse=self.is_sparse)) for name in self.sparse_feat_names)
        # combine dense inputs and sparse embeddings into one flat vector
        dense_input_list = [
            data_dict[name] for name in data_dict if name.startswith('I')
        ]
        sparse_emb_list = list(sparse_emb_dict.values())
        sparse_input = fluid.layers.concat(sparse_emb_list, axis=-1)
        sparse_input = fluid.layers.flatten(sparse_input)
        dense_input = fluid.layers.concat(dense_input_list, axis=-1)
        dense_input = fluid.layers.flatten(dense_input)
        dense_input = fluid.layers.cast(dense_input, 'float32')
        net_input = fluid.layers.concat([dense_input, sparse_input], axis=-1)
        return net_input

    def _deep_net(self, input, hidden_units, use_bn=False, is_test=False):
        """Plain MLP: fc -> (optional batch_norm) -> relu for each layer."""
        for units in hidden_units:
            input = fluid.layers.fc(input=input, size=units)
            if use_bn:
                input = fluid.layers.batch_norm(input, is_test=is_test)
            input = fluid.layers.relu(input)
        return input

    def _cross_layer(self, x0, x, prefix):
        """One cross layer: x_{l+1} = x0 * (x_l . w) + b + x_l.

        Returns the layer output and its weight vector (so the caller can
        apply L2 regularization to the weights).
        """
        input_dim = x0.shape[-1]
        w = fluid.layers.create_parameter(
            [input_dim], dtype='float32', name=prefix + "_w")
        b = fluid.layers.create_parameter(
            [input_dim], dtype='float32', name=prefix + "_b")
        xw = fluid.layers.reduce_sum(x * w, dim=1, keep_dim=True)  # (N, 1)
        return x0 * xw + b + x, w

    def _cross_net(self, input, num_cross_layers):
        """Stack cross layers and accumulate the L2 penalty of their weights."""
        x = x0 = input
        l2_reg_cross_list = []
        for i in range(num_cross_layers):
            x, w = self._cross_layer(x0, x, "cross_layer_{}".format(i))
            l2_reg_cross_list.append(self._l2_loss(w))
        l2_reg_cross_loss = fluid.layers.reduce_sum(
            fluid.layers.concat(
                l2_reg_cross_list, axis=-1))
        return x, l2_reg_cross_loss

    def _l2_loss(self, w):
        """Sum of squares of w (no 1/2 factor)."""
        return fluid.layers.reduce_sum(fluid.layers.square(w))

    def train_net(self):
        """Build the training graph: inputs, DCN forward pass, loss and AUC."""
        self.init_network()
        self.target_input = fluid.data(
            name='label', shape=[None, 1], dtype='float32')
        data_dict = OrderedDict()
        for feat_name in self.feat_dims_dict:
            data_dict[feat_name] = fluid.data(
                name=feat_name, shape=[None, 1], dtype='float32')
        self.net_input = self._create_embedding_input(data_dict)
        deep_out = self._deep_net(
            self.net_input, self.dnn_hidden_units, self.dnn_use_bn, False)
        cross_out, l2_reg_cross_loss = self._cross_net(self.net_input,
                                                       self.cross_num)
        last_out = fluid.layers.concat([deep_out, cross_out], axis=-1)
        logit = fluid.layers.fc(last_out, 1)
        self.prob = fluid.layers.sigmoid(logit)
        # Feed order: label first, then dense features, then sparse features.
        self._data_var = [self.target_input] + [
            data_dict[dense_name] for dense_name in self.dense_feat_names
        ] + [data_dict[sparse_name] for sparse_name in self.sparse_feat_names]
        # AUC metric over the two-column [P(neg), P(pos)] distribution
        prob_2d = fluid.layers.concat([1 - self.prob, self.prob], 1)
        label_int = fluid.layers.cast(self.target_input, 'int64')
        auc_var, batch_auc_var, self.auc_states = fluid.layers.auc(
            input=prob_2d, label=label_int, slide_steps=0)
        self._metrics["AUC"] = auc_var
        self._metrics["BATCH_AUC"] = batch_auc_var
        # mean logloss over the batch
        logloss = fluid.layers.log_loss(self.prob, self.target_input)
        self.avg_logloss = fluid.layers.reduce_mean(logloss)
        # total loss = mean logloss + reg_coeff * cross-layer L2 penalty
        l2_reg_cross_loss = self.l2_reg_cross * l2_reg_cross_loss
        self.loss = self.avg_logloss + l2_reg_cross_loss
        self._cost = self.loss

    def optimizer(self):
        """Adam optimizer (lazy mode) with the configured learning rate."""
        learning_rate = envs.get_global_env(
            "hyper_parameters.learning_rate", None, self._namespace)
        optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
        return optimizer

    def infer_net(self, parameter_list):
        """Build the network for inference.

        Bug fix: the original called ``self.deepfm_net()``, a method that
        does not exist on this class (copy-paste from the DeepFM model) and
        raised AttributeError at inference time; build the DCN graph via
        ``train_net`` instead.
        """
        self.train_net()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "fleetrec.models.rank.deepfm"
reader:
batch_size: 2
class: "{workspace}/criteo_reader.py"
train_data_path: "{workspace}/data/train_data"
feat_dict_name: "{workspace}/data/aid_data/feat_dict_10.pkl2"
model:
models: "{workspace}/model.py"
hyper_parameters:
sparse_feature_number: 1086460
sparse_feature_dim: 9
num_field: 39
fc_sizes: [400, 400, 400]
learning_rate: 0.0001
reg: 0.001
act: "relu"
optimizer: SGD
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
class TrainReader(Reader):
    """Criteo reader for DeepFM.

    Continuous features I1-I13 are min-max normalized; categorical features
    C1-C26 are mapped to ids through a prebuilt feature dictionary, with id 0
    reserved for missing/unknown values.
    """

    def init(self):
        """Load normalization statistics and the pickled feature dictionary."""
        # Per-feature minimum values (I2 can be negative in Criteo).
        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        self.cont_max_ = [
            5775, 257675, 65535, 969, 23159456, 431037, 56311, 6047, 29019, 46,
            231, 4008, 7393
        ]
        self.cont_diff_ = [
            self.cont_max_[i] - self.cont_min_[i]
            for i in range(len(self.cont_min_))
        ]
        # Field layout per line: 0 label, 1-13 continuous, 14-39 categorical.
        self.continuous_range_ = range(1, 14)
        self.categorical_range_ = range(14, 40)
        # Load the preprocessed feature dict; the context manager closes the
        # handle (the original `pickle.load(open(...))` leaked it).
        self.feat_dict_name = envs.get_global_env("feat_dict_name", None,
                                                  "train.reader")
        with open(self.feat_dict_name, 'rb') as f:
            self.feat_dict_ = pickle.load(f)

    def _process_line(self, line):
        """Convert one raw TSV line into (feat_idx, feat_value, label)."""
        features = line.rstrip('\n').split('\t')
        feat_idx = []
        feat_value = []
        for idx in self.continuous_range_:
            if features[idx] == '':
                # Missing continuous value -> padding id 0, value 0.
                feat_idx.append(0)
                feat_value.append(0.0)
            else:
                feat_idx.append(self.feat_dict_[idx])
                feat_value.append(
                    (float(features[idx]) - self.cont_min_[idx - 1]) /
                    self.cont_diff_[idx - 1])
        for idx in self.categorical_range_:
            if features[idx] == '' or features[idx] not in self.feat_dict_:
                # Missing or low-frequency categorical value -> padding id 0.
                feat_idx.append(0)
                feat_value.append(0.0)
            else:
                feat_idx.append(self.feat_dict_[features[idx]])
                feat_value.append(1.0)
        label = [int(features[0])]
        return feat_idx, feat_value, label

    def generate_sample(self, line):
        """
        Read the data line by line and process it as a dictionary
        """

        def data_iter():
            feat_idx, feat_value, label = self._process_line(line)
            yield [('feat_idx', feat_idx), ('feat_value', feat_value),
                   ('label', label)]

        return data_iter
\ No newline at end of file
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from fleetrec.tools.tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
    # Criteo "dac" dataset archive plus a prebuilt DeepFM feature dictionary.
    url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
    url2 = "https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2"
    print("download and extract starting...")
    download_file_and_uncompress(url)
    download_file(url2, "./aid_data/feat_dict_10.pkl2", True)
    print("download and extract finished")
    print("preprocessing...")
    os.system("python preprocess.py")
    print("preprocess done")
    # The raw dump is no longer needed once preprocess.py has produced the
    # train/test splits and the aid_data artifacts.
    shutil.rmtree("raw_data")
    print("done")
import os
import numpy
from collections import Counter
import shutil
import pickle
def get_raw_data():
    """Chunk the raw Criteo ``train.txt`` into 200000-line part files.

    Output goes to ``raw_data/part-<n>``. Handles are managed with
    ``with``/``try-finally`` so none is leaked if an I/O error occurs
    (the original left both files open on exceptions).
    """
    lines_per_part = 200000
    if not os.path.isdir('raw_data'):
        os.mkdir('raw_data')
    fout = open('raw_data/part-0', 'w')
    try:
        with open('train.txt', 'r') as fin:
            for line_idx, line in enumerate(fin):
                if line_idx % lines_per_part == 0 and line_idx != 0:
                    # Roll over to the next 200000-line chunk.
                    fout.close()
                    cur_part_idx = line_idx // lines_per_part
                    fout = open('raw_data/part-' + str(cur_part_idx), 'w')
                fout.write(line)
    finally:
        fout.close()
def split_data():
    """Randomly route raw_data part files into train_data/ (90%) and
    test_data/, persisting the chosen indices so re-runs reuse the split."""
    split_rate_ = 0.9
    dir_train_file_idx_ = 'aid_data/train_file_idx.txt'
    num_parts = len(os.listdir('raw_data'))
    filelist_ = ['raw_data/part-%d' % x for x in range(num_parts)]
    if os.path.exists(dir_train_file_idx_):
        # Reuse the previously persisted split for reproducibility.
        # NOTE(review): eval() on the cached index file — trusted local file only.
        with open(dir_train_file_idx_, 'r') as fin:
            train_file_idx = eval(fin.read())
    else:
        train_file_idx = list(
            numpy.random.choice(
                num_parts, int(num_parts * split_rate_), False))
        with open(dir_train_file_idx_, 'w') as fout:
            fout.write(str(train_file_idx))
    for idx, part_path in enumerate(filelist_):
        dest = 'train_data' if idx in train_file_idx else 'test_data'
        shutil.move(part_path, dest)
def get_feat_dict():
    """Build (and cache as a pickle) the Criteo feature-id dictionary.

    Continuous field indices 1-13 get ids 1-13; every categorical value seen
    at least ``freq_`` times gets the next free id. Id 0 is implicitly left
    free for padding/unknown values.
    """
    freq_ = 10
    dir_feat_dict_ = 'aid_data/feat_dict_' + str(freq_) + '.pkl2'
    continuous_range_ = range(1, 14)
    categorical_range_ = range(14, 40)
    if os.path.exists(dir_feat_dict_):
        # Dictionary already built by a previous run; nothing to do.
        return
    # Count occurrences of every categorical value.
    feat_cnt = Counter()
    with open('train.txt', 'r') as fin:
        for line_idx, line in enumerate(fin):
            if line_idx % 100000 == 0:
                print('generating feature dict', line_idx / 45000000)
            fields = line.rstrip('\n').split('\t')
            feat_cnt.update(
                fields[idx] for idx in categorical_range_ if fields[idx] != '')
    # Keep only the discrete values with high enough frequency.
    dis_feat_set = {feat for feat, cnt in feat_cnt.items() if cnt >= freq_}
    # Continuous fields first (ids 1..13), then the surviving discrete values.
    feat_dict = {idx: pos for pos, idx in enumerate(continuous_range_, start=1)}
    next_id = len(feat_dict) + 1
    for feat in dis_feat_set:
        feat_dict[feat] = next_id
        next_id += 1
    # Persist with protocol 2 so Python 2 consumers can still read it.
    with open(dir_feat_dict_, 'wb') as fout:
        pickle.dump(feat_dict, fout, protocol=2)
    print('args.num_feat ', len(feat_dict) + 1)
if __name__ == '__main__':
    # Ensure the output directories exist before any file is moved into them.
    if not os.path.isdir('train_data'):
        os.mkdir('train_data')
    if not os.path.isdir('test_data'):
        os.mkdir('test_data')
    if not os.path.isdir('aid_data'):
        os.mkdir('aid_data')
    # 1) chunk train.txt into raw_data/part-N, 2) move the parts into the
    # train/test split, 3) build the frequency-filtered feature dictionary.
    get_raw_data()
    split_data()
    get_feat_dict()
    print('Done!')
import paddle.fluid as fluid
import math
from fleetrec.core.utils import envs
from fleetrec.core.model import Model as ModelBase
class Model(ModelBase):
    """DeepFM CTR model: a first-order linear term, a second-order FM
    interaction term and a DNN over shared feature embeddings."""

    def __init__(self, config):
        ModelBase.__init__(self, config)

    def deepfm_net(self):
        """Build the DeepFM forward graph and store ``self.predict``.

        Also registers the three input variables in ``self._data_var`` in
        feed order: feat_idx, feat_value, label.
        """
        init_value_ = 0.1
        is_distributed = True if envs.get_trainer() == "CtrTrainer" else False
        sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self._namespace)
        sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self._namespace)
        # ------------------------- network input --------------------------
        num_field = envs.get_global_env("hyper_parameters.num_field", None, self._namespace)
        raw_feat_idx = fluid.data(name='feat_idx', shape=[None, num_field], dtype='int64')  # None * num_field (default: 39)
        raw_feat_value = fluid.data(name='feat_value', shape=[None, num_field], dtype='float32')  # None * num_field
        self.label = fluid.data(name='label', shape=[None, 1], dtype='float32')  # None * 1
        feat_idx = fluid.layers.reshape(raw_feat_idx,[-1, 1])  # (None * num_field) * 1
        feat_value = fluid.layers.reshape(raw_feat_value, [-1, num_field, 1])  # None * num_field * 1
        # ------------------------- set _data_var --------------------------
        self._data_var.append(raw_feat_idx)
        self._data_var.append(raw_feat_value)
        self._data_var.append(self.label)
        # On non-Linux platforms feed via a DataLoader instead of a dataset.
        if self._platform != "LINUX":
            self._data_loader = fluid.io.DataLoader.from_generator(
                feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
        #------------------------- first order term --------------------------
        # w_i * x_i with one L1-regularized scalar weight per feature id;
        # padding_idx=0 keeps the reserved "missing" id at weight zero.
        reg = envs.get_global_env("hyper_parameters.reg", 1e-4, self._namespace)
        first_weights_re = fluid.embedding(
            input=feat_idx,
            is_sparse=True,
            is_distributed=is_distributed,
            dtype='float32',
            size=[sparse_feature_number + 1, 1],
            padding_idx=0,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormalInitializer(
                    loc=0.0, scale=init_value_),
                regularizer=fluid.regularizer.L1DecayRegularizer(reg)))
        first_weights = fluid.layers.reshape(
            first_weights_re, shape=[-1, num_field, 1])  # None * num_field * 1
        y_first_order = fluid.layers.reduce_sum((first_weights * feat_value), 1)
        #------------------------- second order term --------------------------
        # FM pairwise term computed with the 0.5 * ((sum e)^2 - sum e^2) trick.
        feat_embeddings_re = fluid.embedding(
            input=feat_idx,
            is_sparse=True,
            is_distributed=is_distributed,
            dtype='float32',
            size=[sparse_feature_number + 1, sparse_feature_dim],
            padding_idx=0,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormalInitializer(
                    loc=0.0, scale=init_value_ / math.sqrt(float(sparse_feature_dim)))))
        feat_embeddings = fluid.layers.reshape(
            feat_embeddings_re,
            shape=[-1, num_field,
                   sparse_feature_dim])  # None * num_field * embedding_size
        feat_embeddings = feat_embeddings * feat_value  # None * num_field * embedding_size
        # sum_square part
        summed_features_emb = fluid.layers.reduce_sum(feat_embeddings,
                                                      1)  # None * embedding_size
        summed_features_emb_square = fluid.layers.square(
            summed_features_emb)  # None * embedding_size
        # square_sum part
        squared_features_emb = fluid.layers.square(
            feat_embeddings)  # None * num_field * embedding_size
        squared_sum_features_emb = fluid.layers.reduce_sum(
            squared_features_emb, 1)  # None * embedding_size
        y_second_order = 0.5 * fluid.layers.reduce_sum(
            summed_features_emb_square - squared_sum_features_emb, 1,
            keep_dim=True)  # None * 1
        #------------------------- DNN --------------------------
        layer_sizes = envs.get_global_env("hyper_parameters.fc_sizes", None, self._namespace)
        act = envs.get_global_env("hyper_parameters.act", None, self._namespace)
        y_dnn = fluid.layers.reshape(feat_embeddings,
                                     [-1, num_field * sparse_feature_dim])
        for s in layer_sizes:
            y_dnn = fluid.layers.fc(
                input=y_dnn,
                size=s,
                act=act,
                param_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.TruncatedNormalInitializer(
                        loc=0.0, scale=init_value_ / math.sqrt(float(10)))),
                bias_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.TruncatedNormalInitializer(
                        loc=0.0, scale=init_value_)))
        # final linear projection to a single logit
        y_dnn = fluid.layers.fc(
            input=y_dnn,
            size=1,
            act=None,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormalInitializer(
                    loc=0.0, scale=init_value_)),
            bias_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormalInitializer(
                    loc=0.0, scale=init_value_)))
        #------------------------- DeepFM --------------------------
        # Click probability = sigmoid(first order + second order + DNN logit).
        self.predict = fluid.layers.sigmoid(y_first_order + y_second_order + y_dnn)

    def train_net(self):
        """Build the training graph: forward pass, logloss cost and AUC."""
        self.deepfm_net()
        #------------------------- Cost(logloss) --------------------------
        cost = fluid.layers.log_loss(input=self.predict, label=self.label)
        # NOTE(review): despite the name, this is the *sum* of the per-sample
        # logloss (reduce_sum, not reduce_mean) — confirm this is intended.
        avg_cost = fluid.layers.reduce_sum(cost)
        self._cost = avg_cost
        #------------------------- Metric(Auc) --------------------------
        predict_2d = fluid.layers.concat([1 - self.predict, self.predict], 1)
        label_int = fluid.layers.cast(self.label, 'int64')
        auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict_2d,
                                                     label=label_int,
                                                     slide_steps=0)
        self._metrics["AUC"] = auc_var
        self._metrics["BATCH_AUC"] = batch_auc_var

    def optimizer(self):
        """Adam optimizer (lazy mode) with the configured learning rate."""
        learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self._namespace)
        optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
        return optimizer

    def infer_net(self, parameter_list):
        """Build only the forward graph for inference."""
        self.deepfm_net()
\ No newline at end of file
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "fleetrec.models.rank.din"
reader:
batch_size: 2
class: "{workspace}/reader.py"
train_data_path: "{workspace}/data/train_data"
dataset_class: "DataLoader"
model:
models: "{workspace}/model.py"
hyper_parameters:
use_DataLoader: True
item_emb_size: 64
cat_emb_size: 64
is_sparse: False
config_path: "data/config.txt"
fc_sizes: [400, 400, 400]
learning_rate: 0.0001
reg: 0.001
act: "sigmoid"
optimizer: SGD
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
from __future__ import print_function
import random
import pickle

# Fixed seed so negative sampling and shuffling are reproducible.
random.seed(1234)

print("read and process data")

# remap.pkl (written by remap_id.py) holds, in order: the review DataFrame,
# the item-id -> category-id array, and the dataset counts.
with open('./raw_data/remap.pkl', 'rb') as f:
    reviews_df = pickle.load(f)
    cate_list = pickle.load(f)
    user_count, item_count, cate_count, example_count = pickle.load(f)

train_set = []
test_set = []
for reviewerID, hist in reviews_df.groupby('reviewerID'):
    pos_list = hist['asin'].tolist()

    def gen_neg():
        # Sample a random item id the user has never interacted with.
        neg = pos_list[0]
        while neg in pos_list:
            neg = random.randint(0, item_count - 1)
        return neg

    neg_list = [gen_neg() for i in range(len(pos_list))]
    # One (positive, sampled negative) training pair per history prefix; the
    # last interaction of each user is held out as the test example.
    for i in range(1, len(pos_list)):
        hist = pos_list[:i]
        if i != len(pos_list) - 1:
            train_set.append((reviewerID, hist, pos_list[i], 1))
            train_set.append((reviewerID, hist, neg_list[i], 0))
        else:
            label = (pos_list[i], neg_list[i])
            test_set.append((reviewerID, hist, label))
random.shuffle(train_set)
random.shuffle(test_set)
# Exactly one held-out test example per user.
assert len(test_set) == user_count
def print_to_file(data, fout):
    """Write the items of *data* space-separated and terminated by ';'.

    Writes nothing at all for an empty sequence, matching the original
    loop-based implementation.
    """
    if data:
        fout.write(' '.join(str(item) for item in data) + ';')
print("make train data")
with open("paddle_train.txt", "w") as fout:
for line in train_set:
history = line[1]
target = line[2]
label = line[3]
cate = [cate_list[x] for x in history]
print_to_file(history, fout)
print_to_file(cate, fout)
fout.write(str(target) + ";")
fout.write(str(cate_list[target]) + ";")
fout.write(str(label) + "\n")
print("make test data")
with open("paddle_test.txt", "w") as fout:
for line in test_set:
history = line[1]
target = line[2]
cate = [cate_list[x] for x in history]
print_to_file(history, fout)
print_to_file(cate, fout)
fout.write(str(target[0]) + ";")
fout.write(str(cate_list[target[0]]) + ";")
fout.write("1\n")
print_to_file(history, fout)
print_to_file(cate, fout)
fout.write(str(target[1]) + ";")
fout.write(str(cate_list[target[1]]) + ";")
fout.write("0\n")
print("make config data")
with open('config.txt', 'w') as f:
f.write(str(user_count) + "\n")
f.write(str(item_count) + "\n")
f.write(str(cate_count) + "\n")
from __future__ import print_function
import pickle
import pandas as pd
def to_df(file_path):
    """Load a file of one-record-per-line Python literals into a DataFrame.

    The Amazon dumps use single-quoted dict literals, so each line is parsed
    with eval() rather than json. NOTE(review): eval on file contents — only
    run this on the trusted dataset files.
    """
    with open(file_path, 'r') as fin:
        records = {row: eval(line) for row, line in enumerate(fin)}
    return pd.DataFrame.from_dict(records, orient='index')
print("start to analyse reviews_Electronics_5.json")
reviews_df = to_df('./raw_data/reviews_Electronics_5.json')
with open('./raw_data/reviews.pkl', 'wb') as f:
pickle.dump(reviews_df, f, pickle.HIGHEST_PROTOCOL)
print("start to analyse meta_Electronics.json")
meta_df = to_df('./raw_data/meta_Electronics.json')
meta_df = meta_df[meta_df['asin'].isin(reviews_df['asin'].unique())]
meta_df = meta_df.reset_index(drop=True)
with open('./raw_data/meta.pkl', 'wb') as f:
pickle.dump(meta_df, f, pickle.HIGHEST_PROTOCOL)
#! /bin/bash
# Download the Amazon Electronics reviews + metadata dumps into raw_data/
# and run the pandas conversion and id-remapping preprocessing steps.
set -e
echo "begin download data"
mkdir raw_data
cd raw_data
# -c resumes partial downloads if the script is re-run.
wget -c http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Electronics_5.json.gz
gzip -d reviews_Electronics_5.json.gz
wget -c http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Electronics.json.gz
gzip -d meta_Electronics.json.gz
echo "download data successfully"
cd ..
python convert_pd.py
python remap_id.py
from __future__ import print_function
import random
import pickle
import numpy as np

# Seeded for reproducibility of any random steps downstream.
random.seed(1234)

with open('./raw_data/reviews.pkl', 'rb') as f:
    reviews_df = pickle.load(f)
    reviews_df = reviews_df[['reviewerID', 'asin', 'unixReviewTime']]
with open('./raw_data/meta.pkl', 'rb') as f:
    meta_df = pickle.load(f)
    meta_df = meta_df[['asin', 'categories']]
# Each item keeps only its last (most specific) category label.
meta_df['categories'] = meta_df['categories'].map(lambda x: x[-1][-1])
def build_map(df, col_name):
    """Replace df[col_name] values with dense 0-based ids (sorted order).

    Mutates *df* in place and returns (mapping, sorted_keys), where
    mapping[key] is the key's rank in the sorted unique values.
    """
    sorted_keys = sorted(df[col_name].unique().tolist())
    mapping = {key: pos for pos, key in enumerate(sorted_keys)}
    df[col_name] = df[col_name].map(mapping)
    return mapping, sorted_keys
asin_map, asin_key = build_map(meta_df, 'asin')
cate_map, cate_key = build_map(meta_df, 'categories')
revi_map, revi_key = build_map(reviews_df, 'reviewerID')

user_count, item_count, cate_count, example_count =\
    len(revi_map), len(asin_map), len(cate_map), reviews_df.shape[0]
print('user_count: %d\titem_count: %d\tcate_count: %d\texample_count: %d' %
      (user_count, item_count, cate_count, example_count))

meta_df = meta_df.sort_values('asin')
meta_df = meta_df.reset_index(drop=True)
# Remap item ids in the reviews with the same mapping as the metadata, then
# order each user's interactions chronologically.
reviews_df['asin'] = reviews_df['asin'].map(lambda x: asin_map[x])
reviews_df = reviews_df.sort_values(['reviewerID', 'unixReviewTime'])
reviews_df = reviews_df.reset_index(drop=True)
reviews_df = reviews_df[['reviewerID', 'asin', 'unixReviewTime']]

# cate_list[i] is the category id of item id i.
cate_list = [meta_df['categories'][i] for i in range(len(asin_map))]
cate_list = np.array(cate_list, dtype=np.int32)

with open('./raw_data/remap.pkl', 'wb') as f:
    pickle.dump(reviews_df, f, pickle.HIGHEST_PROTOCOL)  # uid, iid
    pickle.dump(cate_list, f, pickle.HIGHEST_PROTOCOL)  # cid of iid line
    pickle.dump((user_count, item_count, cate_count, example_count), f,
                pickle.HIGHEST_PROTOCOL)
    pickle.dump((asin_key, cate_key, revi_key), f, pickle.HIGHEST_PROTOCOL)
3737 19450;288 196;18486;674;1
3647 4342 6855 3805;281 463 558 674;4206;463;1
1805 4309;87 87;21354;556;1
18209 20753;649 241;51924;610;0
13150;351;41455;792;1
35120 40418;157 714;52035;724;0
13515 20363 25356 26891 24200 11694 33378 34483 35370 27311 40689 33319 28819;558 123 61 110 738 692 110 629 714 463 281 142 382;45554;558;1
19254 9021 28156 19193 24602 31171;189 462 140 474 157 614;48895;350;1
4716;194;32497;484;1
43799 47108;368 140;3503;25;0
20554 41800 1582 1951;339 776 694 703;4320;234;0
39713 44272 45136 11687;339 339 339 140;885;168;0
14398 33997;756 347;20438;703;1
29341 25727;142 616;4170;512;0
12197 10212;558 694;31559;24;0
11551;351;53485;436;1
4553;196;7331;158;1
15190 19994 33946 30716 31879 45178 51598 46814;249 498 612 142 746 746 558 174;24353;251;0
4931 2200 8338 23530;785 792 277 523;3525;251;0
8881 13274 12683 14696 27693 1395 44373 59704 27762 54268 30326 11811 45371 51598 55859 56039 57678 47250 2073 38932;479 558 190 708 335 684 339 725 446 446 44 575 280 558 262 197 368 111 749 188;12361;616;1
16297 16797 18629 20922 16727 33946 51165 36796;281 436 462 339 611 612 288 64;34724;288;1
22237;188;40786;637;0
5396 39993 42681 49832 11208 34954 36523 45523 51618;351 339 687 281 708 142 629 656 142;38201;571;0
8881 9029 17043 16620 15021 32706;479 110 110 749 598 251;34941;657;0
53255;444;37953;724;1
1010 4172 8613 11562 11709 13118 2027 15446;674 606 708 436 179 179 692 436;36998;703;0
22357 24305 15222 19254 22914;189 504 113 189 714;18201;398;1
1905;694;23877;347;1
8444 17868;765 712;50732;44;0
42301 26186 38086;142 450 744;61547;714;0
18156 35717 32070 45650 47208 20975 36409 44856 48072 15860 47043 53289 53314 33470 47926;157 281 650 142 749 291 707 714 157 205 388 474 708 498 495;48170;746;1
56219;108;1988;389;0
22907;83;752;175;0
22009 32410 42987 48720 683 1289 2731 4736 6306 8442 8946 9928 11536 14947 15793 16694 21736 25156 25797 25874 26573 30318 33946 35420 1492 5236 5555 6625 8867 9638 11443 20225 25965 27273 29001 35302 42336 43347 36907 2012;317 462 291 142 694 10 574 278 708 281 131 142 367 281 258 345 616 708 111 115 339 113 612 24 368 616 39 197 44 214 558 108 616 558 210 210 142 142 262 351;25540;701;0
20434;196;18056;189;0
628 5461;194 234;43677;351;0
16953 15149 45143 23587 5094 25105 51913 54645;484 281 449 792 524 395 388 731;57655;75;1
13584 7509;234 744;33062;749;1
170 208 77 109 738 742 1118 15349 255 12067 21643 55453;330 559 744 115 558 674 111 351 694 694 746 111;9821;694;1
4970 16672;540 746;25685;666;1
17240 60546;708 629;42110;142;1
31503 31226 50628 22444;142 156 142 203;47812;749;0
2443 1763 3403 4225 8951;25 707 351 177 351;7954;351;1
3748;351;9171;657;1
1755 26204 42716 32991;446 188 497 746;23910;395;1
20637 27122;558 44;19669;301;0
406 872 306 218 883 1372 1705 1709 7774 2376 2879 2881 13329 4992 13594 11106 7131 8631 1736 17585 2568 16896 21971 10296 22361 24108 23300 11793 25351 2648 24593 12692 23883 25345 27129 26321 21627 20738 17784 28785 29281 28366 24723 24319 12083 29882 29974 30443 30428 17072 9783 16700 29421 32253 28830 31299 28792 33931 24973 33112 21717 28339 23978 18649 1841 17635 19696 37448 20862 30492 35736 37450 2633 8675 17412 25960 28389 31032 37157 14555 4996 33388 33393 36237 38946 22793 24337 34963 38819 41165 39551 43019 15570 25129 34593 38385 42915 41407 29907 31289 44229 24267 34975 39462 33274 43251 38302 35502 44056 44675 45233 47690 33472 50149 29409 47183 49188 48192 50628 24103 28313 28358 38882 44330 44346 2019 2484 2675 26396 48143 46039 47722 48559 41719 41720 43920 41983 51235 34964 27287 51915 33586 43630 47258 52137 40954 35120 29572 42405 53559 44900 45761;241 558 395 368 498 110 463 611 558 106 10 112 251 241 48 112 601 674 241 347 733 502 194 119 179 179 578 692 281 115 523 113 281 35 765 196 339 115 90 164 790 708 142 115 342 351 391 281 48 119 74 505 606 68 239 687 687 281 110 281 449 351 38 351 164 176 449 115 70 25 687 115 39 756 35 175 704 119 38 53 115 38 38 142 262 188 614 277 388 615 49 738 106 733 486 666 571 385 708 119 331 463 578 288 142 106 611 611 39 523 388 142 726 702 498 61 142 714 142 654 277 733 603 498 299 97 726 115 637 703 558 74 629 142 142 347 629 746 277 8 49 389 629 408 733 345 157 704 115 398 611 239;49174;368;0
29206 60955;351 684;61590;76;1
8427 9692 4411 3266 18234 22774;746 281 396 651 446 44;23393;351;0
13051 15844 9347 21973 18365 24220 28429 4799 27488 21623 13870 29346 27208 31075 31635 28390 30777 29334 33438 16469 29423 29237 25527 34808 37656 21324 38263 6699 33167 9295 40828 18894;339 342 657 194 20 466 179 225 436 364 707 115 36 523 351 674 694 391 674 500 342 216 707 345 616 495 436 363 395 189 203 766;56816;396;0
5653 18042 21137 17277 23847 25109 21837 17163 22786 27380 20789 27737 30164 36402 37166 38647 31746 38915 38366 11151 43757 38284 29817 41717 41899 43279 47539 37850 39789 43817 11208 53361 29247 51483 39940 50917 53618 44055 48997;593 251 616 110 110 110 110 105 436 558 311 142 603 738 398 766 1 351 142 584 674 597 142 483 351 157 373 142 629 39 708 251 339 142 262 1 113 142 462;13418;558;0
8719 11172;311 217;11707;179;1
14968 8297 22914 5998 20253 41425 42664 46745 51179 33481 46814 55135 53124 61559;463 766 714 486 628 444 281 714 142 242 174 118 714 714;61908;714;1
61119;714;22907;83;0
26172;157;54529;44;0
13830 10377 8193 16072 13543 18741 24205 18281 37272 27784 16658 27884;384 739 558 739 135 347 558 687 498 142 197 746;34463;177;1
20842 11756 22110 30562 30697;189 68 483 776 225;49113;483;0
13646 46782 54138;142 798 142;43698;347;0
36434;241;51537;629;0
44121 35325;397 653;43399;397;1
6438 11107 20073 25026 24434 35533 6318 25028 28352 32359 25734 26280 41466 25192 1909 11753 17770 24301 1728 9693 36444 40256 17961 36780 41093 8788 439 46397 46269 50462 40395 437 2582 4455 12361 14325 22294 26153 26607 29205 29878 33491 38795 41585 45480 51567 54245 19796 52446;356 194 389 89 474 330 347 384 330 90 19 385 177 68 624 68 674 463 624 194 177 389 197 642 239 111 115 113 48 251 554 115 36 163 616 524 84 190 465 398 89 166 113 330 616 449 90 140 330;15142;764;0
1573;540;18294;463;1
9837 13438 13690;351 629 24;26044;351;0
1708 2675 4935 7401 14413 22177 30319 32217 34342 40235 42963 43949 54816;463 115 474 616 474 44 113 279 164 142 616 649 36;31992;115;0
8025 11769 36188 42006;142 262 714 142;8209;142;0
30266;176;44167;692;0
13000 14769 2940 27638 23158;765 27 736 554 112;55050;725;0
32557 18668 43441;765 707 396;44217;681;1
5665 5964 18874;542 746 196;16747;179;0
7014 29912 42468;194 612 558;20800;355;0
8320 9743 1735 442 5216 11568;234 251 241 603 476 649;32738;153;0
533 1447;744 744;17843;744;1
48390 48191;714 714;48864;708;1
9312 16166 12754 21433 28142 7486;215 674 241 115 558 241;38629;48;1
10401 11665 10739;142 364 766;5989;463;0
10408 14363 8807 14947 24701 44676 40914 12241 14906 29247 32347 5834 18291 18313 23375 24075 7020 14307 15891;140 140 749 281 444 388 504 385 196 339 746 351 463 746 197 90 746 576 476;37949;330;1
50194;444;15572;216;0
24021;281;25850;140;1
22185 28726 55777;142 766 351;17;541;1
31776 34767 28854 34769 38022 38667 32917 9094 40879 41634 42252 19865 47983 38818 40131 40690 18915 48539 49619 18554 24836;70 239 113 48 486 541 352 197 347 385 34 476 704 388 385 281 225 474 157 706 53;25602;707;1
10544 15159 23606 33556 46886 55061 2079 27022 40345 43556 3807 28732;642 87 641 113 558 157 564 44 194 26 54 113;51293;272;0
19005 41469 42368 5739 30169 32266 54743 56959 26271;145 482 707 790 101 347 197 368 674;5602;158;0
7166 16886 21083 7328 25545;560 213 87 744 87;32494;321;1
2306;260;30286;179;0
57709 55115;351 483;25035;142;0
16641 35845;153 311;36985;68;1
31144 4107;189 168;50619;142;0
36331 9873 10659 14382 21430 28164;680 197 185 11 115 476;37887;484;1
19519 3748 33772 22436 38789 46337;649 351 210 115 113 115;23980;649;1
30789 37586 42354 26171 15017 28654 44960;142 714 142 483 484 474 157;41552;746;1
52662;576;53627;776;0
12258 15133 15681 5066 6420 13421 6577 29202 38939;216 558 111 570 447 5 111 281 347;7818;558;0
610 1258 2332 7508 10814 10797 11710;543 611 611 653 110 201 179;11495;558;1
12584 2707 1664 25878 25949;790 694 694 142 611;25286;792;1
32423 24223;135 90;2323;399;0
11959;197;15349;351;1
44448 58138 41930 57603 59009 61316 61559 599;339 629 115 388 1 142 714 297;54434;142;0
43441 12617 47970 52144;396 196 142 629;29211;351;1
25327 40258;656 398;40261;142;1
4637;474;59864;687;0
import paddle.fluid as fluid
import math
from fleetrec.core.utils import envs
from fleetrec.core.model import Model as ModelBase
class Model(ModelBase):
    """Deep Interest Network (DIN) CTR model.

    The user's behaviour history is attention-pooled against the target
    item/category and fed, together with the target embedding, through an
    MLP that predicts the click probability.
    """

    def __init__(self, config):
        ModelBase.__init__(self, config)

    def config_read(self, config_path):
        """Read the dataset cardinalities from *config_path*.

        The file holds one integer per line: user count, item count and
        category count, in that order.

        Returns:
            Tuple ``(user_count, item_count, cat_count)``.
        """
        with open(config_path, "r") as fin:
            user_count = int(fin.readline().strip())
            item_count = int(fin.readline().strip())
            cat_count = int(fin.readline().strip())
        return user_count, item_count, cat_count

    def din_attention(self, hist, target_expand, mask):
        """Attention-pool the history w.r.t. the target ("activation weight").

        Args:
            hist: history embeddings, shape (batch, seq_len, dim).
            target_expand: target embedding tiled along the sequence axis,
                same shape as ``hist``.
            mask: additive mask (0 for valid steps, a large negative value
                for padding), shape (batch, seq_len, 1).

        Returns:
            Weighted sum over the history, shape (batch, dim).
        """
        hidden_size = hist.shape[-1]
        concat = fluid.layers.concat(
            [hist, target_expand, hist - target_expand, hist * target_expand],
            axis=2)
        atten_fc1 = fluid.layers.fc(name="atten_fc1",
                                    input=concat,
                                    size=80,
                                    act=self.act,
                                    num_flatten_dims=2)
        atten_fc2 = fluid.layers.fc(name="atten_fc2",
                                    input=atten_fc1,
                                    size=40,
                                    act=self.act,
                                    num_flatten_dims=2)
        atten_fc3 = fluid.layers.fc(name="atten_fc3",
                                    input=atten_fc2,
                                    size=1,
                                    num_flatten_dims=2)
        # Mask padded positions so they receive ~zero softmax weight.
        atten_fc3 += mask
        atten_fc3 = fluid.layers.transpose(x=atten_fc3, perm=[0, 2, 1])
        # Scaled-dot-product style temperature.
        atten_fc3 = fluid.layers.scale(x=atten_fc3, scale=hidden_size**-0.5)
        weight = fluid.layers.softmax(atten_fc3)
        out = fluid.layers.matmul(weight, hist)
        out = fluid.layers.reshape(x=out, shape=[0, hidden_size])
        return out

    def train_net(self):
        """Build the training graph: inputs, embeddings, attention, MLP,
        sigmoid cross-entropy cost and AUC metrics."""
        seq_len = -1  # variable-length history sequences
        self.item_emb_size = envs.get_global_env(
            "hyper_parameters.item_emb_size", 64, self._namespace)
        self.cat_emb_size = envs.get_global_env(
            "hyper_parameters.cat_emb_size", 64, self._namespace)
        self.act = envs.get_global_env(
            "hyper_parameters.act", "sigmoid", self._namespace)
        # is_sparse is significant for speeding up the training process.
        self.is_sparse = envs.get_global_env(
            "hyper_parameters.is_sparse", False, self._namespace)
        self.config_path = envs.get_global_env(
            "hyper_parameters.config_path", "data/config.txt", self._namespace)
        self.use_DataLoader = envs.get_global_env(
            "hyper_parameters.use_DataLoader", False, self._namespace)

        user_count, item_count, cat_count = self.config_read(self.config_path)
        item_emb_attr = fluid.ParamAttr(name="item_emb")
        cat_emb_attr = fluid.ParamAttr(name="cat_emb")

        # ----------------------------- inputs -----------------------------
        hist_item_seq = fluid.data(
            name="hist_item_seq", shape=[None, seq_len], dtype="int64")
        self._data_var.append(hist_item_seq)

        hist_cat_seq = fluid.data(
            name="hist_cat_seq", shape=[None, seq_len], dtype="int64")
        self._data_var.append(hist_cat_seq)

        target_item = fluid.data(
            name="target_item", shape=[None], dtype="int64")
        self._data_var.append(target_item)

        target_cat = fluid.data(name="target_cat", shape=[None], dtype="int64")
        self._data_var.append(target_cat)

        label = fluid.data(name="label", shape=[None, 1], dtype="float32")
        self._data_var.append(label)

        mask = fluid.data(
            name="mask", shape=[None, seq_len, 1], dtype="float32")
        self._data_var.append(mask)

        target_item_seq = fluid.data(
            name="target_item_seq", shape=[None, seq_len], dtype="int64")
        self._data_var.append(target_item_seq)

        target_cat_seq = fluid.data(
            name="target_cat_seq", shape=[None, seq_len], dtype="int64")
        self._data_var.append(target_cat_seq)

        if self.use_DataLoader:
            self._data_loader = fluid.io.DataLoader.from_generator(
                feed_list=self._data_var,
                capacity=10000,
                use_double_buffer=False,
                iterable=False)

        # --------------------------- embeddings ---------------------------
        hist_item_emb = fluid.embedding(
            input=hist_item_seq,
            size=[item_count, self.item_emb_size],
            param_attr=item_emb_attr,
            is_sparse=self.is_sparse)

        hist_cat_emb = fluid.embedding(
            input=hist_cat_seq,
            size=[cat_count, self.cat_emb_size],
            param_attr=cat_emb_attr,
            is_sparse=self.is_sparse)

        target_item_emb = fluid.embedding(
            input=target_item,
            size=[item_count, self.item_emb_size],
            param_attr=item_emb_attr,
            is_sparse=self.is_sparse)

        target_cat_emb = fluid.embedding(
            input=target_cat,
            size=[cat_count, self.cat_emb_size],
            param_attr=cat_emb_attr,
            is_sparse=self.is_sparse)

        target_item_seq_emb = fluid.embedding(
            input=target_item_seq,
            size=[item_count, self.item_emb_size],
            param_attr=item_emb_attr,
            is_sparse=self.is_sparse)

        target_cat_seq_emb = fluid.embedding(
            input=target_cat_seq,
            size=[cat_count, self.cat_emb_size],
            param_attr=cat_emb_attr,
            is_sparse=self.is_sparse)

        # Per-item scalar bias, initialised to zero.
        item_b = fluid.embedding(
            input=target_item,
            size=[item_count, 1],
            param_attr=fluid.initializer.Constant(value=0.0))

        # ------------------------ attention + MLP -------------------------
        hist_seq_concat = fluid.layers.concat(
            [hist_item_emb, hist_cat_emb], axis=2)
        target_seq_concat = fluid.layers.concat(
            [target_item_seq_emb, target_cat_seq_emb], axis=2)
        target_concat = fluid.layers.concat(
            [target_item_emb, target_cat_emb], axis=1)

        out = self.din_attention(hist_seq_concat, target_seq_concat, mask)
        out_fc = fluid.layers.fc(name="out_fc",
                                 input=out,
                                 size=self.item_emb_size + self.cat_emb_size,
                                 num_flatten_dims=1)
        embedding_concat = fluid.layers.concat([out_fc, target_concat], axis=1)

        fc1 = fluid.layers.fc(name="fc1",
                              input=embedding_concat,
                              size=80,
                              act=self.act)
        fc2 = fluid.layers.fc(name="fc2", input=fc1, size=40, act=self.act)
        fc3 = fluid.layers.fc(name="fc3", input=fc2, size=1)
        logit = fc3 + item_b

        # --------------------------- objective ----------------------------
        loss = fluid.layers.sigmoid_cross_entropy_with_logits(
            x=logit, label=label)
        avg_loss = fluid.layers.mean(loss)
        self._cost = avg_loss

        self.predict = fluid.layers.sigmoid(logit)
        predict_2d = fluid.layers.concat([1 - self.predict, self.predict], 1)
        label_int = fluid.layers.cast(label, 'int64')
        auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict_2d,
                                                     label=label_int,
                                                     slide_steps=0)
        self._metrics["AUC"] = auc_var
        self._metrics["BATCH_AUC"] = batch_auc_var

    def optimizer(self):
        """Adam optimizer with the configured learning rate (lazy mode)."""
        learning_rate = envs.get_global_env(
            "hyper_parameters.learning_rate", None, self._namespace)
        optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
        return optimizer

    def infer_net(self, parameter_list):
        """Build the inference graph.

        BUGFIX: this previously called ``self.deepfm_net()``, a method that
        does not exist on this class (copy/paste from the DeepFM model) and
        raised AttributeError; reuse the training graph instead.
        """
        self.train_net()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
import numpy as np
import os
import random
try:
import cPickle as pickle
except ImportError:
import pickle
class TrainReader(Reader):
    """Dataset reader for the DIN model.

    Each input line has the form
    ``hist_items;hist_cats;target_item;target_cat;label``
    where the first two fields are space-separated id sequences.
    """

    def init(self):
        """Scan the training files once to find the longest history length.

        Side effect: writes the observed maximum length to ``tmp.txt`` in
        the working directory (presumably consumed by another stage —
        TODO confirm against callers).
        """
        self.train_data_path = envs.get_global_env(
            "train_data_path", None, "train.reader")
        self.res = []
        self.max_len = 0

        data_file_list = os.listdir(self.train_data_path)
        for i in range(0, len(data_file_list)):
            train_data_file = os.path.join(self.train_data_path,
                                           data_file_list[i])
            with open(train_data_file, "r") as fin:
                for line in fin:
                    line = line.strip().split(';')
                    hist = line[0].split()
                    self.max_len = max(self.max_len, len(hist))

        fo = open("tmp.txt", "w")
        fo.write(str(self.max_len))
        fo.close()

        self.batch_size = envs.get_global_env("batch_size", 32, "train.reader")
        # Samples are collected in groups of 20 batches and sorted by
        # history length before batching (see batch_reader).
        self.group_size = self.batch_size * 20

    def _process_line(self, line):
        """Parse one raw line into ``[hist, cate, [item], [cat], [label]]``."""
        line = line.strip().split(';')
        hist = line[0].split()
        hist = [int(i) for i in hist]
        cate = line[1].split()
        cate = [int(i) for i in cate]
        return [hist, cate, [int(line[2])], [int(line[3])], [float(line[4])]]

    def generate_sample(self, line):
        """
        Read the data line by line and process it as a dictionary
        """

        def data_iter():
            #feat_idx, feat_value, label = self._process_line(line)
            yield self._process_line(line)

        return data_iter

    def pad_batch_data(self, input, max_len):
        """Right-pad every sequence in *input* with zeros to *max_len*
        and return an int64 array of shape (batch, max_len)."""
        res = np.array([x + [0] * (max_len - len(x)) for x in input])
        res = res.astype("int64").reshape([-1, max_len])
        return res

    def make_data(self, b):
        """Turn a list of parsed samples *b* into per-sample feed arrays."""
        max_len = max(len(x[0]) for x in b)
        # Pad item and category histories to the batch's longest sequence.
        item = self.pad_batch_data([x[0] for x in b], max_len)
        cat = self.pad_batch_data([x[1] for x in b], max_len)
        len_array = [len(x[0]) for x in b]
        # Additive attention mask: 0 on valid steps, -1e9 on padding.
        mask = np.array(
            [[0] * x + [-1e9] * (max_len - x) for x in len_array]).reshape(
                [-1, max_len, 1])
        # Broadcast the target ids over the sequence dimension.
        target_item_seq = np.array(
            [[x[2]] * max_len for x in b]).astype("int64").reshape(
                [-1, max_len])
        target_cat_seq = np.array(
            [[x[3]] * max_len for x in b]).astype("int64").reshape(
                [-1, max_len])
        res = []
        for i in range(len(b)):
            res.append([
                item[i], cat[i], b[i][2], b[i][3], b[i][4], mask[i],
                target_item_seq[i], target_cat_seq[i]
            ])
        return res

    def batch_reader(self, reader, batch_size, group_size):
        """Return a reader that groups *group_size* samples, sorts each
        group by history length (to reduce padding) and yields batches."""

        def batch_reader():
            bg = []
            for line in reader:
                bg.append(line)
                if len(bg) == group_size:
                    sortb = sorted(bg, key=lambda x: len(x[0]), reverse=False)
                    bg = []
                    for i in range(0, group_size, batch_size):
                        b = sortb[i:i + batch_size]
                        yield self.make_data(b)
            len_bg = len(bg)
            if len_bg != 0:
                sortb = sorted(bg, key=lambda x: len(x[0]), reverse=False)
                bg = []
                remain = len_bg % batch_size
                # NOTE(review): the trailing `remain` samples that do not
                # fill a whole batch are dropped.
                for i in range(0, len_bg - remain, batch_size):
                    b = sortb[i:i + batch_size]
                    yield self.make_data(b)

        return batch_reader

    def base_read(self, file_dir):
        """Load every file listed in *file_dir* into memory as raw fields."""
        res = []
        for train_file in file_dir:
            with open(train_file, "r") as fin:
                for line in fin:
                    line = line.strip().split(';')
                    hist = line[0].split()
                    cate = line[1].split()
                    # NOTE(review): ids stay as strings here; numpy casts
                    # them to int64 later in pad_batch_data/make_data.
                    res.append([hist, cate, line[2], line[3], float(line[4])])
        return res

    def generate_batch_from_trainfiles(self, files):
        """Shuffle all samples from *files* and return a batched reader."""
        data_set = self.base_read(files)
        random.shuffle(data_set)
        return self.batch_reader(data_set, self.batch_size,
                                 self.batch_size * 20)
\ No newline at end of file
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "fleetrec.models.rank.wide_deep"
reader:
batch_size: 2
class: "{workspace}/reader.py"
train_data_path: "{workspace}/data/train_data"
model:
models: "{workspace}/model.py"
hyper_parameters:
hidden1_units: 75
hidden2_units: 50
hidden3_units: 25
learning_rate: 0.0001
reg: 0.001
act: "relu"
optimizer: SGD
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
# Prepare the wide&deep adult dataset: create the working directories and
# convert the raw census files into train/test CSVs via data_preparation.py.
# BUGFIX: use `mkdir -p` so the script is re-runnable — plain `mkdir` exits
# non-zero when the directory already exists.
mkdir -p train_data
mkdir -p test_data
mkdir -p data

train_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/data/adult.data"
test_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/data/adult.test"
train_data_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/train_data/train_data.csv"
test_data_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/test_data/test_data.csv"

# Optional: fetch dependencies and the raw UCI adult data first.
#pip install -r requirements.txt
#wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
#wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

python data_preparation.py --train_path ${train_path} \
                           --test_path ${test_path} \
                           --train_data_path ${train_data_path}\
                           --test_data_path ${test_data_path}
import paddle.fluid as fluid
import math
from fleetrec.core.utils import envs
from fleetrec.core.model import Model as ModelBase
class Model(ModelBase):
    """Wide & Deep CTR model.

    A regularized linear ("wide") part over hand-crafted features plus a
    three-layer MLP ("deep") part; their logits are summed and passed
    through a clipped sigmoid.
    """

    def __init__(self, config):
        ModelBase.__init__(self, config)

    def wide_part(self, data):
        """Linear (wide) branch: one L2-regularized FC layer to a single
        logit, with fan-in-scaled truncated-normal initialization."""
        out = fluid.layers.fc(
            input=data,
            size=1,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormal(
                    loc=0.0, scale=1.0 / math.sqrt(data.shape[1])),
                regularizer=fluid.regularizer.L2DecayRegularizer(
                    regularization_coeff=1e-4)),
            act=None,
            name='wide')
        return out

    def fc(self, data, hidden_units, active, tag):
        """One FC layer with fan-in-scaled truncated-normal init.

        Args:
            data: input variable.
            hidden_units: layer width.
            active: activation name (e.g. 'relu') or None.
            tag: layer name.
        """
        output = fluid.layers.fc(
            input=data,
            size=hidden_units,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormal(
                    loc=0.0, scale=1.0 / math.sqrt(data.shape[1]))),
            act=active,
            name=tag)
        return output

    def deep_part(self, data, hidden1_units, hidden2_units, hidden3_units):
        """Deep branch: three stacked ReLU FC layers."""
        l1 = self.fc(data, hidden1_units, 'relu', 'l1')
        l2 = self.fc(l1, hidden2_units, 'relu', 'l2')
        l3 = self.fc(l2, hidden3_units, 'relu', 'l3')
        return l3

    def train_net(self):
        """Build the training graph: inputs, wide+deep logits, clipped
        sigmoid prediction, cross-entropy cost and AUC/ACC metrics."""
        wide_input = fluid.data(
            name='wide_input', shape=[None, 8], dtype='float32')
        deep_input = fluid.data(
            name='deep_input', shape=[None, 58], dtype='float32')
        label = fluid.data(name='label', shape=[None, 1], dtype='float32')
        self._data_var.append(wide_input)
        self._data_var.append(deep_input)
        self._data_var.append(label)

        hidden1_units = envs.get_global_env(
            "hyper_parameters.hidden1_units", 75, self._namespace)
        hidden2_units = envs.get_global_env(
            "hyper_parameters.hidden2_units", 50, self._namespace)
        hidden3_units = envs.get_global_env(
            "hyper_parameters.hidden3_units", 25, self._namespace)

        wide_output = self.wide_part(wide_input)
        deep_output = self.deep_part(deep_input, hidden1_units, hidden2_units,
                                     hidden3_units)
        # Project each branch to a single logit before summing.
        wide_model = fluid.layers.fc(
            input=wide_output,
            size=1,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormal(
                    loc=0.0, scale=1.0)),
            act=None,
            name='w_wide')
        deep_model = fluid.layers.fc(
            input=deep_output,
            size=1,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.TruncatedNormal(
                    loc=0.0, scale=1.0)),
            act=None,
            name='w_deep')

        prediction = fluid.layers.elementwise_add(wide_model, deep_model)
        # Clip the logit to avoid sigmoid saturation/overflow.
        pred = fluid.layers.sigmoid(
            fluid.layers.clip(prediction, min=-15.0, max=15.0),
            name="prediction")

        num_seqs = fluid.layers.create_tensor(dtype='int64')
        acc = fluid.layers.accuracy(
            input=pred,
            label=fluid.layers.cast(x=label, dtype='int64'),
            total=num_seqs)
        auc_var, batch_auc, auc_states = fluid.layers.auc(
            input=pred, label=fluid.layers.cast(x=label, dtype='int64'))

        self._metrics["AUC"] = auc_var
        self._metrics["BATCH_AUC"] = batch_auc
        self._metrics["ACC"] = acc

        # Loss is computed on the raw (unclipped) logit.
        cost = fluid.layers.sigmoid_cross_entropy_with_logits(
            x=prediction, label=label)
        avg_cost = fluid.layers.mean(cost)
        self._cost = avg_cost

    def optimizer(self):
        """Adam optimizer with the configured learning rate (lazy mode)."""
        learning_rate = envs.get_global_env(
            "hyper_parameters.learning_rate", None, self._namespace)
        optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
        return optimizer

    def infer_net(self, parameter_list):
        """Build the inference graph.

        BUGFIX: this previously called ``self.deepfm_net()``, a method that
        does not exist on this class (copy/paste from the DeepFM model) and
        raised AttributeError; reuse the training graph instead.
        """
        self.train_net()
\ No newline at end of file
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
class TrainReader(Reader):
    """Reader for the wide&deep adult dataset.

    Each CSV line holds 8 wide features, 58 deep features and a trailing
    label, all numeric.
    """

    def init(self):
        """No reader state to initialize."""
        pass

    def _process_line(self, line):
        """Split one CSV row into ``(wide_feat, deep_feat, [label])``."""
        values = [float(token) for token in line.strip().split(',')]
        wide_feat = values[:8]
        deep_feat = values[8:66]
        return wide_feat, deep_feat, [values[-1]]

    def generate_sample(self, line):
        """
        Read the data line by line and process it as a dictionary
        """

        def data_iter():
            wide, deep, label = self._process_line(line)
            yield [('wide_input', wide), ('deep_input', deep),
                   ('label', label)]

        return data_iter
\ No newline at end of file
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "fleetrec.models.rank.xdeepfm"
reader:
batch_size: 2
class: "{workspace}/criteo_reader.py"
train_data_path: "{workspace}/data/train_data"
model:
models: "{workspace}/model.py"
hyper_parameters:
layer_sizes_dnn: [10, 10, 10]
layer_sizes_cin: [10, 10]
sparse_feature_number: 1086460
sparse_feature_dim: 9
num_field: 39
fc_sizes: [400, 400, 400]
learning_rate: 0.0001
reg: 0.0001
act: "relu"
optimizer: SGD
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
class TrainReader(Reader):
    """Reader for pre-hashed Criteo data.

    Each tab-separated line is a label followed by 39 feature ids; every
    feature gets the constant value 1.0.
    """

    def init(self):
        """No reader state to initialize."""
        pass

    def _process_line(self, line):
        """Parse one TSV row into ``(feat_idx, feat_value, label)``."""
        fields = line.strip('\n').split('\t')
        feat_idx = [int(fields[idx]) for idx in range(1, 40)]
        feat_value = [1.0] * 39
        label = [int(fields[0])]
        return feat_idx, feat_value, label

    def generate_sample(self, line):
        """Return a generator yielding one named-feature sample."""

        def data_iter():
            feat_idx, feat_value, label = self._process_line(line)
            yield [('feat_idx', feat_idx), ('feat_value', feat_value),
                   ('label', label)]

        return data_iter
\ No newline at end of file
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from fleetrec.tools.tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
    # (url, output directory, destination file) for each dataset split.
    splits = [
        ("https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr",
         "train_data", "./train_data/tr"),
        ("https://paddlerec.bj.bcebos.com/xdeepfm%2Fev",
         "test_data", "./test_data/ev"),
    ]

    for _, directory, _ in splits:
        if not os.path.exists(directory):
            os.mkdir(directory)

    print("download and extract starting...")
    for url, _, destination in splits:
        download_file(url, destination, True)
    print("download and extract finished")

    print("done")
import paddle.fluid as fluid
import math
from fleetrec.core.utils import envs
from fleetrec.core.model import Model as ModelBase
class Model(ModelBase):
    """xDeepFM CTR model: linear + CIN + DNN branches over shared
    per-field embeddings, combined with a sigmoid."""

    def __init__(self, config):
        ModelBase.__init__(self, config)

    def xdeepfm_net(self):
        """Build the xDeepFM forward graph.

        Registers the input variables in ``self._data_var`` and stores the
        click probability in ``self.predict`` and the label variable in
        ``self.label``.
        """
        init_value_ = 0.1
        initer = fluid.initializer.TruncatedNormalInitializer(
            loc=0.0, scale=init_value_)

        # NOTE(review): is_distributed is computed but not used below.
        is_distributed = True if envs.get_trainer() == "CtrTrainer" else False
        sparse_feature_number = envs.get_global_env(
            "hyper_parameters.sparse_feature_number", None, self._namespace)
        sparse_feature_dim = envs.get_global_env(
            "hyper_parameters.sparse_feature_dim", None, self._namespace)

        # ------------------------- network input --------------------------
        num_field = envs.get_global_env(
            "hyper_parameters.num_field", None, self._namespace)
        raw_feat_idx = fluid.data(
            name='feat_idx', shape=[None, num_field], dtype='int64')
        raw_feat_value = fluid.data(
            name='feat_value', shape=[None, num_field], dtype='float32')
        self.label = fluid.data(
            name='label', shape=[None, 1], dtype='float32')  # None * 1

        feat_idx = fluid.layers.reshape(
            raw_feat_idx, [-1, 1])  # (None * num_field) * 1
        feat_value = fluid.layers.reshape(
            raw_feat_value, [-1, num_field, 1])  # None * num_field * 1

        feat_embeddings = fluid.embedding(
            input=feat_idx,
            is_sparse=True,
            dtype='float32',
            size=[sparse_feature_number + 1, sparse_feature_dim],
            padding_idx=0,
            param_attr=fluid.ParamAttr(initializer=initer))
        feat_embeddings = fluid.layers.reshape(
            feat_embeddings,
            [-1, num_field,
             sparse_feature_dim])  # None * num_field * embedding_size
        # Scale each field's embedding by its feature value.
        feat_embeddings = feat_embeddings * feat_value  # None * num_field * embedding_size

        # ------------------------- set _data_var --------------------------
        self._data_var.append(raw_feat_idx)
        self._data_var.append(raw_feat_value)
        self._data_var.append(self.label)
        if self._platform != "LINUX":
            self._data_loader = fluid.io.DataLoader.from_generator(
                feed_list=self._data_var,
                capacity=64,
                use_double_buffer=False,
                iterable=False)

        # -------------------- linear --------------------
        weights_linear = fluid.embedding(
            input=feat_idx,
            is_sparse=True,
            dtype='float32',
            size=[sparse_feature_number + 1, 1],
            padding_idx=0,
            param_attr=fluid.ParamAttr(initializer=initer))
        weights_linear = fluid.layers.reshape(
            weights_linear, [-1, num_field, 1])  # None * num_field * 1
        b_linear = fluid.layers.create_parameter(
            shape=[1],
            dtype='float32',
            default_initializer=fluid.initializer.ConstantInitializer(value=0))
        y_linear = fluid.layers.reduce_sum(
            (weights_linear * feat_value), 1) + b_linear

        # -------------------- CIN --------------------
        layer_sizes_cin = envs.get_global_env(
            "hyper_parameters.layer_sizes_cin", None, self._namespace)
        Xs = [feat_embeddings]
        last_s = num_field
        for s in layer_sizes_cin:
            # calculate Z^(k+1) with X^k and X^0
            X_0 = fluid.layers.reshape(
                fluid.layers.transpose(Xs[0], [0, 2, 1]),
                [-1, sparse_feature_dim, num_field,
                 1])  # None, embedding_size, num_field, 1
            X_k = fluid.layers.reshape(
                fluid.layers.transpose(Xs[-1], [0, 2, 1]),
                [-1, sparse_feature_dim, 1,
                 last_s])  # None, embedding_size, 1, last_s
            Z_k_1 = fluid.layers.matmul(
                X_0, X_k)  # None, embedding_size, num_field, last_s

            # compresses Z^(k+1) to X^(k+1)
            Z_k_1 = fluid.layers.reshape(Z_k_1, [
                -1, sparse_feature_dim, last_s * num_field
            ])  # None, embedding_size, last_s*num_field
            Z_k_1 = fluid.layers.transpose(
                Z_k_1, [0, 2, 1])  # None, s*num_field, embedding_size
            Z_k_1 = fluid.layers.reshape(
                Z_k_1, [-1, last_s * num_field, 1, sparse_feature_dim]
            )  # None, last_s*num_field, 1, embedding_size (None, channal_in, h, w)
            X_k_1 = fluid.layers.conv2d(
                Z_k_1,
                num_filters=s,
                filter_size=(1, 1),
                act=None,
                bias_attr=False,
                param_attr=fluid.ParamAttr(
                    initializer=initer))  # None, s, 1, embedding_size
            X_k_1 = fluid.layers.reshape(
                X_k_1, [-1, s, sparse_feature_dim])  # None, s, embedding_size
            Xs.append(X_k_1)
            last_s = s

        # sum pooling over the embedding axis of every CIN layer output
        y_cin = fluid.layers.concat(
            Xs[1:], 1)  # None, (num_field++), embedding_size
        y_cin = fluid.layers.reduce_sum(y_cin, -1)  # None, (num_field++)
        y_cin = fluid.layers.fc(input=y_cin,
                                size=1,
                                act=None,
                                param_attr=fluid.ParamAttr(initializer=initer),
                                bias_attr=None)
        y_cin = fluid.layers.reduce_sum(y_cin, dim=-1, keep_dim=True)

        # -------------------- DNN --------------------
        layer_sizes_dnn = envs.get_global_env(
            "hyper_parameters.layer_sizes_dnn", None, self._namespace)
        act = envs.get_global_env(
            "hyper_parameters.act", None, self._namespace)
        y_dnn = fluid.layers.reshape(
            feat_embeddings, [-1, num_field * sparse_feature_dim])
        for s in layer_sizes_dnn:
            y_dnn = fluid.layers.fc(input=y_dnn,
                                    size=s,
                                    act=act,
                                    param_attr=fluid.ParamAttr(
                                        initializer=initer),
                                    bias_attr=None)
        y_dnn = fluid.layers.fc(input=y_dnn,
                                size=1,
                                act=None,
                                param_attr=fluid.ParamAttr(initializer=initer),
                                bias_attr=None)

        # ------------------- xDeepFM ------------------
        # Sum the three branch logits and squash to a probability.
        self.predict = fluid.layers.sigmoid(y_linear + y_cin + y_dnn)

    def train_net(self):
        """Build the training graph: forward net, log-loss cost, AUC."""
        self.xdeepfm_net()

        cost = fluid.layers.log_loss(
            input=self.predict, label=self.label, epsilon=0.0000001)
        batch_cost = fluid.layers.reduce_mean(cost)
        self._cost = batch_cost

        # for auc
        predict_2d = fluid.layers.concat([1 - self.predict, self.predict], 1)
        label_int = fluid.layers.cast(self.label, 'int64')
        auc_var, batch_auc_var, _ = fluid.layers.auc(input=predict_2d,
                                                     label=label_int,
                                                     slide_steps=0)
        self._metrics["AUC"] = auc_var
        self._metrics["BATCH_AUC"] = batch_auc_var

    def optimizer(self):
        """Adam optimizer with the configured learning rate (lazy mode)."""
        learning_rate = envs.get_global_env(
            "hyper_parameters.learning_rate", None, self._namespace)
        optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
        return optimizer

    def infer_net(self, parameter_list):
        """Inference reuses the same forward graph."""
        self.xdeepfm_net()
\ No newline at end of file
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 4
workspace: "fleetrec.models.recall.tdm"
reader:
batch_size: 32
class: "{workspace}/tdm_reader.py"
train_data_path: "{workspace}/data/train"
test_data_path: "{workspace}/data/test"
model:
models: "{workspace}/model.py"
hyper_parameters:
node_emb_size: 64
input_emb_size: 768
neg_sampling_list: [1, 2, 3, 4]
output_positive: True
topK: 1
learning_rate: 0.0001
act: "tanh"
optimizer: ADAM
tree_parameters:
max_layers: 4
node_nums: 26
leaf_node_nums: 13
layer_node_num_list: [2, 4, 7, 12]
child_nums: 2
startup:
tree:
# 单机训练建议tree只load一次,保存为paddle tensor,之后从paddle模型热启
# 分布式训练trainer需要独立load
load_tree: True
tree_layer_path: "{workspace}/tree/layer_list.txt"
tree_travel_path: "{workspace}/tree/travel_list.npy"
tree_info_path: "{workspace}/tree/tree_info.npy"
tree_emb_path: "{workspace}/tree/tree_emb.npy"
single:
load_persistables: False
persistables_model_path: ""
save_init_model: True
init_model_path: "{workspace}/init_model"
cluster:
init_model_path: "{workspace}/init_model"
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
此差异已折叠。
此差异已折叠。
此差异已折叠。
# -*- coding=utf8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import print_function
from fleetrec.core.reader import Reader
class TrainReader(Reader):
    """Reader for TDM training data.

    Each line holds a space-separated input embedding and an integer item
    label, separated by a tab.
    """

    def init(self):
        """No reader state to initialize."""
        pass

    def generate_sample(self, line):
        """
        Read the data line by line and process it as a dictionary
        """

        def reader():
            """
            This function needs to be implemented by the user, based on data format
            """
            features = (line.strip('\n')).split('\t')
            # BUGFIX: on Python 3, map()/zip() return one-shot iterators,
            # which breaks downstream code that measures or re-iterates the
            # feature values; materialize them as lists (also fine on py2).
            input_emb = list(map(float, features[0].split(' ')))
            item_label = [int(features[1])]
            feature_name = ["input_emb", "item_label"]
            yield list(zip(feature_name, [input_emb] + [item_label]))

        return reader
1,2
3,4,5,6
7,8,9,10,11,12,13
14,15,16,17,18,19,20,21,22,23,24,25
\ No newline at end of file
# 快速开始
## 环境准备
Fleet-Rec是基于飞桨分布式训练所开发的,包含模型、训练模式的快速开发、调试、部署的工具, 让用户更轻松的使用飞桨分布式训练。
<p align="center">
<img align="center" src="doc/imgs/logo.png">
<p>
[![License](https://img.shields.io/badge/license-Apache%202-red.svg)](LICENSE)
[![Version](https://img.shields.io/github/v/release/PaddlePaddle/Paddle.svg)](https://github.com/PaddlePaddle/PaddleRec/releases)
PaddleRec是源于飞桨生态的搜索推荐模型一站式开箱即用工具,无论您是初学者,开发者,研究者均可便捷的使用PaddleRec完成调研,训练到预测部署的全流程工作。PaddleRec提供了搜索推荐任务中语义理解、召回、粗排、精排、多任务学习的全流程解决方案。
PaddleRec以预置模型为核心,具备以下特点:
- [易于上手,开箱即用](https://www.paddlepaddle.org.cn)
- [灵活配置,个性调参](https://www.paddlepaddle.org.cn)
- [分布式训练,大规模稀疏](https://www.paddlepaddle.org.cn)
- [快速部署,一键上线](https://www.paddlepaddle.org.cn)
<p align="center">
<img align="center" src="doc/imgs/coding-gif.png">
<p>
# 目录
* [特性](#特性)
* [支持模型列表](#支持模型列表)
* [文档教程](#文档教程)
* [入门教程](#入门教程)
* [环境要求](#环境要求)
* [安装命令](#安装命令)
* [快速开始](#快速开始)
* [常见问题FAQ](#常见问题faq)
* [进阶教程](#进阶教程)
* [自定义数据集及Reader](#自定义数据集及reader)
* [模型调参](#模型调参)
* [单机训练](#单机训练)
* [分布式训练](#分布式训练)
* [预测部署](#预测部署)
* [版本历史](#版本历史)
* [版本更新](#版本更新)
* [Benchamrk](#benchamrk)
* [许可证书](#许可证书)
* [如何贡献代码](#如何贡献代码)
* [优化PaddleRec框架](#优化paddlerec框架)
* [新增模型到PaddleRec](#新增模型到paddlerec)
# 特性
- 易于上手,开箱即用
- 灵活配置,个性调参
- 分布式训练,大规模稀疏
- 快速部署,一键上线
# 支持模型列表
| 方向 | 模型 | 单机CPU训练 | 单机GPU训练 | 分布式CPU训练 | 分布式GPU训练 | 自定义数据集 | 服务器部署 |
| :------------------: | :--------------------: | :---------: | :---------: | :-----------: | :-----------: | :----------: | :--------: |
| ContentUnderstanding | [Text-Classification]() | ✓ | x | ✓ | x | ✓ | ✓ |
| ContentUnderstanding | [TagSpace]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [Word2Vec]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [TDM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Rank | [CTR-Dnn]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Rank | [DeepFm]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Rerank | [ListWise]() | ✓ | x | ✓ | x | ✓ | ✓ |
| MultiTask | [MMOE]() | ✓ | x | ✓ | x | ✓ | ✓ |
| MultiTask | [ESSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Match | [DSSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Match | [Multiview-Simnet]() | ✓ | x | ✓ | x | ✓ | ✓ |
# 文档教程
## 入门教程
### 环境要求
* Python >= 2.7
* PaddlePaddle >= 1.7.2
* 操作系统: Windows/Mac/Linux
### 安装命令
- 安装方法一<PIP源直接安装>
```bash
python -m pip install fleet-rec
```
```bash
python -m pip install fleet-rec
```
- 安装方法二
* 安装飞桨 **注:需要用户安装最新版本的飞桨<当前只支持Linux系统>。**
* 安装飞桨 **注:需要用户安装最新版本的飞桨<当前只支持Linux系统>。**
```bash
python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple
```
```bash
python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple
```
* 源码安装Fleet-Rec
```
git clone https://github.com/seiriosPlus/FleetRec/
cd FleetRec
python setup.py install
```
* 源码安装Fleet-Rec
```
git clone https://github.com/seiriosPlus/FleetRec/
cd FleetRec
python setup.py install
```
## ctr-dnn示例使用
### 快速开始
#### ctr-dnn示例使用
目前框架内置了多个模型,简单的命令即可使用内置模型开始单机训练和本地1*1模拟训练
### 单机训练
##### 单机训练
```bash
cd FleetRec
......@@ -44,7 +113,7 @@ python -m fleetrec.run \
-e single
```
### 本地模拟分布式训练
##### 本地模拟分布式训练
```bash
cd FleetRec
......@@ -55,7 +124,7 @@ python -m fleetrec.run \
-e local_cluster
```
### 集群提交分布式训练<需要用户预先配置好集群环境,本提交命令不包含提交客户端>
##### 集群提交分布式训练<需要用户预先配置好集群环境,本提交命令不包含提交客户端>
```bash
cd FleetRec
......@@ -66,5 +135,21 @@ python -m fleetrec.run \
-e cluster
```
### 常见问题FAQ
更多用户文档及二次开发文档,敬请期待。
\ No newline at end of file
## 进阶教程
### 自定义数据集及Reader
### 模型调参
### 单机训练
### 分布式训练
### 预测部署
# 版本历史
## 版本更新
## Benchamrk
# 许可证书
本项目的发布受[Apache 2.0 license](LICENSE)许可认证。
# 如何贡献代码
## 优化PaddleRec框架
## 新增模型到PaddleRec
......@@ -39,7 +39,7 @@ def build(dirname):
packages = find_packages(dirname, include=('fleetrec.*'))
package_dir = {'': dirname}
package_data = {}
need_copy = ['data/*/*.txt', '*.yaml']
need_copy = ['data/*/*.txt', '*.yaml', 'tree/*.npy','tree/*.txt']
for package in packages:
if package.startswith("fleetrec.models."):
package_data[package] = need_copy
......
import os
import time
import shutil
import requests
import sys
import tarfile
import zipfile
import platform
import functools
# Timestamp of the last progress-line flush; mutated by progress() below.
lasttime = time.time()
# Minimum number of seconds between progress-line redraws.
FLUSH_INTERVAL = 0.1
# Absolute directory containing this module.
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
def get_platform():
    """Return the host platform's identifying string (via ``platform.platform()``)."""
    host_description = platform.platform()
    return host_description
def is_windows():
    """Return True when the host operating system is Windows."""
    platform_name = get_platform().lower()
    return platform_name.startswith("windows")
def progress(str, end=False):
    """Render a single-line, in-place progress indicator on stdout.

    Redraws are throttled to at most one per FLUSH_INTERVAL seconds,
    except that a call with ``end=True`` always flushes and terminates
    the line with a newline.

    NOTE(review): the parameter name shadows the builtin ``str``; it is
    kept to stay compatible with keyword callers.
    """
    global lasttime
    text = str
    if end:
        text = text + "\n"
        # Force the throttle check below to pass so the final line shows.
        lasttime = 0
    if time.time() - lasttime >= FLUSH_INTERVAL:
        sys.stdout.write("\r" + text)
        lasttime = time.time()
        sys.stdout.flush()
def download_file(url, savepath, print_progress):
    """Download ``url`` to ``savepath``, optionally drawing a progress bar.

    When the server reports a Content-Length header, the body is streamed
    in 4 KB chunks so a progress bar can be updated; otherwise the raw
    stream is copied to disk in one pass with no progress display.

    Args:
        url: HTTP(S) location of the file to fetch.
        savepath: local filesystem path the payload is written to.
        print_progress: when truthy, render a textual progress bar.
    """
    r = requests.get(url, stream=True)
    total_length = r.headers.get('content-length')

    if total_length is None:
        # Unknown size: stream straight to disk, no progress reporting.
        with open(savepath, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
    else:
        with open(savepath, 'wb') as f:
            dl = 0
            total_length = int(total_length)
            # (removed an unused `starttime` local that was never read)
            if print_progress:
                print("Downloading %s" % os.path.basename(savepath))
            for data in r.iter_content(chunk_size=4096):
                dl += len(data)
                f.write(data)
                if print_progress:
                    done = int(50 * dl / total_length)
                    progress("[%-50s] %.2f%%" %
                             ('=' * done, float(100 * dl) / total_length))
    if print_progress:
        progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True)
def _uncompress_file(filepath, extrapath, delete_file, print_progress):
    """Extract an archive into ``extrapath`` and return its root entry.

    Dispatches to the zip or tar extraction generator based on the file
    name suffix, optionally drawing a progress bar, and optionally
    deleting the archive afterwards.
    """
    if print_progress:
        print("Uncompress %s" % os.path.basename(filepath))

    if filepath.endswith("zip"):
        extractor = _uncompress_file_zip
    elif filepath.endswith("tgz"):
        extractor = _uncompress_file_tar
    else:
        # Other suffixes: mode "r" lets tarfile autodetect the compression.
        extractor = functools.partial(_uncompress_file_tar, mode="r")

    for total_num, index, rootpath in extractor(filepath, extrapath):
        if print_progress:
            done = int(50 * float(index) / total_num)
            progress("[%-50s] %.2f%%" %
                     ('=' * done, float(100 * index) / total_num))
    if print_progress:
        progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True)

    if delete_file:
        os.remove(filepath)

    return rootpath
def _uncompress_file_zip(filepath, extrapath):
files = zipfile.ZipFile(filepath, 'r')
filelist = files.namelist()
rootpath = filelist[0]
total_num = len(filelist)
for index, file in enumerate(filelist):
files.extract(file, extrapath)
yield total_num, index, rootpath
files.close()
yield total_num, index, rootpath
def _uncompress_file_tar(filepath, extrapath, mode="r:gz"):
files = tarfile.open(filepath, mode)
filelist = files.getnames()
total_num = len(filelist)
rootpath = filelist[0]
for index, file in enumerate(filelist):
files.extract(file, extrapath)
yield total_num, index, rootpath
files.close()
yield total_num, index, rootpath
def download_file_and_uncompress(url,
                                 savepath=None,
                                 savename=None,
                                 extrapath=None,
                                 print_progress=True,
                                 cover=False,
                                 delete_file=False):
    """Download an archive from ``url`` and extract it locally.

    Args:
        url: location of the archive; its last path segment is the
            default archive filename.
        savepath: directory the archive is downloaded into (default ".").
        savename: filename to store the archive as (default: from url).
        extrapath: directory the archive is extracted into (default ".").
        print_progress: show textual progress for download and extraction.
        cover: when True, delete any existing copy and re-download.
        delete_file: when True, delete the archive after extraction.
    """
    if savepath is None:
        savepath = "."
    if extrapath is None:
        extrapath = "."
    if savename is None:
        savename = url.split("/")[-1]
    savepath = os.path.join(savepath, savename)

    if cover and os.path.exists(savepath):
        # BUG FIX: the downloaded archive is a regular file, so the
        # original unconditional shutil.rmtree() raised
        # NotADirectoryError here; pick the removal call that matches.
        if os.path.isdir(savepath):
            shutil.rmtree(savepath)
        else:
            os.remove(savepath)

    if not os.path.exists(savepath):
        download_file(url, savepath, print_progress)
    _ = _uncompress_file(savepath, extrapath, delete_file, print_progress)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册