提交 7b9849ac 编写于 作者: X xjqbest

fix

上级 f385e9ce
......@@ -36,7 +36,8 @@ def trainer_registry():
"tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
"tdm_cluster_trainer.py")
trainers["SingleTrainerYamlOpt"] = os.path.join(trainer_abs,
"single_trainer_yamlopt.py")
trainer_registry()
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleTrainerYamlOpt(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
device = envs.get_global_env("device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def dataloader_train(self, context):
pass
def dataset_train(self, context):
pass
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots")
dense_slots = envs.get_global_env(name + "dense_slots")
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_type = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), " change reader to DataLoader")
reader_type = "DataLoader"
padding = 0
reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
type_name = envs.get_global_env(name + "type")
if type_name == "QueueDataset":
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["executor"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model.get_inputs()
dataset.set_use_var(inputs)
break
else:
pass
return dataset
def init(self, context):
for model_dict in self._env["executor"]:
self._model[model_dict["name"]] = [None] * 4
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
model.net(None)
optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
for dataset in self._env["dataset"]:
self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["executor"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(self._env["epochs"])
for j in range(epochs):
for model_dict in self._env["executor"]:
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = 20
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.train_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
self._model[model_name][1] = fluid.compiler.CompiledProgram(
self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name)
fetch_vars = []
fetch_alias = []
fetch_period = 20
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in model_class.items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._dataset[reader_name]
reader.start()
batch_id = 0
scope = self._model[model_name][2]
prorgram = self._model[model_name][0]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
......@@ -94,30 +94,24 @@ class TranspileTrainer(Trainer):
count += 1
return count
#def _get_dataset(self, state="TRAIN"):
#if state == "TRAIN":
# inputs = self.model.get_inputs()
# namespace = "train.reader"
# train_data_path = envs.get_global_env("train_data_path", None,
# namespace)
#else:
# inputs = self.model.get_infer_inputs()
# namespace = "evaluate.reader"
# train_data_path = envs.get_global_env("test_data_path", None,
# namespace)
def _get_dataset(self, dataset_name):
namespace = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(namespace + "sparse_slots")
dense_slots = envs.get_global_env(namespace + "dense_slots")
thread_num = envs.get_global_env(namespace + "thread_num")
#threads = int(envs.get_runtime_environ("train.trainer.threads"))
#batch_size = envs.get_global_env("batch_size", None, namespace)
batch_size = envs.get_global_env(namespace + "batch_size")
reader_type = envs.get_global_env(namespace + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), " change reader to DataLoader")
reader_type = "DataLoader"
reader_class = envs.get_global_env(namespace + "data_converter")
def _get_dataset(self, state="TRAIN"):
if state == "TRAIN":
inputs = self.model.get_inputs()
namespace = "train.reader"
train_data_path = envs.get_global_env("train_data_path", None,
namespace)
else:
inputs = self.model.get_infer_inputs()
namespace = "evaluate.reader"
train_data_path = envs.get_global_env("test_data_path", None,
namespace)
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads"))
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
......
......@@ -20,8 +20,6 @@ import sys
global_envs = {}
#global_envs_raw = {}
def flatten_environs(envs, separator="."):
flatten_dict = {}
assert isinstance(envs, dict)
......@@ -63,22 +61,13 @@ def get_trainer():
def set_global_envs(envs):
assert isinstance(envs, dict)
# namespace_nests = []
#print(envs)
def fatten_env_namespace(namespace_nests, local_envs):
# if not isinstance(local_envs, dict):
# global_k = ".".join(namespace_nests)
# global_envs[global_k] = local_envs
# return
for k, v in local_envs.items():
#print(k)
if isinstance(v, dict):
nests = copy.deepcopy(namespace_nests)
nests.append(k)
fatten_env_namespace(nests, v)
elif (k == "dataset" or k == "executor") and isinstance(v, list):
#print("=======================")
#print([i for i in v])
for i in v:
if i.get("name") is None:
raise ValueError("name must be in dataset list ", v)
......@@ -86,18 +75,10 @@ def set_global_envs(envs):
nests.append(k)
nests.append(i["name"])
fatten_env_namespace(nests, i)
#global_k = ".".join(namespace_nests + [k, i["name"]])
#global_envs[global_k] = i
#print([i for i in v])
#global_k = ".".join(namespace_nests + [k])
#global_envs[global_k] = v
else:
global_k = ".".join(namespace_nests + [k])
global_envs[global_k] = v
#for k, v in envs.items():
# fatten_env_namespace([k], v)
fatten_env_namespace([], envs)
for i in global_envs:
print i,":",global_envs[i]
......
......@@ -46,7 +46,7 @@ hyper_parameters:
fc_sizes: [512, 256, 128, 32]
epoch:
trainer_class: Single
trainer_class: single_yamlopt
save_checkpoint_interval: 2
save_inference_interval: 4
save_checkpoint_path: "increment"
......
......@@ -27,12 +27,9 @@ class Model(ModelBase):
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.sparse_feature_number = 1000001 #envs.get_global_env(
#"hyper_parameters.sparse_feature_number", None, self._namespace)
self.sparse_feature_dim = 9#envs.get_global_env(
#"hyper_parameters.sparse_feature_dim", None, self._namespace)
self.learning_rate = 0.001#envs.get_global_env(
#"hyper_parameters.learning_rate", None, self._namespace)
self.sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim")
self.learning_rate = envs.get_global_env("hyper_parameters.learning_rate")
def net(self, input, is_infer=False):
self.sparse_inputs = self._sparse_data_var[1:]
......
......@@ -28,7 +28,7 @@ device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
engine_choices = [
"SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER",
"TDM_CLUSTER"
"TDM_CLUSTER", "SINGLE_YAMLOPT"
]
custom_model = ['TDM']
model_name = ""
......@@ -42,12 +42,14 @@ def engine_registry():
engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER"] = cluster_engine
engines["TRANSPILER"]["SINGLE_YAMLOPT"] = single_yamlopt_engine
engines["PSLIB"]["SINGLE"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
def get_inters_from_yaml(file, filter):
def get_inters_from_yaml(file, filters):
with open(file, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
......@@ -55,16 +57,22 @@ def get_inters_from_yaml(file, filter):
inters = {}
for k, v in flattens.items():
if k.startswith(filter):
inters[k] = v
for f in filters:
if k.startswith(f):
inters[k] = v
return inters
def get_engine(args):
transpiler = get_transpiler()
run_extras = get_inters_from_yaml(args.model, "train.")
engine = run_extras.get("train.engine", "single")
run_extras = get_inters_from_yaml(args.model, ["train.", "epoch."])
engine = run_extras.get("train.engine", None)
if engine is None:
engine = run_extras.get("epoch.trainer_class", None)
if engine is None:
engine = "single"
engine = engine.upper()
if engine not in engine_choices:
......@@ -130,6 +138,18 @@ def single_engine(args):
trainer = TrainerFactory.create(args.model)
return trainer
def single_yamlopt_engine(args):
trainer = get_trainer_prefix(args) + "SingleTrainerYamlOpt"
single_envs = {}
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single_yamlopt"
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
def cluster_engine(args):
def update_workspace(cluster_envs):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册