Commit 6b8c051f authored by chengmo

Merge branch 'chengmo_dev' into 'develop'

Merge Doc & TDM

See merge request !4
......@@ -19,15 +19,24 @@ import yaml
from fleetrec.core.utils import envs
trainer_abs = os.path.join(os.path.dirname(os.path.abspath(__file__)), "trainers")
trainer_abs = os.path.join(os.path.dirname(
os.path.abspath(__file__)), "trainers")
trainers = {}
def trainer_registry():
trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(trainer_abs, "cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, "ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(trainer_abs, "ctr_modul_trainer.py")
trainers["SingleTrainer"] = os.path.join(
trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(
trainer_abs, "cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(
trainer_abs, "ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(
trainer_abs, "ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(
trainer_abs, "tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(
trainer_abs, "tdm_cluster_trainer.py")
trainer_registry()
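For readers following the factory change below: the registry above simply maps trainer class names to implementation files. A minimal sketch of the lookup that `TrainerFactory` performs — the `resolve_trainer` helper here is hypothetical, written only to illustrate the fallback to user-defined trainers:

```python
import os

def resolve_trainer(train_mode, trainers):
    """Hypothetical helper mirroring the factory logic in the diff below:
    return (implementation file, trainer name) for a registered trainer,
    or treat an unregistered name as a user-supplied trainer file."""
    trainer_abs = trainers.get(train_mode)
    if trainer_abs is None:
        if not os.path.isfile(train_mode):
            raise IOError("trainer {} can not be recognized".format(train_mode))
        # An unregistered but existing file is loaded as a user-defined trainer.
        return train_mode, "UserDefineTrainer"
    return trainer_abs, train_mode
```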
......@@ -46,7 +55,8 @@ class TrainerFactory(object):
if trainer_abs is None:
if not os.path.isfile(train_mode):
raise IOError("trainer {} can not be recognized".format(train_mode))
raise IOError(
"trainer {} can not be recognized".format(train_mode))
trainer_abs = train_mode
train_mode = "UserDefineTrainer"
......
......@@ -34,6 +34,12 @@ class Model(object):
"""
return self._metrics
def custom_preprocess(self):
"""
Do something after exe.run(startup_program) and before run().
"""
pass
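A minimal sketch of how a model subclass might use this new hook; the class name and import path below are assumptions for illustration, not part of the diff:

```python
from fleetrec.core.model import Model  # assumed import path of the base class

class MyModel(Model):
    def custom_preprocess(self):
        # Runs after exe.run(startup_program) and before run(), e.g. to
        # inject warm-start tensors or adjust variables in the global scope.
        print("startup finished, running custom preprocessing")
```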
def get_fetch_period(self):
return self._fetch_interval
......@@ -41,24 +47,30 @@ class Model(object):
name = name.upper()
optimizers = ["SGD", "ADAM", "ADAGRAD"]
if name not in optimizers:
raise ValueError("configured optimizer can only supported SGD/Adam/Adagrad")
raise ValueError(
"configured optimizer can only support SGD/Adam/Adagrad")
if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001, self._namespace)
optimizer_i = fluid.optimizer.SGD(lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
reg = envs.get_global_env(
"hyper_parameters.reg", 0.0001, self._namespace)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
elif name == "ADAGRAD":
optimizer_i = fluid.optimizer.Adagrad(lr)
else:
raise ValueError("configured optimizer can only supported SGD/Adam/Adagrad")
raise ValueError(
"configured optimizer can only support SGD/Adam/Adagrad")
return optimizer_i
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self._namespace)
optimizer = envs.get_global_env("hyper_parameters.optimizer", None, self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" %learning_rate)
learning_rate = envs.get_global_env(
"hyper_parameters.learning_rate", None, self._namespace)
optimizer = envs.get_global_env(
"hyper_parameters.optimizer", None, self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate)
@abc.abstractmethod
......
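For reference, a condensed standalone sketch of the three branches `_build_optimizer` can take; the lr value is assumed and the regularization coefficient is the documented default:

```python
import paddle.fluid as fluid

# Illustrative only: the optimizers built above for each configured name
# (lr assumed; reg defaults to 0.0001 per hyper_parameters.reg).
lr = 0.0001
sgd = fluid.optimizer.SGD(
    lr, regularization=fluid.regularizer.L2DecayRegularizer(0.0001))
adam = fluid.optimizer.Adam(lr, lazy_mode=True)  # lazy_mode suits sparse grads
adagrad = fluid.optimizer.Adagrad(lr)
```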
......@@ -95,5 +95,6 @@ def user_define_engine(engine_yaml):
train_dirname = os.path.dirname(train_location)
base_name = os.path.splitext(os.path.basename(train_location))[0]
sys.path.append(train_dirname)
trainer_class = envs.lazy_instance_by_fliename(base_name, "UserDefineTraining")
trainer_class = envs.lazy_instance_by_fliename(
base_name, "UserDefineTraining")
return trainer_class
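user_define_engine appends the trainer's directory to sys.path and then loads the class by module basename. A hedged guess at what lazy_instance_by_fliename does internally — the misspelled name is the real identifier in envs, but the body below is an assumption, not the actual implementation:

```python
import importlib

def lazy_instance_by_fliename(base_name, class_name):
    # Assumed implementation: import the module by basename (its directory
    # was just added to sys.path) and fetch the named class from it.
    module = importlib.import_module(base_name)
    return getattr(module, class_name)
```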
......@@ -18,6 +18,7 @@ Training use fluid with one node only.
from __future__ import print_function
import os
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
......@@ -43,7 +44,8 @@ class ClusterTrainer(TranspileTrainer):
if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass', self.dataloader_train)
self.regist_context_processor(
'train_pass', self.dataloader_train)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
......@@ -70,6 +72,11 @@ class ClusterTrainer(TranspileTrainer):
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env(
"hyper_parameters.optimizer", None, "train.model")
if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_cost_op())
......@@ -85,16 +92,18 @@ class ClusterTrainer(TranspileTrainer):
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass'
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def dataloader_train(self, context):
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
fleet.init_worker()
reader = self._get_dataloader()
......@@ -103,8 +112,8 @@ class ClusterTrainer(TranspileTrainer):
program = fluid.compiler.CompiledProgram(
fleet.main_program).with_data_parallel(
loss_name=self.model.get_cost_op().name,
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
metrics_varnames = []
metrics_format = []
......@@ -140,7 +149,6 @@ class ClusterTrainer(TranspileTrainer):
context['status'] = 'terminal_pass'
def dataset_train(self, context):
self._exe.run(fleet.startup_program)
fleet.init_worker()
dataset = self._get_dataset()
......
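The key refactor in this trainer diff is the new 'startup_pass' between init and training: running the startup program moves out of dataset_train/dataloader_train into a dedicated startup() pass. The passes are driven by a status loop; a hedged sketch of that loop (the real one lives in the Trainer base class, not in this diff):

```python
def run_passes(processors, context):
    # processors maps a status name to its handler, as registered via
    # regist_context_processor('train_pass', self.dataloader_train) etc.
    # Each handler advances context['status']; server()/terminal() set is_exit.
    context.setdefault('is_exit', False)
    while not context['is_exit']:
        processors[context['status']](context)
```

With this change set, the worker sequence becomes init -> 'startup_pass' -> 'train_pass' -> 'terminal_pass', with the startup program run exactly once in its own pass.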
......@@ -55,10 +55,13 @@ class SingleTrainer(TranspileTrainer):
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def startup(self, context):
self._exe.run(fluid.default_startup_program())
context['status'] = 'train_pass'
def dataloader_train(self, context):
self._exe.run(fluid.default_startup_program())
reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs")
......@@ -99,8 +102,6 @@ class SingleTrainer(TranspileTrainer):
context['status'] = 'infer_pass'
def dataset_train(self, context):
# run startup program at once
self._exe.run(fluid.default_startup_program())
dataset = self._get_dataset()
epochs = envs.get_global_env("train.epochs")
......
# -*- coding=utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training with fluid in cluster mode.
"""
from __future__ import print_function
import logging
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from fleetrec.core.utils import envs
from fleetrec.core.trainers.cluster_trainer import ClusterTrainer
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"]
class TDMClusterTrainer(ClusterTrainer):
def server(self, context):
namespace = "train.startup"
init_model_path = envs.get_global_env(
"cluster.init_model_path", "", namespace)
assert init_model_path != "", "Cluster training must have an init_model for TDM"
fleet.init_server(init_model_path)
logger.info("TDM: load model from {}".format(init_model_path))
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
namespace = "train.startup"
load_tree = envs.get_global_env(
"tree.load_tree", True, namespace)
self.tree_layer_path = envs.get_global_env(
"tree.tree_layer_path", "", namespace)
self.tree_travel_path = envs.get_global_env(
"tree.tree_travel_path", "", namespace)
self.tree_info_path = envs.get_global_env(
"tree.tree_info_path", "", namespace)
save_init_model = envs.get_global_env(
"cluster.save_init_model", False, namespace)
init_model_path = envs.get_global_env(
"cluster.init_model_path", "", namespace)
if load_tree:
# Set the plaintext tree structure and data into the network's Variables.
# NumpyInitializer is not used because the tree data can be very large, which poses a performance risk.
for param_name in special_param:
param_t = fluid.global_scope().find_var(param_name).get_tensor()
param_array = self.tdm_prepare(param_name)
param_t.set(param_array.astype('int32'), self._place)
if save_init_model:
logger.info("Begin Save Init model.")
fluid.io.save_persistables(
executor=self._exe, dirname=init_model_path)
logger.info("End Save Init model.")
context['status'] = 'train_pass'
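The tree loading above writes numpy arrays straight into the network's variables through the scope, rather than going through an initializer. A self-contained sketch of that pattern — the variable name is from the diff, but the shape is an assumption for illustration:

```python
import numpy as np
import paddle.fluid as fluid

place = fluid.CPUPlace()
scope = fluid.global_scope()
# In the trainer, find_var() locates a variable created by the model network;
# here var() creates one so the sketch runs standalone.
var = scope.var("TDM_Tree_Info")
tensor = var.get_tensor()
tensor.set(np.zeros([26, 4], dtype="int32"), place)  # 26 nodes; width assumed
```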
def tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self.tdm_travel_prepare()
return travel_array
elif param_name == "TDM_Tree_Layer":
layer_array, _ = self.tdm_layer_prepare()
return layer_array
elif param_name == "TDM_Tree_Info":
info_array = self.tdm_info_prepare()
return info_array
else:
raise " {} is not a special tdm param name".format(param_name)
def tdm_travel_prepare(self):
"""load tdm tree param from npy/list file"""
travel_array = np.load(self.tree_travel_path)
logger.info("TDM Tree leaf node nums: {}".format(
travel_array.shape[0]))
return travel_array
def tdm_layer_prepare(self):
"""load tdm tree param from npy/list file"""
layer_list = []
layer_list_flat = []
with open(self.tree_layer_path, 'r') as fin:
for line in fin:
layer_nodes = []
layer = line.strip('\n').split(',')
for node in layer:
if node:
layer_list_flat.append(node)
layer_nodes.append(node)
layer_list.append(layer_nodes)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
logger.info("TDM Tree max layer: {}".format(len(layer_list)))
logger.info("TDM Tree layer_node_num_list: {}".format(
[len(i) for i in layer_list]))
return layer_array, layer_list
def tdm_info_prepare(self):
"""load tdm tree param from list file"""
info_array = np.load(self.tree_info_path)
return info_array
# -*- coding=utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training using fluid with one node only.
"""
from __future__ import print_function
import logging
import paddle.fluid as fluid
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
from fleetrec.core.trainers.single_trainer import SingleTrainer
from fleetrec.core.utils import envs
import numpy as np
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer",
"TDM_Tree_Info", "TDM_Tree_Emb"]
class TDMSingleTrainer(SingleTrainer):
def startup(self, context):
namespace = "train.startup"
load_persistables = envs.get_global_env(
"single.load_persistables", False, namespace)
persistables_model_path = envs.get_global_env(
"single.persistables_model_path", "", namespace)
load_tree = envs.get_global_env(
"tree.load_tree", False, namespace)
self.tree_layer_path = envs.get_global_env(
"tree.tree_layer_path", "", namespace)
self.tree_travel_path = envs.get_global_env(
"tree.tree_travel_path", "", namespace)
self.tree_info_path = envs.get_global_env(
"tree.tree_info_path", "", namespace)
self.tree_emb_path = envs.get_global_env(
"tree.tree_emb_path", "", namespace)
save_init_model = envs.get_global_env(
"single.save_init_model", False, namespace)
init_model_path = envs.get_global_env(
"single.init_model_path", "", namespace)
self._exe.run(fluid.default_startup_program())
if load_persistables:
# Load parameters from a paddle binary model
fluid.io.load_persistables(
executor=self._exe,
dirname=persistables_model_path,
main_program=fluid.default_main_program())
logger.info("Load persistables from \"{}\"".format(
persistables_model_path))
if load_tree:
# Set the plaintext tree structure and data into the network's Variables.
# NumpyInitializer is not used because the tree data can be very large, which poses a performance risk.
for param_name in special_param:
param_t = fluid.global_scope().find_var(param_name).get_tensor()
param_array = self.tdm_prepare(param_name)
if param_name == 'TDM_Tree_Emb':
param_t.set(param_array.astype('float32'), self._place)
else:
param_t.set(param_array.astype('int32'), self._place)
if save_init_model:
logger.info("Begin Save Init model.")
fluid.io.save_persistables(
executor=self._exe, dirname=init_model_path)
logger.info("End Save Init model.")
context['status'] = 'train_pass'
def tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self.tdm_travel_prepare()
return travel_array
elif param_name == "TDM_Tree_Layer":
layer_array, _ = self.tdm_layer_prepare()
return layer_array
elif param_name == "TDM_Tree_Info":
info_array = self.tdm_info_prepare()
return info_array
elif param_name == "TDM_Tree_Emb":
emb_array = self.tdm_emb_prepare()
return emb_array
else:
raise " {} is not a special tdm param name".format(param_name)
def tdm_travel_prepare(self):
"""load tdm tree param from npy/list file"""
travel_array = np.load(self.tree_travel_path)
logger.info("TDM Tree leaf node nums: {}".format(
travel_array.shape[0]))
return travel_array
def tdm_emb_prepare(self):
"""load tdm tree param from npy/list file"""
emb_array = np.load(self.tree_emb_path)
logger.info("TDM Tree node nums from emb: {}".format(
emb_array.shape[0]))
return emb_array
def tdm_layer_prepare(self):
"""load tdm tree param from npy/list file"""
layer_list = []
layer_list_flat = []
with open(self.tree_layer_path, 'r') as fin:
for line in fin:
layer_nodes = []
layer = line.strip('\n').split(',')
for node in layer:
if node:
layer_list_flat.append(node)
layer_nodes.append(node)
layer_list.append(layer_nodes)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
logger.info("TDM Tree max layer: {}".format(len(layer_list)))
logger.info("TDM Tree layer_node_num_list: {}".format(
[len(i) for i in layer_list]))
return layer_array, layer_list
def tdm_info_prepare(self):
"""load tdm tree param from list file"""
info_array = np.load(self.tree_info_path)
return info_array
......@@ -10,6 +10,8 @@ from fleetrec.core.utils import util
engines = {}
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
custom_model = ['tdm']
model_name = ""
def engine_registry():
......@@ -28,13 +30,17 @@ def engine_registry():
engines["GPU"] = gpu
def get_engine(engine, device):
def get_engine(args):
device = args.device
d_engine = engines[device]
transpiler = get_transpiler()
engine = args.engine
run_engine = d_engine[transpiler].get(engine, None)
if run_engine is None:
raise ValueError("engine {} can not be supported on device: {}".format(engine, device))
raise ValueError(
"engine {} is not supported on device: {}".format(engine, device))
return run_engine
......@@ -77,15 +83,21 @@ def set_runtime_envs(cluster_envs, engine_yaml):
print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
def single_engine(args):
print("use single engine to run model: {}".format(args.model))
def get_trainer_prefix(args):
if model_name in custom_model:
return model_name.upper()
return ""
def single_engine(args):
trainer = get_trainer_prefix(args) + "SingleTrainer"
single_envs = {}
single_envs["train.trainer.trainer"] = "SingleTrainer"
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single"
single_envs["train.trainer.device"] = args.device
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
......@@ -93,16 +105,15 @@ def single_engine(args):
def cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
......@@ -122,15 +133,15 @@ def cluster_mpi_engine(args):
def local_cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["server_num"] = 1
cluster_envs["worker_num"] = 1
cluster_envs["start_port"] = 36001
cluster_envs["log_dir"] = "logs"
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.strategy"] = "async"
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["train.trainer.engine"] = "local_cluster"
......@@ -139,9 +150,9 @@ def local_cluster_engine(args):
cluster_envs["train.trainer.platform"] = envs.get_platform()
cluster_envs["CPU_NUM"] = "2"
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
launch = LocalClusterEngine(cluster_envs, args.model)
return launch
......@@ -184,8 +195,11 @@ def get_abs_model(model):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='fleet-rec run')
parser.add_argument("-m", "--model", type=str)
parser.add_argument("-e", "--engine", type=str, choices=["single", "local_cluster", "cluster"])
parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu")
parser.add_argument("-e", "--engine", type=str,
choices=["single", "local_cluster", "cluster",
"tdm_single", "tdm_local_cluster", "tdm_cluster"])
parser.add_argument("-d", "--device", type=str,
choices=["cpu", "gpu"], default="cpu")
abs_dir = os.path.dirname(os.path.abspath(__file__))
envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
......@@ -193,10 +207,11 @@ if __name__ == "__main__":
args = parser.parse_args()
args.engine = args.engine.upper()
args.device = args.device.upper()
model_name = args.model.split('.')[-1]
args.model = get_abs_model(args.model)
engine_registry()
which_engine = get_engine(args.engine, args.device)
which_engine = get_engine(args)
engine = which_engine(args)
engine.run()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 4
workspace: "fleetrec.models.recall.tdm"
reader:
batch_size: 32
class: "{workspace}/tdm_reader.py"
train_data_path: "{workspace}/data/train"
test_data_path: "{workspace}/data/test"
model:
models: "{workspace}/model.py"
hyper_parameters:
node_emb_size: 64
input_emb_size: 768
neg_sampling_list: [1, 2, 3, 4]
output_positive: True
topK: 1
learning_rate: 0.0001
act: "tanh"
optimizer: ADAM
tree_parameters:
max_layers: 4
node_nums: 26
leaf_node_nums: 13
layer_node_num_list: [2, 4, 7, 12]
child_nums: 2
startup:
tree:
# For single-machine training it is recommended to load the tree only once, save it as paddle tensors, and warm-start from the saved paddle model afterwards.
# In distributed training, each trainer needs to load the tree independently.
load_tree: True
tree_layer_path: "{workspace}/tree/layer_list.txt"
tree_travel_path: "{workspace}/tree/travel_list.npy"
tree_info_path: "{workspace}/tree/tree_info.npy"
tree_emb_path: "{workspace}/tree/tree_emb.npy"
single:
load_persistables: False
persistables_model_path: ""
save_init_model: True
init_model_path: "{workspace}/init_model"
cluster:
init_model_path: "{workspace}/init_model"
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
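With this config in the tdm workspace, a launch presumably looks like `python -m fleetrec.run -m fleetrec.models.recall.tdm -d cpu -e single`, so that model_name resolves to "tdm" and the TDM-prefixed trainer is selected (command shape inferred from the run.py diff above, not stated in this merge). The trainers read these keys through envs.get_global_env; a hedged sketch of that namespace + key lookup, with a toy flattened dict standing in for the real parsed YAML:

```python
# Illustrative only: the real lookup lives in fleetrec.core.utils.envs.
_flat_cfg = {
    "train.startup.tree.load_tree": True,
    "train.startup.single.save_init_model": True,
    "train.model.hyper_parameters.optimizer": "ADAM",
}

def get_global_env(key, default=None, namespace=""):
    full_key = "{}.{}".format(namespace, key) if namespace else key
    return _flat_cfg.get(full_key, default)

print(get_global_env("tree.load_tree", False, "train.startup"))  # True
```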
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# -*- coding=utf8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import print_function
from fleetrec.core.reader import Reader
class TrainReader(Reader):
def init(self):
pass
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = (line.strip('\n')).split('\t')
input_emb = map(float, features[0].split(' '))
item_label = [int(features[1])]
feature_name = ["input_emb", "item_label"]
yield zip(feature_name, [input_emb] + [item_label])
return reader
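A hedged example of the input line format this reader assumes: a tab separates a space-joined embedding from an integer label (the sample values below are made up):

```python
sample_line = "0.1 0.2 0.3\t42"  # hypothetical "input_emb \t item_label" line
features = sample_line.strip('\n').split('\t')
input_emb = list(map(float, features[0].split(' ')))  # [0.1, 0.2, 0.3]
item_label = [int(features[1])]                       # [42]
print(list(zip(["input_emb", "item_label"], [input_emb] + [item_label])))
```

Note that under Python 3 the reader's bare map(...) would yield an iterator rather than a list, hence the explicit list() in this sketch; under the stated Python 2.7 minimum, map() already returns a list.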
1,2
3,4,5,6
7,8,9,10,11,12,13
14,15,16,17,18,19,20,21,22,23,24,25
\ No newline at end of file
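The four lines above are the sample tree layer file; a hedged check that it parses the way tdm_layer_prepare() expects, matching layer_node_num_list: [2, 4, 7, 12] from the YAML config:

```python
lines = ["1,2", "3,4,5,6", "7,8,9,10,11,12,13",
         "14,15,16,17,18,19,20,21,22,23,24,25"]
layers = [[n for n in line.split(',') if n] for line in lines]
print([len(layer) for layer in layers])  # -> [2, 4, 7, 12]
```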
# Quick Start
## Environment Setup
Fleet-Rec is a tool built on PaddlePaddle distributed training for rapid development, debugging, and deployment of models and training modes, making PaddlePaddle distributed training easier to use.
<p align="center">
<img align="center" src="doc/imgs/logo.png">
<p>
[![License](https://img.shields.io/badge/license-Apache%202-red.svg)](LICENSE)
[![Version](https://img.shields.io/github/v/release/PaddlePaddle/Paddle.svg)](https://github.com/PaddlePaddle/PaddleRec/releases)
PaddleRec is a one-stop, out-of-the-box toolkit for search and recommendation models from the PaddlePaddle ecosystem. Whether you are a beginner, developer, or researcher, you can easily use PaddleRec for the whole workflow from research and training through to inference deployment. PaddleRec provides end-to-end solutions for semantic understanding, recall, pre-ranking, ranking, and multi-task learning in search and recommendation tasks.
Built around preset models, PaddleRec has the following features:
- [Easy to use, out of the box](https://www.paddlepaddle.org.cn)
- [Flexible configuration and tuning](https://www.paddlepaddle.org.cn)
- [Distributed training with large-scale sparsity](https://www.paddlepaddle.org.cn)
- [Fast deployment, one-click launch](https://www.paddlepaddle.org.cn)
<p align="center">
<img align="center" src="doc/imgs/coding-gif.png">
<p>
# Contents
* [Features](#features)
* [Supported Models](#supported-models)
* [Documentation](#documentation)
* [Getting Started](#getting-started)
* [Requirements](#requirements)
* [Installation](#installation)
* [Quick Start](#quick-start)
* [FAQ](#faq)
* [Advanced Tutorials](#advanced-tutorials)
* [Custom Datasets and Readers](#custom-datasets-and-readers)
* [Model Tuning](#model-tuning)
* [Single-machine Training](#single-machine-training)
* [Distributed Training](#distributed-training)
* [Inference Deployment](#inference-deployment)
* [Version History](#version-history)
* [Release Notes](#release-notes)
* [Benchmark](#benchmark)
* [License](#license)
* [Contributing](#contributing)
* [Improving the PaddleRec Framework](#improving-the-paddlerec-framework)
* [Adding New Models to PaddleRec](#adding-new-models-to-paddlerec)
# Features
- Easy to use, out of the box
- Flexible configuration and tuning
- Distributed training with large-scale sparsity
- Fast deployment, one-click launch
# Supported Models
| Category | Model | Single-machine CPU training | Single-machine GPU training | Distributed CPU training | Distributed GPU training | Custom dataset | Server deployment |
| :------------------: | :--------------------: | :---------: | :---------: | :-----------: | :-----------: | :----------: | :--------: |
| ContentUnderstanding | [Text-Classification]() | ✓ | x | ✓ | x | ✓ | ✓ |
| ContentUnderstanding | [TagSpace]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [Word2Vec]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Recall | [TDM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Rank | [CTR-Dnn]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Rank | [DeepFm]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Rerank | [ListWise]() | ✓ | x | ✓ | x | ✓ | ✓ |
| MultiTask | [MMOE]() | ✓ | x | ✓ | x | ✓ | ✓ |
| MultiTask | [ESSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Match | [DSSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
| Match | [Multiview-Simnet]() | ✓ | x | ✓ | x | ✓ | ✓ |
# Documentation
## Getting Started
### Requirements
* Python >= 2.7
* PaddlePaddle >= 1.7.2
* OS: Windows/Mac/Linux
### Installation
- Method 1: install directly from PIP
```bash
python -m pip install fleet-rec
```
- Method 2: install from source
* Install PaddlePaddle. **Note: the latest version of PaddlePaddle is required (currently Linux only).**
```bash
python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple
```
* Install Fleet-Rec from source
```
git clone https://github.com/seiriosPlus/FleetRec/
cd FleetRec
python setup.py install
```
### Quick Start
#### Using the ctr-dnn example
Several models are built into the framework; a single command starts single-machine training or local 1*1 simulated distributed training with a built-in model.
##### Single-machine training
```bash
cd FleetRec
......@@ -44,7 +113,7 @@ python -m fleetrec.run \
-e single
```
##### Local simulated distributed training
```bash
cd FleetRec
......@@ -55,7 +124,7 @@ python -m fleetrec.run \
-e local_cluster
```
##### Submit distributed training to a cluster <requires a pre-configured cluster environment; this command does not include a submission client>
```bash
cd FleetRec
......@@ -66,5 +135,21 @@ python -m fleetrec.run \
-e cluster
```
### FAQ
More user documentation and secondary development docs are on the way.
\ No newline at end of file
## Advanced Tutorials
### Custom Datasets and Readers
### Model Tuning
### Single-machine Training
### Distributed Training
### Inference Deployment
# Version History
## Release Notes
## Benchmark
# License
This project is released under the [Apache 2.0 license](LICENSE).
# Contributing
## Improving the PaddleRec Framework
## Adding New Models to PaddleRec
......@@ -39,7 +39,7 @@ def build(dirname):
packages = find_packages(dirname, include=('fleetrec.*'))
package_dir = {'': dirname}
package_data = {}
need_copy = ['data/*/*.txt', '*.yaml']
need_copy = ['data/*/*.txt', '*.yaml', 'tree/*.npy', 'tree/*.txt']
for package in packages:
if package.startswith("fleetrec.models."):
package_data[package] = need_copy
......