diff --git a/.travis.yml b/.travis.yml index 8cd015e186af0f53b3827662cba9ff5d108a4a5a..cee9ec6db72f4f84da037faafae6dc15db6a23cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,15 +16,20 @@ before_install: # For pylint dockstring checker - sudo apt-get update - sudo apt-get install -y python-pip libpython-dev + - sudo apt-get remove python-urllib3 + - sudo apt-get purge python-urllib3 + - sudo rm /usr/lib/python2.7/dist-packages/chardet-* - sudo pip install -U pip + - sudo pip install --upgrade setuptools - sudo pip install six --upgrade --ignore-installed six - - sudo pip install pillow - sudo pip install PyYAML - sudo pip install pylint pytest astroid isort pre-commit - sudo pip install kiwisolver - - sudo pip install paddlepaddle==1.7.2 --ignore-installed urllib3 - - sudo pip uninstall -y rarfile + - sudo pip install scikit-build + - sudo pip install Pillow==5.3.0 + - sudo pip install opencv-python==3.4.3.18 - sudo pip install rarfile==3.0 + - sudo pip install paddlepaddle==1.7.2 - sudo python setup.py install - | function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; } diff --git a/README.md b/README.md index 5d4d26484f2cc3fe72a23a0980efc56cda22647f..84c53d2a06ee7b52ae7c89187fb0316730390f01 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,10 @@ ```bash # 使用CPU进行单机训练 -python -m paddlerec.run -m paddlerec.models.rank.dnn +git clone https://github.com/PaddlePaddle/PaddleRec.git paddle-rec +cd paddle-rec + +python -m paddlerec.run -m models/rank/dnn/config.yaml ``` @@ -144,6 +147,7 @@ python -m paddlerec.run -m paddlerec.models.rank.dnn * [启动分布式训练](doc/distributed_train.md) * [启动预测](doc/predict.md) * [快速部署](doc/serving.md) +* [预训练模型](doc/pre_train_model.md) ### 进阶教程 diff --git a/README_EN.md b/README_EN.md index 985d2ea2198e1df799066e7f703d8416c4b7a916..b409c1ad96406c30c8423eb8c693f74a2182088f 100644 --- a/README_EN.md +++ b/README_EN.md @@ -119,7 +119,10 @@ We take the `dnn` algorithm as an example to get start of `PaddleRec`, and we ta ```bash # Training with cpu -python -m paddlerec.run -m paddlerec.models.rank.dnn +git clone https://github.com/PaddlePaddle/PaddleRec.git paddle-rec +cd paddle-rec + +python -m paddlerec.run -m models/rank/dnn/config.yaml ``` diff --git a/core/engine/cluster/cloud/k8s_config.ini.template b/core/engine/cluster/cloud/k8s_config.ini.template index 8979cc6f0d996c132fbc2259a01134ba4a8a1ee5..471bd1a0dd2931591b0d6eda7f87cc25458b3f80 100644 --- a/core/engine/cluster/cloud/k8s_config.ini.template +++ b/core/engine/cluster/cloud/k8s_config.ini.template @@ -19,6 +19,7 @@ afs_local_mount_point="/root/paddlejob/workspace/env_run/afs/" # 新k8s afs挂载帮助文档: http://wiki.baidu.com/pages/viewpage.action?pageId=906443193 PADDLE_PADDLEREC_ROLE=WORKER +PADDLEREC_CLUSTER_TYPE=K8S use_python3=<$ USE_PYTHON3 $> CPU_NUM=<$ CPU_NUM $> GLOG_v=0 diff --git a/core/engine/cluster/cloud/mpi_config.ini.template b/core/engine/cluster/cloud/mpi_config.ini.template index 7d9f7fbb97a53c23e566e925a87eae990cef9f2a..a3ac22f0c7fc09e9b6eda44306972dd296d19ab7 100644 --- a/core/engine/cluster/cloud/mpi_config.ini.template +++ b/core/engine/cluster/cloud/mpi_config.ini.template @@ -17,6 +17,7 @@ output_path=<$ OUTPUT_PATH $> thirdparty_path=<$ THIRDPARTY_PATH $> PADDLE_PADDLEREC_ROLE=WORKER +PADDLEREC_CLUSTER_TYPE=MPI use_python3=<$ USE_PYTHON3 $> CPU_NUM=<$ CPU_NUM $> GLOG_v=0 diff --git a/core/factory.py b/core/factory.py index 9430c88283800e69db7043aa141b6f735212c79f..95e0e7778141ad76d1166205213bccdaae67aff7 100755 --- a/core/factory.py +++ b/core/factory.py @@ -22,6 +22,19 @@ trainers = {} def trainer_registry(): + trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") + trainers["ClusterTrainer"] = os.path.join(trainer_abs, + "cluster_trainer.py") + trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, + "ctr_coding_trainer.py") + trainers["CtrModulTrainer"] = os.path.join(trainer_abs, + "ctr_modul_trainer.py") + trainers["TDMSingleTrainer"] = os.path.join(trainer_abs, + "tdm_single_trainer.py") + trainers["TDMClusterTrainer"] = os.path.join(trainer_abs, + "tdm_cluster_trainer.py") + trainers["OnlineLearningTrainer"] = os.path.join( + trainer_abs, "online_learning_trainer.py") # Definition of procedure execution process trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, "ctr_coding_trainer.py") diff --git a/core/trainer.py b/core/trainer.py index 8b1afd449a70265d5bcae9996d42795a1235197a..bbba6250529283d24389e2719b7110f8aa321973 100755 --- a/core/trainer.py +++ b/core/trainer.py @@ -107,6 +107,7 @@ class Trainer(object): self.device = Device.GPU gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0)) self._place = fluid.CUDAPlace(gpu_id) + print("PaddleRec run on device GPU: {}".format(gpu_id)) self._exe = fluid.Executor(self._place) elif device == "CPU": self.device = Device.CPU @@ -146,6 +147,7 @@ class Trainer(object): elif engine.upper() == "CLUSTER": self.engine = EngineMode.CLUSTER self.is_fleet = True + self.which_cluster_type() else: raise ValueError("Not Support Engine {}".format(engine)) self._context["is_fleet"] = self.is_fleet @@ -165,6 +167,14 @@ class Trainer(object): self._context["is_pslib"] = (fleet_mode.upper() == "PSLIB") self._context["fleet_mode"] = fleet_mode + def which_cluster_type(self): + cluster_type = os.getenv("PADDLEREC_CLUSTER_TYPE", "MPI") + print("PADDLEREC_CLUSTER_TYPE: {}".format(cluster_type)) + if cluster_type and cluster_type.upper() == "K8S": + self._context["cluster_type"] = "K8S" + else: + self._context["cluster_type"] = "MPI" + def which_executor_mode(self): executor_mode = envs.get_runtime_environ("train.trainer.executor_mode") if executor_mode.upper() not in ["TRAIN", "INFER"]: diff --git a/core/trainers/finetuning_trainer.py b/core/trainers/finetuning_trainer.py new file mode 100644 index 0000000000000000000000000000000000000000..4525a18867ff232121256c876c185c502427c130 --- /dev/null +++ b/core/trainers/finetuning_trainer.py @@ -0,0 +1,140 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + PS/COLLECTIVE +""" +from __future__ import print_function + +import os + +from paddlerec.core.utils import envs +from paddlerec.core.trainer import Trainer, EngineMode, FleetMode + + +class FineTuningTrainer(Trainer): + """ + Trainer for various situations + """ + + def __init__(self, config=None): + Trainer.__init__(self, config) + self.processor_register() + self.abs_dir = os.path.dirname(os.path.abspath(__file__)) + self.runner_env_name = "runner." + self._context["runner_name"] + + def processor_register(self): + print("processor_register begin") + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('network_pass', self.network) + self.regist_context_processor('startup_pass', self.startup) + self.regist_context_processor('train_pass', self.runner) + self.regist_context_processor('terminal_pass', self.terminal) + + def instance(self, context): + instance_class_path = envs.get_global_env( + self.runner_env_name + ".instance_class_path", default_value=None) + if instance_class_path: + instance_class = envs.lazy_instance_by_fliename( + instance_class_path, "Instance")(context) + else: + if self.engine == EngineMode.SINGLE: + instance_class_name = "SingleInstance" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + instance_path = os.path.join(self.abs_dir, "framework", + "instance.py") + + instance_class = envs.lazy_instance_by_fliename( + instance_path, instance_class_name)(context) + + instance_class.instance(context) + + def network(self, context): + network_class_path = envs.get_global_env( + self.runner_env_name + ".network_class_path", default_value=None) + if network_class_path: + network_class = envs.lazy_instance_by_fliename(network_class_path, + "Network")(context) + else: + if self.engine == EngineMode.SINGLE: + network_class_name = "FineTuningNetwork" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + network_path = os.path.join(self.abs_dir, "framework", + "network.py") + network_class = envs.lazy_instance_by_fliename( + network_path, network_class_name)(context) + + network_class.build_network(context) + + def startup(self, context): + startup_class_path = envs.get_global_env( + self.runner_env_name + ".startup_class_path", default_value=None) + if startup_class_path: + startup_class = envs.lazy_instance_by_fliename(startup_class_path, + "Startup")(context) + else: + if self.engine == EngineMode.SINGLE and not context["is_infer"]: + startup_class_name = "FineTuningStartup" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + startup_path = os.path.join(self.abs_dir, "framework", + "startup.py") + + startup_class = envs.lazy_instance_by_fliename( + startup_path, startup_class_name)(context) + startup_class.startup(context) + + def runner(self, context): + runner_class_path = envs.get_global_env( + self.runner_env_name + ".runner_class_path", default_value=None) + if runner_class_path: + runner_class = envs.lazy_instance_by_fliename(runner_class_path, + "Runner")(context) + else: + if self.engine == EngineMode.SINGLE and not context["is_infer"]: + runner_class_name = "SingleRunner" + else: + raise ValueError( + "FineTuningTrainer can only support SingleTraining.") + + runner_path = os.path.join(self.abs_dir, "framework", "runner.py") + runner_class = envs.lazy_instance_by_fliename( + runner_path, runner_class_name)(context) + runner_class.run(context) + + def terminal(self, context): + terminal_class_path = envs.get_global_env( + self.runner_env_name + ".terminal_class_path", default_value=None) + if terminal_class_path: + terminal_class = envs.lazy_instance_by_fliename( + terminal_class_path, "Terminal")(context) + terminal_class.terminal(context) + else: + terminal_class_name = "TerminalBase" + if self.engine != EngineMode.SINGLE and self.fleet_mode != FleetMode.COLLECTIVE: + terminal_class_name = "PSTerminal" + + terminal_path = os.path.join(self.abs_dir, "framework", + "terminal.py") + terminal_class = envs.lazy_instance_by_fliename( + terminal_path, terminal_class_name)(context) + terminal_class.terminal(context) + context['is_exit'] = True diff --git a/core/trainers/framework/dataset.py b/core/trainers/framework/dataset.py index d7f8016c2020074ebbe42d6178c69a226e16e2ca..239b568be34793c5ddb0830e9cca06951da143f4 100644 --- a/core/trainers/framework/dataset.py +++ b/core/trainers/framework/dataset.py @@ -21,7 +21,7 @@ from paddlerec.core.utils import envs from paddlerec.core.utils import dataloader_instance from paddlerec.core.reader import SlotReader from paddlerec.core.trainer import EngineMode -from paddlerec.core.utils.util import split_files +from paddlerec.core.utils.util import split_files, check_filelist __all__ = ["DatasetBase", "DataLoader", "QueueDataset"] @@ -121,14 +121,30 @@ class QueueDataset(DatasetBase): dataset.set_pipe_command(pipe_cmd) train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] + hidden_file_list, file_list = check_filelist( + hidden_file_list=[], + data_file_list=[], + train_data_path=train_data_path) + if (hidden_file_list is not None): + print( + "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}". + format(hidden_file_list)) + + file_list.sort() + need_split_files = False if context["engine"] == EngineMode.LOCAL_CLUSTER: + # for local cluster: split files for multi process + need_split_files = True + elif context["engine"] == EngineMode.CLUSTER and context[ + "cluster_type"] == "K8S": + # for k8s mount afs, split files for every node + need_split_files = True + + if need_split_files: file_list = split_files(file_list, context["fleet"].worker_index(), context["fleet"].worker_num()) print("File_list: {}".format(file_list)) + dataset.set_filelist(file_list) for model_dict in context["phases"]: if model_dict["dataset_name"] == dataset_name: diff --git a/core/trainers/framework/network.py b/core/trainers/framework/network.py index d2a6b71e4f74a639095eb404a82c9c1fefaf7fdf..7d7a8273b6a402bd163f653a7beb3900de899ae3 100644 --- a/core/trainers/framework/network.py +++ b/core/trainers/framework/network.py @@ -23,7 +23,7 @@ from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset __all__ = [ "NetworkBase", "SingleNetwork", "PSNetwork", "PslibNetwork", - "CollectiveNetwork" + "CollectiveNetwork", "FineTuningNetwork" ] @@ -109,6 +109,88 @@ class SingleNetwork(NetworkBase): context["status"] = "startup_pass" +class FineTuningNetwork(NetworkBase): + """R + """ + + def __init__(self, context): + print("Running FineTuningNetwork.") + + def build_network(self, context): + context["model"] = {} + for model_dict in context["phases"]: + context["model"][model_dict["name"]] = {} + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + dataset_name = model_dict["dataset_name"] + + with fluid.program_guard(train_program, startup_program): + with fluid.unique_name.guard(): + with fluid.scope_guard(scope): + model_path = envs.os_path_adapter( + envs.workspace_adapter(model_dict["model"])) + model = envs.lazy_instance_by_fliename( + model_path, "Model")(context["env"]) + + model._data_var = model.input_data( + dataset_name=model_dict["dataset_name"]) + + if envs.get_global_env("dataset." + dataset_name + + ".type") == "DataLoader": + model._init_dataloader( + is_infer=context["is_infer"]) + data_loader = DataLoader(context) + data_loader.get_dataloader(context, dataset_name, + model._data_loader) + + model.net(model._data_var, context["is_infer"]) + + finetuning_varnames = envs.get_global_env( + "runner." + context["runner_name"] + + ".finetuning_aspect_varnames", + default_value=[]) + + if len(finetuning_varnames) == 0: + raise ValueError( + "nothing need to be fine tuning, you may use other traning mode" + ) + + if len(finetuning_varnames) != 1: + raise ValueError( + "fine tuning mode can only accept one varname now" + ) + + varname = finetuning_varnames[0] + finetuning_vars = train_program.global_block().vars[ + varname] + finetuning_vars.stop_gradient = True + optimizer = model.optimizer() + optimizer.minimize(model._cost) + + context["model"][model_dict["name"]][ + "main_program"] = train_program + context["model"][model_dict["name"]][ + "startup_program"] = startup_program + context["model"][model_dict["name"]]["scope"] = scope + context["model"][model_dict["name"]]["model"] = model + context["model"][model_dict["name"]][ + "default_main_program"] = train_program.clone() + context["model"][model_dict["name"]]["compiled_program"] = None + + context["dataset"] = {} + for dataset in context["env"]["dataset"]: + type = envs.get_global_env("dataset." + dataset["name"] + ".type") + + if type == "QueueDataset": + dataset_class = QueueDataset(context) + context["dataset"][dataset[ + "name"]] = dataset_class.create_dataset(dataset["name"], + context) + + context["status"] = "startup_pass" + + class PSNetwork(NetworkBase): def __init__(self, context): print("Running PSNetwork.") diff --git a/core/trainers/framework/runner.py b/core/trainers/framework/runner.py index 277b2ac7bbd1ad42f40f3501f6497fc471b762f6..79d7be66e58d0c4244980cf4bf871f42984d186e 100644 --- a/core/trainers/framework/runner.py +++ b/core/trainers/framework/runner.py @@ -16,6 +16,7 @@ from __future__ import print_function import os import time +import warnings import numpy as np import paddle.fluid as fluid @@ -284,6 +285,7 @@ class RunnerBase(object): return (epoch_id + 1) % epoch_interval == 0 def save_inference_model(): + # get global env name = "runner." + context["runner_name"] + "." save_interval = int( envs.get_global_env(name + "save_inference_interval", -1)) @@ -296,18 +298,44 @@ class RunnerBase(object): if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \ len(feed_varnames) == 0 or len(fetch_varnames) == 0: return - fetch_vars = [ - fluid.default_main_program().global_block().vars[varname] - for varname in fetch_varnames - ] + + # check feed var exist + for var_name in feed_varnames: + if var_name not in fluid.default_main_program().global_block( + ).vars: + raise ValueError( + "Feed variable: {} not in default_main_program, global block has follow vars: {}". + format(var_name, + fluid.default_main_program().global_block() + .vars.keys())) + + # check fetch var exist + fetch_vars = [] + for var_name in fetch_varnames: + if var_name not in fluid.default_main_program().global_block( + ).vars: + raise ValueError( + "Fetch variable: {} not in default_main_program, global block has follow vars: {}". + format(var_name, + fluid.default_main_program().global_block() + .vars.keys())) + else: + fetch_vars.append(fluid.default_main_program() + .global_block().vars[var_name]) + dirname = envs.get_global_env(name + "save_inference_path", None) assert dirname is not None dirname = os.path.join(dirname, str(epoch_id)) if is_fleet: - context["fleet"].save_inference_model( - context["exe"], dirname, feed_varnames, fetch_vars) + warnings.warn( + "Save inference model in cluster training is not recommended! Using save checkpoint instead.", + category=UserWarning, + stacklevel=2) + if context["fleet"].worker_index() == 0: + context["fleet"].save_inference_model( + context["exe"], dirname, feed_varnames, fetch_vars) else: fluid.io.save_inference_model(dirname, feed_varnames, fetch_vars, context["exe"]) @@ -323,7 +351,8 @@ class RunnerBase(object): return dirname = os.path.join(dirname, str(epoch_id)) if is_fleet: - context["fleet"].save_persistables(context["exe"], dirname) + if context["fleet"].worker_index() == 0: + context["fleet"].save_persistables(context["exe"], dirname) else: fluid.io.save_persistables(context["exe"], dirname) diff --git a/core/trainers/framework/startup.py b/core/trainers/framework/startup.py index 362592e6de64a4bbfecb6868726b4a733edf4e14..a38dbd5bb3c2cea268fc5551e10e488f2fbdabd6 100644 --- a/core/trainers/framework/startup.py +++ b/core/trainers/framework/startup.py @@ -17,9 +17,13 @@ from __future__ import print_function import warnings import paddle.fluid as fluid +import paddle.fluid.core as core from paddlerec.core.utils import envs -__all__ = ["StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup"] +__all__ = [ + "StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup", + "FineTuningStartup" +] class StartupBase(object): @@ -65,6 +69,122 @@ class SingleStartup(StartupBase): context["status"] = "train_pass" +class FineTuningStartup(StartupBase): + """R + """ + + def __init__(self, context): + self.op_name_scope = "op_namescope" + self.clip_op_name_scope = "@CLIP" + self.self.op_role_var_attr_name = core.op_proto_and_checker_maker.kOpRoleVarAttrName( + ) + + print("Running SingleStartup.") + + def _is_opt_role_op(self, op): + # NOTE: depend on oprole to find out whether this op is for + # optimize + op_maker = core.op_proto_and_checker_maker + optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize + if op_maker.kOpRoleAttrName() in op.attr_names and \ + int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role): + return True + return False + + def _get_params_grads(self, program): + """ + Get optimizer operators, parameters and gradients from origin_program + Returns: + opt_ops (list): optimize operators. + params_grads (dict): parameter->gradient. + """ + block = program.global_block() + params_grads = [] + # tmp set to dedup + optimize_params = set() + origin_var_dict = program.global_block().vars + for op in block.ops: + if self._is_opt_role_op(op): + # Todo(chengmo): Whether clip related op belongs to Optimize guard should be discussed + # delete clip op from opt_ops when run in Parameter Server mode + if self.op_name_scope in op.all_attrs( + ) and self.clip_op_name_scope in op.attr(self.op_name_scope): + op._set_attr( + "op_role", + int(core.op_proto_and_checker_maker.OpRole.Backward)) + continue + + if op.attr(self.op_role_var_attr_name): + param_name = op.attr(self.op_role_var_attr_name)[0] + grad_name = op.attr(self.op_role_var_attr_name)[1] + if not param_name in optimize_params: + optimize_params.add(param_name) + params_grads.append([ + origin_var_dict[param_name], + origin_var_dict[grad_name] + ]) + return params_grads + + @staticmethod + def is_persistable(var): + """ + Check whether the given variable is persistable. + + Args: + var(Variable): The variable to be checked. + + Returns: + bool: True if the given `var` is persistable + False if not. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + param = fluid.default_main_program().global_block().var('fc.b') + res = fluid.io.is_persistable(param) + """ + if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ + var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ + var.desc.type() == core.VarDesc.VarType.READER: + return False + return var.persistable + + def load(self, context, is_fleet=False, main_program=None): + dirname = envs.get_global_env( + "runner." + context["runner_name"] + ".init_model_path", None) + if dirname is None or dirname == "": + return + print("going to load ", dirname) + + params_grads = self._get_params_grads(main_program) + update_params = [p for p, _ in params_grads] + need_load_vars = [] + parameters = list( + filter(FineTuningStartup.is_persistable, main_program.list_vars())) + + for param in parameters: + if param not in update_params: + need_load_vars.append(param) + + fluid.io.load_vars(context["exe"], dirname, main_program, + need_load_vars) + print("load from {} success".format(dirname)) + + def startup(self, context): + for model_dict in context["phases"]: + with fluid.scope_guard(context["model"][model_dict["name"]][ + "scope"]): + train_prog = context["model"][model_dict["name"]][ + "main_program"] + startup_prog = context["model"][model_dict["name"]][ + "startup_program"] + with fluid.program_guard(train_prog, startup_prog): + context["exe"].run(startup_prog) + self.load(context, main_program=train_prog) + context["status"] = "train_pass" + + class PSStartup(StartupBase): def __init__(self, context): print("Running PSStartup.") diff --git a/core/utils/dataloader_instance.py b/core/utils/dataloader_instance.py index 15fbd3a3222d0a22d34cec4ca17ac726675feb29..03e6f0a67884917e9af2d02d13eb86576620ceef 100755 --- a/core/utils/dataloader_instance.py +++ b/core/utils/dataloader_instance.py @@ -19,7 +19,7 @@ from paddlerec.core.utils.envs import get_global_env from paddlerec.core.utils.envs import get_runtime_environ from paddlerec.core.reader import SlotReader from paddlerec.core.trainer import EngineMode -from paddlerec.core.utils.util import split_files +from paddlerec.core.utils.util import split_files, check_filelist def dataloader_by_name(readerclass, @@ -38,11 +38,27 @@ def dataloader_by_name(readerclass, assert package_base is not None data_path = os.path.join(package_base, data_path.split("::")[1]) - files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + hidden_file_list, files = check_filelist( + hidden_file_list=[], data_file_list=[], train_data_path=data_path) + if (hidden_file_list is not None): + print( + "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}". + format(hidden_file_list)) + + files.sort() + + need_split_files = False if context["engine"] == EngineMode.LOCAL_CLUSTER: + # for local cluster: split files for multi process + need_split_files = True + elif context["engine"] == EngineMode.CLUSTER and context[ + "cluster_type"] == "K8S": + # for k8s mount mode, split files for every node + need_split_files = True + print("need_split_files: {}".format(need_split_files)) + if need_split_files: files = split_files(files, context["fleet"].worker_index(), context["fleet"].worker_num()) - print("file_list : {}".format(files)) reader = reader_class(yaml_file) reader.init() @@ -84,11 +100,27 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context): assert package_base is not None data_path = os.path.join(package_base, data_path.split("::")[1]) - files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + hidden_file_list, files = check_filelist( + hidden_file_list=[], data_file_list=[], train_data_path=data_path) + if (hidden_file_list is not None): + print( + "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}". + format(hidden_file_list)) + + files.sort() + + need_split_files = False if context["engine"] == EngineMode.LOCAL_CLUSTER: + # for local cluster: split files for multi process + need_split_files = True + elif context["engine"] == EngineMode.CLUSTER and context[ + "cluster_type"] == "K8S": + # for k8s mount mode, split files for every node + need_split_files = True + + if need_split_files: files = split_files(files, context["fleet"].worker_index(), context["fleet"].worker_num()) - print("file_list: {}".format(files)) sparse = get_global_env(name + "sparse_slots", "#") if sparse == "": @@ -138,11 +170,27 @@ def slotdataloader(readerclass, train, yaml_file, context): assert package_base is not None data_path = os.path.join(package_base, data_path.split("::")[1]) - files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + hidden_file_list, files = check_filelist( + hidden_file_list=[], data_file_list=[], train_data_path=data_path) + if (hidden_file_list is not None): + print( + "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}". + format(hidden_file_list)) + + files.sort() + + need_split_files = False if context["engine"] == EngineMode.LOCAL_CLUSTER: + # for local cluster: split files for multi process + need_split_files = True + elif context["engine"] == EngineMode.CLUSTER and context[ + "cluster_type"] == "K8S": + # for k8s mount mode, split files for every node + need_split_files = True + + if need_split_files: files = split_files(files, context["fleet"].worker_index(), context["fleet"].worker_num()) - print("file_list: {}".format(files)) sparse = get_global_env("sparse_slots", "#", namespace) if sparse == "": diff --git a/core/utils/util.py b/core/utils/util.py index 4eba912cafda6619ba37c3f8bc170d7d41ea40c4..f6acfe203612326a77f41326581583278dac4183 100755 --- a/core/utils/util.py +++ b/core/utils/util.py @@ -201,6 +201,28 @@ def split_files(files, trainer_id, trainers): return trainer_files[trainer_id] +def check_filelist(hidden_file_list, data_file_list, train_data_path): + for root, dirs, files in os.walk(train_data_path): + if (files == None and dirs == None): + return None, None + else: + # use files and dirs + for file_name in files: + file_path = os.path.join(train_data_path, file_name) + if file_name[0] == '.': + hidden_file_list.append(file_path) + else: + data_file_list.append(file_path) + for dirs_name in dirs: + dirs_path = os.path.join(train_data_path, dirs_name) + if dirs_name[0] == '.': + hidden_file_list.append(dirs_path) + else: + #train_data_path = os.path.join(train_data_path, dirs_name) + check_filelist(hidden_file_list, data_file_list, dirs_path) + return hidden_file_list, data_file_list + + class CostPrinter(object): """ For count cost time && print cost log diff --git a/doc/custom_reader.md b/doc/custom_reader.md deleted file mode 100644 index c9079b5397057f35191bd376d22e978806e6c646..0000000000000000000000000000000000000000 --- a/doc/custom_reader.md +++ /dev/null @@ -1,362 +0,0 @@ -# PaddleRec 自定义数据集及Reader - -用户自定义数据集及配置异步Reader,需要关注以下几个步骤: - -* [数据集整理](#数据集整理) -* [在模型组网中加入输入占位符](#在模型组网中加入输入占位符) -* [Reader实现](#Reader的实现) -* [在yaml文件中配置Reader](#在yaml文件中配置reader) - -我们以CTR-DNN模型为例,给出了从数据整理,变量定义,Reader写法,调试的完整历程。 - -* [数据及Reader示例-DNN](#数据及Reader示例-DNN) - - -## 数据集整理 - -PaddleRec支持模型自定义数据集。 - -关于数据的tips: -1. 数据量: - - PaddleRec面向大规模数据设计,可以轻松支持亿级的数据读取,工业级的数据读写api:`dataset`在搜索、推荐、信息流等业务得到了充分打磨。 -2. 文件类型: - - 支持任意直接可读的文本数据,`dataset`同时支持`.gz`格式的文本压缩数据,无需额外代码,可直接读取。数据样本应以`\n`为标志,按行组织。 - -3. 文件存放位置: - - 文件通常存放在训练节点本地,但同时,`dataset`支持使用`hadoop`远程读取数据,数据无需下载到本地,为dataset配置hadoop相关账户及地址即可。 -4. 数据类型 - - Reader处理的是以行为单位的`string`数据,喂入网络的数据需要转为`int`,`float`的数值数据,不支持`string`喂入网络,不建议明文保存及处理训练数据。 -5. Tips - - Dataset模式下,训练线程与数据读取线程的关系强相关,为了多线程充分利用,`强烈建议将文件合理的拆为多个小文件`,尤其是在分布式训练场景下,可以均衡各个节点的数据量,同时加快数据的下载速度。 - -## 在模型组网中加入输入占位符 - -Reader读取文件后,产出的数据喂入网络,需要有占位符进行接收。占位符在Paddle中使用`fluid.data`或`fluid.layers.data`进行定义。`data`的定义可以参考[fluid.data](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/fluid_cn/data_cn.html#data)以及[fluid.layers.data](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/layers_cn/data_cn.html#data)。 - -假如您希望输入三个数据,分别是维度32的数据A,维度变长的稀疏数据B,以及一个一维的标签数据C,并希望梯度可以经过该变量向前传递,则示例如下: - -数据A的定义: -```python -var_a = fluid.data(name='A', shape= [-1, 32], dtype='float32') -``` - -数据B的定义,变长数据的使用可以参考[LoDTensor](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/basic_concept/lod_tensor.html#cn-user-guide-lod-tensor): -```python -var_b = fluid.data(name='B', shape=[-1, 1], lod_level=1, dtype='int64') -``` - -数据C的定义: -```python -var_c = fluid.data(name='C', shape=[-1, 1], dtype='int32') -var_c.stop_gradient = False -``` - -当我们完成以上三个数据的定义后,在PaddleRec的模型定义中,还需将其加入model基类成员变量`self._data_var` - -```python -self._data_var.append(var_a) -self._data_var.append(var_b) -self._data_var.append(var_c) -``` -至此,我们完成了在组网中定义输入数据的工作。 - -## Reader的实现 - -### Reader的实现范式 - -Reader的逻辑需要一个单独的python文件进行描述。我们试写一个`test_reader.py`,实现的具体流程如下: -1. 首先我们需要引入Reader基类 - - ```python - from paddlerec.core.reader import ReaderBase - ``` -2. 创建一个子类,继承Reader的基类,训练所需Reader命名为`TrainerReader` - ```python - class TrainerReader(ReaderBase): - def init(self): - pass - - def generator_sample(self, line): - pass - ``` - -3. 在`init(self)`函数中声明一些在数据读取中会用到的变量,必要时可以在`config.yaml`文件中配置变量,利用`env.get_global_env()`拿到。 - - 比如,我们希望从yaml文件中读取一个数据预处理变量`avg=10`,目的是将数据A的数据缩小10倍,可以这样实现: - - 首先更改yaml文件,在某个space下加入该变量 - - ```yaml - ... - train: - reader: - avg: 10 - ... - ``` - - - 再更改Reader的init函数 - - ```python - from paddlerec.core.utils import envs - class TrainerReader(Reader): - def init(self): - self.avg = envs.get_global_env("avg", None, "train.reader") - - def generator_sample(self, line): - pass - ``` - -4. 继承并实现基类中的`generate_sample(self, line)`函数,逐行读取数据。 - - 该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.) - - 在这个可以迭代的函数中,如示例代码中的`def reader()`,我们定义数据读取的逻辑。以行为单位的数据进行截取,转换及预处理。 - - 最后,我们需要将数据整理为特定的格式,才能够被PaddleRec的Reader正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的`inputs`必须是严格一一对应的,并转换为类似字典的形式。 - - 示例: 假设数据ABC在文本数据中,每行以这样的形式存储: - ```shell - 0.1,0.2,0.3...3.0,3.1,3.2 \t 99999,99998,99997 \t 1 \n - ``` - - 则示例代码如下: - ```python - from paddlerec.core.utils import envs - class TrainerReader(Reader): - def init(self): - self.avg = envs.get_global_env("avg", None, "train.reader") - - def generator_sample(self, line): - - def reader(self, line): - # 先分割 '\n', 再以 '\t'为标志分割为list - variables = (line.strip('\n')).split('\t') - - # A是第一个元素,并且每个数据之间使用','分割 - var_a = variables[0].split(',') # list - var_a = [float(i) / self.avg for i in var_a] # 将str数据转换为float - - - # B是第二个元素,同样以 ',' 分割 - var_b = variables[1].split(',') # list - var_b = [int(i) for i in var_b] # 将str数据转换为int - - # C是第三个元素, 只有一个元素,没有分割符 - var_c = variables[2] - var_c = int(var_c) # 将str数据转换为int - var_c = [var_c] # 将单独的数据元素置入list中 - - # 将数据与数据名结合,组织为dict的形式 - # 如下,output形式为{ A: var_a, B: var_b, C: var_c} - variable_name = ['A', 'B', 'C'] - output = zip(variable_name, [var_a] + [var_b] + [var_c]) - - # 将数据输出,使用yield方法,将该函数变为了一个可迭代的对象 - yield output - - ``` - - 至此,我们完成了Reader的实现。 - - -### 在yaml文件中配置Reader - -在模型的yaml配置文件中,主要的修改是三个,如下 - -```yaml -reader: - batch_size: 2 - class: "{workspace}/reader.py" - train_data_path: "{workspace}/data/train_data" - reader_debug_mode: False -``` - -batch_size: 顾名思义,是小批量训练时的样本大小 -class: 运行改模型所需reader的路径 -train_data_path: 训练数据所在文件夹 -reader_debug_mode: 测试reader语法,及输出是否符合预期的debug模式的开关 - - -## 数据及Reader示例-DNN - -Reader代码来源于[criteo_reader.py](../models/rank/criteo_reader.py), 组网代码来源于[model.py](../models/rank/dnn/model.py) - -### Criteo数据集格式 - -CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/)所用的Criteo数据集。该数据集包括两部分:训练集和测试集。训练集包含一段时间内Criteo的部分流量,测试集则对应训练数据后一天的广告点击流量。 -每一行数据格式如下所示: -```bash -