Commit 51534ec6 authored by X xjqbest

fix

Parent 7b9849ac
@@ -19,12 +19,11 @@ from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
@@ -32,210 +31,66 @@ logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer):
    def __init__(self, config=None):
        super(SingleTrainer, self).__init__(config)
        self._env = self._config
        device = envs.get_global_env("device")
        if device == 'gpu':
            self._place = fluid.CUDAPlace(0)
        else:
            self._place = fluid.CPUPlace()
        self._exe = fluid.Executor(self._place)
        self.processor_register()
        self._model = {}
        self._dataset = {}
        envs.set_global_envs(self._config)
        envs.update_workspace()
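    # Per-model bookkeeping: self._model[name] stores, in order, the model's
    # train program, startup program, scope, and Model instance (filled in by
    # init()); self._dataset maps each dataset name to its reader.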
    def processor_register(self):
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)
        # executor_train dispatches per dataset between the DataLoader and
        # Dataset paths, so a single 'train_pass' processor is enough.
        self.regist_context_processor('train_pass', self.executor_train)
        self.regist_context_processor('infer_pass', self.infer)
        self.regist_context_processor('terminal_pass', self.terminal)
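    # Context state machine: 'uninit' -> 'init_pass' -> 'startup_pass' ->
    # 'train_pass' -> 'terminal_pass'; each processor advances
    # context['status'] to the next stage before returning.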
    def instance(self, context):
        context['status'] = 'init_pass'

    def dataloader_train(self, context):
        pass

    def dataset_train(self, context):
        pass
    def _create_dataset(self, dataset_name):
        name = "dataset." + dataset_name + "."
        sparse_slots = envs.get_global_env(name + "sparse_slots")
        dense_slots = envs.get_global_env(name + "dense_slots")
        thread_num = envs.get_global_env(name + "thread_num")
        batch_size = envs.get_global_env(name + "batch_size")
        reader_type = envs.get_global_env(name + "type")
        if envs.get_platform() != "LINUX":
            print("platform ", envs.get_platform(),
                  " change reader to DataLoader")
            reader_type = "DataLoader"
        padding = 0
        reader = envs.path_adapter(
            "paddlerec.core.utils") + "/dataset_instance.py"
        pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
            reader, "slot", "slot", self._config_yaml, "fake",
            sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"),
            str(padding))
        dataset = None
        if reader_type == "QueueDataset":
            dataset = fluid.DatasetFactory().create_dataset()
            dataset.set_batch_size(batch_size)
            dataset.set_thread(thread_num)
            dataset.set_pipe_command(pipe_cmd)
            train_data_path = envs.get_global_env(name + "data_path")
            file_list = [
                os.path.join(train_data_path, x)
                for x in os.listdir(train_data_path)
            ]
            dataset.set_filelist(file_list)
            # Bind the input variables of the model that consumes this
            # dataset so the pipe reader knows which slots to feed.
            for model_dict in self._env["executor"]:
                if model_dict["dataset_name"] == dataset_name:
                    model = self._model[model_dict["name"]][3]
                    inputs = model.get_inputs()
                    dataset.set_use_var(inputs)
                    break
        return dataset
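    # For illustration only: a minimal sketch of the dataset section this
    # method reads from the config YAML (field values are assumptions, not
    # taken from this commit):
    #
    #   dataset:
    #     - name: dataset_train
    #       type: QueueDataset          # or "DataLoader"
    #       batch_size: 32
    #       thread_num: 2
    #       data_path: "{workspace}/data/train"
    #       sparse_slots: "click 1 2 3"
    #       dense_slots: "dense_feature:13"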
    def init(self, context):
        for model_dict in self._env["executor"]:
            self._model[model_dict["name"]] = [None] * 4
            train_program = fluid.Program()
            startup_program = fluid.Program()
            scope = fluid.Scope()
            opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
            opt_lr = envs.get_global_env(
                "hyper_parameters.optimizer.learning_rate")
            opt_strategy = envs.get_global_env(
                "hyper_parameters.optimizer.strategy")
            with fluid.program_guard(train_program, startup_program):
                with fluid.unique_name.guard():
                    model_path = model_dict["model"].replace(
                        "{workspace}",
                        envs.path_adapter(self._env["workspace"]))
                    model = envs.lazy_instance_by_fliename(
                        model_path, "Model")(self._env)
                    model._data_var = model.input_data(
                        dataset_name=model_dict["dataset_name"])
                    model.net(None)
                    optimizer = model._build_optimizer(opt_name, opt_lr,
                                                       opt_strategy)
                    optimizer.minimize(model._cost)
            self._model[model_dict["name"]][0] = train_program
            self._model[model_dict["name"]][1] = startup_program
            self._model[model_dict["name"]][2] = scope
            self._model[model_dict["name"]][3] = model
        for dataset in self._env["dataset"]:
            self._dataset[dataset["name"]] = self._create_dataset(
                dataset["name"])
        context['status'] = 'startup_pass'
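    # For illustration only: a sketch of the "executor" section init()
    # iterates over (names and values are assumptions):
    #
    #   epochs: 10
    #   executor:
    #     - name: train_runner
    #       model: "{workspace}/model.py"
    #       dataset_name: dataset_train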
    def startup(self, context):
        for model_dict in self._env["executor"]:
            with fluid.scope_guard(self._model[model_dict["name"]][2]):
                self._exe.run(self._model[model_dict["name"]][1])
        context['status'] = 'train_pass'
    def executor_train(self, context):
        epochs = int(self._env["epochs"])
        for j in range(epochs):
            for model_dict in self._env["executor"]:
                reader_name = model_dict["dataset_name"]
                name = "dataset." + reader_name + "."
                begin_time = time.time()
                if envs.get_global_env(name + "type") == "DataLoader":
                    self._executor_dataloader_train(model_dict)
                else:
                    self._executor_dataset_train(model_dict)
                end_time = time.time()
                seconds = end_time - begin_time
                print("epoch {} done, time elapsed: {}".format(j, seconds))
        context['status'] = "terminal_pass"
    def _executor_dataset_train(self, model_dict):
        reader_name = model_dict["dataset_name"]
        model_name = model_dict["name"]
        model_class = self._model[model_name][3]
        fetch_vars = []
        fetch_alias = []
        fetch_period = 20
        metrics = model_class.get_metrics()
        if metrics:
            # dict views are not indexable in Python 3, so materialize them
            fetch_vars = list(metrics.values())
            fetch_alias = list(metrics.keys())
        scope = self._model[model_name][2]
        program = self._model[model_name][0]
        reader = self._dataset[reader_name]
        with fluid.scope_guard(scope):
            self._exe.train_from_dataset(
                program=program,
                dataset=reader,
                fetch_list=fetch_vars,
                fetch_info=fetch_alias,
                print_period=fetch_period)
    def _executor_dataloader_train(self, model_dict):
        reader_name = model_dict["dataset_name"]
        model_name = model_dict["name"]
        model_class = self._model[model_name][3]
        # Compile the train program (slot 0), not the startup program.
        program = fluid.compiler.CompiledProgram(
            self._model[model_name][0]).with_data_parallel(
                loss_name=model_class.get_avg_cost().name)
        fetch_vars = []
        fetch_alias = []
        fetch_period = 20
        metrics_varnames = []
        metrics_format = []
        metrics_format.append("{}: {{}}".format("epoch"))
        metrics_format.append("{}: {{}}".format("batch"))
        metrics = model_class.get_metrics()
        if metrics:
            fetch_vars = list(metrics.values())
            fetch_alias = list(metrics.keys())
            for name, var in metrics.items():
                metrics_varnames.append(var.name)
                metrics_format.append("{}: {{}}".format(name))
        metrics_format = ", ".join(metrics_format)
        reader = self._dataset[reader_name]
        scope = self._model[model_name][2]
        with fluid.scope_guard(scope):
            reader.start()
            batch_id = 0
            try:
                while True:
                    metrics_rets = self._exe.run(program=program,
@@ -249,6 +104,32 @@ class SingleTrainer(TranspileTrainer):
                    batch_id += 1
            except fluid.core.EOFException:
                reader.reset()
    def terminal(self, context):
        context['is_exit'] = True
@@ -53,8 +53,7 @@ class Model(ModelBase):
            sparse_embed_seq + [self.dense_input], axis=1)
        fcs = [concated]
        hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes")
        for size in hidden_layers:
            output = fluid.layers.fc(
......