Unverified · Commit 8d442864 · Authored by W wuzhihua · Committed by GitHub

Merge branch 'master' into vslyu-fixhidefiles

@@ -147,6 +147,7 @@ python -m paddlerec.run -m models/rank/dnn/config.yaml
* [Launch distributed training](doc/distributed_train.md)
* [Launch prediction](doc/predict.md)
* [Quick deployment](doc/serving.md)
* [Pre-trained models](doc/pre_train_model.md)
### Advanced Tutorials
......
@@ -22,6 +22,19 @@ trainers = {}
def trainer_registry():
trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(trainer_abs,
"cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
"ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(trainer_abs,
"ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(trainer_abs,
"tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
"tdm_cluster_trainer.py")
trainers["OnlineLearningTrainer"] = os.path.join(
trainer_abs, "online_learning_trainer.py")
# Definition of procedure execution process
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
"ctr_coding_trainer.py")
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + PS/COLLECTIVE
"""
from __future__ import print_function
import os
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer, EngineMode, FleetMode
class FineTuningTrainer(Trainer):
"""
Trainer for various situations
"""
def __init__(self, config=None):
Trainer.__init__(self, config)
self.processor_register()
self.abs_dir = os.path.dirname(os.path.abspath(__file__))
self.runner_env_name = "runner." + self._context["runner_name"]
def processor_register(self):
print("processor_register begin")
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('network_pass', self.network)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.runner)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
instance_class_path = envs.get_global_env(
self.runner_env_name + ".instance_class_path", default_value=None)
if instance_class_path:
instance_class = envs.lazy_instance_by_fliename(
instance_class_path, "Instance")(context)
else:
if self.engine == EngineMode.SINGLE:
instance_class_name = "SingleInstance"
else:
raise ValueError(
"FineTuningTrainer can only support SingleTraining.")
instance_path = os.path.join(self.abs_dir, "framework",
"instance.py")
instance_class = envs.lazy_instance_by_fliename(
instance_path, instance_class_name)(context)
instance_class.instance(context)
def network(self, context):
network_class_path = envs.get_global_env(
self.runner_env_name + ".network_class_path", default_value=None)
if network_class_path:
network_class = envs.lazy_instance_by_fliename(network_class_path,
"Network")(context)
else:
if self.engine == EngineMode.SINGLE:
network_class_name = "FineTuningNetwork"
else:
raise ValueError(
"FineTuningTrainer can only support SingleTraining.")
network_path = os.path.join(self.abs_dir, "framework",
"network.py")
network_class = envs.lazy_instance_by_fliename(
network_path, network_class_name)(context)
network_class.build_network(context)
def startup(self, context):
startup_class_path = envs.get_global_env(
self.runner_env_name + ".startup_class_path", default_value=None)
if startup_class_path:
startup_class = envs.lazy_instance_by_fliename(startup_class_path,
"Startup")(context)
else:
if self.engine == EngineMode.SINGLE and not context["is_infer"]:
startup_class_name = "FineTuningStartup"
else:
raise ValueError(
"FineTuningTrainer can only support SingleTraining.")
startup_path = os.path.join(self.abs_dir, "framework",
"startup.py")
startup_class = envs.lazy_instance_by_fliename(
startup_path, startup_class_name)(context)
startup_class.startup(context)
def runner(self, context):
runner_class_path = envs.get_global_env(
self.runner_env_name + ".runner_class_path", default_value=None)
if runner_class_path:
runner_class = envs.lazy_instance_by_fliename(runner_class_path,
"Runner")(context)
else:
if self.engine == EngineMode.SINGLE and not context["is_infer"]:
runner_class_name = "SingleRunner"
else:
raise ValueError(
"FineTuningTrainer can only support SingleTraining.")
runner_path = os.path.join(self.abs_dir, "framework", "runner.py")
runner_class = envs.lazy_instance_by_fliename(
runner_path, runner_class_name)(context)
runner_class.run(context)
def terminal(self, context):
terminal_class_path = envs.get_global_env(
self.runner_env_name + ".terminal_class_path", default_value=None)
if terminal_class_path:
terminal_class = envs.lazy_instance_by_fliename(
terminal_class_path, "Terminal")(context)
terminal_class.terminal(context)
else:
terminal_class_name = "TerminalBase"
if self.engine != EngineMode.SINGLE and self.fleet_mode != FleetMode.COLLECTIVE:
terminal_class_name = "PSTerminal"
terminal_path = os.path.join(self.abs_dir, "framework",
"terminal.py")
terminal_class = envs.lazy_instance_by_fliename(
terminal_path, terminal_class_name)(context)
terminal_class.terminal(context)
context['is_exit'] = True
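Each processor above handles one `context["status"]` value and advances it to the next stage; the base `Trainer` keeps dispatching until `terminal` sets `is_exit`. A minimal sketch of that loop, assuming a plain dict of processors mirroring the `regist_context_processor` calls (the real loop lives in the `Trainer` base class):

```python
def run_pipeline(processors, context):
    # Drive the context through the registered stages:
    # uninit -> network_pass -> startup_pass -> train_pass -> terminal_pass
    context["status"] = "uninit"
    context["is_exit"] = False
    while not context["is_exit"]:
        processors[context["status"]](context)
```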
@@ -23,7 +23,7 @@ from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset
__all__ = [
"NetworkBase", "SingleNetwork", "PSNetwork", "PslibNetwork",
"CollectiveNetwork", "FineTuningNetwork"
]
@@ -109,6 +109,88 @@ class SingleNetwork(NetworkBase):
context["status"] = "startup_pass"
class FineTuningNetwork(NetworkBase):
"""R
"""
def __init__(self, context):
print("Running FineTuningNetwork.")
def build_network(self, context):
context["model"] = {}
for model_dict in context["phases"]:
context["model"][model_dict["name"]] = {}
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(
is_infer=context["is_infer"])
data_loader = DataLoader(context)
data_loader.get_dataloader(context, dataset_name,
model._data_loader)
model.net(model._data_var, context["is_infer"])
finetuning_varnames = envs.get_global_env(
"runner." + context["runner_name"] +
".finetuning_aspect_varnames",
default_value=[])
if len(finetuning_varnames) == 0:
raise ValueError(
"nothing need to be fine tuning, you may use other traning mode"
)
if len(finetuning_varnames) != 1:
raise ValueError(
"fine tuning mode can only accept one varname now"
)
varname = finetuning_varnames[0]
finetuning_vars = train_program.global_block().vars[
varname]
finetuning_vars.stop_gradient = True
optimizer = model.optimizer()
optimizer.minimize(model._cost)
context["model"][model_dict["name"]][
"main_program"] = train_program
context["model"][model_dict["name"]][
"startup_program"] = startup_program
context["model"][model_dict["name"]]["scope"] = scope
context["model"][model_dict["name"]]["model"] = model
context["model"][model_dict["name"]][
"default_main_program"] = train_program.clone()
context["model"][model_dict["name"]]["compiled_program"] = None
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
type = envs.get_global_env("dataset." + dataset["name"] + ".type")
if type == "QueueDataset":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
context["status"] = "startup_pass"
class PSNetwork(NetworkBase):
def __init__(self, context):
print("Running PSNetwork.")
......
@@ -17,9 +17,13 @@ from __future__ import print_function
import warnings
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddlerec.core.utils import envs
__all__ = [
"StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup",
"FineTuningStartup"
]
class StartupBase(object):
@@ -65,6 +69,122 @@ class SingleStartup(StartupBase):
context["status"] = "train_pass"
class FineTuningStartup(StartupBase):
"""R
"""
def __init__(self, context):
self.op_name_scope = "op_namescope"
self.clip_op_name_scope = "@CLIP"
self.op_role_var_attr_name = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
print("Running SingleStartup.")
def _is_opt_role_op(self, op):
# NOTE: depend on oprole to find out whether this op is for
# optimize
op_maker = core.op_proto_and_checker_maker
optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize
if op_maker.kOpRoleAttrName() in op.attr_names and \
int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role):
return True
return False
def _get_params_grads(self, program):
"""
Get optimizer operators, parameters and gradients from origin_program
Returns:
opt_ops (list): optimize operators.
params_grads (dict): parameter->gradient.
"""
block = program.global_block()
params_grads = []
# tmp set to dedup
optimize_params = set()
origin_var_dict = program.global_block().vars
for op in block.ops:
if self._is_opt_role_op(op):
# Todo(chengmo): Whether clip related op belongs to Optimize guard should be discussed
# delete clip op from opt_ops when run in Parameter Server mode
if self.op_name_scope in op.all_attrs(
) and self.clip_op_name_scope in op.attr(self.op_name_scope):
op._set_attr(
"op_role",
int(core.op_proto_and_checker_maker.OpRole.Backward))
continue
if op.attr(self.op_role_var_attr_name):
param_name = op.attr(self.op_role_var_attr_name)[0]
grad_name = op.attr(self.op_role_var_attr_name)[1]
if param_name not in optimize_params:
optimize_params.add(param_name)
params_grads.append([
origin_var_dict[param_name],
origin_var_dict[grad_name]
])
return params_grads
@staticmethod
def is_persistable(var):
"""
Check whether the given variable is persistable.
Args:
var(Variable): The variable to be checked.
Returns:
bool: True if the given `var` is persistable
False if not.
Examples:
.. code-block:: python
import paddle.fluid as fluid
param = fluid.default_main_program().global_block().var('fc.b')
res = fluid.io.is_persistable(param)
"""
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.READER:
return False
return var.persistable
def load(self, context, is_fleet=False, main_program=None):
dirname = envs.get_global_env(
"runner." + context["runner_name"] + ".init_model_path", None)
if dirname is None or dirname == "":
return
print("going to load ", dirname)
params_grads = self._get_params_grads(main_program)
update_params = [p for p, _ in params_grads]
need_load_vars = []
parameters = list(
filter(FineTuningStartup.is_persistable, main_program.list_vars()))
for param in parameters:
if param not in update_params:
need_load_vars.append(param)
fluid.io.load_vars(context["exe"], dirname, main_program,
need_load_vars)
print("load from {} success".format(dirname))
def startup(self, context):
for model_dict in context["phases"]:
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
self.load(context, main_program=train_prog)
context["status"] = "train_pass"
class PSStartup(StartupBase):
def __init__(self, context):
print("Running PSStartup.")
......
# PaddleRec Pre-trained Models
Drawing on production experience and trained on real data, PaddleRec provides several pre-trained models for recommendation algorithms, making it easier for developers to conduct algorithm research.
## Text Classification Pre-trained Model
### Download
```bash
wget xxx.tar.gz
```
### Usage
After extraction you get a Paddle model directory; load it with the `PaddleRec/models/contentunderstanding/classification_finetue` model.
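For instance, a minimal sketch of restoring the extracted directory into a program before fine-tuning; the directory name is a placeholder, and PaddleRec normally performs this step itself via the runner's `init_model_path` option:

```python
import paddle.fluid as fluid

exe = fluid.Executor(fluid.CPUPlace())
# Build the classification network and run its startup program first,
# then restore the pre-trained persistable variables from the extracted
# folder (directory name assumed).
fluid.io.load_persistables(
    exe, "pretrain_model_dir", main_program=fluid.default_main_program())
```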
@@ -67,7 +67,6 @@ runner:
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
phases: [phase1]
......
@@ -16,7 +16,6 @@ import os
import subprocess
import sys
import argparse
import tempfile
import warnings
import copy
@@ -39,6 +38,7 @@ def engine_registry():
engines["TRANSPILER"]["INFER"] = single_infer_engine
engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER_TRAIN"] = cluster_engine
engines["TRANSPILER"]["ONLINE_LEARNING"] = online_learning
engines["PSLIB"]["TRAIN"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine
engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine
@@ -259,6 +259,20 @@ def single_infer_engine(args):
return trainer
def online_learning(args):
trainer = "OnlineLearningTrainer"
single_envs = {}
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "online_learning"
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
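With the `ONLINE_LEARNING` entry registered above, engine selection is a two-level dictionary lookup keyed by fleet mode and engine name. A minimal sketch of that dispatch (the function name is hypothetical; the real wiring happens in `run.py`'s argument handling):

```python
def resolve_engine(engines, fleet_mode, engine_name):
    # e.g. resolve_engine(engines, "TRANSPILER", "online_learning")
    # returns the online_learning() function registered above.
    try:
        return engines[fleet_mode.upper()][engine_name.upper()]
    except KeyError:
        raise ValueError("engine {} is not supported under {}".format(
            engine_name, fleet_mode))
```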
def cluster_engine(args):
def master():
from paddlerec.core.engine.cluster.cluster import ClusterEngine
......