diff --git a/core/model.py b/core/model.py index 52526659c9633b5c061db5467e9221948c442853..04061b3973f75bfd81461a5e31e660d8e05cb1a0 100755 --- a/core/model.py +++ b/core/model.py @@ -134,18 +134,10 @@ class Model(object): print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) return self._build_optimizer(optimizer, learning_rate) - def input_data(self, is_infer=False, dataset_name=None, program=None): - dataset = {} - for i in self._env["dataset"]: - if i["name"] == dataset_name: - dataset = i - break - sparse_slots = dataset.get("sparse_slots", None) - #sparse_slots = - #envs.get_global_env("sparse_slots", None, - # "train.reader") - #dense_slots = envs.get_global_env("dense_slots", None, "train.reader") - dense_slots = dataset.get("dense_slots", None) + def input_data(self, is_infer=False, **kwargs): + name = "dataset." + kwargs.get("dataset_name") + "." + sparse_slots = envs.get_global_env(name + "sparse_slots") + dense_slots = envs.get_global_env(name + "dense_slots") if sparse_slots is not None or dense_slots is not None: sparse_slots = sparse_slots.strip().split(" ") dense_slots = dense_slots.strip().split(" ") @@ -168,8 +160,6 @@ class Model(object): name=name, shape=[1], lod_level=1, dtype="int64") data_var_.append(l) self._sparse_data_var.append(l) - print(self._dense_data_var) - print(self._sparse_data_var) return data_var_ else: diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py index 526ede424246daa48cee7881b07c33405362fe26..dea166548ac228ace3699cdb26838093bc5cf09e 100755 --- a/core/trainers/single_trainer.py +++ b/core/trainers/single_trainer.py @@ -36,7 +36,7 @@ class SingleTrainer(TranspileTrainer): super(TranspileTrainer, self).__init__(config) self._env = self._config#envs.get_global_envs() #device = envs.get_global_env("train.device", "cpu") - device = self._env["device"] + device = envs.get_global_env("device")#self._env["device"] if device == 'gpu': self._place = fluid.CUDAPlace(0) elif device == 'cpu': @@ -45,6 +45,8 @@ class SingleTrainer(TranspileTrainer): self.processor_register() self._model = {} self._dataset = {} + envs.set_global_envs(self._config) + envs.update_workspace() #self.inference_models = [] #self.increment_models = [] @@ -79,32 +81,46 @@ class SingleTrainer(TranspileTrainer): # if self._env["hyper_parameters"]["optimizer"]["class"] == "Adam": def _create_dataset(self, dataset_name): - config_dict = None - for i in self._env["dataset"]: - if i["name"] == dataset_name: - config_dict = i - break + #config_dict = envs.get_global_env("dataset." + dataset_name) + #for i in self._env["dataset"]: + # if i["name"] == dataset_name: + # config_dict = i + # break #reader_ins = SlotReader(self._config_yaml) - sparse_slots = config_dict["sparse_slots"] - dense_slots = config_dict["dense_slots"] + name = "dataset." + dataset_name + "." + sparse_slots = envs.get_global_env(name + "sparse_slots")#config_dict.get("sparse_slots")#config_dict["sparse_slots"] + dense_slots = envs.get_global_env(name + "dense_slots")#config_dict.get("dense_slots")#config_dict["dense_slots"] + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_type = envs.get_global_env(name + "type") + if envs.get_platform() != "LINUX": + print("platform ", envs.get_platform(), " change reader to DataLoader") + reader_type = "DataLoader" padding = 0 + reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py" #reader = "{workspace}/paddlerec/core/utils/dataset_instance.py".replace("{workspace}", envs.path_adapter(self._env["workspace"])) pipe_cmd = "python {} {} {} {} {} {} {} {}".format( reader, "slot", "slot", self._config_yaml, "fake", \ sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding)) - if config_dict["type"] == "QueueDataset": - dataset = fluid.DatasetFactory().create_dataset(config_dict["type"]) - dataset.set_batch_size(config_dict["batch_size"]) + #print(config_dict["type"]) + type_name = envs.get_global_env(name + "type") + if type_name == "QueueDataset": + #if config_dict["type"] == "QueueDataset": + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_batch_size(envs.get_global_env(name + "batch_size")) #dataset.set_thread(config_dict["thread_num"]) #dataset.set_hdfs_config(config_dict["data_fs_name"], config_dict["data_fs_ugi"]) dataset.set_pipe_command(pipe_cmd) - train_data_path = config_dict["data_path"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) + #print(pipe_cmd) + train_data_path = envs.get_global_env(name + "data_path") + #config_dict["data_path"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) file_list = [ os.path.join(train_data_path, x) for x in os.listdir(train_data_path) ] + #print(file_list) dataset.set_filelist(file_list) for model_dict in self._env["executor"]: if model_dict["dataset_name"] == dataset_name: @@ -118,24 +134,21 @@ class SingleTrainer(TranspileTrainer): return dataset def init(self, context): - #self.model.train_net() + #for model_dict in self._env["executor"]: for model_dict in self._env["executor"]: self._model[model_dict["name"]] = [None] * 4 -# self._model[model_dict["name"]][0] = fluid.Program() #train_program -# self._model[model_dict["name"]][1] = fluid.Program() #startup_program -# self._model[model_dict["name"]][2] = fluid.Scope() #scope train_program = fluid.Program() startup_program = fluid.Program() scope = fluid.Scope() - opt_name = self._env["hyper_parameters"]["optimizer"]["class"] - opt_lr = self._env["hyper_parameters"]["optimizer"]["learning_rate"] - opt_strategy = self._env["hyper_parameters"]["optimizer"]["strategy"] + opt_name = envs.get_global_env("hyper_parameters.optimizer.class") + opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate") + opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy") with fluid.program_guard(train_program, startup_program): with fluid.unique_name.guard(): model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env) model._data_var = model.input_data(dataset_name=model_dict["dataset_name"]) - model.net(None)#### + model.net(None) optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy) optimizer.minimize(model._cost) self._model[model_dict["name"]][0] = train_program @@ -146,19 +159,6 @@ class SingleTrainer(TranspileTrainer): for dataset in self._env["dataset"]: self._dataset[dataset["name"]] = self._create_dataset(dataset["name"]) -# self.fetch_vars = [] -# self.fetch_alias = [] -# self.fetch_period = self.model.get_fetch_period() - -# metrics = self.model.get_metrics() -# if metrics: -# self.fetch_vars = metrics.values() -# self.fetch_alias = metrics.keys() - #evaluate_only = envs.get_global_env( - # 'evaluate_only', False, namespace='evaluate') - #if evaluate_only: - # context['status'] = 'infer_pass' - #else: context['status'] = 'startup_pass' def startup(self, context): @@ -172,62 +172,40 @@ class SingleTrainer(TranspileTrainer): for j in range(epochs): for model_dict in self._env["executor"]: reader_name = model_dict["dataset_name"] - #print(self._dataset) - #print(reader_name) - dataset = None - for i in self._env["dataset"]: - if i["name"] == reader_name: - dataset = i - break - if dataset["type"] == "DataLoader": + #dataset = envs.get_global_env("dataset." + reader_name) + name = "dataset." + reader_name + "." + begin_time = time.time() + #if dataset["type"] == "DataLoader": + if envs.get_global_env(name + "type") == "DataLoader": self._executor_dataloader_train(model_dict) else: self._executor_dataset_train(model_dict) - print("epoch %s done" % j) -# self._model[model_dict["name"]][1] = fluid.compiler.CompiledProgram( -# self._model[model_dict["name"]][1]).with_data_parallel(loss_name=self._model.get_avg_cost().name) -# fetch_vars = [] -# fetch_alias = [] -# fetch_period = self._model.get_fetch_period() -# metrics = self._model.get_metrics() -# if metrics: -# fetch_vars = metrics.values() -# fetch_alias = metrics.keys() -# metrics_varnames = [] + end_time = time.time() + seconds = end_time - begin_time + print("epoch {} done, time elasped: {}".format(j, seconds)) context['status'] = "terminal_pass" def _executor_dataset_train(self, model_dict): -# dataset = self._get_dataset("TRAIN") -# ins = self._get_dataset_ins() - -# epochs = envs.get_global_env("train.epochs") -# for i in range(epochs): reader_name = model_dict["dataset_name"] model_name = model_dict["name"] model_class = self._model[model_name][3] fetch_vars = [] fetch_alias = [] - fetch_period = 1#model_class.get_fetch_period() + fetch_period = 20 metrics = model_class.get_metrics() if metrics: fetch_vars = metrics.values() fetch_alias = metrics.keys() scope = self._model[model_name][2] - program = self._model[model_name][1] + program = self._model[model_name][0] reader = self._dataset[reader_name] with fluid.scope_guard(scope): - begin_time = time.time() self._exe.train_from_dataset( program=program, dataset=reader, fetch_list=fetch_vars, fetch_info=fetch_alias, print_period=fetch_period) - end_time = time.time() - times = end_time - begin_time - #print("epoch {} using time {}".format(i, times)) - #print("epoch {} using time {}, speed {:.2f} lines/s".format( - # i, times, ins / times)) def _executor_dataloader_train(self, model_dict): @@ -238,8 +216,8 @@ class SingleTrainer(TranspileTrainer): self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name) fetch_vars = [] fetch_alias = [] - fetch_period = self._model.get_fetch_period() - metrics = self._model.get_metrics() + fetch_period = 20 + metrics = model_class.get_metrics() if metrics: fetch_vars = metrics.values() fetch_alias = metrics.keys() @@ -252,52 +230,12 @@ class SingleTrainer(TranspileTrainer): metrics_format.append("{}: {{}}".format(name)) metrics_format = ", ".join(metrics_format) - reader = self._dataset["reader_name"] + reader = self._dataset[reader_name] reader.start() batch_id = 0 - scope = self._model[model_name][3] - prorgram = self._model[model_name][1] - with fluid.scope_guard(self._model[model_name][3]): - try: - while True: - metrics_rets = self._exe.run(program=program, - fetch_list=metrics_varnames) - - metrics = [epoch, batch_id] - metrics.extend(metrics_rets) - - if batch_id % self.fetch_period == 0 and batch_id != 0: - print(metrics_format.format(*metrics)) - batch_id += 1 - except fluid.core.EOFException: - reader.reset() - - - def dataloader_train(self, context): - - exit(-1) - - reader = self._get_dataloader(self._env["TRAIN"]) - epochs = self._env["epochs"] - - program = fluid.compiler.CompiledProgram(fluid.default_main_program( - )).with_data_parallel(loss_name=self.model.get_avg_cost().name) - - metrics_varnames = [] - metrics_format = [] - - metrics_format.append("{}: {{}}".format("epoch")) - metrics_format.append("{}: {{}}".format("batch")) - - for name, var in self.model.get_metrics().items(): - metrics_varnames.append(var.name) - metrics_format.append("{}: {{}}".format(name)) - - metrics_format = ", ".join(metrics_format) - - for epoch in range(epochs): - reader.start() - batch_id = 0 + scope = self._model[model_name][2] + prorgram = self._model[model_name][0] + with fluid.scope_guard(scope): try: while True: metrics_rets = self._exe.run(program=program, @@ -311,32 +249,6 @@ class SingleTrainer(TranspileTrainer): batch_id += 1 except fluid.core.EOFException: reader.reset() - self.save(epoch, "train", is_fleet=False) - - context['status'] = 'infer_pass' - - def dataset_train(self, context): - dataset = self._get_dataset("TRAIN") - ins = self._get_dataset_ins() - - epochs = envs.get_global_env("train.epochs") - for i in range(epochs): - begin_time = time.time() - self._exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) - end_time = time.time() - times = end_time - begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format( - i, times, ins / times)) - - self.save(i, "train", is_fleet=False) - context['status'] = 'infer_pass' def terminal(self, context): - #for model in self.increment_models: - # print("epoch :{}, dir: {}".format(model[0], model[1])) context['is_exit'] = True diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py index c121b4abb624503936faca8e77902a97e3f0cf82..86c33ceb382509b700097513c5e034f1496869ce 100755 --- a/core/trainers/transpiler_trainer.py +++ b/core/trainers/transpiler_trainer.py @@ -94,24 +94,30 @@ class TranspileTrainer(Trainer): count += 1 return count - def _get_dataset(self, state="TRAIN"): - if state == "TRAIN": - inputs = self.model.get_inputs() - namespace = "train.reader" - train_data_path = envs.get_global_env("train_data_path", None, - namespace) - else: - inputs = self.model.get_infer_inputs() - namespace = "evaluate.reader" - train_data_path = envs.get_global_env("test_data_path", None, - namespace) - - sparse_slots = envs.get_global_env("sparse_slots", None, namespace) - dense_slots = envs.get_global_env("dense_slots", None, namespace) - - threads = int(envs.get_runtime_environ("train.trainer.threads")) - batch_size = envs.get_global_env("batch_size", None, namespace) - reader_class = envs.get_global_env("class", None, namespace) + #def _get_dataset(self, state="TRAIN"): + #if state == "TRAIN": + # inputs = self.model.get_inputs() + # namespace = "train.reader" + # train_data_path = envs.get_global_env("train_data_path", None, + # namespace) + #else: + # inputs = self.model.get_infer_inputs() + # namespace = "evaluate.reader" + # train_data_path = envs.get_global_env("test_data_path", None, + # namespace) + def _get_dataset(self, dataset_name): + namespace = "dataset." + dataset_name + "." + sparse_slots = envs.get_global_env(namespace + "sparse_slots") + dense_slots = envs.get_global_env(namespace + "dense_slots") + thread_num = envs.get_global_env(namespace + "thread_num") + #threads = int(envs.get_runtime_environ("train.trainer.threads")) + #batch_size = envs.get_global_env("batch_size", None, namespace) + batch_size = envs.get_global_env(namespace + "batch_size") + reader_type = envs.get_global_env(namespace + "type") + if envs.get_platform() != "LINUX": + print("platform ", envs.get_platform(), " change reader to DataLoader") + reader_type = "DataLoader" + reader_class = envs.get_global_env(namespace + "data_converter") abs_dir = os.path.dirname(os.path.abspath(__file__)) reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') diff --git a/core/utils/envs.py b/core/utils/envs.py index ab631ac4f517c7cbb190c317c5fdc5667a953d91..1522f6c9ba3b96711d5f9dcd202e3453537e655c 100755 --- a/core/utils/envs.py +++ b/core/utils/envs.py @@ -20,7 +20,7 @@ import sys global_envs = {} -global_envs_raw = {} +#global_envs_raw = {} def flatten_environs(envs, separator="."): flatten_dict = {} @@ -63,23 +63,44 @@ def get_trainer(): def set_global_envs(envs): assert isinstance(envs, dict) - global_envs_raw = envs - - return - +# namespace_nests = [] + #print(envs) def fatten_env_namespace(namespace_nests, local_envs): +# if not isinstance(local_envs, dict): +# global_k = ".".join(namespace_nests) +# global_envs[global_k] = local_envs +# return for k, v in local_envs.items(): + #print(k) if isinstance(v, dict): nests = copy.deepcopy(namespace_nests) nests.append(k) fatten_env_namespace(nests, v) + elif (k == "dataset" or k == "executor") and isinstance(v, list): + #print("=======================") + #print([i for i in v]) + for i in v: + if i.get("name") is None: + raise ValueError("name must be in dataset list ", v) + nests = copy.deepcopy(namespace_nests) + nests.append(k) + nests.append(i["name"]) + fatten_env_namespace(nests, i) + #global_k = ".".join(namespace_nests + [k, i["name"]]) + #global_envs[global_k] = i + + #print([i for i in v]) + #global_k = ".".join(namespace_nests + [k]) + #global_envs[global_k] = v else: global_k = ".".join(namespace_nests + [k]) global_envs[global_k] = v - for k, v in envs.items(): - fatten_env_namespace([k], v) - + #for k, v in envs.items(): + # fatten_env_namespace([k], v) + fatten_env_namespace([], envs) + for i in global_envs: + print i,":",global_envs[i] def get_global_env(env_name, default_value=None, namespace=None): """ @@ -111,7 +132,7 @@ def windows_path_converter(path): def update_workspace(): - workspace = global_envs.get("train.workspace", None) + workspace = global_envs.get("workspace") if not workspace: return workspace = path_adapter(workspace) diff --git a/models/rank/dnn/model.py b/models/rank/dnn/model.py index e45c4a181ab16b52861511a332c6cd72310a32c4..66ce9246528b719c38f660a5767bdf70d13b724c 100755 --- a/models/rank/dnn/model.py +++ b/models/rank/dnn/model.py @@ -83,7 +83,6 @@ class Model(ModelBase): avg_cost = fluid.layers.reduce_mean(cost) self._cost = avg_cost - auc, batch_auc, _ = fluid.layers.auc(input=self.predict, label=self.label_input, num_thresholds=2**12,