Commit 4987f5c5 authored by Xinghai Sun

Refine data reader and move data_reader to async_data_reader.

Parent: df8060e7
@@ -11,11 +11,14 @@ import time
 import numpy as np
 from threading import Thread
 import signal
+import multiprocessing
 from multiprocessing import Manager, Process
 import data_utils.augmentor.trans_mean_variance_norm as trans_mean_variance_norm
 import data_utils.augmentor.trans_add_delta as trans_add_delta
 from data_utils.util import suppress_complaints, suppress_signal
-from data_utils.util import CriticalException, ForceExitWrapper
+from data_utils.util import SharedNDArray, SharedMemoryPoolManager
+from data_utils.util import DaemonProcessGroup, batch_to_ndarray
+from data_utils.util import CriticalException, ForceExitWrapper, EpochEndSignal
 class SampleInfo(object):

@@ -67,7 +70,8 @@ class SampleInfoBucket(object):
         split_sentence_threshold(int): Sentence whose length larger than
                                        the value will trigger split operation.
         split_sub_sentence_len(int): sub-sentence length is equal to
-                                     (split_sub_sentence_len + rand() % split_perturb).
+                                     (split_sub_sentence_len + \
+                                      rand() % split_perturb).
     """

     def __init__(self,
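The rewrapped docstring line spells out how an over-long sentence is split: each sub-sentence gets split_sub_sentence_len frames plus a small random perturbation. A quick numeric sketch of that formula; the parameter values below are assumed for illustration only:

    import random

    split_sub_sentence_len = 100   # assumed value, for illustration only
    split_perturb = 50             # assumed value, for illustration only

    # Mirrors "(split_sub_sentence_len + rand() % split_perturb)" from the docstring.
    sub_sentence_len = split_sub_sentence_len + random.randrange(split_perturb)
    # For these values every sub-sentence ends up between 100 and 149 frames long.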
@@ -159,11 +163,7 @@ class SampleInfoBucket(object):
         return sample_info_list


-class EpochEndSignal():
-    pass
-
-
-class DataReader(object):
+class AsyncDataReader(object):
     """DataReader provides basic audio sample preprocessing pipeline including
     data loading and data augmentation.

@@ -174,7 +174,7 @@ class DataReader(object):
                                       corresponding description file.
         drop_frame_len (int): Samples whose label length above the value will be
                               dropped.(Using '-1' to disable the policy)
-        process_num (int): Number of processes for processing data.
+        proc_num (int): Number of processes for processing data.
         sample_buffer_size (int): Buffer size to indicate the maximum samples
                                   cached.
         sample_info_buffer_size (int): Buffer size to indicate the maximum
@@ -193,10 +193,10 @@ class DataReader(object):
                  feature_file_list,
                  label_file_list,
                  drop_frame_len=512,
-                 process_num=10,
+                 proc_num=10,
                  sample_buffer_size=1024,
                  sample_info_buffer_size=1024,
-                 batch_buffer_size=1024,
+                 batch_buffer_size=10,
                  shuffle_block_num=10,
                  random_seed=0,
                  verbose=0):
@@ -213,9 +213,14 @@ class DataReader(object):
         self._sample_buffer_size = sample_buffer_size
         self._sample_info_buffer_size = sample_info_buffer_size
         self._batch_buffer_size = batch_buffer_size
-        self._process_num = process_num
+        self._proc_num = proc_num
+        if self._proc_num <= 2:
+            raise ValueError("Value of `proc_num` should be greater than 2.")
+        self._sample_proc_num = self._proc_num - 2
         self._verbose = verbose
         self._force_exit = ForceExitWrapper(self._manager.Value('b', False))
+        self._pool_manager = SharedMemoryPoolManager(self._batch_buffer_size *
+                                                     3, self._manager)

     def generate_bucket_list(self, is_shuffle):
         if self._block_info_list is None:
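The constructor changes budget proc_num across three roles: one process feeds sample descriptions, proc_num - 2 processes do the actual sample processing, and one process assembles batches, which is why values of 2 or below are rejected. They also pre-allocate a pool of batch_buffer_size * 3 shared-memory blocks, since each assembled batch occupies three of them (feature, label and lod). A minimal construction sketch, with placeholder list-file paths:

    from data_utils.async_data_reader import AsyncDataReader

    # With the defaults, proc_num=10 splits into 1 feeding process,
    # 8 sample-processing workers and 1 batch-assembling process, and the
    # shared-memory pool holds batch_buffer_size * 3 = 30 blocks.
    data_reader = AsyncDataReader(
        'data/train_feature.lst',   # placeholder feature description list
        'data/train_label.lst',     # placeholder label description list
        drop_frame_len=512,
        proc_num=10,
        batch_buffer_size=10)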
@@ -249,13 +254,23 @@ class DataReader(object):
     def set_transformers(self, transformers):
         self._transformers = transformers

-    def _sample_generator(self):
+    def recycle(self, *args):
+        for shared_ndarray in args:
+            if not isinstance(shared_ndarray, SharedNDArray):
+                raise ValueError("Only support recycle SharedNDArray object.")
+            shared_ndarray.recycle(self._pool_manager.pool)
+
+    def _start_async_processing(self):
         sample_info_queue = self._manager.Queue(self._sample_info_buffer_size)
         sample_queue = self._manager.Queue(self._sample_buffer_size)
         self._order_id = 0

         @suppress_complaints(verbose=self._verbose, notify=self._force_exit)
         def ordered_feeding_task(sample_info_queue):
+            if self._verbose == 0:
+                signal.signal(signal.SIGTERM, suppress_signal)
+                signal.signal(signal.SIGINT, suppress_signal)
+
             for sample_info_bucket in self._bucket_list:
                 try:
                     sample_info_list = \
@@ -268,13 +283,12 @@ class DataReader(object):
                     sample_info_queue.put((sample_info, self._order_id))
                     self._order_id += 1

-            for i in xrange(self._process_num):
+            for i in xrange(self._sample_proc_num):
                 sample_info_queue.put(EpochEndSignal())

-        feeding_thread = Thread(
-            target=ordered_feeding_task, args=(sample_info_queue, ))
-        feeding_thread.daemon = True
-        feeding_thread.start()
+        feeding_proc = DaemonProcessGroup(
+            proc_num=1, target=ordered_feeding_task, args=(sample_info_queue, ))
+        feeding_proc.start_all()

         @suppress_complaints(verbose=self._verbose, notify=self._force_exit)
         def ordered_processing_task(sample_info_queue, sample_queue, out_order):
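Both the feeding stage above and the sample-processing stage below are now spawned through the DaemonProcessGroup helper added to data_utils/util.py (its definition appears further down in this diff). A self-contained sketch of how that helper behaves; the queue and the trivial worker body are illustrative, not part of the commit:

    from multiprocessing import Manager
    from data_utils.util import DaemonProcessGroup

    def echo_task(queue):
        # stand-in for ordered_feeding_task / ordered_processing_task
        queue.put('done')

    if __name__ == '__main__':
        manager = Manager()
        queue = manager.Queue(4)
        group = DaemonProcessGroup(proc_num=2, target=echo_task, args=(queue, ))
        group.start_all()   # spawns 2 daemon worker processes running echo_task
        print(queue.get())
        print(queue.get())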
@@ -301,12 +315,12 @@ class DataReader(object):
                                           sample_info.feature_start,
                                           sample_info.feature_size)

-                assert sample_info.feature_frame_num * sample_info.feature_dim * 4 \
-                        == len(feature_bytes), \
+                assert sample_info.feature_frame_num \
+                        * sample_info.feature_dim * 4 == len(feature_bytes), \
                         (sample_info.feature_bin_path,
                          sample_info.feature_frame_num,
                          sample_info.feature_dim,
                          len(feature_bytes))

                 label_bytes = read_bytes(sample_info.label_bin_path,
                                          sample_info.label_start,
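The reflowed assertion keeps the same invariant: the feature block read from disk must contain exactly feature_frame_num * feature_dim float32 values, at 4 bytes apiece. A worked example with assumed sizes:

    feature_frame_num = 100   # assumed frame count
    feature_dim = 120         # assumed feature dimension
    expected_bytes = feature_frame_num * feature_dim * 4   # float32 = 4 bytes
    # expected_bytes == 48000 and must equal len(feature_bytes).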
@@ -351,75 +365,71 @@ class DataReader(object):
         out_order = self._manager.list([0])
         args = (sample_info_queue, sample_queue, out_order)
-        workers = [
-            Process(
-                target=ordered_processing_task, args=args)
-            for _ in xrange(self._process_num)
-        ]
-
-        for w in workers:
-            w.daemon = True
-            w.start()
-
-        finished_process_num = 0
+        sample_proc = DaemonProcessGroup(
+            proc_num=self._sample_proc_num,
+            target=ordered_processing_task,
+            args=args)
+        sample_proc.start_all()

-        while self._force_exit == False:
-            try:
-                sample = sample_queue.get_nowait()
-            except Queue.Empty:
-                time.sleep(0.001)
-            else:
-                if isinstance(sample, EpochEndSignal):
-                    finished_process_num += 1
-                    if finished_process_num >= self._process_num:
-                        break
-                    else:
-                        continue
-
-                yield sample
+        return sample_queue

     def batch_iterator(self, batch_size, minimum_batch_size):
-        def batch_to_ndarray(batch_samples, lod):
-            assert len(batch_samples)
-            frame_dim = batch_samples[0][0].shape[1]
-            batch_feature = np.zeros((lod[-1], frame_dim), dtype="float32")
-            batch_label = np.zeros((lod[-1], 1), dtype="int64")
-            start = 0
-            for sample in batch_samples:
-                frame_num = sample[0].shape[0]
-                batch_feature[start:start + frame_num, :] = sample[0]
-                batch_label[start:start + frame_num, :] = sample[1]
-                start += frame_num
-            return (batch_feature, batch_label)
-
         @suppress_complaints(verbose=self._verbose, notify=self._force_exit)
-        def batch_assembling_task(sample_generator, batch_queue):
+        def batch_assembling_task(sample_queue, batch_queue, pool):
+            def conv_to_shared(ndarray):
+                while self._force_exit == False:
+                    try:
+                        (name, shared_ndarray) = pool.popitem()
+                    except Exception as e:
+                        time.sleep(0.001)
+                    else:
+                        shared_ndarray.copy(ndarray)
+                        return shared_ndarray
+
+            if self._verbose == 0:
+                signal.signal(signal.SIGTERM, suppress_signal)
+                signal.signal(signal.SIGINT, suppress_signal)
+
             batch_samples = []
             lod = [0]
-            for sample in sample_generator():
-                batch_samples.append(sample)
-                lod.append(lod[-1] + sample[0].shape[0])
-                if len(batch_samples) == batch_size:
-                    (batch_feature, batch_label) = batch_to_ndarray(
-                        batch_samples, lod)
-                    batch_queue.put((batch_feature, batch_label, lod))
-                    batch_samples = []
-                    lod = [0]
+            done_num = 0
+            while done_num < self._sample_proc_num:
+                sample = sample_queue.get()
+                if isinstance(sample, EpochEndSignal):
+                    done_num += 1
+                else:
+                    batch_samples.append(sample)
+                    lod.append(lod[-1] + sample[0].shape[0])
+                    if len(batch_samples) == batch_size:
+                        feature, label = batch_to_ndarray(batch_samples, lod)
+
+                        feature = conv_to_shared(feature)
+                        label = conv_to_shared(label)
+                        lod = conv_to_shared(np.array(lod).astype('int64'))
+
+                        batch_queue.put((feature, label, lod))
+                        batch_samples = []
+                        lod = [0]

             if len(batch_samples) >= minimum_batch_size:
-                (batch_feature, batch_label) = batch_to_ndarray(batch_samples,
-                                                                lod)
-                batch_queue.put((batch_feature, batch_label, lod))
+                (feature, label) = batch_to_ndarray(batch_samples, lod)
+
+                feature = conv_to_shared(feature)
+                label = conv_to_shared(label)
+                lod = conv_to_shared(np.array(lod).astype('int64'))
+
+                batch_queue.put((feature, label, lod))

             batch_queue.put(EpochEndSignal())

-        batch_queue = Queue.Queue(self._batch_buffer_size)
+        sample_queue = self._start_async_processing()
+        batch_queue = self._manager.Queue(self._batch_buffer_size)

-        assembling_thread = Thread(
+        assembling_proc = DaemonProcessGroup(
+            proc_num=1,
             target=batch_assembling_task,
-            args=(self._sample_generator, batch_queue))
-        assembling_thread.daemon = True
-        assembling_thread.start()
+            args=(sample_queue, batch_queue, self._pool_manager.pool))
+        assembling_proc.start_all()

         while self._force_exit == False:
             try:
...
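After this change batch_iterator no longer yields plain numpy arrays: each element of the (feature, label, lod) triple is a SharedNDArray taken from the shared pool, so a consumer reads data through its .ndarray property and has to hand the wrappers back with recycle(); otherwise the pool drains and the assembling process spins inside conv_to_shared waiting for a free block. A consumer-side sketch of the intended pattern (batch sizes are placeholders; the same pattern appears in the train.py hunks below):

    for batch_id, batch_data in enumerate(
            data_reader.batch_iterator(batch_size=32, minimum_batch_size=1)):
        (features, labels, lod) = batch_data
        feature_array = features.ndarray   # numpy view over shared memory
        label_array = labels.ndarray
        lod_array = lod.ndarray
        # ... feed the arrays to the executor here ...
        data_reader.recycle(features, labels, lod)   # return the blocks to the pool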
@@ -4,6 +4,8 @@ from __future__ import print_function
 import sys
 from six import reraise
 from tblib import Traceback
+from multiprocessing import Manager, Process
+import posix_ipc, mmap
 import numpy as np
@@ -35,10 +37,118 @@ def lodtensor_to_ndarray(lod_tensor):
     return ret, lod_tensor.lod()


+def batch_to_ndarray(batch_samples, lod):
+    frame_dim = batch_samples[0][0].shape[1]
+    batch_feature = np.zeros((lod[-1], frame_dim), dtype="float32")
+    batch_label = np.zeros((lod[-1], 1), dtype="int64")
+    start = 0
+    for sample in batch_samples:
+        frame_num = sample[0].shape[0]
+        batch_feature[start:start + frame_num, :] = sample[0]
+        batch_label[start:start + frame_num, :] = sample[1]
+        start += frame_num
+    return (batch_feature, batch_label)
+
+
+class DaemonProcessGroup(object):
+    def __init__(self, proc_num, target, args):
+        self._proc_num = proc_num
+        self._workers = [
+            Process(
+                target=target, args=args) for _ in xrange(self._proc_num)
+        ]
+
+    def start_all(self):
+        for w in self._workers:
+            w.daemon = True
+            w.start()
+
+    @property
+    def proc_num(self):
+        return self._proc_num
+
+
+class EpochEndSignal(object):
+    pass
+
+
 class CriticalException(Exception):
     pass


+class SharedNDArray(object):
+    def __init__(self, name, is_verify=False):
+        self._name = name
+        self._shm = None
+        self._buf = None
+        self._array = np.zeros(1, dtype=np.float32)
+        self._inited = False
+        self._is_verify = is_verify
+
+    def zeros_like(self, shape, dtype):
+        size = int(np.prod(shape)) * np.dtype(dtype).itemsize
+        if self._inited:
+            self._shm = posix_ipc.SharedMemory(self._name)
+        else:
+            self._shm = posix_ipc.SharedMemory(
+                self._name, posix_ipc.O_CREAT, size=size)
+
+        self._buf = mmap.mmap(self._shm.fd, size)
+        self._array = np.ndarray(shape, dtype, self._buf, order='C')
+
+    def copy(self, ndarray):
+        size = int(np.prod(ndarray.shape)) * np.dtype(ndarray.dtype).itemsize
+        self.zeros_like(ndarray.shape, ndarray.dtype)
+        self._array[:] = ndarray
+        self._buf.flush()
+        self._inited = True
+
+        if self._is_verify:
+            shm = posix_ipc.SharedMemory(self._name)
+            buf = mmap.mmap(shm.fd, size)
+            array = np.ndarray(ndarray.shape, ndarray.dtype, buf, order='C')
+            np.testing.assert_array_equal(array, ndarray)
+
+    @property
+    def ndarray(self):
+        return self._array
+
+    def recycle(self, pool):
+        self._buf.close()
+        self._shm.close_fd()
+        self._inited = False
+        pool[self._name] = self
+
+    def __getstate__(self):
+        return (self._name, self._array.shape, self._array.dtype, self._inited,
+                self._is_verify)
+
+    def __setstate__(self, state):
+        self._name = state[0]
+        self._inited = state[3]
+        self.zeros_like(state[1], state[2])
+        self._is_verify = state[4]
+
+
+class SharedMemoryPoolManager(object):
+    def __init__(self, pool_size, manager, name_prefix='/deep_asr'):
+        self._names = []
+        self._dict = manager.dict()
+
+        for i in xrange(pool_size):
+            name = name_prefix + '_' + str(i)
+            self._dict[name] = SharedNDArray(name)
+            self._names.append(name)
+
+    @property
+    def pool(self):
+        return self._dict
+
+    def __del__(self):
+        for name in self._names:
+            # have to unlink the shared memory
+            posix_ipc.unlink_shared_memory(name)
+
+
 def suppress_signal(signo, stack_frame):
     pass
...
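SharedNDArray wraps a named POSIX shared-memory segment (posix_ipc plus mmap) in a numpy view, so the batch-assembling process and the training process exchange whole batches without pickling array contents through a queue; only the small (name, shape, dtype) state crosses process boundaries via __getstate__/__setstate__. A standalone sketch of that round trip, reduced to the calls the class actually uses (the segment name '/demo_block' and the array contents are illustrative):

    import mmap

    import numpy as np
    import posix_ipc

    shape, dtype = (4, 3), np.float32
    size = int(np.prod(shape)) * np.dtype(dtype).itemsize

    # Writer side: create the segment, map it, and view it as an ndarray.
    shm = posix_ipc.SharedMemory('/demo_block', posix_ipc.O_CREAT, size=size)
    buf = mmap.mmap(shm.fd, size)
    writer_view = np.ndarray(shape, dtype, buf, order='C')
    writer_view[:] = np.arange(12, dtype=dtype).reshape(shape)
    buf.flush()

    # Reader side: open the same name and observe the same bytes.
    shm2 = posix_ipc.SharedMemory('/demo_block')
    buf2 = mmap.mmap(shm2.fd, size)
    reader_view = np.ndarray(shape, dtype, buf2, order='C')
    np.testing.assert_array_equal(reader_view, writer_view)

    # Cleanup: drop the numpy views, close the mappings and descriptors, and
    # unlink the segment (mirroring recycle() and SharedMemoryPoolManager.__del__).
    del writer_view, reader_view
    buf.close()
    buf2.close()
    shm.close_fd()
    shm2.close_fd()
    posix_ipc.unlink_shared_memory('/demo_block')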
@@ -13,7 +13,7 @@ import _init_paths
 import data_utils.augmentor.trans_mean_variance_norm as trans_mean_variance_norm
 import data_utils.augmentor.trans_add_delta as trans_add_delta
 import data_utils.augmentor.trans_splice as trans_splice
-import data_utils.data_reader as reader
+import data_utils.async_data_reader as reader
 from model_utils.model import stacked_lstmp_model
 from data_utils.util import lodtensor_to_ndarray

@@ -138,7 +138,7 @@ def profile(args):
         trans_splice.TransSplice()
     ]

-    data_reader = reader.DataReader(args.feature_lst, args.label_lst)
+    data_reader = reader.AsyncDataReader(args.feature_lst, args.label_lst)
     data_reader.set_transformers(ltrans)

     feature_t = fluid.LoDTensor()
@@ -158,12 +158,14 @@ def profile(args):
             frames_seen = 0
             # load_data
             (features, labels, lod) = batch_data
-            feature_t.set(features, place)
-            feature_t.set_lod([lod])
-            label_t.set(labels, place)
-            label_t.set_lod([lod])
-            frames_seen += lod[-1]
+            feature_t.set(features.ndarray, place)
+            feature_t.set_lod([lod.ndarray])
+            label_t.set(labels.ndarray, place)
+            label_t.set_lod([lod.ndarray])
+            frames_seen += lod.ndarray[-1]
+
+            data_reader.recycle(features, labels, lod)

             outs = exe.run(fluid.default_main_program(),
                            feed={"feature": feature_t,
...
@@ -12,7 +12,7 @@ import paddle.fluid as fluid
 import data_utils.augmentor.trans_mean_variance_norm as trans_mean_variance_norm
 import data_utils.augmentor.trans_add_delta as trans_add_delta
 import data_utils.augmentor.trans_splice as trans_splice
-import data_utils.data_reader as reader
+import data_utils.async_data_reader as reader
 from data_utils.util import lodtensor_to_ndarray
 from model_utils.model import stacked_lstmp_model

@@ -175,8 +175,8 @@ def train(args):
                 os.path.exists(args.val_label_lst)):
             return -1.0, -1.0
         # test data reader
-        test_data_reader = reader.DataReader(args.val_feature_lst,
-                                             args.val_label_lst)
+        test_data_reader = reader.AsyncDataReader(args.val_feature_lst,
+                                                  args.val_label_lst)
         test_data_reader.set_transformers(ltrans)
         test_costs, test_accs = [], []
         for batch_id, batch_data in enumerate(
@@ -184,10 +184,12 @@ def train(args):
                     args.minimum_batch_size)):
             # load_data
             (features, labels, lod) = batch_data
-            feature_t.set(features, place)
-            feature_t.set_lod([lod])
-            label_t.set(labels, place)
-            label_t.set_lod([lod])
+            feature_t.set(features.ndarray, place)
+            feature_t.set_lod([lod.ndarray])
+            label_t.set(labels.ndarray, place)
+            label_t.set_lod([lod.ndarray])
+
+            test_data_reader.recycle(features, labels, lod)

             cost, acc = exe.run(test_program,
                                 feed={"feature": feature_t,
@@ -199,8 +201,8 @@ def train(args):
         return np.mean(test_costs), np.mean(test_accs)

     # train data reader
-    train_data_reader = reader.DataReader(args.train_feature_lst,
-                                          args.train_label_lst, -1)
+    train_data_reader = reader.AsyncDataReader(args.train_feature_lst,
+                                               args.train_label_lst, -1)
     train_data_reader.set_transformers(ltrans)
     # train
     for pass_id in xrange(args.pass_num):
@@ -210,10 +212,12 @@ def train(args):
                         args.minimum_batch_size)):
             # load_data
             (features, labels, lod) = batch_data
-            feature_t.set(features, place)
-            feature_t.set_lod([lod])
-            label_t.set(labels, place)
-            label_t.set_lod([lod])
+            feature_t.set(features.ndarray, place)
+            feature_t.set_lod([lod.ndarray])
+            label_t.set(labels.ndarray, place)
+            label_t.set_lod([lod.ndarray])
+
+            train_data_reader.recycle(features, labels, lod)

             to_print = batch_id > 0 and (batch_id % args.print_per_batches == 0)

             outs = exe.run(fluid.default_main_program(),
...