diff --git a/core/factory.py b/core/factory.py index 3490a8f385b410f7032886af20db84ecbaa017c2..9430c88283800e69db7043aa141b6f735212c79f 100755 --- a/core/factory.py +++ b/core/factory.py @@ -14,7 +14,6 @@ import os import sys -import yaml from paddlerec.core.utils import envs trainer_abs = os.path.join( @@ -61,7 +60,6 @@ class TrainerFactory(object): def create(config): _config = envs.load_yaml(config) envs.set_global_envs(_config) - envs.update_workspace() trainer = TrainerFactory._build_trainer(config) return trainer diff --git a/core/model.py b/core/model.py index 30bbb7fc327f1df236aeee181ebbb0a7fca0b930..e96d01fe0bbcc0541a2b5aea458e2cb34fa89303 100755 --- a/core/model.py +++ b/core/model.py @@ -35,7 +35,6 @@ class ModelBase(object): self._data_loader = None self._infer_data_loader = None self._fetch_interval = 20 - self._namespace = "train.model" self._platform = envs.get_platform() self._init_hyper_parameters() self._env = config @@ -50,7 +49,7 @@ class ModelBase(object): self._slot_inited = True dataset = {} model_dict = {} - for i in self._env["executor"]: + for i in self._env["phase"]: if i["name"] == kargs["name"]: model_dict = i break @@ -89,7 +88,7 @@ class ModelBase(object): self._data_var.append(l) self._sparse_data_var.append(l) - dataset_class = dataset["type"] + dataset_class = envs.get_global_env(name + "type") if dataset_class == "DataLoader": self._init_dataloader() @@ -139,10 +138,7 @@ class ModelBase(object): os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0' if name == "SGD": - reg = envs.get_global_env("hyper_parameters.reg", 0.0001, - self._namespace) - optimizer_i = fluid.optimizer.SGD( - lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) + optimizer_i = fluid.optimizer.SGD(lr) elif name == "ADAM": optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True) elif name == "ADAGRAD": @@ -206,31 +202,8 @@ class ModelBase(object): def net(self, is_infer=False): return None - def _construct_reader(self, is_infer=False): - if is_infer: 
- self._infer_data_loader = fluid.io.DataLoader.from_generator( - feed_list=self._infer_data_var, - capacity=64, - use_double_buffer=False, - iterable=False) - else: - dataset_class = envs.get_global_env("dataset_class", None, - "train.reader") - if dataset_class == "DataLoader": - self._data_loader = fluid.io.DataLoader.from_generator( - feed_list=self._data_var, - capacity=64, - use_double_buffer=False, - iterable=False) - def train_net(self): - input_data = self.input_data(is_infer=False) - self._data_var = input_data - self._construct_reader(is_infer=False) - self.net(input_data, is_infer=False) + pass def infer_net(self): - input_data = self.input_data(is_infer=True) - self._infer_data_var = input_data - self._construct_reader(is_infer=True) - self.net(input_data, is_infer=True) + pass diff --git a/core/reader.py b/core/reader.py index dd20d588a5c36754190e708484505895d19c41a4..589e6c192330baf04c202952cefb04177f3e4297 100755 --- a/core/reader.py +++ b/core/reader.py @@ -17,7 +17,6 @@ import abc import os from functools import reduce import paddle.fluid.incubate.data_generator as dg -import yaml from paddlerec.core.utils import envs @@ -28,7 +27,6 @@ class ReaderBase(dg.MultiSlotDataGenerator): dg.MultiSlotDataGenerator.__init__(self) _config = envs.load_yaml(config) envs.set_global_envs(_config) - envs.update_workspace() @abc.abstractmethod def init(self): @@ -47,7 +45,6 @@ class SlotReader(dg.MultiSlotDataGenerator): dg.MultiSlotDataGenerator.__init__(self) _config = envs.load_yaml(config) envs.set_global_envs(_config) - envs.update_workspace() def init(self, sparse_slots, dense_slots, padding=0): from operator import mul diff --git a/core/trainer.py b/core/trainer.py index 9694d0d04f9f6809e72b02045ddce11e3bbe30c8..8b1afd449a70265d5bcae9996d42795a1235197a 100755 --- a/core/trainer.py +++ b/core/trainer.py @@ -16,7 +16,6 @@ import abc import os import time import sys -import yaml import traceback from paddle import fluid @@ -64,19 +63,31 @@ class 
Trainer(object): self.increment_models = [] self._exector_context = {} self._context = {'status': 'uninit', 'is_exit': False} - self._config_yaml = config - self._context["config_yaml"] = self._config_yaml + self._context["config_yaml"] = config - self._config = envs.load_yaml(config) - - self._context["env"] = self._config self._model = {} self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - self._runner_name = envs.get_global_env("mode") + + self._runner_name = envs.get_runtime_environ("mode") self._context["runner_name"] = self._runner_name + phase_names = envs.get_global_env( + "runner." + self._runner_name + ".phases", None) + + _config = envs.load_yaml(config) + + self._context["env"] = _config + self._context["dataset"] = _config.get("dataset") + + phases = [] + if phase_names is None: + phases = _config.get("phase") + else: + for phase in _config.get("phase"): + if phase["name"] in phase_names: + phases.append(phase) + + self._context["phases"] = phases print("PaddleRec: Runner {} Begin".format(self._runner_name)) self.which_engine() self.which_device() @@ -89,19 +100,21 @@ class Trainer(object): """ device = envs.get_global_env( "runner." 
+ self._runner_name + ".device", default_value="CPU") - if device.upper() == 'GPU': + device = device.upper() + + if device == 'GPU': self.check_gpu() self.device = Device.GPU gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0)) self._place = fluid.CUDAPlace(gpu_id) self._exe = fluid.Executor(self._place) - elif device.upper() == "CPU": + elif device == "CPU": self.device = Device.CPU self._place = fluid.CPUPlace() self._exe = fluid.Executor(self._place) else: raise ValueError("Not Support device {}".format(device)) - self._context["device"] = device.upper() + self._context["device"] = device self._context["exe"] = self._exe self._context["place"] = self._place @@ -119,7 +132,6 @@ class Trainer(object): try: if not fluid.is_compiled_with_cuda(): raise RuntimeError(err) - sys.exit(1) except Exception as e: pass @@ -191,17 +203,10 @@ class Trainer(object): None : run a processor for this status """ status = context['status'] - try: - if status in self._status_processor: - self._status_processor[context['status']](context) - else: - self.other_status_processor(context) - except Exception as err: - traceback.print_exc() - print('Catch Exception:%s' % str(err)) - sys.stdout.flush() - self._context['is_exit'] = self.handle_processor_exception( - status, context, err) + if status in self._status_processor: + self._status_processor[context['status']](context) + else: + self.other_status_processor(context) def other_status_processor(self, context): """ @@ -212,14 +217,17 @@ class Trainer(object): print('unknow context_status:%s, do nothing' % context['status']) time.sleep(60) - def handle_processor_exception(self, status, context, exception): + def handle_processor_exception(self, context, exception): """ when exception throwed from processor, will call this func to handle it Return: bool exit_app or not """ - print('Exit app. 
catch exception in precoss status:%s, except:%s' % - (context['status'], str(exception)) + print("\n--------------------------------\nPaddleRec Error Message " + "Summary:\n--------------------------------\n") + print( + 'Exit PaddleRec. catch exception in process status: [%s], except: %s' + % (context['status'], str(exception))) return True def reload_train_context(self): @@ -233,19 +241,14 @@ keep running by statu context. """ while True: - self.reload_train_context() - self.context_process(self._context) - if self._context['is_exit']: - break - - -def user_define_engine(engine_yaml): - _config = envs.load_yaml(engine_yaml) - envs.set_runtime_environs(_config) - train_location = envs.get_global_env("engine.file") - train_dirname = os.path.dirname(train_location) - base_name = os.path.splitext(os.path.basename(train_location))[0] - sys.path.append(train_dirname) - trainer_class = envs.lazy_instance_by_fliename(base_name, - "UserDefineTraining") - return trainer_class + try: + self.reload_train_context() + self.context_process(self._context) + if self._context['is_exit']: + break + except Exception as err: + traceback.print_exc() + print('Catch Exception:%s' % str(err)) + sys.stdout.flush() + self.handle_processor_exception(self._context, err) + sys.exit(type(err).__name__) diff --git a/core/trainers/framework/dataset.py b/core/trainers/framework/dataset.py index 47122ad38c7806a262dfffc91f364c9fcca1344a..00652e358e9919f85ed9e8938dda8e2122de2fb6 100644 --- a/core/trainers/framework/dataset.py +++ b/core/trainers/framework/dataset.py @@ -126,7 +126,7 @@ class QueueDataset(DatasetBase): file_list = context["fleet"].split_files(file_list) dataset.set_filelist(file_list) - for model_dict in context["env"]["phase"]: + for model_dict in context["phases"]: if model_dict["dataset_name"] == dataset_name: model = context["model"][model_dict["name"]]["model"] thread_num = int(model_dict["thread_num"]) diff --git a/core/trainers/framework/network.py 
b/core/trainers/framework/network.py index 3660521892f8831847c4922fced150a05f744ca8..71f2a4e7fa6ef671c3a07724183edf2e759aec5e 100644 --- a/core/trainers/framework/network.py +++ b/core/trainers/framework/network.py @@ -48,7 +48,7 @@ class SingleNetwork(NetworkBase): def build_network(self, context): context["model"] = {} - for model_dict in context["env"]["phase"]: + for model_dict in context["phases"]: context["model"][model_dict["name"]] = {} train_program = fluid.Program() startup_program = fluid.Program() @@ -58,9 +58,8 @@ class SingleNetwork(NetworkBase): with fluid.program_guard(train_program, startup_program): with fluid.unique_name.guard(): with fluid.scope_guard(scope): - model_path = model_dict["model"].replace( - "{workspace}", - envs.path_adapter(context["env"]["workspace"])) + model_path = envs.os_path_adapter( + envs.workspace_adapter(model_dict["model"])) model = envs.lazy_instance_by_fliename( model_path, "Model")(context["env"]) @@ -98,7 +97,8 @@ class SingleNetwork(NetworkBase): context["dataset"] = {} for dataset in context["env"]["dataset"]: - if dataset["type"] != "DataLoader": + type = envs.get_global_env("dataset." + dataset["name"] + ".type") + if type != "DataLoader": dataset_class = QueueDataset(context) context["dataset"][dataset[ "name"]] = dataset_class.create_dataset(dataset["name"], @@ -123,8 +123,8 @@ class PSNetwork(NetworkBase): context["model"][model_dict["name"]] = {} dataset_name = model_dict["dataset_name"] - model_path = model_dict["model"].replace( - "{workspace}", envs.path_adapter(context["env"]["workspace"])) + model_path = envs.os_path_adapter( + envs.workspace_adapter(model_dict["model"])) model = envs.lazy_instance_by_fliename(model_path, "Model")(context["env"]) model._data_var = model.input_data( @@ -156,7 +156,9 @@ class PSNetwork(NetworkBase): context["fleet"].init_worker() context["dataset"] = {} for dataset in context["env"]["dataset"]: - if dataset["type"] != "DataLoader": + type = envs.get_global_env("dataset." 
+ dataset["name"] + + ".type") + if type != "DataLoader": dataset_class = QueueDataset(context) context["dataset"][dataset[ "name"]] = dataset_class.create_dataset( @@ -216,10 +218,8 @@ class PslibNetwork(NetworkBase): with fluid.unique_name.guard(): with fluid.scope_guard(scope): context["model"][model_dict["name"]] = {} - - model_path = model_dict["model"].replace( - "{workspace}", - envs.path_adapter(context["env"]["workspace"])) + model_path = envs.os_path_adapter( + envs.workspace_adapter(model_dict["model"])) model = envs.lazy_instance_by_fliename( model_path, "Model")(context["env"]) model._data_var = model.input_data( @@ -251,7 +251,9 @@ class PslibNetwork(NetworkBase): else: context["dataset"] = {} for dataset in context["env"]["dataset"]: - if dataset["type"] != "DataLoader": + type = envs.get_global_env("dataset." + dataset["name"] + + ".type") + if type != "DataLoader": dataset_class = QueueDataset(context) context["dataset"][dataset[ "name"]] = dataset_class.create_dataset( @@ -284,9 +286,9 @@ class CollectiveNetwork(NetworkBase): scope = fluid.Scope() with fluid.program_guard(train_program, startup_program): with fluid.scope_guard(scope): - model_path = model_dict["model"].replace( - "{workspace}", - envs.path_adapter(context["env"]["workspace"])) + model_path = envs.os_path_adapter( + envs.workspace_adapter(model_dict["model"])) + model = envs.lazy_instance_by_fliename(model_path, "Model")(context["env"]) model._data_var = model.input_data( @@ -315,7 +317,8 @@ class CollectiveNetwork(NetworkBase): context["dataset"] = {} for dataset in context["env"]["dataset"]: - if dataset["type"] != "DataLoader": + type = envs.get_global_env("dataset." 
+ dataset["name"] + ".type") + if type != "DataLoader": dataset_class = QueueDataset(context) context["dataset"][dataset[ "name"]] = dataset_class.create_dataset(dataset["name"], diff --git a/core/trainers/framework/runner.py b/core/trainers/framework/runner.py index d11a9c0daeb2f6d1a66c5bd757a77c1cea77d582..46a7a1c49ef2e993b9974263efd07b5e81dbae00 100644 --- a/core/trainers/framework/runner.py +++ b/core/trainers/framework/runner.py @@ -40,6 +40,7 @@ class RunnerBase(object): def _run(self, context, model_dict): reader_name = model_dict["dataset_name"] name = "dataset." + reader_name + "." + if envs.get_global_env(name + "type") == "DataLoader": self._executor_dataloader_train(model_dict, context) else: @@ -156,7 +157,7 @@ class RunnerBase(object): gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized else: raise ValueError( - "Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]." + "Unsupported config. gradient_scale_strategy must be one of [0, 1, 2]." ) _build_strategy.gradient_scale_strategy = gradient_scale_strategy @@ -285,7 +286,7 @@ class SingleRunner(RunnerBase): envs.get_global_env("runner." 
+ context["runner_name"] + ".epochs")) for epoch in range(epochs): - for model_dict in context["env"]["phase"]: + for model_dict in context["phases"]: begin_time = time.time() self._run(context, model_dict) end_time = time.time() @@ -384,7 +385,7 @@ class PslibRunner(RunnerBase): day = begin_day + datetime.timedelta(days=day, hours=hour) day_s = day.strftime('%Y%m%d/%H') - for dataset in context["env"]["dataset"]: + for dataset in envs.get_global_env("dataset"): if dataset["type"] != "DataLoader": name = dataset["name"] train_data_path = envs.get_global_env(name + diff --git a/core/trainers/framework/startup.py b/core/trainers/framework/startup.py index c96042c8cdcb6badb816d6cd4ee754e8820cee07..82e244723ecae607eb8cfc1110c0a29642bd4615 100644 --- a/core/trainers/framework/startup.py +++ b/core/trainers/framework/startup.py @@ -54,7 +54,7 @@ class SingleStartup(StartupBase): pass def startup(self, context): - for model_dict in context["env"]["phase"]: + for model_dict in context["phases"]: with fluid.scope_guard(context["model"][model_dict["name"]][ "scope"]): train_prog = context["model"][model_dict["name"]][ diff --git a/core/trainers/general_trainer.py b/core/trainers/general_trainer.py index eeae63e71eb3c41e549e3e1bdc34c7b42d42cf0c..ded99e1caa903db2eea2c16fa8f811764991f16a 100644 --- a/core/trainers/general_trainer.py +++ b/core/trainers/general_trainer.py @@ -17,10 +17,7 @@ General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + P from __future__ import print_function import os -import time -import warnings -import paddle.fluid as fluid from paddlerec.core.utils import envs from paddlerec.core.trainer import Trainer, EngineMode, FleetMode, Device from paddlerec.core.trainers.framework.dataset import * diff --git a/core/trainers/single_infer.py b/core/trainers/single_infer.py deleted file mode 100755 index 235e101ebbec22032b891c0c3cfe2aacf7c5f79c..0000000000000000000000000000000000000000 --- a/core/trainers/single_infer.py +++ /dev/null @@ 
-1,369 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Training use fluid with one node only. -""" - -from __future__ import print_function - -import time -import logging -import os -import json -import numpy as np -import paddle.fluid as fluid - -from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer -from paddlerec.core.utils import envs -from paddlerec.core.reader import SlotReader -from paddlerec.core.utils import dataloader_instance - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) - - -class SingleInfer(TranspileTrainer): - def __init__(self, config=None): - super(TranspileTrainer, self).__init__(config) - self._env = self._config - device = envs.get_global_env("device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - self.processor_register() - self._model = {} - self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - self._runner_name = envs.get_global_env("mode") - device = envs.get_global_env("runner." 
+ self._runner_name + ".device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - - def processor_register(self): - self.regist_context_processor('uninit', self.instance) - self.regist_context_processor('init_pass', self.init) - self.regist_context_processor('startup_pass', self.startup) - self.regist_context_processor('train_pass', self.executor_train) - self.regist_context_processor('terminal_pass', self.terminal) - - def instance(self, context): - context['status'] = 'init_pass' - - def _get_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_class = envs.get_global_env(name + "data_converter") - abs_dir = os.path.dirname(os.path.abspath(__file__)) - reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - if sparse_slots == "" and dense_slots == "": - pipe_cmd = "python {} {} {} {}".format(reader, reader_class, - "TRAIN", self._config_yaml) - else: - if sparse_slots == "": - sparse_slots = "?" - if dense_slots == "": - dense_slots = "?" 
- padding = envs.get_global_env(name + "padding", 0) - pipe_cmd = "python {} {} {} {} {} {} {} {}".format( - reader, "slot", "slot", self._config_yaml, "fake", \ - sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding)) - - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(envs.get_global_env(name + "batch_size")) - dataset.set_pipe_command(pipe_cmd) - train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] - dataset.set_filelist(file_list) - for model_dict in self._env["phase"]: - if model_dict["dataset_name"] == dataset_name: - model = self._model[model_dict["name"]][3] - inputs = model._infer_data_var - dataset.set_use_var(inputs) - break - return dataset - - def _get_dataloader(self, dataset_name, dataloader): - name = "dataset." + dataset_name + "." - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_class = envs.get_global_env(name + "data_converter") - abs_dir = os.path.dirname(os.path.abspath(__file__)) - sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - if sparse_slots == "" and dense_slots == "": - reader = dataloader_instance.dataloader_by_name( - reader_class, dataset_name, self._config_yaml) - reader_class = envs.lazy_instance_by_fliename(reader_class, - "TrainReader") - reader_ins = reader_class(self._config_yaml) - else: - reader = dataloader_instance.slotdataloader_by_name( - "", dataset_name, self._config_yaml) - reader_ins = SlotReader(self._config_yaml) - if hasattr(reader_ins, 'generate_batch_from_trainfiles'): - dataloader.set_sample_list_generator(reader) - else: - dataloader.set_sample_generator(reader, batch_size) - return dataloader - - def _create_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." 
- sparse_slots = envs.get_global_env(name + "sparse_slots") - dense_slots = envs.get_global_env(name + "dense_slots") - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - type_name = envs.get_global_env(name + "type") - if envs.get_platform() != "LINUX": - print("platform ", envs.get_platform(), - " change reader to DataLoader") - type_name = "DataLoader" - padding = 0 - - if type_name == "DataLoader": - return None - else: - return self._get_dataset(dataset_name) - - def init(self, context): - for model_dict in self._env["phase"]: - self._model[model_dict["name"]] = [None] * 5 - train_program = fluid.Program() - startup_program = fluid.Program() - scope = fluid.Scope() - dataset_name = model_dict["dataset_name"] - opt_name = envs.get_global_env("hyper_parameters.optimizer.class") - opt_lr = envs.get_global_env( - "hyper_parameters.optimizer.learning_rate") - opt_strategy = envs.get_global_env( - "hyper_parameters.optimizer.strategy") - with fluid.program_guard(train_program, startup_program): - with fluid.unique_name.guard(): - with fluid.scope_guard(scope): - model_path = model_dict["model"].replace( - "{workspace}", - envs.path_adapter(self._env["workspace"])) - model = envs.lazy_instance_by_fliename( - model_path, "Model")(self._env) - model._infer_data_var = model.input_data( - is_infer=True, - dataset_name=model_dict["dataset_name"]) - if envs.get_global_env("dataset." 
+ dataset_name + - ".type") == "DataLoader": - model._init_dataloader(is_infer=True) - self._get_dataloader(dataset_name, - model._data_loader) - model.net(model._infer_data_var, True) - self._model[model_dict["name"]][0] = train_program - self._model[model_dict["name"]][1] = startup_program - self._model[model_dict["name"]][2] = scope - self._model[model_dict["name"]][3] = model - self._model[model_dict["name"]][4] = train_program.clone() - - for dataset in self._env["dataset"]: - if dataset["type"] != "DataLoader": - self._dataset[dataset["name"]] = self._create_dataset(dataset[ - "name"]) - - context['status'] = 'startup_pass' - - def startup(self, context): - for model_dict in self._env["phase"]: - with fluid.scope_guard(self._model[model_dict["name"]][2]): - self._exe.run(self._model[model_dict["name"]][1]) - context['status'] = 'train_pass' - - def executor_train(self, context): - epochs = int( - envs.get_global_env("runner." + self._runner_name + ".epochs", 1)) - for j in range(epochs): - for model_dict in self._env["phase"]: - if j == 0: - with fluid.scope_guard(self._model[model_dict["name"]][2]): - train_prog = self._model[model_dict["name"]][0] - startup_prog = self._model[model_dict["name"]][1] - with fluid.program_guard(train_prog, startup_prog): - self.load() - reader_name = model_dict["dataset_name"] - name = "dataset." + reader_name + "." 
- begin_time = time.time() - if envs.get_global_env(name + "type") == "DataLoader": - self._executor_dataloader_train(model_dict) - else: - self._executor_dataset_train(model_dict) - with fluid.scope_guard(self._model[model_dict["name"]][2]): - train_prog = self._model[model_dict["name"]][4] - startup_prog = self._model[model_dict["name"]][1] - with fluid.program_guard(train_prog, startup_prog): - self.save(j) - end_time = time.time() - seconds = end_time - begin_time - print("epoch {} done, time elasped: {}".format(j, seconds)) - context['status'] = "terminal_pass" - - def _executor_dataset_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - fetch_vars = [] - fetch_alias = [] - fetch_period = int( - envs.get_global_env("runner." + self._runner_name + - ".print_interval", 20)) - metrics = model_class.get_infer_results() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - scope = self._model[model_name][2] - program = self._model[model_name][0] - reader = self._dataset[reader_name] - with fluid.scope_guard(scope): - self._exe.infer_from_dataset( - program=program, - dataset=reader, - fetch_list=fetch_vars, - fetch_info=fetch_alias, - print_period=fetch_period) - - def _executor_dataloader_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - program = self._model[model_name][0].clone() - fetch_vars = [] - fetch_alias = [] - metrics = model_class.get_infer_results() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - metrics_varnames = [] - metrics_format = [] - fetch_period = int( - envs.get_global_env("runner." 
+ self._runner_name + - ".print_interval", 20)) - metrics_format.append("{}: {{}}".format("batch")) - metrics_indexes = dict() - for name, var in metrics.items(): - metrics_varnames.append(var.name) - metrics_indexes[var.name] = len(metrics_varnames) - 1 - metrics_format.append("{}: {{}}".format(name)) - metrics_format = ", ".join(metrics_format) - - reader = self._model[model_name][3]._data_loader - reader.start() - batch_id = 0 - scope = self._model[model_name][2] - - infer_results = [] - with fluid.scope_guard(scope): - try: - while True: - metrics_rets = self._exe.run(program=program, - fetch_list=metrics_varnames, - return_numpy=False) - metrics = [batch_id] - metrics.extend(metrics_rets) - - batch_infer_result = {} - for k, v in metrics_indexes.items(): - batch_infer_result[k] = np.array(metrics_rets[ - v]).tolist() - infer_results.append(batch_infer_result) - - if batch_id % fetch_period == 0 and batch_id != 0: - print(metrics_format.format(*metrics)) - batch_id += 1 - except fluid.core.EOFException: - reader.reset() - - def terminal(self, context): - context['is_exit'] = True - - def load(self, is_fleet=False): - name = "runner." + self._runner_name + "." - dirname = envs.get_global_env(name + "init_model_path", None) - if dirname is None or dirname == "": - return - print("single_infer going to load ", dirname) - if is_fleet: - fleet.load_persistables(self._exe, dirname) - else: - fluid.io.load_persistables(self._exe, dirname) - - def save(self, epoch_id, is_fleet=False): - def need_save(epoch_id, epoch_interval, is_last=False): - if is_last: - return True - if epoch_id == -1: - return False - - return epoch_id % epoch_interval == 0 - - def save_inference_model(): - name = "runner." + self._runner_name + "." 
- save_interval = int( - envs.get_global_env(name + "save_inference_interval", -1)) - if not need_save(epoch_id, save_interval, False): - return - feed_varnames = envs.get_global_env( - name + "save_inference_feed_varnames", None) - fetch_varnames = envs.get_global_env( - name + "save_inference_fetch_varnames", None) - if feed_varnames is None or fetch_varnames is None or feed_varnames == "": - return - fetch_vars = [ - fluid.default_main_program().global_block().vars[varname] - for varname in fetch_varnames - ] - dirname = envs.get_global_env(name + "save_inference_path", None) - - assert dirname is not None - dirname = os.path.join(dirname, str(epoch_id)) - - if is_fleet: - fleet.save_inference_model(self._exe, dirname, feed_varnames, - fetch_vars) - else: - fluid.io.save_inference_model(dirname, feed_varnames, - fetch_vars, self._exe) - - def save_persistables(): - name = "runner." + self._runner_name + "." - save_interval = int( - envs.get_global_env(name + "save_checkpoint_interval", -1)) - if not need_save(epoch_id, save_interval, False): - return - dirname = envs.get_global_env(name + "save_checkpoint_path", None) - if dirname is None or dirname == "": - return - dirname = os.path.join(dirname, str(epoch_id)) - if is_fleet: - fleet.save_persistables(self._exe, dirname) - else: - fluid.io.save_persistables(self._exe, dirname) - - save_persistables() - save_inference_model() diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py deleted file mode 100755 index d782dfa42f5695c8b04e9e2db6ba92b676b8b0f6..0000000000000000000000000000000000000000 --- a/core/trainers/single_trainer.py +++ /dev/null @@ -1,391 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Training use fluid with one node only. -""" - -from __future__ import print_function - -import time -import logging -import os -import paddle.fluid as fluid - -from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer -from paddlerec.core.utils import envs -from paddlerec.core.reader import SlotReader -from paddlerec.core.utils import dataloader_instance - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) - - -class SingleTrainer(TranspileTrainer): - def __init__(self, config=None): - super(TranspileTrainer, self).__init__(config) - self._env = self._config - self.processor_register() - self._model = {} - self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - self._runner_name = envs.get_global_env("mode") - device = envs.get_global_env("runner." + self._runner_name + ".device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - - def processor_register(self): - self.regist_context_processor('uninit', self.instance) - self.regist_context_processor('init_pass', self.init) - self.regist_context_processor('startup_pass', self.startup) - self.regist_context_processor('train_pass', self.executor_train) - self.regist_context_processor('terminal_pass', self.terminal) - - def instance(self, context): - context['status'] = 'init_pass' - - def _get_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." 
- thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_class = envs.get_global_env(name + "data_converter") - abs_dir = os.path.dirname(os.path.abspath(__file__)) - reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - if sparse_slots == "" and dense_slots == "": - pipe_cmd = "python {} {} {} {}".format(reader, reader_class, - "TRAIN", self._config_yaml) - else: - if sparse_slots == "": - sparse_slots = "?" - if dense_slots == "": - dense_slots = "?" - padding = envs.get_global_env(name + "padding", 0) - pipe_cmd = "python {} {} {} {} {} {} {} {}".format( - reader, "slot", "slot", self._config_yaml, "fake", \ - sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding)) - - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(envs.get_global_env(name + "batch_size")) - dataset.set_pipe_command(pipe_cmd) - train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] - dataset.set_filelist(file_list) - for model_dict in self._env["phase"]: - if model_dict["dataset_name"] == dataset_name: - model = self._model[model_dict["name"]][3] - inputs = model._data_var - dataset.set_use_var(inputs) - break - return dataset - - def _get_dataloader(self, dataset_name, dataloader): - name = "dataset." + dataset_name + "." 
- sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_class = envs.get_global_env(name + "data_converter") - abs_dir = os.path.dirname(os.path.abspath(__file__)) - if sparse_slots == "" and dense_slots == "": - reader = dataloader_instance.dataloader_by_name( - reader_class, dataset_name, self._config_yaml) - reader_class = envs.lazy_instance_by_fliename(reader_class, - "TrainReader") - reader_ins = reader_class(self._config_yaml) - else: - reader = dataloader_instance.slotdataloader_by_name( - "", dataset_name, self._config_yaml) - reader_ins = SlotReader(self._config_yaml) - if hasattr(reader_ins, 'generate_batch_from_trainfiles'): - dataloader.set_sample_list_generator(reader) - else: - dataloader.set_sample_generator(reader, batch_size) - return dataloader - - def _create_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." 
- sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip() - dense_slots = envs.get_global_env(name + "dense_slots", "").strip() - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - type_name = envs.get_global_env(name + "type") - if envs.get_platform() != "LINUX": - print("platform ", envs.get_platform(), - " change reader to DataLoader") - type_name = "DataLoader" - padding = 0 - - if type_name == "DataLoader": - return None - else: - return self._get_dataset(dataset_name) - - def init(self, context): - for model_dict in self._env["phase"]: - self._model[model_dict["name"]] = [None] * 5 - train_program = fluid.Program() - startup_program = fluid.Program() - scope = fluid.Scope() - dataset_name = model_dict["dataset_name"] - with fluid.program_guard(train_program, startup_program): - with fluid.unique_name.guard(): - with fluid.scope_guard(scope): - model_path = model_dict["model"].replace( - "{workspace}", - envs.path_adapter(self._env["workspace"])) - model = envs.lazy_instance_by_fliename( - model_path, "Model")(self._env) - model._data_var = model.input_data( - dataset_name=model_dict["dataset_name"]) - if envs.get_global_env("dataset." 
+ dataset_name + - ".type") == "DataLoader": - model._init_dataloader(is_infer=False) - self._get_dataloader(dataset_name, - model._data_loader) - model.net(model._data_var, False) - optimizer = model.optimizer() - optimizer.minimize(model._cost) - self._model[model_dict["name"]][0] = train_program - self._model[model_dict["name"]][1] = startup_program - self._model[model_dict["name"]][2] = scope - self._model[model_dict["name"]][3] = model - self._model[model_dict["name"]][4] = train_program.clone() - - for dataset in self._env["dataset"]: - if dataset["type"] != "DataLoader": - self._dataset[dataset["name"]] = self._create_dataset(dataset[ - "name"]) - - context['status'] = 'startup_pass' - - def startup(self, context): - for model_dict in self._env["phase"]: - with fluid.scope_guard(self._model[model_dict["name"]][2]): - self._exe.run(self._model[model_dict["name"]][1]) - context['status'] = 'train_pass' - - def executor_train(self, context): - epochs = int( - envs.get_global_env("runner." + self._runner_name + ".epochs")) - for j in range(epochs): - for model_dict in self._env["phase"]: - if j == 0: - with fluid.scope_guard(self._model[model_dict["name"]][2]): - train_prog = self._model[model_dict["name"]][0] - startup_prog = self._model[model_dict["name"]][1] - with fluid.program_guard(train_prog, startup_prog): - self.load() - reader_name = model_dict["dataset_name"] - name = "dataset." + reader_name + "." 
- begin_time = time.time() - if envs.get_global_env(name + "type") == "DataLoader": - self._executor_dataloader_train(model_dict) - else: - self._executor_dataset_train(model_dict) - with fluid.scope_guard(self._model[model_dict["name"]][2]): - train_prog = self._model[model_dict["name"]][4] - startup_prog = self._model[model_dict["name"]][1] - with fluid.program_guard(train_prog, startup_prog): - self.save(j) - end_time = time.time() - seconds = end_time - begin_time - print("epoch {} done, time elasped: {}".format(j, seconds)) - context['status'] = "terminal_pass" - - def _executor_dataset_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - fetch_vars = [] - fetch_alias = [] - fetch_period = int( - envs.get_global_env("runner." + self._runner_name + - ".print_interval", 20)) - metrics = model_class.get_metrics() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - scope = self._model[model_name][2] - program = self._model[model_name][0] - reader = self._dataset[reader_name] - threads = model_dict.get("thread_num", 1) - with fluid.scope_guard(scope): - self._exe.train_from_dataset( - program=program, - dataset=reader, - thread=threads, - fetch_list=fetch_vars, - fetch_info=fetch_alias, - print_period=fetch_period) - - def _executor_dataloader_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - program = self._model[model_name][0].clone() - - _build_strategy = fluid.BuildStrategy() - _exe_strategy = fluid.ExecutionStrategy() - - # 0: kCoeffNumDevice; 1: One; 2: Customized - _gradient_scale_strategy = model_dict.get("gradient_scale_strategy", 0) - if _gradient_scale_strategy == 0: - gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.CoeffNumDevice - elif _gradient_scale_strategy == 1: - gradient_scale_strategy = 
fluid.BuildStrategy.GradientScaleStrategy.One - elif _gradient_scale_strategy == 2: - gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized - else: - raise ValueError( - "Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]." - ) - _build_strategy.gradient_scale_strategy = gradient_scale_strategy - - if "thread_num" in model_dict and model_dict["thread_num"] > 1: - _build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce - _exe_strategy.num_threads = model_dict["thread_num"] - os.environ['CPU_NUM'] = str(_exe_strategy.num_threads) - - program = fluid.compiler.CompiledProgram(program).with_data_parallel( - loss_name=model_class.get_avg_cost().name, - build_strategy=_build_strategy, - exec_strategy=_exe_strategy) - - fetch_vars = [] - fetch_alias = [] - fetch_period = int( - envs.get_global_env("runner." + self._runner_name + - ".print_interval", 20)) - metrics = model_class.get_metrics() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - metrics_varnames = [] - metrics_format = [] - metrics_format.append("{}: {{}}".format("batch")) - for name, var in metrics.items(): - metrics_varnames.append(var.name) - metrics_format.append("{}: {{}}".format(name)) - metrics_format = ", ".join(metrics_format) - - reader = self._model[model_name][3]._data_loader - reader.start() - batch_id = 0 - scope = self._model[model_name][2] - with fluid.scope_guard(scope): - try: - while True: - metrics_rets = self._exe.run(program=program, - fetch_list=metrics_varnames) - metrics = [batch_id] - metrics.extend(metrics_rets) - - if batch_id % fetch_period == 0 and batch_id != 0: - print(metrics_format.format(*metrics)) - batch_id += 1 - except fluid.core.EOFException: - reader.reset() - - def terminal(self, context): - context['is_exit'] = True - - def load(self, is_fleet=False): - dirname = envs.get_global_env( - "runner." 
+ self._runner_name + ".init_model_path", None) - load_vars = envs.get_global_env( - "runner." + self._runner_name + ".load_vars", None) - - def name_has_embedding(var): - res = False - for var_name in load_vars: - if var_name == var.name: - return True - return res - - if dirname is None or dirname == "": - return - print("going to load ", dirname) - if is_fleet: - fleet.load_persistables(self._exe, dirname) - else: - if load_vars is None or len(load_vars) == 0: - fluid.io.load_persistables(self._exe, dirname) - else: - fluid.io.load_vars( - self._exe, dirname, predicate=name_has_embedding) - - def save(self, epoch_id, is_fleet=False): - def need_save(epoch_id, epoch_interval, is_last=False): - if is_last: - return True - if epoch_id == -1: - return False - - return epoch_id % epoch_interval == 0 - - def save_inference_model(): - name = "runner." + self._runner_name + "." - save_interval = int( - envs.get_global_env(name + "save_inference_interval", -1)) - if not need_save(epoch_id, save_interval, False): - return - feed_varnames = envs.get_global_env( - name + "save_inference_feed_varnames", []) - fetch_varnames = envs.get_global_env( - name + "save_inference_fetch_varnames", []) - if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \ - len(feed_varnames) == 0 or len(fetch_varnames) == 0: - return - fetch_vars = [ - fluid.default_main_program().global_block().vars[varname] - for varname in fetch_varnames - ] - dirname = envs.get_global_env(name + "save_inference_path", None) - - assert dirname is not None - dirname = os.path.join(dirname, str(epoch_id)) - - if is_fleet: - fleet.save_inference_model(self._exe, dirname, feed_varnames, - fetch_vars) - else: - fluid.io.save_inference_model(dirname, feed_varnames, - fetch_vars, self._exe) - - def save_persistables(): - name = "runner." + self._runner_name + "." 
- save_interval = int( - envs.get_global_env(name + "save_checkpoint_interval", -1)) - if not need_save(epoch_id, save_interval, False): - return - dirname = envs.get_global_env(name + "save_checkpoint_path", None) - if dirname is None or dirname == "": - return - dirname = os.path.join(dirname, str(epoch_id)) - if is_fleet: - fleet.save_persistables(self._exe, dirname) - else: - fluid.io.save_persistables(self._exe, dirname) - - save_persistables() - save_inference_model() diff --git a/core/utils/envs.py b/core/utils/envs.py index bbb2a824e6d3c5268d6ae17caed99410f714f353..36993a93a9ea1f83f1cd24fc80a1b9623b61b445 100755 --- a/core/utils/envs.py +++ b/core/utils/envs.py @@ -20,9 +20,8 @@ import socket import sys import traceback -import yaml - global_envs = {} +global_envs_flatten = {} def flatten_environs(envs, separator="."): @@ -92,6 +91,16 @@ def set_global_envs(envs): fatten_env_namespace([], envs) + for name, value in global_envs.items(): + if isinstance(value, str): + value = os_path_adapter(workspace_adapter(value)) + global_envs[name] = value + + if get_platform() != "LINUX": + for dataset in envs["dataset"]: + name = ".".join(["dataset", dataset["name"], "type"]) + global_envs[name] = "DataLoader" + def get_global_env(env_name, default_value=None, namespace=None): """ @@ -106,7 +115,7 @@ def get_global_envs(): return global_envs -def path_adapter(path): +def paddlerec_adapter(path): if path.startswith("paddlerec."): package = get_runtime_environ("PACKAGE_BASE") l_p = path.split("paddlerec.")[1].replace(".", "/") @@ -115,24 +124,28 @@ def path_adapter(path): return path -def windows_path_converter(path): +def os_path_adapter(value): if get_platform() == "WINDOWS": - return path.replace("/", "\\") + value = value.replace("/", "\\") else: - return path.replace("\\", "/") + value = value.replace("\\", "/") + return value -def update_workspace(): +def workspace_adapter(value): workspace = global_envs.get("workspace") - if not workspace: + workspace = 
paddlerec_adapter(workspace) + value = value.replace("{workspace}", workspace) + return value + + +def reader_adapter(): + if get_platform() != "WINDOWS": return - workspace = path_adapter(workspace) - for name, value in global_envs.items(): - if isinstance(value, str): - value = value.replace("{workspace}", workspace) - value = windows_path_converter(value) - global_envs[name] = value + datasets = global_envs.get("dataset") + for dataset in datasets: + dataset["type"] = "DataLoader" def pretty_print_envs(envs, header=None): diff --git a/doc/custom_reader.md b/doc/custom_reader.md index bf0e5433b06c90dab7440b7dbf19eaa6ef975d1c..748f82c5499721857a011b08076af1b732914cc0 100644 --- a/doc/custom_reader.md +++ b/doc/custom_reader.md @@ -208,7 +208,7 @@ CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www 稀疏参数输入的定义: ```python def sparse_inputs(): - ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace) + ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None) sparse_input_ids = [ fluid.layers.data(name="S" + str(i), @@ -222,7 +222,7 @@ def sparse_inputs(): 稠密参数输入的定义: ```python def dense_input(): - dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace) + dim = envs.get_global_env("hyper_parameters.dense_input_dim", None) dense_input_var = fluid.layers.data(name="D", shape=[dim], diff --git a/doc/design.md b/doc/design.md index c09889dc437e001d617625c93e6eac2211bd1072..2a9d94533d2c40bb2f96b97fd2873364ee62f0cd 100644 --- a/doc/design.md +++ b/doc/design.md @@ -111,7 +111,7 @@ Engine的自定义实现,可以参考[local_cluster.py](../core/engine/local_c 我们以GeneralTrainer为例,概览Trainer行为: ```python -class SingleTrainer(TranspileTrainer): +class GeneralTrainer(Trainer): def processor_register(self): print("processor_register begin") self.regist_context_processor('uninit', self.instance) @@ -151,7 +151,6 @@ class Model(object): self._data_loader = None self._infer_data_loader = None self._fetch_interval = 20 - 
self._namespace = "train.model" self._platform = envs.get_platform() def get_inputs(self): @@ -211,7 +210,6 @@ class Reader(dg.MultiSlotDataGenerator): dg.MultiSlotDataGenerator.__init__(self) _config = envs.load_yaml(config) envs.set_global_envs(_config) - envs.update_workspace() @abc.abstractmethod def init(self): @@ -231,9 +229,6 @@ class Reader(dg.MultiSlotDataGenerator): 完成reader的构建工作。 -Reader数据处理的逻辑,可以参考[criteo_reader.py](../../models/rank/../../paddlerec/models/rank/criteo_reader.py) - - ## Metric diff --git a/doc/model.md b/doc/model.md index ca76dd360c9c9740b1d607e78682c80aded4898e..d4c09733bc2489bafb811d74489f6e6556788e9e 100644 --- a/doc/model.md +++ b/doc/model.md @@ -24,8 +24,7 @@ hyper_parameters: ```python if name == "SGD": - reg = envs.get_global_env("hyper_parameters.reg", 0.0001, - self._namespace) + reg = envs.get_global_env("hyper_parameters.reg", 0.0001) optimizer_i = fluid.optimizer.SGD( lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) elif name == "ADAM": diff --git a/doc/predict.md b/doc/predict.md index 4957713d4a9973c910ea7ce9bdb156589fa730ae..c26fdf9ab438dc14369a07d55e101edaade7eda7 100644 --- a/doc/predict.md +++ b/doc/predict.md @@ -13,6 +13,7 @@ runner: device: cpu # 执行在 cpu 上 init_model_path: "init_model" # 指定初始化模型的地址 print_interval: 10 # 预测信息的打印间隔,以batch为单位 + phases: phase_infer ``` 再定义具体的执行内容: diff --git a/doc/train.md b/doc/train.md index 3987bdc5fe583af3217e508dcdf0a5d6d5f7dbb7..e629eb628521e0cd6012348db7222517218690d6 100644 --- a/doc/train.md +++ b/doc/train.md @@ -65,7 +65,7 @@ python -m paddlerec.run -m paddlerec.models.recall.word2vec - **`runner`** : runner是训练的引擎,亦可称之为运行器,在runner中定义执行设备(cpu、gpu),执行的模式(训练、预测、单机、多机等),以及运行的超参,例如训练轮数,模型保存地址等。 - **`phase`** : phase是训练中的阶段的概念,是引擎具体执行的内容,该内容是指:具体运行哪个模型文件,使用哪个reader。 -PaddleRec每次运行时,会执行一个运行器,通过`mode`指定`runner`的名字。每个运行器可以执行多个`phase`,所以PaddleRec支持一键启动多阶段的训练。 +PaddleRec每次运行时,会执行一个或多个运行器,通过`mode`指定`runner`的名字。每个运行器可以执行一个或多个`phase`,所以PaddleRec支持一键启动多阶段的训练。 ### 单机CPU训练 @@ 
-73,6 +73,7 @@ PaddleRec每次运行时,会执行一个运行器,通过`mode`指定`runner` ```yaml mode: single_cpu_train # 执行名为 single_cpu_train 的运行器 +# mode 也支持多个runner的执行,此处可以改为 mode: [single_cpu_train, single_cpu_infer] runner: - name: single_cpu_train # 定义 runner 名为 single_cpu_train @@ -88,6 +89,8 @@ runner: save_inference_fetch_varnames: [] # inference model 的fetch参数的名字 init_model_path: "" # 如果是加载模型热启,则可以指定初始化模型的地址 print_interval: 10 # 训练信息的打印间隔,以batch为单位 + phases: [phase_train] # 若没有指定phases,则会默认运行所有phase + # phase 也支持自定多个phase的执行,此处可以改为 phases: [phase_train, phase_infer] ``` 再定义具体的执行内容: diff --git a/models/rank/dataset/Criteo_data/get_slot_data.py b/models/rank/dataset/Criteo_data/get_slot_data.py index 5d77ec0cd71b22331b96d8e4ac7d75c5b9c29c8a..b9a4adc3feff53499f4ff190d4a321bc58cf2033 100644 --- a/models/rank/dataset/Criteo_data/get_slot_data.py +++ b/models/rank/dataset/Criteo_data/get_slot_data.py @@ -12,10 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import yaml -import os - -from paddlerec.core.reader import ReaderBase from paddlerec.core.utils import envs import paddle.fluid.incubate.data_generator as dg try: @@ -27,12 +23,7 @@ except ImportError: class Reader(dg.MultiSlotDataGenerator): def __init__(self, config): dg.MultiSlotDataGenerator.__init__(self) - - if os.path.isfile(config): - with open(config, 'r') as rb: - _config = yaml.load(rb.read(), Loader=yaml.FullLoader) - else: - raise ValueError("reader config only support yaml") + _config = envs.load_yaml(config) def init(self): self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml index 603e6399f0971e11be9fc5c42a2e216f27973389..539cbb00fbb83197b120238a584ae13d348cc49e 100755 --- a/models/rank/dnn/config.yaml +++ b/models/rank/dnn/config.yaml @@ -51,7 +51,7 @@ hyper_parameters: fc_sizes: [512, 256, 128, 32] # select runner by name -mode: single_cpu_train +mode: [single_cpu_train, single_cpu_infer] # config of each runner. # runner is a kind of paddle training class, which wraps the train/infer process. 
runner: @@ -69,21 +69,8 @@ runner: save_inference_fetch_varnames: [] # fetch vars of save inference init_model_path: "" # load model path print_interval: 10 -- name: single_gpu_train - class: train - # num of epochs - epochs: 4 - # device to run training or infer - device: gpu - selected_gpus: "2" - save_checkpoint_interval: 2 # save model interval of epochs - save_inference_interval: 4 # save inference - save_checkpoint_path: "increment" # save checkpoint path - save_inference_path: "inference" # save inference path - save_inference_feed_varnames: [] # feed vars of save inference - save_inference_fetch_varnames: [] # fetch vars of save inference - init_model_path: "" # load model path - print_interval: 10 + phases: [phase1] + - name: single_cpu_infer class: infer # num of epochs @@ -91,31 +78,7 @@ runner: # device to run training or infer device: cpu init_model_path: "increment/0" # load model path -- name: local_cluster_cpu_ps_train - class: local_cluster - epochs: 4 - device: cpu - save_checkpoint_interval: 2 # save model interval of epochs - save_inference_interval: 4 # save inference - save_checkpoint_path: "increment" # save checkpoint path - save_inference_path: "inference" # save inference path - save_inference_feed_varnames: [] # feed vars of save inference - save_inference_fetch_varnames: [] # fetch vars of save inference - init_model_path: "" # load model path - print_interval: 1 -- name: multi_gpu_train - class: train - epochs: 4 - device: gpu - selected_gpus: "2,3" - save_checkpoint_interval: 2 # save model interval of epochs - save_inference_interval: 4 # save inference - save_checkpoint_path: "increment" # save checkpoint path - save_inference_path: "inference" # save inference path - save_inference_feed_varnames: [] # feed vars of save inference - save_inference_fetch_varnames: [] # fetch vars of save inference - init_model_path: "" # load model path - print_interval: 10 + phases: [phase2] # runner will run all the phase in each epoch phase: @@ 
-123,7 +86,8 @@ phase: model: "{workspace}/model.py" # user-defined model dataset_name: dataloader_train # select dataset by name thread_num: 1 -#- name: phase2 -# model: "{workspace}/model.py" # user-defined model -# dataset_name: dataset_infer # select dataset by name -# thread_num: 1 + +- name: phase2 + model: "{workspace}/model.py" # user-defined model + dataset_name: dataset_infer # select dataset by name + thread_num: 1 diff --git a/models/rank/logistic_regression/data/get_slot_data.py b/models/rank/logistic_regression/data/get_slot_data.py index 1a40a9bfae046e5f53e8f0d396c2fc2cbf193929..f0b95a79c70d7c9da8b327884c17206e6b76f731 100644 --- a/models/rank/logistic_regression/data/get_slot_data.py +++ b/models/rank/logistic_regression/data/get_slot_data.py @@ -12,10 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import yaml -import os - -from paddlerec.core.reader import ReaderBase from paddlerec.core.utils import envs import paddle.fluid.incubate.data_generator as dg @@ -28,12 +24,7 @@ except ImportError: class Reader(dg.MultiSlotDataGenerator): def __init__(self, config): dg.MultiSlotDataGenerator.__init__(self) - - if os.path.isfile(config): - with open(config, 'r') as rb: - _config = yaml.load(rb.read(), Loader=yaml.FullLoader) - else: - raise ValueError("reader config only support yaml") + _config = envs.load_yaml(config) def init(self): self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] diff --git a/models/rank/nfm/data/get_slot_data.py b/models/rank/nfm/data/get_slot_data.py index 5d77ec0cd71b22331b96d8e4ac7d75c5b9c29c8a..b9a4adc3feff53499f4ff190d4a321bc58cf2033 100644 --- a/models/rank/nfm/data/get_slot_data.py +++ b/models/rank/nfm/data/get_slot_data.py @@ -12,10 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import yaml -import os - -from paddlerec.core.reader import ReaderBase from paddlerec.core.utils import envs import paddle.fluid.incubate.data_generator as dg try: @@ -27,12 +23,7 @@ except ImportError: class Reader(dg.MultiSlotDataGenerator): def __init__(self, config): dg.MultiSlotDataGenerator.__init__(self) - - if os.path.isfile(config): - with open(config, 'r') as rb: - _config = yaml.load(rb.read(), Loader=yaml.FullLoader) - else: - raise ValueError("reader config only support yaml") + _config = envs.load_yaml(config) def init(self): self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] diff --git a/run.py b/run.py index 23a55d581714cf9796d047ad1d70bc63a47afb13..881abcc6ae6bc8022d76b9672a9fabb424ebcebf 100755 --- a/run.py +++ b/run.py @@ -18,11 +18,9 @@ import sys import argparse import tempfile -import yaml import copy from paddlerec.core.factory import TrainerFactory from paddlerec.core.utils import envs -from paddlerec.core.utils import validation from paddlerec.core.utils import util from paddlerec.core.utils import validation @@ -96,35 +94,61 @@ def get_all_inters_from_yaml(file, filters): return ret -def get_engine(args): +def get_modes(running_config): + if not isinstance(running_config, dict): + raise ValueError("get_modes arguments must be [dict]") + + modes = running_config.get("mode") + if not modes: + raise ValueError("yaml mast have config: mode") + + if isinstance(modes, str): + modes = [modes] + + return modes + + +def get_engine(args, running_config, mode): transpiler = get_transpiler() - with open(args.model, 'r') as rb: - _envs = yaml.load(rb.read(), Loader=yaml.FullLoader) - run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) - engine = run_extras.get("train.engine", None) - if engine is None: - engine = run_extras.get("runner." 
+ _envs["mode"] + ".class", None)
+    engine_class = ".".join(["runner", mode, "class"])
+    engine_device = ".".join(["runner", mode, "device"])
+    device_gpu_choices = ".".join(["runner", mode, "device", "selected_gpus"])
+
+    engine = running_config.get(engine_class, None)
     if engine is None:
-        engine = "train"
+        raise ValueError("not find {} in yaml, please check".format(
+            engine_class))
+    device = running_config.get(engine_device, None)
+
+    engine = engine.upper()
+
+    if device is None:
+        print("not find device be specified in yaml, set CPU as default")
+        device = "CPU"
+    device = device.upper()
+
+    if device == "GPU":
+        selected_gpus = running_config.get(device_gpu_choices, None)
+
+        if selected_gpus is None:
+            print(
+                "not find selected_gpus be specified in yaml, set `0` as default"
+            )
+            selected_gpus = "0"
+        else:
+            print("selected_gpus {} will be specified for running".format(
+                selected_gpus))
-
-    device = run_extras.get("runner." + _envs["mode"] + ".device", "CPU")
-    if device.upper() == "GPU":
-        selected_gpus = run_extras.get(
-            "runner."
+ _envs["mode"] + ".selected_gpus", "0") selected_gpus_num = len(selected_gpus.split(",")) if selected_gpus_num > 1: engine = "LOCAL_CLUSTER" - engine = engine.upper() if engine not in engine_choices: - raise ValueError("runner.class can not be chosen in {}".format( - engine_choices)) - - print("engines: \n{}".format(engines)) + raise ValueError("{} can not be chosen in {}".format(engine_class, + engine_choices)) run_engine = engines[transpiler].get(engine, None) - return run_engine @@ -146,12 +170,7 @@ def set_runtime_envs(cluster_envs, engine_yaml): if cluster_envs is None: cluster_envs = {} - engine_extras = get_inters_from_yaml(engine_yaml, "train.trainer.") - if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs: - cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"] - envs.set_runtime_environs(cluster_envs) - envs.set_runtime_environs(engine_extras) need_print = {} for k, v in os.environ.items(): @@ -162,29 +181,31 @@ def set_runtime_envs(cluster_envs, engine_yaml): def single_train_engine(args): - _envs = envs.load_yaml(args.model) - run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) - trainer_class = run_extras.get( - "runner." 
+ _envs["mode"] + ".trainer_class", None) + run_extras = get_all_inters_from_yaml(args.model, ["runner."]) + mode = envs.get_runtime_environ("mode") + trainer_class = ".".join(["runner", mode, "trainer_class"]) + fleet_class = ".".join(["runner", mode, "fleet_mode"]) + device_class = ".".join(["runner", mode, "device"]) + selected_gpus_class = ".".join(["runner", mode, "selected_gpus"]) + + trainer = run_extras.get(trainer_class, "GeneralTrainer") + fleet_mode = run_extras.get(fleet_class, "ps") + device = run_extras.get(device_class, "cpu") + selected_gpus = run_extras.get(selected_gpus_class, "0") + executor_mode = "train" - if trainer_class: - trainer = trainer_class - else: - trainer = "GeneralTrainer" + single_envs = {} - executor_mode = "train" - fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode", - "ps") - device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu") - selected_gpus = run_extras.get( - "runner." + _envs["mode"] + ".selected_gpus", "0") - selected_gpus_num = len(selected_gpus.split(",")) if device.upper() == "GPU": - assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS" + selected_gpus_num = len(selected_gpus.split(",")) + if selected_gpus_num != 1: + raise ValueError( + "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS" + ) + + single_envs["selsected_gpus"] = selected_gpus + single_envs["FLAGS_selected_gpus"] = selected_gpus - single_envs = {} - single_envs["selsected_gpus"] = selected_gpus - single_envs["FLAGS_selected_gpus"] = selected_gpus single_envs["train.trainer.trainer"] = trainer single_envs["fleet_mode"] = fleet_mode single_envs["train.trainer.executor_mode"] = executor_mode @@ -199,29 +220,32 @@ def single_train_engine(args): def single_infer_engine(args): _envs = envs.load_yaml(args.model) - run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) - trainer_class = run_extras.get( - "runner." 
+ _envs["mode"] + ".trainer_class", None) - - if trainer_class: - trainer = trainer_class - else: - trainer = "GeneralTrainer" - + run_extras = get_all_inters_from_yaml(args.model, ["runner."]) + + mode = envs.get_runtime_environ("mode") + trainer_class = ".".join(["runner", mode, "trainer_class"]) + fleet_class = ".".join(["runner", mode, "fleet_mode"]) + device_class = ".".join(["runner", mode, "device"]) + selected_gpus_class = ".".join(["runner", mode, "selected_gpus"]) + + trainer = run_extras.get(trainer_class, "GeneralTrainer") + fleet_mode = run_extras.get(fleet_class, "ps") + device = run_extras.get(device_class, "cpu") + selected_gpus = run_extras.get(selected_gpus_class, "0") executor_mode = "infer" - fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode", - "ps") - device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu") - selected_gpus = run_extras.get( - "runner." + _envs["mode"] + ".selected_gpus", "0") - selected_gpus_num = len(selected_gpus.split(",")) + single_envs = {} + if device.upper() == "GPU": - assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS" + selected_gpus_num = len(selected_gpus.split(",")) + if selected_gpus_num != 1: + raise ValueError( + "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS" + ) + + single_envs["selsected_gpus"] = selected_gpus + single_envs["FLAGS_selected_gpus"] = selected_gpus - single_envs = {} - single_envs["selected_gpus"] = selected_gpus - single_envs["FLAGS_selected_gpus"] = selected_gpus single_envs["train.trainer.trainer"] = trainer single_envs["train.trainer.executor_mode"] = executor_mode single_envs["fleet_mode"] = fleet_mode @@ -235,18 +259,6 @@ def single_infer_engine(args): def cluster_engine(args): - def update_workspace(cluster_envs): - workspace = cluster_envs.get("engine_workspace", None) - - if not workspace: - return - path = envs.path_adapter(workspace) - for name, value in 
cluster_envs.items(): - if isinstance(value, str): - value = value.replace("{workspace}", path) - value = envs.windows_path_converter(value) - cluster_envs[name] = value - def master(): role = "MASTER" from paddlerec.core.engine.cluster.cluster import ClusterEngine @@ -255,8 +267,6 @@ def cluster_engine(args): flattens["engine_role"] = role flattens["engine_run_config"] = args.model flattens["engine_temp_path"] = tempfile.mkdtemp() - update_workspace(flattens) - envs.set_runtime_environs(flattens) print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value"))) @@ -424,7 +434,7 @@ def local_mpi_engine(args): def get_abs_model(model): if model.startswith("paddlerec."): - dir = envs.path_adapter(model) + dir = envs.paddlerec_adapter(model) path = os.path.join(dir, "config.yaml") else: if not os.path.isfile(model): @@ -442,11 +452,17 @@ if __name__ == "__main__": envs.set_runtime_environs({"PACKAGE_BASE": abs_dir}) args = parser.parse_args() - model_name = args.model.split('.')[-1] args.model = get_abs_model(args.model) + if not validation.yaml_validation(args.model): sys.exit(-1) + engine_registry() - which_engine = get_engine(args) - engine = which_engine(args) - engine.run() + running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."]) + modes = get_modes(running_config) + + for mode in modes: + envs.set_runtime_environs({"mode": mode}) + which_engine = get_engine(args, running_config, mode) + engine = which_engine(args) + engine.run() diff --git a/setup.py b/setup.py index b3e3a362333a9edf571477f92cd8f66eebd4dedc..5720ccccbe9b0ef8376094091a3515d78454a53a 100644 --- a/setup.py +++ b/setup.py @@ -34,27 +34,28 @@ about["__url__"] = "https://github.com/PaddlePaddle/PaddleRec" readme = "" -def run_cmd(command): - assert command is not None and isinstance(command, str) - return os.popen(command).read().strip() - - def build(dirname): package_dir = os.path.dirname(os.path.abspath(__file__)) - run_cmd("cp -r {}/* {}".format(package_dir, dirname)) - 
run_cmd("mkdir {}".format(os.path.join(dirname, "paddlerec")))
-    run_cmd("mv {} {}".format(
-        os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec")))
-    run_cmd("mv {} {}".format(
-        os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec")))
-    run_cmd("mv {} {}".format(
-        os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec")))
-    run_cmd("mv {} {}".format(
-        os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec")))
-    run_cmd("mv {} {}".format(
-        os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec")))
-    run_cmd("mv {} {}".format(
-        os.path.join(dirname, "*.py"), os.path.join(dirname, "paddlerec")))
+    shutil.copytree(
+        package_dir, dirname, ignore=shutil.ignore_patterns(".git"))
+    os.mkdir(os.path.join(dirname, "paddlerec"))
+    shutil.move(
+        os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec"))
+    shutil.move(
+        os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec"))
+    shutil.move(
+        os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec"))
+    shutil.move(
+        os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec"))
+    shutil.move(
+        os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec"))
+
+    for f in os.listdir(dirname):
+        if os.path.isdir(os.path.join(dirname, f)):
+            continue
+        if os.path.splitext(f)[1] == ".py":
+            shutil.move(
+                os.path.join(dirname, f), os.path.join(dirname, "paddlerec"))
 
     packages = find_packages(dirname, include=('paddlerec.*'))
     package_dir = {'': dirname}
@@ -90,7 +91,7 @@ def build(dirname):
         zip_safe=False)
 
 
-dirname = tempfile.mkdtemp()
+dirname = tempfile.mktemp()
 build(dirname)
 shutil.rmtree(dirname)