# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ['buffered']

from threading import Thread

try:
    from Queue import Queue  # Python 2 module name
except ImportError:
    from queue import Queue  # Python 3 module name


def buffered(reader, size):
    """Creates a buffered data reader.

    The buffered data reader starts a background thread that reads entries
    from the wrapped reader and stores them in a bounded queue. Reading from
    the buffered data reader yields entries from that queue in order, until
    the wrapped reader is exhausted.

    Args:
        reader: the data reader to read from; a callable taking no arguments
            that returns an iterable of data entries.
        size: max buffer size, passed to ``Queue(maxsize=size)``. A value
            less than or equal to zero makes the buffer unbounded.

    Returns:
        The buffered data reader: a callable taking no arguments that
        returns a generator over the data entries. Each call creates a new
        queue and a new daemon worker thread.

    Raises:
        Any ``Exception`` raised by the wrapped reader while it is being
        iterated is re-raised from the generator, after the entries that
        were read before the failure have been yielded.
    """
    # Unique end-of-stream marker. It is compared by identity only, so data
    # entries with custom ``__eq__``/``__ne__`` cannot be mistaken for it.
    end = object()

    class _WorkerError(object):
        """Carries an exception from the worker thread to the consumer."""

        def __init__(self, error):
            self.error = error

    def read_worker(r, q):
        try:
            for d in r:
                q.put(d)
        except Exception as exc:
            # Forward the failure; otherwise the consumer would block
            # forever waiting for an end marker that never arrives.
            q.put(_WorkerError(exc))
        else:
            q.put(end)

    def create_reader():
        r = reader()
        q = Queue(maxsize=size)
        t = Thread(target=read_worker, args=(r, q))
        # Daemon thread: it must not keep the interpreter alive if the
        # consumer stops reading before the data is exhausted.
        t.daemon = True
        t.start()
        while True:
            e = q.get()
            if e is end:
                return
            if isinstance(e, _WorkerError):
                raise e.error
            yield e

    return create_reader