Commit 03dfc413 authored by xjqbest

fix

Parent e8d99992
@@ -36,10 +36,6 @@ def trainer_registry():
                                                 "tdm_single_trainer.py")
    trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
                                                 "tdm_cluster_trainer.py")
-    trainers["SingleTrainerYamlOpt"] = os.path.join(trainer_abs,
-                                                    "single_trainer_yamlopt.py")
-    trainers["SingleAucYamlOpt"] = os.path.join(trainer_abs,
-                                                "single_auc_yamlopt.py")


trainer_registry()
...
@@ -39,15 +39,32 @@ class Model(object):
        self._platform = envs.get_platform()
        self._init_hyper_parameters()
        self._env = config
+        self._slot_inited = False

    def _init_hyper_parameters(self):
        pass

-    def _init_slots(self):
-        sparse_slots = envs.get_global_env("sparse_slots", None,
-                                           "train.reader")
-        dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
+    def _init_slots(self, **kargs):
+        if self._slot_inited:
+            return
+        self._slot_inited = True
+        dataset = {}
+        model_dict = {}  #self._env["executor"]#[kargs["name"]]
+        for i in self._env["executor"]:
+            if i["name"] == kargs["name"]:
+                model_dict = i
+                break
+        for i in self._env["dataset"]:
+            if i["name"] == model_dict["dataset_name"]:
+                dataset = i
+                break
+        name = "dataset." + dataset["name"] + "."
+        sparse_slots = envs.get_global_env(name + "sparse_slots")  #"sparse_slots", None,
+        #"train.reader")
+        dense_slots = envs.get_global_env(name + "dense_slots")
+        #"dense_slots", None, "train.reader")
+        #print(sparse_slots)
+        #print(dense_slots)

        if sparse_slots is not None or dense_slots is not None:
            sparse_slots = sparse_slots.strip().split(" ")
            dense_slots = dense_slots.strip().split(" ")
@@ -70,12 +87,13 @@ class Model(object):
                self._data_var.append(l)
                self._sparse_data_var.append(l)

-        dataset_class = envs.get_global_env("dataset_class", None,
-                                            "train.reader")
-        if dataset_class == "DataLoader":
-            self._init_dataloader()
+        #dataset_class = dataset["type"]  #envs.get_global_env("dataset_class", None,
+        #                                 "train.reader")
+        #if dataset_class == "DataLoader":
+        #    self._init_dataloader()

    def _init_dataloader(self):
+        #print(self._data_var)
        self._data_loader = fluid.io.DataLoader.from_generator(
            feed_list=self._data_var,
            capacity=64,
@@ -131,7 +149,6 @@ class Model(object):
                                          None, self._namespace)
        optimizer = envs.get_global_env("hyper_parameters.optimizer", None,
                                        self._namespace)
-        print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
        return self._build_optimizer(optimizer, learning_rate)

    def input_data(self, is_infer=False, **kwargs):
...
@@ -35,8 +35,6 @@ class Reader(dg.MultiSlotDataGenerator):
        else:
            raise ValueError("reader config only support yaml")

-        envs.set_global_envs(_config)
-        envs.update_workspace()

    @abc.abstractmethod
    def init(self):
@@ -63,8 +61,12 @@ class SlotReader(dg.MultiSlotDataGenerator):
    def init(self, sparse_slots, dense_slots, padding=0):
        from operator import mul
-        self.sparse_slots = sparse_slots.strip().split(" ")
-        self.dense_slots = dense_slots.strip().split(" ")
+        self.sparse_slots = []
+        if sparse_slots.strip() != "#":
+            self.sparse_slots = sparse_slots.strip().split(" ")
+        self.dense_slots = []
+        if dense_slots.strip() != "#":
+            self.dense_slots = dense_slots.strip().split(" ")
        self.dense_slots_shape = [
            reduce(mul,
                   [int(j) for j in i.split(":")[1].strip("[]").split(",")])
...
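Note on the "#" placeholder handled above: when a dataset declares only sparse or only dense slots, the missing list is passed to SlotReader as the single token "#" (see the pipe command changes further down), so both arguments always arrive and the placeholder simply parses to an empty slot list. A minimal illustration of that convention, assuming a standalone helper (parse_slots is hypothetical, not code from this commit):

    # Hypothetical helper mirroring SlotReader.init: "#" means "no slots of this kind".
    def parse_slots(sparse_slots, dense_slots):
        sparse = [] if sparse_slots.strip() == "#" else sparse_slots.strip().split(" ")
        dense = [] if dense_slots.strip() == "#" else dense_slots.strip().split(" ")
        return sparse, dense

    # A dataset with sparse slots only:
    # parse_slots("click 1 2 3", "#") -> (["click", "1", "2", "3"], [])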
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleAucYamlOpt(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
device = envs.get_global_env("device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots")
dense_slots = envs.get_global_env(name + "dense_slots")
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_type = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), " change reader to DataLoader")
reader_type = "DataLoader"
padding = 0
reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
type_name = envs.get_global_env(name + "type")
if type_name == "QueueDataset":
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["executor"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model.get_inputs()
dataset.set_use_var(inputs)
break
else:
pass
return dataset
def init(self, context):
for model_dict in self._env["executor"]:
self._model[model_dict["name"]] = [None] * 4
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
model.net(None, is_infer=True)
optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
for dataset in self._env["dataset"]:
self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["executor"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(self._env["epochs"])
for j in range(epochs):
for model_dict in self._env["executor"]:
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = 20
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.train_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
self._model[model_name][1] = fluid.compiler.CompiledProgram(
self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name)
fetch_vars = []
fetch_alias = []
fetch_period = 20
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
        for name, var in model_class.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._dataset[reader_name]
reader.start()
batch_id = 0
scope = self._model[model_name][2]
        program = self._model[model_name][0]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
                    if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
@@ -19,83 +19,263 @@ from __future__ import print_function
import time
import logging
+import os

import paddle.fluid as fluid

from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
+from paddlerec.core.reader import SlotReader
+from paddlerec.core.utils import dataloader_instance

logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)

-class SingleTrainer(TranspileTrainer):
+class SingleTrainerYamlOpt(TranspileTrainer):
+    def __init__(self, config=None):
+        super(TranspileTrainer, self).__init__(config)
+        self._env = self._config
+        device = envs.get_global_env("device")
+        if device == 'gpu':
+            self._place = fluid.CUDAPlace(0)
+        elif device == 'cpu':
+            self._place = fluid.CPUPlace()
+        self._exe = fluid.Executor(self._place)
+        self.processor_register()
+        self._model = {}
+        self._dataset = {}
+        envs.set_global_envs(self._config)
+        envs.update_workspace()
+
    def processor_register(self):
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)
-        if envs.get_platform() == "LINUX" and envs.get_global_env(
-                "dataset_class", None, "train.reader") != "DataLoader":
-            self.regist_context_processor('train_pass', self.dataset_train)
-        else:
-            self.regist_context_processor('train_pass', self.dataloader_train)
-        self.regist_context_processor('infer_pass', self.infer)
-        self.regist_context_processor('terminal_pass', self.terminal)
+        self.regist_context_processor('train_pass', self.executor_train)
+        self.regist_context_processor('terminal_pass', self.terminal)
+
+    def instance(self, context):
+        context['status'] = 'init_pass'
+
+    def _get_dataset(self, dataset_name):
+        name = "dataset." + dataset_name + "."
+        sparse_slots = envs.get_global_env(name + "sparse_slots")
+        dense_slots = envs.get_global_env(name + "dense_slots")
+        thread_num = envs.get_global_env(name + "thread_num")
+        batch_size = envs.get_global_env(name + "batch_size")
+        reader_class = envs.get_global_env("data_convertor")
+        abs_dir = os.path.dirname(os.path.abspath(__file__))
+        reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
+        if sparse_slots is None and dense_slots is None:
+            pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "fake", self._config_yaml)
+        else:
+            if sparse_slots is None:
+                sparse_slots = "#"
+            if dense_slots is None:
+                dense_slots = "#"
+            padding = envs.get_global_env(name + "padding", 0)
+            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
+                reader, "slot", "slot", self._config_yaml, "fake", \
+                sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
+        dataset = fluid.DatasetFactory().create_dataset()
+        dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
+        dataset.set_pipe_command(pipe_cmd)
+        train_data_path = envs.get_global_env(name + "data_path")
+        file_list = [
+            os.path.join(train_data_path, x)
+            for x in os.listdir(train_data_path)
+        ]
+        dataset.set_filelist(file_list)
+        for model_dict in self._env["executor"]:
+            if model_dict["dataset_name"] == dataset_name:
+                model = self._model[model_dict["name"]][3]
+                inputs = model.get_inputs()
+                dataset.set_use_var(inputs)
+                break
+        return dataset
+
+    def _get_dataloader(self, dataset_name):
+        name = "dataset." + dataset_name + "."
+        sparse_slots = envs.get_global_env(name + "sparse_slots")
+        dense_slots = envs.get_global_env(name + "dense_slots")
+        thread_num = envs.get_global_env(name + "thread_num")
+        batch_size = envs.get_global_env(name + "batch_size")
+        reader_class = envs.get_global_env("data_convertor")
+        abs_dir = os.path.dirname(os.path.abspath(__file__))
+        #reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
+        if sparse_slots is None and dense_slots is None:
+            #reader_class = envs.get_global_env("class")
+            reader = dataloader_instance.dataloader_by_name(reader_class, dataset_name, self._config_yaml)
+            reader_class = envs.lazy_instance_by_fliename(reader_class, "TrainReader")
+            reader_ins = reader_class(self._config_yaml)
+        else:
+            reader = dataloader_instance.slotdataloader_by_name("", dataset_name, self._config_yaml)
+            reader_ins = SlotReader(self._config_yaml)
+        if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
+            dataloader.set_sample_list_generator(reader)
+        else:
+            dataloader.set_sample_generator(reader, batch_size)
+        return dataloader

-    def init(self, context):
-        self.model.train_net()
-        optimizer = self.model.optimizer()
-        optimizer.minimize((self.model.get_avg_cost()))
-        self.fetch_vars = []
-        self.fetch_alias = []
-        self.fetch_period = self.model.get_fetch_period()
-        metrics = self.model.get_metrics()
-        if metrics:
-            self.fetch_vars = metrics.values()
-            self.fetch_alias = metrics.keys()
-        evaluate_only = envs.get_global_env(
-            'evaluate_only', False, namespace='evaluate')
-        if evaluate_only:
-            context['status'] = 'infer_pass'
-        else:
-            context['status'] = 'startup_pass'
+    def _create_dataset(self, dataset_name):
+        name = "dataset." + dataset_name + "."
+        sparse_slots = envs.get_global_env(name + "sparse_slots")
+        dense_slots = envs.get_global_env(name + "dense_slots")
+        thread_num = envs.get_global_env(name + "thread_num")
+        batch_size = envs.get_global_env(name + "batch_size")
+        type_name = envs.get_global_env(name + "type")
+        if envs.get_platform() != "LINUX":
+            print("platform ", envs.get_platform(), " change reader to DataLoader")
+            type_name = "DataLoader"
+        padding = 0
+        if type_name == "DataLoader":
+            return None  #self._get_dataloader(dataset_name)
+        else:
+            return self._get_dataset(dataset_name)
+        reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
+        pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
+            reader, "slot", "slot", self._config_yaml, "fake", \
+            sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
+        if type_name == "QueueDataset":
+            dataset = fluid.DatasetFactory().create_dataset()
+            dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
+            dataset.set_pipe_command(pipe_cmd)
+            train_data_path = envs.get_global_env(name + "data_path")
+            file_list = [
+                os.path.join(train_data_path, x)
+                for x in os.listdir(train_data_path)
+            ]
+            dataset.set_filelist(file_list)
+            for model_dict in self._env["executor"]:
+                if model_dict["dataset_name"] == dataset_name:
+                    model = self._model[model_dict["name"]][3]
+                    inputs = model.get_inputs()
+                    dataset.set_use_var(inputs)
+                    break
+        else:
+            pass
+        return dataset
+
+    def init(self, context):
+        for model_dict in self._env["executor"]:
+            self._model[model_dict["name"]] = [None] * 4
+            train_program = fluid.Program()
+            startup_program = fluid.Program()
+            scope = fluid.Scope()
+            dataset_name = model_dict["dataset_name"]
+            opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
+            opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate")
+            opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy")
+            with fluid.program_guard(train_program, startup_program):
+                with fluid.unique_name.guard():
+                    model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
+                    model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
+                    model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
+                    #model._init_slots(name=model_dict["name"])
+                    if envs.get_global_env("dataset." + dataset_name + ".type") == "DataLoader":
+                        model._init_dataloader()
+                    model.net(model._data_var)
+                    optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
+                    optimizer.minimize(model._cost)
+            self._model[model_dict["name"]][0] = train_program
+            self._model[model_dict["name"]][1] = startup_program
+            self._model[model_dict["name"]][2] = scope
+            self._model[model_dict["name"]][3] = model
+        for dataset in self._env["dataset"]:
+            if dataset["type"] != "DataLoader":
+                self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
+        context['status'] = 'startup_pass'

    def startup(self, context):
-        self._exe.run(fluid.default_startup_program())
+        for model_dict in self._env["executor"]:
+            with fluid.scope_guard(self._model[model_dict["name"]][2]):
+                self._exe.run(self._model[model_dict["name"]][1])
        context['status'] = 'train_pass'

-    def dataloader_train(self, context):
-        reader = self._get_dataloader("TRAIN")
-        epochs = envs.get_global_env("train.epochs")
-        program = fluid.compiler.CompiledProgram(fluid.default_main_program(
-        )).with_data_parallel(loss_name=self.model.get_avg_cost().name)
+    def executor_train(self, context):
+        epochs = int(self._env["epochs"])
+        for j in range(epochs):
+            for model_dict in self._env["executor"]:
+                reader_name = model_dict["dataset_name"]
+                name = "dataset." + reader_name + "."
+                begin_time = time.time()
+                if envs.get_global_env(name + "type") == "DataLoader":
+                    self._executor_dataloader_train(model_dict)
+                else:
+                    self._executor_dataset_train(model_dict)
+                end_time = time.time()
+                seconds = end_time - begin_time
+                print("epoch {} done, time elasped: {}".format(j, seconds))
+        context['status'] = "terminal_pass"
+
+    def _executor_dataset_train(self, model_dict):
+        reader_name = model_dict["dataset_name"]
+        model_name = model_dict["name"]
+        model_class = self._model[model_name][3]
+        fetch_vars = []
+        fetch_alias = []
+        fetch_period = 20
+        metrics = model_class.get_metrics()
+        if metrics:
+            fetch_vars = metrics.values()
+            fetch_alias = metrics.keys()
+        scope = self._model[model_name][2]
+        program = self._model[model_name][0]
+        reader = self._dataset[reader_name]
+        with fluid.scope_guard(scope):
+            self._exe.train_from_dataset(
+                program=program,
+                dataset=reader,
+                fetch_list=fetch_vars,
+                fetch_info=fetch_alias,
+                print_period=fetch_period)
+
+    def _executor_dataloader_train(self, model_dict):
+        reader_name = model_dict["dataset_name"]
+        model_name = model_dict["name"]
+        model_class = self._model[model_name][3]
+        self._model[model_name][0] = fluid.compiler.CompiledProgram(
+            self._model[model_name][0]).with_data_parallel(loss_name=model_class.get_avg_cost().name)
+        fetch_vars = []
+        fetch_alias = []
+        fetch_period = 20
+        metrics = model_class.get_metrics()
+        if metrics:
+            fetch_vars = metrics.values()
+            fetch_alias = metrics.keys()
        metrics_varnames = []
        metrics_format = []
        metrics_format.append("{}: {{}}".format("epoch"))
        metrics_format.append("{}: {{}}".format("batch"))
-        for name, var in self.model.get_metrics().items():
+        for name, var in model_class.get_metrics().items():
            metrics_varnames.append(var.name)
            metrics_format.append("{}: {{}}".format(name))
        metrics_format = ", ".join(metrics_format)
-        for epoch in range(epochs):
-            reader.start()
-            batch_id = 0
+        reader = self._model[model_name][3]._data_loader
+        reader.start()
+        batch_id = 0
+        scope = self._model[model_name][2]
+        program = self._model[model_name][0]
+        #print(metrics_varnames)
+        with fluid.scope_guard(scope):
            try:
                while True:
                    metrics_rets = self._exe.run(program=program,
                                                 fetch_list=metrics_varnames)
                    metrics = [epoch, batch_id]
                    metrics.extend(metrics_rets)
@@ -104,32 +284,6 @@ class SingleTrainer(TranspileTrainer):
                    batch_id += 1
            except fluid.core.EOFException:
                reader.reset()
-            self.save(epoch, "train", is_fleet=False)
-        context['status'] = 'infer_pass'
-
-    def dataset_train(self, context):
-        dataset = self._get_dataset("TRAIN")
-        ins = self._get_dataset_ins()
-        epochs = envs.get_global_env("train.epochs")
-        for i in range(epochs):
-            begin_time = time.time()
-            self._exe.train_from_dataset(
-                program=fluid.default_main_program(),
-                dataset=dataset,
-                fetch_list=self.fetch_vars,
-                fetch_info=self.fetch_alias,
-                print_period=self.fetch_period)
-            end_time = time.time()
-            times = end_time - begin_time
-            print("epoch {} using time {}, speed {:.2f} lines/s".format(
-                i, times, ins / times))
-            self.save(i, "train", is_fleet=False)
-            context['status'] = 'infer_pass'

    def terminal(self, context):
-        for model in self.increment_models:
-            print("epoch :{}, dir: {}".format(model[0], model[1]))
        context['is_exit'] = True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleTrainerYamlOpt(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
device = envs.get_global_env("device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots")
dense_slots = envs.get_global_env(name + "dense_slots")
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_type = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), " change reader to DataLoader")
reader_type = "DataLoader"
padding = 0
reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
type_name = envs.get_global_env(name + "type")
if type_name == "QueueDataset":
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["executor"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model.get_inputs()
dataset.set_use_var(inputs)
break
else:
pass
return dataset
def init(self, context):
for model_dict in self._env["executor"]:
self._model[model_dict["name"]] = [None] * 4
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
model.net(None)
optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
for dataset in self._env["dataset"]:
self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["executor"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(self._env["epochs"])
for j in range(epochs):
for model_dict in self._env["executor"]:
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = 20
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.train_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
self._model[model_name][1] = fluid.compiler.CompiledProgram(
self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name)
fetch_vars = []
fetch_alias = []
fetch_period = 20
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
        for name, var in model_class.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._dataset[reader_name]
reader.start()
batch_id = 0
scope = self._model[model_name][2]
        program = self._model[model_name][0]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
                    if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
@@ -119,6 +119,10 @@ class TranspileTrainer(Trainer):
            pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state,
                                                   self._config_yaml)
        else:
+            if sparse_slots is None:
+                sparse_slots = "#"
+            if dense_slots is None:
+                dense_slots = "#"
            padding = envs.get_global_env("padding", 0, namespace)
            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
                reader, "slot", "slot", self._config_yaml, namespace, \
...
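For context on the pipe command built above: both slot lists are packed into single shell arguments by replacing spaces with "#" before being handed to dataset_instance.py. A rough illustration of the resulting command line (the paths and slot values below are made up for the example, not taken from the commit):

    # Illustrative only -- shows the shape of the assembled pipe command.
    reader = "paddlerec/core/utils/dataset_instance.py"  # assumed location
    sparse_slots = "click 1 2 3"
    dense_slots = "dense_var:13"
    padding = 0
    pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
        reader, "slot", "slot", "config.yaml", "train.reader",
        sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
    print(pipe_cmd)
    # python paddlerec/core/utils/dataset_instance.py slot slot config.yaml train.reader click#1#2#3 dense_var:13 0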
@@ -14,12 +14,102 @@
from __future__ import print_function
import os

from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ
from paddlerec.core.reader import SlotReader

def dataloader_by_name(readerclass, dataset_name, yaml_file):
reader_class = lazy_instance_by_fliename(readerclass, "TrainReader")
name = "dataset." + dataset_name + "."
data_path = get_global_env(name + "data_path")
#else:
# reader_name = "SlotReader"
# namespace = "evaluate.reader"
# data_path = get_global_env("test_data_path", None, namespace)
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
reader = reader_class(yaml_file)
reader.init()
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
iter = reader.generate_sample(line)
for parsed_line in iter():
if parsed_line is None:
continue
else:
values = []
for pased in parsed_line:
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def slotdataloader_by_name(readerclass, dataset_name, yaml_file):
name = "dataset." + dataset_name + "."
#if train == "TRAIN":
reader_name = "SlotReader"
# namespace = "train.reader"
print(name)
data_path = get_global_env(name + "data_path")
#else:
# reader_name = "SlotReader"
# namespace = "evaluate.reader"
# data_path = get_global_env("test_data_path", None, namespace)
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
#sparse = get_global_env("sparse_slots", None, namespace)
#dense = get_global_env("dense_slots", None, namespace)
#padding = get_global_env("padding", 0, namespace)
#name = "dataset." + dataset_name + "."
sparse = get_global_env(name + "sparse_slots")
dense = get_global_env(name + "dense_slots")
padding = get_global_env(name + "padding", 0)
reader = SlotReader(yaml_file)
reader.init(sparse, dense, int(padding))
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
iter = reader.generate_sample(line)
for parsed_line in iter():
if parsed_line is None:
continue
else:
values = []
for pased in parsed_line:
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def dataloader(readerclass, train, yaml_file):
    if train == "TRAIN":
...
@@ -80,8 +80,6 @@ def set_global_envs(envs):
                global_envs[global_k] = v

    fatten_env_namespace([], envs)
-    for i in global_envs:
-        print i,":",global_envs[i]


def get_global_env(env_name, default_value=None, namespace=None):
    """
...
@@ -21,7 +21,8 @@ workspace: "paddlerec.models.rank.dnn"
dataset:
- name: dataset_2
  batch_size: 2
-  type: QueueDataset
+  #type: QueueDataset
+  type: DataLoader
  data_path: "{workspace}/data/sample_data/train"
  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
  dense_slots: "dense_var:13"
@@ -38,8 +39,8 @@ hyper_parameters:
  fc_sizes: [512, 256, 128, 32]

epoch:
-  trainer_class: single_yamlopt
-  #trainer_class: single_auc_yamlopt
+  name:
+  trainer_class: single
  save_checkpoint_interval: 2
  save_inference_interval: 4
  save_checkpoint_path: "increment"
...
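The config hunk above reflects the layout the consolidated single trainer now reads: a top-level dataset list plus an epoch section carrying the trainer class and save intervals, with each dataset entry addressed through dot-separated keys such as dataset.dataset_2.batch_size via envs.get_global_env. A sketch of that key flattening under a simplified config (flatten_datasets is a hypothetical helper, not PaddleRec's actual implementation):

    # Hypothetical sketch: each "dataset:" entry is addressed by its "name" field,
    # e.g. envs.get_global_env("dataset.dataset_2.batch_size").
    def flatten_datasets(dataset_list):
        flat = {}
        for entry in dataset_list:
            prefix = "dataset." + entry["name"] + "."
            for key, value in entry.items():
                if key != "name":
                    flat[prefix + key] = value
        return flat

    datasets = [{"name": "dataset_2", "batch_size": 2, "type": "DataLoader"}]
    print(flatten_datasets(datasets))
    # {'dataset.dataset_2.batch_size': 2, 'dataset.dataset_2.type': 'DataLoader'}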
...@@ -28,7 +28,7 @@ device = ["CPU", "GPU"] ...@@ -28,7 +28,7 @@ device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"] clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
engine_choices = [ engine_choices = [
"SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER", "SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER",
"TDM_CLUSTER", "SINGLE_YAMLOPT", "SINGLE_AUC_YAMLOPT" "TDM_CLUSTER"
] ]
custom_model = ['TDM'] custom_model = ['TDM']
model_name = "" model_name = ""
...@@ -41,10 +41,6 @@ def engine_registry(): ...@@ -41,10 +41,6 @@ def engine_registry():
engines["TRANSPILER"]["SINGLE"] = single_engine engines["TRANSPILER"]["SINGLE"] = single_engine
engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER"] = cluster_engine engines["TRANSPILER"]["CLUSTER"] = cluster_engine
engines["TRANSPILER"]["SINGLE_YAMLOPT"] = single_yamlopt_engine
engines["TRANSPILER"]["SINGLE_AUC_YAMLOPT"] = single_auc_yamlopt_engine
engines["PSLIB"]["SINGLE"] = local_mpi_engine engines["PSLIB"]["SINGLE"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
...@@ -139,32 +135,6 @@ def single_engine(args): ...@@ -139,32 +135,6 @@ def single_engine(args):
trainer = TrainerFactory.create(args.model) trainer = TrainerFactory.create(args.model)
return trainer return trainer
def single_yamlopt_engine(args):
trainer = get_trainer_prefix(args) + "SingleTrainerYamlOpt"
single_envs = {}
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single_yamlopt"
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
def single_auc_yamlopt_engine(args):
trainer = get_trainer_prefix(args) + "SingleAucYamlOpt"
single_envs = {}
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single_yamlopt"
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
def cluster_engine(args): def cluster_engine(args):
def update_workspace(cluster_envs): def update_workspace(cluster_envs):
workspace = cluster_envs.get("engine_workspace", None) workspace = cluster_envs.get("engine_workspace", None)
......