diff --git a/core/model.py b/core/model.py
index 30bbb7fc327f1df236aeee181ebbb0a7fca0b930..83181875fa3eb64d9c4085aa68356ade6f2ef720 100755
--- a/core/model.py
+++ b/core/model.py
@@ -35,7 +35,6 @@ class ModelBase(object):
         self._data_loader = None
         self._infer_data_loader = None
         self._fetch_interval = 20
-        self._namespace = "train.model"
         self._platform = envs.get_platform()
         self._init_hyper_parameters()
         self._env = config
@@ -50,11 +49,11 @@ class ModelBase(object):
         self._slot_inited = True
         dataset = {}
         model_dict = {}
-        for i in self._env["executor"]:
+        for i in envs.get_global_env("phase"):
             if i["name"] == kargs["name"]:
                 model_dict = i
                 break
-        for i in self._env["dataset"]:
+        for i in envs.get_global_env("dataset"):
             if i["name"] == model_dict["dataset_name"]:
                 dataset = i
                 break
@@ -139,8 +138,7 @@ class ModelBase(object):
             os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
 
         if name == "SGD":
-            reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
-                                      self._namespace)
+            reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
             optimizer_i = fluid.optimizer.SGD(
                 lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
         elif name == "ADAM":
diff --git a/core/trainer.py b/core/trainer.py
index 116368b864d7048b0ac506faa7921e791f3b4051..53a9cc1eb89e6eda2d08bc8f684bc4e8e4441e5c 100755
--- a/core/trainer.py
+++ b/core/trainer.py
@@ -64,26 +64,21 @@ class Trainer(object):
         self.increment_models = []
         self._exector_context = {}
         self._context = {'status': 'uninit', 'is_exit': False}
-        self._config_yaml = config
-        self._context["config_yaml"] = self._config_yaml
+        self._context["config_yaml"] = config
 
-        self._config = envs.load_yaml(config)
-
-        self._context["env"] = self._config
         self._model = {}
         self._dataset = {}
 
-        envs.set_global_envs(self._config)
-        envs.update_workspace()
+        self._runner_name = envs.get_runtime_environ("mode")
         self._context["runner_name"] = self._runner_name
 
-        phase_names = self._config.get(
+        phase_names = envs.get_global_env(
             "runner." + self._runner_name + ".phases", None)
         phases = []
         if phase_names is None:
-            phases = self._config.get("phase")
+            phases = envs.get_global_env("phase")
         else:
-            for phase in self._config.get("phase"):
+            for phase in envs.get_global_env("phase"):
                 if phase["name"] in phase_names:
                     phases.append(phase)
 
@@ -100,19 +95,21 @@ class Trainer(object):
         """
         device = envs.get_global_env(
             "runner." + self._runner_name + ".device", default_value="CPU")
-        if device.upper() == 'GPU':
+        device = device.upper()
+
+        if device == 'GPU':
             self.check_gpu()
             self.device = Device.GPU
             gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
             self._place = fluid.CUDAPlace(gpu_id)
             self._exe = fluid.Executor(self._place)
-        elif device.upper() == "CPU":
+        elif device == "CPU":
             self.device = Device.CPU
             self._place = fluid.CPUPlace()
             self._exe = fluid.Executor(self._place)
         else:
             raise ValueError("Not Support device {}".format(device))
-        self._context["device"] = device.upper()
+        self._context["device"] = device
         self._context["exe"] = self._exe
         self._context["place"] = self._place
 
@@ -130,7 +127,6 @@ class Trainer(object):
         try:
             if not fluid.is_compiled_with_cuda():
                 raise RuntimeError(err)
-                sys.exit(1)
         except Exception as e:
             pass
 
diff --git a/core/trainers/framework/network.py b/core/trainers/framework/network.py
index b3428b0bf82c8a377d351d69c048afc0a27c5d8d..b012fb7d31e1e8f944df4d575a57e658d560d161 100644
--- a/core/trainers/framework/network.py
+++ b/core/trainers/framework/network.py
@@ -58,11 +58,9 @@ class SingleNetwork(NetworkBase):
         with fluid.program_guard(train_program, startup_program):
             with fluid.unique_name.guard():
                 with fluid.scope_guard(scope):
-                    model_path = model_dict["model"].replace(
-                        "{workspace}",
-                        envs.path_adapter(context["env"]["workspace"]))
-                    model = envs.lazy_instance_by_fliename(
-                        model_path, "Model")(context["env"])
+                    model_path = model_dict["model"]
+                    model = envs.lazy_instance_by_fliename(model_path,
+                                                           "Model")(None)
 
                     if context["is_infer"]:
                         model._infer_data_var = model.input_data(
@@ -97,7 +95,7 @@ class SingleNetwork(NetworkBase):
                         "default_main_program"] = train_program.clone()
 
         context["dataset"] = {}
-        for dataset in context["env"]["dataset"]:
+        for dataset in envs.get_global_env("dataset"):
             if dataset["type"] != "DataLoader":
                 dataset_class = QueueDataset(context)
                 context["dataset"][dataset[
@@ -114,19 +112,17 @@ class PSNetwork(NetworkBase):
 
     def build_network(self, context):
         context["model"] = {}
-        if len(context["env"]["phase"]) > 1:
+        if len(envs.get_global_env("phase")) > 1:
             warnings.warn(
                 "Cluster Train Only Support One Phase.",
                 category=UserWarning,
                 stacklevel=2)
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         context["model"][model_dict["name"]] = {}
         dataset_name = model_dict["dataset_name"]
 
-        model_path = model_dict["model"].replace(
-            "{workspace}", envs.path_adapter(context["env"]["workspace"]))
-        model = envs.lazy_instance_by_fliename(model_path,
-                                               "Model")(context["env"])
+        model_path = model_dict["model"]
+        model = envs.lazy_instance_by_fliename(model_path, "Model")(None)
         model._data_var = model.input_data(
             dataset_name=model_dict["dataset_name"])
         if envs.get_global_env("dataset." + dataset_name +
@@ -155,7 +151,7 @@ class PSNetwork(NetworkBase):
         else:
             context["fleet"].init_worker()
             context["dataset"] = {}
-            for dataset in context["env"]["dataset"]:
+            for dataset in envs.get_global_env("dataset"):
                 if dataset["type"] != "DataLoader":
                     dataset_class = QueueDataset(context)
                     context["dataset"][dataset[
@@ -201,12 +197,12 @@ class PslibNetwork(NetworkBase):
 
     def build_network(self, context):
         context["model"] = {}
-        if len(context["env"]["phase"]) > 1:
+        if len(envs.get_global_env("phase")) > 1:
             warnings.warn(
                 "Cluster Train Only Support One Phase.",
                 category=UserWarning,
                 stacklevel=2)
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         train_program = fluid.Program()
         startup_program = fluid.Program()
         scope = fluid.Scope()
@@ -216,12 +212,9 @@ class PslibNetwork(NetworkBase):
             with fluid.unique_name.guard():
                 with fluid.scope_guard(scope):
                     context["model"][model_dict["name"]] = {}
-
-                    model_path = model_dict["model"].replace(
-                        "{workspace}",
-                        envs.path_adapter(context["env"]["workspace"]))
-                    model = envs.lazy_instance_by_fliename(
-                        model_path, "Model")(context["env"])
+                    model_path = model_dict["model"]
+                    model = envs.lazy_instance_by_fliename(model_path,
+                                                           "Model")(None)
                     model._data_var = model.input_data(
                         dataset_name=model_dict["dataset_name"])
                     if envs.get_global_env("dataset." + dataset_name +
@@ -250,7 +243,7 @@ class PslibNetwork(NetworkBase):
             self._server(context)
         else:
             context["dataset"] = {}
-            for dataset in context["env"]["dataset"]:
+            for dataset in envs.get_global_env("dataset"):
                 if dataset["type"] != "DataLoader":
                     dataset_class = QueueDataset(context)
                     context["dataset"][dataset[
@@ -270,12 +263,12 @@ class CollectiveNetwork(NetworkBase):
 
     def build_network(self, context):
         context["model"] = {}
-        if len(context["env"]["phase"]) > 1:
+        if len(envs.get_global_env("phase")) > 1:
             warnings.warn(
                 "Cluster Train Only Support One Phase.",
                 category=UserWarning,
                 stacklevel=2)
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         context["model"][model_dict["name"]] = {}
         dataset_name = model_dict["dataset_name"]
 
@@ -284,11 +277,9 @@ class CollectiveNetwork(NetworkBase):
         scope = fluid.Scope()
         with fluid.program_guard(train_program, startup_program):
             with fluid.scope_guard(scope):
-                model_path = model_dict["model"].replace(
-                    "{workspace}",
-                    envs.path_adapter(context["env"]["workspace"]))
+                model_path = model_dict["model"]
                 model = envs.lazy_instance_by_fliename(model_path,
-                                                       "Model")(context["env"])
+                                                       "Model")(None)
                 model._data_var = model.input_data(
                     dataset_name=model_dict["dataset_name"])
                 if envs.get_global_env("dataset." + dataset_name +
@@ -314,7 +305,7 @@ class CollectiveNetwork(NetworkBase):
                     "default_main_program"] = train_program
 
         context["dataset"] = {}
-        for dataset in context["env"]["dataset"]:
+        for dataset in envs.get_global_env("dataset"):
             if dataset["type"] != "DataLoader":
                 dataset_class = QueueDataset(context)
                 context["dataset"][dataset[
diff --git a/core/trainers/framework/runner.py b/core/trainers/framework/runner.py
index 8805c6fb79b3e75058339621302b4834a63da021..dbcf40d3a3fd9277f39cbfb8499b6ce73e1e127d 100644
--- a/core/trainers/framework/runner.py
+++ b/core/trainers/framework/runner.py
@@ -40,6 +40,7 @@ class RunnerBase(object):
     def _run(self, context, model_dict):
         reader_name = model_dict["dataset_name"]
         name = "dataset." + reader_name + "."
+
         if envs.get_global_env(name + "type") == "DataLoader":
             self._executor_dataloader_train(model_dict, context)
         else:
@@ -309,7 +310,7 @@ class PSRunner(RunnerBase):
         epochs = int(
             envs.get_global_env("runner." + context["runner_name"] +
                                 ".epochs"))
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         for epoch in range(epochs):
             begin_time = time.time()
             self._run(context, model_dict)
@@ -336,7 +337,7 @@ class CollectiveRunner(RunnerBase):
         epochs = int(
             envs.get_global_env("runner." + context["runner_name"] +
                                 ".epochs"))
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         for epoch in range(epochs):
             begin_time = time.time()
             self._run(context, model_dict)
@@ -361,7 +362,7 @@ class PslibRunner(RunnerBase):
 
     def run(self, context):
         context["fleet"].init_worker()
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         epochs = int(
             envs.get_global_env("runner." + context["runner_name"] +
                                 ".epochs"))
@@ -382,7 +383,7 @@ class PslibRunner(RunnerBase):
                 day = begin_day + datetime.timedelta(days=day, hours=hour)
                 day_s = day.strftime('%Y%m%d/%H')
 
-                for dataset in context["env"]["dataset"]:
+                for dataset in envs.get_global_env("dataset"):
                     if dataset["type"] != "DataLoader":
                         name = dataset["name"]
                         train_data_path = envs.get_global_env(name +
diff --git a/core/trainers/framework/startup.py b/core/trainers/framework/startup.py
index 82e244723ecae607eb8cfc1110c0a29642bd4615..ea3558072bb1d61ecb9d63c07ff5d6b71a37ea22 100644
--- a/core/trainers/framework/startup.py
+++ b/core/trainers/framework/startup.py
@@ -73,7 +73,7 @@ class PSStartup(StartupBase):
         pass
 
     def startup(self, context):
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
 
             train_prog = context["model"][model_dict["name"]]["main_program"]
@@ -91,7 +91,7 @@ class CollectiveStartup(StartupBase):
         pass
 
     def startup(self, context):
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
             train_prog = context["model"][model_dict["name"]][
                 "default_main_program"]
diff --git a/core/trainers/single_infer.py b/core/trainers/single_infer.py
deleted file mode 100755
index fcc92e2e01cf7af49d6d124a7a5773ac9946c989..0000000000000000000000000000000000000000
--- a/core/trainers/single_infer.py
+++ /dev/null
@@ -1,371 +0,0 @@
-# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Training use fluid with one node only.
-""" - -from __future__ import print_function - -import time -import logging -import os -import json -import numpy as np -import paddle.fluid as fluid - -from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer -from paddlerec.core.utils import envs -from paddlerec.core.reader import SlotReader -from paddlerec.core.utils import dataloader_instance - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) - - -class SingleInfer(TranspileTrainer): - def __init__(self, config=None): - super(TranspileTrainer, self).__init__(config) - self._env = self._config - device = envs.get_global_env("device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - self.processor_register() - self._model = {} - self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - self._runner_name = envs.get_global_env("mode") - device = envs.get_global_env("runner." + self._runner_name + ".device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - - def processor_register(self): - self.regist_context_processor('uninit', self.instance) - self.regist_context_processor('init_pass', self.init) - self.regist_context_processor('startup_pass', self.startup) - self.regist_context_processor('train_pass', self.executor_train) - self.regist_context_processor('terminal_pass', self.terminal) - - def instance(self, context): - context['status'] = 'init_pass' - - def _get_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_class = envs.get_global_env(name + "data_converter") - abs_dir = os.path.dirname(os.path.abspath(__file__)) - reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - if sparse_slots == "" and dense_slots == "": - pipe_cmd = "python {} {} {} {}".format(reader, reader_class, - "TRAIN", self._config_yaml) - else: - if sparse_slots == "": - sparse_slots = "?" - if dense_slots == "": - dense_slots = "?" - padding = envs.get_global_env(name + "padding", 0) - pipe_cmd = "python {} {} {} {} {} {} {} {}".format( - reader, "slot", "slot", self._config_yaml, "fake", \ - sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding)) - - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(envs.get_global_env(name + "batch_size")) - dataset.set_pipe_command(pipe_cmd) - train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] - dataset.set_filelist(file_list) - for model_dict in self._env["phase"]: - if model_dict["dataset_name"] == dataset_name: - model = self._model[model_dict["name"]][3] - inputs = model._infer_data_var - dataset.set_use_var(inputs) - break - return dataset - - def _get_dataloader(self, dataset_name, dataloader): - name = "dataset." + dataset_name + "." 
-        thread_num = envs.get_global_env(name + "thread_num")
-        batch_size = envs.get_global_env(name + "batch_size")
-        reader_class = envs.get_global_env(name + "data_converter")
-        abs_dir = os.path.dirname(os.path.abspath(__file__))
-        sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
-        dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
-        if sparse_slots == "" and dense_slots == "":
-            reader = dataloader_instance.dataloader_by_name(
-                reader_class, dataset_name, self._config_yaml)
-            reader_class = envs.lazy_instance_by_fliename(reader_class,
-                                                          "TrainReader")
-            reader_ins = reader_class(self._config_yaml)
-        else:
-            reader = dataloader_instance.slotdataloader_by_name(
-                "", dataset_name, self._config_yaml)
-            reader_ins = SlotReader(self._config_yaml)
-        if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
-            dataloader.set_sample_list_generator(reader)
-        else:
-            dataloader.set_sample_generator(reader, batch_size)
-        return dataloader
-
-    def _create_dataset(self, dataset_name):
-        name = "dataset." + dataset_name + "."
-        sparse_slots = envs.get_global_env(name + "sparse_slots")
-        dense_slots = envs.get_global_env(name + "dense_slots")
-        thread_num = envs.get_global_env(name + "thread_num")
-        batch_size = envs.get_global_env(name + "batch_size")
-        type_name = envs.get_global_env(name + "type")
-        if envs.get_platform() != "LINUX":
-            print("platform ", envs.get_platform(),
-                  " change reader to DataLoader")
-            type_name = "DataLoader"
-            padding = 0
-
-        if type_name == "DataLoader":
-            return None
-        else:
-            return self._get_dataset(dataset_name)
-
-    def init(self, context):
-        for model_dict in self._env["phase"]:
-            self._model[model_dict["name"]] = [None] * 5
-            train_program = fluid.Program()
-            startup_program = fluid.Program()
-            scope = fluid.Scope()
-            dataset_name = model_dict["dataset_name"]
-            opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
-            opt_lr = envs.get_global_env(
-                "hyper_parameters.optimizer.learning_rate")
-            opt_strategy = envs.get_global_env(
-                "hyper_parameters.optimizer.strategy")
-            with fluid.program_guard(train_program, startup_program):
-                with fluid.unique_name.guard():
-                    with fluid.scope_guard(scope):
-                        model_path = model_dict["model"].replace(
-                            "{workspace}",
-                            envs.path_adapter(self._env["workspace"]))
-                        model = envs.lazy_instance_by_fliename(
-                            model_path, "Model")(self._env)
-                        model._infer_data_var = model.input_data(
-                            is_infer=True,
-                            dataset_name=model_dict["dataset_name"])
-                        if envs.get_global_env("dataset." + dataset_name +
-                                               ".type") == "DataLoader":
-                            model._init_dataloader(is_infer=True)
-                            self._get_dataloader(dataset_name,
-                                                 model._data_loader)
-                        model.net(model._infer_data_var, True)
-            self._model[model_dict["name"]][0] = train_program
-            self._model[model_dict["name"]][1] = startup_program
-            self._model[model_dict["name"]][2] = scope
-            self._model[model_dict["name"]][3] = model
-            self._model[model_dict["name"]][4] = train_program.clone()
-
-        for dataset in self._env["dataset"]:
-            if dataset["type"] != "DataLoader":
-                self._dataset[dataset["name"]] = self._create_dataset(dataset[
-                    "name"])
-
-        context['status'] = 'startup_pass'
-
-    def startup(self, context):
-        for model_dict in self._env["phase"]:
-            with fluid.scope_guard(self._model[model_dict["name"]][2]):
-                self._exe.run(self._model[model_dict["name"]][1])
-        context['status'] = 'train_pass'
-
-    def executor_train(self, context):
-        epochs = int(
-            envs.get_global_env("runner." + self._runner_name + ".epochs"))
-        for j in range(epochs):
-            for model_dict in self._env["phase"]:
-                if j == 0:
-                    with fluid.scope_guard(self._model[model_dict["name"]][2]):
-                        train_prog = self._model[model_dict["name"]][0]
-                        startup_prog = self._model[model_dict["name"]][1]
-                        with fluid.program_guard(train_prog, startup_prog):
-                            self.load()
-                reader_name = model_dict["dataset_name"]
-                name = "dataset." + reader_name + "."
-                begin_time = time.time()
-                if envs.get_global_env(name + "type") == "DataLoader":
-                    self._executor_dataloader_train(model_dict)
-                else:
-                    self._executor_dataset_train(model_dict)
-                with fluid.scope_guard(self._model[model_dict["name"]][2]):
-                    train_prog = self._model[model_dict["name"]][4]
-                    startup_prog = self._model[model_dict["name"]][1]
-                    with fluid.program_guard(train_prog, startup_prog):
-                        self.save(j)
-                end_time = time.time()
-                seconds = end_time - begin_time
-                print("epoch {} done, time elasped: {}".format(j, seconds))
-        context['status'] = "terminal_pass"
-
-    def _executor_dataset_train(self, model_dict):
-        reader_name = model_dict["dataset_name"]
-        model_name = model_dict["name"]
-        model_class = self._model[model_name][3]
-        fetch_vars = []
-        fetch_alias = []
-        fetch_period = int(
-            envs.get_global_env("runner." + self._runner_name +
-                                ".print_interval", 20))
-        metrics = model_class.get_infer_results()
-        if metrics:
-            fetch_vars = metrics.values()
-            fetch_alias = metrics.keys()
-        scope = self._model[model_name][2]
-        program = self._model[model_name][0]
-        reader = self._dataset[reader_name]
-        with fluid.scope_guard(scope):
-            self._exe.infer_from_dataset(
-                program=program,
-                dataset=reader,
-                fetch_list=fetch_vars,
-                fetch_info=fetch_alias,
-                print_period=fetch_period)
-
-    def _executor_dataloader_train(self, model_dict):
-        reader_name = model_dict["dataset_name"]
-        model_name = model_dict["name"]
-        model_class = self._model[model_name][3]
-        program = self._model[model_name][0].clone()
-        fetch_vars = []
-        fetch_alias = []
-        metrics = model_class.get_infer_results()
-        if metrics:
-            fetch_vars = metrics.values()
-            fetch_alias = metrics.keys()
-        metrics_varnames = []
-        metrics_format = []
-        fetch_period = int(
-            envs.get_global_env("runner." + self._runner_name +
-                                ".print_interval", 20))
-        metrics_format.append("{}: {{}}".format("batch"))
-        metrics_indexes = dict()
-        for name, var in metrics.items():
-            metrics_varnames.append(var.name)
-            metrics_indexes[var.name] = len(metrics_varnames) - 1
-            metrics_format.append("{}: {{}}".format(name))
-        metrics_format = ", ".join(metrics_format)
-
-        reader = self._model[model_name][3]._data_loader
-        reader.start()
-        batch_id = 0
-        scope = self._model[model_name][2]
-
-        infer_results = []
-        with fluid.scope_guard(scope):
-            try:
-                while True:
-                    metrics_rets = self._exe.run(program=program,
-                                                 fetch_list=metrics_varnames,
-                                                 return_numpy=False)
-                    metrics = [batch_id]
-                    metrics.extend(metrics_rets)
-
-                    batch_infer_result = {}
-                    for k, v in metrics_indexes.items():
-                        batch_infer_result[k] = np.array(metrics_rets[
-                            v]).tolist()
-                    infer_results.append(batch_infer_result)
-
-                    if batch_id % fetch_period == 0 and batch_id != 0:
-                        print(metrics_format.format(*metrics))
-                    batch_id += 1
-            except fluid.core.EOFException:
-                reader.reset()
-        with open(model_dict['save_path'], 'w') as fout:
-            json.dump(infer_results, fout)
-
-    def terminal(self, context):
-        context['is_exit'] = True
-
-    def load(self, is_fleet=False):
-        name = "runner." + self._runner_name + "."
-        dirname = envs.get_global_env(name + "init_model_path", None)
-        if dirname is None or dirname == "":
-            return
-        print("single_infer going to load ", dirname)
-        if is_fleet:
-            fleet.load_persistables(self._exe, dirname)
-        else:
-            fluid.io.load_persistables(self._exe, dirname)
-
-    def save(self, epoch_id, is_fleet=False):
-        def need_save(epoch_id, epoch_interval, is_last=False):
-            if is_last:
-                return True
-            if epoch_id == -1:
-                return False
-
-            return epoch_id % epoch_interval == 0
-
-        def save_inference_model():
-            name = "runner." + self._runner_name + "."
-            save_interval = int(
-                envs.get_global_env(name + "save_inference_interval", -1))
-            if not need_save(epoch_id, save_interval, False):
-                return
-            feed_varnames = envs.get_global_env(
-                name + "save_inference_feed_varnames", None)
-            fetch_varnames = envs.get_global_env(
-                name + "save_inference_fetch_varnames", None)
-            if feed_varnames is None or fetch_varnames is None or feed_varnames == "":
-                return
-            fetch_vars = [
-                fluid.default_main_program().global_block().vars[varname]
-                for varname in fetch_varnames
-            ]
-            dirname = envs.get_global_env(name + "save_inference_path", None)
-
-            assert dirname is not None
-            dirname = os.path.join(dirname, str(epoch_id))
-
-            if is_fleet:
-                fleet.save_inference_model(self._exe, dirname, feed_varnames,
-                                           fetch_vars)
-            else:
-                fluid.io.save_inference_model(dirname, feed_varnames,
-                                              fetch_vars, self._exe)
-
-        def save_persistables():
-            name = "runner." + self._runner_name + "."
-            save_interval = int(
-                envs.get_global_env(name + "save_checkpoint_interval", -1))
-            if not need_save(epoch_id, save_interval, False):
-                return
-            dirname = envs.get_global_env(name + "save_checkpoint_path", None)
-            if dirname is None or dirname == "":
-                return
-            dirname = os.path.join(dirname, str(epoch_id))
-            if is_fleet:
-                fleet.save_persistables(self._exe, dirname)
-            else:
-                fluid.io.save_persistables(self._exe, dirname)
-
-        save_persistables()
-        save_inference_model()
diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py
deleted file mode 100755
index d782dfa42f5695c8b04e9e2db6ba92b676b8b0f6..0000000000000000000000000000000000000000
--- a/core/trainers/single_trainer.py
+++ /dev/null
@@ -1,391 +0,0 @@
-# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Training use fluid with one node only.
-""" - -from __future__ import print_function - -import time -import logging -import os -import paddle.fluid as fluid - -from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer -from paddlerec.core.utils import envs -from paddlerec.core.reader import SlotReader -from paddlerec.core.utils import dataloader_instance - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) - - -class SingleTrainer(TranspileTrainer): - def __init__(self, config=None): - super(TranspileTrainer, self).__init__(config) - self._env = self._config - self.processor_register() - self._model = {} - self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - self._runner_name = envs.get_global_env("mode") - device = envs.get_global_env("runner." + self._runner_name + ".device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - - def processor_register(self): - self.regist_context_processor('uninit', self.instance) - self.regist_context_processor('init_pass', self.init) - self.regist_context_processor('startup_pass', self.startup) - self.regist_context_processor('train_pass', self.executor_train) - self.regist_context_processor('terminal_pass', self.terminal) - - def instance(self, context): - context['status'] = 'init_pass' - - def _get_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_class = envs.get_global_env(name + "data_converter") - abs_dir = os.path.dirname(os.path.abspath(__file__)) - reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - if sparse_slots == "" and dense_slots == "": - pipe_cmd = "python {} {} {} {}".format(reader, reader_class, - "TRAIN", self._config_yaml) - else: - if sparse_slots == "": - sparse_slots = "?" - if dense_slots == "": - dense_slots = "?" - padding = envs.get_global_env(name + "padding", 0) - pipe_cmd = "python {} {} {} {} {} {} {} {}".format( - reader, "slot", "slot", self._config_yaml, "fake", \ - sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding)) - - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(envs.get_global_env(name + "batch_size")) - dataset.set_pipe_command(pipe_cmd) - train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] - dataset.set_filelist(file_list) - for model_dict in self._env["phase"]: - if model_dict["dataset_name"] == dataset_name: - model = self._model[model_dict["name"]][3] - inputs = model._data_var - dataset.set_use_var(inputs) - break - return dataset - - def _get_dataloader(self, dataset_name, dataloader): - name = "dataset." + dataset_name + "." 
-        sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
-        dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
-        thread_num = envs.get_global_env(name + "thread_num")
-        batch_size = envs.get_global_env(name + "batch_size")
-        reader_class = envs.get_global_env(name + "data_converter")
-        abs_dir = os.path.dirname(os.path.abspath(__file__))
-        if sparse_slots == "" and dense_slots == "":
-            reader = dataloader_instance.dataloader_by_name(
-                reader_class, dataset_name, self._config_yaml)
-            reader_class = envs.lazy_instance_by_fliename(reader_class,
-                                                          "TrainReader")
-            reader_ins = reader_class(self._config_yaml)
-        else:
-            reader = dataloader_instance.slotdataloader_by_name(
-                "", dataset_name, self._config_yaml)
-            reader_ins = SlotReader(self._config_yaml)
-        if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
-            dataloader.set_sample_list_generator(reader)
-        else:
-            dataloader.set_sample_generator(reader, batch_size)
-        return dataloader
-
-    def _create_dataset(self, dataset_name):
-        name = "dataset." + dataset_name + "."
-        sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
-        dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
-        thread_num = envs.get_global_env(name + "thread_num")
-        batch_size = envs.get_global_env(name + "batch_size")
-        type_name = envs.get_global_env(name + "type")
-        if envs.get_platform() != "LINUX":
-            print("platform ", envs.get_platform(),
-                  " change reader to DataLoader")
-            type_name = "DataLoader"
-            padding = 0
-
-        if type_name == "DataLoader":
-            return None
-        else:
-            return self._get_dataset(dataset_name)
-
-    def init(self, context):
-        for model_dict in self._env["phase"]:
-            self._model[model_dict["name"]] = [None] * 5
-            train_program = fluid.Program()
-            startup_program = fluid.Program()
-            scope = fluid.Scope()
-            dataset_name = model_dict["dataset_name"]
-            with fluid.program_guard(train_program, startup_program):
-                with fluid.unique_name.guard():
-                    with fluid.scope_guard(scope):
-                        model_path = model_dict["model"].replace(
-                            "{workspace}",
-                            envs.path_adapter(self._env["workspace"]))
-                        model = envs.lazy_instance_by_fliename(
-                            model_path, "Model")(self._env)
-                        model._data_var = model.input_data(
-                            dataset_name=model_dict["dataset_name"])
-                        if envs.get_global_env("dataset." + dataset_name +
-                                               ".type") == "DataLoader":
-                            model._init_dataloader(is_infer=False)
-                            self._get_dataloader(dataset_name,
-                                                 model._data_loader)
-                        model.net(model._data_var, False)
-                        optimizer = model.optimizer()
-                        optimizer.minimize(model._cost)
-            self._model[model_dict["name"]][0] = train_program
-            self._model[model_dict["name"]][1] = startup_program
-            self._model[model_dict["name"]][2] = scope
-            self._model[model_dict["name"]][3] = model
-            self._model[model_dict["name"]][4] = train_program.clone()
-
-        for dataset in self._env["dataset"]:
-            if dataset["type"] != "DataLoader":
-                self._dataset[dataset["name"]] = self._create_dataset(dataset[
-                    "name"])
-
-        context['status'] = 'startup_pass'
-
-    def startup(self, context):
-        for model_dict in self._env["phase"]:
-            with fluid.scope_guard(self._model[model_dict["name"]][2]):
-                self._exe.run(self._model[model_dict["name"]][1])
-        context['status'] = 'train_pass'
-
-    def executor_train(self, context):
-        epochs = int(
-            envs.get_global_env("runner." + self._runner_name + ".epochs"))
-        for j in range(epochs):
-            for model_dict in self._env["phase"]:
-                if j == 0:
-                    with fluid.scope_guard(self._model[model_dict["name"]][2]):
-                        train_prog = self._model[model_dict["name"]][0]
-                        startup_prog = self._model[model_dict["name"]][1]
-                        with fluid.program_guard(train_prog, startup_prog):
-                            self.load()
-                reader_name = model_dict["dataset_name"]
-                name = "dataset." + reader_name + "."
-                begin_time = time.time()
-                if envs.get_global_env(name + "type") == "DataLoader":
-                    self._executor_dataloader_train(model_dict)
-                else:
-                    self._executor_dataset_train(model_dict)
-                with fluid.scope_guard(self._model[model_dict["name"]][2]):
-                    train_prog = self._model[model_dict["name"]][4]
-                    startup_prog = self._model[model_dict["name"]][1]
-                    with fluid.program_guard(train_prog, startup_prog):
-                        self.save(j)
-                end_time = time.time()
-                seconds = end_time - begin_time
-                print("epoch {} done, time elasped: {}".format(j, seconds))
-        context['status'] = "terminal_pass"
-
-    def _executor_dataset_train(self, model_dict):
-        reader_name = model_dict["dataset_name"]
-        model_name = model_dict["name"]
-        model_class = self._model[model_name][3]
-        fetch_vars = []
-        fetch_alias = []
-        fetch_period = int(
-            envs.get_global_env("runner." + self._runner_name +
-                                ".print_interval", 20))
-        metrics = model_class.get_metrics()
-        if metrics:
-            fetch_vars = metrics.values()
-            fetch_alias = metrics.keys()
-        scope = self._model[model_name][2]
-        program = self._model[model_name][0]
-        reader = self._dataset[reader_name]
-        threads = model_dict.get("thread_num", 1)
-        with fluid.scope_guard(scope):
-            self._exe.train_from_dataset(
-                program=program,
-                dataset=reader,
-                thread=threads,
-                fetch_list=fetch_vars,
-                fetch_info=fetch_alias,
-                print_period=fetch_period)
-
-    def _executor_dataloader_train(self, model_dict):
-        reader_name = model_dict["dataset_name"]
-        model_name = model_dict["name"]
-        model_class = self._model[model_name][3]
-        program = self._model[model_name][0].clone()
-
-        _build_strategy = fluid.BuildStrategy()
-        _exe_strategy = fluid.ExecutionStrategy()
-
-        # 0: kCoeffNumDevice; 1: One; 2: Customized
-        _gradient_scale_strategy = model_dict.get("gradient_scale_strategy", 0)
-        if _gradient_scale_strategy == 0:
-            gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.CoeffNumDevice
-        elif _gradient_scale_strategy == 1:
-            gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.One
-        elif _gradient_scale_strategy == 2:
-            gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
-        else:
-            raise ValueError(
-                "Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
-            )
-        _build_strategy.gradient_scale_strategy = gradient_scale_strategy
-
-        if "thread_num" in model_dict and model_dict["thread_num"] > 1:
-            _build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
-            _exe_strategy.num_threads = model_dict["thread_num"]
-            os.environ['CPU_NUM'] = str(_exe_strategy.num_threads)
-
-        program = fluid.compiler.CompiledProgram(program).with_data_parallel(
-            loss_name=model_class.get_avg_cost().name,
-            build_strategy=_build_strategy,
-            exec_strategy=_exe_strategy)
-
-        fetch_vars = []
-        fetch_alias = []
-        fetch_period = int(
-            envs.get_global_env("runner." + self._runner_name +
-                                ".print_interval", 20))
-        metrics = model_class.get_metrics()
-        if metrics:
-            fetch_vars = metrics.values()
-            fetch_alias = metrics.keys()
-        metrics_varnames = []
-        metrics_format = []
-        metrics_format.append("{}: {{}}".format("batch"))
-        for name, var in metrics.items():
-            metrics_varnames.append(var.name)
-            metrics_format.append("{}: {{}}".format(name))
-        metrics_format = ", ".join(metrics_format)
-
-        reader = self._model[model_name][3]._data_loader
-        reader.start()
-        batch_id = 0
-        scope = self._model[model_name][2]
-        with fluid.scope_guard(scope):
-            try:
-                while True:
-                    metrics_rets = self._exe.run(program=program,
-                                                 fetch_list=metrics_varnames)
-                    metrics = [batch_id]
-                    metrics.extend(metrics_rets)
-
-                    if batch_id % fetch_period == 0 and batch_id != 0:
-                        print(metrics_format.format(*metrics))
-                    batch_id += 1
-            except fluid.core.EOFException:
-                reader.reset()
-
-    def terminal(self, context):
-        context['is_exit'] = True
-
-    def load(self, is_fleet=False):
-        dirname = envs.get_global_env(
-            "runner." + self._runner_name + ".init_model_path", None)
-        load_vars = envs.get_global_env(
-            "runner." + self._runner_name + ".load_vars", None)
-
-        def name_has_embedding(var):
-            res = False
-            for var_name in load_vars:
-                if var_name == var.name:
-                    return True
-            return res
-
-        if dirname is None or dirname == "":
-            return
-        print("going to load ", dirname)
-        if is_fleet:
-            fleet.load_persistables(self._exe, dirname)
-        else:
-            if load_vars is None or len(load_vars) == 0:
-                fluid.io.load_persistables(self._exe, dirname)
-            else:
-                fluid.io.load_vars(
-                    self._exe, dirname, predicate=name_has_embedding)
-
-    def save(self, epoch_id, is_fleet=False):
-        def need_save(epoch_id, epoch_interval, is_last=False):
-            if is_last:
-                return True
-            if epoch_id == -1:
-                return False
-
-            return epoch_id % epoch_interval == 0
-
-        def save_inference_model():
-            name = "runner." + self._runner_name + "."
-            save_interval = int(
-                envs.get_global_env(name + "save_inference_interval", -1))
-            if not need_save(epoch_id, save_interval, False):
-                return
-            feed_varnames = envs.get_global_env(
-                name + "save_inference_feed_varnames", [])
-            fetch_varnames = envs.get_global_env(
-                name + "save_inference_fetch_varnames", [])
-            if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \
-                    len(feed_varnames) == 0 or len(fetch_varnames) == 0:
-                return
-            fetch_vars = [
-                fluid.default_main_program().global_block().vars[varname]
-                for varname in fetch_varnames
-            ]
-            dirname = envs.get_global_env(name + "save_inference_path", None)
-
-            assert dirname is not None
-            dirname = os.path.join(dirname, str(epoch_id))
-
-            if is_fleet:
-                fleet.save_inference_model(self._exe, dirname, feed_varnames,
-                                           fetch_vars)
-            else:
-                fluid.io.save_inference_model(dirname, feed_varnames,
-                                              fetch_vars, self._exe)
-
-        def save_persistables():
-            name = "runner." + self._runner_name + "."
-            save_interval = int(
-                envs.get_global_env(name + "save_checkpoint_interval", -1))
-            if not need_save(epoch_id, save_interval, False):
-                return
-            dirname = envs.get_global_env(name + "save_checkpoint_path", None)
-            if dirname is None or dirname == "":
-                return
-            dirname = os.path.join(dirname, str(epoch_id))
-            if is_fleet:
-                fleet.save_persistables(self._exe, dirname)
-            else:
-                fluid.io.save_persistables(self._exe, dirname)
-
-        save_persistables()
-        save_inference_model()
diff --git a/doc/custom_reader.md b/doc/custom_reader.md
index bf0e5433b06c90dab7440b7dbf19eaa6ef975d1c..748f82c5499721857a011b08076af1b732914cc0 100644
--- a/doc/custom_reader.md
+++ b/doc/custom_reader.md
@@ -208,7 +208,7 @@ CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www
 稀疏参数输入的定义:
 ```python
 def sparse_inputs():
-    ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace)
+    ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None)
 
     sparse_input_ids = [
         fluid.layers.data(name="S" + str(i),
@@ -222,7 +222,7 @@ def sparse_inputs():
 稠密参数输入的定义:
 ```python
 def dense_input():
-    dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace)
+    dim = envs.get_global_env("hyper_parameters.dense_input_dim", None)
 
     dense_input_var = fluid.layers.data(name="D",
                                         shape=[dim],
diff --git a/doc/design.md b/doc/design.md
index f88401aafc74a1b9910bdb082fa7a6cefa301359..164fa811b1ae800c0d8b84e8fdebfc7f59ab5430 100644
--- a/doc/design.md
+++ b/doc/design.md
@@ -139,7 +139,6 @@ class Model(object):
         self._data_loader = None
         self._infer_data_loader = None
         self._fetch_interval = 20
-        self._namespace = "train.model"
         self._platform = envs.get_platform()
 
     def get_inputs(self):
diff --git a/doc/model.md b/doc/model.md
index ca76dd360c9c9740b1d607e78682c80aded4898e..d4c09733bc2489bafb811d74489f6e6556788e9e 100644
--- a/doc/model.md
+++ b/doc/model.md
@@ -24,8 +24,7 @@ hyper_parameters:
 
 ```python
 if name == "SGD":
-    reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
-                              self._namespace)
+    reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
     optimizer_i = fluid.optimizer.SGD(
         lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
 elif name == "ADAM":
diff --git a/models/treebased/tdm/tdm_startup.py b/models/treebased/tdm/tdm_startup.py
index 3f1b87dbedd7a2448a77c9388f6e348dcf210245..ccc9b6c96372321a04730f7d08f9c92d9504313a 100644
--- a/models/treebased/tdm/tdm_startup.py
+++ b/models/treebased/tdm/tdm_startup.py
@@ -47,7 +47,7 @@ class Startup(StartupBase):
     def _single_startup(self, context):
         load_tree_from_numpy = envs.get_global_env(
             "hyper_parameters.tree.load_tree_from_numpy", False)
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
             context["exe"].run(context["model"][model_dict["name"]][
                 "startup_program"])
@@ -106,7 +106,7 @@ class Startup(StartupBase):
         warmup_model_path = envs.get_global_env(
             "runner." + context["runner_name"] + ".init_model_path", None)
         assert warmup_model_path != None, "set runner.init_model_path for loading model"
-        model_dict = context["env"]["phase"][0]
+        model_dict = envs.get_global_env("phase")[0]
         with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
             context["exe"].run(context["model"][model_dict["name"]][
                 "startup_program"])