diff --git a/fluid/DeepASR/data_utils/async_data_reader.py b/fluid/DeepASR/data_utils/async_data_reader.py index 03448fadccfbcfb67ab28cdf2071fc4b743ef6e5..ffb37f5cce81553aed0fbf6ae0c3b7fd018b45a4 100644 --- a/fluid/DeepASR/data_utils/async_data_reader.py +++ b/fluid/DeepASR/data_utils/async_data_reader.py @@ -22,7 +22,6 @@ from data_utils.util import CriticalException, ForceExitWrapper, EpochEndSignal class SampleInfo(object): """SampleInfo holds the necessary information to load a sample from disk. - Args: feature_bin_path (str): File containing the feature data. feature_start (int): Start position of the sample's feature data. @@ -55,7 +54,6 @@ class SampleInfoBucket(object): data, sample start position, sample byte number etc.) to access samples' feature data and the same with the label description file. SampleInfoBucket is the minimum unit to do shuffle. - Args: feature_bin_paths (list|tuple): Files containing the binary feature data. @@ -165,7 +163,6 @@ class SampleInfoBucket(object): class AsyncDataReader(object): """DataReader provides basic audio sample preprocessing pipeline including data loading and data augmentation. - Args: feature_file_list (str): File containing paths of feature data file and corresponding description file. @@ -209,8 +206,6 @@ class AsyncDataReader(object): self.generate_bucket_list(True) self._order_id = 0 self._manager = Manager() - self._sample_buffer_size = sample_buffer_size - self._sample_info_buffer_size = sample_info_buffer_size self._batch_buffer_size = batch_buffer_size self._proc_num = proc_num if self._proc_num <= 2: @@ -218,6 +213,10 @@ class AsyncDataReader(object): self._sample_proc_num = self._proc_num - 2 self._verbose = verbose self._force_exit = ForceExitWrapper(self._manager.Value('b', False)) + # buffer queue + self._sample_info_queue = self._manager.Queue(sample_info_buffer_size) + self._sample_queue = self._manager.Queue(sample_buffer_size) + self._batch_queue = self._manager.Queue(batch_buffer_size) def generate_bucket_list(self, is_shuffle): if self._block_info_list is None: @@ -258,8 +257,6 @@ class AsyncDataReader(object): shared_ndarray.recycle(self._pool_manager.pool) def _start_async_processing(self): - sample_info_queue = self._manager.Queue(self._sample_info_buffer_size) - sample_queue = self._manager.Queue(self._sample_buffer_size) self._order_id = 0 @suppress_complaints(verbose=self._verbose, notify=self._force_exit) @@ -284,7 +281,9 @@ class AsyncDataReader(object): sample_info_queue.put(EpochEndSignal()) feeding_proc = DaemonProcessGroup( - proc_num=1, target=ordered_feeding_task, args=(sample_info_queue, )) + proc_num=1, + target=ordered_feeding_task, + args=(self._sample_info_queue, )) feeding_proc.start_all() @suppress_complaints(verbose=self._verbose, notify=self._force_exit) @@ -361,15 +360,13 @@ class AsyncDataReader(object): sample_queue.put(EpochEndSignal()) out_order = self._manager.list([0]) - args = (sample_info_queue, sample_queue, out_order) + args = (self._sample_info_queue, self._sample_queue, out_order) sample_proc = DaemonProcessGroup( proc_num=self._sample_proc_num, target=ordered_processing_task, args=args) sample_proc.start_all() - return sample_queue - def batch_iterator(self, batch_size, minimum_batch_size): @suppress_complaints(verbose=self._verbose, notify=self._force_exit) def batch_assembling_task(sample_queue, batch_queue, pool): @@ -419,8 +416,7 @@ class AsyncDataReader(object): batch_queue.put(EpochEndSignal()) - sample_queue = self._start_async_processing() - batch_queue = self._manager.Queue(self._batch_buffer_size) + self._start_async_processing() self._pool_manager = SharedMemoryPoolManager(self._batch_buffer_size * 3, self._manager) @@ -428,12 +424,13 @@ class AsyncDataReader(object): assembling_proc = DaemonProcessGroup( proc_num=1, target=batch_assembling_task, - args=(sample_queue, batch_queue, self._pool_manager.pool)) + args=(self._sample_queue, self._batch_queue, + self._pool_manager.pool)) assembling_proc.start_all() while self._force_exit == False: try: - batch_data = batch_queue.get_nowait() + batch_data = self._batch_queue.get_nowait() except Queue.Empty: time.sleep(0.001) else: