提交 91fda308 编写于 作者: F frankwhzhang
...@@ -26,6 +26,7 @@ trainers = {} ...@@ -26,6 +26,7 @@ trainers = {}
def trainer_registry(): def trainer_registry():
trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py")
trainers["SingleInfer"] = os.path.join(trainer_abs, "single_infer.py")
trainers["ClusterTrainer"] = os.path.join(trainer_abs, trainers["ClusterTrainer"] = os.path.join(trainer_abs,
"cluster_trainer.py") "cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
......
...@@ -38,17 +38,37 @@ class Model(object): ...@@ -38,17 +38,37 @@ class Model(object):
self._namespace = "train.model" self._namespace = "train.model"
self._platform = envs.get_platform() self._platform = envs.get_platform()
self._init_hyper_parameters() self._init_hyper_parameters()
self._env = config
self._slot_inited = False
def _init_hyper_parameters(self): def _init_hyper_parameters(self):
pass pass
def _init_slots(self): def _init_slots(self, **kargs):
sparse_slots = envs.get_global_env("sparse_slots", None, if self._slot_inited:
"train.reader") return
dense_slots = envs.get_global_env("dense_slots", None, "train.reader") self._slot_inited = True
dataset = {}
if sparse_slots is not None or dense_slots is not None: model_dict = {}
for i in self._env["executor"]:
if i["name"] == kargs["name"]:
model_dict = i
break
for i in self._env["dataset"]:
if i["name"] == model_dict["dataset_name"]:
dataset = i
break
name = "dataset." + dataset["name"] + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots != "" or dense_slots != "":
if sparse_slots == "":
sparse_slots = []
else:
sparse_slots = sparse_slots.strip().split(" ") sparse_slots = sparse_slots.strip().split(" ")
if dense_slots == "":
dense_slots = []
else:
dense_slots = dense_slots.strip().split(" ") dense_slots = dense_slots.strip().split(" ")
dense_slots_shape = [[ dense_slots_shape = [[
int(j) for j in i.split(":")[1].strip("[]").split(",") int(j) for j in i.split(":")[1].strip("[]").split(",")
...@@ -69,14 +89,17 @@ class Model(object): ...@@ -69,14 +89,17 @@ class Model(object):
self._data_var.append(l) self._data_var.append(l)
self._sparse_data_var.append(l) self._sparse_data_var.append(l)
dataset_class = envs.get_global_env("dataset_class", None, dataset_class = dataset["type"]
"train.reader")
if dataset_class == "DataLoader": if dataset_class == "DataLoader":
self._init_dataloader() self._init_dataloader()
def _init_dataloader(self): def _init_dataloader(self, is_infer=False):
if is_infer:
data = self._infer_data_var
else:
data = self._data_var
self._data_loader = fluid.io.DataLoader.from_generator( self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var, feed_list=data,
capacity=64, capacity=64,
use_double_buffer=False, use_double_buffer=False,
iterable=False) iterable=False)
...@@ -103,7 +126,7 @@ class Model(object): ...@@ -103,7 +126,7 @@ class Model(object):
def get_fetch_period(self): def get_fetch_period(self):
return self._fetch_interval return self._fetch_interval
def _build_optimizer(self, name, lr): def _build_optimizer(self, name, lr, strategy=None):
name = name.upper() name = name.upper()
optimizers = ["SGD", "ADAM", "ADAGRAD"] optimizers = ["SGD", "ADAM", "ADAGRAD"]
if name not in optimizers: if name not in optimizers:
...@@ -130,15 +153,22 @@ class Model(object): ...@@ -130,15 +153,22 @@ class Model(object):
None, self._namespace) None, self._namespace)
optimizer = envs.get_global_env("hyper_parameters.optimizer", None, optimizer = envs.get_global_env("hyper_parameters.optimizer", None,
self._namespace) self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate) return self._build_optimizer(optimizer, learning_rate)
def input_data(self, is_infer=False): def input_data(self, is_infer=False, **kwargs):
sparse_slots = envs.get_global_env("sparse_slots", None, name = "dataset." + kwargs.get("dataset_name") + "."
"train.reader") sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env("dense_slots", None, "train.reader") dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots is not None or dense_slots is not None: self._sparse_data_var_map = {}
self._dense_data_var_map = {}
if sparse_slots != "" or dense_slots != "":
if sparse_slots == "":
sparse_slots = []
else:
sparse_slots = sparse_slots.strip().split(" ") sparse_slots = sparse_slots.strip().split(" ")
if dense_slots == "":
dense_slots = []
else:
dense_slots = dense_slots.strip().split(" ") dense_slots = dense_slots.strip().split(" ")
dense_slots_shape = [[ dense_slots_shape = [[
int(j) for j in i.split(":")[1].strip("[]").split(",") int(j) for j in i.split(":")[1].strip("[]").split(",")
...@@ -153,12 +183,14 @@ class Model(object): ...@@ -153,12 +183,14 @@ class Model(object):
dtype="float32") dtype="float32")
data_var_.append(l) data_var_.append(l)
self._dense_data_var.append(l) self._dense_data_var.append(l)
self._dense_data_var_map[dense_slots[i]] = l
self._sparse_data_var = [] self._sparse_data_var = []
for name in sparse_slots: for name in sparse_slots:
l = fluid.layers.data( l = fluid.layers.data(
name=name, shape=[1], lod_level=1, dtype="int64") name=name, shape=[1], lod_level=1, dtype="int64")
data_var_.append(l) data_var_.append(l)
self._sparse_data_var.append(l) self._sparse_data_var.append(l)
self._sparse_data_var_map[name] = l
return data_var_ return data_var_
else: else:
......
...@@ -35,9 +35,6 @@ class Reader(dg.MultiSlotDataGenerator): ...@@ -35,9 +35,6 @@ class Reader(dg.MultiSlotDataGenerator):
else: else:
raise ValueError("reader config only support yaml") raise ValueError("reader config only support yaml")
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod @abc.abstractmethod
def init(self): def init(self):
"""init""" """init"""
...@@ -58,12 +55,16 @@ class SlotReader(dg.MultiSlotDataGenerator): ...@@ -58,12 +55,16 @@ class SlotReader(dg.MultiSlotDataGenerator):
_config = yaml.load(rb.read(), Loader=yaml.FullLoader) _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else: else:
raise ValueError("reader config only support yaml") raise ValueError("reader config only support yaml")
envs.set_global_envs(_config)
envs.update_workspace()
def init(self, sparse_slots, dense_slots, padding=0): def init(self, sparse_slots, dense_slots, padding=0):
from operator import mul from operator import mul
self.sparse_slots = []
if sparse_slots.strip() != "#" and sparse_slots.strip(
) != "?" and sparse_slots.strip() != "":
self.sparse_slots = sparse_slots.strip().split(" ") self.sparse_slots = sparse_slots.strip().split(" ")
self.dense_slots = []
if dense_slots.strip() != "#" and dense_slots.strip(
) != "?" and dense_slots.strip() != "":
self.dense_slots = dense_slots.strip().split(" ") self.dense_slots = dense_slots.strip().split(" ")
self.dense_slots_shape = [ self.dense_slots_shape = [
reduce(mul, reduce(mul,
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleInfer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
device = envs.get_global_env("device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
device = envs.get_global_env("runner." + self._runner_name + ".device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _get_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
"TRAIN", self._config_yaml)
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding))
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model._infer_data_var
dataset.set_use_var(inputs)
break
return dataset
def _get_dataloader(self, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class, dataset_name, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
"TrainReader")
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots")
dense_slots = envs.get_global_env(name + "dense_slots")
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(),
" change reader to DataLoader")
type_name = "DataLoader"
padding = 0
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name)
def init(self, context):
for model_dict in self._env["phase"]:
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env(
"hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._infer_data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=True)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._infer_data_var, True)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["phase"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs"))
for j in range(epochs):
for model_dict in self._env["phase"]:
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.infer_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
program = self._model[model_name][0].clone()
fetch_vars = []
fetch_alias = []
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._model[model_name][3]._data_loader
reader.start()
batch_id = 0
scope = self._model[model_name][2]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [batch_id]
metrics.extend(metrics_rets)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
def load(self, is_fleet=False):
name = "runner." + self._runner_name + "."
dirname = envs.get_global_env(name + "init_model_path", None)
if dirname is None or dirname == "":
return
print("single_infer going to load ", dirname)
if is_fleet:
fleet.load_persistables(self._exe, dirname)
else:
fluid.io.load_persistables(self._exe, dirname)
def save(self, epoch_id, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", None)
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", None)
if feed_varnames is None or fetch_varnames is None or feed_varnames == "":
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
def save_persistables():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
save_persistables()
save_inference_model()
...@@ -19,11 +19,13 @@ from __future__ import print_function ...@@ -19,11 +19,13 @@ from __future__ import print_function
import time import time
import logging import logging
import os
import paddle.fluid as fluid import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid") logger = logging.getLogger("fluid")
...@@ -31,105 +33,323 @@ logger.setLevel(logging.INFO) ...@@ -31,105 +33,323 @@ logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer): class SingleTrainer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
device = envs.get_global_env("runner." + self._runner_name + ".device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
def processor_register(self): def processor_register(self):
self.regist_context_processor('uninit', self.instance) self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init) self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup) self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
if envs.get_platform() == "LINUX" and envs.get_global_env( def instance(self, context):
"dataset_class", None, "train.reader") != "DataLoader": context['status'] = 'init_pass'
self.regist_context_processor('train_pass', self.dataset_train)
def _get_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
"TRAIN", self._config_yaml)
else: else:
self.regist_context_processor('train_pass', self.dataloader_train) if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding))
self.regist_context_processor('infer_pass', self.infer) dataset = fluid.DatasetFactory().create_dataset()
self.regist_context_processor('terminal_pass', self.terminal) dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model._data_var
dataset.set_use_var(inputs)
break
return dataset
def init(self, context): def _get_dataloader(self, dataset_name, dataloader):
self.model.train_net() name = "dataset." + dataset_name + "."
optimizer = self.model.optimizer() sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
optimizer.minimize((self.model.get_avg_cost())) dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class, dataset_name, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
"TrainReader")
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
self.fetch_vars = [] def _create_dataset(self, dataset_name):
self.fetch_alias = [] name = "dataset." + dataset_name + "."
self.fetch_period = self.model.get_fetch_period() sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(),
" change reader to DataLoader")
type_name = "DataLoader"
padding = 0
metrics = self.model.get_metrics() if type_name == "DataLoader":
if metrics: return None
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
evaluate_only = envs.get_global_env(
'evaluate_only', False, namespace='evaluate')
if evaluate_only:
context['status'] = 'infer_pass'
else: else:
return self._get_dataset(dataset_name)
def init(self, context):
for model_dict in self._env["phase"]:
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env(
"hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._data_var, False)
optimizer = model._build_optimizer(opt_name, opt_lr,
opt_strategy)
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass' context['status'] = 'startup_pass'
def startup(self, context): def startup(self, context):
self._exe.run(fluid.default_startup_program()) for model_dict in self._env["phase"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass' context['status'] = 'train_pass'
def dataloader_train(self, context): def executor_train(self, context):
reader = self._get_dataloader("TRAIN") epochs = int(
epochs = envs.get_global_env("train.epochs") envs.get_global_env("runner." + self._runner_name + ".epochs"))
for j in range(epochs):
for model_dict in self._env["phase"]:
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
program = fluid.compiler.CompiledProgram(fluid.default_main_program( def _executor_dataset_train(self, model_dict):
)).with_data_parallel(loss_name=self.model.get_avg_cost().name) reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.train_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
program = self._model[model_name][0].clone()
program = fluid.compiler.CompiledProgram(program).with_data_parallel(
loss_name=model_class.get_avg_cost().name)
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = [] metrics_varnames = []
metrics_format = [] metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch")) metrics_format.append("{}: {{}}".format("batch"))
for name, var in metrics.items():
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name) metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name)) metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format) metrics_format = ", ".join(metrics_format)
for epoch in range(epochs): reader = self._model[model_name][3]._data_loader
reader.start() reader.start()
batch_id = 0 batch_id = 0
scope = self._model[model_name][2]
with fluid.scope_guard(scope):
try: try:
while True: while True:
metrics_rets = self._exe.run(program=program, metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames) fetch_list=metrics_varnames)
metrics = [batch_id]
metrics = [epoch, batch_id]
metrics.extend(metrics_rets) metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0: if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics)) print(metrics_format.format(*metrics))
batch_id += 1 batch_id += 1
except fluid.core.EOFException: except fluid.core.EOFException:
reader.reset() reader.reset()
self.save(epoch, "train", is_fleet=False)
context['status'] = 'infer_pass' def terminal(self, context):
context['is_exit'] = True
def dataset_train(self, context): def load(self, is_fleet=False):
dataset = self._get_dataset("TRAIN") dirname = envs.get_global_env(
ins = self._get_dataset_ins() "runner." + self._runner_name + ".init_model_path", None)
if dirname is None or dirname == "":
return
print("going to load ", dirname)
if is_fleet:
fleet.load_persistables(self._exe, dirname)
else:
fluid.io.load_persistables(self._exe, dirname)
epochs = envs.get_global_env("train.epochs") def save(self, epoch_id, is_fleet=False):
for i in range(epochs): def need_save(epoch_id, epoch_interval, is_last=False):
begin_time = time.time() if is_last:
self._exe.train_from_dataset( return True
program=fluid.default_main_program(), if epoch_id == -1:
dataset=dataset, return False
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time - begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(
i, times, ins / times))
self.save(i, "train", is_fleet=False) return epoch_id % epoch_interval == 0
context['status'] = 'infer_pass'
def terminal(self, context): def save_inference_model():
for model in self.increment_models: name = "runner." + self._runner_name + "."
print("epoch :{}, dir: {}".format(model[0], model[1])) save_interval = int(
context['is_exit'] = True envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", [])
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", [])
if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \
len(feed_varnames) == 0 or len(fetch_varnames) == 0:
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
def save_persistables():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
save_persistables()
save_inference_model()
...@@ -119,6 +119,10 @@ class TranspileTrainer(Trainer): ...@@ -119,6 +119,10 @@ class TranspileTrainer(Trainer):
pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state,
self._config_yaml) self._config_yaml)
else: else:
if sparse_slots is None:
sparse_slots = "#"
if dense_slots is None:
dense_slots = "#"
padding = envs.get_global_env("padding", 0, namespace) padding = envs.get_global_env("padding", 0, namespace)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format( pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, namespace, \ reader, "slot", "slot", self._config_yaml, namespace, \
......
...@@ -14,13 +14,93 @@ ...@@ -14,13 +14,93 @@
from __future__ import print_function from __future__ import print_function
import os import os
from paddlerec.core.utils.envs import lazy_instance_by_fliename from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ from paddlerec.core.utils.envs import get_runtime_environ
from paddlerec.core.reader import SlotReader from paddlerec.core.reader import SlotReader
def dataloader_by_name(readerclass, dataset_name, yaml_file):
reader_class = lazy_instance_by_fliename(readerclass, "TrainReader")
name = "dataset." + dataset_name + "."
data_path = get_global_env(name + "data_path")
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
reader = reader_class(yaml_file)
reader.init()
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
iter = reader.generate_sample(line)
for parsed_line in iter():
if parsed_line is None:
continue
else:
values = []
for pased in parsed_line:
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def slotdataloader_by_name(readerclass, dataset_name, yaml_file):
name = "dataset." + dataset_name + "."
reader_name = "SlotReader"
data_path = get_global_env(name + "data_path")
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
sparse = get_global_env(name + "sparse_slots", "#")
if sparse == "":
sparse = "#"
dense = get_global_env(name + "dense_slots", "#")
if dense == "":
dense = "#"
padding = get_global_env(name + "padding", 0)
reader = SlotReader(yaml_file)
reader.init(sparse, dense, int(padding))
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
iter = reader.generate_sample(line)
for parsed_line in iter():
if parsed_line is None:
continue
else:
values = []
for pased in parsed_line:
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def dataloader(readerclass, train, yaml_file): def dataloader(readerclass, train, yaml_file):
if train == "TRAIN": if train == "TRAIN":
reader_name = "TrainReader" reader_name = "TrainReader"
...@@ -82,8 +162,12 @@ def slotdataloader(readerclass, train, yaml_file): ...@@ -82,8 +162,12 @@ def slotdataloader(readerclass, train, yaml_file):
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
sparse = get_global_env("sparse_slots", None, namespace) sparse = get_global_env("sparse_slots", "#", namespace)
dense = get_global_env("dense_slots", None, namespace) if sparse == "":
sparse = "#"
dense = get_global_env("dense_slots", "#", namespace)
if dense == "":
dense = "#"
padding = get_global_env("padding", 0, namespace) padding = get_global_env("padding", 0, namespace)
reader = SlotReader(yaml_file) reader = SlotReader(yaml_file)
reader.init(sparse, dense, int(padding)) reader.init(sparse, dense, int(padding))
......
...@@ -32,8 +32,8 @@ elif sys.argv[2].upper() == "EVALUATE": ...@@ -32,8 +32,8 @@ elif sys.argv[2].upper() == "EVALUATE":
else: else:
reader_name = "SlotReader" reader_name = "SlotReader"
namespace = sys.argv[4] namespace = sys.argv[4]
sparse_slots = sys.argv[5].replace("#", " ") sparse_slots = sys.argv[5].replace("?", " ")
dense_slots = sys.argv[6].replace("#", " ") dense_slots = sys.argv[6].replace("?", " ")
padding = int(sys.argv[7]) padding = int(sys.argv[7])
yaml_abs_path = sys.argv[3] yaml_abs_path = sys.argv[3]
......
...@@ -68,12 +68,20 @@ def set_global_envs(envs): ...@@ -68,12 +68,20 @@ def set_global_envs(envs):
nests = copy.deepcopy(namespace_nests) nests = copy.deepcopy(namespace_nests)
nests.append(k) nests.append(k)
fatten_env_namespace(nests, v) fatten_env_namespace(nests, v)
elif (k == "dataset" or k == "phase" or
k == "runner") and isinstance(v, list):
for i in v:
if i.get("name") is None:
raise ValueError("name must be in dataset list ", v)
nests = copy.deepcopy(namespace_nests)
nests.append(k)
nests.append(i["name"])
fatten_env_namespace(nests, i)
else: else:
global_k = ".".join(namespace_nests + [k]) global_k = ".".join(namespace_nests + [k])
global_envs[global_k] = v global_envs[global_k] = v
for k, v in envs.items(): fatten_env_namespace([], envs)
fatten_env_namespace([k], v)
def get_global_env(env_name, default_value=None, namespace=None): def get_global_env(env_name, default_value=None, namespace=None):
...@@ -106,7 +114,7 @@ def windows_path_converter(path): ...@@ -106,7 +114,7 @@ def windows_path_converter(path):
def update_workspace(): def update_workspace():
workspace = global_envs.get("train.workspace", None) workspace = global_envs.get("workspace")
if not workspace: if not workspace:
return return
workspace = path_adapter(workspace) workspace = path_adapter(workspace)
......
```
# 全局配置
debug: false
workspace: "."
# 用户可以配多个dataset,exector里不同阶段可以用不同的dataset
dataset:
- name: sample_1
type: DataLoader #或者QueueDataset
batch_size: 5
data_path: "{workspace}/data/train"
# 用户自定义reader
data_converter: "{workspace}/rsc15_reader.py"
- name: sample_2
type: QueueDataset #或者DataLoader
batch_size: 5
data_path: "{workspace}/data/train"
# 用户可以配置sparse_slots和dense_slots,无需再定义data_converter
sparse_slots: "click ins_weight 6001 6002 6003 6005 6006 6007 6008 6009"
dense_slots: "readlist:9"
#示例一,用户自定义参数,用于组网配置
hyper_parameters:
#优化器
optimizer:
class: Adam
learning_rate: 0.001
strategy: "{workspace}/conf/config_fleet.py"
# 用户自定义配置
vocab_size: 1000
hid_size: 100
my_key1: 233
my_key2: 0.1
mode: runner1
runner:
- name: runner1 # 示例一,train
trainer_class: single_train
epochs: 10
device: cpu
init_model_path: ""
save_checkpoint_interval: 2
save_inference_interval: 4
# 下面是保存模型路径配置
save_checkpoint_path: "xxxx"
save_inference_path: "xxxx"
- name: runner2 # 示例二,infer
trainer_class: single_train
epochs: 1
device: cpu
init_model_path: "afs:/xxx/xxx"
phase:
- name: phase1
model: "{workspace}/model.py"
dataset_name: sample_1
thread_num: 1
```
...@@ -12,39 +12,72 @@ ...@@ -12,39 +12,72 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
train: # workspace
epochs: 10 workspace: "paddlerec.models.rank.dnn"
engine: single
workspace: "paddlerec.models.rank.dnn"
trainer:
# for cluster training
strategy: "async"
reader: # list of dataset
dataset:
- name: dataset_train # name of dataset to distinguish different datasets
batch_size: 2
type: DataLoader # or QueueDataset
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
- name: dataset_infer # name
batch_size: 2 batch_size: 2
train_data_path: "{workspace}/data/sample_data/train" type: DataLoader # or QueueDataset
reader_debug_mode: False data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13" dense_slots: "dense_var:13"
model: # hyper parameters of user-defined network
models: "{workspace}/model.py" hyper_parameters:
hyper_parameters: # optimizer config
optimizer:
class: Adam
learning_rate: 0.001
strategy: async
# user-defined <key, value> pairs
sparse_inputs_slots: 27 sparse_inputs_slots: 27
sparse_feature_number: 1000001 sparse_feature_number: 1000001
sparse_feature_dim: 9 sparse_feature_dim: 9
dense_input_dim: 13 dense_input_dim: 13
fc_sizes: [512, 256, 128, 32] fc_sizes: [512, 256, 128, 32]
learning_rate: 0.001
optimizer: adam
save: # select runner by name
increment: mode: runner1
dirname: "increment" # config of each runner.
epoch_interval: 2 # runner is a kind of paddle training class, which wraps the train/infer process.
save_last: True runner:
inference: - name: runner1
dirname: "inference" class: single_train
epoch_interval: 4 # num of epochs
save_last: True epochs: 10
# device to run training or infer
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
- name: runner2
class: single_infer
# num of epochs
epochs: 10
# device to run training or infer
device: cpu
init_model_path: "increment/0" # load model path
# runner will run all the phase in each epoch
phase:
- name: phase1
model: "{workspace}/model.py" # user-defined model
dataset_name: dataset_train # select dataset by name
thread_num: 1
#- name: phase2
# model: "{workspace}/model.py" # user-defined model
# dataset_name: dataset_infer # select dataset by name
# thread_num: 1
...@@ -28,15 +28,15 @@ class Model(ModelBase): ...@@ -28,15 +28,15 @@ class Model(ModelBase):
self.is_distributed = True if envs.get_trainer( self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False ) == "CtrTrainer" else False
self.sparse_feature_number = envs.get_global_env( self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None, self._namespace) "hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env( self.sparse_feature_dim = envs.get_global_env(
"hyper_parameters.sparse_feature_dim", None, self._namespace) "hyper_parameters.sparse_feature_dim")
self.learning_rate = envs.get_global_env( self.learning_rate = envs.get_global_env(
"hyper_parameters.learning_rate", None, self._namespace) "hyper_parameters.learning_rate")
def net(self, input, is_infer=False): def net(self, input, is_infer=False):
self.sparse_inputs = self._sparse_data_var[1:] self.sparse_inputs = self._sparse_data_var[1:]
self.dense_input = self._dense_data_var[0] self.dense_input = [] #self._dense_data_var[0]
self.label_input = self._sparse_data_var[0] self.label_input = self._sparse_data_var[0]
def embedding_layer(input): def embedding_layer(input):
...@@ -52,12 +52,11 @@ class Model(ModelBase): ...@@ -52,12 +52,11 @@ class Model(ModelBase):
return emb_sum return emb_sum
sparse_embed_seq = list(map(embedding_layer, self.sparse_inputs)) sparse_embed_seq = list(map(embedding_layer, self.sparse_inputs))
concated = fluid.layers.concat( concated = fluid.layers.concat(sparse_embed_seq, axis=1)
sparse_embed_seq + [self.dense_input], axis=1) #sparse_embed_seq + [self.dense_input], axis=1)
fcs = [concated] fcs = [concated]
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes")
self._namespace)
for size in hidden_layers: for size in hidden_layers:
output = fluid.layers.fc( output = fluid.layers.fc(
...@@ -78,16 +77,21 @@ class Model(ModelBase): ...@@ -78,16 +77,21 @@ class Model(ModelBase):
self.predict = predict self.predict = predict
cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
self._cost = avg_cost
auc, batch_auc, _ = fluid.layers.auc(input=self.predict, auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
label=self.label_input, label=self.label_input,
num_thresholds=2**12, num_thresholds=2**12,
slide_steps=20) slide_steps=20)
if is_infer:
self._infer_results["AUC"] = auc
self._infer_results["BATCH_AUC"] = batch_auc
return
self._metrics["AUC"] = auc self._metrics["AUC"] = auc
self._metrics["BATCH_AUC"] = batch_auc self._metrics["BATCH_AUC"] = batch_auc
cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
self._cost = avg_cost
def optimizer(self): def optimizer(self):
optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True) optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True)
......
...@@ -18,7 +18,7 @@ import subprocess ...@@ -18,7 +18,7 @@ import subprocess
import argparse import argparse
import tempfile import tempfile
import yaml import yaml
import copy
from paddlerec.core.factory import TrainerFactory from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs from paddlerec.core.utils import envs
from paddlerec.core.utils import util from paddlerec.core.utils import util
...@@ -27,8 +27,8 @@ engines = {} ...@@ -27,8 +27,8 @@ engines = {}
device = ["CPU", "GPU"] device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"] clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
engine_choices = [ engine_choices = [
"SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER", "SINGLE_TRAIN", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE",
"TDM_CLUSTER" "TDM_LOCAL_CLUSTER", "TDM_CLUSTER", "SINGLE_INFER"
] ]
custom_model = ['TDM'] custom_model = ['TDM']
model_name = "" model_name = ""
...@@ -38,35 +38,73 @@ def engine_registry(): ...@@ -38,35 +38,73 @@ def engine_registry():
engines["TRANSPILER"] = {} engines["TRANSPILER"] = {}
engines["PSLIB"] = {} engines["PSLIB"] = {}
engines["TRANSPILER"]["SINGLE"] = single_engine engines["TRANSPILER"]["SINGLE_TRAIN"] = single_train_engine
engines["TRANSPILER"]["SINGLE_INFER"] = single_infer_engine
engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER"] = cluster_engine engines["TRANSPILER"]["CLUSTER"] = cluster_engine
engines["PSLIB"]["SINGLE"] = local_mpi_engine engines["PSLIB"]["SINGLE"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
def get_inters_from_yaml(file, filter): def get_inters_from_yaml(file, filters):
with open(file, 'r') as rb: with open(file, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader) _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
flattens = envs.flatten_environs(_envs) flattens = envs.flatten_environs(_envs)
inters = {} inters = {}
for k, v in flattens.items(): for k, v in flattens.items():
if k.startswith(filter): for f in filters:
if k.startswith(f):
inters[k] = v inters[k] = v
return inters return inters
def get_all_inters_from_yaml(file, filters):
with open(file, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
all_flattens = {}
def fatten_env_namespace(namespace_nests, local_envs):
for k, v in local_envs.items():
if isinstance(v, dict):
nests = copy.deepcopy(namespace_nests)
nests.append(k)
fatten_env_namespace(nests, v)
elif (k == "dataset" or k == "phase" or
k == "runner") and isinstance(v, list):
for i in v:
if i.get("name") is None:
raise ValueError("name must be in dataset list ", v)
nests = copy.deepcopy(namespace_nests)
nests.append(k)
nests.append(i["name"])
fatten_env_namespace(nests, i)
else:
global_k = ".".join(namespace_nests + [k])
all_flattens[global_k] = v
fatten_env_namespace([], _envs)
ret = {}
for k, v in all_flattens.items():
for f in filters:
if k.startswith(f):
ret[k] = v
return ret
def get_engine(args): def get_engine(args):
transpiler = get_transpiler() transpiler = get_transpiler()
run_extras = get_inters_from_yaml(args.model, "train.") with open(args.model, 'r') as rb:
envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
engine = run_extras.get("train.engine", "single") run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
engine = run_extras.get("train.engine", None)
if engine is None:
engine = run_extras.get("runner." + envs["mode"] + ".class", None)
if engine is None:
engine = "single_train"
engine = engine.upper() engine = engine.upper()
if engine not in engine_choices: if engine not in engine_choices:
raise ValueError("train.engin can not be chosen in {}".format( raise ValueError("train.engin can not be chosen in {}".format(
engine_choices)) engine_choices))
...@@ -117,15 +155,27 @@ def get_trainer_prefix(args): ...@@ -117,15 +155,27 @@ def get_trainer_prefix(args):
return "" return ""
def single_engine(args): def single_train_engine(args):
trainer = get_trainer_prefix(args) + "SingleTrainer" trainer = get_trainer_prefix(args) + "SingleTrainer"
single_envs = {} single_envs = {}
single_envs["train.trainer.trainer"] = trainer single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2" single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single" single_envs["train.trainer.engine"] = "single_train"
single_envs["train.trainer.platform"] = envs.get_platform() single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model)) print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
def single_infer_engine(args):
trainer = get_trainer_prefix(args) + "SingleInfer"
single_envs = {}
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single_infer"
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model) set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model) trainer = TrainerFactory.create(args.model)
return trainer return trainer
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册