diff --git a/core/model.py b/core/model.py
index b4150155db9677124eabab079f003743fc6c4d8b..212db44c8dc60a20f6e5ed3f7c338b5336f41e2a 100755
--- a/core/model.py
+++ b/core/model.py
@@ -38,6 +38,34 @@ class Model(object):
         self._namespace = "train.model"
         self._platform = envs.get_platform()
 
+    def _init_slots(self):
+        sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader")
+        dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
+
+        if sparse_slots is not None or dense_slots is not None:
+            sparse_slots = sparse_slots.strip().split(" ")
+            dense_slots = dense_slots.strip().split(" ")
+            dense_slots_shape = [[int(j) for j in i.split(":")[1].strip("[]").split(",")] for i in dense_slots]
+            dense_slots = [i.split(":")[0] for i in dense_slots]
+            self._dense_data_var = []
+            for i in range(len(dense_slots)):
+                l = fluid.layers.data(name=dense_slots[i], shape=dense_slots_shape[i], dtype="float32")
+                self._data_var.append(l)
+                self._dense_data_var.append(l)
+            self._sparse_data_var = []
+            for name in sparse_slots:
+                l = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64")
+                self._data_var.append(l)
+                self._sparse_data_var.append(l)
+
+        dataset_class = envs.get_global_env("dataset_class", None, "train.reader")
+        if dataset_class == "DataLoader":
+            self._init_dataloader()
+
+    def _init_dataloader(self):
+        self._data_loader = fluid.io.DataLoader.from_generator(
+            feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
+
     def get_inputs(self):
         return self._data_var
 
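Note: `_init_slots` reads its slot layout from the `train.reader` namespace of the config. A minimal sketch of the YAML it expects is below; the slot names and shapes are illustrative assumptions, not part of this patch:

```yaml
# Hypothetical train.reader section consumed by _init_slots (names/shapes made up).
train:
  reader:
    sparse_slots: "click 6001 6002"  # space-separated; each becomes an int64 LoD data layer of shape [1]
    dense_slots: "wide_input:[8]"    # name:[shape]; each becomes a float32 data layer
    dataset_class: "DataLoader"      # triggers _init_dataloader()
```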
diff --git a/core/reader.py b/core/reader.py
index 955afa5e2f9a4d430912ad798de95df3160a3a69..01502761e30a7215c0c916dcde1825a4836280db 100755
--- a/core/reader.py
+++ b/core/reader.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from __future__ import print_function
+import sys
 import abc
 import os
 
@@ -44,3 +45,59 @@
     @abc.abstractmethod
     def generate_sample(self, line):
         pass
+
+
+class SlotReader(dg.MultiSlotDataGenerator):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, config):
+        dg.MultiSlotDataGenerator.__init__(self)
+        if os.path.isfile(config):
+            with open(config, 'r') as rb:
+                _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
+        else:
+            raise ValueError("reader config only support yaml")
+        envs.set_global_envs(_config)
+        envs.update_workspace()
+
+    def init(self, sparse_slots, dense_slots, padding=0):
+        from operator import mul
+        from functools import reduce  # reduce is not a builtin on Python 3
+        self.sparse_slots = sparse_slots.strip().split(" ")
+        self.dense_slots = dense_slots.strip().split(" ")
+        self.dense_slots_shape = [reduce(mul, [int(j) for j in i.split(":")[1].strip("[]").split(",")]) for i in self.dense_slots]
+        self.dense_slots = [i.split(":")[0] for i in self.dense_slots]
+        self.slots = self.dense_slots + self.sparse_slots
+        self.slot2index = {}
+        self.visit = {}
+        for i in range(len(self.slots)):
+            self.slot2index[self.slots[i]] = i
+            self.visit[self.slots[i]] = False
+        self.padding = padding
+
+    def generate_sample(self, l):
+        def reader():
+            line = l.strip().split(" ")
+            output = [(i, []) for i in self.slots]
+            for i in line:
+                slot_feasign = i.split(":")
+                slot = slot_feasign[0]
+                if slot not in self.slots:
+                    continue
+                if slot in self.sparse_slots:
+                    feasign = int(slot_feasign[1])
+                else:
+                    feasign = float(slot_feasign[1])
+                output[self.slot2index[slot]][1].append(feasign)
+                self.visit[slot] = True
+            for i in self.visit:
+                slot = i
+                if not self.visit[slot]:
+                    if i in self.dense_slots:
+                        output[self.slot2index[i]][1].extend([self.padding] * self.dense_slots_shape[self.slot2index[i]])
+                    else:
+                        output[self.slot2index[i]][1].extend([self.padding])
+                else:
+                    self.visit[slot] = False
+            yield output
+        return reader
diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py
index 81591056c94dc414fdeeba12d449f18aaaa0e216..a67d4759be7ae27c4a8c57eb43409102a8400c53 100755
--- a/core/trainers/transpiler_trainer.py
+++ b/core/trainers/transpiler_trainer.py
@@ -23,6 +23,7 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import f
 from paddlerec.core.trainer import Trainer
 from paddlerec.core.utils import envs
 from paddlerec.core.utils import dataloader_instance
+from paddlerec.core.reader import SlotReader
 
 
 class TranspileTrainer(Trainer):
@@ -50,14 +51,22 @@
             namespace = "evaluate.reader"
             class_name = "EvaluateReader"
 
+        sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
+        dense_slots = envs.get_global_env("dense_slots", None, namespace)
+
         batch_size = envs.get_global_env("batch_size", None, namespace)
-        reader_class = envs.get_global_env("class", None, namespace)
         print("batch_size: {}".format(batch_size))
-        reader = dataloader_instance.dataloader(
-            reader_class, state, self._config_yaml)
 
-        reader_class = envs.lazy_instance_by_fliename(reader_class, class_name)
-        reader_ins = reader_class(self._config_yaml)
+        if sparse_slots is None and dense_slots is None:
+            reader_class = envs.get_global_env("class", None, namespace)
+            reader = dataloader_instance.dataloader(
+                reader_class, state, self._config_yaml)
+            reader_class = envs.lazy_instance_by_fliename(reader_class, class_name)
+            reader_ins = reader_class(self._config_yaml)
+        else:
+            reader = dataloader_instance.slotdataloader("", state, self._config_yaml)
+            reader_ins = SlotReader(self._config_yaml)
+
         if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
             dataloader.set_sample_list_generator(reader)
         else:
@@ -93,13 +102,23 @@
             train_data_path = envs.get_global_env(
                 "test_data_path", None, namespace)
 
+        sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
+        dense_slots = envs.get_global_env("dense_slots", None, namespace)
+
         threads = int(envs.get_runtime_environ("train.trainer.threads"))
         batch_size = envs.get_global_env("batch_size", None, namespace)
         reader_class = envs.get_global_env("class", None, namespace)
         abs_dir = os.path.dirname(os.path.abspath(__file__))
         reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
-        pipe_cmd = "python {} {} {} {}".format(
-            reader, reader_class, state, self._config_yaml)
+
+        if sparse_slots is None and dense_slots is None:
+            pipe_cmd = "python {} {} {} {}".format(
+                reader, reader_class, state, self._config_yaml)
+        else:
+            padding = envs.get_global_env("padding", 0, namespace)
+            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
+                reader, "slot", "slot", self._config_yaml, namespace, \
+                sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
 
         if train_data_path.startswith("paddlerec::"):
             package_base = envs.get_runtime_environ("PACKAGE_BASE")
@@ -147,9 +166,6 @@
         if not need_save(epoch_id, save_interval, False):
             return
 
-        # print("save inference model is not supported now.")
-        # return
-
         feed_varnames = envs.get_global_env(
             "save.inference.feed_varnames", None, namespace)
         fetch_varnames = envs.get_global_env(
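For reference, the slot-mode `pipe_cmd` built above expands to something like the following; the script path, config path, and slot names are made up for illustration (quotes added here only for shell safety):

```bash
# argv: script, reader_package("slot"), state("slot"), yaml, namespace,
#       "#"-joined sparse slots, "#"-joined dense slots, padding
python core/utils/dataset_instance.py slot slot config.yaml train.reader \
    click#6001#6002 "wide_input:[8]#deep_input:[4]" 0
```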
diff --git a/core/utils/dataloader_instance.py b/core/utils/dataloader_instance.py
index 6234882f4d49191d3b55770078863910a356e9cd..8d4db6f82c05a41a0945f2c882caefd2a3c83d36 100755
--- a/core/utils/dataloader_instance.py
+++ b/core/utils/dataloader_instance.py
@@ -18,6 +18,7 @@ import os
 from paddlerec.core.utils.envs import lazy_instance_by_fliename
 from paddlerec.core.utils.envs import get_global_env
 from paddlerec.core.utils.envs import get_runtime_environ
+from paddlerec.core.reader import SlotReader
 
 
 def dataloader(readerclass, train, yaml_file):
@@ -62,3 +63,49 @@ def dataloader(readerclass, train, yaml_file):
     if hasattr(reader, 'generate_batch_from_trainfiles'):
         return gen_batch_reader()
     return gen_reader
+
+
+def slotdataloader(readerclass, train, yaml_file):
+    if train == "TRAIN":
+        reader_name = "SlotReader"
+        namespace = "train.reader"
+        data_path = get_global_env("train_data_path", None, namespace)
+    else:
+        reader_name = "SlotReader"
+        namespace = "evaluate.reader"
+        data_path = get_global_env("test_data_path", None, namespace)
+
+    if data_path.startswith("paddlerec::"):
+        package_base = get_runtime_environ("PACKAGE_BASE")
+        assert package_base is not None
+        data_path = os.path.join(package_base, data_path.split("::")[1])
+
+    files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
+
+    sparse = get_global_env("sparse_slots", None, namespace)
+    dense = get_global_env("dense_slots", None, namespace)
+    padding = get_global_env("padding", 0, namespace)
+    reader = SlotReader(yaml_file)
+    reader.init(sparse, dense, int(padding))
+
+    def gen_reader():
+        for file in files:
+            with open(file, 'r') as f:
+                for line in f:
+                    line = line.rstrip('\n')
+                    iter = reader.generate_sample(line)
+                    for parsed_line in iter():
+                        if parsed_line is None:
+                            continue
+                        else:
+                            values = []
+                            for parsed in parsed_line:
+                                values.append(parsed[1])
+                            yield values
+
+    def gen_batch_reader():
+        return reader.generate_batch_from_trainfiles(files)
+
+    if hasattr(reader, 'generate_batch_from_trainfiles'):
+        return gen_batch_reader()
+    return gen_reader
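To see what `slotdataloader` ultimately yields, here is a sketch of `SlotReader` parsing a single line; the config path, slot names, and feasigns are assumptions for illustration, and it presupposes a valid PaddleRec YAML config on disk:

```python
from paddlerec.core.reader import SlotReader

# Hypothetical setup: "config.yaml" must be a valid PaddleRec config file.
reader = SlotReader("config.yaml")
reader.init(sparse_slots="click 6001", dense_slots="wide_input:[2]", padding=0)

# generate_sample returns a generator factory; one input line yields one sample.
sample = next(reader.generate_sample("click:1 wide_input:0.5 wide_input:0.3")())
# Dense slots come first, and the sparse slot "6001", absent from the line,
# is padded with a single 0:
# [('wide_input', [0.5, 0.3]), ('click', [1]), ('6001', [0])]
print(sample)
```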
diff --git a/core/utils/dataset_instance.py b/core/utils/dataset_instance.py
index 731b3b47169d9e67735953c8488469d4d60cb296..f5175c48df978919c51519d027561011bd3ceb44 100755
--- a/core/utils/dataset_instance.py
+++ b/core/utils/dataset_instance.py
@@ -16,19 +16,33 @@ from __future__ import print_function
 import sys
 
 from paddlerec.core.utils.envs import lazy_instance_by_fliename
+from paddlerec.core.reader import SlotReader
+from paddlerec.core.utils import envs
 
-if len(sys.argv) != 4:
-    raise ValueError("reader only accept 3 argument: 1. reader_class 2.train/evaluate 3.yaml_abs_path")
+if len(sys.argv) < 4:
+    raise ValueError("reader requires at least 3 arguments: 1. reader_class 2. train/evaluate/slotreader 3. yaml_abs_path")
 
 reader_package = sys.argv[1]
 
-if sys.argv[2] == "TRAIN":
+if sys.argv[2].upper() == "TRAIN":
     reader_name = "TrainReader"
-else:
+elif sys.argv[2].upper() == "EVALUATE":
     reader_name = "EvaluateReader"
+else:
+    reader_name = "SlotReader"
+    namespace = sys.argv[4]
+    sparse_slots = sys.argv[5].replace("#", " ")
+    dense_slots = sys.argv[6].replace("#", " ")
+    padding = int(sys.argv[7])
 
 yaml_abs_path = sys.argv[3]
-reader_class = lazy_instance_by_fliename(reader_package, reader_name)
-reader = reader_class(yaml_abs_path)
-reader.init()
-reader.run_from_stdin()
+
+if reader_name != "SlotReader":
+    reader_class = lazy_instance_by_fliename(reader_package, reader_name)
+    reader = reader_class(yaml_abs_path)
+    reader.init()
+    reader.run_from_stdin()
+else:
+    reader = SlotReader(yaml_abs_path)
+    reader.init(sparse_slots, dense_slots, padding)
+    reader.run_from_stdin()
diff --git a/doc/custom_dataset_reader.md b/doc/custom_dataset_reader.md
index 7f29c21c3a32e2cf17b775741707e4ba83e90373..82b0fd12d4f52fe83155fd371f6041be52c8bcba 100644
--- a/doc/custom_dataset_reader.md
+++ b/doc/custom_dataset_reader.md
@@ -1,3 +1,73 @@
+# PaddleRec Recommendation Dataset Format
+
+If your dataset already follows the [slot:feasign]* pattern, or can be preprocessed into it, you can use PaddleRec's built-in Reader directly.
+This saves you from writing your own Reader, and it gives every model a single, unified data format.
+
+## Data Format Description
+
+Suppose your raw data looks like this:
+
+```bash
+