diff --git a/core/factory.py b/core/factory.py
index 470b3a025e51d8c9fd6b2b3bcbb118fb8a619d77..e576320416cfb182924617893dde67c63b395d0d 100755
--- a/core/factory.py
+++ b/core/factory.py
@@ -36,7 +36,8 @@ def trainer_registry():
                                                  "tdm_single_trainer.py")
     trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
                                                  "tdm_cluster_trainer.py")
-
+    trainers["SingleTrainerYamlOpt"] = os.path.join(trainer_abs,
+                                                    "single_trainer_yamlopt.py")
 
 
 trainer_registry()
diff --git a/core/trainers/single_trainer_yamlopt.py b/core/trainers/single_trainer_yamlopt.py
new file mode 100755
index 0000000000000000000000000000000000000000..b14cf4437ce0528e0c8ff5e0e7adaa27d5fb1c11
--- /dev/null
+++ b/core/trainers/single_trainer_yamlopt.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Training with fluid on a single node only.
+"""
+
+from __future__ import print_function
+
+import time
+import logging
+import os
+import paddle.fluid as fluid
+
+from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
+from paddlerec.core.utils import envs
+from paddlerec.core.reader import SlotReader
+
+logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger("fluid")
+logger.setLevel(logging.INFO)
+
+
+class SingleTrainerYamlOpt(TranspileTrainer):
+    def __init__(self, config=None):
+        super(SingleTrainerYamlOpt, self).__init__(config)
+        self._env = self._config
+        device = envs.get_global_env("device")
+        if device == 'gpu':
+            self._place = fluid.CUDAPlace(0)
+        else:
+            # default to CPU when device is 'cpu' or unset
+            self._place = fluid.CPUPlace()
+        self._exe = fluid.Executor(self._place)
+        self.processor_register()
+        # each entry is [train_program, startup_program, scope, model]
+        self._model = {}
+        self._dataset = {}
+        envs.set_global_envs(self._config)
+        envs.update_workspace()
+
+    def processor_register(self):
+        self.regist_context_processor('uninit', self.instance)
+        self.regist_context_processor('init_pass', self.init)
+        self.regist_context_processor('startup_pass', self.startup)
+        self.regist_context_processor('train_pass', self.executor_train)
+        self.regist_context_processor('terminal_pass', self.terminal)
+
+    def instance(self, context):
+        context['status'] = 'init_pass'
+
+    def dataloader_train(self, context):
+        pass
+
+    def dataset_train(self, context):
+        pass
+
+    def _create_dataset(self, dataset_name):
+        name = "dataset." + dataset_name + "."
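+        # The dotted keys below are resolved against the flattened global
+        # envs built by envs.set_global_envs, e.g. "dataset.<name>.batch_size".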
+        sparse_slots = envs.get_global_env(name + "sparse_slots")
+        dense_slots = envs.get_global_env(name + "dense_slots")
+        thread_num = envs.get_global_env(name + "thread_num")
+        batch_size = envs.get_global_env(name + "batch_size")
+        reader_type = envs.get_global_env(name + "type")
+        if envs.get_platform() != "LINUX":
+            print("platform ", envs.get_platform(),
+                  " changing reader to DataLoader")
+            reader_type = "DataLoader"
+        padding = 0
+
+        reader = envs.path_adapter(
+            "paddlerec.core.utils") + "/dataset_instance.py"
+        pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
+            reader, "slot", "slot", self._config_yaml, "fake",
+            sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"),
+            str(padding))
+
+        if reader_type == "QueueDataset":
+            dataset = fluid.DatasetFactory().create_dataset()
+            dataset.set_batch_size(batch_size)
+            dataset.set_thread(thread_num)
+            dataset.set_pipe_command(pipe_cmd)
+            train_data_path = envs.get_global_env(name + "data_path")
+            file_list = [
+                os.path.join(train_data_path, x)
+                for x in os.listdir(train_data_path)
+            ]
+            dataset.set_filelist(file_list)
+            for model_dict in self._env["executor"]:
+                if model_dict["dataset_name"] == dataset_name:
+                    model = self._model[model_dict["name"]][3]
+                    inputs = model.get_inputs()
+                    dataset.set_use_var(inputs)
+                    break
+        else:
+            # DataLoader readers are fed directly; nothing to build here
+            dataset = None
+
+        return dataset
+
+    def init(self, context):
+        for model_dict in self._env["executor"]:
+            self._model[model_dict["name"]] = [None] * 4
+            train_program = fluid.Program()
+            startup_program = fluid.Program()
+            scope = fluid.Scope()
+            opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
+            opt_lr = envs.get_global_env(
+                "hyper_parameters.optimizer.learning_rate")
+            opt_strategy = envs.get_global_env(
+                "hyper_parameters.optimizer.strategy")
+            with fluid.program_guard(train_program, startup_program):
+                with fluid.unique_name.guard():
+                    model_path = model_dict["model"].replace(
+                        "{workspace}",
+                        envs.path_adapter(self._env["workspace"]))
+                    model = envs.lazy_instance_by_fliename(
+                        model_path, "Model")(self._env)
+                    model._data_var = model.input_data(
+                        dataset_name=model_dict["dataset_name"])
+                    model.net(None)
+                    optimizer = model._build_optimizer(opt_name, opt_lr,
+                                                       opt_strategy)
+                    optimizer.minimize(model._cost)
+            self._model[model_dict["name"]][0] = train_program
+            self._model[model_dict["name"]][1] = startup_program
+            self._model[model_dict["name"]][2] = scope
+            self._model[model_dict["name"]][3] = model
+
+        for dataset in self._env["dataset"]:
+            self._dataset[dataset["name"]] = self._create_dataset(
+                dataset["name"])
+
+        context['status'] = 'startup_pass'
+
+    def startup(self, context):
+        for model_dict in self._env["executor"]:
+            with fluid.scope_guard(self._model[model_dict["name"]][2]):
+                self._exe.run(self._model[model_dict["name"]][1])
+        context['status'] = 'train_pass'
+
+    def executor_train(self, context):
+        epochs = int(self._env["epochs"])
+        for j in range(epochs):
+            for model_dict in self._env["executor"]:
+                reader_name = model_dict["dataset_name"]
+                name = "dataset." + reader_name + "."
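+                # Dispatch on the configured dataset type: "DataLoader"
+                # uses the feed-based loop, everything else trains from
+                # a QueueDataset via train_from_dataset.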
+                begin_time = time.time()
+                if envs.get_global_env(name + "type") == "DataLoader":
+                    self._executor_dataloader_train(model_dict)
+                else:
+                    self._executor_dataset_train(model_dict)
+                end_time = time.time()
+                seconds = end_time - begin_time
+                print("epoch {} done, time elapsed: {}".format(j, seconds))
+        context['status'] = "terminal_pass"
+
+    def _executor_dataset_train(self, model_dict):
+        reader_name = model_dict["dataset_name"]
+        model_name = model_dict["name"]
+        model_class = self._model[model_name][3]
+        fetch_vars = []
+        fetch_alias = []
+        fetch_period = 20
+        metrics = model_class.get_metrics()
+        if metrics:
+            fetch_vars = list(metrics.values())
+            fetch_alias = list(metrics.keys())
+        scope = self._model[model_name][2]
+        program = self._model[model_name][0]
+        reader = self._dataset[reader_name]
+        with fluid.scope_guard(scope):
+            self._exe.train_from_dataset(
+                program=program,
+                dataset=reader,
+                fetch_list=fetch_vars,
+                fetch_info=fetch_alias,
+                print_period=fetch_period)
+
+    def _executor_dataloader_train(self, model_dict):
+        reader_name = model_dict["dataset_name"]
+        model_name = model_dict["name"]
+        model_class = self._model[model_name][3]
+        # compile the train program (slot 0), not the startup program
+        program = fluid.compiler.CompiledProgram(
+            self._model[model_name][0]).with_data_parallel(
+                loss_name=model_class.get_avg_cost().name)
+        fetch_period = 20
+        metrics = model_class.get_metrics()
+        metrics_varnames = []
+        metrics_format = []
+        metrics_format.append("{}: {{}}".format("batch"))
+        for name, var in metrics.items():
+            metrics_varnames.append(var.name)
+            metrics_format.append("{}: {{}}".format(name))
+        metrics_format = ", ".join(metrics_format)
+
+        # assumes the model built its inputs with a DataLoader and
+        # exposed it as _data_loader (see ModelBase)
+        reader = self._model[model_name][3]._data_loader
+        reader.start()
+        batch_id = 0
+        scope = self._model[model_name][2]
+        with fluid.scope_guard(scope):
+            try:
+                while True:
+                    metrics_rets = self._exe.run(program=program,
+                                                 fetch_list=metrics_varnames)
+
+                    metrics = [batch_id]
+                    metrics.extend(metrics_rets)
+
+                    if batch_id % fetch_period == 0 and batch_id != 0:
+                        print(metrics_format.format(*metrics))
+                    batch_id += 1
+            except fluid.core.EOFException:
+                reader.reset()
+
+    def terminal(self, context):
+        context['is_exit'] = True
diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py
index 86c33ceb382509b700097513c5e034f1496869ce..c121b4abb624503936faca8e77902a97e3f0cf82 100755
--- a/core/trainers/transpiler_trainer.py
+++ b/core/trainers/transpiler_trainer.py
@@ -94,30 +94,24 @@ class TranspileTrainer(Trainer):
             count += 1
         return count
 
-    #def _get_dataset(self, state="TRAIN"):
-        #if state == "TRAIN":
-        #    inputs = self.model.get_inputs()
-        #    namespace = "train.reader"
-        #    train_data_path = envs.get_global_env("train_data_path", None,
-        #                                          namespace)
-        #else:
-        #    inputs = self.model.get_infer_inputs()
-        #    namespace = "evaluate.reader"
-        #    train_data_path = envs.get_global_env("test_data_path", None,
-        #                                          namespace)
-    def _get_dataset(self, dataset_name):
-        namespace = "dataset." + dataset_name + "."
-        sparse_slots = envs.get_global_env(namespace + "sparse_slots")
-        dense_slots = envs.get_global_env(namespace + "dense_slots")
-        thread_num = envs.get_global_env(namespace + "thread_num")
-        #threads = int(envs.get_runtime_environ("train.trainer.threads"))
-        #batch_size = envs.get_global_env("batch_size", None, namespace)
-        batch_size = envs.get_global_env(namespace + "batch_size")
-        reader_type = envs.get_global_env(namespace + "type")
-        if envs.get_platform() != "LINUX":
-            print("platform ", envs.get_platform(), " change reader to DataLoader")
-            reader_type = "DataLoader"
-        reader_class = envs.get_global_env(namespace + "data_converter")
+    def _get_dataset(self, state="TRAIN"):
+        if state == "TRAIN":
+            inputs = self.model.get_inputs()
+            namespace = "train.reader"
+            train_data_path = envs.get_global_env("train_data_path", None,
+                                                  namespace)
+        else:
+            inputs = self.model.get_infer_inputs()
+            namespace = "evaluate.reader"
+            train_data_path = envs.get_global_env("test_data_path", None,
+                                                  namespace)
+
+        sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
+        dense_slots = envs.get_global_env("dense_slots", None, namespace)
+
+        threads = int(envs.get_runtime_environ("train.trainer.threads"))
+        batch_size = envs.get_global_env("batch_size", None, namespace)
+        reader_class = envs.get_global_env("class", None, namespace)
         abs_dir = os.path.dirname(os.path.abspath(__file__))
         reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
diff --git a/core/utils/envs.py b/core/utils/envs.py
index 1522f6c9ba3b96711d5f9dcd202e3453537e655c..ac9f2d0f5a473a5134c9606d77c4cfdc1e4ad38d 100755
--- a/core/utils/envs.py
+++ b/core/utils/envs.py
@@ -20,8 +20,6 @@ import sys
 
 global_envs = {}
 
-#global_envs_raw = {}
-
 def flatten_environs(envs, separator="."):
     flatten_dict = {}
     assert isinstance(envs, dict)
@@ -63,22 +61,13 @@ def get_trainer():
 
 def set_global_envs(envs):
     assert isinstance(envs, dict)
-#    namespace_nests = []
-    #print(envs)
 
     def fatten_env_namespace(namespace_nests, local_envs):
-#        if not isinstance(local_envs, dict):
-#            global_k = ".".join(namespace_nests)
-#            global_envs[global_k] = local_envs
-#            return
         for k, v in local_envs.items():
-            #print(k)
             if isinstance(v, dict):
                 nests = copy.deepcopy(namespace_nests)
                 nests.append(k)
                 fatten_env_namespace(nests, v)
             elif (k == "dataset" or k == "executor") and isinstance(v, list):
-                #print("=======================")
-                #print([i for i in v])
                 for i in v:
                     if i.get("name") is None:
                         raise ValueError("name must be in dataset list ", v)
@@ -86,18 +75,8 @@ def set_global_envs(envs):
                     nests.append(k)
                     nests.append(i["name"])
                     fatten_env_namespace(nests, i)
-                    #global_k = ".".join(namespace_nests + [k, i["name"]])
-                    #global_envs[global_k] = i
-
-                #print([i for i in v])
-                #global_k = ".".join(namespace_nests + [k])
-                #global_envs[global_k] = v
             else:
                 global_k = ".".join(namespace_nests + [k])
                 global_envs[global_k] = v
 
-    #for k, v in envs.items():
-    #    fatten_env_namespace([k], v)
     fatten_env_namespace([], envs)
-    for i in global_envs:
-        print i,":",global_envs[i]
diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml
index 9329c9bacfd26b9c9d7a68e5ec98520935a30e5c..4a1a2f141327b17223ed5714ca1728cadb053656 100755
--- a/models/rank/dnn/config.yaml
+++ b/models/rank/dnn/config.yaml
@@ -46,7 +46,7 @@ hyper_parameters:
     fc_sizes: [512, 256, 128, 32]
 
 epoch:
-  trainer_class: Single
+  trainer_class: single_yamlopt
   save_checkpoint_interval: 2
   save_inference_interval: 4
   save_checkpoint_path: "increment"
diff --git a/models/rank/dnn/model.py b/models/rank/dnn/model.py
index 66ce9246528b719c38f660a5767bdf70d13b724c..937eb540607a6de453f657b2c8dd3b55305d1215 100755
--- a/models/rank/dnn/model.py
+++ b/models/rank/dnn/model.py
@@ -27,12 +27,9 @@ class Model(ModelBase):
     def _init_hyper_parameters(self):
         self.is_distributed = True if envs.get_trainer(
         ) == "CtrTrainer" else False
-        self.sparse_feature_number = 1000001 #envs.get_global_env(
-            #"hyper_parameters.sparse_feature_number", None, self._namespace)
-        self.sparse_feature_dim = 9#envs.get_global_env(
-            #"hyper_parameters.sparse_feature_dim", None, self._namespace)
-        self.learning_rate = 0.001#envs.get_global_env(
-            #"hyper_parameters.learning_rate", None, self._namespace)
+        self.sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number")
+        self.sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim")
+        self.learning_rate = envs.get_global_env("hyper_parameters.learning_rate")
 
     def net(self, input, is_infer=False):
         self.sparse_inputs = self._sparse_data_var[1:]
diff --git a/run.py b/run.py
index c80c647d0c8bab5cd9918429f9bf460b6093335d..b89672356895602b3aae9a8b74898b172ceafc77 100755
--- a/run.py
+++ b/run.py
@@ -28,7 +28,7 @@ device = ["CPU", "GPU"]
 clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
 engine_choices = [
     "SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER",
-    "TDM_CLUSTER"
+    "TDM_CLUSTER", "SINGLE_YAMLOPT"
 ]
 custom_model = ['TDM']
 model_name = ""
@@ -42,12 +42,14 @@ def engine_registry():
     engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
     engines["TRANSPILER"]["CLUSTER"] = cluster_engine
+    engines["TRANSPILER"]["SINGLE_YAMLOPT"] = single_yamlopt_engine
+
     engines["PSLIB"]["SINGLE"] = local_mpi_engine
     engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
     engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
 
 
-def get_inters_from_yaml(file, filter):
+def get_inters_from_yaml(file, filters):
     with open(file, 'r') as rb:
         _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
 
@@ -55,16 +57,22 @@ def get_inters_from_yaml(file, filter):
     inters = {}
     for k, v in flattens.items():
-        if k.startswith(filter):
-            inters[k] = v
+        for f in filters:
+            if k.startswith(f):
+                inters[k] = v
     return inters
 
 
 def get_engine(args):
     transpiler = get_transpiler()
-    run_extras = get_inters_from_yaml(args.model, "train.")
-
-    engine = run_extras.get("train.engine", "single")
+    run_extras = get_inters_from_yaml(args.model, ["train.", "epoch."])
+
+    engine = run_extras.get("train.engine", None)
+    if engine is None:
+        engine = run_extras.get("epoch.trainer_class", None)
+    if engine is None:
+        engine = "single"
+
 
     engine = engine.upper()
 
     if engine not in engine_choices:
@@ -130,6 +138,20 @@ def single_engine(args):
     trainer = TrainerFactory.create(args.model)
     return trainer
 
+
+def single_yamlopt_engine(args):
+    trainer = get_trainer_prefix(args) + "SingleTrainerYamlOpt"
+    single_envs = {}
+    single_envs["train.trainer.trainer"] = trainer
+    single_envs["train.trainer.threads"] = "2"
+    single_envs["train.trainer.engine"] = "single_yamlopt"
+    single_envs["train.trainer.platform"] = envs.get_platform()
+    print("use {} engine to run model: {}".format(trainer, args.model))
+
+    set_runtime_envs(single_envs, args.model)
+    trainer = TrainerFactory.create(args.model)
+    return trainer
+
 
 def cluster_engine(args):
     def update_workspace(cluster_envs):
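
Reviewer note: the new trainer's lookups (e.g. `envs.get_global_env("dataset." + dataset_name + ".batch_size")`) rely on `set_global_envs` flattening the YAML config into dotted keys, with the `dataset` and `executor` lists keyed by each entry's `name`. A minimal standalone sketch of that flattening; the sample keys (`dataset_train`, the Adam optimizer) are illustrative, not taken from this patch:

    import copy

    global_envs = {}

    def flatten(namespace_nests, local_envs):
        # Mirrors fatten_env_namespace: dicts recurse; the special
        # "dataset"/"executor" lists are keyed by each entry's "name".
        for k, v in local_envs.items():
            if isinstance(v, dict):
                nests = copy.deepcopy(namespace_nests)
                nests.append(k)
                flatten(nests, v)
            elif k in ("dataset", "executor") and isinstance(v, list):
                for i in v:
                    if i.get("name") is None:
                        raise ValueError("name must be in dataset list ", v)
                    nests = copy.deepcopy(namespace_nests)
                    nests.append(k)
                    nests.append(i["name"])
                    flatten(nests, i)
            else:
                global_envs[".".join(namespace_nests + [k])] = v

    config = {
        "dataset": [{"name": "dataset_train", "batch_size": 2,
                     "type": "QueueDataset"}],
        "hyper_parameters": {"optimizer": {"class": "Adam",
                                           "learning_rate": 0.001}},
    }
    flatten([], config)
    assert global_envs["dataset.dataset_train.batch_size"] == 2
    assert global_envs["hyper_parameters.optimizer.class"] == "Adam"

This is why `_create_dataset` and `init` can address every setting with a plain string key instead of the old `(key, default, namespace)` lookups.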
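The run.py change makes engine selection resolve in a fixed order: `train.engine` if present, then the new `epoch.trainer_class` fallback, then the `single` default. A small sketch of that precedence, with `run_extras` standing in for the flattened YAML:

    def resolve_engine(run_extras):
        # Precedence per get_engine: train.engine wins, then
        # epoch.trainer_class, then the "single" default.
        engine = run_extras.get("train.engine")
        if engine is None:
            engine = run_extras.get("epoch.trainer_class")
        if engine is None:
            engine = "single"
        return engine.upper()

    assert resolve_engine({}) == "SINGLE"
    assert resolve_engine({"epoch.trainer_class": "single_yamlopt"}) == "SINGLE_YAMLOPT"
    assert resolve_engine({"train.engine": "cluster",
                           "epoch.trainer_class": "single_yamlopt"}) == "CLUSTER"

With the config.yaml change above (`trainer_class: single_yamlopt`), the resolved engine is `SINGLE_YAMLOPT`, which `engine_registry` maps to `single_yamlopt_engine` and ultimately to the new `SingleTrainerYamlOpt`.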