From f12deac863f221e297f7363b72b54328b07580f6 Mon Sep 17 00:00:00 2001
From: yangyaming
Date: Mon, 5 Feb 2018 11:46:02 +0800
Subject: [PATCH] Add comments for the data reader and adapt the model to the
 parallel reader.

---
 fluid/DeepASR/data_utils/data_reader.py     | 549 +++++++++++---------
 fluid/DeepASR/data_utils/parallel_reader.py | 277 ----------
 fluid/DeepASR/stacked_dynamic_lstm.py       |  32 +-
 3 files changed, 327 insertions(+), 531 deletions(-)
 delete mode 100644 fluid/DeepASR/data_utils/parallel_reader.py

diff --git a/fluid/DeepASR/data_utils/data_reader.py b/fluid/DeepASR/data_utils/data_reader.py
index 69620629..bd4dad88 100644
--- a/fluid/DeepASR/data_utils/data_reader.py
+++ b/fluid/DeepASR/data_utils/data_reader.py
@@ -1,255 +1,328 @@
-"""This model read the sample from disk.
-    use multiprocessing to reading samples
-    push samples from one block to multiprocessing queue
-    Todos:
-    1. multiprocess read block from disk
+"""This module contains the data processing logic.
 """
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
 import random
-import Queue
 import numpy as np
 import struct
 import data_utils.augmentor.trans_mean_variance_norm as trans_mean_variance_norm
 import data_utils.augmentor.trans_add_delta as trans_add_delta
-
-
-class OneBlock(object):
-    """ struct for one block :
-            contain label, label desc, feature, feature_desc
-
-        Attributes:
-            label(str) : label path of one block
-            label_desc(str) : label description path of one block
-            feature(str) : feature path of on block
-            feature_desc(str) : feature description path of on block
+from multiprocessing import Manager, Process
+from threading import Thread
+import time
+
+
+class SampleInfo(object):
+    """SampleInfo holds the necessary information to load a sample from disk.
+
+    Args:
+        feature_bin_path (str): File containing the feature data.
+        feature_start (int): Start position of the sample's feature data.
+        feature_size (int): Byte count of the sample's feature data.
+        feature_frame_num (int): Time length (frame count) of the sample.
+        feature_dim (int): Feature dimension of one frame.
+        label_bin_path (str): File containing the label data.
+        label_start (int): Start position of the sample's label data.
+        label_size (int): Byte count of the sample's label data.
+        label_frame_num (int): Number of labels of the sample.
     """
 
-    def __init__(self):
-        """the constructor."""
-
-        self.label = "label"
-        self.label_desc = "label_desc"
-        self.feature = "feature"
-        self.feature_desc = "feature_desc"
-
-
-class DataRead(object):
+    def __init__(self, feature_bin_path, feature_start, feature_size,
+                 feature_frame_num, feature_dim, label_bin_path, label_start,
+                 label_size, label_frame_num):
+        self.feature_bin_path = feature_bin_path
+        self.feature_start = feature_start
+        self.feature_size = feature_size
+        self.feature_frame_num = feature_frame_num
+        self.feature_dim = feature_dim
+
+        self.label_bin_path = label_bin_path
+        self.label_start = label_start
+        self.label_size = label_size
+        self.label_frame_num = label_frame_num
+
+
+class SampleInfoBucket(object):
+    """SampleInfoBucket contains the paths of several description files. A
+    feature description file holds the information needed to access samples'
+    feature data, and a label description file holds the information needed
+    to access samples' label data. SampleInfoBucket is the minimum unit of
+    shuffling.
+
+    Args:
+        feature_bin_paths (list|tuple): Files containing the binary feature
+            data.
+        feature_desc_paths (list|tuple): Files containing the description of
+            samples' feature data.
+        label_bin_paths (list|tuple): Files containing the binary label data.
+        label_desc_paths (list|tuple): Files containing the description of
+            samples' label data.
     """
-    Attributes:
-        _lblock(obj:`OneBlock`) : the list of OneBlock
-        _ndrop_sentence_len(int): dropout the sentence which's frame_num large than _ndrop_sentence_len
-        _que_sample(obj:`Queue`): sample buffer
-        _nframe_dim(int): the batch sample frame_dim(todo remove)
-        _nstart_block_idx(int): the start block id
-        _nload_block_num(int): the block num
+
+    def __init__(self, feature_bin_paths, feature_desc_paths, label_bin_paths,
+                 label_desc_paths):
+        block_num = len(label_bin_paths)
+        assert len(label_desc_paths) == block_num
+        assert len(feature_bin_paths) == block_num
+        assert len(feature_desc_paths) == block_num
+        self._block_num = block_num
+
+        self._feature_bin_paths = feature_bin_paths
+        self._feature_desc_paths = feature_desc_paths
+        self._label_bin_paths = label_bin_paths
+        self._label_desc_paths = label_desc_paths
+
+    def generate_sample_info_list(self):
+        sample_info_list = []
+        for block_idx in xrange(self._block_num):
+            label_bin_path = self._label_bin_paths[block_idx]
+            label_desc_path = self._label_desc_paths[block_idx]
+            feature_bin_path = self._feature_bin_paths[block_idx]
+            feature_desc_path = self._feature_desc_paths[block_idx]
+
+            label_desc_lines = open(label_desc_path).readlines()
+            feature_desc_lines = open(feature_desc_path).readlines()
+
+            sample_num = int(label_desc_lines[0].split()[1])
+            assert sample_num == int(feature_desc_lines[0].split()[1])
+
+            for i in xrange(sample_num):
+                feature_desc_split = feature_desc_lines[i + 1].split()
+                feature_start = int(feature_desc_split[2])
+                feature_size = int(feature_desc_split[3])
+                feature_frame_num = int(feature_desc_split[4])
+                feature_dim = int(feature_desc_split[5])
+
+                label_desc_split = label_desc_lines[i + 1].split()
+                label_start = int(label_desc_split[2])
+                label_size = int(label_desc_split[3])
+                label_frame_num = int(label_desc_split[4])
+
+                sample_info_list.append(
+                    SampleInfo(feature_bin_path, feature_start, feature_size,
+                               feature_frame_num, feature_dim, label_bin_path,
+                               label_start, label_size, label_frame_num))
+
+        return sample_info_list
+
+
+class EpochEndSignal():
+    pass
+
+
+class DataReader(object):
+    """DataReader provides the basic audio sample preprocessing pipeline,
+    including I/O and augmentation transformations.
+
+    Args:
+        feature_file_list (str): File listing the paths of the feature data
+            and description files.
+        label_file_list (str): File listing the paths of the label data and
+            description files.
+        frame_dim (int): The final feature dimension of one frame after all
+            augmentations are applied.
+        drop_sentence_len (int): Samples whose frame number exceeds this
+            threshold are dropped.
+        process_num (int): Number of processes used to process data.
+        sample_buffer_size (int): Maximum number of preprocessed samples to
+            cache.
+        sample_info_buffer_size (int): Maximum number of sample information
+            entries to cache.
+        shuffle_block_num (int): Number of blocks forming the minimum unit
+            of shuffling.
+        random_seed (int): Random seed.
""" - def __init__(self, sfeature_lst, slabel_lst, ndrop_sentence_len=512): - """ - Args: - sfeature_lst(str):feature lst path - slabel_lst(str):label lst path - Returns: - None - """ - self._lblock = [] - self._ndrop_sentence_len = ndrop_sentence_len - self._que_sample = Queue.Queue() - self._nframe_dim = 120 * 11 - self._nstart_block_idx = 0 - self._nload_block_num = 1 - self._ndrop_frame_len = 256 - - self._load_list(sfeature_lst, slabel_lst) - - def _load_list(self, sfeature_lst, slabel_lst): - """ load list and shuffle - Args: - sfeature_lst(str):feature lst path - slabel_lst(str):label lst path - Returns: - None - """ - lfeature = open(sfeature_lst).readlines() - llabel = open(slabel_lst).readlines() - assert len(llabel) == len(lfeature) - for i in range(0, len(lfeature), 2): - one_block = OneBlock() - - one_block.label = llabel[i] - one_block.label_desc = llabel[i + 1] - one_block.feature = lfeature[i] - one_block.feature_desc = lfeature[i + 1] - self._lblock.append(one_block) - - random.shuffle(self._lblock) - - def _load_one_block(self, lsample, id): - """read one block by id and push load sample in list lsample - Args: - lsample(list): return sample list - id(int): block id - Returns: - None - """ - if id >= len(self._lblock): - return - - slabel_path = self._lblock[id].label.strip() - slabel_desc_path = self._lblock[id].label_desc.strip() - sfeature_path = self._lblock[id].feature.strip() - sfeature_desc_path = self._lblock[id].feature_desc.strip() - - llabel_line = open(slabel_desc_path).readlines() - lfeature_line = open(sfeature_desc_path).readlines() - - file_lable_bin = open(slabel_path, "r") - file_feature_bin = open(sfeature_path, "r") - - sample_num = int(llabel_line[0].split()[1]) - assert sample_num == int(lfeature_line[0].split()[1]) - - llabel_line = llabel_line[1:] - lfeature_line = lfeature_line[1:] - - for i in range(sample_num): - # read label - llabel_split = llabel_line[i].split() - nlabel_start = int(llabel_split[2]) - nlabel_size = int(llabel_split[3]) - nlabel_frame_num = int(llabel_split[4]) - - file_lable_bin.seek(nlabel_start, 0) - label_bytes = file_lable_bin.read(nlabel_size) - assert nlabel_frame_num * 4 == len(label_bytes) - label_array = struct.unpack('I' * nlabel_frame_num, label_bytes) - label_data = np.array(label_array, dtype="int64") - label_data = label_data.reshape((nlabel_frame_num, 1)) - - # read feature - lfeature_split = lfeature_line[i].split() - nfeature_start = int(lfeature_split[2]) - nfeature_size = int(lfeature_split[3]) - nfeature_frame_num = int(lfeature_split[4]) - nfeature_frame_dim = int(lfeature_split[5]) - - file_feature_bin.seek(nfeature_start, 0) - feature_bytes = file_feature_bin.read(nfeature_size) - assert nfeature_frame_num * nfeature_frame_dim * 4 == len( - feature_bytes) - feature_array = struct.unpack('f' * nfeature_frame_num * - nfeature_frame_dim, feature_bytes) - feature_data = np.array(feature_array, dtype="float32") - feature_data = feature_data.reshape( - (nfeature_frame_num, nfeature_frame_dim)) - - #drop long sentence - if self._ndrop_frame_len < feature_data.shape[0]: + def __init__( + self, + feature_file_list, + label_file_list, + frame_dim=120 * 11, # @TODO augmentor is responsible for the value + drop_sentence_len=512, + drop_frame_len=256, + process_num=10, + sample_buffer_size=1024, + sample_info_buffer_size=10000, + shuffle_block_num=1, + random_seed=0): + self._feature_file_list = feature_file_list + self._label_file_list = label_file_list + self._drop_sentence_len = drop_sentence_len + 
self._frame_dim = frame_dim + self._drop_frame_len = drop_frame_len + self._shuffle_block_num = shuffle_block_num + self._block_info_list = None + self._rng = random.Random(random_seed) + self._bucket_list = None + self.generate_bucket_list(True) + self._order_id = 0 + self._manager = Manager() + self._sample_buffer_size = sample_buffer_size + self._sample_info_buffer_size = sample_info_buffer_size + self._process_num = process_num + + def generate_bucket_list(self, is_shuffle): + if self._block_info_list is None: + block_feature_info_lines = open(self._feature_file_list).readlines() + block_label_info_lines = open(self._label_file_list).readlines() + assert len(block_feature_info_lines) == len(block_label_info_lines) + self._block_info_list = [] + for i in xrange(0, len(block_feature_info_lines), 2): + block_info = (block_feature_info_lines[i], + block_feature_info_lines[i + 1], + block_label_info_lines[i], + block_label_info_lines[i + 1]) + self._block_info_list.append( + map(lambda line: line.strip(), block_info)) + + if is_shuffle: + self._rng.shuffle(self._block_info_list) + + self._bucket_list = [] + for i in xrange(0, len(self._block_info_list), self._shuffle_block_num): + bucket_block_info = self._block_info_list[i:i + + self._shuffle_block_num] + self._bucket_list.append( + SampleInfoBucket( + map(lambda info: info[0], bucket_block_info), + map(lambda info: info[1], bucket_block_info), + map(lambda info: info[2], bucket_block_info), + map(lambda info: info[3], bucket_block_info))) + + # @TODO make this configurable + def set_transformers(self, transformers): + self._transformers = transformers + + def _sample_generator(self): + sample_info_queue = self._manager.Queue(self._sample_info_buffer_size) + sample_queue = self._manager.Queue(self._sample_buffer_size) + self._order_id = 0 + + def ordered_feeding_worker(sample_info_queue): + for sample_info_bucket in self._bucket_list: + sample_info_list = sample_info_bucket.generate_sample_info_list( + ) + self._rng.shuffle(sample_info_list) # do shuffle here + for sample_info in sample_info_list: + sample_info_queue.put((sample_info, self._order_id)) + self._order_id += 1 + + for i in xrange(self._process_num): + sample_info_queue.put(EpochEndSignal()) + + feeding_thread = Thread( + target=ordered_feeding_worker, args=(sample_info_queue, )) + feeding_thread.daemon = True + feeding_thread.start() + + def ordered_processing_worker(sample_info_queue, sample_queue, + out_order): + def read_bytes(fpath, start, size): + f = open(fpath, 'r') + f.seek(start, 0) + binary_bytes = f.read(size) + f.close() + return binary_bytes + + ins = sample_info_queue.get() + + while not isinstance(ins, EpochEndSignal): + sample_info, order_id = ins + + feature_bytes = read_bytes(sample_info.feature_bin_path, + sample_info.feature_start, + sample_info.feature_size) + + label_bytes = read_bytes(sample_info.label_bin_path, + sample_info.label_start, + sample_info.label_size) + + assert sample_info.label_frame_num * 4 == len(label_bytes) + label_array = struct.unpack('I' * sample_info.label_frame_num, + label_bytes) + label_data = np.array( + label_array, dtype='int64').reshape( + (sample_info.label_frame_num, 1)) + + feature_frame_num = sample_info.feature_frame_num + feature_dim = sample_info.feature_dim + assert feature_frame_num * feature_dim * 4 == len(feature_bytes) + feature_array = struct.unpack('f' * feature_frame_num * + feature_dim, feature_bytes) + feature_data = np.array( + feature_array, dtype='float32').reshape(( + sample_info.feature_frame_num, 
+                         sample_info.feature_dim))
+
+                sample_data = (feature_data, label_data)
+                for transformer in self._transformers:
+                    # @TODO(pkuyym) make the transformer accept only feature_data
+                    sample_data = transformer.perform_trans(sample_data)
+
+                while order_id != out_order[0]:
+                    time.sleep(0.001)
+
+                # drop long sentences
+                if self._drop_sentence_len >= sample_data[0].shape[0]:
+                    sample_queue.put(sample_data)
+
+                out_order[0] += 1
+                ins = sample_info_queue.get()
+
+            sample_queue.put(EpochEndSignal())
+
+        out_order = self._manager.list([0])
+        args = (sample_info_queue, sample_queue, out_order)
+        workers = [
+            Process(
+                target=ordered_processing_worker, args=args)
+            for _ in xrange(self._process_num)
+        ]
+
+        for w in workers:
+            w.daemon = True
+            w.start()
+
+        finished_process_num = 0
+
+        while finished_process_num < self._process_num:
+            sample = sample_queue.get()
+            if isinstance(sample, EpochEndSignal):
+                finished_process_num += 1
                 continue
-                lsample.append((feature_data, label_data))
-
-    def get_one_batch(self, nbatch_size):
-        """construct one batch(feature, label), batch size is nbatch_size
-        Args:
-            nbatch_size(int): batch size
-        Returns:
-            None
-        """
-        if self._que_sample.empty():
-            lsample = self._load_block(
-                range(self._nstart_block_idx, self._nstart_block_idx +
-                      self._nload_block_num, 1))
-            self._move_sample(lsample)
-            self._nstart_block_idx += self._nload_block_num
-
-        if self._que_sample.empty():
-            self._nstart_block_idx = 0
-            return None
-        #cal all frame num
-        ncur_len = 0
+            yield sample
+
+        feeding_thread.join()
+        for w in workers:
+            w.join()
+
+    def batch_iterator(self, batch_size, minimum_batch_size):
+        batch_samples = []
         lod = [0]
-        samples = []
-        bat_feature = np.zeros((nbatch_size, self._nframe_dim))
-        for i in range(nbatch_size):
-            # empty clear zero
-            if self._que_sample.empty():
-                self._nstart_block_idx = 0
-            # copy
-            else:
-                (one_feature, one_label) = self._que_sample.get()
-                samples.append((one_feature, one_label))
-                ncur_len += one_feature.shape[0]
-                lod.append(ncur_len)
-
-        bat_feature = np.zeros((ncur_len, self._nframe_dim), dtype="float32")
-        bat_label = np.zeros((ncur_len, 1), dtype="int64")
-        ncur_len = 0
-        for sample in samples:
-            one_feature = sample[0]
-            one_label = sample[1]
-            nframe_num = one_feature.shape[0]
-            nstart = ncur_len
-            nend = ncur_len + nframe_num
-            bat_feature[nstart:nend, :] = one_feature
-            bat_label[nstart:nend, :] = one_label
-            ncur_len += nframe_num
-        return (bat_feature, bat_label, lod)
-
-    def set_trans(self, ltrans):
-        """ set transform list
-        Args:
-            ltrans(list): data tranform list
-        Returns:
-            None
-        """
-        self._ltrans = ltrans
-
-    def _load_block(self, lblock_id):
-        """read blocks
-        """
-        lsample = []
-        for id in lblock_id:
-            self._load_one_block(lsample, id)
-
-        # transform sample
-        for (nidx, sample) in enumerate(lsample):
-            for trans in self._ltrans:
-                sample = trans.perform_trans(sample)
-            lsample[nidx] = sample
-
-        return lsample
-
-    def load_block(self, lblock_id):
-        """read blocks
-        Args:
-            lblock_id(list):the block list id
-        Returns:
-            None
-        """
-        lsample = []
-        for id in lblock_id:
-            self._load_one_block(lsample, id)
-
-        # transform sample
-        for (nidx, sample) in enumerate(lsample):
-            for trans in self._ltrans:
-                sample = trans.perform_trans(sample)
-            lsample[nidx] = sample
-
-        return lsample
-
-    def _move_sample(self, lsample):
-        """move sample to queue
-        Args:
-            lsample(list): one block of samples read from disk
-        Returns:
-            None
-        """
-        # random
-        random.shuffle(lsample)
-
-        for sample in lsample:
-            self._que_sample.put(sample)
+        # 
check whether need parallel here + for sample in self._sample_generator(): + batch_samples.append(sample) + lod.append(lod[-1] + sample[0].shape[0]) + if len(batch_samples) == batch_size: + batch_feature = np.zeros( + (lod[-1], self._frame_dim), dtype="float32") + batch_label = np.zeros((lod[-1], 1), dtype="int64") + start = 0 + for sample in batch_samples: + frame_num = sample[0].shape[0] + batch_feature[start:start + frame_num, :] = sample[0] + batch_label[start:start + frame_num, :] = sample[1] + start += frame_num + yield (batch_feature, batch_label, lod) + batch_samples = [] + lod = [0] + + if len(batch_samples) >= minimum_batch_size: + batch_feature = np.zeros( + (lod[-1], self._frame_dim), dtype="float32") + batch_label = np.zeros((lod[-1], 1), dtype="int64") + start = 0 + for sample in batch_samples: + frame_num = sample[0].shape[0] + batch_feature[start:start + frame_num, :] = sample[0] + batch_label[start:start + frame_num, :] = sample[1] + start += frame_num + yield (batch_feature, batch_label, lod) diff --git a/fluid/DeepASR/data_utils/parallel_reader.py b/fluid/DeepASR/data_utils/parallel_reader.py deleted file mode 100644 index e0821f73..00000000 --- a/fluid/DeepASR/data_utils/parallel_reader.py +++ /dev/null @@ -1,277 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import random -import numpy as np -import struct -import data_utils.augmentor.trans_mean_variance_norm as trans_mean_variance_norm -import data_utils.augmentor.trans_add_delta as trans_add_delta -from multiprocessing import Manager, Process -from threading import Thread -import time - - -class SampleInfo(object): - def __init__(self, feature_bin_path, feature_start, feature_size, - feature_frame_num, feature_dim, label_bin_path, label_start, - label_size, label_frame_num): - - self.feature_bin_path = feature_bin_path - self.feature_start = feature_start - self.feature_size = feature_size - self.feature_frame_num = feature_frame_num - self.feature_dim = feature_dim - - self.label_bin_path = label_bin_path - self.label_start = label_start - self.label_size = label_size - self.label_frame_num = label_frame_num - - -class SampleInfoBucket(object): - def __init__(self, feature_bin_paths, feature_desc_paths, label_bin_paths, - label_desc_paths): - block_num = len(label_bin_paths) - assert len(label_desc_paths) == block_num - assert len(feature_bin_paths) == block_num - assert len(feature_desc_paths) == block_num - self._block_num = block_num - - self._feature_bin_paths = feature_bin_paths - self._feature_desc_paths = feature_desc_paths - self._label_bin_paths = label_bin_paths - self._label_desc_paths = label_desc_paths - - def generate_sample_info_list(self): - sample_info_list = [] - for block_idx in xrange(self._block_num): - label_bin_path = self._label_bin_paths[block_idx] - label_desc_path = self._label_desc_paths[block_idx] - feature_bin_path = self._feature_bin_paths[block_idx] - feature_desc_path = self._feature_desc_paths[block_idx] - - label_desc_lines = open(label_desc_path).readlines() - feature_desc_lines = open(feature_desc_path).readlines() - - sample_num = int(label_desc_lines[0].split()[1]) - assert sample_num == int(feature_desc_lines[0].split()[1]) - - for i in xrange(sample_num): - feature_desc_split = feature_desc_lines[i + 1].split() - feature_start = int(feature_desc_split[2]) - feature_size = int(feature_desc_split[3]) - feature_frame_num = int(feature_desc_split[4]) - feature_dim = int(feature_desc_split[5]) - - 
label_desc_split = label_desc_lines[i + 1].split() - label_start = int(label_desc_split[2]) - label_size = int(label_desc_split[3]) - label_frame_num = int(label_desc_split[4]) - - sample_info_list.append( - SampleInfo(feature_bin_path, feature_start, feature_size, - feature_frame_num, feature_dim, label_bin_path, - label_start, label_size, label_frame_num)) - - return sample_info_list - - -class EpochEndSignal(): - pass - - -class DataReader(object): - def __init__(self, - feature_file_list, - label_file_list, - frame_dim=120 * 11, - drop_sentence_len=512, - drop_frame_len=256, - parallel_num=10, - sample_buffer_size=1024, - sample_info_buffer_size=10000, - shuffle_block_num=1, - random_seed=0): - self._feature_file_list = feature_file_list - self._label_file_list = label_file_list - self._drop_sentence_len = drop_sentence_len - self._frame_dim = frame_dim - self._drop_frame_len = drop_frame_len - self._shuffle_block_num = shuffle_block_num - self._block_info_list = None - self._rng = random.Random(random_seed) - self._bucket_list = None - self.generate_bucket_list(True) - self._order_id = 0 - self._manager = Manager() - self._sample_buffer_size = sample_buffer_size - self._sample_info_buffer_size = sample_info_buffer_size - self._process_num = parallel_num - - def generate_bucket_list(self, is_shuffle): - if self._block_info_list is None: - block_feature_info_lines = open(self._feature_file_list).readlines() - block_label_info_lines = open(self._label_file_list).readlines() - assert len(block_feature_info_lines) == len(block_label_info_lines) - self._block_info_list = [] - for i in xrange(0, len(block_feature_info_lines), 2): - block_info = (block_feature_info_lines[i], - block_feature_info_lines[i + 1], - block_label_info_lines[i], - block_label_info_lines[i + 1]) - self._block_info_list.append( - map(lambda line: line.strip(), block_info)) - - if is_shuffle: - self._rng.shuffle(self._block_info_list) - - self._bucket_list = [] - for i in xrange(0, len(self._block_info_list), self._shuffle_block_num): - bucket_block_info = self._block_info_list[i:i + - self._shuffle_block_num] - self._bucket_list.append( - SampleInfoBucket( - map(lambda info: info[0], bucket_block_info), - map(lambda info: info[1], bucket_block_info), - map(lambda info: info[2], bucket_block_info), - map(lambda info: info[3], bucket_block_info))) - - # @TODO make this configurable - def set_transformers(self, transformers): - self._transformers = transformers - - def _sample_generator(self): - sample_info_queue = self._manager.Queue(self._sample_info_buffer_size) - sample_queue = self._manager.Queue(self._sample_buffer_size) - self._order_id = 0 - - def ordered_feeding_worker(sample_info_queue): - for sample_info_bucket in self._bucket_list: - sample_info_list = sample_info_bucket.generate_sample_info_list( - ) - self._rng.shuffle(sample_info_list) # do shuffle here - for sample_info in sample_info_list: - sample_info_queue.put((sample_info, self._order_id)) - self._order_id += 1 - - for i in xrange(self._process_num): - sample_info_queue.put(EpochEndSignal()) - - feeding_thread = Thread( - target=ordered_feeding_worker, args=(sample_info_queue, )) - feeding_thread.daemon = True - feeding_thread.start() - - def ordered_processing_worker(sample_info_queue, sample_queue, - out_order): - def read_bytes(fpath, start, size): - f = open(fpath, 'r') - f.seek(start, 0) - binary_bytes = f.read(size) - f.close() - return binary_bytes - - ins = sample_info_queue.get() - - while not isinstance(ins, EpochEndSignal): - sample_info, 
order_id = ins - - feature_bytes = read_bytes(sample_info.feature_bin_path, - sample_info.feature_start, - sample_info.feature_size) - - label_bytes = read_bytes(sample_info.label_bin_path, - sample_info.label_start, - sample_info.label_size) - - assert sample_info.label_frame_num * 4 == len(label_bytes) - label_array = struct.unpack('I' * sample_info.label_frame_num, - label_bytes) - label_data = np.array( - label_array, dtype='int64').reshape( - (sample_info.label_frame_num, 1)) - - feature_frame_num = sample_info.feature_frame_num - feature_dim = sample_info.feature_dim - assert feature_frame_num * feature_dim * 4 == len(feature_bytes) - feature_array = struct.unpack('f' * feature_frame_num * - feature_dim, feature_bytes) - feature_data = np.array( - feature_array, dtype='float32').reshape(( - sample_info.feature_frame_num, sample_info.feature_dim)) - - sample_data = (feature_data, label_data) - for transformer in self._transformers: - # @TODO(pkuyym) to make transfomer only accept feature_data - sample_data = transformer.perform_trans(sample_data) - - while order_id != out_order[0]: - time.sleep(0.001) - - # drop long sentence - if self._drop_sentence_len >= sample_data[0].shape[0]: - sample_queue.put(sample_data) - - out_order[0] += 1 - ins = sample_info_queue.get() - - sample_queue.put(EpochEndSignal()) - - out_order = self._manager.list([0]) - args = (sample_info_queue, sample_queue, out_order) - workers = [ - Process( - target=ordered_processing_worker, args=args) - for _ in xrange(self._process_num) - ] - - for w in workers: - w.daemon = True - w.start() - - finished_process_num = 0 - - while finished_process_num < self._process_num: - sample = sample_queue.get() - if isinstance(sample, EpochEndSignal): - finished_process_num += 1 - continue - yield sample - - feeding_thread.join() - for w in workers: - w.join() - - def batch_iterator(self, batch_size, minimum_batch_size): - batch_samples = [] - lod = [0] - # check whether need parallel here - for sample in self._sample_generator(): - batch_samples.append(sample) - lod.append(lod[-1] + sample[0].shape[0]) - if len(batch_samples) == batch_size: - batch_feature = np.zeros( - (lod[-1], self._frame_dim), dtype="float32") - batch_label = np.zeros((lod[-1], 1), dtype="int64") - start = 0 - for sample in batch_samples: - frame_num = sample[0].shape[0] - batch_feature[start:start + frame_num, :] = sample[0] - batch_label[start:start + frame_num, :] = sample[1] - start += frame_num - yield (batch_feature, batch_label, lod) - batch_samples = [] - lod = [0] - - if len(batch_samples) >= minimum_batch_size: - batch_feature = np.zeros( - (lod[-1], self._frame_dim), dtype="float32") - batch_label = np.zeros((lod[-1], 1), dtype="int64") - start = 0 - for sample in batch_samples: - frame_num = sample[0].shape[0] - batch_feature[start:start + frame_num, :] = sample[0] - batch_label[start:start + frame_num, :] = sample[1] - start += frame_num - yield (batch_feature, batch_label, lod) diff --git a/fluid/DeepASR/stacked_dynamic_lstm.py b/fluid/DeepASR/stacked_dynamic_lstm.py index 51ba7a6b..d726a277 100644 --- a/fluid/DeepASR/stacked_dynamic_lstm.py +++ b/fluid/DeepASR/stacked_dynamic_lstm.py @@ -9,9 +9,9 @@ import time import paddle.v2 as paddle import paddle.v2.fluid as fluid import paddle.v2.fluid.profiler as profiler -import data_utils.trans_mean_variance_norm as trans_mean_variance_norm -import data_utils.trans_add_delta as trans_add_delta -import data_utils.trans_splice as trans_splice +import data_utils.augmentor.trans_mean_variance_norm as 
trans_mean_variance_norm
+import data_utils.augmentor.trans_add_delta as trans_add_delta
+import data_utils.augmentor.trans_splice as trans_splice
 import data_utils.data_reader as reader
 
@@ -22,6 +22,12 @@ def parse_args():
         type=int,
         default=32,
         help='The sequence number of a batch data. (default: %(default)d)')
+    parser.add_argument(
+        '--minimum_batch_size',
+        type=int,
+        default=32,
+        help='The minimum sequence number of a batch data. (default: %(default)d)'
+    )
     parser.add_argument(
         '--stacked_num',
         type=int,
@@ -160,14 +166,15 @@ def train(args):
     exe = fluid.Executor(place)
     exe.run(fluid.default_startup_program())
 
+    # @TODO the data reader should take this responsibility (parse from a config file)
     ltrans = [
         trans_add_delta.TransAddDelta(2, 2),
         trans_mean_variance_norm.TransMeanVarianceNorm(args.mean_var),
         trans_splice.TransSplice()
     ]
 
-    data_reader = reader.DataRead(args.feature_lst, args.label_lst)
-    data_reader.set_trans(ltrans)
+    data_reader = reader.DataReader(args.feature_lst, args.label_lst)
+    data_reader.set_transformers(ltrans)
 
     res_feature = fluid.LoDTensor()
     res_label = fluid.LoDTensor()
@@ -175,22 +182,15 @@ def train(args):
         pass_start_time = time.time()
         words_seen = 0
         accuracy.reset(exe)
-        batch_id = 0
-        while True:
-            # load_data
-            one_batch = data_reader.get_one_batch(args.batch_size)
-            if one_batch == None:
-                break
-            (bat_feature, bat_label, lod) = one_batch
+        for batch_id, batch_data in enumerate(
+                data_reader.batch_iterator(args.batch_size,
+                                           args.minimum_batch_size)):
+            (bat_feature, bat_label, lod) = batch_data
             res_feature.set(bat_feature, place)
             res_feature.set_lod([lod])
             res_label.set(bat_label, place)
             res_label.set_lod([lod])
-
-            batch_id += 1
-
             words_seen += lod[-1]
-
             loss, acc = exe.run(
                 fluid.default_main_program(),
                 feed={"feature": res_feature,
-- 
GitLab
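
For reference, below is a minimal sketch of how the reworked DataReader is driven end to end, mirroring the training loop in stacked_dynamic_lstm.py. The .lst and mean-variance paths and the batch sizes are placeholders, not files shipped with this patch; the calls themselves use only the API introduced above (the DataReader constructor, set_transformers() and batch_iterator()).

from __future__ import print_function

import data_utils.augmentor.trans_mean_variance_norm as trans_mean_variance_norm
import data_utils.augmentor.trans_add_delta as trans_add_delta
import data_utils.augmentor.trans_splice as trans_splice
import data_utils.data_reader as reader

# Placeholder paths: each .lst file holds alternating lines of binary-data
# and description-file paths, as parsed by DataReader.generate_bucket_list().
feature_lst = 'data/feature.lst'
label_lst = 'data/label.lst'
mean_var = 'data/global_mean_var'

data_reader = reader.DataReader(feature_lst, label_lst)
# The transformer list matches the one built in stacked_dynamic_lstm.py.
data_reader.set_transformers([
    trans_add_delta.TransAddDelta(2, 2),
    trans_mean_variance_norm.TransMeanVarianceNorm(mean_var),
    trans_splice.TransSplice()
])

# batch_iterator() yields (batch_feature, batch_label, lod): the frames of
# all sequences in a batch are flattened along the first axis, and lod marks
# where each sequence starts and ends inside the flattened arrays.
for batch_id, (feature, label, lod) in enumerate(
        data_reader.batch_iterator(batch_size=32, minimum_batch_size=1)):
    print(batch_id, feature.shape, label.shape, lod[-1])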