diff --git a/core/factory.py b/core/factory.py index 470b3a025e51d8c9fd6b2b3bcbb118fb8a619d77..2e2d013bd27eb73abd3b9ad5507b4e9373276a3b 100755 --- a/core/factory.py +++ b/core/factory.py @@ -26,6 +26,7 @@ trainers = {} def trainer_registry(): trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") + trainers["SingleInfer"] = os.path.join(trainer_abs, "single_infer.py") trainers["ClusterTrainer"] = os.path.join(trainer_abs, "cluster_trainer.py") trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, diff --git a/core/model.py b/core/model.py index 847a5d23362e4c3db8ef0cfa59fb83c4ed9a4c91..9422dc49ea9492ff958106a1a51ea11f318bd148 100755 --- a/core/model.py +++ b/core/model.py @@ -38,18 +38,38 @@ class Model(object): self._namespace = "train.model" self._platform = envs.get_platform() self._init_hyper_parameters() + self._env = config + self._slot_inited = False def _init_hyper_parameters(self): pass - def _init_slots(self): - sparse_slots = envs.get_global_env("sparse_slots", None, - "train.reader") - dense_slots = envs.get_global_env("dense_slots", None, "train.reader") - - if sparse_slots is not None or dense_slots is not None: - sparse_slots = sparse_slots.strip().split(" ") - dense_slots = dense_slots.strip().split(" ") + def _init_slots(self, **kargs): + if self._slot_inited: + return + self._slot_inited = True + dataset = {} + model_dict = {} + for i in self._env["executor"]: + if i["name"] == kargs["name"]: + model_dict = i + break + for i in self._env["dataset"]: + if i["name"] == model_dict["dataset_name"]: + dataset = i + break + name = "dataset." + dataset["name"] + "." + sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + if sparse_slots != "" or dense_slots != "": + if sparse_slots == "": + sparse_slots = [] + else: + sparse_slots = sparse_slots.strip().split(" ") + if dense_slots == "": + dense_slots = [] + else: + dense_slots = dense_slots.strip().split(" ") dense_slots_shape = [[ int(j) for j in i.split(":")[1].strip("[]").split(",") ] for i in dense_slots] @@ -69,14 +89,17 @@ class Model(object): self._data_var.append(l) self._sparse_data_var.append(l) - dataset_class = envs.get_global_env("dataset_class", None, - "train.reader") + dataset_class = dataset["type"] if dataset_class == "DataLoader": self._init_dataloader() - def _init_dataloader(self): + def _init_dataloader(self, is_infer=False): + if is_infer: + data = self._infer_data_var + else: + data = self._data_var self._data_loader = fluid.io.DataLoader.from_generator( - feed_list=self._data_var, + feed_list=data, capacity=64, use_double_buffer=False, iterable=False) @@ -103,7 +126,7 @@ class Model(object): def get_fetch_period(self): return self._fetch_interval - def _build_optimizer(self, name, lr): + def _build_optimizer(self, name, lr, strategy=None): name = name.upper() optimizers = ["SGD", "ADAM", "ADAGRAD"] if name not in optimizers: @@ -130,16 +153,23 @@ class Model(object): None, self._namespace) optimizer = envs.get_global_env("hyper_parameters.optimizer", None, self._namespace) - print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) return self._build_optimizer(optimizer, learning_rate) - def input_data(self, is_infer=False): - sparse_slots = envs.get_global_env("sparse_slots", None, - "train.reader") - dense_slots = envs.get_global_env("dense_slots", None, "train.reader") - if sparse_slots is not None or dense_slots is not None: - sparse_slots = sparse_slots.strip().split(" ") - dense_slots = 
dense_slots.strip().split(" ") + def input_data(self, is_infer=False, **kwargs): + name = "dataset." + kwargs.get("dataset_name") + "." + sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + self._sparse_data_var_map = {} + self._dense_data_var_map = {} + if sparse_slots != "" or dense_slots != "": + if sparse_slots == "": + sparse_slots = [] + else: + sparse_slots = sparse_slots.strip().split(" ") + if dense_slots == "": + dense_slots = [] + else: + dense_slots = dense_slots.strip().split(" ") dense_slots_shape = [[ int(j) for j in i.split(":")[1].strip("[]").split(",") ] for i in dense_slots] @@ -153,12 +183,14 @@ class Model(object): dtype="float32") data_var_.append(l) self._dense_data_var.append(l) + self._dense_data_var_map[dense_slots[i]] = l self._sparse_data_var = [] for name in sparse_slots: l = fluid.layers.data( name=name, shape=[1], lod_level=1, dtype="int64") data_var_.append(l) self._sparse_data_var.append(l) + self._sparse_data_var_map[name] = l return data_var_ else: diff --git a/core/reader.py b/core/reader.py index dc0b9b0b7fd784fdc86e7db5e0b488c35b9021dc..555ae4ba83fa1fd0e1e57e110c199c9cedc1b1cb 100755 --- a/core/reader.py +++ b/core/reader.py @@ -35,9 +35,6 @@ class Reader(dg.MultiSlotDataGenerator): else: raise ValueError("reader config only support yaml") - envs.set_global_envs(_config) - envs.update_workspace() - @abc.abstractmethod def init(self): """init""" @@ -58,13 +55,17 @@ class SlotReader(dg.MultiSlotDataGenerator): _config = yaml.load(rb.read(), Loader=yaml.FullLoader) else: raise ValueError("reader config only support yaml") - envs.set_global_envs(_config) - envs.update_workspace() def init(self, sparse_slots, dense_slots, padding=0): from operator import mul - self.sparse_slots = sparse_slots.strip().split(" ") - self.dense_slots = dense_slots.strip().split(" ") + self.sparse_slots = [] + if sparse_slots.strip() != "#" and sparse_slots.strip( + ) != "?" and sparse_slots.strip() != "": + self.sparse_slots = sparse_slots.strip().split(" ") + self.dense_slots = [] + if dense_slots.strip() != "#" and dense_slots.strip( + ) != "?" and dense_slots.strip() != "": + self.dense_slots = dense_slots.strip().split(" ") self.dense_slots_shape = [ reduce(mul, [int(j) for j in i.split(":")[1].strip("[]").split(",")]) diff --git a/core/trainers/single_infer.py b/core/trainers/single_infer.py new file mode 100755 index 0000000000000000000000000000000000000000..0f1c92f3f2d948c76d4cd2b0fdcca131b99cfc92 --- /dev/null +++ b/core/trainers/single_infer.py @@ -0,0 +1,355 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Training use fluid with one node only. 
+""" + +from __future__ import print_function + +import time +import logging +import os +import paddle.fluid as fluid + +from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer +from paddlerec.core.utils import envs +from paddlerec.core.reader import SlotReader +from paddlerec.core.utils import dataloader_instance + +logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger("fluid") +logger.setLevel(logging.INFO) + + +class SingleInfer(TranspileTrainer): + def __init__(self, config=None): + super(TranspileTrainer, self).__init__(config) + self._env = self._config + device = envs.get_global_env("device") + if device == 'gpu': + self._place = fluid.CUDAPlace(0) + elif device == 'cpu': + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + self.processor_register() + self._model = {} + self._dataset = {} + envs.set_global_envs(self._config) + envs.update_workspace() + self._runner_name = envs.get_global_env("mode") + device = envs.get_global_env("runner." + self._runner_name + ".device") + if device == 'gpu': + self._place = fluid.CUDAPlace(0) + elif device == 'cpu': + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + + def processor_register(self): + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('startup_pass', self.startup) + self.regist_context_processor('train_pass', self.executor_train) + self.regist_context_processor('terminal_pass', self.terminal) + + def instance(self, context): + context['status'] = 'init_pass' + + def _get_dataset(self, dataset_name): + name = "dataset." + dataset_name + "." + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_class = envs.get_global_env(name + "data_converter") + abs_dir = os.path.dirname(os.path.abspath(__file__)) + reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') + sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + if sparse_slots == "" and dense_slots == "": + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, + "TRAIN", self._config_yaml) + else: + if sparse_slots == "": + sparse_slots = "?" + if dense_slots == "": + dense_slots = "?" + padding = envs.get_global_env(name + "padding", 0) + pipe_cmd = "python {} {} {} {} {} {} {} {}".format( + reader, "slot", "slot", self._config_yaml, "fake", \ + sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding)) + + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_batch_size(envs.get_global_env(name + "batch_size")) + dataset.set_pipe_command(pipe_cmd) + train_data_path = envs.get_global_env(name + "data_path") + file_list = [ + os.path.join(train_data_path, x) + for x in os.listdir(train_data_path) + ] + dataset.set_filelist(file_list) + for model_dict in self._env["phase"]: + if model_dict["dataset_name"] == dataset_name: + model = self._model[model_dict["name"]][3] + inputs = model._infer_data_var + dataset.set_use_var(inputs) + break + return dataset + + def _get_dataloader(self, dataset_name, dataloader): + name = "dataset." + dataset_name + "." 
+ thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_class = envs.get_global_env(name + "data_converter") + abs_dir = os.path.dirname(os.path.abspath(__file__)) + sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + if sparse_slots == "" and dense_slots == "": + reader = dataloader_instance.dataloader_by_name( + reader_class, dataset_name, self._config_yaml) + reader_class = envs.lazy_instance_by_fliename(reader_class, + "TrainReader") + reader_ins = reader_class(self._config_yaml) + else: + reader = dataloader_instance.slotdataloader_by_name( + "", dataset_name, self._config_yaml) + reader_ins = SlotReader(self._config_yaml) + if hasattr(reader_ins, 'generate_batch_from_trainfiles'): + dataloader.set_sample_list_generator(reader) + else: + dataloader.set_sample_generator(reader, batch_size) + return dataloader + + def _create_dataset(self, dataset_name): + name = "dataset." + dataset_name + "." + sparse_slots = envs.get_global_env(name + "sparse_slots") + dense_slots = envs.get_global_env(name + "dense_slots") + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + type_name = envs.get_global_env(name + "type") + if envs.get_platform() != "LINUX": + print("platform ", envs.get_platform(), + " change reader to DataLoader") + type_name = "DataLoader" + padding = 0 + + if type_name == "DataLoader": + return None + else: + return self._get_dataset(dataset_name) + + def init(self, context): + for model_dict in self._env["phase"]: + self._model[model_dict["name"]] = [None] * 5 + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + dataset_name = model_dict["dataset_name"] + opt_name = envs.get_global_env("hyper_parameters.optimizer.class") + opt_lr = envs.get_global_env( + "hyper_parameters.optimizer.learning_rate") + opt_strategy = envs.get_global_env( + "hyper_parameters.optimizer.strategy") + with fluid.program_guard(train_program, startup_program): + with fluid.unique_name.guard(): + with fluid.scope_guard(scope): + model_path = model_dict["model"].replace( + "{workspace}", + envs.path_adapter(self._env["workspace"])) + model = envs.lazy_instance_by_fliename( + model_path, "Model")(self._env) + model._infer_data_var = model.input_data( + dataset_name=model_dict["dataset_name"]) + if envs.get_global_env("dataset." + dataset_name + + ".type") == "DataLoader": + model._init_dataloader(is_infer=True) + self._get_dataloader(dataset_name, + model._data_loader) + model.net(model._infer_data_var, True) + self._model[model_dict["name"]][0] = train_program + self._model[model_dict["name"]][1] = startup_program + self._model[model_dict["name"]][2] = scope + self._model[model_dict["name"]][3] = model + self._model[model_dict["name"]][4] = train_program.clone() + + for dataset in self._env["dataset"]: + if dataset["type"] != "DataLoader": + self._dataset[dataset["name"]] = self._create_dataset(dataset[ + "name"]) + + context['status'] = 'startup_pass' + + def startup(self, context): + for model_dict in self._env["phase"]: + with fluid.scope_guard(self._model[model_dict["name"]][2]): + self._exe.run(self._model[model_dict["name"]][1]) + context['status'] = 'train_pass' + + def executor_train(self, context): + epochs = int( + envs.get_global_env("runner." 
+ self._runner_name + ".epochs")) + for j in range(epochs): + for model_dict in self._env["phase"]: + if j == 0: + with fluid.scope_guard(self._model[model_dict["name"]][2]): + train_prog = self._model[model_dict["name"]][0] + startup_prog = self._model[model_dict["name"]][1] + with fluid.program_guard(train_prog, startup_prog): + self.load() + reader_name = model_dict["dataset_name"] + name = "dataset." + reader_name + "." + begin_time = time.time() + if envs.get_global_env(name + "type") == "DataLoader": + self._executor_dataloader_train(model_dict) + else: + self._executor_dataset_train(model_dict) + with fluid.scope_guard(self._model[model_dict["name"]][2]): + train_prog = self._model[model_dict["name"]][4] + startup_prog = self._model[model_dict["name"]][1] + with fluid.program_guard(train_prog, startup_prog): + self.save(j) + end_time = time.time() + seconds = end_time - begin_time + print("epoch {} done, time elasped: {}".format(j, seconds)) + context['status'] = "terminal_pass" + + def _executor_dataset_train(self, model_dict): + reader_name = model_dict["dataset_name"] + model_name = model_dict["name"] + model_class = self._model[model_name][3] + fetch_vars = [] + fetch_alias = [] + fetch_period = int( + envs.get_global_env("runner." + self._runner_name + + ".print_interval", 20)) + metrics = model_class.get_infer_results() + if metrics: + fetch_vars = metrics.values() + fetch_alias = metrics.keys() + scope = self._model[model_name][2] + program = self._model[model_name][0] + reader = self._dataset[reader_name] + with fluid.scope_guard(scope): + self._exe.infer_from_dataset( + program=program, + dataset=reader, + fetch_list=fetch_vars, + fetch_info=fetch_alias, + print_period=fetch_period) + + def _executor_dataloader_train(self, model_dict): + reader_name = model_dict["dataset_name"] + model_name = model_dict["name"] + model_class = self._model[model_name][3] + program = self._model[model_name][0].clone() + fetch_vars = [] + fetch_alias = [] + metrics = model_class.get_infer_results() + if metrics: + fetch_vars = metrics.values() + fetch_alias = metrics.keys() + metrics_varnames = [] + metrics_format = [] + fetch_period = int( + envs.get_global_env("runner." + self._runner_name + + ".print_interval", 20)) + metrics_format.append("{}: {{}}".format("batch")) + for name, var in metrics.items(): + metrics_varnames.append(var.name) + metrics_format.append("{}: {{}}".format(name)) + metrics_format = ", ".join(metrics_format) + + reader = self._model[model_name][3]._data_loader + reader.start() + batch_id = 0 + scope = self._model[model_name][2] + with fluid.scope_guard(scope): + try: + while True: + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames) + metrics = [batch_id] + metrics.extend(metrics_rets) + + if batch_id % fetch_period == 0 and batch_id != 0: + print(metrics_format.format(*metrics)) + batch_id += 1 + except fluid.core.EOFException: + reader.reset() + + def terminal(self, context): + context['is_exit'] = True + + def load(self, is_fleet=False): + name = "runner." + self._runner_name + "." 
+ dirname = envs.get_global_env(name + "init_model_path", None) + if dirname is None or dirname == "": + return + print("single_infer going to load ", dirname) + if is_fleet: + fleet.load_persistables(self._exe, dirname) + else: + fluid.io.load_persistables(self._exe, dirname) + + def save(self, epoch_id, is_fleet=False): + def need_save(epoch_id, epoch_interval, is_last=False): + if is_last: + return True + if epoch_id == -1: + return False + + return epoch_id % epoch_interval == 0 + + def save_inference_model(): + name = "runner." + self._runner_name + "." + save_interval = int( + envs.get_global_env(name + "save_inference_interval", -1)) + if not need_save(epoch_id, save_interval, False): + return + feed_varnames = envs.get_global_env( + name + "save_inference_feed_varnames", None) + fetch_varnames = envs.get_global_env( + name + "save_inference_fetch_varnames", None) + if feed_varnames is None or fetch_varnames is None or feed_varnames == "": + return + fetch_vars = [ + fluid.default_main_program().global_block().vars[varname] + for varname in fetch_varnames + ] + dirname = envs.get_global_env(name + "save_inference_path", None) + + assert dirname is not None + dirname = os.path.join(dirname, str(epoch_id)) + + if is_fleet: + fleet.save_inference_model(self._exe, dirname, feed_varnames, + fetch_vars) + else: + fluid.io.save_inference_model(dirname, feed_varnames, + fetch_vars, self._exe) + + def save_persistables(): + name = "runner." + self._runner_name + "." + save_interval = int( + envs.get_global_env(name + "save_checkpoint_interval", -1)) + if not need_save(epoch_id, save_interval, False): + return + dirname = envs.get_global_env(name + "save_checkpoint_path", None) + if dirname is None or dirname == "": + return + dirname = os.path.join(dirname, str(epoch_id)) + if is_fleet: + fleet.save_persistables(self._exe, dirname) + else: + fluid.io.save_persistables(self._exe, dirname) + + save_persistables() + save_inference_model() diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py index a564ba5585c313a163542f028fa158f8c50c8d2a..66daf40276c0230505c00baa5ddf79449198074c 100755 --- a/core/trainers/single_trainer.py +++ b/core/trainers/single_trainer.py @@ -19,11 +19,13 @@ from __future__ import print_function import time import logging - +import os import paddle.fluid as fluid from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer from paddlerec.core.utils import envs +from paddlerec.core.reader import SlotReader +from paddlerec.core.utils import dataloader_instance logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger("fluid") @@ -31,105 +33,323 @@ logger.setLevel(logging.INFO) class SingleTrainer(TranspileTrainer): + def __init__(self, config=None): + super(TranspileTrainer, self).__init__(config) + self._env = self._config + self.processor_register() + self._model = {} + self._dataset = {} + envs.set_global_envs(self._config) + envs.update_workspace() + self._runner_name = envs.get_global_env("mode") + device = envs.get_global_env("runner." 
+ self._runner_name + ".device") + if device == 'gpu': + self._place = fluid.CUDAPlace(0) + elif device == 'cpu': + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + def processor_register(self): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) + self.regist_context_processor('train_pass', self.executor_train) + self.regist_context_processor('terminal_pass', self.terminal) + + def instance(self, context): + context['status'] = 'init_pass' - if envs.get_platform() == "LINUX" and envs.get_global_env( - "dataset_class", None, "train.reader") != "DataLoader": - self.regist_context_processor('train_pass', self.dataset_train) + def _get_dataset(self, dataset_name): + name = "dataset." + dataset_name + "." + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_class = envs.get_global_env(name + "data_converter") + abs_dir = os.path.dirname(os.path.abspath(__file__)) + reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') + sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + if sparse_slots == "" and dense_slots == "": + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, + "TRAIN", self._config_yaml) else: - self.regist_context_processor('train_pass', self.dataloader_train) + if sparse_slots == "": + sparse_slots = "?" + if dense_slots == "": + dense_slots = "?" + padding = envs.get_global_env(name + "padding", 0) + pipe_cmd = "python {} {} {} {} {} {} {} {}".format( + reader, "slot", "slot", self._config_yaml, "fake", \ + sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding)) - self.regist_context_processor('infer_pass', self.infer) - self.regist_context_processor('terminal_pass', self.terminal) + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_batch_size(envs.get_global_env(name + "batch_size")) + dataset.set_pipe_command(pipe_cmd) + train_data_path = envs.get_global_env(name + "data_path") + file_list = [ + os.path.join(train_data_path, x) + for x in os.listdir(train_data_path) + ] + dataset.set_filelist(file_list) + for model_dict in self._env["phase"]: + if model_dict["dataset_name"] == dataset_name: + model = self._model[model_dict["name"]][3] + inputs = model._data_var + dataset.set_use_var(inputs) + break + return dataset - def init(self, context): - self.model.train_net() - optimizer = self.model.optimizer() - optimizer.minimize((self.model.get_avg_cost())) + def _get_dataloader(self, dataset_name, dataloader): + name = "dataset." + dataset_name + "." 
+ sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_class = envs.get_global_env(name + "data_converter") + abs_dir = os.path.dirname(os.path.abspath(__file__)) + if sparse_slots == "" and dense_slots == "": + reader = dataloader_instance.dataloader_by_name( + reader_class, dataset_name, self._config_yaml) + reader_class = envs.lazy_instance_by_fliename(reader_class, + "TrainReader") + reader_ins = reader_class(self._config_yaml) + else: + reader = dataloader_instance.slotdataloader_by_name( + "", dataset_name, self._config_yaml) + reader_ins = SlotReader(self._config_yaml) + if hasattr(reader_ins, 'generate_batch_from_trainfiles'): + dataloader.set_sample_list_generator(reader) + else: + dataloader.set_sample_generator(reader, batch_size) + return dataloader - self.fetch_vars = [] - self.fetch_alias = [] - self.fetch_period = self.model.get_fetch_period() + def _create_dataset(self, dataset_name): + name = "dataset." + dataset_name + "." + sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() + dense_slots = envs.get_global_env(name + "dense_slots", "").strip() + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + type_name = envs.get_global_env(name + "type") + if envs.get_platform() != "LINUX": + print("platform ", envs.get_platform(), + " change reader to DataLoader") + type_name = "DataLoader" + padding = 0 - metrics = self.model.get_metrics() - if metrics: - self.fetch_vars = metrics.values() - self.fetch_alias = metrics.keys() - evaluate_only = envs.get_global_env( - 'evaluate_only', False, namespace='evaluate') - if evaluate_only: - context['status'] = 'infer_pass' + if type_name == "DataLoader": + return None else: - context['status'] = 'startup_pass' + return self._get_dataset(dataset_name) + + def init(self, context): + for model_dict in self._env["phase"]: + self._model[model_dict["name"]] = [None] * 5 + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + dataset_name = model_dict["dataset_name"] + opt_name = envs.get_global_env("hyper_parameters.optimizer.class") + opt_lr = envs.get_global_env( + "hyper_parameters.optimizer.learning_rate") + opt_strategy = envs.get_global_env( + "hyper_parameters.optimizer.strategy") + with fluid.program_guard(train_program, startup_program): + with fluid.unique_name.guard(): + with fluid.scope_guard(scope): + model_path = model_dict["model"].replace( + "{workspace}", + envs.path_adapter(self._env["workspace"])) + model = envs.lazy_instance_by_fliename( + model_path, "Model")(self._env) + model._data_var = model.input_data( + dataset_name=model_dict["dataset_name"]) + if envs.get_global_env("dataset." 
+ dataset_name + + ".type") == "DataLoader": + model._init_dataloader(is_infer=False) + self._get_dataloader(dataset_name, + model._data_loader) + model.net(model._data_var, False) + optimizer = model._build_optimizer(opt_name, opt_lr, + opt_strategy) + optimizer.minimize(model._cost) + self._model[model_dict["name"]][0] = train_program + self._model[model_dict["name"]][1] = startup_program + self._model[model_dict["name"]][2] = scope + self._model[model_dict["name"]][3] = model + self._model[model_dict["name"]][4] = train_program.clone() + + for dataset in self._env["dataset"]: + if dataset["type"] != "DataLoader": + self._dataset[dataset["name"]] = self._create_dataset(dataset[ + "name"]) + + context['status'] = 'startup_pass' def startup(self, context): - self._exe.run(fluid.default_startup_program()) + for model_dict in self._env["phase"]: + with fluid.scope_guard(self._model[model_dict["name"]][2]): + self._exe.run(self._model[model_dict["name"]][1]) context['status'] = 'train_pass' - def dataloader_train(self, context): - reader = self._get_dataloader("TRAIN") - epochs = envs.get_global_env("train.epochs") + def executor_train(self, context): + epochs = int( + envs.get_global_env("runner." + self._runner_name + ".epochs")) + for j in range(epochs): + for model_dict in self._env["phase"]: + if j == 0: + with fluid.scope_guard(self._model[model_dict["name"]][2]): + train_prog = self._model[model_dict["name"]][0] + startup_prog = self._model[model_dict["name"]][1] + with fluid.program_guard(train_prog, startup_prog): + self.load() + reader_name = model_dict["dataset_name"] + name = "dataset." + reader_name + "." + begin_time = time.time() + if envs.get_global_env(name + "type") == "DataLoader": + self._executor_dataloader_train(model_dict) + else: + self._executor_dataset_train(model_dict) + with fluid.scope_guard(self._model[model_dict["name"]][2]): + train_prog = self._model[model_dict["name"]][4] + startup_prog = self._model[model_dict["name"]][1] + with fluid.program_guard(train_prog, startup_prog): + self.save(j) + end_time = time.time() + seconds = end_time - begin_time + print("epoch {} done, time elasped: {}".format(j, seconds)) + context['status'] = "terminal_pass" - program = fluid.compiler.CompiledProgram(fluid.default_main_program( - )).with_data_parallel(loss_name=self.model.get_avg_cost().name) + def _executor_dataset_train(self, model_dict): + reader_name = model_dict["dataset_name"] + model_name = model_dict["name"] + model_class = self._model[model_name][3] + fetch_vars = [] + fetch_alias = [] + fetch_period = int( + envs.get_global_env("runner." + self._runner_name + + ".print_interval", 20)) + metrics = model_class.get_metrics() + if metrics: + fetch_vars = metrics.values() + fetch_alias = metrics.keys() + scope = self._model[model_name][2] + program = self._model[model_name][0] + reader = self._dataset[reader_name] + with fluid.scope_guard(scope): + self._exe.train_from_dataset( + program=program, + dataset=reader, + fetch_list=fetch_vars, + fetch_info=fetch_alias, + print_period=fetch_period) + def _executor_dataloader_train(self, model_dict): + reader_name = model_dict["dataset_name"] + model_name = model_dict["name"] + model_class = self._model[model_name][3] + program = self._model[model_name][0].clone() + program = fluid.compiler.CompiledProgram(program).with_data_parallel( + loss_name=model_class.get_avg_cost().name) + fetch_vars = [] + fetch_alias = [] + fetch_period = int( + envs.get_global_env("runner." 
+ self._runner_name + + ".print_interval", 20)) + metrics = model_class.get_metrics() + if metrics: + fetch_vars = metrics.values() + fetch_alias = metrics.keys() metrics_varnames = [] metrics_format = [] - - metrics_format.append("{}: {{}}".format("epoch")) metrics_format.append("{}: {{}}".format("batch")) - - for name, var in self.model.get_metrics().items(): + for name, var in metrics.items(): metrics_varnames.append(var.name) metrics_format.append("{}: {{}}".format(name)) - metrics_format = ", ".join(metrics_format) - for epoch in range(epochs): - reader.start() - batch_id = 0 + reader = self._model[model_name][3]._data_loader + reader.start() + batch_id = 0 + scope = self._model[model_name][2] + with fluid.scope_guard(scope): try: while True: metrics_rets = self._exe.run(program=program, fetch_list=metrics_varnames) - - metrics = [epoch, batch_id] + metrics = [batch_id] metrics.extend(metrics_rets) - if batch_id % self.fetch_period == 0 and batch_id != 0: + if batch_id % fetch_period == 0 and batch_id != 0: print(metrics_format.format(*metrics)) batch_id += 1 except fluid.core.EOFException: reader.reset() - self.save(epoch, "train", is_fleet=False) - context['status'] = 'infer_pass' + def terminal(self, context): + context['is_exit'] = True - def dataset_train(self, context): - dataset = self._get_dataset("TRAIN") - ins = self._get_dataset_ins() + def load(self, is_fleet=False): + dirname = envs.get_global_env( + "runner." + self._runner_name + ".init_model_path", None) + if dirname is None or dirname == "": + return + print("going to load ", dirname) + if is_fleet: + fleet.load_persistables(self._exe, dirname) + else: + fluid.io.load_persistables(self._exe, dirname) - epochs = envs.get_global_env("train.epochs") - for i in range(epochs): - begin_time = time.time() - self._exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) - end_time = time.time() - times = end_time - begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format( - i, times, ins / times)) - - self.save(i, "train", is_fleet=False) - context['status'] = 'infer_pass' + def save(self, epoch_id, is_fleet=False): + def need_save(epoch_id, epoch_interval, is_last=False): + if is_last: + return True + if epoch_id == -1: + return False - def terminal(self, context): - for model in self.increment_models: - print("epoch :{}, dir: {}".format(model[0], model[1])) - context['is_exit'] = True + return epoch_id % epoch_interval == 0 + + def save_inference_model(): + name = "runner." + self._runner_name + "." 
+ save_interval = int( + envs.get_global_env(name + "save_inference_interval", -1)) + if not need_save(epoch_id, save_interval, False): + return + feed_varnames = envs.get_global_env( + name + "save_inference_feed_varnames", []) + fetch_varnames = envs.get_global_env( + name + "save_inference_fetch_varnames", []) + if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \ + len(feed_varnames) == 0 or len(fetch_varnames) == 0: + return + fetch_vars = [ + fluid.default_main_program().global_block().vars[varname] + for varname in fetch_varnames + ] + dirname = envs.get_global_env(name + "save_inference_path", None) + + assert dirname is not None + dirname = os.path.join(dirname, str(epoch_id)) + + if is_fleet: + fleet.save_inference_model(self._exe, dirname, feed_varnames, + fetch_vars) + else: + fluid.io.save_inference_model(dirname, feed_varnames, + fetch_vars, self._exe) + + def save_persistables(): + name = "runner." + self._runner_name + "." + save_interval = int( + envs.get_global_env(name + "save_checkpoint_interval", -1)) + if not need_save(epoch_id, save_interval, False): + return + dirname = envs.get_global_env(name + "save_checkpoint_path", None) + if dirname is None or dirname == "": + return + dirname = os.path.join(dirname, str(epoch_id)) + if is_fleet: + fleet.save_persistables(self._exe, dirname) + else: + fluid.io.save_persistables(self._exe, dirname) + + save_persistables() + save_inference_model() diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py index c121b4abb624503936faca8e77902a97e3f0cf82..b3c09797b6e023bc5968fabbd70676522d885f19 100755 --- a/core/trainers/transpiler_trainer.py +++ b/core/trainers/transpiler_trainer.py @@ -119,6 +119,10 @@ class TranspileTrainer(Trainer): pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, self._config_yaml) else: + if sparse_slots is None: + sparse_slots = "#" + if dense_slots is None: + dense_slots = "#" padding = envs.get_global_env("padding", 0, namespace) pipe_cmd = "python {} {} {} {} {} {} {} {}".format( reader, "slot", "slot", self._config_yaml, namespace, \ diff --git a/core/utils/dataloader_instance.py b/core/utils/dataloader_instance.py index 8d4db6f82c05a41a0945f2c882caefd2a3c83d36..a26e2df20876e48bec5a1f30492987605613ef8a 100755 --- a/core/utils/dataloader_instance.py +++ b/core/utils/dataloader_instance.py @@ -14,13 +14,93 @@ from __future__ import print_function import os - from paddlerec.core.utils.envs import lazy_instance_by_fliename from paddlerec.core.utils.envs import get_global_env from paddlerec.core.utils.envs import get_runtime_environ from paddlerec.core.reader import SlotReader +def dataloader_by_name(readerclass, dataset_name, yaml_file): + reader_class = lazy_instance_by_fliename(readerclass, "TrainReader") + name = "dataset." + dataset_name + "." 
+ data_path = get_global_env(name + "data_path") + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + reader = reader_class(yaml_file) + reader.init() + + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader + + +def slotdataloader_by_name(readerclass, dataset_name, yaml_file): + name = "dataset." + dataset_name + "." + reader_name = "SlotReader" + data_path = get_global_env(name + "data_path") + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + sparse = get_global_env(name + "sparse_slots", "#") + if sparse == "": + sparse = "#" + dense = get_global_env(name + "dense_slots", "#") + if dense == "": + dense = "#" + padding = get_global_env(name + "padding", 0) + reader = SlotReader(yaml_file) + reader.init(sparse, dense, int(padding)) + + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader + + def dataloader(readerclass, train, yaml_file): if train == "TRAIN": reader_name = "TrainReader" @@ -82,8 +162,12 @@ def slotdataloader(readerclass, train, yaml_file): files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] - sparse = get_global_env("sparse_slots", None, namespace) - dense = get_global_env("dense_slots", None, namespace) + sparse = get_global_env("sparse_slots", "#", namespace) + if sparse == "": + sparse = "#" + dense = get_global_env("dense_slots", "#", namespace) + if dense == "": + dense = "#" padding = get_global_env("padding", 0, namespace) reader = SlotReader(yaml_file) reader.init(sparse, dense, int(padding)) diff --git a/core/utils/dataset_instance.py b/core/utils/dataset_instance.py index 2e6082dc5e381b6ac2fc46f7fb6fbe73d4214b69..3f0a3a484dfccfbc26216cc9eb09fe8443401078 100755 --- a/core/utils/dataset_instance.py +++ b/core/utils/dataset_instance.py @@ -32,8 +32,8 @@ elif sys.argv[2].upper() == "EVALUATE": else: reader_name = "SlotReader" namespace = sys.argv[4] - sparse_slots = sys.argv[5].replace("#", " ") - dense_slots = sys.argv[6].replace("#", " ") + sparse_slots = sys.argv[5].replace("?", " ") + dense_slots = sys.argv[6].replace("?", " ") padding = int(sys.argv[7]) yaml_abs_path = sys.argv[3] diff --git a/core/utils/envs.py b/core/utils/envs.py index bc222e906448435031024281a0a80298073d3979..f432950dfa50571cd307d4a370484e35ff77b408 100755 --- 
a/core/utils/envs.py
+++ b/core/utils/envs.py
@@ -68,12 +68,20 @@ def set_global_envs(envs):
                 nests = copy.deepcopy(namespace_nests)
                 nests.append(k)
                 fatten_env_namespace(nests, v)
+            elif (k == "dataset" or k == "phase" or
+                  k == "runner") and isinstance(v, list):
+                for i in v:
+                    if i.get("name") is None:
+                        raise ValueError("name must be in dataset list ", v)
+                    nests = copy.deepcopy(namespace_nests)
+                    nests.append(k)
+                    nests.append(i["name"])
+                    fatten_env_namespace(nests, i)
             else:
                 global_k = ".".join(namespace_nests + [k])
                 global_envs[global_k] = v
 
-    for k, v in envs.items():
-        fatten_env_namespace([k], v)
+    fatten_env_namespace([], envs)
 
 
 def get_global_env(env_name, default_value=None, namespace=None):
@@ -106,7 +114,7 @@ def windows_path_converter(path):
 
 
 def update_workspace():
-    workspace = global_envs.get("train.workspace", None)
+    workspace = global_envs.get("workspace")
     if not workspace:
         return
     workspace = path_adapter(workspace)
diff --git a/doc/yaml.md b/doc/yaml.md
new file mode 100644
index 0000000000000000000000000000000000000000..18492cc7898e38d9c76c26a3586e0613acc8ebc2
--- /dev/null
+++ b/doc/yaml.md
@@ -0,0 +1,66 @@
+```
+# global settings
+debug: false
+workspace: "."
+
+
+# multiple datasets can be configured; different phases of the executor can use different datasets
+dataset:
+  - name: sample_1
+    type: DataLoader # or QueueDataset
+    batch_size: 5
+    data_path: "{workspace}/data/train"
+    # user-defined reader
+    data_converter: "{workspace}/rsc15_reader.py"
+
+  - name: sample_2
+    type: QueueDataset # or DataLoader
+    batch_size: 5
+    data_path: "{workspace}/data/train"
+    # sparse_slots and dense_slots can be configured instead of a data_converter
+    sparse_slots: "click ins_weight 6001 6002 6003 6005 6006 6007 6008 6009"
+    dense_slots: "readlist:9"
+
+
+# user-defined parameters, used to build the network
+hyper_parameters:
+  # optimizer
+  optimizer:
+    class: Adam
+    learning_rate: 0.001
+    strategy: "{workspace}/conf/config_fleet.py"
+  # user-defined settings
+  vocab_size: 1000
+  hid_size: 100
+  my_key1: 233
+  my_key2: 0.1
+
+
+mode: runner1
+
+runner:
+  - name: runner1 # example 1: train
+    class: single_train
+    epochs: 10
+    device: cpu
+    init_model_path: ""
+    save_checkpoint_interval: 2
+    save_inference_interval: 4
+    # paths for saving the model
+    save_checkpoint_path: "xxxx"
+    save_inference_path: "xxxx"
+
+  - name: runner2 # example 2: infer
+    class: single_infer
+    epochs: 1
+    device: cpu
+    init_model_path: "afs:/xxx/xxx"
+
+
+
+phase:
+- name: phase1
+  model: "{workspace}/model.py"
+  dataset_name: sample_1
+  thread_num: 1
+```
diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml
index ff59d9ef023da6dbaee01fda519abd9cfa75b1e3..fd64935dd2080291fc13911befc0481604c3464a 100755
--- a/models/rank/dnn/config.yaml
+++ b/models/rank/dnn/config.yaml
@@ -12,39 +12,72 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-train: - epochs: 10 - engine: single - workspace: "paddlerec.models.rank.dnn" +# workspace +workspace: "paddlerec.models.rank.dnn" - trainer: - # for cluster training - strategy: "async" +# list of dataset +dataset: +- name: dataset_train # name of dataset to distinguish different datasets + batch_size: 2 + type: DataLoader # or QueueDataset + data_path: "{workspace}/data/sample_data/train" + sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" + dense_slots: "dense_var:13" +- name: dataset_infer # name + batch_size: 2 + type: DataLoader # or QueueDataset + data_path: "{workspace}/data/sample_data/train" + sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" + dense_slots: "dense_var:13" - reader: - batch_size: 2 - train_data_path: "{workspace}/data/sample_data/train" - reader_debug_mode: False - sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" - dense_slots: "dense_var:13" +# hyper parameters of user-defined network +hyper_parameters: + # optimizer config + optimizer: + class: Adam + learning_rate: 0.001 + strategy: async + # user-defined pairs + sparse_inputs_slots: 27 + sparse_feature_number: 1000001 + sparse_feature_dim: 9 + dense_input_dim: 13 + fc_sizes: [512, 256, 128, 32] - model: - models: "{workspace}/model.py" - hyper_parameters: - sparse_inputs_slots: 27 - sparse_feature_number: 1000001 - sparse_feature_dim: 9 - dense_input_dim: 13 - fc_sizes: [512, 256, 128, 32] - learning_rate: 0.001 - optimizer: adam +# select runner by name +mode: runner1 +# config of each runner. +# runner is a kind of paddle training class, which wraps the train/infer process. +runner: +- name: runner1 + class: single_train + # num of epochs + epochs: 10 + # device to run training or infer + device: cpu + save_checkpoint_interval: 2 # save model interval of epochs + save_inference_interval: 4 # save inference + save_checkpoint_path: "increment" # save checkpoint path + save_inference_path: "inference" # save inference path + save_inference_feed_varnames: [] # feed vars of save inference + save_inference_fetch_varnames: [] # fetch vars of save inference + init_model_path: "" # load model path + print_interval: 10 +- name: runner2 + class: single_infer + # num of epochs + epochs: 10 + # device to run training or infer + device: cpu + init_model_path: "increment/0" # load model path - save: - increment: - dirname: "increment" - epoch_interval: 2 - save_last: True - inference: - dirname: "inference" - epoch_interval: 4 - save_last: True +# runner will run all the phase in each epoch +phase: +- name: phase1 + model: "{workspace}/model.py" # user-defined model + dataset_name: dataset_train # select dataset by name + thread_num: 1 +#- name: phase2 +# model: "{workspace}/model.py" # user-defined model +# dataset_name: dataset_infer # select dataset by name +# thread_num: 1 diff --git a/models/rank/dnn/model.py b/models/rank/dnn/model.py index 644bbd7c98ec8360faa22969e245f284946947d8..f4425e3d9853b7f7decbc45ff607c4173901d0cf 100755 --- a/models/rank/dnn/model.py +++ b/models/rank/dnn/model.py @@ -28,15 +28,15 @@ class Model(ModelBase): self.is_distributed = True if envs.get_trainer( ) == "CtrTrainer" else False self.sparse_feature_number = envs.get_global_env( - "hyper_parameters.sparse_feature_number", None, self._namespace) + "hyper_parameters.sparse_feature_number") self.sparse_feature_dim = envs.get_global_env( - "hyper_parameters.sparse_feature_dim", None, self._namespace) + 
"hyper_parameters.sparse_feature_dim") self.learning_rate = envs.get_global_env( - "hyper_parameters.learning_rate", None, self._namespace) + "hyper_parameters.learning_rate") def net(self, input, is_infer=False): self.sparse_inputs = self._sparse_data_var[1:] - self.dense_input = self._dense_data_var[0] + self.dense_input = [] #self._dense_data_var[0] self.label_input = self._sparse_data_var[0] def embedding_layer(input): @@ -52,12 +52,11 @@ class Model(ModelBase): return emb_sum sparse_embed_seq = list(map(embedding_layer, self.sparse_inputs)) - concated = fluid.layers.concat( - sparse_embed_seq + [self.dense_input], axis=1) + concated = fluid.layers.concat(sparse_embed_seq, axis=1) + #sparse_embed_seq + [self.dense_input], axis=1) fcs = [concated] - hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, - self._namespace) + hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes") for size in hidden_layers: output = fluid.layers.fc( @@ -78,16 +77,21 @@ class Model(ModelBase): self.predict = predict - cost = fluid.layers.cross_entropy( - input=self.predict, label=self.label_input) - avg_cost = fluid.layers.reduce_mean(cost) - self._cost = avg_cost auc, batch_auc, _ = fluid.layers.auc(input=self.predict, label=self.label_input, num_thresholds=2**12, slide_steps=20) + if is_infer: + self._infer_results["AUC"] = auc + self._infer_results["BATCH_AUC"] = batch_auc + return + self._metrics["AUC"] = auc self._metrics["BATCH_AUC"] = batch_auc + cost = fluid.layers.cross_entropy( + input=self.predict, label=self.label_input) + avg_cost = fluid.layers.reduce_mean(cost) + self._cost = avg_cost def optimizer(self): optimizer = fluid.optimizer.Adam(self.learning_rate, lazy_mode=True) diff --git a/run.py b/run.py index c80c647d0c8bab5cd9918429f9bf460b6093335d..594801fcdd5edb1821799ef53994674aec6a934d 100755 --- a/run.py +++ b/run.py @@ -18,7 +18,7 @@ import subprocess import argparse import tempfile import yaml - +import copy from paddlerec.core.factory import TrainerFactory from paddlerec.core.utils import envs from paddlerec.core.utils import util @@ -27,8 +27,8 @@ engines = {} device = ["CPU", "GPU"] clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"] engine_choices = [ - "SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER", - "TDM_CLUSTER" + "SINGLE_TRAIN", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", + "TDM_LOCAL_CLUSTER", "TDM_CLUSTER", "SINGLE_INFER" ] custom_model = ['TDM'] model_name = "" @@ -38,35 +38,73 @@ def engine_registry(): engines["TRANSPILER"] = {} engines["PSLIB"] = {} - engines["TRANSPILER"]["SINGLE"] = single_engine + engines["TRANSPILER"]["SINGLE_TRAIN"] = single_train_engine + engines["TRANSPILER"]["SINGLE_INFER"] = single_infer_engine engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine engines["TRANSPILER"]["CLUSTER"] = cluster_engine - engines["PSLIB"]["SINGLE"] = local_mpi_engine engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine -def get_inters_from_yaml(file, filter): +def get_inters_from_yaml(file, filters): with open(file, 'r') as rb: _envs = yaml.load(rb.read(), Loader=yaml.FullLoader) flattens = envs.flatten_environs(_envs) - inters = {} for k, v in flattens.items(): - if k.startswith(filter): - inters[k] = v + for f in filters: + if k.startswith(f): + inters[k] = v return inters +def get_all_inters_from_yaml(file, filters): + with open(file, 'r') as rb: + _envs = yaml.load(rb.read(), Loader=yaml.FullLoader) + all_flattens = {} + + def fatten_env_namespace(namespace_nests, 
local_envs): + for k, v in local_envs.items(): + if isinstance(v, dict): + nests = copy.deepcopy(namespace_nests) + nests.append(k) + fatten_env_namespace(nests, v) + elif (k == "dataset" or k == "phase" or + k == "runner") and isinstance(v, list): + for i in v: + if i.get("name") is None: + raise ValueError("name must be in dataset list ", v) + nests = copy.deepcopy(namespace_nests) + nests.append(k) + nests.append(i["name"]) + fatten_env_namespace(nests, i) + else: + global_k = ".".join(namespace_nests + [k]) + all_flattens[global_k] = v + + fatten_env_namespace([], _envs) + ret = {} + for k, v in all_flattens.items(): + for f in filters: + if k.startswith(f): + ret[k] = v + return ret + + def get_engine(args): transpiler = get_transpiler() - run_extras = get_inters_from_yaml(args.model, "train.") - - engine = run_extras.get("train.engine", "single") + with open(args.model, 'r') as rb: + envs = yaml.load(rb.read(), Loader=yaml.FullLoader) + run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) + + engine = run_extras.get("train.engine", None) + if engine is None: + engine = run_extras.get("runner." + envs["mode"] + ".class", None) + if engine is None: + engine = "single_train" engine = engine.upper() - if engine not in engine_choices: raise ValueError("train.engin can not be chosen in {}".format( engine_choices)) @@ -117,15 +155,27 @@ def get_trainer_prefix(args): return "" -def single_engine(args): +def single_train_engine(args): trainer = get_trainer_prefix(args) + "SingleTrainer" single_envs = {} single_envs["train.trainer.trainer"] = trainer single_envs["train.trainer.threads"] = "2" - single_envs["train.trainer.engine"] = "single" + single_envs["train.trainer.engine"] = "single_train" single_envs["train.trainer.platform"] = envs.get_platform() print("use {} engine to run model: {}".format(trainer, args.model)) + set_runtime_envs(single_envs, args.model) + trainer = TrainerFactory.create(args.model) + return trainer + +def single_infer_engine(args): + trainer = get_trainer_prefix(args) + "SingleInfer" + single_envs = {} + single_envs["train.trainer.trainer"] = trainer + single_envs["train.trainer.threads"] = "2" + single_envs["train.trainer.engine"] = "single_infer" + single_envs["train.trainer.platform"] = envs.get_platform() + print("use {} engine to run model: {}".format(trainer, args.model)) set_runtime_envs(single_envs, args.model) trainer = TrainerFactory.create(args.model) return trainer
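
Beyond the patch itself, the central mechanism it introduces is that the new-style YAML (workspace / dataset / hyper_parameters / mode / runner / phase) is flattened into dotted keys, with the list sections (dataset, phase, runner) keyed by each entry's name; the rewritten single_trainer and single_infer then read settings such as `runner.<mode>.epochs` or `dataset.<name>.batch_size` through `envs.get_global_env`. The sketch below is illustrative only, assuming that flattening rule: `flatten_config` and `sample_config` are made-up names for this example, not PaddleRec APIs; the real logic lives in `fatten_env_namespace` in core/utils/envs.py and run.py above.

```python
# Illustrative sketch of the flattening rule applied by this patch.
def flatten_config(config):
    flat = {}

    def walk(prefix, node):
        for k, v in node.items():
            if isinstance(v, dict):
                walk(prefix + [k], v)
            elif k in ("dataset", "phase", "runner") and isinstance(v, list):
                # list sections are flattened per entry, keyed by the "name" field
                for item in v:
                    if item.get("name") is None:
                        raise ValueError("every %s entry needs a name" % k)
                    walk(prefix + [k, item["name"]], item)
            else:
                flat[".".join(prefix + [k])] = v

    walk([], config)
    return flat


sample_config = {
    "workspace": "paddlerec.models.rank.dnn",
    "mode": "runner1",
    "dataset": [{"name": "dataset_train", "type": "DataLoader", "batch_size": 2}],
    "runner": [{"name": "runner1", "class": "single_train", "epochs": 10, "device": "cpu"}],
    "phase": [{"name": "phase1", "dataset_name": "dataset_train", "thread_num": 1}],
}

flat = flatten_config(sample_config)
print(flat["runner.runner1.epochs"])             # 10
print(flat["dataset.dataset_train.batch_size"])  # 2
print(flat["phase.phase1.dataset_name"])         # dataset_train
```

With keys flattened this way, the trainers no longer need the old `train.*` namespace: `mode` selects which runner block to read, and each phase resolves its dataset by name.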