提交 f385e9ce 编写于 作者: X xjqbest

fix

上级 43d49e3f
...@@ -134,18 +134,10 @@ class Model(object): ...@@ -134,18 +134,10 @@ class Model(object):
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate) return self._build_optimizer(optimizer, learning_rate)
def input_data(self, is_infer=False, dataset_name=None, program=None): def input_data(self, is_infer=False, **kwargs):
dataset = {} name = "dataset." + kwargs.get("dataset_name") + "."
for i in self._env["dataset"]: sparse_slots = envs.get_global_env(name + "sparse_slots")
if i["name"] == dataset_name: dense_slots = envs.get_global_env(name + "dense_slots")
dataset = i
break
sparse_slots = dataset.get("sparse_slots", None)
#sparse_slots =
#envs.get_global_env("sparse_slots", None,
# "train.reader")
#dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
dense_slots = dataset.get("dense_slots", None)
if sparse_slots is not None or dense_slots is not None: if sparse_slots is not None or dense_slots is not None:
sparse_slots = sparse_slots.strip().split(" ") sparse_slots = sparse_slots.strip().split(" ")
dense_slots = dense_slots.strip().split(" ") dense_slots = dense_slots.strip().split(" ")
...@@ -168,8 +160,6 @@ class Model(object): ...@@ -168,8 +160,6 @@ class Model(object):
name=name, shape=[1], lod_level=1, dtype="int64") name=name, shape=[1], lod_level=1, dtype="int64")
data_var_.append(l) data_var_.append(l)
self._sparse_data_var.append(l) self._sparse_data_var.append(l)
print(self._dense_data_var)
print(self._sparse_data_var)
return data_var_ return data_var_
else: else:
......
...@@ -36,7 +36,7 @@ class SingleTrainer(TranspileTrainer): ...@@ -36,7 +36,7 @@ class SingleTrainer(TranspileTrainer):
super(TranspileTrainer, self).__init__(config) super(TranspileTrainer, self).__init__(config)
self._env = self._config#envs.get_global_envs() self._env = self._config#envs.get_global_envs()
#device = envs.get_global_env("train.device", "cpu") #device = envs.get_global_env("train.device", "cpu")
device = self._env["device"] device = envs.get_global_env("device")#self._env["device"]
if device == 'gpu': if device == 'gpu':
self._place = fluid.CUDAPlace(0) self._place = fluid.CUDAPlace(0)
elif device == 'cpu': elif device == 'cpu':
...@@ -45,6 +45,8 @@ class SingleTrainer(TranspileTrainer): ...@@ -45,6 +45,8 @@ class SingleTrainer(TranspileTrainer):
self.processor_register() self.processor_register()
self._model = {} self._model = {}
self._dataset = {} self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
#self.inference_models = [] #self.inference_models = []
#self.increment_models = [] #self.increment_models = []
...@@ -79,32 +81,46 @@ class SingleTrainer(TranspileTrainer): ...@@ -79,32 +81,46 @@ class SingleTrainer(TranspileTrainer):
# if self._env["hyper_parameters"]["optimizer"]["class"] == "Adam": # if self._env["hyper_parameters"]["optimizer"]["class"] == "Adam":
def _create_dataset(self, dataset_name): def _create_dataset(self, dataset_name):
config_dict = None #config_dict = envs.get_global_env("dataset." + dataset_name)
for i in self._env["dataset"]: #for i in self._env["dataset"]:
if i["name"] == dataset_name: # if i["name"] == dataset_name:
config_dict = i # config_dict = i
break # break
#reader_ins = SlotReader(self._config_yaml) #reader_ins = SlotReader(self._config_yaml)
sparse_slots = config_dict["sparse_slots"] name = "dataset." + dataset_name + "."
dense_slots = config_dict["dense_slots"] sparse_slots = envs.get_global_env(name + "sparse_slots")#config_dict.get("sparse_slots")#config_dict["sparse_slots"]
dense_slots = envs.get_global_env(name + "dense_slots")#config_dict.get("dense_slots")#config_dict["dense_slots"]
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_type = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), " change reader to DataLoader")
reader_type = "DataLoader"
padding = 0 padding = 0
reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py" reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
#reader = "{workspace}/paddlerec/core/utils/dataset_instance.py".replace("{workspace}", envs.path_adapter(self._env["workspace"])) #reader = "{workspace}/paddlerec/core/utils/dataset_instance.py".replace("{workspace}", envs.path_adapter(self._env["workspace"]))
pipe_cmd = "python {} {} {} {} {} {} {} {}".format( pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \ reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding)) sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
if config_dict["type"] == "QueueDataset": #print(config_dict["type"])
dataset = fluid.DatasetFactory().create_dataset(config_dict["type"]) type_name = envs.get_global_env(name + "type")
dataset.set_batch_size(config_dict["batch_size"]) if type_name == "QueueDataset":
#if config_dict["type"] == "QueueDataset":
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
#dataset.set_thread(config_dict["thread_num"]) #dataset.set_thread(config_dict["thread_num"])
#dataset.set_hdfs_config(config_dict["data_fs_name"], config_dict["data_fs_ugi"]) #dataset.set_hdfs_config(config_dict["data_fs_name"], config_dict["data_fs_ugi"])
dataset.set_pipe_command(pipe_cmd) dataset.set_pipe_command(pipe_cmd)
train_data_path = config_dict["data_path"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) #print(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
#config_dict["data_path"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
file_list = [ file_list = [
os.path.join(train_data_path, x) os.path.join(train_data_path, x)
for x in os.listdir(train_data_path) for x in os.listdir(train_data_path)
] ]
#print(file_list)
dataset.set_filelist(file_list) dataset.set_filelist(file_list)
for model_dict in self._env["executor"]: for model_dict in self._env["executor"]:
if model_dict["dataset_name"] == dataset_name: if model_dict["dataset_name"] == dataset_name:
...@@ -118,24 +134,21 @@ class SingleTrainer(TranspileTrainer): ...@@ -118,24 +134,21 @@ class SingleTrainer(TranspileTrainer):
return dataset return dataset
def init(self, context): def init(self, context):
#self.model.train_net() #for model_dict in self._env["executor"]:
for model_dict in self._env["executor"]: for model_dict in self._env["executor"]:
self._model[model_dict["name"]] = [None] * 4 self._model[model_dict["name"]] = [None] * 4
# self._model[model_dict["name"]][0] = fluid.Program() #train_program
# self._model[model_dict["name"]][1] = fluid.Program() #startup_program
# self._model[model_dict["name"]][2] = fluid.Scope() #scope
train_program = fluid.Program() train_program = fluid.Program()
startup_program = fluid.Program() startup_program = fluid.Program()
scope = fluid.Scope() scope = fluid.Scope()
opt_name = self._env["hyper_parameters"]["optimizer"]["class"] opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = self._env["hyper_parameters"]["optimizer"]["learning_rate"] opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate")
opt_strategy = self._env["hyper_parameters"]["optimizer"]["strategy"] opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program): with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard(): with fluid.unique_name.guard():
model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env) model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
model._data_var = model.input_data(dataset_name=model_dict["dataset_name"]) model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
model.net(None)#### model.net(None)
optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy) optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
optimizer.minimize(model._cost) optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program self._model[model_dict["name"]][0] = train_program
...@@ -146,19 +159,6 @@ class SingleTrainer(TranspileTrainer): ...@@ -146,19 +159,6 @@ class SingleTrainer(TranspileTrainer):
for dataset in self._env["dataset"]: for dataset in self._env["dataset"]:
self._dataset[dataset["name"]] = self._create_dataset(dataset["name"]) self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
# self.fetch_vars = []
# self.fetch_alias = []
# self.fetch_period = self.model.get_fetch_period()
# metrics = self.model.get_metrics()
# if metrics:
# self.fetch_vars = metrics.values()
# self.fetch_alias = metrics.keys()
#evaluate_only = envs.get_global_env(
# 'evaluate_only', False, namespace='evaluate')
#if evaluate_only:
# context['status'] = 'infer_pass'
#else:
context['status'] = 'startup_pass' context['status'] = 'startup_pass'
def startup(self, context): def startup(self, context):
...@@ -172,62 +172,40 @@ class SingleTrainer(TranspileTrainer): ...@@ -172,62 +172,40 @@ class SingleTrainer(TranspileTrainer):
for j in range(epochs): for j in range(epochs):
for model_dict in self._env["executor"]: for model_dict in self._env["executor"]:
reader_name = model_dict["dataset_name"] reader_name = model_dict["dataset_name"]
#print(self._dataset) #dataset = envs.get_global_env("dataset." + reader_name)
#print(reader_name) name = "dataset." + reader_name + "."
dataset = None begin_time = time.time()
for i in self._env["dataset"]: #if dataset["type"] == "DataLoader":
if i["name"] == reader_name: if envs.get_global_env(name + "type") == "DataLoader":
dataset = i
break
if dataset["type"] == "DataLoader":
self._executor_dataloader_train(model_dict) self._executor_dataloader_train(model_dict)
else: else:
self._executor_dataset_train(model_dict) self._executor_dataset_train(model_dict)
print("epoch %s done" % j) end_time = time.time()
# self._model[model_dict["name"]][1] = fluid.compiler.CompiledProgram( seconds = end_time - begin_time
# self._model[model_dict["name"]][1]).with_data_parallel(loss_name=self._model.get_avg_cost().name) print("epoch {} done, time elasped: {}".format(j, seconds))
# fetch_vars = []
# fetch_alias = []
# fetch_period = self._model.get_fetch_period()
# metrics = self._model.get_metrics()
# if metrics:
# fetch_vars = metrics.values()
# fetch_alias = metrics.keys()
# metrics_varnames = []
context['status'] = "terminal_pass" context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict): def _executor_dataset_train(self, model_dict):
# dataset = self._get_dataset("TRAIN")
# ins = self._get_dataset_ins()
# epochs = envs.get_global_env("train.epochs")
# for i in range(epochs):
reader_name = model_dict["dataset_name"] reader_name = model_dict["dataset_name"]
model_name = model_dict["name"] model_name = model_dict["name"]
model_class = self._model[model_name][3] model_class = self._model[model_name][3]
fetch_vars = [] fetch_vars = []
fetch_alias = [] fetch_alias = []
fetch_period = 1#model_class.get_fetch_period() fetch_period = 20
metrics = model_class.get_metrics() metrics = model_class.get_metrics()
if metrics: if metrics:
fetch_vars = metrics.values() fetch_vars = metrics.values()
fetch_alias = metrics.keys() fetch_alias = metrics.keys()
scope = self._model[model_name][2] scope = self._model[model_name][2]
program = self._model[model_name][1] program = self._model[model_name][0]
reader = self._dataset[reader_name] reader = self._dataset[reader_name]
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
begin_time = time.time()
self._exe.train_from_dataset( self._exe.train_from_dataset(
program=program, program=program,
dataset=reader, dataset=reader,
fetch_list=fetch_vars, fetch_list=fetch_vars,
fetch_info=fetch_alias, fetch_info=fetch_alias,
print_period=fetch_period) print_period=fetch_period)
end_time = time.time()
times = end_time - begin_time
#print("epoch {} using time {}".format(i, times))
#print("epoch {} using time {}, speed {:.2f} lines/s".format(
# i, times, ins / times))
def _executor_dataloader_train(self, model_dict): def _executor_dataloader_train(self, model_dict):
...@@ -238,8 +216,8 @@ class SingleTrainer(TranspileTrainer): ...@@ -238,8 +216,8 @@ class SingleTrainer(TranspileTrainer):
self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name) self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name)
fetch_vars = [] fetch_vars = []
fetch_alias = [] fetch_alias = []
fetch_period = self._model.get_fetch_period() fetch_period = 20
metrics = self._model.get_metrics() metrics = model_class.get_metrics()
if metrics: if metrics:
fetch_vars = metrics.values() fetch_vars = metrics.values()
fetch_alias = metrics.keys() fetch_alias = metrics.keys()
...@@ -252,52 +230,12 @@ class SingleTrainer(TranspileTrainer): ...@@ -252,52 +230,12 @@ class SingleTrainer(TranspileTrainer):
metrics_format.append("{}: {{}}".format(name)) metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format) metrics_format = ", ".join(metrics_format)
reader = self._dataset["reader_name"] reader = self._dataset[reader_name]
reader.start() reader.start()
batch_id = 0 batch_id = 0
scope = self._model[model_name][3] scope = self._model[model_name][2]
prorgram = self._model[model_name][1] prorgram = self._model[model_name][0]
with fluid.scope_guard(self._model[model_name][3]): with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def dataloader_train(self, context):
exit(-1)
reader = self._get_dataloader(self._env["TRAIN"])
epochs = self._env["epochs"]
program = fluid.compiler.CompiledProgram(fluid.default_main_program(
)).with_data_parallel(loss_name=self.model.get_avg_cost().name)
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
for epoch in range(epochs):
reader.start()
batch_id = 0
try: try:
while True: while True:
metrics_rets = self._exe.run(program=program, metrics_rets = self._exe.run(program=program,
...@@ -311,32 +249,6 @@ class SingleTrainer(TranspileTrainer): ...@@ -311,32 +249,6 @@ class SingleTrainer(TranspileTrainer):
batch_id += 1 batch_id += 1
except fluid.core.EOFException: except fluid.core.EOFException:
reader.reset() reader.reset()
self.save(epoch, "train", is_fleet=False)
context['status'] = 'infer_pass'
def dataset_train(self, context):
dataset = self._get_dataset("TRAIN")
ins = self._get_dataset_ins()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
begin_time = time.time()
self._exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time - begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(
i, times, ins / times))
self.save(i, "train", is_fleet=False)
context['status'] = 'infer_pass'
def terminal(self, context): def terminal(self, context):
#for model in self.increment_models:
# print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True context['is_exit'] = True
...@@ -94,24 +94,30 @@ class TranspileTrainer(Trainer): ...@@ -94,24 +94,30 @@ class TranspileTrainer(Trainer):
count += 1 count += 1
return count return count
def _get_dataset(self, state="TRAIN"): #def _get_dataset(self, state="TRAIN"):
if state == "TRAIN": #if state == "TRAIN":
inputs = self.model.get_inputs() # inputs = self.model.get_inputs()
namespace = "train.reader" # namespace = "train.reader"
train_data_path = envs.get_global_env("train_data_path", None, # train_data_path = envs.get_global_env("train_data_path", None,
namespace) # namespace)
else: #else:
inputs = self.model.get_infer_inputs() # inputs = self.model.get_infer_inputs()
namespace = "evaluate.reader" # namespace = "evaluate.reader"
train_data_path = envs.get_global_env("test_data_path", None, # train_data_path = envs.get_global_env("test_data_path", None,
namespace) # namespace)
def _get_dataset(self, dataset_name):
sparse_slots = envs.get_global_env("sparse_slots", None, namespace) namespace = "dataset." + dataset_name + "."
dense_slots = envs.get_global_env("dense_slots", None, namespace) sparse_slots = envs.get_global_env(namespace + "sparse_slots")
dense_slots = envs.get_global_env(namespace + "dense_slots")
threads = int(envs.get_runtime_environ("train.trainer.threads")) thread_num = envs.get_global_env(namespace + "thread_num")
batch_size = envs.get_global_env("batch_size", None, namespace) #threads = int(envs.get_runtime_environ("train.trainer.threads"))
reader_class = envs.get_global_env("class", None, namespace) #batch_size = envs.get_global_env("batch_size", None, namespace)
batch_size = envs.get_global_env(namespace + "batch_size")
reader_type = envs.get_global_env(namespace + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), " change reader to DataLoader")
reader_type = "DataLoader"
reader_class = envs.get_global_env(namespace + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__)) abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
......
...@@ -20,7 +20,7 @@ import sys ...@@ -20,7 +20,7 @@ import sys
global_envs = {} global_envs = {}
global_envs_raw = {} #global_envs_raw = {}
def flatten_environs(envs, separator="."): def flatten_environs(envs, separator="."):
flatten_dict = {} flatten_dict = {}
...@@ -63,23 +63,44 @@ def get_trainer(): ...@@ -63,23 +63,44 @@ def get_trainer():
def set_global_envs(envs): def set_global_envs(envs):
assert isinstance(envs, dict) assert isinstance(envs, dict)
global_envs_raw = envs # namespace_nests = []
#print(envs)
return
def fatten_env_namespace(namespace_nests, local_envs): def fatten_env_namespace(namespace_nests, local_envs):
# if not isinstance(local_envs, dict):
# global_k = ".".join(namespace_nests)
# global_envs[global_k] = local_envs
# return
for k, v in local_envs.items(): for k, v in local_envs.items():
#print(k)
if isinstance(v, dict): if isinstance(v, dict):
nests = copy.deepcopy(namespace_nests) nests = copy.deepcopy(namespace_nests)
nests.append(k) nests.append(k)
fatten_env_namespace(nests, v) fatten_env_namespace(nests, v)
elif (k == "dataset" or k == "executor") and isinstance(v, list):
#print("=======================")
#print([i for i in v])
for i in v:
if i.get("name") is None:
raise ValueError("name must be in dataset list ", v)
nests = copy.deepcopy(namespace_nests)
nests.append(k)
nests.append(i["name"])
fatten_env_namespace(nests, i)
#global_k = ".".join(namespace_nests + [k, i["name"]])
#global_envs[global_k] = i
#print([i for i in v])
#global_k = ".".join(namespace_nests + [k])
#global_envs[global_k] = v
else: else:
global_k = ".".join(namespace_nests + [k]) global_k = ".".join(namespace_nests + [k])
global_envs[global_k] = v global_envs[global_k] = v
for k, v in envs.items(): #for k, v in envs.items():
fatten_env_namespace([k], v) # fatten_env_namespace([k], v)
fatten_env_namespace([], envs)
for i in global_envs:
print i,":",global_envs[i]
def get_global_env(env_name, default_value=None, namespace=None): def get_global_env(env_name, default_value=None, namespace=None):
""" """
...@@ -111,7 +132,7 @@ def windows_path_converter(path): ...@@ -111,7 +132,7 @@ def windows_path_converter(path):
def update_workspace(): def update_workspace():
workspace = global_envs.get("train.workspace", None) workspace = global_envs.get("workspace")
if not workspace: if not workspace:
return return
workspace = path_adapter(workspace) workspace = path_adapter(workspace)
......
...@@ -83,7 +83,6 @@ class Model(ModelBase): ...@@ -83,7 +83,6 @@ class Model(ModelBase):
avg_cost = fluid.layers.reduce_mean(cost) avg_cost = fluid.layers.reduce_mean(cost)
self._cost = avg_cost self._cost = avg_cost
auc, batch_auc, _ = fluid.layers.auc(input=self.predict, auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
label=self.label_input, label=self.label_input,
num_thresholds=2**12, num_thresholds=2**12,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册