Commit 43d49e3f authored by xjqbest

test

Parent e4683727
......@@ -38,6 +38,7 @@ class Model(object):
self._namespace = "train.model"
self._platform = envs.get_platform()
self._init_hyper_parameters()
self._env = config
def _init_hyper_parameters(self):
pass
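Since the constructor now keeps the whole parsed config on `self._env`, a subclass can pull hyperparameters straight from it instead of going through the global envs helpers. A minimal sketch, assuming the config dict mirrors the YAML layout shown further down (`MyModel` is hypothetical):

    # Hypothetical subclass: reads hyperparameters from the stored config dict.
    class MyModel(Model):
        def _init_hyper_parameters(self):
            hp = self._env["hyper_parameters"]
            self.sparse_feature_number = hp["sparse_feature_number"]
            self.sparse_feature_dim = hp["sparse_feature_dim"]
            self.learning_rate = hp["optimizer"]["learning_rate"]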
......@@ -103,7 +104,7 @@ class Model(object):
def get_fetch_period(self):
return self._fetch_interval
def _build_optimizer(self, name, lr):
def _build_optimizer(self, name, lr, strategy=None):
name = name.upper()
optimizers = ["SGD", "ADAM", "ADAGRAD"]
if name not in optimizers:
......@@ -133,10 +134,18 @@ class Model(object):
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate)
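For reference, a sketch of the dispatch `_build_optimizer` performs over its supported list; the new `strategy` argument is accepted but, under this assumption, unused for single-machine training:

    import paddle.fluid as fluid

    def build_optimizer(name, lr, strategy=None):
        # the name is matched case-insensitively against the supported list
        name = name.upper()
        if name == "SGD":
            return fluid.optimizer.SGD(learning_rate=lr)
        if name == "ADAM":
            return fluid.optimizer.Adam(learning_rate=lr)
        if name == "ADAGRAD":
            return fluid.optimizer.Adagrad(learning_rate=lr)
        raise ValueError("optimizer must be one of SGD, ADAM, ADAGRAD")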
def input_data(self, is_infer=False):
sparse_slots = envs.get_global_env("sparse_slots", None,
"train.reader")
dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
def input_data(self, is_infer=False, dataset_name=None, program=None):
dataset = {}
for i in self._env["dataset"]:
if i["name"] == dataset_name:
dataset = i
break
sparse_slots = dataset.get("sparse_slots", None)
#sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader")
#dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
dense_slots = dataset.get("dense_slots", None)
if sparse_slots is not None or dense_slots is not None:
sparse_slots = sparse_slots.strip().split(" ")
dense_slots = dense_slots.strip().split(" ")
......@@ -159,6 +168,8 @@ class Model(object):
name=name, shape=[1], lod_level=1, dtype="int64")
data_var_.append(l)
self._sparse_data_var.append(l)
print(self._dense_data_var)
print(self._sparse_data_var)
return data_var_
else:
......
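The slot strings come straight from the dataset config entry (e.g. `dense_slots: "dense_var:13"`). A self-contained sketch of how such strings expand into feed variables, consistent with the fragment above (the ordering of dense before sparse is illustrative):

    import paddle.fluid as fluid

    def slots_to_vars(sparse_slots, dense_slots):
        data_vars = []
        # dense slots carry an explicit dimension, e.g. "dense_var:13"
        for slot in dense_slots.strip().split(" "):
            name, dim = slot.split(":")
            data_vars.append(
                fluid.layers.data(name=name, shape=[int(dim)], dtype="float32"))
        # sparse slots are variable-length id lists, hence lod_level=1
        for name in sparse_slots.strip().split(" "):
            data_vars.append(
                fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64"))
        return data_vars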
......@@ -58,8 +58,8 @@ class SlotReader(dg.MultiSlotDataGenerator):
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
envs.set_global_envs(_config)
envs.update_workspace()
#envs.set_global_envs(_config)
#envs.update_workspace()
def init(self, sparse_slots, dense_slots, padding=0):
from operator import mul
......
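SlotReader builds on Paddle's `MultiSlotDataGenerator`, whose contract is a `generate_sample` that returns an iterator yielding `(slot_name, values)` pairs per input line. A toy sketch of that contract (the slot names and line format here are assumptions, not the repo's actual format):

    import paddle.fluid.incubate.data_generator as dg

    class TinySlotReader(dg.MultiSlotDataGenerator):
        def generate_sample(self, line):
            def reader():
                # pretend every line is "label id1 id2 ..." -- illustrative only
                parts = line.strip().split(" ")
                yield [("click", [int(parts[0])]),
                       ("1", [int(x) for x in parts[1:]])]
            return reader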
......@@ -19,11 +19,12 @@ from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
......@@ -31,47 +32,253 @@ logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config  # envs.get_global_envs()
#device = envs.get_global_env("train.device", "cpu")
device = self._env["device"]
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
#self.inference_models = []
#self.increment_models = []
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env(
"dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass', self.dataloader_train)
self.regist_context_processor('train_pass', self.executor_train)
# if envs.get_platform() == "LINUX" and envs.get_global_env(
#         "dataset_class", None, "train.reader") != "DataLoader":
#     self.regist_context_processor('train_pass', self.dataset_train)
# else:
#     self.regist_context_processor('train_pass', self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
#self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
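The `regist_context_processor` calls above wire up a small state machine: each pass handler reads `context['status']`, does its work, and writes the name of the next pass. A stripped-down sketch of the driving loop (the loop itself lives in the trainer base class, not in this diff):

    context = {'status': 'uninit', 'is_exit': False}
    processors = {}  # filled by regist_context_processor: status name -> handler

    while not context['is_exit']:
        processors[context['status']](context)  # handler advances context['status']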
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer.minimize(self.model.get_avg_cost())
def instance(self, context):
context['status'] = 'init_pass'
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
def dataloader_train(self, context):
pass
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
evaluate_only = envs.get_global_env(
'evaluate_only', False, namespace='evaluate')
if evaluate_only:
context['status'] = 'infer_pass'
def dataset_train(self, context):
pass
#def _get_optmizer(self, cost):
# if self._env["hyper_parameters"]["optimizer"]["class"] == "Adam":
def _create_dataset(self, dataset_name):
config_dict = None
for i in self._env["dataset"]:
if i["name"] == dataset_name:
config_dict = i
break
#reader_ins = SlotReader(self._config_yaml)
sparse_slots = config_dict["sparse_slots"]
dense_slots = config_dict["dense_slots"]
padding = 0
reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
#reader = "{workspace}/paddlerec/core/utils/dataset_instance.py".replace("{workspace}", envs.path_adapter(self._env["workspace"]))
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
if config_dict["type"] == "QueueDataset":
dataset = fluid.DatasetFactory().create_dataset(config_dict["type"])
dataset.set_batch_size(config_dict["batch_size"])
#dataset.set_thread(config_dict["thread_num"])
#dataset.set_hdfs_config(config_dict["data_fs_name"], config_dict["data_fs_ugi"])
dataset.set_pipe_command(pipe_cmd)
train_data_path = config_dict["data_path"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["executor"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model.get_inputs()
dataset.set_use_var(inputs)
break
else:
pass
return dataset
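A self-contained sketch of what `_create_dataset` assembles on the QueueDataset path; the script path, yaml name, slot strings and data directory below are placeholders, not the repo's values:

    import os
    import paddle.fluid as fluid

    reader_script = "/path/to/paddlerec/core/utils/dataset_instance.py"
    sparse_slots = "click 1 2 3"
    dense_slots = "dense_var:13"
    # spaces inside a slot list are re-encoded as '#' so the whole list
    # survives as a single argv entry of the pipe command
    pipe_cmd = "python {} slot slot {} fake {} {} {}".format(
        reader_script, "config.yaml",
        sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), 0)

    dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
    dataset.set_batch_size(2)
    dataset.set_pipe_command(pipe_cmd)  # every training file is streamed through this
    data_dir = "/path/to/train_data"
    dataset.set_filelist([os.path.join(data_dir, f) for f in os.listdir(data_dir)])
    # set_use_var(...) is then called with the matching model's input variables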
def init(self, context):
#self.model.train_net()
for model_dict in self._env["executor"]:
self._model[model_dict["name"]] = [None] * 4
# self._model[model_dict["name"]][0] = fluid.Program() #train_program
# self._model[model_dict["name"]][1] = fluid.Program() #startup_program
# self._model[model_dict["name"]][2] = fluid.Scope() #scope
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
opt_name = self._env["hyper_parameters"]["optimizer"]["class"]
opt_lr = self._env["hyper_parameters"]["optimizer"]["learning_rate"]
opt_strategy = self._env["hyper_parameters"]["optimizer"]["strategy"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
model.net(None)####
optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
for dataset in self._env["dataset"]:
self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
# self.fetch_vars = []
# self.fetch_alias = []
# self.fetch_period = self.model.get_fetch_period()
# metrics = self.model.get_metrics()
# if metrics:
# self.fetch_vars = metrics.values()
# self.fetch_alias = metrics.keys()
#evaluate_only = envs.get_global_env(
# 'evaluate_only', False, namespace='evaluate')
#if evaluate_only:
# context['status'] = 'infer_pass'
#else:
context['status'] = 'startup_pass'
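The nested guards are the key move in the new `init`: every executor entry gets its own `Program` pair plus a fresh name scope, so two models can define identically named layers without clashing. A minimal standalone sketch of the pattern:

    import paddle.fluid as fluid

    train_prog, startup_prog = fluid.Program(), fluid.Program()
    with fluid.program_guard(train_prog, startup_prog):
        with fluid.unique_name.guard():
            x = fluid.layers.data(name="x", shape=[13], dtype="float32")
            cost = fluid.layers.reduce_mean(fluid.layers.fc(input=x, size=1))
            fluid.optimizer.SGD(learning_rate=0.01).minimize(cost)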
def startup(self, context):
self._exe.run(fluid.default_startup_program())
for model_dict in self._env["executor"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
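Running each startup program under its own `fluid.Scope` keeps parameter storage per model, which matters once several executor entries coexist. A small sketch of the isolation this buys:

    import paddle.fluid as fluid

    exe = fluid.Executor(fluid.CPUPlace())
    main, startup = fluid.Program(), fluid.Program()
    with fluid.program_guard(main, startup), fluid.unique_name.guard():
        fluid.layers.create_parameter(shape=[1], dtype="float32", name="w")

    scope_a, scope_b = fluid.Scope(), fluid.Scope()
    with fluid.scope_guard(scope_a):
        exe.run(startup)  # "w" is initialized inside scope_a only
    with fluid.scope_guard(scope_b):
        exe.run(startup)  # a second, independent copy of "w"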
def executor_train(self, context):
epochs = int(self._env["epochs"])
for j in range(epochs):
for model_dict in self._env["executor"]:
reader_name = model_dict["dataset_name"]
#print(self._dataset)
#print(reader_name)
dataset = None
for i in self._env["dataset"]:
if i["name"] == reader_name:
dataset = i
break
if dataset["type"] == "DataLoader":
self._executor_dataloader_train(model_dict, j)
else:
self._executor_dataset_train(model_dict)
print("epoch %s done" % j)
# self._model[model_dict["name"]][1] = fluid.compiler.CompiledProgram(
# self._model[model_dict["name"]][1]).with_data_parallel(loss_name=self._model.get_avg_cost().name)
# fetch_vars = []
# fetch_alias = []
# fetch_period = self._model.get_fetch_period()
# metrics = self._model.get_metrics()
# if metrics:
# fetch_vars = metrics.values()
# fetch_alias = metrics.keys()
# metrics_varnames = []
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
# dataset = self._get_dataset("TRAIN")
# ins = self._get_dataset_ins()
# epochs = envs.get_global_env("train.epochs")
# for i in range(epochs):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = 1  # model_class.get_fetch_period()
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][1]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
begin_time = time.time()
self._exe.train_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
end_time = time.time()
times = end_time - begin_time
#print("epoch {} using time {}".format(i, times))
#print("epoch {} using time {}, speed {:.2f} lines/s".format(
# i, times, ins / times))
def _executor_dataloader_train(self, model_dict, epoch):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
self._model[model_name][1] = fluid.compiler.CompiledProgram(
self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name)
fetch_vars = []
fetch_alias = []
fetch_period = model_class.get_fetch_period()
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._dataset["reader_name"]
reader.start()
batch_id = 0
scope = self._model[model_name][2]
program = self._model[model_name][1]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def dataloader_train(self, context):
reader = self._get_dataloader("TRAIN")
epochs = envs.get_global_env("train.epochs")
exit(-1)
reader = self._get_dataloader(self._env["TRAIN"])
epochs = self._env["epochs"]
program = fluid.compiler.CompiledProgram(fluid.default_main_program(
)).with_data_parallel(loss_name=self.model.get_avg_cost().name)
......@@ -130,6 +337,6 @@ class SingleTrainer(TranspileTrainer):
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
#for model in self.increment_models:
# print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
......@@ -20,6 +20,7 @@ import sys
global_envs = {}
global_envs_raw = {}
def flatten_environs(envs, separator="."):
flatten_dict = {}
......@@ -62,6 +63,10 @@ def get_trainer():
def set_global_envs(envs):
assert isinstance(envs, dict)
global global_envs_raw
global_envs_raw = envs
return
def fatten_env_namespace(namespace_nests, local_envs):
for k, v in local_envs.items():
if isinstance(v, dict):
......
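For context, the flattening that `fatten_env_namespace` performs turns nested YAML sections into dot-joined keys; a sketch of the idea:

    def flatten(prefix, nested, out):
        for k, v in nested.items():
            if isinstance(v, dict):
                flatten(prefix + [k], v, out)
            else:
                out[".".join(prefix + [k])] = v

    out = {}
    flatten([], {"train": {"model": {"learning_rate": 0.001}}}, out)
    assert out == {"train.model.learning_rate": 0.001}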
......@@ -12,39 +12,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.
- train:
-     epochs: 10
-     engine: single
-     workspace: "paddlerec.models.rank.dnn"
-     trainer:
-         # for cluster training
-         strategy: "async"
-     reader:
-         batch_size: 2
-         train_data_path: "{workspace}/data/sample_data/train"
-         reader_debug_mode: False
-     model:
-         models: "{workspace}/model.py"
-         hyper_parameters:
-             sparse_inputs_slots: 27
-             sparse_feature_number: 1000001
-             sparse_feature_dim: 9
-             dense_input_dim: 13
-             fc_sizes: [512, 256, 128, 32]
-             learning_rate: 0.001
-             optimizer: adam
-     save:
-         increment:
-             dirname: "increment"
-             epoch_interval: 2
-             save_last: True
-         inference:
-             dirname: "inference"
-             epoch_interval: 4
-             save_last: True
+ debug: false
+ cold_start: true
+ epochs: 10
+ device: cpu
+ workspace: "paddlerec.models.rank.dnn"
+
+ dataset:
+     #- name: dataset_1
+     #  batch_size: 2
+     #  type: DataLoader
+     #  data_path: "{workspace}/data/sample_data/train"
+     #  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
+     #  dense_slots: "dense_var:13"
+     - name: dataset_2
+       batch_size: 2
+       type: QueueDataset
+       data_path: "{workspace}/data/sample_data/train"
+       sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
+       dense_slots: "dense_var:13"
+
+ hyper_parameters:
+     optimizer:
+         class: Adam
+         learning_rate: 0.001
+         strategy: async
+     sparse_inputs_slots: 27
+     sparse_feature_number: 1000001
+     sparse_feature_dim: 9
+     dense_input_dim: 13
+     fc_sizes: [512, 256, 128, 32]
+
+ epoch:
+     trainer_class: Single
+     save_checkpoint_interval: 2
+     save_inference_interval: 4
+     save_checkpoint_path: "increment"
+     save_inference_path: "inference"
+
+ executor:
+     - name: train
+       model: "{workspace}/model.py"
+       dataset_name: dataset_2
+       thread_num: 1
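How the restructured sections tie together: each `executor` entry names a `dataset`, and the trainer resolves both by scanning the lists for a matching `name`. A sketch of that lookup (the file name is an assumption):

    import yaml

    with open("config.yaml") as f:  # the YAML above
        config = yaml.safe_load(f)

    exec_conf = config["executor"][0]
    dataset_conf = next(d for d in config["dataset"]
                        if d["name"] == exec_conf["dataset_name"])
    assert dataset_conf["type"] == "QueueDataset"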
......@@ -27,12 +27,12 @@ class Model(ModelBase):
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None, self._namespace)
self.sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None, self._namespace)
self.learning_rate = envs.get_global_env(
"hyper_parameters.learning_rate", None, self._namespace)
self.sparse_feature_number = 1000001  # envs.get_global_env("hyper_parameters.sparse_feature_number", None, self._namespace)
self.sparse_feature_dim = 9  # envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self._namespace)
self.learning_rate = 0.001  # envs.get_global_env("hyper_parameters.learning_rate", None, self._namespace)
def net(self, input, is_infer=False):
self.sparse_inputs = self._sparse_data_var[1:]
......@@ -56,8 +56,8 @@ class Model(ModelBase):
sparse_embed_seq + [self.dense_input], axis=1)
fcs = [concated]
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None,
self._namespace)
hidden_layers = [512, 256, 128, 32]  # envs.get_global_env("hyper_parameters.fc_sizes", None, self._namespace)
for size in hidden_layers:
output = fluid.layers.fc(
......@@ -82,6 +82,8 @@ class Model(ModelBase):
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
self._cost = avg_cost
auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
label=self.label_input,
num_thresholds=2**12,
......
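The `fluid.layers.auc` call is cut off by the fold above; for reference, a typical complete form of that call (the `slide_steps` value and the metric names are assumptions, not taken from this diff):

    auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
                                         label=self.label_input,
                                         num_thresholds=2**12,
                                         slide_steps=20)
    self._metrics["AUC"] = auc
    self._metrics["BATCH_AUC"] = batch_auc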