Commit 7cfdbe03 authored by Yang Zhou

del audio tests

Parent 13ee17cd
# 1. Prepare
First, install `pytest-benchmark` via pip.
```sh
pip install pytest-benchmark
```
# 2. Run
Run the script you want to profile.
```sh
pytest melspectrogram.py
```
Result:
```sh
========================================================================== test session starts ==========================================================================
platform linux -- Python 3.7.7, pytest-7.0.1, pluggy-1.0.0
benchmark: 3.4.1 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
plugins: typeguard-2.12.1, benchmark-3.4.1, anyio-3.5.0
collected 4 items
melspectrogram.py .... [100%]
-------------------------------------------------------------------------------------------------- benchmark: 4 tests -------------------------------------------------------------------------------------------------
Name (time in us) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_melspect_gpu_torchaudio 202.0765 (1.0) 360.6230 (1.0) 218.1168 (1.0) 16.3022 (1.0) 214.2871 (1.0) 21.8451 (1.0) 40;3 4,584.7001 (1.0) 286 1
test_melspect_gpu 657.8509 (3.26) 908.0470 (2.52) 724.2545 (3.32) 106.5771 (6.54) 669.9096 (3.13) 113.4719 (5.19) 1;0 1,380.7300 (0.30) 5 1
test_melspect_cpu_torchaudio 1,247.6053 (6.17) 2,892.5799 (8.02) 1,443.2853 (6.62) 345.3732 (21.19) 1,262.7263 (5.89) 221.6385 (10.15) 56;53 692.8637 (0.15) 399 1
test_melspect_cpu 20,326.2549 (100.59) 20,607.8682 (57.15) 20,473.4125 (93.86) 63.8654 (3.92) 20,467.0429 (95.51) 68.4294 (3.13) 8;1 48.8438 (0.01) 29 1
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Legend:
Outliers: 1 Standard Deviation from Mean; 1.5 IQR (InterQuartile Range) from 1st Quartile and 3rd Quartile.
OPS: Operations Per Second, computed as 1 / Mean
========================================================================== 4 passed in 21.12s ===========================================================================
```
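Tip: you can narrow the run or re-sort the report with standard `pytest` / `pytest-benchmark` options. The flags below are a sketch; exact availability depends on the `pytest` and `pytest-benchmark` versions you have installed.
```sh
# Run only the GPU benchmark cases in this file
pytest melspectrogram.py -k gpu

# Sort the benchmark table by mean time
pytest melspectrogram.py --benchmark-sort=mean
```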
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import urllib.request
import librosa
import numpy as np
import paddle
import torch
import torchaudio
import paddlespeech.audio
wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav'
if not os.path.isfile(os.path.basename(wav_url)):
urllib.request.urlretrieve(wav_url, os.path.basename(wav_url))
waveform, sr = paddlespeech.audio.load(
os.path.abspath(os.path.basename(wav_url)))
waveform_tensor = paddle.to_tensor(waveform).unsqueeze(0)
waveform_tensor_torch = torch.from_numpy(waveform).unsqueeze(0)
# Feature conf
mel_conf = {
'sr': sr,
'n_fft': 512,
'hop_length': 128,
'n_mels': 40,
}
mel_conf_torchaudio = {
'sample_rate': sr,
'n_fft': 512,
'hop_length': 128,
'n_mels': 40,
'norm': 'slaney',
'mel_scale': 'slaney',
}
def enable_cpu_device():
paddle.set_device('cpu')
def enable_gpu_device():
paddle.set_device('gpu')
log_mel_extractor = paddlespeech.audio.features.LogMelSpectrogram(
**mel_conf, f_min=0.0, top_db=80.0, dtype=waveform_tensor.dtype)
def log_melspectrogram():
return log_mel_extractor(waveform_tensor).squeeze(0)
def test_log_melspect_cpu(benchmark):
enable_cpu_device()
feature_audio = benchmark(log_melspectrogram)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
def test_log_melspect_gpu(benchmark):
enable_gpu_device()
feature_audio = benchmark(log_melspectrogram)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=2)
mel_extractor_torchaudio = torchaudio.transforms.MelSpectrogram(
**mel_conf_torchaudio, f_min=0.0)
amplitude_to_DB = torchaudio.transforms.AmplitudeToDB('power', top_db=80.0)
def melspectrogram_torchaudio():
return mel_extractor_torchaudio(waveform_tensor_torch).squeeze(0)
def log_melspectrogram_torchaudio():
mel_specgram = mel_extractor_torchaudio(waveform_tensor_torch)
return amplitude_to_DB(mel_specgram).squeeze(0)
def test_log_melspect_cpu_torchaudio(benchmark):
global waveform_tensor_torch, mel_extractor_torchaudio, amplitude_to_DB
mel_extractor_torchaudio = mel_extractor_torchaudio.to('cpu')
waveform_tensor_torch = waveform_tensor_torch.to('cpu')
amplitude_to_DB = amplitude_to_DB.to('cpu')
feature_audio = benchmark(log_melspectrogram_torchaudio)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
def test_log_melspect_gpu_torchaudio(benchmark):
global waveform_tensor_torch, mel_extractor_torchaudio, amplitude_to_DB
mel_extractor_torchaudio = mel_extractor_torchaudio.to('cuda')
waveform_tensor_torch = waveform_tensor_torch.to('cuda')
amplitude_to_DB = amplitude_to_DB.to('cuda')
feature_torchaudio = benchmark(log_melspectrogram_torchaudio)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0)
np.testing.assert_array_almost_equal(
feature_librosa, feature_torchaudio.cpu(), decimal=2)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import urllib.request
import librosa
import numpy as np
import paddle
import torch
import torchaudio
import paddlespeech.audio
wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav'
if not os.path.isfile(os.path.basename(wav_url)):
urllib.request.urlretrieve(wav_url, os.path.basename(wav_url))
waveform, sr = paddlespeech.audio.load(
os.path.abspath(os.path.basename(wav_url)))
waveform_tensor = paddle.to_tensor(waveform).unsqueeze(0)
waveform_tensor_torch = torch.from_numpy(waveform).unsqueeze(0)
# Feature conf
mel_conf = {
'sr': sr,
'n_fft': 512,
'hop_length': 128,
'n_mels': 40,
}
mel_conf_torchaudio = {
'sample_rate': sr,
'n_fft': 512,
'hop_length': 128,
'n_mels': 40,
'norm': 'slaney',
'mel_scale': 'slaney',
}
def enable_cpu_device():
paddle.set_device('cpu')
def enable_gpu_device():
paddle.set_device('gpu')
mel_extractor = paddlespeech.audio.features.MelSpectrogram(
**mel_conf, f_min=0.0, dtype=waveform_tensor.dtype)
def melspectrogram():
return mel_extractor(waveform_tensor).squeeze(0)
def test_melspect_cpu(benchmark):
enable_cpu_device()
feature_audio = benchmark(melspectrogram)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
def test_melspect_gpu(benchmark):
enable_gpu_device()
feature_audio = benchmark(melspectrogram)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
mel_extractor_torchaudio = torchaudio.transforms.MelSpectrogram(
**mel_conf_torchaudio, f_min=0.0)
def melspectrogram_torchaudio():
return mel_extractor_torchaudio(waveform_tensor_torch).squeeze(0)
def test_melspect_cpu_torchaudio(benchmark):
global waveform_tensor_torch, mel_extractor_torchaudio
mel_extractor_torchaudio = mel_extractor_torchaudio.to('cpu')
waveform_tensor_torch = waveform_tensor_torch.to('cpu')
feature_audio = benchmark(melspectrogram_torchaudio)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
def test_melspect_gpu_torchaudio(benchmark):
global waveform_tensor_torch, mel_extractor_torchaudio
mel_extractor_torchaudio = mel_extractor_torchaudio.to('cuda')
waveform_tensor_torch = waveform_tensor_torch.to('cuda')
feature_torchaudio = benchmark(melspectrogram_torchaudio)
feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_torchaudio.cpu(), decimal=3)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import urllib.request
import librosa
import numpy as np
import paddle
import torch
import torchaudio
import paddlespeech.audio
wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav'
if not os.path.isfile(os.path.basename(wav_url)):
urllib.request.urlretrieve(wav_url, os.path.basename(wav_url))
waveform, sr = paddlespeech.audio.load(
os.path.abspath(os.path.basename(wav_url)))
waveform_tensor = paddle.to_tensor(waveform).unsqueeze(0)
waveform_tensor_torch = torch.from_numpy(waveform).unsqueeze(0)
# Feature conf
mel_conf = {
'sr': sr,
'n_fft': 512,
'hop_length': 128,
'n_mels': 40,
}
mfcc_conf = {
'n_mfcc': 20,
'top_db': 80.0,
}
mfcc_conf.update(mel_conf)
mel_conf_torchaudio = {
'sample_rate': sr,
'n_fft': 512,
'hop_length': 128,
'n_mels': 40,
'norm': 'slaney',
'mel_scale': 'slaney',
}
mfcc_conf_torchaudio = {
'sample_rate': sr,
'n_mfcc': 20,
}
def enable_cpu_device():
paddle.set_device('cpu')
def enable_gpu_device():
paddle.set_device('gpu')
mfcc_extractor = paddlespeech.audio.features.MFCC(
**mfcc_conf, f_min=0.0, dtype=waveform_tensor.dtype)
def mfcc():
return mfcc_extractor(waveform_tensor).squeeze(0)
def test_mfcc_cpu(benchmark):
enable_cpu_device()
feature_audio = benchmark(mfcc)
feature_librosa = librosa.feature.mfcc(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
def test_mfcc_gpu(benchmark):
enable_gpu_device()
feature_audio = benchmark(mfcc)
feature_librosa = librosa.feature.mfcc(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
del mel_conf_torchaudio['sample_rate']
mfcc_extractor_torchaudio = torchaudio.transforms.MFCC(
**mfcc_conf_torchaudio, melkwargs=mel_conf_torchaudio)
def mfcc_torchaudio():
return mfcc_extractor_torchaudio(waveform_tensor_torch).squeeze(0)
def test_mfcc_cpu_torchaudio(benchmark):
global waveform_tensor_torch, mfcc_extractor_torchaudio
mfcc_extractor_torchaudio = mfcc_extractor_torchaudio.to('cpu')
waveform_tensor_torch = waveform_tensor_torch.to('cpu')
feature_audio = benchmark(mfcc_torchaudio)
feature_librosa = librosa.feature.mfcc(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_audio, decimal=3)
def test_mfcc_gpu_torchaudio(benchmark):
global waveform_tensor_torch, mfcc_extractor_torchaudio
mfcc_extractor_torchaudio = mfcc_extractor_torchaudio.to('cuda')
waveform_tensor_torch = waveform_tensor_torch.to('cuda')
feature_torchaudio = benchmark(mfcc_torchaudio)
feature_librosa = librosa.feature.mfcc(waveform, **mel_conf)
np.testing.assert_array_almost_equal(
feature_librosa, feature_torchaudio.cpu(), decimal=3)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import urllib.request
mono_channel_wav = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav'
multi_channels_wav = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/cat.wav'
class BackendTest(unittest.TestCase):
def setUp(self):
self.initWavInput()
def initWavInput(self):
self.files = []
for url in [mono_channel_wav, multi_channels_wav]:
if not os.path.isfile(os.path.basename(url)):
urllib.request.urlretrieve(url, os.path.basename(url))
self.files.append(os.path.basename(url))
def initParmas(self):
raise NotImplementedError
def get_encoding(ext, dtype):
exts = {
"mp3",
"flac",
"vorbis",
}
encodings = {
"float32": "PCM_F",
"int32": "PCM_S",
"int16": "PCM_S",
"uint8": "PCM_U",
}
return ext.upper() if ext in exts else encodings[dtype]
def get_bit_depth(dtype):
bit_depths = {
"float32": 32,
"int32": 32,
"int16": 16,
"uint8": 8,
}
return bit_depths[dtype]
def get_bits_per_sample(ext, dtype):
bits_per_samples = {
"flac": 24,
"mp3": 0,
"vorbis": 0,
}
return bits_per_samples.get(ext, get_bit_depth(dtype))
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from unittest import skipIf
from parameterized import parameterized
from paddlespeech.audio._internal.module_utils import is_module_available
def name_func(func, _, params):
return f'{func.__name__}_{"_".join(str(arg) for arg in params.args)}'
def dtype2subtype(dtype):
return {
"float64": "DOUBLE",
"float32": "FLOAT",
"int32": "PCM_32",
"int16": "PCM_16",
"uint8": "PCM_U8",
"int8": "PCM_S8",
}[dtype]
def skipIfFormatNotSupported(fmt):
fmts = []
if is_module_available("soundfile"):
import soundfile
fmts = soundfile.available_formats()
return skipIf(fmt not in fmts, f'"{fmt}" is not supported by soundfile')
return skipIf(True, '"soundfile" not available.')
def parameterize(*params):
return parameterized.expand(list(itertools.product(*params)), name_func=name_func)
def fetch_wav_subtype(dtype, encoding, bits_per_sample):
subtype = {
(None, None): dtype2subtype(dtype),
(None, 8): "PCM_U8",
("PCM_U", None): "PCM_U8",
("PCM_U", 8): "PCM_U8",
("PCM_S", None): "PCM_32",
("PCM_S", 16): "PCM_16",
("PCM_S", 32): "PCM_32",
("PCM_F", None): "FLOAT",
("PCM_F", 32): "FLOAT",
("PCM_F", 64): "DOUBLE",
("ULAW", None): "ULAW",
("ULAW", 8): "ULAW",
("ALAW", None): "ALAW",
("ALAW", 8): "ALAW",
}.get((encoding, bits_per_sample))
if subtype:
return subtype
raise ValueError(f"wav does not support ({encoding}, {bits_per_sample}).")
#this code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/backend/soundfile/info_test.py
import tarfile
import warnings
import unittest
from unittest.mock import patch
import paddle
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio.backends import soundfile_backend
from tests.unit.audio.backends.common import get_bits_per_sample, get_encoding
from tests.unit.common_utils import (
get_wav_data,
nested_params,
save_wav,
TempDirMixin,
)
from common import parameterize, skipIfFormatNotSupported
import soundfile
class TestInfo(TempDirMixin, unittest.TestCase):
@parameterize(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
def test_wav(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.info` can check wav file correctly"""
duration = 1
path = self.get_temp_path("data.wav")
data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate)
save_wav(path, data, sample_rate)
info = soundfile_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == sample_rate * duration
assert info.num_channels == num_channels
assert info.bits_per_sample == get_bits_per_sample("wav", dtype)
assert info.encoding == get_encoding("wav", dtype)
@parameterize([8000, 16000], [1, 2])
@skipIfFormatNotSupported("FLAC")
def test_flac(self, sample_rate, num_channels):
"""`soundfile_backend.info` can check flac file correctly"""
duration = 1
num_frames = sample_rate * duration
#data = torch.randn(num_frames, num_channels).numpy()
data = paddle.randn(shape=[num_frames, num_channels]).numpy()
path = self.get_temp_path("data.flac")
soundfile.write(path, data, sample_rate)
info = soundfile_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == num_frames
assert info.num_channels == num_channels
assert info.bits_per_sample == 16
assert info.encoding == "FLAC"
#@parameterize([8000, 16000], [1, 2])
#@skipIfFormatNotSupported("OGG")
#def test_ogg(self, sample_rate, num_channels):
#"""`soundfile_backend.info` can check ogg file correctly"""
#duration = 1
#num_frames = sample_rate * duration
##data = torch.randn(num_frames, num_channels).numpy()
#data = paddle.randn(shape=[num_frames, num_channels]).numpy()
#print(len(data))
#path = self.get_temp_path("data.ogg")
#soundfile.write(path, data, sample_rate)
#info = soundfile_backend.info(path)
#print(info)
#assert info.sample_rate == sample_rate
#print("info")
#print(info.num_frames)
#print("jiji")
#print(sample_rate*duration)
##assert info.num_frames == sample_rate * duration
#assert info.num_channels == num_channels
#assert info.bits_per_sample == 0
#assert info.encoding == "VORBIS"
@nested_params(
[8000, 16000],
[1, 2],
[("PCM_24", 24), ("PCM_32", 32)],
)
@skipIfFormatNotSupported("NIST")
def test_sphere(self, sample_rate, num_channels, subtype_and_bit_depth):
"""`soundfile_backend.info` can check sph file correctly"""
duration = 1
num_frames = sample_rate * duration
#data = torch.randn(num_frames, num_channels).numpy()
data = paddle.randn(shape=[num_frames, num_channels]).numpy()
path = self.get_temp_path("data.nist")
subtype, bits_per_sample = subtype_and_bit_depth
soundfile.write(path, data, sample_rate, subtype=subtype)
info = soundfile_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == sample_rate * duration
assert info.num_channels == num_channels
assert info.bits_per_sample == bits_per_sample
assert info.encoding == "PCM_S"
def test_unknown_subtype_warning(self):
"""soundfile_backend.info issues a warning when the subtype is unknown
This will happen if a new subtype is supported in SoundFile: the _SUBTYPE_TO_BITS_PER_SAMPLE
dict should be updated.
"""
def _mock_info_func(_):
class MockSoundFileInfo:
samplerate = 8000
frames = 356
channels = 2
subtype = "UNSEEN_SUBTYPE"
format = "UNKNOWN"
return MockSoundFileInfo()
with patch("soundfile.info", _mock_info_func):
with warnings.catch_warnings(record=True) as w:
info = soundfile_backend.info("foo")
assert len(w) == 1
assert "UNSEEN_SUBTYPE subtype is unknown to PaddleAudio" in str(w[-1].message)
assert info.bits_per_sample == 0
class TestFileObject(TempDirMixin, unittest.TestCase):
def _test_fileobj(self, ext, subtype, bits_per_sample):
"""Query audio via file-like object works"""
duration = 2
sample_rate = 16000
num_channels = 2
num_frames = sample_rate * duration
path = self.get_temp_path(f"test.{ext}")
#data = torch.randn(num_frames, num_channels).numpy()
data = paddle.randn(shape=[num_frames, num_channels]).numpy()
soundfile.write(path, data, sample_rate, subtype=subtype)
with open(path, "rb") as fileobj:
info = soundfile_backend.info(fileobj)
assert info.sample_rate == sample_rate
assert info.num_frames == num_frames
assert info.num_channels == num_channels
assert info.bits_per_sample == bits_per_sample
assert info.encoding == "FLAC" if ext == "flac" else "PCM_S"
def test_fileobj_wav(self):
"""Loading audio via file-like object works"""
self._test_fileobj("wav", "PCM_16", 16)
@skipIfFormatNotSupported("FLAC")
def test_fileobj_flac(self):
"""Loading audio via file-like object works"""
self._test_fileobj("flac", "PCM_16", 16)
def _test_tarobj(self, ext, subtype, bits_per_sample):
"""Query compressed audio via file-like object works"""
duration = 2
sample_rate = 16000
num_channels = 2
num_frames = sample_rate * duration
audio_file = f"test.{ext}"
audio_path = self.get_temp_path(audio_file)
archive_path = self.get_temp_path("archive.tar.gz")
#data = torch.randn(num_frames, num_channels).numpy()
data = paddle.randn(shape=[num_frames, num_channels]).numpy()
soundfile.write(audio_path, data, sample_rate, subtype=subtype)
with tarfile.TarFile(archive_path, "w") as tarobj:
tarobj.add(audio_path, arcname=audio_file)
with tarfile.TarFile(archive_path, "r") as tarobj:
fileobj = tarobj.extractfile(audio_file)
info = soundfile_backend.info(fileobj)
assert info.sample_rate == sample_rate
assert info.num_frames == num_frames
assert info.num_channels == num_channels
assert info.bits_per_sample == bits_per_sample
assert info.encoding == "FLAC" if ext == "flac" else "PCM_S"
def test_tarobj_wav(self):
"""Query compressed audio via file-like object works"""
self._test_tarobj("wav", "PCM_16", 16)
@skipIfFormatNotSupported("FLAC")
def test_tarobj_flac(self):
"""Query compressed audio via file-like object works"""
self._test_tarobj("flac", "PCM_16", 16)
if __name__ == '__main__':
unittest.main()
#this code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/backend/soundfile/load_test.py
import os
import tarfile
import unittest
from unittest.mock import patch
import numpy as np
from parameterized import parameterized
import paddle
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio.backends import soundfile_backend
from tests.unit.audio.backends.common import get_bits_per_sample, get_encoding
from tests.unit.common_utils import (
get_wav_data,
load_wav,
nested_params,
normalize_wav,
save_wav,
TempDirMixin,
)
from common import dtype2subtype, parameterize, skipIfFormatNotSupported
import soundfile
def _get_mock_path(
ext: str,
dtype: str,
sample_rate: int,
num_channels: int,
num_frames: int,
):
return f"{dtype}_{sample_rate}_{num_channels}_{num_frames}.{ext}"
def _get_mock_params(path: str):
filename, ext = path.split(".")
parts = filename.split("_")
return {
"ext": ext,
"dtype": parts[0],
"sample_rate": int(parts[1]),
"num_channels": int(parts[2]),
"num_frames": int(parts[3]),
}
class SoundFileMock:
def __init__(self, path, mode):
assert mode == "r"
self.path = path
self._params = _get_mock_params(path)
self._start = None
@property
def samplerate(self):
return self._params["sample_rate"]
@property
def format(self):
if self._params["ext"] == "wav":
return "WAV"
if self._params["ext"] == "flac":
return "FLAC"
if self._params["ext"] == "ogg":
return "OGG"
if self._params["ext"] in ["sph", "nis", "nist"]:
return "NIST"
@property
def subtype(self):
if self._params["ext"] == "ogg":
return "VORBIS"
return dtype2subtype(self._params["dtype"])
def _prepare_read(self, start, stop, frames):
assert stop is None
self._start = start
return frames
def read(self, frames, dtype, always_2d):
assert always_2d
data = get_wav_data(
dtype,
self._params["num_channels"],
normalize=False,
num_frames=self._params["num_frames"],
channels_first=False,
).numpy()
return data[self._start : self._start + frames]
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
pass
class MockedLoadTest(unittest.TestCase):
def assert_dtype(self, ext, dtype, sample_rate, num_channels, normalize, channels_first):
"""When format is WAV or NIST, normalize=False will return the native dtype Tensor, otherwise float32"""
num_frames = 3 * sample_rate
path = _get_mock_path(ext, dtype, sample_rate, num_channels, num_frames)
expected_dtype = paddle.float32 if normalize or ext not in ["wav", "nist"] else getattr(paddle, dtype)
with patch("soundfile.SoundFile", SoundFileMock):
found, sr = soundfile_backend.load(path, normalize=normalize, channels_first=channels_first)
assert found.dtype == expected_dtype
assert sample_rate == sr
@parameterize(
["int32", "float32", "float64"],
[8000, 16000],
[1, 2],
[True, False],
[True, False],
)
def test_wav(self, dtype, sample_rate, num_channels, normalize, channels_first):
"""Returns native dtype when normalize=False else float32"""
self.assert_dtype("wav", dtype, sample_rate, num_channels, normalize, channels_first)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
[True, False],
[True, False],
)
def test_sphere(self, dtype, sample_rate, num_channels, normalize, channels_first):
"""Returns float32 always"""
self.assert_dtype("sph", dtype, sample_rate, num_channels, normalize, channels_first)
@parameterize([8000, 16000], [1, 2], [True, False], [True, False])
def test_ogg(self, sample_rate, num_channels, normalize, channels_first):
"""Returns float32 always"""
self.assert_dtype("ogg", "int16", sample_rate, num_channels, normalize, channels_first)
@parameterize([8000, 16000], [1, 2], [True, False], [True, False])
def test_flac(self, sample_rate, num_channels, normalize, channels_first):
"""`soundfile_backend.load` can load ogg format."""
self.assert_dtype("flac", "int16", sample_rate, num_channels, normalize, channels_first)
class LoadTestBase(TempDirMixin, unittest.TestCase):
def assert_wav(
self,
dtype,
sample_rate,
num_channels,
normalize,
channels_first=True,
duration=1,
):
"""`soundfile_backend.load` can load wav format correctly.
Wav data loaded with soundfile backend should match those with scipy
"""
path = self.get_temp_path("reference.wav")
num_frames = duration * sample_rate
data = get_wav_data(
dtype,
num_channels,
normalize=normalize,
num_frames=num_frames,
channels_first=channels_first,
)
save_wav(path, data, sample_rate, channels_first=channels_first)
expected = load_wav(path, normalize=normalize, channels_first=channels_first)[0]
data, sr = soundfile_backend.load(path, normalize=normalize, channels_first=channels_first)
assert sr == sample_rate
np.testing.assert_array_almost_equal(data.numpy(), expected.numpy())
def assert_sphere(
self,
dtype,
sample_rate,
num_channels,
channels_first=True,
duration=1,
):
"""`soundfile_backend.load` can load SPHERE format correctly."""
path = self.get_temp_path("reference.sph")
num_frames = duration * sample_rate
raw = get_wav_data(
dtype,
num_channels,
num_frames=num_frames,
normalize=False,
channels_first=False,
)
soundfile.write(path, raw, sample_rate, subtype=dtype2subtype(dtype), format="NIST")
expected = normalize_wav(raw.t() if channels_first else raw)
data, sr = soundfile_backend.load(path, channels_first=channels_first)
assert sr == sample_rate
#self.assertEqual(data, expected, atol=1e-4, rtol=1e-8)
np.testing.assert_array_almost_equal(data.numpy(), expected.numpy())
def assert_flac(
self,
dtype,
sample_rate,
num_channels,
channels_first=True,
duration=1,
):
"""`soundfile_backend.load` can load FLAC format correctly."""
path = self.get_temp_path("reference.flac")
num_frames = duration * sample_rate
raw = get_wav_data(
dtype,
num_channels,
num_frames=num_frames,
normalize=False,
channels_first=False,
)
soundfile.write(path, raw, sample_rate)
expected = normalize_wav(raw.t() if channels_first else raw)
data, sr = soundfile_backend.load(path, channels_first=channels_first)
assert sr == sample_rate
#self.assertEqual(data, expected, atol=1e-4, rtol=1e-8)
np.testing.assert_array_almost_equal(data.numpy(), expected.numpy())
class TestLoad(LoadTestBase):
"""Test the correctness of `soundfile_backend.load` for various formats"""
@parameterize(
["float32", "int32"],
[8000, 16000],
[1, 2],
[False, True],
[False, True],
)
def test_wav(self, dtype, sample_rate, num_channels, normalize, channels_first):
"""`soundfile_backend.load` can load wav format correctly."""
self.assert_wav(dtype, sample_rate, num_channels, normalize, channels_first)
@parameterize(
["int32"],
[16000],
[2],
[False],
)
def test_wav_large(self, dtype, sample_rate, num_channels, normalize):
"""`soundfile_backend.load` can load large wav file correctly."""
two_hours = 2 * 60 * 60
self.assert_wav(dtype, sample_rate, num_channels, normalize, duration=two_hours)
@parameterize(["float32", "int32"], [4, 8, 16, 32], [False, True])
def test_multiple_channels(self, dtype, num_channels, channels_first):
"""`soundfile_backend.load` can load wav file with more than 2 channels."""
sample_rate = 8000
normalize = False
self.assert_wav(dtype, sample_rate, num_channels, normalize, channels_first)
#@parameterize(["int32"], [8000, 16000], [1, 2], [False, True])
#@skipIfFormatNotSupported("NIST")
#def test_sphere(self, dtype, sample_rate, num_channels, channels_first):
#"""`soundfile_backend.load` can load sphere format correctly."""
#self.assert_sphere(dtype, sample_rate, num_channels, channels_first)
#@parameterize(["int32"], [8000, 16000], [1, 2], [False, True])
#@skipIfFormatNotSupported("FLAC")
#def test_flac(self, dtype, sample_rate, num_channels, channels_first):
#"""`soundfile_backend.load` can load flac format correctly."""
#self.assert_flac(dtype, sample_rate, num_channels, channels_first)
class TestLoadFormat(TempDirMixin, unittest.TestCase):
"""Given `format` parameter, `so.load` can load files without extension"""
original = None
path = None
def _make_file(self, format_):
sample_rate = 8000
path_with_ext = self.get_temp_path(f"test.{format_}")
data = get_wav_data("float32", num_channels=2).numpy().T
soundfile.write(path_with_ext, data, sample_rate)
expected = soundfile.read(path_with_ext, dtype="float32")[0].T
path = os.path.splitext(path_with_ext)[0]
os.rename(path_with_ext, path)
return path, expected
def _test_format(self, format_):
"""Providing format allows to read file without extension"""
path, expected = self._make_file(format_)
found, _ = soundfile_backend.load(path)
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found, expected)
@parameterized.expand(
[
("WAV",),
("wav",),
]
)
def test_wav(self, format_):
self._test_format(format_)
@parameterized.expand(
[
("FLAC",),
("flac",),
]
)
@skipIfFormatNotSupported("FLAC")
def test_flac(self, format_):
self._test_format(format_)
class TestFileObject(TempDirMixin, unittest.TestCase):
def _test_fileobj(self, ext):
"""Loading audio via file-like object works"""
sample_rate = 16000
path = self.get_temp_path(f"test.{ext}")
data = get_wav_data("float32", num_channels=2).numpy().T
soundfile.write(path, data, sample_rate)
expected = soundfile.read(path, dtype="float32")[0].T
with open(path, "rb") as fileobj:
found, sr = soundfile_backend.load(fileobj)
assert sr == sample_rate
#self.assertEqual(expected, found)
np.testing.assert_array_almost_equal(found, expected)
def test_fileobj_wav(self):
"""Loading audio via file-like object works"""
self._test_fileobj("wav")
def test_fileobj_flac(self):
"""Loading audio via file-like object works"""
self._test_fileobj("flac")
def _test_tarfile(self, ext):
"""Loading audio via file-like object works"""
sample_rate = 16000
audio_file = f"test.{ext}"
audio_path = self.get_temp_path(audio_file)
archive_path = self.get_temp_path("archive.tar.gz")
data = get_wav_data("float32", num_channels=2).numpy().T
soundfile.write(audio_path, data, sample_rate)
expected = soundfile.read(audio_path, dtype="float32")[0].T
with tarfile.TarFile(archive_path, "w") as tarobj:
tarobj.add(audio_path, arcname=audio_file)
with tarfile.TarFile(archive_path, "r") as tarobj:
fileobj = tarobj.extractfile(audio_file)
found, sr = soundfile_backend.load(fileobj)
assert sr == sample_rate
#self.assertEqual(expected, found)
np.testing.assert_array_almost_equal(found.numpy(), expected)
def test_tarfile_wav(self):
"""Loading audio via file-like object works"""
self._test_tarfile("wav")
def test_tarfile_flac(self):
"""Loading audio via file-like object works"""
self._test_tarfile("flac")
if __name__ == '__main__':
unittest.main()
import io
import unittest
from unittest.mock import patch
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio.backends import soundfile_backend
from tests.unit.common_utils import (
get_wav_data,
load_wav,
nested_params,
normalize_wav,
save_wav,
TempDirMixin,
)
from common import fetch_wav_subtype, parameterize, skipIfFormatNotSupported
import paddle
import numpy as np
import soundfile
class MockedSaveTest(unittest.TestCase):
@nested_params(
["float32", "int32"],
[8000, 16000],
[1, 2],
[False, True],
[
(None, None),
("PCM_U", None),
("PCM_U", 8),
("PCM_S", None),
("PCM_S", 16),
("PCM_S", 32),
("PCM_F", None),
("PCM_F", 32),
("PCM_F", 64),
("ULAW", None),
("ULAW", 8),
("ALAW", None),
("ALAW", 8),
],
)
@patch("soundfile.write")
def test_wav(self, dtype, sample_rate, num_channels, channels_first, enc_params, mocked_write):
"""soundfile_backend.save passes correct subtype to soundfile.write when WAV"""
filepath = "foo.wav"
input_tensor = get_wav_data(
dtype,
num_channels,
num_frames=3 * sample_rate,
normalize=dtype == "float32",
channels_first=channels_first,
)
input_tensor = paddle.transpose(input_tensor, [1, 0])
encoding, bits_per_sample = enc_params
soundfile_backend.save(
filepath,
input_tensor,
sample_rate,
channels_first=channels_first,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
# On Python 3.8+, call_args.kwargs is more descriptive
args = mocked_write.call_args[1]
assert args["file"] == filepath
assert args["samplerate"] == sample_rate
assert args["subtype"] == fetch_wav_subtype(dtype, encoding, bits_per_sample)
assert args["format"] is None
tensor_result = paddle.transpose(input_tensor, [1, 0]) if channels_first else input_tensor
#self.assertEqual(args["data"], tensor_result.numpy())
np.testing.assert_array_almost_equal(args["data"].numpy(), tensor_result.numpy())
@patch("soundfile.write")
def assert_non_wav(
self,
fmt,
dtype,
sample_rate,
num_channels,
channels_first,
mocked_write,
encoding=None,
bits_per_sample=None,
):
"""soundfile_backend.save passes correct subtype and format to soundfile.write when SPHERE"""
filepath = f"foo.{fmt}"
input_tensor = get_wav_data(
dtype,
num_channels,
num_frames=3 * sample_rate,
normalize=False,
channels_first=channels_first,
)
input_tensor = paddle.transpose(input_tensor, [1, 0])
expected_data = paddle.transpose(input_tensor, [1, 0]) if channels_first else input_tensor
soundfile_backend.save(
filepath,
input_tensor,
sample_rate,
channels_first,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
# On Python 3.8+, call_args.kwargs is more descriptive
args = mocked_write.call_args[1]
assert args["file"] == filepath
assert args["samplerate"] == sample_rate
if fmt in ["sph", "nist", "nis"]:
assert args["format"] == "NIST"
else:
assert args["format"] is None
np.testing.assert_array_almost_equal(args["data"].numpy(), expected_data.numpy())
#self.assertEqual(args["data"], expected_data)
@nested_params(
["sph", "nist", "nis"],
["int32"],
[8000, 16000],
[1, 2],
[False, True],
[
("PCM_S", 8),
("PCM_S", 16),
("PCM_S", 24),
("PCM_S", 32),
("ULAW", 8),
("ALAW", 8),
("ALAW", 16),
("ALAW", 24),
("ALAW", 32),
],
)
def test_sph(self, fmt, dtype, sample_rate, num_channels, channels_first, enc_params):
"""soundfile_backend.save passes default format and subtype (None-s) to
soundfile.write when not WAV"""
encoding, bits_per_sample = enc_params
self.assert_non_wav(
fmt, dtype, sample_rate, num_channels, channels_first, encoding=encoding, bits_per_sample=bits_per_sample
)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
[False, True],
[8, 16, 24],
)
def test_flac(self, dtype, sample_rate, num_channels, channels_first, bits_per_sample):
"""soundfile_backend.save passes default format and subtype (None-s) to
soundfile.write when not WAV"""
self.assert_non_wav("flac", dtype, sample_rate, num_channels, channels_first, bits_per_sample=bits_per_sample)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
[False, True],
)
def test_ogg(self, dtype, sample_rate, num_channels, channels_first):
"""soundfile_backend.save passes default format and subtype (None-s) to
soundfile.write when not WAV"""
self.assert_non_wav("ogg", dtype, sample_rate, num_channels, channels_first)
class SaveTestBase(TempDirMixin, unittest.TestCase):
def assert_wav(self, dtype, sample_rate, num_channels, num_frames):
"""`soundfile_backend.save` can save wav format."""
path = self.get_temp_path("data.wav")
expected = get_wav_data(dtype, num_channels, num_frames=num_frames, normalize=False)
soundfile_backend.save(path, expected, sample_rate)
found, sr = load_wav(path, normalize=False)
assert sample_rate == sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
def _assert_non_wav(self, fmt, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save non-wav format.
Due to precision mismatch and the lack of an alternative way to decode the
resulting files without using soundfile, only metadata are validated.
"""
num_frames = sample_rate * 3
path = self.get_temp_path(f"data.{fmt}")
expected = get_wav_data(dtype, num_channels, num_frames=num_frames, normalize=False)
soundfile_backend.save(path, expected, sample_rate)
sinfo = soundfile.info(path)
assert sinfo.format == fmt.upper()
#assert sinfo.frames == num_frames  # this assertion fails for these formats
assert sinfo.channels == num_channels
assert sinfo.samplerate == sample_rate
def assert_flac(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save flac format."""
self._assert_non_wav("flac", dtype, sample_rate, num_channels)
def assert_sphere(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save sph format."""
self._assert_non_wav("nist", dtype, sample_rate, num_channels)
def assert_ogg(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save ogg format.
As we cannot inspect the OGG format (it's lossy), we only check the metadata.
"""
self._assert_non_wav("ogg", dtype, sample_rate, num_channels)
class TestSave(SaveTestBase):
@parameterize(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
def test_wav(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save wav format."""
self.assert_wav(dtype, sample_rate, num_channels, num_frames=None)
@parameterize(
["float32", "int32"],
[4, 8, 16, 32],
)
def test_multiple_channels(self, dtype, num_channels):
"""`soundfile_backend.save` can save wav with more than 2 channels."""
sample_rate = 8000
self.assert_wav(dtype, sample_rate, num_channels, num_frames=None)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
)
@skipIfFormatNotSupported("NIST")
def test_sphere(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save sph format."""
self.assert_sphere(dtype, sample_rate, num_channels)
@parameterize(
[8000, 16000],
[1, 2],
)
@skipIfFormatNotSupported("FLAC")
def test_flac(self, sample_rate, num_channels):
"""`soundfile_backend.save` can save flac format."""
self.assert_flac("float32", sample_rate, num_channels)
@parameterize(
[8000, 16000],
[1, 2],
)
@skipIfFormatNotSupported("OGG")
def test_ogg(self, sample_rate, num_channels):
"""`soundfile_backend.save` can save ogg/vorbis format."""
self.assert_ogg("float32", sample_rate, num_channels)
class TestSaveParams(TempDirMixin, unittest.TestCase):
"""Test the correctness of optional parameters of `soundfile_backend.save`"""
@parameterize([True, False])
def test_channels_first(self, channels_first):
"""channels_first swaps axes"""
path = self.get_temp_path("data.wav")
data = get_wav_data("int32", 2, channels_first=channels_first)
soundfile_backend.save(path, data, 8000, channels_first=channels_first)
found = load_wav(path)[0]
expected = data if channels_first else data.transpose([1, 0])
#self.assertEqual(found, expected, atol=1e-4, rtol=1e-8)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
class TestFileObject(TempDirMixin, unittest.TestCase):
def _test_fileobj(self, ext):
"""Saving audio to file-like object works"""
sample_rate = 16000
path = self.get_temp_path(f"test.{ext}")
subtype = "FLOAT" if ext == "wav" else None
data = get_wav_data("float32", num_channels=2)
soundfile.write(path, data.numpy().T, sample_rate, subtype=subtype)
expected = soundfile.read(path, dtype="float32")[0]
fileobj = io.BytesIO()
soundfile_backend.save(fileobj, data, sample_rate, format=ext)
fileobj.seek(0)
found, sr = soundfile.read(fileobj, dtype="float32")
assert sr == sample_rate
#self.assertEqual(expected, found, atol=1e-4, rtol=1e-8)
np.testing.assert_array_almost_equal(found, expected)
def test_fileobj_wav(self):
"""Saving audio via file-like object works"""
self._test_fileobj("wav")
@skipIfFormatNotSupported("FLAC")
def test_fileobj_flac(self):
"""Saving audio via file-like object works"""
self._test_fileobj("flac")
@skipIfFormatNotSupported("NIST")
def test_fileobj_nist(self):
"""Saving audio via file-like object works"""
self._test_fileobj("NIST")
@skipIfFormatNotSupported("OGG")
def test_fileobj_ogg(self):
"""Saving audio via file-like object works"""
self._test_fileobj("OGG")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import filecmp
import os
import unittest
import numpy as np
import soundfile as sf
import paddlespeech.audio
from ..base import BackendTest
class TestIO(BackendTest):
def test_load_mono_channel(self):
sf_data, sf_sr = sf.read(self.files[0])
pa_data, pa_sr = paddlespeech.audio.load(
self.files[0], normal=False, dtype='float64')
self.assertEqual(sf_data.dtype, pa_data.dtype)
self.assertEqual(sf_sr, pa_sr)
np.testing.assert_array_almost_equal(sf_data, pa_data)
def test_load_multi_channels(self):
sf_data, sf_sr = sf.read(self.files[1])
sf_data = sf_data.T # Channel dim first
pa_data, pa_sr = paddlespeech.audio.load(
self.files[1], mono=False, normal=False, dtype='float64')
self.assertEqual(sf_data.dtype, pa_data.dtype)
self.assertEqual(sf_sr, pa_sr)
np.testing.assert_array_almost_equal(sf_data, pa_data)
def test_save_mono_channel(self):
waveform, sr = np.random.randint(
low=-32768, high=32768, size=(48000), dtype=np.int16), 16000
sf_tmp_file = 'sf_tmp.wav'
pa_tmp_file = 'pa_tmp.wav'
sf.write(sf_tmp_file, waveform, sr)
paddlespeech.audio.save(waveform, sr, pa_tmp_file)
self.assertTrue(filecmp.cmp(sf_tmp_file, pa_tmp_file))
for file in [sf_tmp_file, pa_tmp_file]:
os.remove(file)
def test_save_multi_channels(self):
waveform, sr = np.random.randint(
low=-32768, high=32768, size=(2, 48000), dtype=np.int16), 16000
sf_tmp_file = 'sf_tmp.wav'
pa_tmp_file = 'pa_tmp.wav'
sf.write(sf_tmp_file, waveform.T, sr)
paddlespeech.audio.save(waveform.T, sr, pa_tmp_file)
self.assertTrue(filecmp.cmp(sf_tmp_file, pa_tmp_file))
for file in [sf_tmp_file, pa_tmp_file]:
os.remove(file)
if __name__ == '__main__':
unittest.main()
import unittest
import itertools
import tarfile
from contextlib import contextmanager
import numpy as np
import paddle
import os
import io
from parameterized import parameterized
from tests.unit.audio.backends.common import get_bits_per_sample, get_encoding
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import (
get_wav_data,
load_wav,
save_wav,
TempDirMixin,
sox_utils,
data_utils
)
#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/info_test.py
class TestInfo(TempDirMixin, unittest.TestCase):
@parameterized.expand(
list(
itertools.product(
["float32", "int32",],
[8000, 16000],
[1, 2],
)
),
)
def test_wav(self, dtype, sample_rate, num_channels):
"""`sox_io_backend.info` can check wav file correctly"""
duration = 1
path = self.get_temp_path("data.wav")
data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate)
save_wav(path, data, sample_rate)
info = sox_io_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == sample_rate * duration
assert info.num_channels == num_channels
assert info.bits_per_sample == sox_utils.get_bit_depth(dtype)
assert info.encoding == get_encoding("wav", dtype)
@parameterized.expand(
list(
itertools.product(
["float32", "int32"],
[8000, 16000],
[4, 8, 16, 32],
)
),
)
def test_wav_multiple_channels(self, dtype, sample_rate, num_channels):
"""`sox_io_backend.info` can check wav file with channels more than 2 correctly"""
duration = 1
path = self.get_temp_path("data.wav")
data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate)
save_wav(path, data, sample_rate)
info = sox_io_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == sample_rate * duration
assert info.num_channels == num_channels
assert info.bits_per_sample == sox_utils.get_bit_depth(dtype)
def test_ulaw(self):
"""`sox_io_backend.info` can check ulaw file correctly"""
duration = 1
num_channels = 1
sample_rate = 8000
path = self.get_temp_path("data.wav")
sox_utils.gen_audio_file(
path, sample_rate=sample_rate, num_channels=num_channels, bit_depth=8, encoding="u-law", duration=duration
)
info = sox_io_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == sample_rate * duration
assert info.num_channels == num_channels
assert info.bits_per_sample == 8
assert info.encoding == "ULAW"
def test_alaw(self):
"""`sox_io_backend.info` can check alaw file correctly"""
duration = 1
num_channels = 1
sample_rate = 8000
path = self.get_temp_path("data.wav")
sox_utils.gen_audio_file(
path, sample_rate=sample_rate, num_channels=num_channels, bit_depth=8, encoding="a-law", duration=duration
)
info = sox_io_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_frames == sample_rate * duration
assert info.num_channels == num_channels
assert info.bits_per_sample == 8
assert info.encoding == "ALAW"
#class TestInfoOpus(unittest.TestCase):
#@parameterized.expand(
#list(
#itertools.product(
#["96k"],
#[1, 2],
#[0, 5, 10],
#)
#),
#)
#def test_opus(self, bitrate, num_channels, compression_level):
#"""`sox_io_backend.info` can check opus file correcty"""
#path = data_utils.get_asset_path("io", f"{bitrate}_{compression_level}_{num_channels}ch.opus")
#info = sox_io_backend.info(path)
#assert info.sample_rate == 48000
#assert info.num_frames == 32768
#assert info.num_channels == num_channels
#assert info.bits_per_sample == 0 # bit_per_sample is irrelevant for compressed formats
#assert info.encoding == "OPUS"
class FileObjTestBase(TempDirMixin):
def _gen_file(self, ext, dtype, sample_rate, num_channels, num_frames, *, comments=None):
path = self.get_temp_path(f"test.{ext}")
bit_depth = sox_utils.get_bit_depth(dtype)
duration = num_frames / sample_rate
comment_file = self._gen_comment_file(comments) if comments else None
sox_utils.gen_audio_file(
path,
sample_rate,
num_channels=num_channels,
encoding=sox_utils.get_encoding(dtype),
bit_depth=bit_depth,
duration=duration,
comment_file=comment_file,
)
return path
def _gen_comment_file(self, comments):
comment_path = self.get_temp_path("comment.txt")
with open(comment_path, "w") as file_:
file_.writelines(comments)
return comment_path
class Unseekable:
def __init__(self, fileobj):
self.fileobj = fileobj
def read(self, n):
return self.fileobj.read(n)
class TestFileObject(FileObjTestBase, unittest.TestCase):
def _query_fileobj(self, ext, dtype, sample_rate, num_channels, num_frames, *, comments=None):
path = self._gen_file(ext, dtype, sample_rate, num_channels, num_frames, comments=comments)
format_ = ext if ext in ["mp3"] else None
with open(path, "rb") as fileobj:
return sox_io_backend.info(fileobj, format_)
def _query_bytesio(self, ext, dtype, sample_rate, num_channels, num_frames):
path = self._gen_file(ext, dtype, sample_rate, num_channels, num_frames)
format_ = ext if ext in ["mp3"] else None
with open(path, "rb") as file_:
fileobj = io.BytesIO(file_.read())
return sox_io_backend.info(fileobj, format_)
def _query_tarfile(self, ext, dtype, sample_rate, num_channels, num_frames):
audio_path = self._gen_file(ext, dtype, sample_rate, num_channels, num_frames)
audio_file = os.path.basename(audio_path)
archive_path = self.get_temp_path("archive.tar.gz")
with tarfile.TarFile(archive_path, "w") as tarobj:
tarobj.add(audio_path, arcname=audio_file)
format_ = ext if ext in ["mp3"] else None
with tarfile.TarFile(archive_path, "r") as tarobj:
fileobj = tarobj.extractfile(audio_file)
return sox_io_backend.info(fileobj, format_)
@contextmanager
def _set_buffer_size(self, buffer_size):
try:
original_buffer_size = get_buffer_size()
set_buffer_size(buffer_size)
yield
finally:
set_buffer_size(original_buffer_size)
@parameterized.expand(
[
("wav", "float32"),
("wav", "int32"),
("wav", "int16"),
("wav", "uint8"),
]
)
def test_fileobj(self, ext, dtype):
"""Querying audio via file object works"""
sample_rate = 16000
num_frames = 3 * sample_rate
num_channels = 2
sinfo = self._query_fileobj(ext, dtype, sample_rate, num_channels, num_frames)
bits_per_sample = get_bits_per_sample(ext, dtype)
num_frames = 0 if ext in ["mp3", "vorbis"] else num_frames
assert sinfo.sample_rate == sample_rate
assert sinfo.num_channels == num_channels
assert sinfo.num_frames == num_frames
assert sinfo.bits_per_sample == bits_per_sample
assert sinfo.encoding == get_encoding(ext, dtype)
@parameterized.expand(
[
("wav", "float32"),
("wav", "int32"),
("wav", "int16"),
("wav", "uint8"),
]
)
def test_bytesio(self, ext, dtype):
"""Querying audio via ByteIO object works for small data"""
sample_rate = 16000
num_frames = 3 * sample_rate
num_channels = 2
sinfo = self._query_bytesio(ext, dtype, sample_rate, num_channels, num_frames)
bits_per_sample = get_bits_per_sample(ext, dtype)
num_frames = 0 if ext in ["mp3", "vorbis"] else num_frames
assert sinfo.sample_rate == sample_rate
assert sinfo.num_channels == num_channels
assert sinfo.num_frames == num_frames
assert sinfo.bits_per_sample == bits_per_sample
assert sinfo.encoding == get_encoding(ext, dtype)
@parameterized.expand(
[
("wav", "float32"),
("wav", "int32"),
("wav", "int16"),
("wav", "uint8"),
]
)
def test_bytesio_tiny(self, ext, dtype):
"""Querying audio via ByteIO object works for small data"""
sample_rate = 8000
num_frames = 4
num_channels = 2
sinfo = self._query_bytesio(ext, dtype, sample_rate, num_channels, num_frames)
bits_per_sample = get_bits_per_sample(ext, dtype)
num_frames = 0 if ext in ["mp3", "vorbis"] else num_frames
assert sinfo.sample_rate == sample_rate
assert sinfo.num_channels == num_channels
assert sinfo.num_frames == num_frames
assert sinfo.bits_per_sample == bits_per_sample
assert sinfo.encoding == get_encoding(ext, dtype)
@parameterized.expand(
[
("wav", "float32"),
("wav", "int32"),
("wav", "int16"),
("wav", "uint8"),
("flac", "float32"),
("vorbis", "float32"),
("amb", "int16"),
]
)
def test_tarfile(self, ext, dtype):
"""Querying compressed audio via file-like object works"""
sample_rate = 16000
num_frames = 3.0 * sample_rate
num_channels = 2
sinfo = self._query_tarfile(ext, dtype, sample_rate, num_channels, num_frames)
bits_per_sample = get_bits_per_sample(ext, dtype)
num_frames = 0 if ext in ["vorbis"] else num_frames
assert sinfo.sample_rate == sample_rate
assert sinfo.num_channels == num_channels
assert sinfo.num_frames == num_frames
assert sinfo.bits_per_sample == bits_per_sample
assert sinfo.encoding == get_encoding(ext, dtype)
if __name__ == '__main__':
unittest.main()
import unittest
import itertools
from parameterized import parameterized
import numpy as np
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import (
get_wav_data,
load_wav,
save_wav,
)
#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/load_test.py
class TestLoad(unittest.TestCase):
def assert_wav(self, dtype, sample_rate, num_channels, normalize, duration):
"""`sox_io_backend.load` can load wav format correctly.
Wav data loaded with the sox_io backend should match data loaded with scipy
"""
path = 'testdata/reference.wav'
data = get_wav_data(dtype, num_channels, normalize=normalize, num_frames=duration * sample_rate)
save_wav(path, data, sample_rate)
expected = load_wav(path, normalize=normalize)[0]
data, sr = sox_io_backend.load(path, normalize=normalize)
assert sr == sample_rate
np.testing.assert_array_almost_equal(data, expected, decimal=4)
@parameterized.expand(
list(
itertools.product(
["float64", "float32", "int32",],
[8000, 16000],
[1, 2],
[False, True],
)
),
)
def test_wav(self, dtype, sample_rate, num_channels, normalize):
"""`sox_io_backend.load` can load wav format correctly."""
self.assert_wav(dtype, sample_rate, num_channels, normalize, duration=1)
if __name__ == '__main__':
unittest.main()
import io
import os
import unittest
import numpy as np
import paddle
from parameterized import parameterized
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import (
get_wav_data,
load_wav,
save_wav,
nested_params,
TempDirMixin,
sox_utils
)
#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/save_test.py
def _get_sox_encoding(encoding):
encodings = {
"PCM_F": "floating-point",
"PCM_S": "signed-integer",
"PCM_U": "unsigned-integer",
"ULAW": "u-law",
"ALAW": "a-law",
}
return encodings.get(encoding)
class TestSaveBase(TempDirMixin):
def assert_save_consistency(
self,
format: str,
*,
compression: float = None,
encoding: str = None,
bits_per_sample: int = None,
sample_rate: float = 8000,
num_channels: int = 2,
num_frames: float = 3 * 8000,
src_dtype: str = "int32",
test_mode: str = "path",
):
"""`save` function produces file that is comparable with `sox` command
To compare that the file produced by `save` function agains the file produced by
the equivalent `sox` command, we need to load both files.
But there are many formats that cannot be opened with common Python modules (like
SciPy).
So we use `sox` command to prepare the original data and convert the saved files
into a format that SciPy can read (PCM wav).
The following diagram illustrates this process. The difference is 2.1. and 3.1.
This assumes that
- loading data with SciPy preserves the data well.
- converting the resulting files into WAV format with `sox` preserve the data well.
                  x
                  | 1. Generate source wav file with SciPy
                  |
                  v
  -------------- wav ----------------
 |                                   |
 | 2.1. load with scipy              | 3.1. Convert to the target
 |      then save it into the target |      format depth with sox
 |      format with paddleaudio      |
 v                                   v
target format                       target format
 |                                   |
 | 2.2. Convert to wav with sox      | 3.2. Convert to wav with sox
 |                                   |
 v                                   v
wav                                 wav
 |                                   |
 | 2.3. load with scipy              | 3.3. load with scipy
 |                                   |
 v                                   v
tensor -------> compare <--------- tensor
"""
cmp_encoding = "floating-point"
cmp_bit_depth = 32
src_path = self.get_temp_path("1.source.wav")
tgt_path = self.get_temp_path(f"2.1.paddleaudio.{format}")
tst_path = self.get_temp_path("2.2.result.wav")
sox_path = self.get_temp_path(f"3.1.sox.{format}")
ref_path = self.get_temp_path("3.2.ref.wav")
# 1. Generate original wav
data = get_wav_data(src_dtype, num_channels, normalize=False, num_frames=num_frames)
save_wav(src_path, data, sample_rate)
# 2.1. Convert the original wav to target format with paddleaudio
data = load_wav(src_path, normalize=False)[0]
if test_mode == "path":
sox_io_backend.save(
tgt_path, data, sample_rate, compression=compression, encoding=encoding, bits_per_sample=bits_per_sample
)
elif test_mode == "fileobj":
with open(tgt_path, "bw") as file_:
sox_io_backend.save(
file_,
data,
sample_rate,
format=format,
compression=compression,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
elif test_mode == "bytesio":
file_ = io.BytesIO()
sox_io_backend.save(
file_,
data,
sample_rate,
format=format,
compression=compression,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
file_.seek(0)
with open(tgt_path, "bw") as f:
f.write(file_.read())
else:
raise ValueError(f"Unexpected test mode: {test_mode}")
# 2.2. Convert the target format to wav with sox
sox_utils.convert_audio_file(tgt_path, tst_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth)
# 2.3. Load with SciPy
found = load_wav(tst_path, normalize=False)[0]
# 3.1. Convert the original wav to target format with sox
sox_encoding = _get_sox_encoding(encoding)
sox_utils.convert_audio_file(
src_path, sox_path, compression=compression, encoding=sox_encoding, bit_depth=bits_per_sample
)
# 3.2. Convert the target format to wav with sox
sox_utils.convert_audio_file(sox_path, ref_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth)
# 3.3. Load with SciPy
expected = load_wav(ref_path, normalize=False)[0]
np.testing.assert_array_almost_equal(found, expected)
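# For reference, steps 2.2 and 3.2 above amount to a sox invocation of the form
# below (a sketch; it assumes the `sox` executable is on PATH, using the
# temporary file names created in this method, e.g. for format="flac"):
#
#   sox 2.1.paddleaudio.flac -e floating-point -b 32 2.2.result.wav
#
# i.e. decode the target-format file and re-encode it as 32-bit float PCM wav so
# that SciPy can read it for comparison.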
class TestSave(TestSaveBase, unittest.TestCase):
@nested_params(
["path",],
[
("PCM_U", 8),
("PCM_S", 16),
("PCM_S", 32),
("PCM_F", 32),
("PCM_F", 64),
("ULAW", 8),
("ALAW", 8),
],
)
def test_save_wav(self, test_mode, enc_params):
encoding, bits_per_sample = enc_params
self.assert_save_consistency("wav", encoding=encoding, bits_per_sample=bits_per_sample, test_mode=test_mode)
@nested_params(
["path", ],
[
("float32",),
("int32",),
],
)
def test_save_wav_dtype(self, test_mode, params):
(dtype,) = params
self.assert_save_consistency("wav", src_dtype=dtype, test_mode=test_mode)
if __name__ == '__main__':
unittest.main()
import io
import itertools
import unittest
from parameterized import parameterized
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import (
get_wav_data,
TempDirMixin,
name_func
)
class SmokeTest(TempDirMixin, unittest.TestCase):
"""Run smoke test on various audio format
The purpose of this test suite is to verify that sox_io_backend functionalities do not exhibit
abnormal behaviors.
This test suite should be able to run without any additional tools (such as sox command),
however without such tools, the correctness of each function cannot be verified.
"""
def run_smoke_test(self, ext, sample_rate, num_channels, *, compression=None, dtype="float32"):
duration = 1
num_frames = sample_rate * duration
path = self.get_temp_path(f"test.{ext}")
original = get_wav_data(dtype, num_channels, normalize=False, num_frames=num_frames)
# 1. run save
sox_io_backend.save(path, original, sample_rate, compression=compression)
# 2. run info
info = sox_io_backend.info(path)
assert info.sample_rate == sample_rate
assert info.num_channels == num_channels
# 3. run load
loaded, sr = sox_io_backend.load(path, normalize=False)
assert sr == sample_rate
assert loaded.shape[0] == num_channels
@parameterized.expand(
list(
itertools.product(
["float32", "int32" ],
#["float32", "int32", "int16", "uint8"],
[8000, 16000],
[1, 2],
)
),
name_func=name_func,
)
def test_wav(self, dtype, sample_rate, num_channels):
"""Run smoke test on wav format"""
self.run_smoke_test("wav", sample_rate, num_channels, dtype=dtype)
#@parameterized.expand(
#list(
#itertools.product(
#[8000, 16000],
#[1, 2],
#[-4.2, -0.2, 0, 0.2, 96, 128, 160, 192, 224, 256, 320],
#)
#)
#)
#def test_mp3(self, sample_rate, num_channels, bit_rate):
#"""Run smoke test on mp3 format"""
#self.run_smoke_test("mp3", sample_rate, num_channels, compression=bit_rate)
#@parameterized.expand(
#list(
#itertools.product(
#[8000, 16000],
#[1, 2],
#[-1, 0, 1, 2, 3, 3.6, 5, 10],
#)
#)
#)
#def test_vorbis(self, sample_rate, num_channels, quality_level):
#"""Run smoke test on vorbis format"""
#self.run_smoke_test("vorbis", sample_rate, num_channels, compression=quality_level)
@parameterized.expand(
list(
itertools.product(
[8000, 16000],
[1, 2],
list(range(9)),
)
),
name_func=name_func,
)
def test_flac(self, sample_rate, num_channels, compression_level):
"""Run smoke test on flac format"""
self.run_smoke_test("flac", sample_rate, num_channels, compression=compression_level)
class SmokeTestFileObj(unittest.TestCase):
"""Run smoke test on various audio format
The purpose of this test suite is to verify that sox_io_backend functionalities do not exhibit
abnormal behaviors.
This test suite should be able to run without any additional tools (such as sox command),
however without such tools, the correctness of each function cannot be verified.
"""
def run_smoke_test(self, ext, sample_rate, num_channels, *, compression=None, dtype="float32"):
duration = 1
num_frames = sample_rate * duration
original = get_wav_data(dtype, num_channels, normalize=False, num_frames=num_frames)
fileobj = io.BytesIO()
# 1. run save
sox_io_backend.save(fileobj, original, sample_rate, compression=compression, format=ext)
# 2. run info
fileobj.seek(0)
info = sox_io_backend.info(fileobj, format=ext)
assert info.sample_rate == sample_rate
assert info.num_channels == num_channels
# 3. run load
fileobj.seek(0)
loaded, sr = sox_io_backend.load(fileobj, normalize=False, format=ext)
assert sr == sample_rate
assert loaded.shape[0] == num_channels
@parameterized.expand(
list(
itertools.product(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
),
name_func=name_func,
)
def test_wav(self, dtype, sample_rate, num_channels):
"""Run smoke test on wav format"""
self.run_smoke_test("wav", sample_rate, num_channels, dtype=dtype)
# mp3 / vorbis are not supported yet.
#@parameterized.expand(
#list(
#itertools.product(
#[8000, 16000],
#[1, 2],
#[-4.2, -0.2, 0, 0.2, 96, 128, 160, 192, 224, 256, 320],
#)
#)
#)
#def test_mp3(self, sample_rate, num_channels, bit_rate):
#"""Run smoke test on mp3 format"""
#self.run_smoke_test("mp3", sample_rate, num_channels, compression=bit_rate)
#@parameterized.expand(
#list(
#itertools.product(
#[8000, 16000],
#[1, 2],
#[-1, 0, 1, 2, 3, 3.6, 5, 10],
#)
#)
#)
#def test_vorbis(self, sample_rate, num_channels, quality_level):
#"""Run smoke test on vorbis format"""
#self.run_smoke_test("vorbis", sample_rate, num_channels, compression=quality_level)
@parameterized.expand(
list(
itertools.product(
[8000, 16000],
[1, 2],
list(range(9)),
)
),
name_func=name_func,
)
def test_flac(self, sample_rate, num_channels, compression_level):
#"""Run smoke test on flac format"""
self.run_smoke_test("flac", sample_rate, num_channels, compression=compression_level)
if __name__ == '__main__':
unittest.main()
#code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/sox_effect/sox_effect_test.py
import io
import itertools
import tarfile
import unittest
from pathlib import Path
import numpy as np
from parameterized import parameterized
from paddlespeech.audio import sox_effects
from paddlespeech.audio._internal import module_utils as _mod_utils
from tests.unit.common_utils import (
get_sinusoid,
get_wav_data,
load_wav,
save_wav,
sox_utils,
TempDirMixin,
name_func,
load_effects_params
)
if _mod_utils.is_module_available("requests"):
import requests
class TestSoxEffects(unittest.TestCase):
def test_init(self):
"""Calling init_sox_effects multiple times does not crush"""
for _ in range(3):
sox_effects.init_sox_effects()
class TestSoxEffectsTensor(TempDirMixin, unittest.TestCase):
"""Test suite for `apply_effects_tensor` function"""
@parameterized.expand(
list(itertools.product(["float32", "int32"], [8000, 16000], [1, 2, 4, 8], [True, False])),
)
def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first):
"""`apply_effects_tensor` without effects should return identical data as input"""
original = get_wav_data(dtype, num_channels, channels_first=channels_first)
expected = original.clone()
found, output_sample_rate = sox_effects.apply_effects_tensor(expected, sample_rate, [], channels_first)
assert output_sample_rate == sample_rate
# SoxEffect should not alter the input Tensor object
#self.assertEqual(original, expected)
np.testing.assert_array_almost_equal(original.numpy(), expected.numpy())
# SoxEffect should not return the same Tensor object
assert expected is not found
# Returned Tensor should equal to the input Tensor
#self.assertEqual(expected, found)
np.testing.assert_array_almost_equal(expected.numpy(), found.numpy())
@parameterized.expand(
load_effects_params("sox_effect_test_args.jsonl"),
name_func=lambda f, i, p: f'{f.__name__}_{i}_{p.args[0]["effects"][0][0]}',
)
def test_apply_effects(self, args):
"""`apply_effects_tensor` should return identical data as sox command"""
effects = args["effects"]
num_channels = args.get("num_channels", 2)
input_sr = args.get("input_sample_rate", 8000)
output_sr = args.get("output_sample_rate")
input_path = self.get_temp_path("input.wav")
reference_path = self.get_temp_path("reference.wav")
original = get_sinusoid(frequency=800, sample_rate=input_sr, n_channels=num_channels, dtype="float32")
save_wav(input_path, original, input_sr)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_tensor(original, input_sr, effects)
assert sr == expected_sr
#self.assertEqual(expected, found)
np.testing.assert_array_almost_equal(expected.numpy(), found.numpy())
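# Illustrative sketch (not one of the JSONL-driven cases above): applying a small
# effect chain directly with `apply_effects_tensor`, using the helpers imported
# at the top of this file. The effect names/arguments follow sox conventions;
# the concrete values here are hypothetical.
def _example_effect_chain():
    original = get_sinusoid(frequency=440, sample_rate=8000, n_channels=1, dtype="float32")
    effects = [["lowpass", "-1", "300"], ["rate", "16000"]]
    # The returned sample rate reflects the trailing `rate` effect (16000 here).
    out, out_sr = sox_effects.apply_effects_tensor(original, 8000, effects)
    return out, out_sr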
class TestSoxEffectsFile(TempDirMixin, unittest.TestCase):
"""Test suite for `apply_effects_file` function"""
@parameterized.expand(
list(
itertools.product(
["float32", "int32"],
[8000, 16000],
[1, 2, 4, 8],
[False, True],
)
),
#name_func=name_func,
)
def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first):
"""`apply_effects_file` without effects should return identical data as input"""
path = self.get_temp_path("input.wav")
expected = get_wav_data(dtype, num_channels, channels_first=channels_first)
save_wav(path, expected, sample_rate, channels_first=channels_first)
found, output_sample_rate = sox_effects.apply_effects_file(
path, [], normalize=False, channels_first=channels_first
)
assert output_sample_rate == sample_rate
#self.assertEqual(expected, found)
np.testing.assert_array_almost_equal(expected.numpy(), found.numpy())
@parameterized.expand(
load_effects_params("sox_effect_test_args.jsonl"),
#name_func=lambda f, i, p: f'{f.__name__}_{i}_{p.args[0]["effects"][0][0]}',
)
def test_apply_effects_str(self, args):
"""`apply_effects_file` should return identical data as sox command"""
dtype = "int32"
channels_first = True
effects = args["effects"]
num_channels = args.get("num_channels", 2)
input_sr = args.get("input_sample_rate", 8000)
output_sr = args.get("output_sample_rate")
input_path = self.get_temp_path("input.wav")
reference_path = self.get_temp_path("reference.wav")
data = get_wav_data(dtype, num_channels, channels_first=channels_first)
save_wav(input_path, data, input_sr, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_file(input_path, effects, normalize=False, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(expected.numpy(), found.numpy())
def test_apply_effects_path(self):
"""`apply_effects_file` should return identical data as sox command when file path is given as a Path Object"""
dtype = "int32"
channels_first = True
effects = [["hilbert"]]
num_channels = 2
input_sr = 8000
output_sr = 8000
input_path = self.get_temp_path("input.wav")
reference_path = self.get_temp_path("reference.wav")
data = get_wav_data(dtype, num_channels, channels_first=channels_first)
save_wav(input_path, data, input_sr, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_file(
Path(input_path), effects, normalize=False, channels_first=channels_first
)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(expected.numpy(), found.numpy())
class TestFileFormats(TempDirMixin, unittest.TestCase):
"""`apply_effects_file` gives the same result as sox on various file formats"""
@parameterized.expand(
list(
itertools.product(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
),
#name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}',
)
def test_wav(self, dtype, sample_rate, num_channels):
"""`apply_effects_file` works on various wav format"""
channels_first = True
effects = [["band", "300", "10"]]
input_path = self.get_temp_path("input.wav")
reference_path = self.get_temp_path("reference.wav")
data = get_wav_data(dtype, num_channels, channels_first=channels_first)
save_wav(input_path, data, sample_rate, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_file(input_path, effects, normalize=False, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
# flac / vorbis are not supported yet.
#@parameterized.expand(
#list(
#itertools.product(
#[8000, 16000],
#[1, 2],
#)
#),
##name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}',
#)
#def test_flac(self, sample_rate, num_channels):
#"""`apply_effects_file` works on various flac format"""
#channels_first = True
#effects = [["band", "300", "10"]]
#input_path = self.get_temp_path("input.flac")
#reference_path = self.get_temp_path("reference.wav")
#sox_utils.gen_audio_file(input_path, sample_rate, num_channels)
#sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
#expected, expected_sr = load_wav(reference_path)
#found, sr = sox_effects.apply_effects_file(input_path, effects, channels_first=channels_first)
#save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
#assert sr == expected_sr
##self.assertEqual(found, expected)
#np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
#@parameterized.expand(
#list(
#itertools.product(
#[8000, 16000],
#[1, 2],
#)
#),
##name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}',
#)
#def test_vorbis(self, sample_rate, num_channels):
#"""`apply_effects_file` works on various vorbis format"""
#channels_first = True
#effects = [["band", "300", "10"]]
#input_path = self.get_temp_path("input.vorbis")
#reference_path = self.get_temp_path("reference.wav")
#sox_utils.gen_audio_file(input_path, sample_rate, num_channels)
#sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
#expected, expected_sr = load_wav(reference_path)
#found, sr = sox_effects.apply_effects_file(input_path, effects, channels_first=channels_first)
#save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
#assert sr == expected_sr
##self.assertEqual(found, expected)
#np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
#@skipIfNoExec("sox")
#@skipIfNoSox
class TestFileObject(TempDirMixin, unittest.TestCase):
@parameterized.expand(
[
("wav", None),
]
)
def test_fileobj(self, ext, compression):
"""Applying effects via file object works"""
sample_rate = 16000
channels_first = True
effects = [["band", "300", "10"]]
input_path = self.get_temp_path(f"input.{ext}")
reference_path = self.get_temp_path("reference.wav")
#sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression)
data = get_wav_data("int32", 2, channels_first=channels_first)
save_wav(input_path, data, sample_rate, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
expected, expected_sr = load_wav(reference_path)
with open(input_path, "rb") as fileobj:
found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first)
save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
@parameterized.expand(
[
("wav", None),
]
)
def test_bytesio(self, ext, compression):
"""Applying effects via BytesIO object works"""
sample_rate = 16000
channels_first = True
effects = [["band", "300", "10"]]
input_path = self.get_temp_path(f"input.{ext}")
reference_path = self.get_temp_path("reference.wav")
#sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression)
data = get_wav_data("int32", 2, channels_first=channels_first)
save_wav(input_path, data, sample_rate, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
expected, expected_sr = load_wav(reference_path)
with open(input_path, "rb") as file_:
fileobj = io.BytesIO(file_.read())
found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first)
save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
print("found")
print(found)
print("expected")
print(expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
@parameterized.expand(
[
("wav", None),
]
)
def test_tarfile(self, ext, compression):
"""Applying effects to compressed audio via file-like file works"""
sample_rate = 16000
channels_first = True
effects = [["band", "300", "10"]]
audio_file = f"input.{ext}"
input_path = self.get_temp_path(audio_file)
reference_path = self.get_temp_path("reference.wav")
archive_path = self.get_temp_path("archive.tar.gz")
data = get_wav_data("int32", 2, channels_first=channels_first)
save_wav(input_path, data, sample_rate, channels_first=channels_first)
# sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
expected, expected_sr = load_wav(reference_path)
with tarfile.TarFile(archive_path, "w") as tarobj:
tarobj.add(input_path, arcname=audio_file)
with tarfile.TarFile(archive_path, "r") as tarobj:
fileobj = tarobj.extractfile(audio_file)
found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first)
save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
if __name__ == '__main__':
unittest.main()
@@ -17,8 +17,7 @@ import urllib.request
import numpy as np
import paddle
from paddlespeech.audio.soundfile_backend import soundfile_load as load
from paddleaudio.backends import soundfile_load as load
wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav'
......
@@ -15,9 +15,9 @@ import unittest
import numpy as np
import paddle
from paddleaudio.functional.window import get_window
from .base import FeatTest
from paddlespeech.audio.functional.window import get_window
from paddlespeech.s2t.transform.spectrogram import IStft
from paddlespeech.s2t.transform.spectrogram import Stft
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
import torch
import torchaudio
import paddlespeech.audio
from .base import FeatTest
class TestKaldi(FeatTest):
def initParmas(self):
self.window_size = 1024
self.dtype = 'float32'
def test_window(self):
t_hann_window = torch.hann_window(
self.window_size, periodic=False, dtype=eval(f'torch.{self.dtype}'))
t_hamm_window = torch.hamming_window(
self.window_size,
periodic=False,
alpha=0.54,
beta=0.46,
dtype=eval(f'torch.{self.dtype}'))
t_povey_window = torch.hann_window(
self.window_size, periodic=False,
dtype=eval(f'torch.{self.dtype}')).pow(0.85)
p_hann_window = paddlespeech.audio.functional.window.get_window(
'hann',
self.window_size,
fftbins=False,
dtype=eval(f'paddle.{self.dtype}'))
p_hamm_window = paddlespeech.audio.functional.window.get_window(
'hamming',
self.window_size,
fftbins=False,
dtype=eval(f'paddle.{self.dtype}'))
p_povey_window = paddlespeech.audio.functional.window.get_window(
'hann',
self.window_size,
fftbins=False,
dtype=eval(f'paddle.{self.dtype}')).pow(0.85)
np.testing.assert_array_almost_equal(t_hann_window, p_hann_window)
np.testing.assert_array_almost_equal(t_hamm_window, p_hamm_window)
np.testing.assert_array_almost_equal(t_povey_window, p_povey_window)
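# Note: Kaldi's "povey" window is simply the symmetric Hann window raised to the
# power 0.85, which is why both frameworks above compute it as
# hann_window(..., periodic=False) ** 0.85. In NumPy terms (a sketch):
#   n = np.arange(window_size)
#   povey = (0.5 - 0.5 * np.cos(2 * np.pi * n / (window_size - 1))) ** 0.85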
def test_fbank(self):
ta_features = torchaudio.compliance.kaldi.fbank(
torch.from_numpy(self.waveform.astype(self.dtype)))
pa_features = paddlespeech.audio.compliance.kaldi.fbank(
paddle.to_tensor(self.waveform.astype(self.dtype)))
np.testing.assert_array_almost_equal(
ta_features, pa_features, decimal=4)
def test_mfcc(self):
ta_features = torchaudio.compliance.kaldi.mfcc(
torch.from_numpy(self.waveform.astype(self.dtype)))
pa_features = paddlespeech.audio.compliance.kaldi.mfcc(
paddle.to_tensor(self.waveform.astype(self.dtype)))
np.testing.assert_array_almost_equal(
ta_features, pa_features, decimal=4)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
from paddlespeech.audio.kaldi import fbank
from paddlespeech.audio.kaldi import pitch
from kaldiio import ReadHelper
# The ground-truth features were computed with the Kaldi commands below:
#compute-fbank-feats --dither=0 scp:$wav_scp ark,t:fbank_feat.ark
#compute-kaldi-pitch-feats --sample-frequency=16000 scp:$wav_scp ark,t:pitch_feat.ark
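# A sketch of how the reference archives could be regenerated (assumes a local
# Kaldi installation; $wav_scp is a hypothetical scp file mapping utterance keys
# to wav paths, e.g. "zh /path/to/zh.wav"):
#   compute-fbank-feats --dither=0 scp:$wav_scp ark,t:testdata/fbank_feat.ark
#   compute-kaldi-pitch-feats --sample-frequency=16000 scp:$wav_scp ark,t:testdata/pitch_feat.ark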
class TestKaldiFbank(unittest.TestCase):
def test_fbank(self):
fbank_groundtruth = {}
with ReadHelper('ark:testdata/fbank_feat.ark') as reader:
for key, feat in reader:
fbank_groundtruth[key] = feat
with ReadHelper('ark:testdata/wav.ark') as reader:
for key, wav in reader:
fbank_feat = fbank(wav)
fbank_check = fbank_groundtruth[key]
np.testing.assert_array_almost_equal(
fbank_feat, fbank_check, decimal=4)
def test_pitch(self):
pitch_groundtruth = {}
with ReadHelper('ark:testdata/pitch_feat.ark') as reader:
for key, feat in reader:
pitch_groundtruth[key] = feat
with ReadHelper('ark:testdata/wav.ark') as reader:
for key, wav in reader:
pitch_feat = pitch(wav)
pitch_check = pitch_groundtruth[key]
np.testing.assert_array_almost_equal(
pitch_feat, pitch_check, decimal=4)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import librosa
import numpy as np
import paddle
import paddlespeech.audio
from .base import FeatTest
from paddlespeech.audio.functional.window import get_window
class TestLibrosa(FeatTest):
def initParmas(self):
self.n_fft = 512
self.hop_length = 128
self.n_mels = 40
self.n_mfcc = 20
self.fmin = 0.0
self.window_str = 'hann'
self.pad_mode = 'reflect'
self.top_db = 80.0
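# For reference (not asserted below): with center=True, both librosa and
# paddle.signal.stft produce 1 + len(waveform) // hop_length frames, each with
# n_fft // 2 + 1 one-sided frequency bins (257 for n_fft=512).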
def test_stft(self):
if len(self.waveform.shape) == 2: # (C, T)
self.waveform = self.waveform.squeeze(
0)  # 1D input for librosa.core.stft
feature_librosa = librosa.core.stft(
y=self.waveform,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=None,
window=self.window_str,
center=True,
dtype=None,
pad_mode=self.pad_mode, )
x = paddle.to_tensor(self.waveform).unsqueeze(0)
window = get_window(self.window_str, self.n_fft, dtype=x.dtype)
feature_paddle = paddle.signal.stft(
x=x,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=None,
window=window,
center=True,
pad_mode=self.pad_mode,
normalized=False,
onesided=True, ).squeeze(0)
np.testing.assert_array_almost_equal(
feature_librosa, feature_paddle, decimal=5)
def test_istft(self):
if len(self.waveform.shape) == 2: # (C, T)
self.waveform = self.waveform.squeeze(
0)  # 1D input for librosa.core.stft
# Get stft result from librosa.
stft_matrix = librosa.core.stft(
y=self.waveform,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=None,
window=self.window_str,
center=True,
pad_mode=self.pad_mode, )
feature_librosa = librosa.core.istft(
stft_matrix=stft_matrix,
hop_length=self.hop_length,
win_length=None,
window=self.window_str,
center=True,
dtype=None,
length=None, )
x = paddle.to_tensor(stft_matrix).unsqueeze(0)
window = get_window(
self.window_str,
self.n_fft,
dtype=paddle.to_tensor(self.waveform).dtype)
feature_paddle = paddle.signal.istft(
x=x,
n_fft=self.n_fft,
hop_length=self.hop_length,
win_length=None,
window=window,
center=True,
normalized=False,
onesided=True,
length=None,
return_complex=False, ).squeeze(0)
np.testing.assert_array_almost_equal(
feature_librosa, feature_paddle, decimal=5)
def test_mel(self):
feature_librosa = librosa.filters.mel(
sr=self.sr,
n_fft=self.n_fft,
n_mels=self.n_mels,
fmin=self.fmin,
fmax=None,
htk=False,
norm='slaney',
dtype=self.waveform.dtype, )
feature_compliance = paddlespeech.audio.compliance.librosa.compute_fbank_matrix(
sr=self.sr,
n_fft=self.n_fft,
n_mels=self.n_mels,
fmin=self.fmin,
fmax=None,
htk=False,
norm='slaney',
dtype=self.waveform.dtype, )
x = paddle.to_tensor(self.waveform)
feature_functional = paddlespeech.audio.functional.compute_fbank_matrix(
sr=self.sr,
n_fft=self.n_fft,
n_mels=self.n_mels,
f_min=self.fmin,
f_max=None,
htk=False,
norm='slaney',
dtype=x.dtype, )
np.testing.assert_array_almost_equal(feature_librosa,
feature_compliance)
np.testing.assert_array_almost_equal(feature_librosa,
feature_functional)
def test_melspect(self):
if len(self.waveform.shape) == 2: # (C, T)
self.waveform = self.waveform.squeeze(
0) # 1D input for librosa.feature.melspectrogram
# librosa:
feature_librosa = librosa.feature.melspectrogram(
y=self.waveform,
sr=self.sr,
n_fft=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
fmin=self.fmin)
# paddlespeech.audio.compliance.librosa:
feature_compliance = paddlespeech.audio.compliance.librosa.melspectrogram(
x=self.waveform,
sr=self.sr,
window_size=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
fmin=self.fmin,
to_db=False)
# paddlespeech.audio.features.layer
x = paddle.to_tensor(
self.waveform, dtype=paddle.float64).unsqueeze(0) # Add batch dim.
feature_extractor = paddlespeech.audio.features.MelSpectrogram(
sr=self.sr,
n_fft=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
f_min=self.fmin,
dtype=x.dtype)
feature_layer = feature_extractor(x).squeeze(0).numpy()
np.testing.assert_array_almost_equal(
feature_librosa, feature_compliance, decimal=5)
np.testing.assert_array_almost_equal(
feature_librosa, feature_layer, decimal=5)
def test_log_melspect(self):
if len(self.waveform.shape) == 2: # (C, T)
self.waveform = self.waveform.squeeze(
0) # 1D input for librosa.feature.melspectrogram
# librosa:
feature_librosa = librosa.feature.melspectrogram(
y=self.waveform,
sr=self.sr,
n_fft=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
fmin=self.fmin)
feature_librosa = librosa.power_to_db(feature_librosa, top_db=None)
# paddlespeech.audio.compliance.librosa:
feature_compliance = paddlespeech.audio.compliance.librosa.melspectrogram(
x=self.waveform,
sr=self.sr,
window_size=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
fmin=self.fmin)
# paddlespeech.audio.features.layer
x = paddle.to_tensor(
self.waveform, dtype=paddle.float64).unsqueeze(0) # Add batch dim.
feature_extractor = paddlespeech.audio.features.LogMelSpectrogram(
sr=self.sr,
n_fft=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
f_min=self.fmin,
dtype=x.dtype)
feature_layer = feature_extractor(x).squeeze(0).numpy()
np.testing.assert_array_almost_equal(
feature_librosa, feature_compliance, decimal=5)
np.testing.assert_array_almost_equal(
feature_librosa, feature_layer, decimal=4)
def test_mfcc(self):
if len(self.waveform.shape) == 2: # (C, T)
self.waveform = self.waveform.squeeze(
0) # 1D input for librosa.feature.melspectrogram
# librosa:
feature_librosa = librosa.feature.mfcc(
y=self.waveform,
sr=self.sr,
S=None,
n_mfcc=self.n_mfcc,
dct_type=2,
norm='ortho',
lifter=0,
n_fft=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
fmin=self.fmin)
# paddlespeech.audio.compliance.librosa:
feature_compliance = paddlespeech.audio.compliance.librosa.mfcc(
x=self.waveform,
sr=self.sr,
n_mfcc=self.n_mfcc,
dct_type=2,
norm='ortho',
lifter=0,
window_size=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
fmin=self.fmin,
top_db=self.top_db)
# paddlespeech.audio.features.layer
x = paddle.to_tensor(
self.waveform, dtype=paddle.float64).unsqueeze(0) # Add batch dim.
feature_extractor = paddlespeech.audio.features.MFCC(
sr=self.sr,
n_mfcc=self.n_mfcc,
n_fft=self.n_fft,
hop_length=self.hop_length,
n_mels=self.n_mels,
f_min=self.fmin,
top_db=self.top_db,
dtype=x.dtype)
feature_layer = feature_extractor(x).squeeze(0).numpy()
np.testing.assert_array_almost_equal(
feature_librosa, feature_compliance, decimal=4)
np.testing.assert_array_almost_equal(
feature_librosa, feature_layer, decimal=4)
if __name__ == '__main__':
unittest.main()
@@ -15,8 +15,8 @@ import unittest
import numpy as np
import paddle
import paddleaudio
import paddlespeech.audio
from .base import FeatTest
from paddlespeech.s2t.transform.spectrogram import LogMelSpectrogram
@@ -33,7 +33,7 @@ class TestLogMelSpectrogram(FeatTest):
ps_res = ps_melspect(self.waveform.T).squeeze(1).T
x = paddle.to_tensor(self.waveform)
ps_melspect = paddlespeech.audio.features.LogMelSpectrogram(
ps_melspect = paddleaudio.features.LogMelSpectrogram(
self.sr,
self.n_fft,
self.hop_length,
......
@@ -15,8 +15,8 @@ import unittest
import numpy as np
import paddle
import paddleaudio
import paddlespeech.audio
from .base import FeatTest
from paddlespeech.s2t.transform.spectrogram import Spectrogram
@@ -31,7 +31,7 @@ class TestSpectrogram(FeatTest):
ps_res = ps_spect(self.waveform.T).squeeze(1).T # Magnitude
x = paddle.to_tensor(self.waveform)
pa_spect = paddlespeech.audio.features.Spectrogram(
pa_spect = paddleaudio.features.Spectrogram(
self.n_fft, self.hop_length, power=1.0)
pa_res = pa_spect(x).squeeze(0).numpy()
......
@@ -15,9 +15,9 @@ import unittest
import numpy as np
import paddle
from paddleaudio.functional.window import get_window
from .base import FeatTest
from paddlespeech.audio.functional.window import get_window
from paddlespeech.s2t.transform.spectrogram import Stft
......
from .case_utils import name_func
from .case_utils import TempDirMixin
from .data_utils import get_sinusoid
from .data_utils import load_effects_params
from .data_utils import load_params
from .parameterized_utils import nested_params
from .wav_utils import get_wav_data
from .wav_utils import load_wav
from .wav_utils import normalize_wav
from .wav_utils import save_wav
__all__ = [
"get_wav_data", "load_wav", "save_wav", "normalize_wav", "load_params",
"nested_params", "get_sinusoid", "name_func", "load_effects_params"
]
import functools
import os.path
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
#code is from:https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/common_utils/case_utils.py
import paddle
from paddlespeech.audio._internal.module_utils import (
is_kaldi_available,
is_module_available,
is_sox_available,
)
def name_func(func, _, params):
return f'{func.__name__}_{"_".join(str(arg) for arg in params.args)}'
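# For example (illustrative), a test method `test_wav` parameterized with
# args ("float32", 8000, 1) is reported as `test_wav_float32_8000_1`.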
class TempDirMixin:
"""Mixin to provide easy access to temp dir"""
......
from typing import Optional
import numpy as np
import paddle
import scipy.io.wavfile
def normalize_wav(tensor: paddle.Tensor) -> paddle.Tensor:
if tensor.dtype == paddle.float32:
@@ -23,13 +23,12 @@ def normalize_wav(tensor: paddle.Tensor) -> paddle.Tensor:
def get_wav_data(
dtype: str,
num_channels: int,
*,
num_frames: Optional[int] = None,
normalize: bool = True,
channels_first: bool = True,
):
"""Generate linear signal of the given dtype and num_channels
Data range is
@@ -53,25 +52,26 @@ def get_wav_data(
# paddle linspace not support uint8, int8, int16
#if dtype == "uint8":
# base = paddle.linspace(0, 255, num_frames, dtype=dtype_)
#dtype_np = getattr(np, dtype)
#base_np = np.linspace(0, 255, num_frames, dtype_np)
#base = paddle.to_tensor(base_np, dtype=dtype_)
#elif dtype == "int8":
# base = paddle.linspace(-128, 127, num_frames, dtype=dtype_)
#dtype_np = getattr(np, dtype)
#base_np = np.linspace(-128, 127, num_frames, dtype_np)
#base = paddle.to_tensor(base_np, dtype=dtype_)
if dtype == "float32":
base = paddle.linspace(-1.0, 1.0, num_frames, dtype=dtype_)
elif dtype == "float64":
base = paddle.linspace(-1.0, 1.0, num_frames, dtype=dtype_)
elif dtype == "int32":
base = paddle.linspace(-2147483648, 2147483647, num_frames, dtype=dtype_)
#elif dtype == "int16":
# base = paddle.linspace(-32768, 32767, num_frames, dtype=dtype_)
#dtype_np = getattr(np, dtype)
#base_np = np.linspace(-32768, 32767, num_frames, dtype_np)
#base = paddle.to_tensor(base_np, dtype=dtype_)
else:
raise NotImplementedError(f"Unsupported dtype {dtype}")
data = base.tile([num_channels, 1])
......