未验证 提交 d7b67f2b 编写于 作者: Y Yancey 提交者: GitHub

fix pipe_reader on multi passes (#6627)

fix pipe reader on multi passes
上级 d40d28d8
......@@ -390,8 +390,6 @@ def pipe_reader(left_cmd,
if not callable(parser):
raise TypeError("parser must be a callable object")
process = subprocess.Popen(
left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
# TODO(typhoonzero): add a thread to read stderr
# Always init a decompress object is better than
......@@ -400,6 +398,8 @@ def pipe_reader(left_cmd,
32 + zlib.MAX_WBITS) # offset 32 to skip the header
def reader():
process = subprocess.Popen(
left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
remained = ""
while True:
buff = process.stdout.read(bufsize)
......
......@@ -145,5 +145,35 @@ class TestXmap(unittest.TestCase):
self.assertEqual(e, mapper(idx))
class TestPipeReader(unittest.TestCase):
def test_pipe_reader(self):
def simple_parser(lines):
return lines
import tempfile
records = [str(i) for i in xrange(5)]
temp = tempfile.NamedTemporaryFile()
try:
with open(temp.name, 'w') as f:
for r in records:
f.write('%s\n' % r)
cmd = "cat %s" % temp.name
reader = paddle.v2.reader.pipe_reader(
cmd, simple_parser, bufsize=128)
for i in xrange(4):
result = []
for r in reader():
result.append(r)
for idx, e in enumerate(records):
print e, result[idx]
self.assertEqual(e, result[idx])
finally:
# delete the temporary file
temp.close()
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册