From 019cb085755912efe01e8abe86a8391abfcfeebd Mon Sep 17 00:00:00 2001 From: xjqbest <173596896@qq.com> Date: Thu, 28 May 2020 16:23:55 +0800 Subject: [PATCH] fix --- core/factory.py | 1 + core/trainers/single_trainer.py | 90 ++++++++++++--------------------- core/utils/envs.py | 3 +- models/rank/dnn/config.yaml | 48 ++++++++++-------- run.py | 67 ++++++++++++++++++++---- 5 files changed, 121 insertions(+), 88 deletions(-) diff --git a/core/factory.py b/core/factory.py index 470b3a02..2e2d013b 100755 --- a/core/factory.py +++ b/core/factory.py @@ -26,6 +26,7 @@ trainers = {} def trainer_registry(): trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") + trainers["SingleInfer"] = os.path.join(trainer_abs, "single_infer.py") trainers["ClusterTrainer"] = os.path.join(trainer_abs, "cluster_trainer.py") trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py index a6c21265..73d82b1a 100755 --- a/core/trainers/single_trainer.py +++ b/core/trainers/single_trainer.py @@ -47,6 +47,7 @@ class SingleTrainer(TranspileTrainer): self._dataset = {} envs.set_global_envs(self._config) envs.update_workspace() + self._runner_name = envs.get_global_env("mode") def processor_register(self): self.regist_context_processor('uninit', self.instance) @@ -90,13 +91,10 @@ class SingleTrainer(TranspileTrainer): for x in os.listdir(train_data_path) ] dataset.set_filelist(file_list) - for model_dict in self._env["executor"]: + for model_dict in self._env["phase"]: if model_dict["dataset_name"] == dataset_name: model = self._model[model_dict["name"]][3] - if model_dict["is_infer"]: - inputs = model._infer_data_var - else: - inputs = model._data_var + inputs = model._data_var dataset.set_use_var(inputs) break return dataset @@ -144,7 +142,7 @@ class SingleTrainer(TranspileTrainer): return self._get_dataset(dataset_name) def init(self, context): - for model_dict in self._env["executor"]: + for model_dict in self._env["phase"]: self._model[model_dict["name"]] = [None] * 5 train_program = fluid.Program() startup_program = fluid.Program() @@ -163,26 +161,17 @@ class SingleTrainer(TranspileTrainer): envs.path_adapter(self._env["workspace"])) model = envs.lazy_instance_by_fliename( model_path, "Model")(self._env) - is_infer = model_dict.get("is_infer", False) - if is_infer: - model._infer_data_var = model.input_data( - dataset_name=model_dict["dataset_name"]) - else: - model._data_var = model.input_data( - dataset_name=model_dict["dataset_name"]) + model._data_var = model.input_data( + dataset_name=model_dict["dataset_name"]) if envs.get_global_env("dataset." 
+ dataset_name + ".type") == "DataLoader": - model._init_dataloader(is_infer=is_infer) + model._init_dataloader(is_infer=False) self._get_dataloader(dataset_name, model._data_loader) - if is_infer: - model.net(model._infer_data_var, True) - else: - model.net(model._data_var, False) - optimizer = model._build_optimizer( - opt_name, opt_lr, opt_strategy) - optimizer.minimize(model._cost) - model_dict["is_infer"] = is_infer + model.net(model._data_var, False) + optimizer = model._build_optimizer(opt_name, opt_lr, + opt_strategy) + optimizer.minimize(model._cost) self._model[model_dict["name"]][0] = train_program self._model[model_dict["name"]][1] = startup_program self._model[model_dict["name"]][2] = scope @@ -197,7 +186,7 @@ class SingleTrainer(TranspileTrainer): context['status'] = 'startup_pass' def startup(self, context): - for model_dict in self._env["executor"]: + for model_dict in self._env["phase"]: with fluid.scope_guard(self._model[model_dict["name"]][2]): self._exe.run(self._model[model_dict["name"]][1]) context['status'] = 'train_pass' @@ -205,7 +194,7 @@ class SingleTrainer(TranspileTrainer): def executor_train(self, context): epochs = int(self._env["epochs"]) for j in range(epochs): - for model_dict in self._env["executor"]: + for model_dict in self._env["phase"]: if j == 0: with fluid.scope_guard(self._model[model_dict["name"]][2]): train_prog = self._model[model_dict["name"]][0] @@ -236,10 +225,7 @@ class SingleTrainer(TranspileTrainer): fetch_vars = [] fetch_alias = [] fetch_period = 20 - if model_dict["is_infer"]: - metrics = model_class.get_infer_results() - else: - metrics = model_class.get_metrics() + metrics = model_class.get_metrics() if metrics: fetch_vars = metrics.values() fetch_alias = metrics.keys() @@ -247,37 +233,24 @@ class SingleTrainer(TranspileTrainer): program = self._model[model_name][0] reader = self._dataset[reader_name] with fluid.scope_guard(scope): - if model_dict["is_infer"]: - self._exe.infer_from_dataset( - program=program, - dataset=reader, - fetch_list=fetch_vars, - fetch_info=fetch_alias, - print_period=fetch_period) - else: - self._exe.train_from_dataset( - program=program, - dataset=reader, - fetch_list=fetch_vars, - fetch_info=fetch_alias, - print_period=fetch_period) + self._exe.train_from_dataset( + program=program, + dataset=reader, + fetch_list=fetch_vars, + fetch_info=fetch_alias, + print_period=fetch_period) def _executor_dataloader_train(self, model_dict): reader_name = model_dict["dataset_name"] model_name = model_dict["name"] model_class = self._model[model_name][3] program = self._model[model_name][0].clone() - if not model_dict["is_infer"]: - program = fluid.compiler.CompiledProgram( - program).with_data_parallel( - loss_name=model_class.get_avg_cost().name) + program = fluid.compiler.CompiledProgram(program).with_data_parallel( + loss_name=model_class.get_avg_cost().name) fetch_vars = [] fetch_alias = [] fetch_period = 20 - if model_dict["is_infer"]: - metrics = model_class.get_infer_results() - else: - metrics = model_class.get_metrics() + metrics = model_class.get_metrics() if metrics: fetch_vars = metrics.values() fetch_alias = metrics.keys() @@ -312,7 +285,8 @@ class SingleTrainer(TranspileTrainer): context['is_exit'] = True def load(self, is_fleet=False): - dirname = envs.get_global_env("epoch.init_model_path", None) + dirname = envs.get_global_env( + "runner." 
+ self._runner_name + ".init_model_path", None)
         if dirname is None:
             return
         print("going to load ", dirname)
@@ -331,21 +305,22 @@ class SingleTrainer(TranspileTrainer):
             return epoch_id % epoch_interval == 0
 
         def save_inference_model():
+            name = "runner." + self._runner_name + "."
             save_interval = int(
-                envs.get_global_env("epoch.save_inference_interval", -1))
+                envs.get_global_env(name + "save_inference_interval", -1))
             if not need_save(epoch_id, save_interval, False):
                 return
             feed_varnames = envs.get_global_env(
-                "epoch.save_inference_feed_varnames", None)
+                name + "save_inference_feed_varnames", None)
             fetch_varnames = envs.get_global_env(
-                "epoch.save_inference_fetch_varnames", None)
+                name + "save_inference_fetch_varnames", None)
             if feed_varnames is None or fetch_varnames is None or feed_varnames == "":
                 return
             fetch_vars = [
                 fluid.default_main_program().global_block().vars[varname]
                 for varname in fetch_varnames
             ]
-            dirname = envs.get_global_env("epoch.save_inference_path", None)
+            dirname = envs.get_global_env(name + "save_inference_path", None)
             assert dirname is not None
             dirname = os.path.join(dirname, str(epoch_id))
@@ -358,11 +333,12 @@ class SingleTrainer(TranspileTrainer):
                 fetch_vars, self._exe)
 
         def save_persistables():
+            name = "runner." + self._runner_name + "."
             save_interval = int(
-                envs.get_global_env("epoch.save_checkpoint_interval", -1))
+                envs.get_global_env(name + "save_checkpoint_interval", -1))
             if not need_save(epoch_id, save_interval, False):
                 return
-            dirname = envs.get_global_env("epoch.save_checkpoint_path", None)
+            dirname = envs.get_global_env(name + "save_checkpoint_path", None)
             if dirname is None or dirname == "":
                 return
             dirname = os.path.join(dirname, str(epoch_id))
diff --git a/core/utils/envs.py b/core/utils/envs.py
index 1771adc9..f432950d 100755
--- a/core/utils/envs.py
+++ b/core/utils/envs.py
@@ -68,7 +68,8 @@ def set_global_envs(envs):
             nests = copy.deepcopy(namespace_nests)
             nests.append(k)
             fatten_env_namespace(nests, v)
-        elif (k == "dataset" or k == "executor") and isinstance(v, list):
+        elif (k == "dataset" or k == "phase" or
+              k == "runner") and isinstance(v, list):
             for i in v:
                 if i.get("name") is None:
                     raise ValueError("name must be in dataset list ", v)
diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml
index 29080797..ae06cf3f 100755
--- a/models/rank/dnn/config.yaml
+++ b/models/rank/dnn/config.yaml
@@ -21,12 +21,18 @@ workspace: "paddlerec.models.rank.dnn"
 
 # dataset list
 dataset:
-- name: dataset_2 # name, used to distinguish different datasets
+- name: dataset_train # name, used to distinguish different datasets
   batch_size: 2
   type: DataLoader # or QueueDataset
   data_path: "{workspace}/data/sample_data/train" # data path
   sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
   dense_slots: "dense_var:13"
+- name: dataset_infer # name, used to distinguish different datasets
+  batch_size: 2
+  type: DataLoader # or QueueDataset
+  data_path: "{workspace}/data/sample_data/test" # data path
+  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
+  dense_slots: "dense_var:13"
 
 # hyper parameters
 hyper_parameters:
@@ -42,27 +48,29 @@ hyper_parameters:
   dense_input_dim: 13
   fc_sizes: [512, 256, 128, 32]
 
-# executor configuration
-epoch:
-  name:
-  trainer_class: single
-  save_checkpoint_interval: 2 # checkpoint save interval
-  save_inference_interval: 4 # inference model save interval
-  save_checkpoint_path: "increment" # checkpoint save path
-  save_inference_path: "inference" # inference model save path
-  #save_inference_feed_varnames: [] # inference model feed vars
-  #save_inference_fetch_varnames: [] # inference model fetch vars
-  #init_model_path: "xxxx" # model to load
+mode: runner1
+# runner configuration
+runner:
+  - name: runner1
+    class: single_train
+    save_checkpoint_interval: 2 # checkpoint save interval
+    save_inference_interval: 4 # inference model save interval
+    save_checkpoint_path: "increment" # checkpoint save path
+    save_inference_path: "inference" # inference model save path
+    #save_inference_feed_varnames: [] # inference model feed vars
+    #save_inference_fetch_varnames: [] # inference model fetch vars
+    #init_model_path: "xxxx" # model to load
+  - name: runner2
+    class: single_infer
+    init_model_path: "increment/0" # model to load
 
-# executor: all the models to run in each epoch
-executor:
-  - name: train
+# executor: all the phases to run in each epoch
+phase:
+  - name: phase1
     model: "{workspace}/model.py" # model path
-    dataset_name: dataset_2 # name, used to distinguish different phases
+    dataset_name: dataset_train # name, used to distinguish different phases
     thread_num: 1 # number of threads
-    is_infer: False # whether this phase is infer
-#  - name: infer
+#  - name: phase2
 #    model: "{workspace}/model.py" # model path
-#    dataset_name: dataset_2 # name, used to distinguish different phases
+#    dataset_name: dataset_infer # name, used to distinguish different phases
 #    thread_num: 1 # number of threads
-#    is_infer: True # whether this phase is infer
diff --git a/run.py b/run.py
index 830ec905..594801fc 100755
--- a/run.py
+++ b/run.py
@@ -18,7 +18,7 @@ import subprocess
 import argparse
 import tempfile
 import yaml
-
+import copy
 from paddlerec.core.factory import TrainerFactory
 from paddlerec.core.utils import envs
 from paddlerec.core.utils import util
@@ -27,8 +27,8 @@ engines = {}
 device = ["CPU", "GPU"]
 clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
 engine_choices = [
-    "SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER",
-    "TDM_CLUSTER"
+    "SINGLE_TRAIN", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE",
+    "TDM_LOCAL_CLUSTER", "TDM_CLUSTER", "SINGLE_INFER"
 ]
 custom_model = ['TDM']
 model_name = ""
@@ -38,7 +38,8 @@ def engine_registry():
     engines["TRANSPILER"] = {}
     engines["PSLIB"] = {}
 
-    engines["TRANSPILER"]["SINGLE"] = single_engine
+    engines["TRANSPILER"]["SINGLE_TRAIN"] = single_train_engine
+    engines["TRANSPILER"]["SINGLE_INFER"] = single_infer_engine
     engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
     engines["TRANSPILER"]["CLUSTER"] = cluster_engine
     engines["PSLIB"]["SINGLE"] = local_mpi_engine
@@ -51,7 +52,6 @@ def get_inters_from_yaml(file, filters):
         _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
 
     flattens = envs.flatten_environs(_envs)
-
     inters = {}
     for k, v in flattens.items():
         for f in filters:
@@ -60,15 +60,50 @@ def get_inters_from_yaml(file, filters):
     return inters
 
 
+def get_all_inters_from_yaml(file, filters):
+    with open(file, 'r') as rb:
+        _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
+    all_flattens = {}
+
+    def fatten_env_namespace(namespace_nests, local_envs):
+        for k, v in local_envs.items():
+            if isinstance(v, dict):
+                nests = copy.deepcopy(namespace_nests)
+                nests.append(k)
+                fatten_env_namespace(nests, v)
+            elif (k == "dataset" or k == "phase" or
+                  k == "runner") and isinstance(v, list):
+                for i in v:
+                    if i.get("name") is None:
+                        raise ValueError("name must be in dataset list ", v)
+                    nests = copy.deepcopy(namespace_nests)
+                    nests.append(k)
+                    nests.append(i["name"])
+                    fatten_env_namespace(nests, i)
+            else:
+                global_k = ".".join(namespace_nests + [k])
+                all_flattens[global_k] = v
+
+    fatten_env_namespace([], _envs)
+    ret = {}
+    for k, v in all_flattens.items():
+        for f in filters:
+            if k.startswith(f):
+                ret[k] = v
+    return ret
+
+
 def get_engine(args):
     transpiler = get_transpiler()
-    run_extras = get_inters_from_yaml(args.model, ["train.", "epoch."])
+    with open(args.model, 'r') as rb:
+        _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
+    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
     engine = run_extras.get("train.engine", None)
     if engine is None:
-        engine = run_extras.get("epoch.trainer_class", None)
+        engine = run_extras.get("runner." + _envs["mode"] + ".class", None)
     if engine is None:
-        engine = "single"
+        engine = "single_train"
     engine = engine.upper()
     if engine not in engine_choices:
         raise ValueError("train.engin can not be chosen in {}".format(
@@ -120,15 +155,27 @@ def get_trainer_prefix(args):
     return ""
 
 
-def single_engine(args):
+def single_train_engine(args):
     trainer = get_trainer_prefix(args) + "SingleTrainer"
     single_envs = {}
     single_envs["train.trainer.trainer"] = trainer
     single_envs["train.trainer.threads"] = "2"
-    single_envs["train.trainer.engine"] = "single"
+    single_envs["train.trainer.engine"] = "single_train"
     single_envs["train.trainer.platform"] = envs.get_platform()
     print("use {} engine to run model: {}".format(trainer, args.model))
+    set_runtime_envs(single_envs, args.model)
+    trainer = TrainerFactory.create(args.model)
+    return trainer
+
+def single_infer_engine(args):
+    trainer = get_trainer_prefix(args) + "SingleInfer"
+    single_envs = {}
+    single_envs["train.trainer.trainer"] = trainer
+    single_envs["train.trainer.threads"] = "2"
+    single_envs["train.trainer.engine"] = "single_infer"
+    single_envs["train.trainer.platform"] = envs.get_platform()
+    print("use {} engine to run model: {}".format(trainer, args.model))
     set_runtime_envs(single_envs, args.model)
     trainer = TrainerFactory.create(args.model)
     return trainer
-- 
GitLab
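
For readers tracing the new configuration flow: both fatten_env_namespace in
core/utils/envs.py and get_all_inters_from_yaml in run.py flatten a named entry
of a "dataset", "phase", or "runner" list into dotted keys of the form
<section>.<name>.<field>, and get_engine then looks up "runner." + mode +
".class" to choose the engine. Below is a minimal standalone sketch of that
convention; the flatten helper and the sample config are illustrative only,
not part of the patch:

    def flatten(namespace, local_envs, out):
        # Dicts recurse by key; the named list sections ("dataset", "phase",
        # "runner") recurse under "<section>.<name>"; everything else becomes
        # a leaf value stored under its dotted path.
        for k, v in local_envs.items():
            if isinstance(v, dict):
                flatten(namespace + [k], v, out)
            elif k in ("dataset", "phase", "runner") and isinstance(v, list):
                for item in v:
                    flatten(namespace + [k, item["name"]], item, out)
            else:
                out[".".join(namespace + [k])] = v

    config = {
        "mode": "runner1",
        "runner": [
            {"name": "runner1", "class": "single_train"},
            {"name": "runner2", "class": "single_infer"},
        ],
    }
    flat = {}
    flatten([], config, flat)
    # get_engine's lookup reduces to this key:
    assert flat["runner." + config["mode"] + ".class"] == "single_train"

Under this scheme, switching models/rank/dnn/config.yaml from training to
inference is a one-line edit: set mode: runner2, and get_engine resolves
runner.runner2.class == "single_infer" to the SINGLE_INFER engine.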