diff --git a/python/paddle/fluid/incubate/data_generator/__init__.py b/python/paddle/fluid/incubate/data_generator/__init__.py deleted file mode 100644 index 3e66b75e28faf82607d3baa6477c9364d2e6c096..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/incubate/data_generator/__init__.py +++ /dev/null @@ -1,356 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -__all__ = ['MultiSlotDataGenerator', 'MultiSlotStringDataGenerator'] - - -class DataGenerator: - """ - DataGenerator is a general Base class for user to inherit - A user who wants to define his/her own python processing logic - with paddle.fluid.dataset should inherit this class - """ - - def __init__(self): - self._proto_info = None - self.batch_size_ = 32 - - def _set_line_limit(self, line_limit): - if not isinstance(line_limit, int): - raise ValueError( - "line_limit%s must be in int type" % type(line_limit) - ) - if line_limit < 1: - raise ValueError("line_limit can not less than 1") - self._line_limit = line_limit - - def set_batch(self, batch_size): - ''' - Set batch size of current DataGenerator - This is necessary only if a user wants to define generator_batch - - Example: - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", int_words) - return local_iter - def generate_batch(self, samples): - def local_iter(): - for s in samples: - yield ("words", s[1].extend([s[1][0]])) - mydata = MyData() - mydata.set_batch(128) - - ''' - self.batch_size_ = batch_size - - def run_from_memory(self): - ''' - This function generator data from memory, it is usually used for - debug and benchmarking - Example: - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - def generate_sample(self, line): - def local_iter(): - yield ("words", [1, 2, 3, 4]) - return local_iter - mydata = MyData() - mydata.run_from_memory() - ''' - batch_samples = [] - line_iter = self.generate_sample(None) - for user_parsed_line in line_iter(): - if user_parsed_line is None: - continue - batch_samples.append(user_parsed_line) - if len(batch_samples) == self.batch_size_: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - batch_samples = [] - if len(batch_samples) > 0: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - - def run_from_stdin(self): - ''' - This function reads the data row from stdin, parses it with the - process function, and further parses the return value of the - process function with the _gen_str function. The parsed data will - be wrote to stdout and the corresponding protofile will be - generated. - Example: - - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", [int_words]) - return local_iter - mydata = MyData() - mydata.run_from_stdin() - ''' - batch_samples = [] - for line in sys.stdin: - line_iter = self.generate_sample(line) - for user_parsed_line in line_iter(): - if user_parsed_line is None: - continue - batch_samples.append(user_parsed_line) - if len(batch_samples) == self.batch_size_: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - batch_samples = [] - if len(batch_samples) > 0: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - - def _gen_str(self, line): - ''' - Further processing the output of the process() function rewritten by - user, outputting data that can be directly read by the datafeed,and - updating proto_info information. - Args: - line(str): the output of the process() function rewritten by user. - Returns: - Return a string data that can be read directly by the datafeed. - ''' - raise NotImplementedError( - "pls use MultiSlotDataGenerator or PairWiseDataGenerator" - ) - - def generate_sample(self, line): - ''' - This function needs to be overridden by the user to process the - original data row into a list or tuple. - Args: - line(str): the original data row - Returns: - Returns the data processed by the user. - The data format is list or tuple: - [(name, [feasign, ...]), ...] - or ((name, [feasign, ...]), ...) - - For example: - [("words", [1926, 08, 17]), ("label", [1])] - or (("words", [1926, 08, 17]), ("label", [1])) - Note: - The type of feasigns must be in int or float. Once the float - element appears in the feasign, the type of that slot will be - processed into a float. - Example: - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", [int_words]) - return local_iter - ''' - raise NotImplementedError( - "Please rewrite this function to return a list or tuple: " - + "[(name, [feasign, ...]), ...] or ((name, [feasign, ...]), ...)" - ) - - def generate_batch(self, samples): - ''' - This function needs to be overridden by the user to process the - generated samples from generate_sample(self, str) function - It is usually used as batch processing when a user wants to - do preprocessing on a batch of samples, e.g. padding according to - the max length of a sample in the batch - Args: - samples(list tuple): generated sample from generate_sample - Returns: - a python generator, the same format as return value of generate_sample - Example: - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", int_words) - return local_iter - def generate_batch(self, samples): - def local_iter(): - for s in samples: - yield ("words", s[1].extend([s[1][0]])) - mydata = MyData() - mydata.set_batch(128) - ''' - - def local_iter(): - for sample in samples: - yield sample - - return local_iter - - -# TODO: guru4elephant -# add more generalized DataGenerator that can adapt user-defined slot -# for example, [(name, float_list), (name, str_list), (name, int_list)] -class MultiSlotStringDataGenerator(DataGenerator): - def _gen_str(self, line): - ''' - Further processing the output of the process() function rewritten by - user, outputting data that can be directly read by the MultiSlotDataFeed, - and updating proto_info information. - The input line will be in this format: - >>> [(name, [str(feasign), ...]), ...] - >>> or ((name, [str(feasign), ...]), ...) - The output will be in this format: - >>> [ids_num id1 id2 ...] ... - For example, if the input is like this: - >>> [("words", ["1926", "08", "17"]), ("label", ["1"])] - >>> or (("words", ["1926", "08", "17"]), ("label", ["1"])) - the output will be: - >>> 3 1234 2345 3456 1 1 - Args: - line(str): the output of the process() function rewritten by user. - Returns: - Return a string data that can be read directly by the MultiSlotDataFeed. - ''' - if not isinstance(line, list) and not isinstance(line, tuple): - raise ValueError( - "the output of process() must be in list or tuple type" - "Examples: [('words', ['1926', '08', '17']), ('label', ['1'])]" - ) - output = "" - for index, item in enumerate(line): - name, elements = item - if output: - output += " " - out_str = [] - out_str.append(str(len(elements))) - out_str.extend(elements) - output += " ".join(out_str) - return output + "\n" - - -class MultiSlotDataGenerator(DataGenerator): - def _gen_str(self, line): - ''' - Further processing the output of the process() function rewritten by - user, outputting data that can be directly read by the MultiSlotDataFeed, - and updating proto_info information. - The input line will be in this format: - >>> [(name, [feasign, ...]), ...] - >>> or ((name, [feasign, ...]), ...) - The output will be in this format: - >>> [ids_num id1 id2 ...] ... - The proto_info will be in this format: - >>> [(name, type), ...] - - For example, if the input is like this: - >>> [("words", [1926, 08, 17]), ("label", [1])] - >>> or (("words", [1926, 08, 17]), ("label", [1])) - the output will be: - >>> 3 1234 2345 3456 1 1 - the proto_info will be: - >>> [("words", "uint64"), ("label", "uint64")] - Args: - line(str): the output of the process() function rewritten by user. - Returns: - Return a string data that can be read directly by the MultiSlotDataFeed. - ''' - if not isinstance(line, list) and not isinstance(line, tuple): - raise ValueError( - "the output of process() must be in list or tuple type" - "Example: [('words', [1926, 08, 17]), ('label', [1])]" - ) - output = "" - - if self._proto_info is None: - self._proto_info = [] - for item in line: - name, elements = item - if not isinstance(name, str): - raise ValueError("name%s must be in str type" % type(name)) - if not isinstance(elements, list): - raise ValueError( - "elements%s must be in list type" % type(elements) - ) - if not elements: - raise ValueError( - "the elements of each field can not be empty, you need padding it in process()." - ) - self._proto_info.append((name, "uint64")) - if output: - output += " " - output += str(len(elements)) - for elem in elements: - if isinstance(elem, float): - self._proto_info[-1] = (name, "float") - elif not isinstance(elem, int) and not isinstance( - elem, long - ): - raise ValueError( - "the type of element%s must be in int or float" - % type(elem) - ) - output += " " + str(elem) - else: - if len(line) != len(self._proto_info): - raise ValueError( - "the complete field set of two given line are inconsistent." - ) - for index, item in enumerate(line): - name, elements = item - if not isinstance(name, str): - raise ValueError("name%s must be in str type" % type(name)) - if not isinstance(elements, list): - raise ValueError( - "elements%s must be in list type" % type(elements) - ) - if not elements: - raise ValueError( - "the elements of each field can not be empty, you need padding it in process()." - ) - if name != self._proto_info[index][0]: - raise ValueError( - "the field name of two given line are not match: require<%s>, get<%s>." - % (self._proto_info[index][0], name) - ) - if output: - output += " " - output += str(len(elements)) - for elem in elements: - if self._proto_info[index][1] != "float": - if isinstance(elem, float): - self._proto_info[index] = (name, "float") - elif not isinstance(elem, int) and not isinstance( - elem, long - ): - raise ValueError( - "the type of element%s must be in int or float" - % type(elem) - ) - output += " " + str(elem) - return output + "\n" diff --git a/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py b/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py index a53da27593907a17850bcb87fae6af9c3dcbb553..04fcc977019c95ce5a619df2088081527f9f4326 100755 --- a/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py +++ b/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import paddle.fluid.incubate.data_generator as dg +import paddle.distributed.fleet as fleet cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] @@ -22,7 +22,7 @@ continuous_range_ = range(1, 14) categorical_range_ = range(14, 40) -class CriteoDataset(dg.MultiSlotDataGenerator): +class CriteoDataset(fleet.MultiSlotDataGenerator): def generate_sample(self, line): """ Read the data line by line and process it as a dictionary diff --git a/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py b/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py index 77178ee4c333dd5d8ff043778aeaacb44befe7f5..0769a106e09af2fe5b0815574b829fc626d3fa97 100755 --- a/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py +++ b/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import paddle.fluid.incubate.data_generator as dg +import paddle.distributed.fleet as fleet cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] @@ -22,7 +22,7 @@ continuous_range_ = range(1, 14) categorical_range_ = range(14, 40) -class CriteoDataset(dg.MultiSlotDataGenerator): +class CriteoDataset(fleet.MultiSlotDataGenerator): def generate_sample(self, line): """ Read the data line by line and process it as a dictionary diff --git a/python/paddle/fluid/tests/unittests/test_dataset_consistency_inspection.py b/python/paddle/fluid/tests/unittests/test_dataset_consistency_inspection.py index 1a8d4de560ab79b37cc417fb83e6f980e61f77ec..447417cc8640e4fd0d6b1b1de9a845a8d22e5591 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset_consistency_inspection.py +++ b/python/paddle/fluid/tests/unittests/test_dataset_consistency_inspection.py @@ -21,8 +21,8 @@ import tempfile import unittest import paddle +import paddle.distributed.fleet as fleet import paddle.fluid as fluid -import paddle.fluid.incubate.data_generator as dg # paddle.enable_static() # fluid.disable_dygraph() @@ -51,7 +51,7 @@ query_schema = [ ] -class CTRDataset(dg.MultiSlotDataGenerator): +class CTRDataset(fleet.MultiSlotDataGenerator): def __init__(self, mode): self.test = mode diff --git a/python/setup.py.in b/python/setup.py.in index 44a09489560a3b51d4e0d902a9097b6878d8e454..29fe8b45519ebfd8b6a07da385499ed49411c8ca 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -401,7 +401,6 @@ packages=['paddle', 'paddle.fluid.transpiler', 'paddle.fluid.transpiler.details', 'paddle.fluid.incubate', - 'paddle.fluid.incubate.data_generator', 'paddle.fluid.incubate.fleet', 'paddle.fluid.incubate.checkpoint', 'paddle.fluid.incubate.fleet.base', diff --git a/setup.py b/setup.py index 50ef5ee480199d303db12a56463026f63590e56a..722c541ab013876dae685db59b2f194c29f21300 100644 --- a/setup.py +++ b/setup.py @@ -1287,7 +1287,6 @@ def get_setup_parameters(): 'paddle.fluid.transpiler', 'paddle.fluid.transpiler.details', 'paddle.fluid.incubate', - 'paddle.fluid.incubate.data_generator', 'paddle.fluid.incubate.fleet', 'paddle.fluid.incubate.checkpoint', 'paddle.fluid.incubate.fleet.base',