diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec
index 474abe2424502b1130984294fc78416b002c001d..452e23cb22d011f2b67233c2ea15cbf558547dca 100644
--- a/paddle/fluid/API.spec
+++ b/paddle/fluid/API.spec
@@ -101,12 +101,7 @@ paddle.fluid.io.chain (ArgSpec(args=[], varargs='readers', keywords=None, defaul
 paddle.fluid.io.shuffle (ArgSpec(args=['reader', 'buf_size'], varargs=None, keywords=None, defaults=None), ('document', 'e42ea6fee23ce26b23cb142cd1d6522d'))
 paddle.fluid.io.firstn (ArgSpec(args=['reader', 'n'], varargs=None, keywords=None, defaults=None), ('document', 'c5bb8f7dd4f917f1569a368aab5b8aad'))
 paddle.fluid.io.xmap_readers (ArgSpec(args=['mapper', 'reader', 'process_num', 'buffer_size', 'order'], varargs=None, keywords=None, defaults=(False,)), ('document', '9c804a42f8a4dbaa76b3c98e0ab7f796'))
-paddle.fluid.io.PipeReader ('paddle.reader.decorator.PipeReader', ('document', 'd3c250618f98c1a5fb646f869016a98e'))
-paddle.fluid.io.PipeReader.__init__ (ArgSpec(args=['self', 'command', 'bufsize', 'file_type'], varargs=None, keywords=None, defaults=(8192, 'plain')), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
-paddle.fluid.io.PipeReader.get_line (ArgSpec(args=['self', 'cut_lines', 'line_break'], varargs=None, keywords=None, defaults=(True, '\n')), ('document', '9621ae612e595b6c34eb3bb5f3eb1a45'))
 paddle.fluid.io.multiprocess_reader (ArgSpec(args=['readers', 'use_pipe', 'queue_size'], varargs=None, keywords=None, defaults=(True, 1000)), ('document', '7d8b3a96e592107c893d5d51ce968ba0'))
-paddle.fluid.io.Fake ('paddle.reader.decorator.Fake', ('document', '0d8f4847b99bed6d456ade0d903202e1'))
-paddle.fluid.io.Fake.__init__ (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
 paddle.fluid.initializer.ConstantInitializer ('paddle.fluid.initializer.ConstantInitializer', ('document', '911263fc30c516c55e89cd72086a23f8'))
 paddle.fluid.initializer.ConstantInitializer.__init__ (ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
 paddle.fluid.initializer.UniformInitializer ('paddle.fluid.initializer.UniformInitializer', ('document', '587b7035cd1d56f76f2ded617b92521d'))
diff --git a/python/paddle/reader/decorator.py b/python/paddle/reader/decorator.py
index 55a778b19695b0227a7abf6cbaef17cbb4b36a81..eeb2009e60fe8230c4e42d2aca02d745ca29ef76 100644
--- a/python/paddle/reader/decorator.py
+++ b/python/paddle/reader/decorator.py
@@ -14,8 +14,7 @@
 
 __all__ = [
     'cache', 'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
-    'ComposeNotAligned', 'firstn', 'xmap_readers', 'PipeReader',
-    'multiprocess_reader', 'Fake'
+    'ComposeNotAligned', 'firstn', 'xmap_readers', 'multiprocess_reader'
 ]
 
 from threading import Thread
@@ -517,116 +516,3 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
         return pipe_reader
     else:
         return queue_reader
-
-
-def _buf2lines(buf, line_break="\n"):
-    # FIXME: line_break should be automatically configured.
-    lines = buf.split(line_break)
-    return lines[:-1], lines[-1]
-
-
-class PipeReader:
-    """
-        PipeReader read data by stream from a command, take it's
-        stdout into a pipe buffer and redirect it to the parser to
-        parse, then yield data as your desired format.
-
-        You can using standard linux command or call another program
-        to read data, from HDFS, Ceph, URL, AWS S3 etc:
-
-        .. code-block:: python
-           cmd = "hadoop fs -cat /path/to/some/file"
-           cmd = "cat sample_file.tar.gz"
-           cmd = "curl http://someurl"
-           cmd = "python print_s3_bucket.py"
-
-        An example:
-
-        .. code-block:: python
-
-           def example_reader():
-               for f in myfiles:
-                   pr = PipeReader("cat %s"%f)
-                   for l in pr.get_line():
-                       sample = l.split(" ")
-                       yield sample
-    """
-
-    def __init__(self, command, bufsize=8192, file_type="plain"):
-        if not isinstance(command, str):
-            raise TypeError("left_cmd must be a string")
-        if file_type == "gzip":
-            self.dec = zlib.decompressobj(
-                32 + zlib.MAX_WBITS)  # offset 32 to skip the header
-        self.file_type = file_type
-        self.bufsize = bufsize
-        self.process = subprocess.Popen(
-            command.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
-
-    def get_line(self, cut_lines=True, line_break="\n"):
-        """
-        :param cut_lines: cut buffer to lines
-        :type cut_lines: bool
-        :param line_break: line break of the file, like '\\\\n' or '\\\\r'
-        :type line_break: string
-
-        :return: one line or a buffer of bytes
-        :rtype: string
-        """
-        remained = ""
-        while True:
-            buff = self.process.stdout.read(self.bufsize)
-            if buff:
-                if self.file_type == "gzip":
-                    decomp_buff = cpt.to_text(self.dec.decompress(buff))
-                elif self.file_type == "plain":
-                    decomp_buff = cpt.to_text(buff)
-                else:
-                    raise TypeError("file_type %s is not allowed" %
-                                    self.file_type)
-
-                if cut_lines:
-                    lines, remained = _buf2lines(''.join(
-                        [remained, decomp_buff]), line_break)
-                    for line in lines:
-                        yield line
-                else:
-                    yield decomp_buff
-            else:
-                break
-
-
-class Fake(object):
-    """
-    fake reader will cache the first data it read and yield it out for data_num times.
-    It is used to cache a data from real reader and use it for speed testing.
-
-    :param reader: the origin reader
-    :param data_num: times that this reader will yield data.
-
-    :return: a fake reader.
-
-    Examples:
-        .. code-block:: python
-
-            def reader():
-                for i in range(10):
-                    yield i
-
-            fake_reader = Fake()(reader, 100)
-    """
-
-    def __init__(self):
-        self.data = None
-        self.yield_num = 0
-
-    def __call__(self, reader, data_num):
-        def fake_reader():
-            if self.data is None:
-                self.data = next(reader())
-            while self.yield_num < data_num:
-                yield self.data
-                self.yield_num += 1
-            self.yield_num = 0
-
-        return fake_reader
diff --git a/python/paddle/reader/tests/decorator_test.py b/python/paddle/reader/tests/decorator_test.py
index ef07640ed839419d69f47503b162be597238cae6..abe87fa04df5863d3014c7ce58c5267cc79a4c8f 100644
--- a/python/paddle/reader/tests/decorator_test.py
+++ b/python/paddle/reader/tests/decorator_test.py
@@ -147,34 +147,6 @@ class TestXmap(unittest.TestCase):
                 self.assertEqual(e, mapper(idx))
 
 
-class TestPipeReader(unittest.TestCase):
-    def test_pipe_reader(self):
-        def example_reader(myfiles):
-            for f in myfiles:
-                pr = paddle.reader.PipeReader("cat %s" % f, bufsize=128)
-                for l in pr.get_line():
-                    yield l
-
-        import tempfile
-
-        records = [str(i) for i in range(5)]
-        temp = tempfile.NamedTemporaryFile()
-        try:
-            with open(temp.name, 'w') as f:
-                for r in records:
-                    f.write('%s\n' % r)
-
-            result = []
-            for r in example_reader([temp.name]):
-                result.append(r)
-
-            for idx, e in enumerate(records):
-                self.assertEqual(e, result[idx])
-        finally:
-            # delete the temporary file
-            temp.close()
-
-
 class TestMultiProcessReader(unittest.TestCase):
     def setup(self):
         self.samples = []
@@ -203,21 +175,5 @@ class TestMultiProcessReader(unittest.TestCase):
         self.reader_test(use_pipe=True)
 
 
-class TestFakeReader(unittest.TestCase):
-    def test_fake_reader(self):
-        def reader():
-            for i in range(10):
-                yield i
-
-        data_num = 100
-        fake_reader = paddle.reader.Fake()(reader, data_num)
-        for _ in range(10):
-            i = 0
-            for data in fake_reader():
-                self.assertEqual(data, 0)
-                i += 1
-            self.assertEqual(i, data_num)
-
-
 if __name__ == '__main__':
     unittest.main()
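
Note for downstream users (illustrative, not part of the patch above): code that still depends on the removed PipeReader can approximate its line-by-line streaming with the standard library alone. A minimal sketch, assuming a plain (non-gzip) UTF-8 stream; the helper name pipe_line_reader is hypothetical:

import subprocess


def pipe_line_reader(command, bufsize=8192):
    # Hypothetical stand-in for the removed PipeReader: run `command`,
    # stream its stdout, and yield decoded lines (gzip support omitted).
    def reader():
        proc = subprocess.Popen(
            command.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
        try:
            for raw in proc.stdout:
                yield raw.decode("utf-8").rstrip("\n")
        finally:
            proc.stdout.close()
            proc.wait()

    return reader


# Usage, analogous to the example_reader in the removed docstring:
for line in pipe_line_reader("cat /etc/hostname")():
    print(line)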
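
Likewise (also illustrative, not part of the patch): the removed Fake decorator, which cached the first sample of a reader and replayed it data_num times for speed testing, is small enough to inline where still needed. A minimal sketch; the name fake_reader_of is hypothetical:

def fake_reader_of(reader, data_num):
    cache = []  # holds the first sample once read, like Fake.data did

    def fake_reader():
        if not cache:
            cache.append(next(reader()))
        # Replay the cached sample data_num times per pass.
        for _ in range(data_num):
            yield cache[0]

    return fake_reader


def reader():
    for i in range(10):
        yield i


# Replays the first sample (0) a hundred times, like Fake()(reader, 100).
assert sum(1 for _ in fake_reader_of(reader, 100)()) == 100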