diff --git a/core/trainers/framework/dataset.py b/core/trainers/framework/dataset.py
index 5c5a2357ff4a07d54d4e0c56e692b4d79fcb2095..ae3b7c38824eec0b509579bf29f122ab58fb3a30 100644
--- a/core/trainers/framework/dataset.py
+++ b/core/trainers/framework/dataset.py
@@ -22,9 +22,9 @@ from paddlerec.core.utils import dataloader_instance
 from paddlerec.core.reader import SlotReader
 from paddlerec.core.trainer import EngineMode
 from paddlerec.core.utils.util import split_files
+from paddle.fluid.contrib.utils.hdfs_utils import HDFSClient
 
-__all__ = ["DatasetBase", "DataLoader", "QueueDataset"]
-
+__all__ = ["DatasetBase", "DataLoader", "QueueDataset", "InMemoryDataset"]
 
 class DatasetBase(object):
     """R
@@ -151,3 +151,68 @@ class QueueDataset(DatasetBase):
                 dataset.set_use_var(inputs)
                 break
         return dataset
+
+
+class InMemoryDataset(QueueDataset):
+    def _get_dataset(self, dataset_name, context):
+        name = "dataset." + dataset_name + "."
+        reader_class = envs.get_global_env(name + "data_converter")
+        reader_class_name = envs.get_global_env(name + "reader_class_name",
+                                                "Reader")
+        abs_dir = os.path.dirname(os.path.abspath(__file__))
+        reader = os.path.join(abs_dir, '../../utils', 'dataset_instance.py')
+        sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
+        dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
+        for dataset_config in context["env"]["dataset"]:
+            if dataset_config["type"] == "InMemoryDataset":
+                hdfs_addr = dataset_config["hdfs_addr"]
+                hdfs_ugi = dataset_config["hdfs_ugi"]
+                hadoop_home = dataset_config["hadoop_home"]
+                if hdfs_addr is None or hdfs_ugi is None:
+                    raise ValueError("hdfs_addr and hdfs_ugi not set")
+        if sparse_slots == "" and dense_slots == "":
+            pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
+                                                   reader_class_name,
+                                                   context["config_yaml"])
+        else:
+            if sparse_slots == "":
+                sparse_slots = "?"
+            if dense_slots == "":
+                dense_slots = "?"
+            padding = envs.get_global_env(name + "padding", 0)
+            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
+                reader, "slot", "slot", context["config_yaml"], "fake",
+                sparse_slots.replace(" ", "?"),
+                dense_slots.replace(" ", "?"), str(padding))
+
+        batch_size = envs.get_global_env(name + "batch_size")
+        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset.set_batch_size(batch_size)
+        dataset.set_pipe_command(pipe_cmd)
+        dataset.set_hdfs_config(hdfs_addr, hdfs_ugi)
+        train_data_path = envs.get_global_env(name + "data_path")
+        hdfs_configs = {
+            "fs.default.name": hdfs_addr,
+            "hadoop.job.ugi": hdfs_ugi
+        }
+        hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
+        file_list = [
+            "{}/{}".format(hdfs_addr, x)
+            for x in hdfs_client.lsr(train_data_path)
+        ]
+        if context["engine"] == EngineMode.LOCAL_CLUSTER:
+            file_list = split_files(file_list,
+                                    context["fleet"].worker_index(),
+                                    context["fleet"].worker_num())
+
+        dataset.set_filelist(file_list)
+        for model_dict in context["phases"]:
+            if model_dict["dataset_name"] == dataset_name:
+                model = context["model"][model_dict["name"]]["model"]
+                thread_num = int(model_dict["thread_num"])
+                dataset.set_thread(thread_num)
+                if context["is_infer"]:
+                    inputs = model._infer_data_var
+                else:
+                    inputs = model._data_var
+                dataset.set_use_var(inputs)
+                dataset.load_into_memory()
+                break
+        return dataset
diff --git a/core/trainers/framework/network.py b/core/trainers/framework/network.py
index 7d7a8273b6a402bd163f653a7beb3900de899ae3..3ea84135c78bfcfad324b6b6d87c2d0026dbb02c 100644
--- a/core/trainers/framework/network.py
+++ b/core/trainers/framework/network.py
@@ -19,7 +19,7 @@ import warnings
 import paddle.fluid as fluid
 
 from paddlerec.core.utils import envs
-from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset
+from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset, InMemoryDataset
 
 __all__ = [
     "NetworkBase", "SingleNetwork", "PSNetwork", "PslibNetwork",
@@ -105,7 +105,11 @@ class SingleNetwork(NetworkBase):
                 context["dataset"][dataset[
                     "name"]] = dataset_class.create_dataset(dataset["name"],
                                                             context)
-
+            elif type == "InMemoryDataset":
+                dataset_class = InMemoryDataset(context)
+                context["dataset"][dataset[
+                    "name"]] = dataset_class.create_dataset(dataset["name"],
+                                                            context)
 
         context["status"] = "startup_pass"
 
@@ -187,7 +191,11 @@ class FineTuningNetwork(NetworkBase):
                 context["dataset"][dataset[
                     "name"]] = dataset_class.create_dataset(dataset["name"],
                                                             context)
-
+            elif type == "InMemoryDataset":
+                dataset_class = InMemoryDataset(context)
+                context["dataset"][dataset[
+                    "name"]] = dataset_class.create_dataset(dataset["name"],
+                                                            context)
 
         context["status"] = "startup_pass"
 
@@ -250,6 +258,11 @@ class PSNetwork(NetworkBase):
                 context["dataset"][dataset[
                     "name"]] = dataset_class.create_dataset(
                         dataset["name"], context)
+            elif type == "InMemoryDataset":
+                dataset_class = InMemoryDataset(context)
+                context["dataset"][dataset[
+                    "name"]] = dataset_class.create_dataset(
+                        dataset["name"], context)
 
         context["status"] = "startup_pass"
 
     def _build_strategy(self, context):
@@ -348,6 +361,11 @@ class PslibNetwork(NetworkBase):
                 context["dataset"][dataset[
                     "name"]] = dataset_class.create_dataset(
                         dataset["name"], context)
+            elif type == "InMemoryDataset":
+                dataset_class = InMemoryDataset(context)
+                context["dataset"][dataset[
+                    "name"]] = dataset_class.create_dataset(
+                        dataset["name"], context)
 
         context["status"] = "startup_pass"
 
     def _server(self, context):
diff --git a/models/rank/dnn/hdfs_config.yaml b/models/rank/dnn/hdfs_config.yaml
new file mode 100755
index 0000000000000000000000000000000000000000..4fa769e1dcf42feb31b4db7f4743760255610a2f
--- /dev/null
+++ b/models/rank/dnn/hdfs_config.yaml
@@ -0,0 +1,68 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# workspace
+workspace: "models/rank/dnn"
+
+# list of dataset
+dataset:
+- name: dataset_train # name of dataset to distinguish different datasets
+  batch_size: 2
+  type: InMemoryDataset # or DataLoader
+  data_path: "/user/paddle/wangjiawei04/paddlerec/dnn"
+  hdfs_addr: "afs://yinglong.afs.baidu.com:9902"
+  hdfs_ugi: "paddle,paddle"
+  hadoop_home: "~/.ndt/software/hadoop-xingtian/hadoop/"
+  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
+  dense_slots: "dense_var:13"
+
+# hyper parameters of user-defined network
+hyper_parameters:
+  # optimizer config
+  optimizer:
+    class: Adam
+    learning_rate: 0.001
+    strategy: async
+  # user-defined pairs
+  sparse_inputs_slots: 27
+  sparse_feature_number: 1000001
+  sparse_feature_dim: 9
+  dense_input_dim: 13
+  fc_sizes: [512, 256, 128, 32]
+
+# select runner by name
+mode: [single_cpu_train]
+# config of each runner.
+# runner is a kind of paddle training class, which wraps the train/infer process.
+runner:
+- name: single_cpu_train
+  class: train
+  # num of epochs
+  epochs: 4
+  # device to run training or infer
+  device: cpu
+  save_checkpoint_interval: 2 # save model interval of epochs
+  save_inference_interval: 4 # save inference
+  save_checkpoint_path: "increment_dnn" # save checkpoint path
+  save_inference_path: "inference" # save inference path
+  save_inference_feed_varnames: [] # feed vars of save inference
+  save_inference_fetch_varnames: [] # fetch vars of save inference
+  print_interval: 10
+  phases: [phase1]
+
+# runner will run all the phase in each epoch
+phase:
+- name: phase1
+  model: "{workspace}/model.py" # user-defined model
+  dataset_name: dataset_train # select dataset by name
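
A minimal usage sketch, assuming the standard PaddleRec launcher (the exact HDFS address, ugi, and data_path above are placeholders for your cluster):

    # run the dnn model with the InMemoryDataset/HDFS config
    python -m paddlerec.run -m models/rank/dnn/hdfs_config.yaml

With `type: InMemoryDataset` in the dataset section, SingleNetwork dispatches to InMemoryDataset.create_dataset, which lists the training files over HDFS, sets the slot-reader pipe command, and calls load_into_memory() before the runner starts its epochs.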