diff --git a/README.md b/README.md index ed205a677bcb1136696a5bdc2e6dadca4f079094..84c53d2a06ee7b52ae7c89187fb0316730390f01 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,7 @@ python -m paddlerec.run -m models/rank/dnn/config.yaml * [启动分布式训练](doc/distributed_train.md) * [启动预测](doc/predict.md) * [快速部署](doc/serving.md) +* [预训练模型](doc/pre_train_model.md) ### 进阶教程 diff --git a/core/factory.py b/core/factory.py index 9430c88283800e69db7043aa141b6f735212c79f..95e0e7778141ad76d1166205213bccdaae67aff7 100755 --- a/core/factory.py +++ b/core/factory.py @@ -22,6 +22,19 @@ trainers = {} def trainer_registry(): + trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") + trainers["ClusterTrainer"] = os.path.join(trainer_abs, + "cluster_trainer.py") + trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, + "ctr_coding_trainer.py") + trainers["CtrModulTrainer"] = os.path.join(trainer_abs, + "ctr_modul_trainer.py") + trainers["TDMSingleTrainer"] = os.path.join(trainer_abs, + "tdm_single_trainer.py") + trainers["TDMClusterTrainer"] = os.path.join(trainer_abs, + "tdm_cluster_trainer.py") + trainers["OnlineLearningTrainer"] = os.path.join( + trainer_abs, "online_learning_trainer.py") # Definition of procedure execution process trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, "ctr_coding_trainer.py") diff --git a/core/trainers/finetuning_trainer.py b/core/trainers/finetuning_trainer.py new file mode 100644 index 0000000000000000000000000000000000000000..4525a18867ff232121256c876c185c502427c130 --- /dev/null +++ b/core/trainers/finetuning_trainer.py @@ -0,0 +1,140 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + PS/COLLECTIVE +""" +from __future__ import print_function + +import os + +from paddlerec.core.utils import envs +from paddlerec.core.trainer import Trainer, EngineMode, FleetMode + + +class FineTuningTrainer(Trainer): + """ + Trainer for various situations + """ + + def __init__(self, config=None): + Trainer.__init__(self, config) + self.processor_register() + self.abs_dir = os.path.dirname(os.path.abspath(__file__)) + self.runner_env_name = "runner." + self._context["runner_name"] + + def processor_register(self): + print("processor_register begin") + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('network_pass', self.network) + self.regist_context_processor('startup_pass', self.startup) + self.regist_context_processor('train_pass', self.runner) + self.regist_context_processor('terminal_pass', self.terminal) + + def instance(self, context): + instance_class_path = envs.get_global_env( + self.runner_env_name + ".instance_class_path", default_value=None) + if instance_class_path: + instance_class = envs.lazy_instance_by_fliename( + instance_class_path, "Instance")(context) + else: + if self.engine == EngineMode.SINGLE: + instance_class_name = "SingleInstance" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + instance_path = os.path.join(self.abs_dir, "framework", + "instance.py") + + instance_class = envs.lazy_instance_by_fliename( + instance_path, instance_class_name)(context) + + instance_class.instance(context) + + def network(self, context): + network_class_path = envs.get_global_env( + self.runner_env_name + ".network_class_path", default_value=None) + if network_class_path: + network_class = envs.lazy_instance_by_fliename(network_class_path, + "Network")(context) + else: + if self.engine == EngineMode.SINGLE: + network_class_name = "FineTuningNetwork" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + network_path = os.path.join(self.abs_dir, "framework", + "network.py") + network_class = envs.lazy_instance_by_fliename( + network_path, network_class_name)(context) + + network_class.build_network(context) + + def startup(self, context): + startup_class_path = envs.get_global_env( + self.runner_env_name + ".startup_class_path", default_value=None) + if startup_class_path: + startup_class = envs.lazy_instance_by_fliename(startup_class_path, + "Startup")(context) + else: + if self.engine == EngineMode.SINGLE and not context["is_infer"]: + startup_class_name = "FineTuningStartup" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + startup_path = os.path.join(self.abs_dir, "framework", + "startup.py") + + startup_class = envs.lazy_instance_by_fliename( + startup_path, startup_class_name)(context) + startup_class.startup(context) + + def runner(self, context): + runner_class_path = envs.get_global_env( + self.runner_env_name + ".runner_class_path", default_value=None) + if runner_class_path: + runner_class = envs.lazy_instance_by_fliename(runner_class_path, + "Runner")(context) + else: + if self.engine == EngineMode.SINGLE and not context["is_infer"]: + runner_class_name = "SingleRunner" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + runner_path = os.path.join(self.abs_dir, "framework", "runner.py") + runner_class = envs.lazy_instance_by_fliename( + runner_path, runner_class_name)(context) + runner_class.run(context) + + def terminal(self, context): + terminal_class_path = envs.get_global_env( + self.runner_env_name + ".terminal_class_path", default_value=None) + if terminal_class_path: + terminal_class = envs.lazy_instance_by_fliename( + terminal_class_path, "Terminal")(context) + terminal_class.terminal(context) + else: + terminal_class_name = "TerminalBase" + if self.engine != EngineMode.SINGLE and self.fleet_mode != FleetMode.COLLECTIVE: + terminal_class_name = "PSTerminal" + + terminal_path = os.path.join(self.abs_dir, "framework", + "terminal.py") + terminal_class = envs.lazy_instance_by_fliename( + terminal_path, terminal_class_name)(context) + terminal_class.terminal(context) + context['is_exit'] = True diff --git a/core/trainers/framework/network.py b/core/trainers/framework/network.py index d2a6b71e4f74a639095eb404a82c9c1fefaf7fdf..7d7a8273b6a402bd163f653a7beb3900de899ae3 100644 --- a/core/trainers/framework/network.py +++ b/core/trainers/framework/network.py @@ -23,7 +23,7 @@ from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset __all__ = [ "NetworkBase", "SingleNetwork", "PSNetwork", "PslibNetwork", - "CollectiveNetwork" + "CollectiveNetwork", "FineTuningNetwork" ] @@ -109,6 +109,88 @@ class SingleNetwork(NetworkBase): context["status"] = "startup_pass" +class FineTuningNetwork(NetworkBase): + """R + """ + + def __init__(self, context): + print("Running FineTuningNetwork.") + + def build_network(self, context): + context["model"] = {} + for model_dict in context["phases"]: + context["model"][model_dict["name"]] = {} + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + dataset_name = model_dict["dataset_name"] + + with fluid.program_guard(train_program, startup_program): + with fluid.unique_name.guard(): + with fluid.scope_guard(scope): + model_path = envs.os_path_adapter( + envs.workspace_adapter(model_dict["model"])) + model = envs.lazy_instance_by_fliename( + model_path, "Model")(context["env"]) + + model._data_var = model.input_data( + dataset_name=model_dict["dataset_name"]) + + if envs.get_global_env("dataset." + dataset_name + + ".type") == "DataLoader": + model._init_dataloader( + is_infer=context["is_infer"]) + data_loader = DataLoader(context) + data_loader.get_dataloader(context, dataset_name, + model._data_loader) + + model.net(model._data_var, context["is_infer"]) + + finetuning_varnames = envs.get_global_env( + "runner." + context["runner_name"] + + ".finetuning_aspect_varnames", + default_value=[]) + + if len(finetuning_varnames) == 0: + raise ValueError( + "nothing need to be fine tuning, you may use other traning mode" + ) + + if len(finetuning_varnames) != 1: + raise ValueError( + "fine tuning mode can only accept one varname now" + ) + + varname = finetuning_varnames[0] + finetuning_vars = train_program.global_block().vars[ + varname] + finetuning_vars.stop_gradient = True + optimizer = model.optimizer() + optimizer.minimize(model._cost) + + context["model"][model_dict["name"]][ + "main_program"] = train_program + context["model"][model_dict["name"]][ + "startup_program"] = startup_program + context["model"][model_dict["name"]]["scope"] = scope + context["model"][model_dict["name"]]["model"] = model + context["model"][model_dict["name"]][ + "default_main_program"] = train_program.clone() + context["model"][model_dict["name"]]["compiled_program"] = None + + context["dataset"] = {} + for dataset in context["env"]["dataset"]: + type = envs.get_global_env("dataset." + dataset["name"] + ".type") + + if type == "QueueDataset": + dataset_class = QueueDataset(context) + context["dataset"][dataset[ + "name"]] = dataset_class.create_dataset(dataset["name"], + context) + + context["status"] = "startup_pass" + + class PSNetwork(NetworkBase): def __init__(self, context): print("Running PSNetwork.") diff --git a/core/trainers/framework/startup.py b/core/trainers/framework/startup.py index 362592e6de64a4bbfecb6868726b4a733edf4e14..a38dbd5bb3c2cea268fc5551e10e488f2fbdabd6 100644 --- a/core/trainers/framework/startup.py +++ b/core/trainers/framework/startup.py @@ -17,9 +17,13 @@ from __future__ import print_function import warnings import paddle.fluid as fluid +import paddle.fluid.core as core from paddlerec.core.utils import envs -__all__ = ["StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup"] +__all__ = [ + "StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup", + "FineTuningStartup" +] class StartupBase(object): @@ -65,6 +69,122 @@ class SingleStartup(StartupBase): context["status"] = "train_pass" +class FineTuningStartup(StartupBase): + """R + """ + + def __init__(self, context): + self.op_name_scope = "op_namescope" + self.clip_op_name_scope = "@CLIP" + self.self.op_role_var_attr_name = core.op_proto_and_checker_maker.kOpRoleVarAttrName( + ) + + print("Running SingleStartup.") + + def _is_opt_role_op(self, op): + # NOTE: depend on oprole to find out whether this op is for + # optimize + op_maker = core.op_proto_and_checker_maker + optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize + if op_maker.kOpRoleAttrName() in op.attr_names and \ + int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role): + return True + return False + + def _get_params_grads(self, program): + """ + Get optimizer operators, parameters and gradients from origin_program + Returns: + opt_ops (list): optimize operators. + params_grads (dict): parameter->gradient. + """ + block = program.global_block() + params_grads = [] + # tmp set to dedup + optimize_params = set() + origin_var_dict = program.global_block().vars + for op in block.ops: + if self._is_opt_role_op(op): + # Todo(chengmo): Whether clip related op belongs to Optimize guard should be discussed + # delete clip op from opt_ops when run in Parameter Server mode + if self.op_name_scope in op.all_attrs( + ) and self.clip_op_name_scope in op.attr(self.op_name_scope): + op._set_attr( + "op_role", + int(core.op_proto_and_checker_maker.OpRole.Backward)) + continue + + if op.attr(self.op_role_var_attr_name): + param_name = op.attr(self.op_role_var_attr_name)[0] + grad_name = op.attr(self.op_role_var_attr_name)[1] + if not param_name in optimize_params: + optimize_params.add(param_name) + params_grads.append([ + origin_var_dict[param_name], + origin_var_dict[grad_name] + ]) + return params_grads + + @staticmethod + def is_persistable(var): + """ + Check whether the given variable is persistable. + + Args: + var(Variable): The variable to be checked. + + Returns: + bool: True if the given `var` is persistable + False if not. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + param = fluid.default_main_program().global_block().var('fc.b') + res = fluid.io.is_persistable(param) + """ + if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ + var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ + var.desc.type() == core.VarDesc.VarType.READER: + return False + return var.persistable + + def load(self, context, is_fleet=False, main_program=None): + dirname = envs.get_global_env( + "runner." + context["runner_name"] + ".init_model_path", None) + if dirname is None or dirname == "": + return + print("going to load ", dirname) + + params_grads = self._get_params_grads(main_program) + update_params = [p for p, _ in params_grads] + need_load_vars = [] + parameters = list( + filter(FineTuningStartup.is_persistable, main_program.list_vars())) + + for param in parameters: + if param not in update_params: + need_load_vars.append(param) + + fluid.io.load_vars(context["exe"], dirname, main_program, + need_load_vars) + print("load from {} success".format(dirname)) + + def startup(self, context): + for model_dict in context["phases"]: + with fluid.scope_guard(context["model"][model_dict["name"]][ + "scope"]): + train_prog = context["model"][model_dict["name"]][ + "main_program"] + startup_prog = context["model"][model_dict["name"]][ + "startup_program"] + with fluid.program_guard(train_prog, startup_prog): + context["exe"].run(startup_prog) + self.load(context, main_program=train_prog) + context["status"] = "train_pass" + + class PSStartup(StartupBase): def __init__(self, context): print("Running PSStartup.") diff --git a/doc/pre_train_model.md b/doc/pre_train_model.md new file mode 100644 index 0000000000000000000000000000000000000000..134710a430992cc756cd37fcc1e01ee3aef2dfb1 --- /dev/null +++ b/doc/pre_train_model.md @@ -0,0 +1,15 @@ +# PaddleRec 预训练模型 + +PaddleRec基于业务实践,使用真实数据,产出了推荐领域算法的若干预训练模型,方便开发者进行算法调研。 + +## 文本分类预训练模型 + +### 获取地址 + +```bash +wget xxx.tar.gz +``` + +### 使用方法 + +解压后,得到的是一个paddle的模型文件夹,使用`PaddleRec/models/contentunderstanding/classification_finetue`模型进行加载 diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml index e984501548529bcd0e65bbf6e53554a3b340aa47..aa84a5070470cba750f7832644a9ce676c1d4ddd 100755 --- a/models/rank/dnn/config.yaml +++ b/models/rank/dnn/config.yaml @@ -67,7 +67,6 @@ runner: save_inference_path: "inference" # save inference path save_inference_feed_varnames: [] # feed vars of save inference save_inference_fetch_varnames: [] # fetch vars of save inference - init_model_path: "" # load model path print_interval: 10 phases: [phase1] diff --git a/run.py b/run.py index 6340adfc1c6026d7c67f5576ba8d0230055ec19d..c916ecd0ab3b0efe71ef86a4bf1d7f357aa9d563 100755 --- a/run.py +++ b/run.py @@ -16,7 +16,6 @@ import os import subprocess import sys import argparse -import tempfile import warnings import copy @@ -39,6 +38,7 @@ def engine_registry(): engines["TRANSPILER"]["INFER"] = single_infer_engine engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine engines["TRANSPILER"]["CLUSTER_TRAIN"] = cluster_engine + engines["TRANSPILER"]["ONLINE_LEARNING"] = online_learning engines["PSLIB"]["TRAIN"] = local_mpi_engine engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine @@ -259,6 +259,20 @@ def single_infer_engine(args): return trainer +def online_learning(args): + trainer = "OnlineLearningTrainer" + single_envs = {} + single_envs["train.trainer.trainer"] = trainer + single_envs["train.trainer.threads"] = "2" + single_envs["train.trainer.engine"] = "online_learning" + single_envs["train.trainer.platform"] = envs.get_platform() + print("use {} engine to run model: {}".format(trainer, args.model)) + + set_runtime_envs(single_envs, args.model) + trainer = TrainerFactory.create(args.model) + return trainer + + def cluster_engine(args): def master(): from paddlerec.core.engine.cluster.cluster import ClusterEngine