Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
DeepSpeech
提交
64ab19c1
D
DeepSpeech
项目概览
PaddlePaddle
/
DeepSpeech
1 年多 前同步成功
通知
207
Star
8425
Fork
1598
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
245
列表
看板
标记
里程碑
合并请求
3
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
DeepSpeech
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
245
Issue
245
列表
看板
标记
里程碑
合并请求
3
合并请求
3
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
64ab19c1
编写于
10月 07, 2017
作者:
X
Xinghai Sun
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add multiprocess version of xmap_reader to speedup training.
Add seqbin data parser to adapt to internal 1w data training.
上级
37a29bf1
变更
4
隐藏空白更改
内联
并排
Showing
4 changed files
with
188 additions
and
30 deletions
+188
-30
data_utils/audio.py
data_utils/audio.py
+42
-0
data_utils/data.py
data_utils/data.py
+30
-26
data_utils/speech.py
data_utils/speech.py
+18
-4
data_utils/utility.py
data_utils/utility.py
+98
-0
未找到文件。
data_utils/audio.py
浏览文件 @
64ab19c1
...
...
@@ -5,6 +5,8 @@ from __future__ import print_function
import
numpy
as
np
import
io
import
struct
import
re
import
soundfile
import
resampy
from
scipy
import
signal
...
...
@@ -114,6 +116,46 @@ class AudioSegment(object):
data
=
sndfile
.
read
(
frames
=
end_frame
-
start_frame
,
dtype
=
'float32'
)
return
cls
(
data
,
sample_rate
)
@classmethod
def from_sequence_file(cls, filepath):
    """Create audio segment from one entry of a "sequence file".

    A sequence-file path has the form ``<name>.seqbin_<fileno>``, where
    ``<name>.seqbin`` is a container holding many utterances and
    ``<fileno>`` is the 1-based index of the target utterance. The
    container starts with a 4-byte version tag, then two 4-byte ints
    (utterance count, bytes per header entry), then ``count + 1`` header
    entries delimiting each utterance's audio bytes.

    :param filepath: Filepath of sequence file ("<name>.seqbin_<fileno>").
    :type filepath: basestring
    :return: Audio segment instance.
    :rtype: AudioSegment
    :raises IOError: If filepath does not match the seqbin naming pattern.
    """
    # Split "<name>.seqbin_<fileno>" into container path and utterance index.
    matches = re.match(r"(.+\.seqbin)_(\d+)", filepath)
    if matches is None:
        raise IOError("File type of %s is not supported" % filepath)
    filename = matches.group(1)
    fileno = int(matches.group(2))
    # Context manager guarantees the handle is closed even if a header is
    # malformed and struct.unpack raises (the original leaked it on error).
    with open(filename, 'rb') as f:
        f.read(4)  # 4-byte format version tag; consumed but unused
        num_utterances = struct.unpack("i", f.read(4))[0]
        bytes_per_header = struct.unpack("i", f.read(4))[0]
        header_bytes = f.read(bytes_per_header * (num_utterances + 1))
        # Offset table: entry i marks where utterance i's bytes begin and
        # entry i + 1 where they end — hence count + 1 entries.
        # NOTE(review): each entry is unpacked as a single 4-byte int, so
        # this assumes bytes_per_header == 4 — confirm against the writer.
        header = [
            struct.unpack(
                "i",
                header_bytes[bytes_per_header * i:bytes_per_header * (i + 1)]
            )[0] for i in range(num_utterances + 1)
        ]
        # Read exactly the byte span of utterance `fileno`.
        f.seek(header[fileno - 1])
        audio_bytes = f.read(header[fileno] - header[fileno - 1])
    # Prefer decoding the payload as a regular audio container; fall back
    # to raw 16-bit PCM at 8 kHz, matching the internal seqbin data.
    try:
        return cls.from_bytes(audio_bytes)
    except Exception:
        samples = np.frombuffer(audio_bytes, dtype='int16')
        return cls(samples=samples, sample_rate=8000)
@
classmethod
def
from_bytes
(
cls
,
bytes
):
"""Create audio segment from a byte string containing audio samples.
...
...
data_utils/data.py
浏览文件 @
64ab19c1
...
...
@@ -7,11 +7,13 @@ from __future__ import print_function
import
random
import
tarfile
import
re
import
multiprocessing
import
numpy
as
np
import
paddle.v2
as
paddle
from
threading
import
local
from
data_utils.utility
import
read_manifest
from
data_utils.utility
import
xmap_readers_mp
from
data_utils.augmentor.augmentation
import
AugmentationPipeline
from
data_utils.featurizer.speech_featurizer
import
SpeechFeaturizer
from
data_utils.speech
import
SpeechSegment
...
...
@@ -100,7 +102,14 @@ class DataGenerator(object):
transcription.
:rtype: tuple of (2darray, list)
"""
speech_segment
=
SpeechSegment
.
from_file
(
filename
,
transcript
)
if
filename
.
startswith
(
'tar:'
):
speech_segment
=
SpeechSegment
.
from_file
(
self
.
_subfile_from_tar
(
filename
),
transcript
)
elif
re
.
findall
(
r
".seqbin_\d+$"
,
filename
):
speech_segment
=
SpeechSegment
.
from_sequence_file
(
filename
,
transcript
)
else
:
speech_segment
=
SpeechSegment
.
from_file
(
filename
,
transcript
)
self
.
_augmentation_pipeline
.
transform_audio
(
speech_segment
)
specgram
,
text_ids
=
self
.
_speech_featurizer
.
featurize
(
speech_segment
)
specgram
=
self
.
_normalizer
.
apply
(
specgram
)
...
...
@@ -231,27 +240,23 @@ class DataGenerator(object):
result
[
tarinfo
.
name
]
=
tarinfo
return
f
,
result
def
_
get_file_object
(
self
,
file
):
"""Get
file object by file path
.
def
_
subfile_from_tar
(
self
,
file
):
"""Get
subfile object from tar
.
I
f file startwith tar, it will return a tar file object
I
t will return a subfile object from tar file
and cached tar file info for next reading request.
It will return file directly, if the type of file is not str.
"""
if
file
.
startswith
(
'tar:'
):
tarpath
,
filename
=
file
.
split
(
':'
,
1
)[
1
].
split
(
'#'
,
1
)
if
'tar2info'
not
in
self
.
_local_data
.
__dict__
:
self
.
_local_data
.
tar2info
=
{}
if
'tar2object'
not
in
self
.
_local_data
.
__dict__
:
self
.
_local_data
.
tar2object
=
{}
if
tarpath
not
in
self
.
_local_data
.
tar2info
:
object
,
infoes
=
self
.
_parse_tar
(
tarpath
)
self
.
_local_data
.
tar2info
[
tarpath
]
=
infoes
self
.
_local_data
.
tar2object
[
tarpath
]
=
object
return
self
.
_local_data
.
tar2object
[
tarpath
].
extractfile
(
self
.
_local_data
.
tar2info
[
tarpath
][
filename
])
else
:
return
open
(
file
,
'r'
)
tarpath
,
filename
=
file
.
split
(
':'
,
1
)[
1
].
split
(
'#'
,
1
)
if
'tar2info'
not
in
self
.
_local_data
.
__dict__
:
self
.
_local_data
.
tar2info
=
{}
if
'tar2object'
not
in
self
.
_local_data
.
__dict__
:
self
.
_local_data
.
tar2object
=
{}
if
tarpath
not
in
self
.
_local_data
.
tar2info
:
object
,
infoes
=
self
.
_parse_tar
(
tarpath
)
self
.
_local_data
.
tar2info
[
tarpath
]
=
infoes
self
.
_local_data
.
tar2object
[
tarpath
]
=
object
return
self
.
_local_data
.
tar2object
[
tarpath
].
extractfile
(
self
.
_local_data
.
tar2info
[
tarpath
][
filename
])
def
_instance_reader_creator
(
self
,
manifest
):
"""
...
...
@@ -266,13 +271,12 @@ class DataGenerator(object):
for
instance
in
manifest
:
yield
instance
def
mapper
(
instance
):
return
self
.
process_utterance
(
self
.
_get_file_object
(
instance
[
"audio_filepath"
]),
instance
[
"text"
])
return
paddle
.
reader
.
xmap_readers
(
mapper
,
reader
,
self
.
_num_threads
,
1024
,
order
=
True
)
return
xmap_readers_mp
(
lambda
instance
:
self
.
process_utterance
(
instance
[
"audio_filepath"
],
instance
[
"text"
]),
reader
,
self
.
_num_threads
,
4096
,
order
=
True
)
def
_padding_batch
(
self
,
batch
,
padding_to
=-
1
,
flatten
=
False
):
"""
...
...
data_utils/speech.py
浏览文件 @
64ab19c1
...
...
@@ -44,12 +44,26 @@ class SpeechSegment(AudioSegment):
:type filepath: basestring|file
:param transcript: Transcript text for the speech.
:type transript: basestring
:return:
Audio
segment instance.
:rtype:
Audio
Segment
:return:
Speech
segment instance.
:rtype:
Speech
Segment
"""
audio
=
AudioSegment
.
from_file
(
filepath
)
return
cls
(
audio
.
samples
,
audio
.
sample_rate
,
transcript
)
@classmethod
def from_sequence_file(cls, filepath, transcript):
    """Create speech segment from sequence file and transcript.

    Delegates audio loading to ``AudioSegment.from_sequence_file`` and
    attaches the transcript.

    :param filepath: Filepath of sequence file.
    :type filepath: basestring
    :param transcript: Transcript text for the speech.
    :type transcript: basestring
    :return: Speech segment instance.
    :rtype: SpeechSegment
    """
    segment = AudioSegment.from_sequence_file(filepath)
    return cls(segment.samples, segment.sample_rate, transcript)
@
classmethod
def
from_bytes
(
cls
,
bytes
,
transcript
):
"""Create speech segment from a byte string and corresponding
...
...
@@ -59,8 +73,8 @@ class SpeechSegment(AudioSegment):
:type bytes: str
:param transcript: Transcript text for the speech.
:type transript: basestring
:return:
Audio
segment instance.
:rtype:
Audio
Segment
:return:
Speech
segment instance.
:rtype:
Speech
Segment
"""
audio
=
AudioSegment
.
from_bytes
(
bytes
)
return
cls
(
audio
.
samples
,
audio
.
sample_rate
,
transcript
)
...
...
data_utils/utility.py
浏览文件 @
64ab19c1
...
...
@@ -7,6 +7,9 @@ import json
import
codecs
import
os
import
tarfile
import
time
from
Queue
import
Queue
from
multiprocessing
import
Process
,
Manager
from
paddle.v2.dataset.common
import
md5file
...
...
@@ -61,3 +64,98 @@ def unpack(filepath, target_dir, rm_tar=False):
tar
.
close
()
if
rm_tar
==
True
:
os
.
remove
(
filepath
)
class XmapEndSignal():
    """Sentinel type placed on the pipeline queues by xmap_readers_mp
    producers/workers to signal end-of-stream; consumers detect it with
    isinstance() rather than by value."""
    pass
def xmap_readers_mp(mapper, reader, process_num, buffer_size, order=False):
    """A multiprocessing pipeline wrapper for the data reader.

    One producer process pulls samples from ``reader`` into a shared input
    queue; ``process_num`` worker processes apply ``mapper`` to each sample
    and push results to a shared output queue. ``XmapEndSignal`` sentinels
    propagate shutdown through the queues.

    :param mapper: Function to map a sample.
    :type mapper: callable
    :param reader: Given data reader.
    :type reader: callable
    :param process_num: Number of processes in the pipeline.
    :type process_num: int
    :param buffer_size: Maximal buffer size.
    :type buffer_size: int
    :param order: Preserve the order of samples from the given reader.
    :type order: bool
    :return: The wrapped reader.
    :rtype: callable
    """
    end_flag = XmapEndSignal()

    # define a worker to read samples from reader to in_queue
    def read_worker(reader, in_queue):
        for sample in reader():
            in_queue.put(sample)
        # a single sentinel marks end-of-input for the whole pipeline
        in_queue.put(end_flag)

    # define a worker to read samples from reader to in_queue with order flag
    def order_read_worker(reader, in_queue):
        # tag every sample with its position so workers can restore order
        for order_id, sample in enumerate(reader()):
            in_queue.put((order_id, sample))
        in_queue.put(end_flag)

    # define a worker to handle samples from in_queue by mapper and put results
    # to out_queue
    def handle_worker(in_queue, out_queue, mapper):
        sample = in_queue.get()
        while not isinstance(sample, XmapEndSignal):
            out_queue.put(mapper(sample))
            sample = in_queue.get()
        # re-post the sentinel so sibling workers also see it, and emit one
        # sentinel downstream per worker (the consumer counts process_num)
        in_queue.put(end_flag)
        out_queue.put(end_flag)

    # define a worker to handle samples from in_queue by mapper and put results
    # to out_queue with order
    def order_handle_worker(in_queue, out_queue, mapper, out_order):
        ins = in_queue.get()
        while not isinstance(ins, XmapEndSignal):
            order_id, sample = ins
            result = mapper(sample)
            # busy-wait until it is this sample's turn; out_order[0] is a
            # Manager-backed shared counter of the next id to emit
            while order_id != out_order[0]:
                time.sleep(0.001)
            out_queue.put(result)
            out_order[0] += 1
            ins = in_queue.get()
        in_queue.put(end_flag)
        out_queue.put(end_flag)

    def xreader():
        # prepare shared memory
        manager = Manager()
        in_queue = manager.Queue(buffer_size)
        out_queue = manager.Queue(buffer_size)
        out_order = manager.list([0])
        # start a read worker in a process
        target = order_read_worker if order else read_worker
        p = Process(target=target, args=(reader, in_queue))
        p.start()
        # start handle_workers with multiple processes
        target = order_handle_worker if order else handle_worker
        args = (in_queue, out_queue, mapper, out_order) if order else (
            in_queue, out_queue, mapper)
        workers = [
            Process(target=target, args=args) for _ in xrange(process_num)
        ]
        for w in workers:
            w.start()
        # get results
        sample = out_queue.get()
        while not isinstance(sample, XmapEndSignal):
            yield sample
            sample = out_queue.get()
        # drain until every worker's sentinel has arrived; the first one was
        # consumed by the loop above, hence the count starts at 1
        finish = 1
        while finish < process_num:
            sample = out_queue.get()
            if isinstance(sample, XmapEndSignal):
                finish += 1
            else:
                yield sample
        # NOTE(review): the producer and worker processes are never joined
        # here — verify they exit cleanly and are not leaked across epochs.

    return xreader
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录