diff --git a/tests/benchmark/audio/README.md b/tests/benchmark/audio/README.md
deleted file mode 100644
index 9cade74e0bdd8d847ba77849d6e597e259f029a2..0000000000000000000000000000000000000000
--- a/tests/benchmark/audio/README.md
+++ /dev/null
@@ -1,38 +0,0 @@
-# 1. Prepare
-First, install `pytest-benchmark` via pip.
-```sh
-pip install pytest-benchmark
-```
-
-# 2. Run
-Run the specific script for profiling.
-```sh
-pytest melspectrogram.py
-```
-
-Result:
-```sh
-======================================== test session starts ========================================
-platform linux -- Python 3.7.7, pytest-7.0.1, pluggy-1.0.0
-benchmark: 3.4.1 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
-plugins: typeguard-2.12.1, benchmark-3.4.1, anyio-3.5.0
-collected 4 items
-
-melspectrogram.py ....                                                                        [100%]
-
-
----------------------------------------------- benchmark: 4 tests ----------------------------------------------
-Name (time in us)                Min                  Max                 Mean              StdDev               Median                 IQR    Outliers             OPS    Rounds    Iterations
------------------------------------------------------------------------------------------------------------------
-test_melspect_gpu_torchaudio     202.0765 (1.0)       360.6230 (1.0)      218.1168 (1.0)    16.3022 (1.0)        214.2871 (1.0)     21.8451 (1.0)     40;3    4,584.7001 (1.0)      286    1
-test_melspect_gpu                657.8509 (3.26)      908.0470 (2.52)     724.2545 (3.32)   106.5771 (6.54)      669.9096 (3.13)    113.4719 (5.19)    1;0    1,380.7300 (0.30)       5    1
-test_melspect_cpu_torchaudio     1,247.6053 (6.17)    2,892.5799 (8.02)   1,443.2853 (6.62) 345.3732 (21.19)     1,262.7263 (5.89)  221.6385 (10.15)  56;53   692.8637 (0.15)       399    1
-test_melspect_cpu                20,326.2549 (100.59) 20,607.8682 (57.15) 20,473.4125 (93.86) 63.8654 (3.92)     20,467.0429 (95.51) 68.4294 (3.13)    8;1    48.8438 (0.01)         29    1
------------------------------------------------------------------------------------------------------------------
-
-Legend:
-  Outliers: 1 Standard Deviation from Mean; 1.5 IQR (InterQuartile Range) from 1st Quartile and 3rd Quartile.
-  OPS: Operations Per Second, computed as 1 / Mean
-======================================== 4 passed in 21.12s =========================================
-
-```
diff --git a/tests/benchmark/audio/log_melspectrogram.py b/tests/benchmark/audio/log_melspectrogram.py
deleted file mode 100644
index c85fcecfbadb394807eee4e9fcb2b5b23fcc20ab..0000000000000000000000000000000000000000
--- a/tests/benchmark/audio/log_melspectrogram.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and -# limitations under the License. -import os -import urllib.request - -import librosa -import numpy as np -import paddle -import torch -import torchaudio - -import paddlespeech.audio - -wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav' -if not os.path.isfile(os.path.basename(wav_url)): - urllib.request.urlretrieve(wav_url, os.path.basename(wav_url)) - -waveform, sr = paddlespeech.audio.load( - os.path.abspath(os.path.basename(wav_url))) -waveform_tensor = paddle.to_tensor(waveform).unsqueeze(0) -waveform_tensor_torch = torch.from_numpy(waveform).unsqueeze(0) - -# Feature conf -mel_conf = { - 'sr': sr, - 'n_fft': 512, - 'hop_length': 128, - 'n_mels': 40, -} - -mel_conf_torchaudio = { - 'sample_rate': sr, - 'n_fft': 512, - 'hop_length': 128, - 'n_mels': 40, - 'norm': 'slaney', - 'mel_scale': 'slaney', -} - - -def enable_cpu_device(): - paddle.set_device('cpu') - - -def enable_gpu_device(): - paddle.set_device('gpu') - - -log_mel_extractor = paddlespeech.audio.features.LogMelSpectrogram( - **mel_conf, f_min=0.0, top_db=80.0, dtype=waveform_tensor.dtype) - - -def log_melspectrogram(): - return log_mel_extractor(waveform_tensor).squeeze(0) - - -def test_log_melspect_cpu(benchmark): - enable_cpu_device() - feature_audio = benchmark(log_melspectrogram) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -def test_log_melspect_gpu(benchmark): - enable_gpu_device() - feature_audio = benchmark(log_melspectrogram) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=2) - - -mel_extractor_torchaudio = torchaudio.transforms.MelSpectrogram( - **mel_conf_torchaudio, f_min=0.0) -amplitude_to_DB = torchaudio.transforms.AmplitudeToDB('power', top_db=80.0) - - -def melspectrogram_torchaudio(): - return mel_extractor_torchaudio(waveform_tensor_torch).squeeze(0) - - -def log_melspectrogram_torchaudio(): - mel_specgram = mel_extractor_torchaudio(waveform_tensor_torch) - return amplitude_to_DB(mel_specgram).squeeze(0) - - -def test_log_melspect_cpu_torchaudio(benchmark): - global waveform_tensor_torch, mel_extractor_torchaudio, amplitude_to_DB - - mel_extractor_torchaudio = mel_extractor_torchaudio.to('cpu') - waveform_tensor_torch = waveform_tensor_torch.to('cpu') - amplitude_to_DB = amplitude_to_DB.to('cpu') - - feature_audio = benchmark(log_melspectrogram_torchaudio) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -def test_log_melspect_gpu_torchaudio(benchmark): - global waveform_tensor_torch, mel_extractor_torchaudio, amplitude_to_DB - - mel_extractor_torchaudio = mel_extractor_torchaudio.to('cuda') - waveform_tensor_torch = waveform_tensor_torch.to('cuda') - amplitude_to_DB = amplitude_to_DB.to('cuda') - - feature_torchaudio = benchmark(log_melspectrogram_torchaudio) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - feature_librosa = librosa.power_to_db(feature_librosa, top_db=80.0) - np.testing.assert_array_almost_equal( - feature_librosa, feature_torchaudio.cpu(), 
decimal=2) diff --git a/tests/benchmark/audio/melspectrogram.py b/tests/benchmark/audio/melspectrogram.py deleted file mode 100644 index 498158941d55f5fae9fb952b6cae2c984fe9b574..0000000000000000000000000000000000000000 --- a/tests/benchmark/audio/melspectrogram.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import urllib.request - -import librosa -import numpy as np -import paddle -import torch -import torchaudio - -import paddlespeech.audio - -wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav' -if not os.path.isfile(os.path.basename(wav_url)): - urllib.request.urlretrieve(wav_url, os.path.basename(wav_url)) - -waveform, sr = paddlespeech.audio.load( - os.path.abspath(os.path.basename(wav_url))) -waveform_tensor = paddle.to_tensor(waveform).unsqueeze(0) -waveform_tensor_torch = torch.from_numpy(waveform).unsqueeze(0) - -# Feature conf -mel_conf = { - 'sr': sr, - 'n_fft': 512, - 'hop_length': 128, - 'n_mels': 40, -} - -mel_conf_torchaudio = { - 'sample_rate': sr, - 'n_fft': 512, - 'hop_length': 128, - 'n_mels': 40, - 'norm': 'slaney', - 'mel_scale': 'slaney', -} - - -def enable_cpu_device(): - paddle.set_device('cpu') - - -def enable_gpu_device(): - paddle.set_device('gpu') - - -mel_extractor = paddlespeech.audio.features.MelSpectrogram( - **mel_conf, f_min=0.0, dtype=waveform_tensor.dtype) - - -def melspectrogram(): - return mel_extractor(waveform_tensor).squeeze(0) - - -def test_melspect_cpu(benchmark): - enable_cpu_device() - feature_audio = benchmark(melspectrogram) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -def test_melspect_gpu(benchmark): - enable_gpu_device() - feature_audio = benchmark(melspectrogram) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -mel_extractor_torchaudio = torchaudio.transforms.MelSpectrogram( - **mel_conf_torchaudio, f_min=0.0) - - -def melspectrogram_torchaudio(): - return mel_extractor_torchaudio(waveform_tensor_torch).squeeze(0) - - -def test_melspect_cpu_torchaudio(benchmark): - global waveform_tensor_torch, mel_extractor_torchaudio - mel_extractor_torchaudio = mel_extractor_torchaudio.to('cpu') - waveform_tensor_torch = waveform_tensor_torch.to('cpu') - feature_audio = benchmark(melspectrogram_torchaudio) - feature_librosa = librosa.feature.melspectrogram(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -def test_melspect_gpu_torchaudio(benchmark): - global waveform_tensor_torch, mel_extractor_torchaudio - mel_extractor_torchaudio = mel_extractor_torchaudio.to('cuda') - waveform_tensor_torch = waveform_tensor_torch.to('cuda') - feature_torchaudio = benchmark(melspectrogram_torchaudio) - feature_librosa = 
librosa.feature.melspectrogram(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_torchaudio.cpu(), decimal=3) diff --git a/tests/benchmark/audio/mfcc.py b/tests/benchmark/audio/mfcc.py deleted file mode 100644 index 4e286de907aada17f5a0eda033403bfda03f15ad..0000000000000000000000000000000000000000 --- a/tests/benchmark/audio/mfcc.py +++ /dev/null @@ -1,123 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import urllib.request - -import librosa -import numpy as np -import paddle -import torch -import torchaudio - -import paddlespeech.audio - -wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav' -if not os.path.isfile(os.path.basename(wav_url)): - urllib.request.urlretrieve(wav_url, os.path.basename(wav_url)) - -waveform, sr = paddlespeech.audio.load( - os.path.abspath(os.path.basename(wav_url))) -waveform_tensor = paddle.to_tensor(waveform).unsqueeze(0) -waveform_tensor_torch = torch.from_numpy(waveform).unsqueeze(0) - -# Feature conf -mel_conf = { - 'sr': sr, - 'n_fft': 512, - 'hop_length': 128, - 'n_mels': 40, -} -mfcc_conf = { - 'n_mfcc': 20, - 'top_db': 80.0, -} -mfcc_conf.update(mel_conf) - -mel_conf_torchaudio = { - 'sample_rate': sr, - 'n_fft': 512, - 'hop_length': 128, - 'n_mels': 40, - 'norm': 'slaney', - 'mel_scale': 'slaney', -} -mfcc_conf_torchaudio = { - 'sample_rate': sr, - 'n_mfcc': 20, -} - - -def enable_cpu_device(): - paddle.set_device('cpu') - - -def enable_gpu_device(): - paddle.set_device('gpu') - - -mfcc_extractor = paddlespeech.audio.features.MFCC( - **mfcc_conf, f_min=0.0, dtype=waveform_tensor.dtype) - - -def mfcc(): - return mfcc_extractor(waveform_tensor).squeeze(0) - - -def test_mfcc_cpu(benchmark): - enable_cpu_device() - feature_audio = benchmark(mfcc) - feature_librosa = librosa.feature.mfcc(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -def test_mfcc_gpu(benchmark): - enable_gpu_device() - feature_audio = benchmark(mfcc) - feature_librosa = librosa.feature.mfcc(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -del mel_conf_torchaudio['sample_rate'] -mfcc_extractor_torchaudio = torchaudio.transforms.MFCC( - **mfcc_conf_torchaudio, melkwargs=mel_conf_torchaudio) - - -def mfcc_torchaudio(): - return mfcc_extractor_torchaudio(waveform_tensor_torch).squeeze(0) - - -def test_mfcc_cpu_torchaudio(benchmark): - global waveform_tensor_torch, mfcc_extractor_torchaudio - - mel_extractor_torchaudio = mfcc_extractor_torchaudio.to('cpu') - waveform_tensor_torch = waveform_tensor_torch.to('cpu') - - feature_audio = benchmark(mfcc_torchaudio) - feature_librosa = librosa.feature.mfcc(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_audio, decimal=3) - - -def test_mfcc_gpu_torchaudio(benchmark): - global waveform_tensor_torch, mfcc_extractor_torchaudio - - mel_extractor_torchaudio = 
mfcc_extractor_torchaudio.to('cuda') - waveform_tensor_torch = waveform_tensor_torch.to('cuda') - - feature_torchaudio = benchmark(mfcc_torchaudio) - feature_librosa = librosa.feature.mfcc(waveform, **mel_conf) - np.testing.assert_array_almost_equal( - feature_librosa, feature_torchaudio.cpu(), decimal=3) diff --git a/tests/unit/audio/backends/__init__.py b/tests/unit/audio/backends/__init__.py deleted file mode 100644 index 97043fd7ba6885aac81cad5a49924c23c67d4d47..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/unit/audio/backends/base.py b/tests/unit/audio/backends/base.py deleted file mode 100644 index a67191887ff2e4cbe5a722f8867e0bdf2eaf5490..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/base.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import os -import unittest -import urllib.request - -mono_channel_wav = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav' -multi_channels_wav = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/cat.wav' - - -class BackendTest(unittest.TestCase): - def setUp(self): - self.initWavInput() - - def initWavInput(self): - self.files = [] - for url in [mono_channel_wav, multi_channels_wav]: - if not os.path.isfile(os.path.basename(url)): - urllib.request.urlretrieve(url, os.path.basename(url)) - self.files.append(os.path.basename(url)) - - def initParmas(self): - raise NotImplementedError diff --git a/tests/unit/audio/backends/common.py b/tests/unit/audio/backends/common.py deleted file mode 100644 index 79b922a912de85a3b6758e33c27a018c58a4e591..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/common.py +++ /dev/null @@ -1,32 +0,0 @@ - -def get_encoding(ext, dtype): - exts = { - "mp3", - "flac", - "vorbis", - } - encodings = { - "float32": "PCM_F", - "int32": "PCM_S", - "int16": "PCM_S", - "uint8": "PCM_U", - } - return ext.upper() if ext in exts else encodings[dtype] - - -def get_bit_depth(dtype): - bit_depths = { - "float32": 32, - "int32": 32, - "int16": 16, - "uint8": 8, - } - return bit_depths[dtype] - -def get_bits_per_sample(ext, dtype): - bits_per_samples = { - "flac": 24, - "mp3": 0, - "vorbis": 0, - } - return bits_per_samples.get(ext, get_bit_depth(dtype)) diff --git a/tests/unit/audio/backends/soundfile/__init__.py b/tests/unit/audio/backends/soundfile/__init__.py deleted file mode 100644 index 97043fd7ba6885aac81cad5a49924c23c67d4d47..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/soundfile/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
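The `get_encoding` and `get_bits_per_sample` helpers in the deleted `tests/unit/audio/backends/common.py` above encode what the backend tests expect for each container/dtype pair: WAV reports the PCM encoding and bit width implied by the sample dtype, while flac/mp3/vorbis report the format name and a fixed bits-per-sample value. A minimal sketch of that mapping, derived only from the definitions above and assuming the repository root is on `PYTHONPATH` so the test package imports (as the deleted test files themselves assume):

```python
# Sketch of the expected-value helpers defined in tests/unit/audio/backends/common.py.
from tests.unit.audio.backends.common import get_encoding, get_bits_per_sample

# WAV carries the PCM encoding implied by the sample dtype...
assert get_encoding("wav", "int16") == "PCM_S"
assert get_encoding("wav", "float32") == "PCM_F"
# ...while compressed containers report the upper-cased format name itself.
assert get_encoding("flac", "int16") == "FLAC"

# Bits per sample falls back to the dtype width for uncompressed files,
# and uses the fixed table entries for flac/mp3/vorbis.
assert get_bits_per_sample("wav", "int32") == 32
assert get_bits_per_sample("flac", "int16") == 24
assert get_bits_per_sample("vorbis", "float32") == 0
```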
diff --git a/tests/unit/audio/backends/soundfile/common.py b/tests/unit/audio/backends/soundfile/common.py deleted file mode 100644 index 7067b4a983695e61d9f6e63b1236fdb6535fa742..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/soundfile/common.py +++ /dev/null @@ -1,57 +0,0 @@ -import itertools -from unittest import skipIf - -from parameterized import parameterized -from paddlespeech.audio._internal.module_utils import is_module_available - - -def name_func(func, _, params): - return f'{func.__name__}_{"_".join(str(arg) for arg in params.args)}' - - -def dtype2subtype(dtype): - return { - "float64": "DOUBLE", - "float32": "FLOAT", - "int32": "PCM_32", - "int16": "PCM_16", - "uint8": "PCM_U8", - "int8": "PCM_S8", - }[dtype] - - -def skipIfFormatNotSupported(fmt): - fmts = [] - if is_module_available("soundfile"): - import soundfile - - fmts = soundfile.available_formats() - return skipIf(fmt not in fmts, f'"{fmt}" is not supported by soundfile') - return skipIf(True, '"soundfile" not available.') - - -def parameterize(*params): - return parameterized.expand(list(itertools.product(*params)), name_func=name_func) - - -def fetch_wav_subtype(dtype, encoding, bits_per_sample): - subtype = { - (None, None): dtype2subtype(dtype), - (None, 8): "PCM_U8", - ("PCM_U", None): "PCM_U8", - ("PCM_U", 8): "PCM_U8", - ("PCM_S", None): "PCM_32", - ("PCM_S", 16): "PCM_16", - ("PCM_S", 32): "PCM_32", - ("PCM_F", None): "FLOAT", - ("PCM_F", 32): "FLOAT", - ("PCM_F", 64): "DOUBLE", - ("ULAW", None): "ULAW", - ("ULAW", 8): "ULAW", - ("ALAW", None): "ALAW", - ("ALAW", 8): "ALAW", - }.get((encoding, bits_per_sample)) - if subtype: - return subtype - raise ValueError(f"wav does not support ({encoding}, {bits_per_sample}).") - diff --git a/tests/unit/audio/backends/soundfile/info_test.py b/tests/unit/audio/backends/soundfile/info_test.py deleted file mode 100644 index c94826858ff6c91baaec27edf9a2942d5d76ae39..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/soundfile/info_test.py +++ /dev/null @@ -1,199 +0,0 @@ -#this code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/backend/soundfile/info_test.py - -import tarfile -import warnings -import unittest -from unittest.mock import patch - -import paddle -from paddlespeech.audio._internal import module_utils as _mod_utils -from paddlespeech.audio.backends import soundfile_backend -from tests.unit.audio.backends.common import get_bits_per_sample, get_encoding -from tests.unit.common_utils import ( - get_wav_data, - nested_params, - save_wav, - TempDirMixin, -) - -from common import parameterize, skipIfFormatNotSupported - -import soundfile - - -class TestInfo(TempDirMixin, unittest.TestCase): - @parameterize( - ["float32", "int32"], - [8000, 16000], - [1, 2], - ) - def test_wav(self, dtype, sample_rate, num_channels): - """`soundfile_backend.info` can check wav file correctly""" - duration = 1 - path = self.get_temp_path("data.wav") - data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate) - save_wav(path, data, sample_rate) - info = soundfile_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == sample_rate * duration - assert info.num_channels == num_channels - assert info.bits_per_sample == get_bits_per_sample("wav", dtype) - assert info.encoding == get_encoding("wav", dtype) - - @parameterize([8000, 16000], [1, 2]) - @skipIfFormatNotSupported("FLAC") - def test_flac(self, sample_rate, num_channels): - 
"""`soundfile_backend.info` can check flac file correctly""" - duration = 1 - num_frames = sample_rate * duration - #data = torch.randn(num_frames, num_channels).numpy() - data = paddle.randn(shape=[num_frames, num_channels]).numpy() - - path = self.get_temp_path("data.flac") - soundfile.write(path, data, sample_rate) - - info = soundfile_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == num_frames - assert info.num_channels == num_channels - assert info.bits_per_sample == 16 - assert info.encoding == "FLAC" - - #@parameterize([8000, 16000], [1, 2]) - #@skipIfFormatNotSupported("OGG") - #def test_ogg(self, sample_rate, num_channels): - #"""`soundfile_backend.info` can check ogg file correctly""" - #duration = 1 - #num_frames = sample_rate * duration - ##data = torch.randn(num_frames, num_channels).numpy() - #data = paddle.randn(shape=[num_frames, num_channels]).numpy() - #print(len(data)) - #path = self.get_temp_path("data.ogg") - #soundfile.write(path, data, sample_rate) - - #info = soundfile_backend.info(path) - #print(info) - #assert info.sample_rate == sample_rate - #print("info") - #print(info.num_frames) - #print("jiji") - #print(sample_rate*duration) - ##assert info.num_frames == sample_rate * duration - #assert info.num_channels == num_channels - #assert info.bits_per_sample == 0 - #assert info.encoding == "VORBIS" - - @nested_params( - [8000, 16000], - [1, 2], - [("PCM_24", 24), ("PCM_32", 32)], - ) - @skipIfFormatNotSupported("NIST") - def test_sphere(self, sample_rate, num_channels, subtype_and_bit_depth): - """`soundfile_backend.info` can check sph file correctly""" - duration = 1 - num_frames = sample_rate * duration - #data = torch.randn(num_frames, num_channels).numpy() - data = paddle.randn(shape=[num_frames, num_channels]).numpy() - path = self.get_temp_path("data.nist") - subtype, bits_per_sample = subtype_and_bit_depth - soundfile.write(path, data, sample_rate, subtype=subtype) - - info = soundfile_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == sample_rate * duration - assert info.num_channels == num_channels - assert info.bits_per_sample == bits_per_sample - assert info.encoding == "PCM_S" - - def test_unknown_subtype_warning(self): - """soundfile_backend.info issues a warning when the subtype is unknown - - This will happen if a new subtype is supported in SoundFile: the _SUBTYPE_TO_BITS_PER_SAMPLE - dict should be updated. 
- """ - - def _mock_info_func(_): - class MockSoundFileInfo: - samplerate = 8000 - frames = 356 - channels = 2 - subtype = "UNSEEN_SUBTYPE" - format = "UNKNOWN" - - return MockSoundFileInfo() - - with patch("soundfile.info", _mock_info_func): - with warnings.catch_warnings(record=True) as w: - info = soundfile_backend.info("foo") - assert len(w) == 1 - assert "UNSEEN_SUBTYPE subtype is unknown to PaddleAudio" in str(w[-1].message) - assert info.bits_per_sample == 0 - - -class TestFileObject(TempDirMixin, unittest.TestCase): - def _test_fileobj(self, ext, subtype, bits_per_sample): - """Query audio via file-like object works""" - duration = 2 - sample_rate = 16000 - num_channels = 2 - num_frames = sample_rate * duration - path = self.get_temp_path(f"test.{ext}") - - #data = torch.randn(num_frames, num_channels).numpy() - data = paddle.randn(shape=[num_frames, num_channels]).numpy() - soundfile.write(path, data, sample_rate, subtype=subtype) - - with open(path, "rb") as fileobj: - info = soundfile_backend.info(fileobj) - assert info.sample_rate == sample_rate - assert info.num_frames == num_frames - assert info.num_channels == num_channels - assert info.bits_per_sample == bits_per_sample - assert info.encoding == "FLAC" if ext == "flac" else "PCM_S" - - def test_fileobj_wav(self): - """Loading audio via file-like object works""" - self._test_fileobj("wav", "PCM_16", 16) - - @skipIfFormatNotSupported("FLAC") - def test_fileobj_flac(self): - """Loading audio via file-like object works""" - self._test_fileobj("flac", "PCM_16", 16) - - def _test_tarobj(self, ext, subtype, bits_per_sample): - """Query compressed audio via file-like object works""" - duration = 2 - sample_rate = 16000 - num_channels = 2 - num_frames = sample_rate * duration - audio_file = f"test.{ext}" - audio_path = self.get_temp_path(audio_file) - archive_path = self.get_temp_path("archive.tar.gz") - - #data = torch.randn(num_frames, num_channels).numpy() - data = paddle.randn(shape=[num_frames, num_channels]).numpy() - soundfile.write(audio_path, data, sample_rate, subtype=subtype) - - with tarfile.TarFile(archive_path, "w") as tarobj: - tarobj.add(audio_path, arcname=audio_file) - with tarfile.TarFile(archive_path, "r") as tarobj: - fileobj = tarobj.extractfile(audio_file) - info = soundfile_backend.info(fileobj) - assert info.sample_rate == sample_rate - assert info.num_frames == num_frames - assert info.num_channels == num_channels - assert info.bits_per_sample == bits_per_sample - assert info.encoding == "FLAC" if ext == "flac" else "PCM_S" - - def test_tarobj_wav(self): - """Query compressed audio via file-like object works""" - self._test_tarobj("wav", "PCM_16", 16) - - @skipIfFormatNotSupported("FLAC") - def test_tarobj_flac(self): - """Query compressed audio via file-like object works""" - self._test_tarobj("flac", "PCM_16", 16) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/unit/audio/backends/soundfile/load_test.py b/tests/unit/audio/backends/soundfile/load_test.py deleted file mode 100644 index 6260093828db274e0232b6cf9df475a3d97b9303..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/soundfile/load_test.py +++ /dev/null @@ -1,369 +0,0 @@ -#this code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/backend/soundfile/load_test.py - -import os -import tarfile -import unittest -from unittest.mock import patch -import numpy as np - -from parameterized import parameterized -import paddle -from paddlespeech.audio._internal 
import module_utils as _mod_utils -from paddlespeech.audio.backends import soundfile_backend -from tests.unit.audio.backends.common import get_bits_per_sample, get_encoding -from tests.unit.common_utils import ( - get_wav_data, - load_wav, - nested_params, - normalize_wav, - save_wav, - TempDirMixin, -) - -from common import dtype2subtype, parameterize, skipIfFormatNotSupported - -import soundfile - - -def _get_mock_path( - ext: str, - dtype: str, - sample_rate: int, - num_channels: int, - num_frames: int, -): - return f"{dtype}_{sample_rate}_{num_channels}_{num_frames}.{ext}" - - -def _get_mock_params(path: str): - filename, ext = path.split(".") - parts = filename.split("_") - return { - "ext": ext, - "dtype": parts[0], - "sample_rate": int(parts[1]), - "num_channels": int(parts[2]), - "num_frames": int(parts[3]), - } - - -class SoundFileMock: - def __init__(self, path, mode): - assert mode == "r" - self.path = path - self._params = _get_mock_params(path) - self._start = None - - @property - def samplerate(self): - return self._params["sample_rate"] - - @property - def format(self): - if self._params["ext"] == "wav": - return "WAV" - if self._params["ext"] == "flac": - return "FLAC" - if self._params["ext"] == "ogg": - return "OGG" - if self._params["ext"] in ["sph", "nis", "nist"]: - return "NIST" - - @property - def subtype(self): - if self._params["ext"] == "ogg": - return "VORBIS" - return dtype2subtype(self._params["dtype"]) - - def _prepare_read(self, start, stop, frames): - assert stop is None - self._start = start - return frames - - def read(self, frames, dtype, always_2d): - assert always_2d - data = get_wav_data( - dtype, - self._params["num_channels"], - normalize=False, - num_frames=self._params["num_frames"], - channels_first=False, - ).numpy() - return data[self._start : self._start + frames] - - def __enter__(self): - return self - - def __exit__(self, *args, **kwargs): - pass - - -class MockedLoadTest(unittest.TestCase): - def assert_dtype(self, ext, dtype, sample_rate, num_channels, normalize, channels_first): - """When format is WAV or NIST, normalize=False will return the native dtype Tensor, otherwise float32""" - num_frames = 3 * sample_rate - path = _get_mock_path(ext, dtype, sample_rate, num_channels, num_frames) - expected_dtype = paddle.float32 if normalize or ext not in ["wav", "nist"] else getattr(paddle, dtype) - with patch("soundfile.SoundFile", SoundFileMock): - found, sr = soundfile_backend.load(path, normalize=normalize, channels_first=channels_first) - assert found.dtype == expected_dtype - assert sample_rate == sr - - @parameterize( - ["int32", "float32", "float64"], - [8000, 16000], - [1, 2], - [True, False], - [True, False], - ) - def test_wav(self, dtype, sample_rate, num_channels, normalize, channels_first): - """Returns native dtype when normalize=False else float32""" - self.assert_dtype("wav", dtype, sample_rate, num_channels, normalize, channels_first) - - @parameterize( - ["int32"], - [8000, 16000], - [1, 2], - [True, False], - [True, False], - ) - def test_sphere(self, dtype, sample_rate, num_channels, normalize, channels_first): - """Returns float32 always""" - self.assert_dtype("sph", dtype, sample_rate, num_channels, normalize, channels_first) - - @parameterize([8000, 16000], [1, 2], [True, False], [True, False]) - def test_ogg(self, sample_rate, num_channels, normalize, channels_first): - """Returns float32 always""" - self.assert_dtype("ogg", "int16", sample_rate, num_channels, normalize, channels_first) - - @parameterize([8000, 16000], 
[1, 2], [True, False], [True, False]) - def test_flac(self, sample_rate, num_channels, normalize, channels_first): - """`soundfile_backend.load` can load ogg format.""" - self.assert_dtype("flac", "int16", sample_rate, num_channels, normalize, channels_first) - - -class LoadTestBase(TempDirMixin, unittest.TestCase): - def assert_wav( - self, - dtype, - sample_rate, - num_channels, - normalize, - channels_first=True, - duration=1, - ): - """`soundfile_backend.load` can load wav format correctly. - - Wav data loaded with soundfile backend should match those with scipy - """ - path = self.get_temp_path("reference.wav") - num_frames = duration * sample_rate - data = get_wav_data( - dtype, - num_channels, - normalize=normalize, - num_frames=num_frames, - channels_first=channels_first, - ) - save_wav(path, data, sample_rate, channels_first=channels_first) - expected = load_wav(path, normalize=normalize, channels_first=channels_first)[0] - data, sr = soundfile_backend.load(path, normalize=normalize, channels_first=channels_first) - assert sr == sample_rate - np.testing.assert_array_almost_equal(data.numpy(), expected.numpy()) - - def assert_sphere( - self, - dtype, - sample_rate, - num_channels, - channels_first=True, - duration=1, - ): - """`soundfile_backend.load` can load SPHERE format correctly.""" - path = self.get_temp_path("reference.sph") - num_frames = duration * sample_rate - raw = get_wav_data( - dtype, - num_channels, - num_frames=num_frames, - normalize=False, - channels_first=False, - ) - soundfile.write(path, raw, sample_rate, subtype=dtype2subtype(dtype), format="NIST") - expected = normalize_wav(raw.t() if channels_first else raw) - data, sr = soundfile_backend.load(path, channels_first=channels_first) - assert sr == sample_rate - #self.assertEqual(data, expected, atol=1e-4, rtol=1e-8) - np.testing.assert_array_almost_equal(data.numpy(), expected.numpy()) - - def assert_flac( - self, - dtype, - sample_rate, - num_channels, - channels_first=True, - duration=1, - ): - """`soundfile_backend.load` can load FLAC format correctly.""" - path = self.get_temp_path("reference.flac") - num_frames = duration * sample_rate - raw = get_wav_data( - dtype, - num_channels, - num_frames=num_frames, - normalize=False, - channels_first=False, - ) - soundfile.write(path, raw, sample_rate) - expected = normalize_wav(raw.t() if channels_first else raw) - data, sr = soundfile_backend.load(path, channels_first=channels_first) - assert sr == sample_rate - #self.assertEqual(data, expected, atol=1e-4, rtol=1e-8) - np.testing.assert_array_almost_equal(data.numpy(), expected.numpy()) - - - -class TestLoad(LoadTestBase): - """Test the correctness of `soundfile_backend.load` for various formats""" - - @parameterize( - ["float32", "int32"], - [8000, 16000], - [1, 2], - [False, True], - [False, True], - ) - def test_wav(self, dtype, sample_rate, num_channels, normalize, channels_first): - """`soundfile_backend.load` can load wav format correctly.""" - self.assert_wav(dtype, sample_rate, num_channels, normalize, channels_first) - - @parameterize( - ["int32"], - [16000], - [2], - [False], - ) - def test_wav_large(self, dtype, sample_rate, num_channels, normalize): - """`soundfile_backend.load` can load large wav file correctly.""" - two_hours = 2 * 60 * 60 - self.assert_wav(dtype, sample_rate, num_channels, normalize, duration=two_hours) - - @parameterize(["float32", "int32"], [4, 8, 16, 32], [False, True]) - def test_multiple_channels(self, dtype, num_channels, channels_first): - """`soundfile_backend.load` can 
load wav file with more than 2 channels.""" - sample_rate = 8000 - normalize = False - self.assert_wav(dtype, sample_rate, num_channels, normalize, channels_first) - - #@parameterize(["int32"], [8000, 16000], [1, 2], [False, True]) - #@skipIfFormatNotSupported("NIST") - #def test_sphere(self, dtype, sample_rate, num_channels, channels_first): - #"""`soundfile_backend.load` can load sphere format correctly.""" - #self.assert_sphere(dtype, sample_rate, num_channels, channels_first) - - #@parameterize(["int32"], [8000, 16000], [1, 2], [False, True]) - #@skipIfFormatNotSupported("FLAC") - #def test_flac(self, dtype, sample_rate, num_channels, channels_first): - #"""`soundfile_backend.load` can load flac format correctly.""" - #self.assert_flac(dtype, sample_rate, num_channels, channels_first) - - -class TestLoadFormat(TempDirMixin, unittest.TestCase): - """Given `format` parameter, `so.load` can load files without extension""" - - original = None - path = None - - def _make_file(self, format_): - sample_rate = 8000 - path_with_ext = self.get_temp_path(f"test.{format_}") - data = get_wav_data("float32", num_channels=2).numpy().T - soundfile.write(path_with_ext, data, sample_rate) - expected = soundfile.read(path_with_ext, dtype="float32")[0].T - path = os.path.splitext(path_with_ext)[0] - os.rename(path_with_ext, path) - return path, expected - - def _test_format(self, format_): - """Providing format allows to read file without extension""" - path, expected = self._make_file(format_) - found, _ = soundfile_backend.load(path) - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(found, expected) - - @parameterized.expand( - [ - ("WAV",), - ("wav",), - ] - ) - def test_wav(self, format_): - self._test_format(format_) - - @parameterized.expand( - [ - ("FLAC",), - ("flac",), - ] - ) - @skipIfFormatNotSupported("FLAC") - def test_flac(self, format_): - self._test_format(format_) - - -class TestFileObject(TempDirMixin, unittest.TestCase): - def _test_fileobj(self, ext): - """Loading audio via file-like object works""" - sample_rate = 16000 - path = self.get_temp_path(f"test.{ext}") - - data = get_wav_data("float32", num_channels=2).numpy().T - soundfile.write(path, data, sample_rate) - expected = soundfile.read(path, dtype="float32")[0].T - - with open(path, "rb") as fileobj: - found, sr = soundfile_backend.load(fileobj) - assert sr == sample_rate - #self.assertEqual(expected, found) - np.testing.assert_array_almost_equal(found, expected) - - def test_fileobj_wav(self): - """Loading audio via file-like object works""" - self._test_fileobj("wav") - - def test_fileobj_flac(self): - """Loading audio via file-like object works""" - self._test_fileobj("flac") - - def _test_tarfile(self, ext): - """Loading audio via file-like object works""" - sample_rate = 16000 - audio_file = f"test.{ext}" - audio_path = self.get_temp_path(audio_file) - archive_path = self.get_temp_path("archive.tar.gz") - - data = get_wav_data("float32", num_channels=2).numpy().T - soundfile.write(audio_path, data, sample_rate) - expected = soundfile.read(audio_path, dtype="float32")[0].T - - with tarfile.TarFile(archive_path, "w") as tarobj: - tarobj.add(audio_path, arcname=audio_file) - with tarfile.TarFile(archive_path, "r") as tarobj: - fileobj = tarobj.extractfile(audio_file) - found, sr = soundfile_backend.load(fileobj) - - assert sr == sample_rate - #self.assertEqual(expected, found) - np.testing.assert_array_almost_equal(found.numpy(), expected) - - - def test_tarfile_wav(self): - """Loading audio via 
file-like object works""" - self._test_tarfile("wav") - - def test_tarfile_flac(self): - """Loading audio via file-like object works""" - self._test_tarfile("flac") - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/unit/audio/backends/soundfile/save_test.py b/tests/unit/audio/backends/soundfile/save_test.py deleted file mode 100644 index 9139d84cd08d7a053117246fd03f10af8ef9a9fe..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/soundfile/save_test.py +++ /dev/null @@ -1,322 +0,0 @@ -import io -import unittest -from unittest.mock import patch - -from paddlespeech.audio._internal import module_utils as _mod_utils -from paddlespeech.audio.backends import soundfile_backend -from tests.unit.common_utils import ( - get_wav_data, - load_wav, - nested_params, - normalize_wav, - save_wav, - TempDirMixin, -) - -from common import fetch_wav_subtype, parameterize, skipIfFormatNotSupported - -import paddle -import numpy as np - -import soundfile - - -class MockedSaveTest(unittest.TestCase): - @nested_params( - ["float32", "int32"], - [8000, 16000], - [1, 2], - [False, True], - [ - (None, None), - ("PCM_U", None), - ("PCM_U", 8), - ("PCM_S", None), - ("PCM_S", 16), - ("PCM_S", 32), - ("PCM_F", None), - ("PCM_F", 32), - ("PCM_F", 64), - ("ULAW", None), - ("ULAW", 8), - ("ALAW", None), - ("ALAW", 8), - ], - ) - @patch("soundfile.write") - def test_wav(self, dtype, sample_rate, num_channels, channels_first, enc_params, mocked_write): - """soundfile_backend.save passes correct subtype to soundfile.write when WAV""" - filepath = "foo.wav" - input_tensor = get_wav_data( - dtype, - num_channels, - num_frames=3 * sample_rate, - normalize=dtype == "float32", - channels_first=channels_first, - ) - input_tensor = paddle.transpose(input_tensor, [1, 0]) - - encoding, bits_per_sample = enc_params - soundfile_backend.save( - filepath, - input_tensor, - sample_rate, - channels_first=channels_first, - encoding=encoding, - bits_per_sample=bits_per_sample, - ) - - # on +Py3.8 call_args.kwargs is more descreptive - args = mocked_write.call_args[1] - assert args["file"] == filepath - assert args["samplerate"] == sample_rate - assert args["subtype"] == fetch_wav_subtype(dtype, encoding, bits_per_sample) - assert args["format"] is None - tensor_result = paddle.transpose(input_tensor, [1, 0]) if channels_first else input_tensor - #self.assertEqual(args["data"], tensor_result.numpy()) - np.testing.assert_array_almost_equal(args["data"].numpy(), tensor_result.numpy()) - - - - @patch("soundfile.write") - def assert_non_wav( - self, - fmt, - dtype, - sample_rate, - num_channels, - channels_first, - mocked_write, - encoding=None, - bits_per_sample=None, - ): - """soundfile_backend.save passes correct subtype and format to soundfile.write when SPHERE""" - filepath = f"foo.{fmt}" - input_tensor = get_wav_data( - dtype, - num_channels, - num_frames=3 * sample_rate, - normalize=False, - channels_first=channels_first, - ) - input_tensor = paddle.transpose(input_tensor, [1, 0]) - - expected_data = paddle.transpose(input_tensor, [1, 0]) if channels_first else input_tensor - - soundfile_backend.save( - filepath, - input_tensor, - sample_rate, - channels_first, - encoding=encoding, - bits_per_sample=bits_per_sample, - ) - - # on +Py3.8 call_args.kwargs is more descreptive - args = mocked_write.call_args[1] - assert args["file"] == filepath - assert args["samplerate"] == sample_rate - if fmt in ["sph", "nist", "nis"]: - assert args["format"] == "NIST" - else: - assert 
args["format"] is None - np.testing.assert_array_almost_equal(args["data"].numpy(), expected_data.numpy()) - #self.assertEqual(args["data"], expected_data) - - @nested_params( - ["sph", "nist", "nis"], - ["int32"], - [8000, 16000], - [1, 2], - [False, True], - [ - ("PCM_S", 8), - ("PCM_S", 16), - ("PCM_S", 24), - ("PCM_S", 32), - ("ULAW", 8), - ("ALAW", 8), - ("ALAW", 16), - ("ALAW", 24), - ("ALAW", 32), - ], - ) - def test_sph(self, fmt, dtype, sample_rate, num_channels, channels_first, enc_params): - """soundfile_backend.save passes default format and subtype (None-s) to - soundfile.write when not WAV""" - encoding, bits_per_sample = enc_params - self.assert_non_wav( - fmt, dtype, sample_rate, num_channels, channels_first, encoding=encoding, bits_per_sample=bits_per_sample - ) - - @parameterize( - ["int32"], - [8000, 16000], - [1, 2], - [False, True], - [8, 16, 24], - ) - def test_flac(self, dtype, sample_rate, num_channels, channels_first, bits_per_sample): - """soundfile_backend.save passes default format and subtype (None-s) to - soundfile.write when not WAV""" - self.assert_non_wav("flac", dtype, sample_rate, num_channels, channels_first, bits_per_sample=bits_per_sample) - - @parameterize( - ["int32"], - [8000, 16000], - [1, 2], - [False, True], - ) - def test_ogg(self, dtype, sample_rate, num_channels, channels_first): - """soundfile_backend.save passes default format and subtype (None-s) to - soundfile.write when not WAV""" - self.assert_non_wav("ogg", dtype, sample_rate, num_channels, channels_first) - - -class SaveTestBase(TempDirMixin, unittest.TestCase): - def assert_wav(self, dtype, sample_rate, num_channels, num_frames): - """`soundfile_backend.save` can save wav format.""" - path = self.get_temp_path("data.wav") - expected = get_wav_data(dtype, num_channels, num_frames=num_frames, normalize=False) - soundfile_backend.save(path, expected, sample_rate) - found, sr = load_wav(path, normalize=False) - assert sample_rate == sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - def _assert_non_wav(self, fmt, dtype, sample_rate, num_channels): - """`soundfile_backend.save` can save non-wav format. - - Due to precision missmatch, and the lack of alternative way to decode the - resulting files without using soundfile, only meta data are validated. - """ - num_frames = sample_rate * 3 - path = self.get_temp_path(f"data.{fmt}") - expected = get_wav_data(dtype, num_channels, num_frames=num_frames, normalize=False) - soundfile_backend.save(path, expected, sample_rate) - sinfo = soundfile.info(path) - assert sinfo.format == fmt.upper() - #assert sinfo.frames == num_frames this go wrong - assert sinfo.channels == num_channels - assert sinfo.samplerate == sample_rate - - def assert_flac(self, dtype, sample_rate, num_channels): - """`soundfile_backend.save` can save flac format.""" - self._assert_non_wav("flac", dtype, sample_rate, num_channels) - - def assert_sphere(self, dtype, sample_rate, num_channels): - """`soundfile_backend.save` can save sph format.""" - self._assert_non_wav("nist", dtype, sample_rate, num_channels) - - def assert_ogg(self, dtype, sample_rate, num_channels): - """`soundfile_backend.save` can save ogg format. - - As we cannot inspect the OGG format (it's lossy), we only check the metadata. 
- """ - self._assert_non_wav("ogg", dtype, sample_rate, num_channels) - - -class TestSave(SaveTestBase): - @parameterize( - ["float32", "int32"], - [8000, 16000], - [1, 2], - ) - def test_wav(self, dtype, sample_rate, num_channels): - """`soundfile_backend.save` can save wav format.""" - self.assert_wav(dtype, sample_rate, num_channels, num_frames=None) - - @parameterize( - ["float32", "int32"], - [4, 8, 16, 32], - ) - def test_multiple_channels(self, dtype, num_channels): - """`soundfile_backend.save` can save wav with more than 2 channels.""" - sample_rate = 8000 - self.assert_wav(dtype, sample_rate, num_channels, num_frames=None) - - @parameterize( - ["int32"], - [8000, 16000], - [1, 2], - ) - @skipIfFormatNotSupported("NIST") - def test_sphere(self, dtype, sample_rate, num_channels): - """`soundfile_backend.save` can save sph format.""" - self.assert_sphere(dtype, sample_rate, num_channels) - - @parameterize( - [8000, 16000], - [1, 2], - ) - @skipIfFormatNotSupported("FLAC") - def test_flac(self, sample_rate, num_channels): - """`soundfile_backend.save` can save flac format.""" - self.assert_flac("float32", sample_rate, num_channels) - - @parameterize( - [8000, 16000], - [1, 2], - ) - @skipIfFormatNotSupported("OGG") - def test_ogg(self, sample_rate, num_channels): - """`soundfile_backend.save` can save ogg/vorbis format.""" - self.assert_ogg("float32", sample_rate, num_channels) - - -class TestSaveParams(TempDirMixin, unittest.TestCase): - """Test the correctness of optional parameters of `soundfile_backend.save`""" - - @parameterize([True, False]) - def test_channels_first(self, channels_first): - """channels_first swaps axes""" - path = self.get_temp_path("data.wav") - data = get_wav_data("int32", 2, channels_first=channels_first) - soundfile_backend.save(path, data, 8000, channels_first=channels_first) - found = load_wav(path)[0] - expected = data if channels_first else data.transpose([1, 0]) - #self.assertEqual(found, expected, atol=1e-4, rtol=1e-8) - np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - -class TestFileObject(TempDirMixin, unittest.TestCase): - def _test_fileobj(self, ext): - """Saving audio to file-like object works""" - sample_rate = 16000 - path = self.get_temp_path(f"test.{ext}") - - subtype = "FLOAT" if ext == "wav" else None - data = get_wav_data("float32", num_channels=2) - soundfile.write(path, data.numpy().T, sample_rate, subtype=subtype) - expected = soundfile.read(path, dtype="float32")[0] - - fileobj = io.BytesIO() - soundfile_backend.save(fileobj, data, sample_rate, format=ext) - fileobj.seek(0) - found, sr = soundfile.read(fileobj, dtype="float32") - - assert sr == sample_rate - #self.assertEqual(expected, found, atol=1e-4, rtol=1e-8) - np.testing.assert_array_almost_equal(found, expected) - - def test_fileobj_wav(self): - """Saving audio via file-like object works""" - self._test_fileobj("wav") - - @skipIfFormatNotSupported("FLAC") - def test_fileobj_flac(self): - """Saving audio via file-like object works""" - self._test_fileobj("flac") - - @skipIfFormatNotSupported("NIST") - def test_fileobj_nist(self): - """Saving audio via file-like object works""" - self._test_fileobj("NIST") - - @skipIfFormatNotSupported("OGG") - def test_fileobj_ogg(self): - """Saving audio via file-like object works""" - self._test_fileobj("OGG") - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/unit/audio/backends/soundfile/test_io.py b/tests/unit/audio/backends/soundfile/test_io.py deleted file mode 100644 
index 26276751f7d89579bed8d958699ab0ac348f0414..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/soundfile/test_io.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import filecmp -import os -import unittest - -import numpy as np -import soundfile as sf - -import paddlespeech.audio -from ..base import BackendTest - - -class TestIO(BackendTest): - def test_load_mono_channel(self): - sf_data, sf_sr = sf.read(self.files[0]) - pa_data, pa_sr = paddlespeech.audio.load( - self.files[0], normal=False, dtype='float64') - - self.assertEqual(sf_data.dtype, pa_data.dtype) - self.assertEqual(sf_sr, pa_sr) - np.testing.assert_array_almost_equal(sf_data, pa_data) - - def test_load_multi_channels(self): - sf_data, sf_sr = sf.read(self.files[1]) - sf_data = sf_data.T # Channel dim first - pa_data, pa_sr = paddlespeech.audio.load( - self.files[1], mono=False, normal=False, dtype='float64') - - self.assertEqual(sf_data.dtype, pa_data.dtype) - self.assertEqual(sf_sr, pa_sr) - np.testing.assert_array_almost_equal(sf_data, pa_data) - - def test_save_mono_channel(self): - waveform, sr = np.random.randint( - low=-32768, high=32768, size=(48000), dtype=np.int16), 16000 - sf_tmp_file = 'sf_tmp.wav' - pa_tmp_file = 'pa_tmp.wav' - - sf.write(sf_tmp_file, waveform, sr) - paddlespeech.audio.save(waveform, sr, pa_tmp_file) - - self.assertTrue(filecmp.cmp(sf_tmp_file, pa_tmp_file)) - for file in [sf_tmp_file, pa_tmp_file]: - os.remove(file) - - def test_save_multi_channels(self): - waveform, sr = np.random.randint( - low=-32768, high=32768, size=(2, 48000), dtype=np.int16), 16000 - sf_tmp_file = 'sf_tmp.wav' - pa_tmp_file = 'pa_tmp.wav' - - sf.write(sf_tmp_file, waveform.T, sr) - paddlespeech.audio.save(waveform.T, sr, pa_tmp_file) - - self.assertTrue(filecmp.cmp(sf_tmp_file, pa_tmp_file)) - for file in [sf_tmp_file, pa_tmp_file]: - os.remove(file) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/audio/backends/sox_io/info_test.py b/tests/unit/audio/backends/sox_io/info_test.py deleted file mode 100644 index 077d6051d21c0acd03922b21bd795412251cda18..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/sox_io/info_test.py +++ /dev/null @@ -1,289 +0,0 @@ -import unittest -import itertools -import tarfile -from contextlib import contextmanager - -import numpy as np -import paddle -import os -import io - -from parameterized import parameterized -from tests.unit.audio.backends.common import get_bits_per_sample, get_encoding -from paddlespeech.audio.backends import sox_io_backend - -from tests.unit.common_utils import ( - get_wav_data, - load_wav, - save_wav, - TempDirMixin, - sox_utils, - data_utils -) - -#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/info_test.py - -class TestInfo(TempDirMixin, unittest.TestCase): - @parameterized.expand( - list( - itertools.product( - ["float32", "int32",], - [8000, 
16000], - [1, 2], - ) - ), - ) - def test_wav(self, dtype, sample_rate, num_channels): - """`sox_io_backend.info` can check wav file correctly""" - duration = 1 - path = self.get_temp_path("data.wav") - data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate) - save_wav(path, data, sample_rate) - info = sox_io_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == sample_rate * duration - assert info.num_channels == num_channels - assert info.bits_per_sample == sox_utils.get_bit_depth(dtype) - assert info.encoding == get_encoding("wav", dtype) - - @parameterized.expand( - list( - itertools.product( - ["float32", "int32"], - [8000, 16000], - [4, 8, 16, 32], - ) - ), - ) - def test_wav_multiple_channels(self, dtype, sample_rate, num_channels): - """`sox_io_backend.info` can check wav file with channels more than 2 correctly""" - duration = 1 - path = self.get_temp_path("data.wav") - data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate) - save_wav(path, data, sample_rate) - info = sox_io_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == sample_rate * duration - assert info.num_channels == num_channels - assert info.bits_per_sample == sox_utils.get_bit_depth(dtype) - - def test_ulaw(self): - """`sox_io_backend.info` can check ulaw file correctly""" - duration = 1 - num_channels = 1 - sample_rate = 8000 - path = self.get_temp_path("data.wav") - sox_utils.gen_audio_file( - path, sample_rate=sample_rate, num_channels=num_channels, bit_depth=8, encoding="u-law", duration=duration - ) - info = sox_io_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == sample_rate * duration - assert info.num_channels == num_channels - assert info.bits_per_sample == 8 - assert info.encoding == "ULAW" - - def test_alaw(self): - """`sox_io_backend.info` can check alaw file correctly""" - duration = 1 - num_channels = 1 - sample_rate = 8000 - path = self.get_temp_path("data.wav") - sox_utils.gen_audio_file( - path, sample_rate=sample_rate, num_channels=num_channels, bit_depth=8, encoding="a-law", duration=duration - ) - info = sox_io_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_frames == sample_rate * duration - assert info.num_channels == num_channels - assert info.bits_per_sample == 8 - assert info.encoding == "ALAW" - -#class TestInfoOpus(unittest.TestCase): - #@parameterized.expand( - #list( - #itertools.product( - #["96k"], - #[1, 2], - #[0, 5, 10], - #) - #), - #) - #def test_opus(self, bitrate, num_channels, compression_level): - #"""`sox_io_backend.info` can check opus file correcty""" - #path = data_utils.get_asset_path("io", f"{bitrate}_{compression_level}_{num_channels}ch.opus") - #info = sox_io_backend.info(path) - #assert info.sample_rate == 48000 - #assert info.num_frames == 32768 - #assert info.num_channels == num_channels - #assert info.bits_per_sample == 0 # bit_per_sample is irrelevant for compressed formats - #assert info.encoding == "OPUS" - -class FileObjTestBase(TempDirMixin): - def _gen_file(self, ext, dtype, sample_rate, num_channels, num_frames, *, comments=None): - path = self.get_temp_path(f"test.{ext}") - bit_depth = sox_utils.get_bit_depth(dtype) - duration = num_frames / sample_rate - comment_file = self._gen_comment_file(comments) if comments else None - - sox_utils.gen_audio_file( - path, - sample_rate, - num_channels=num_channels, - encoding=sox_utils.get_encoding(dtype), - 
bit_depth=bit_depth, - duration=duration, - comment_file=comment_file, - ) - return path - - def _gen_comment_file(self, comments): - comment_path = self.get_temp_path("comment.txt") - with open(comment_path, "w") as file_: - file_.writelines(comments) - return comment_path - -class Unseekable: - def __init__(self, fileobj): - self.fileobj = fileobj - - def read(self, n): - return self.fileobj.read(n) - -class TestFileObject(FileObjTestBase, unittest.TestCase): - def _query_fileobj(self, ext, dtype, sample_rate, num_channels, num_frames, *, comments=None): - path = self._gen_file(ext, dtype, sample_rate, num_channels, num_frames, comments=comments) - format_ = ext if ext in ["mp3"] else None - with open(path, "rb") as fileobj: - return sox_io_backend.info(fileobj, format_) - - def _query_bytesio(self, ext, dtype, sample_rate, num_channels, num_frames): - path = self._gen_file(ext, dtype, sample_rate, num_channels, num_frames) - format_ = ext if ext in ["mp3"] else None - with open(path, "rb") as file_: - fileobj = io.BytesIO(file_.read()) - return sox_io_backend.info(fileobj, format_) - - def _query_tarfile(self, ext, dtype, sample_rate, num_channels, num_frames): - audio_path = self._gen_file(ext, dtype, sample_rate, num_channels, num_frames) - audio_file = os.path.basename(audio_path) - archive_path = self.get_temp_path("archive.tar.gz") - with tarfile.TarFile(archive_path, "w") as tarobj: - tarobj.add(audio_path, arcname=audio_file) - format_ = ext if ext in ["mp3"] else None - with tarfile.TarFile(archive_path, "r") as tarobj: - fileobj = tarobj.extractfile(audio_file) - return sox_io_backend.info(fileobj, format_) - - @contextmanager - def _set_buffer_size(self, buffer_size): - try: - original_buffer_size = get_buffer_size() - set_buffer_size(buffer_size) - yield - finally: - set_buffer_size(original_buffer_size) - - @parameterized.expand( - [ - ("wav", "float32"), - ("wav", "int32"), - ("wav", "int16"), - ("wav", "uint8"), - ] - ) - def test_fileobj(self, ext, dtype): - """Querying audio via file object works""" - sample_rate = 16000 - num_frames = 3 * sample_rate - num_channels = 2 - sinfo = self._query_fileobj(ext, dtype, sample_rate, num_channels, num_frames) - - bits_per_sample = get_bits_per_sample(ext, dtype) - num_frames = 0 if ext in ["mp3", "vorbis"] else num_frames - - assert sinfo.sample_rate == sample_rate - assert sinfo.num_channels == num_channels - assert sinfo.num_frames == num_frames - assert sinfo.bits_per_sample == bits_per_sample - assert sinfo.encoding == get_encoding(ext, dtype) - - @parameterized.expand( - [ - ("wav", "float32"), - ("wav", "int32"), - ("wav", "int16"), - ("wav", "uint8"), - ] - ) - def test_bytesio(self, ext, dtype): - """Querying audio via ByteIO object works for small data""" - sample_rate = 16000 - num_frames = 3 * sample_rate - num_channels = 2 - sinfo = self._query_bytesio(ext, dtype, sample_rate, num_channels, num_frames) - - bits_per_sample = get_bits_per_sample(ext, dtype) - num_frames = 0 if ext in ["mp3", "vorbis"] else num_frames - - assert sinfo.sample_rate == sample_rate - assert sinfo.num_channels == num_channels - assert sinfo.num_frames == num_frames - assert sinfo.bits_per_sample == bits_per_sample - assert sinfo.encoding == get_encoding(ext, dtype) - - @parameterized.expand( - [ - ("wav", "float32"), - ("wav", "int32"), - ("wav", "int16"), - ("wav", "uint8"), - ] - ) - def test_bytesio_tiny(self, ext, dtype): - """Querying audio via ByteIO object works for small data""" - sample_rate = 8000 - num_frames = 4 - num_channels = 2 
- sinfo = self._query_bytesio(ext, dtype, sample_rate, num_channels, num_frames) - - bits_per_sample = get_bits_per_sample(ext, dtype) - num_frames = 0 if ext in ["mp3", "vorbis"] else num_frames - - assert sinfo.sample_rate == sample_rate - assert sinfo.num_channels == num_channels - assert sinfo.num_frames == num_frames - assert sinfo.bits_per_sample == bits_per_sample - assert sinfo.encoding == get_encoding(ext, dtype) - - @parameterized.expand( - [ - ("wav", "float32"), - ("wav", "int32"), - ("wav", "int16"), - ("wav", "uint8"), - ("flac", "float32"), - ("vorbis", "float32"), - ("amb", "int16"), - ] - ) - def test_tarfile(self, ext, dtype): - """Querying compressed audio via file-like object works""" - sample_rate = 16000 - num_frames = 3.0 * sample_rate - num_channels = 2 - sinfo = self._query_tarfile(ext, dtype, sample_rate, num_channels, num_frames) - - bits_per_sample = get_bits_per_sample(ext, dtype) - num_frames = 0 if ext in ["vorbis"] else num_frames - - assert sinfo.sample_rate == sample_rate - assert sinfo.num_channels == num_channels - assert sinfo.num_frames == num_frames - assert sinfo.bits_per_sample == bits_per_sample - assert sinfo.encoding == get_encoding(ext, dtype) - - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/audio/backends/sox_io/load_test.py b/tests/unit/audio/backends/sox_io/load_test.py deleted file mode 100644 index 8e141750beeb90f1117f0aab1fe7b40e6074e617..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/sox_io/load_test.py +++ /dev/null @@ -1,47 +0,0 @@ -import unittest -import itertools - -from parameterized import parameterized -import numpy as np -from paddlespeech.audio._internal import module_utils as _mod_utils -from paddlespeech.audio.backends import sox_io_backend - -from tests.unit.common_utils import ( - get_wav_data, - load_wav, - save_wav, -) - -#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/load_test.py - -class TestLoad(unittest.TestCase): - - def assert_wav(self, dtype, sample_rate, num_channels, normalize, duration): - """`sox_io_backend.load` can load wav format correctly. 
- - Wav data loaded with sox_io backend should match those with scipy - """ - path = 'testdata/reference.wav' - data = get_wav_data(dtype, num_channels, normalize=normalize, num_frames=duration * sample_rate) - save_wav(path, data, sample_rate) - expected = load_wav(path, normalize=normalize)[0] - data, sr = sox_io_backend.load(path, normalize=normalize) - assert sr == sample_rate - np.testing.assert_array_almost_equal(data, expected, decimal=4) - - @parameterized.expand( - list( - itertools.product( - ["float64", "float32", "int32",], - [8000, 16000], - [1, 2], - [False, True], - ) - ), - ) - def test_wav(self, dtype, sample_rate, num_channels, normalize): - """`sox_io_backend.load` can load wav format correctly.""" - self.assert_wav(dtype, sample_rate, num_channels, normalize, duration=1) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/unit/audio/backends/sox_io/save_test.py b/tests/unit/audio/backends/sox_io/save_test.py deleted file mode 100644 index 7942f018d085fcfbfba7a95dd44743e0508ede20..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/sox_io/save_test.py +++ /dev/null @@ -1,175 +0,0 @@ -import io -import os -import unittest - -import numpy as np -import paddle -from parameterized import parameterized -from paddlespeech.audio.backends import sox_io_backend - -from tests.unit.common_utils import ( - get_wav_data, - load_wav, - save_wav, - nested_params, - TempDirMixin, - sox_utils -) - -#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/save_test.py - -def _get_sox_encoding(encoding): - encodings = { - "PCM_F": "floating-point", - "PCM_S": "signed-integer", - "PCM_U": "unsigned-integer", - "ULAW": "u-law", - "ALAW": "a-law", - } - return encodings.get(encoding) - -class TestSaveBase(TempDirMixin): - def assert_save_consistency( - self, - format: str, - *, - compression: float = None, - encoding: str = None, - bits_per_sample: int = None, - sample_rate: float = 8000, - num_channels: int = 2, - num_frames: float = 3 * 8000, - src_dtype: str = "int32", - test_mode: str = "path", - ): - """`save` function produces a file that is comparable with the `sox` command - - To compare the file produced by the `save` function against the file produced by - the equivalent `sox` command, we need to load both files. - But there are many formats that cannot be opened with common Python modules (like - SciPy). - So we use the `sox` command to prepare the original data and convert the saved files - into a format that SciPy can read (PCM wav). - The following diagram illustrates this process. The difference is in steps 2.1. and 3.1. - - This assumes that - - loading data with SciPy preserves the data well. - - converting the resulting files into WAV format with `sox` preserves the data well. - - x - | 1. Generate source wav file with SciPy - | - v - -------------- wav ---------------- - | | - | 2.1. load with scipy | 3.1. Convert to the target - | then save it into the target | format depth with sox - | format with paddleaudio | - v v - target format target format - | | - | 2.2. Convert to wav with sox | 3.2. Convert to wav with sox - | | - v v - wav wav - | | - | 2.3. load with scipy | 3.3.
load with scipy - | | - v v - tensor -------> compare <--------- tensor - - """ - cmp_encoding = "floating-point" - cmp_bit_depth = 32 - - src_path = self.get_temp_path("1.source.wav") - tgt_path = self.get_temp_path(f"2.1.paddleaudio.{format}") - tst_path = self.get_temp_path("2.2.result.wav") - sox_path = self.get_temp_path(f"3.1.sox.{format}") - ref_path = self.get_temp_path("3.2.ref.wav") - - # 1. Generate original wav - data = get_wav_data(src_dtype, num_channels, normalize=False, num_frames=num_frames) - save_wav(src_path, data, sample_rate) - - # 2.1. Convert the original wav to target format with paddleaudio - data = load_wav(src_path, normalize=False)[0] - if test_mode == "path": - sox_io_backend.save( - tgt_path, data, sample_rate, compression=compression, encoding=encoding, bits_per_sample=bits_per_sample - ) - elif test_mode == "fileobj": - with open(tgt_path, "bw") as file_: - sox_io_backend.save( - file_, - data, - sample_rate, - format=format, - compression=compression, - encoding=encoding, - bits_per_sample=bits_per_sample, - ) - elif test_mode == "bytesio": - file_ = io.BytesIO() - sox_io_backend.save( - file_, - data, - sample_rate, - format=format, - compression=compression, - encoding=encoding, - bits_per_sample=bits_per_sample, - ) - file_.seek(0) - with open(tgt_path, "bw") as f: - f.write(file_.read()) - else: - raise ValueError(f"Unexpected test mode: {test_mode}") - # 2.2. Convert the target format to wav with sox - sox_utils.convert_audio_file(tgt_path, tst_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth) - # 2.3. Load with SciPy - found = load_wav(tst_path, normalize=False)[0] - - # 3.1. Convert the original wav to target format with sox - sox_encoding = _get_sox_encoding(encoding) - sox_utils.convert_audio_file( - src_path, sox_path, compression=compression, encoding=sox_encoding, bit_depth=bits_per_sample - ) - # 3.2. Convert the target format to wav with sox - sox_utils.convert_audio_file(sox_path, ref_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth) - # 3.3. 
Load with SciPy - expected = load_wav(ref_path, normalize=False)[0] - - np.testing.assert_array_almost_equal(found, expected) - -class TestSave(TestSaveBase, unittest.TestCase): - @nested_params( - ["path",], - [ - ("PCM_U", 8), - ("PCM_S", 16), - ("PCM_S", 32), - ("PCM_F", 32), - ("PCM_F", 64), - ("ULAW", 8), - ("ALAW", 8), - ], - ) - def test_save_wav(self, test_mode, enc_params): - encoding, bits_per_sample = enc_params - self.assert_save_consistency("wav", encoding=encoding, bits_per_sample=bits_per_sample, test_mode=test_mode) - - @nested_params( - ["path", ], - [ - ("float32",), - ("int32",), - ], - ) - def test_save_wav_dtype(self, test_mode, params): - (dtype,) = params - self.assert_save_consistency("wav", src_dtype=dtype, test_mode=test_mode) - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/unit/audio/backends/sox_io/smoke_test.py b/tests/unit/audio/backends/sox_io/smoke_test.py deleted file mode 100644 index 1f191bc51515e65244fbfde23db5efadf5d45e74..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/sox_io/smoke_test.py +++ /dev/null @@ -1,183 +0,0 @@ -import io -import itertools -import unittest - -from parameterized import parameterized -from paddlespeech.audio.backends import sox_io_backend -from tests.unit.common_utils import ( - get_wav_data, - TempDirMixin, - name_func -) - -class SmokeTest(TempDirMixin, unittest.TestCase): - """Run smoke test on various audio format - - The purpose of this test suite is to verify that sox_io_backend functionalities do not exhibit - abnormal behaviors. - - This test suite should be able to run without any additional tools (such as sox command), - however without such tools, the correctness of each function cannot be verified. - """ - - def run_smoke_test(self, ext, sample_rate, num_channels, *, compression=None, dtype="float32"): - duration = 1 - num_frames = sample_rate * duration - #path = self.get_temp_path(f"test.{ext}") - path = self.get_temp_path(f"test.{ext}") - original = get_wav_data(dtype, num_channels, normalize=False, num_frames=num_frames) - - # 1. run save - sox_io_backend.save(path, original, sample_rate, compression=compression) - # 2. run info - info = sox_io_backend.info(path) - assert info.sample_rate == sample_rate - assert info.num_channels == num_channels - # 3. 
run load - loaded, sr = sox_io_backend.load(path, normalize=False) - assert sr == sample_rate - assert loaded.shape[0] == num_channels - - @parameterized.expand( - list( - itertools.product( - ["float32", "int32" ], - #["float32", "int32", "int16", "uint8"], - [8000, 16000], - [1, 2], - ) - ), - name_func=name_func, - ) - def test_wav(self, dtype, sample_rate, num_channels): - """Run smoke test on wav format""" - self.run_smoke_test("wav", sample_rate, num_channels, dtype=dtype) - - #@parameterized.expand( - #list( - #itertools.product( - #[8000, 16000], - #[1, 2], - #[-4.2, -0.2, 0, 0.2, 96, 128, 160, 192, 224, 256, 320], - #) - #) - #) - #def test_mp3(self, sample_rate, num_channels, bit_rate): - #"""Run smoke test on mp3 format""" - #self.run_smoke_test("mp3", sample_rate, num_channels, compression=bit_rate) - - #@parameterized.expand( - #list( - #itertools.product( - #[8000, 16000], - #[1, 2], - #[-1, 0, 1, 2, 3, 3.6, 5, 10], - #) - #) - #) - #def test_vorbis(self, sample_rate, num_channels, quality_level): - #"""Run smoke test on vorbis format""" - #self.run_smoke_test("vorbis", sample_rate, num_channels, compression=quality_level) - - @parameterized.expand( - list( - itertools.product( - [8000, 16000], - [1, 2], - list(range(9)), - ) - ), - name_func=name_func, - ) - def test_flac(self, sample_rate, num_channels, compression_level): - """Run smoke test on flac format""" - self.run_smoke_test("flac", sample_rate, num_channels, compression=compression_level) - - -class SmokeTestFileObj(unittest.TestCase): - """Run smoke test on various audio format - - The purpose of this test suite is to verify that sox_io_backend functionalities do not exhibit - abnormal behaviors. - - This test suite should be able to run without any additional tools (such as sox command), - however without such tools, the correctness of each function cannot be verified. - """ - - def run_smoke_test(self, ext, sample_rate, num_channels, *, compression=None, dtype="float32"): - duration = 1 - num_frames = sample_rate * duration - original = get_wav_data(dtype, num_channels, normalize=False, num_frames=num_frames) - - fileobj = io.BytesIO() - # 1. run save - sox_io_backend.save(fileobj, original, sample_rate, compression=compression, format=ext) - # 2. run info - fileobj.seek(0) - info = sox_io_backend.info(fileobj, format=ext) - assert info.sample_rate == sample_rate - assert info.num_channels == num_channels - # 3. 
run load - fileobj.seek(0) - loaded, sr = sox_io_backend.load(fileobj, normalize=False, format=ext) - assert sr == sample_rate - assert loaded.shape[0] == num_channels - - @parameterized.expand( - list( - itertools.product( - ["float32", "int32"], - [8000, 16000], - [1, 2], - ) - ), - name_func=name_func, - ) - def test_wav(self, dtype, sample_rate, num_channels): - """Run smoke test on wav format""" - self.run_smoke_test("wav", sample_rate, num_channels, dtype=dtype) - - # not supported yet - #@parameterized.expand( - #list( - #itertools.product( - #[8000, 16000], - #[1, 2], - #[-4.2, -0.2, 0, 0.2, 96, 128, 160, 192, 224, 256, 320], - #) - #) - #) - #def test_mp3(self, sample_rate, num_channels, bit_rate): - #"""Run smoke test on mp3 format""" - #self.run_smoke_test("mp3", sample_rate, num_channels, compression=bit_rate) - - #@parameterized.expand( - #list( - #itertools.product( - #[8000, 16000], - #[1, 2], - #[-1, 0, 1, 2, 3, 3.6, 5, 10], - #) - #) - #) - #def test_vorbis(self, sample_rate, num_channels, quality_level): - #"""Run smoke test on vorbis format""" - #self.run_smoke_test("vorbis", sample_rate, num_channels, compression=quality_level) - - @parameterized.expand( - list( - itertools.product( - [8000, 16000], - [1, 2], - list(range(9)), - ) - ), - name_func=name_func, - ) - def test_flac(self, sample_rate, num_channels, compression_level): - """Run smoke test on flac format""" - self.run_smoke_test("flac", sample_rate, num_channels, compression=compression_level) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/audio/backends/sox_io/sox_effect_test.py b/tests/unit/audio/backends/sox_io/sox_effect_test.py deleted file mode 100644 index d9c70bc5ef68c537e29d5c4adf302c9f36bc62c9..0000000000000000000000000000000000000000 --- a/tests/unit/audio/backends/sox_io/sox_effect_test.py +++ /dev/null @@ -1,347 +0,0 @@ -#code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/sox_effect/sox_effect_test.py -import io -import itertools -import tarfile -import unittest -from pathlib import Path -import numpy as np - -from parameterized import parameterized -from paddlespeech.audio import sox_effects -from paddlespeech.audio._internal import module_utils as _mod_utils -from tests.unit.common_utils import ( - get_sinusoid, - get_wav_data, - load_wav, - save_wav, - sox_utils, - TempDirMixin, - name_func, - load_effects_params -) - -if _mod_utils.is_module_available("requests"): - import requests - - -class TestSoxEffects(unittest.TestCase): - def test_init(self): - """Calling init_sox_effects multiple times does not crash""" - for _ in range(3): - sox_effects.init_sox_effects() - - -class TestSoxEffectsTensor(TempDirMixin, unittest.TestCase): - """Test suite for `apply_effects_tensor` function""" - - @parameterized.expand( - list(itertools.product(["float32", "int32"], [8000, 16000], [1, 2, 4, 8], [True, False])), - ) - def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first): - """`apply_effects_tensor` without effects should return identical data as input""" - original = get_wav_data(dtype, num_channels, channels_first=channels_first) - expected = original.clone() - - found, output_sample_rate = sox_effects.apply_effects_tensor(expected, sample_rate, [], channels_first) - - assert (output_sample_rate == sample_rate) - # SoxEffect should not alter the input Tensor object - #self.assertEqual(original, expected) - np.testing.assert_array_almost_equal(original.numpy(), expected.numpy()) - - # SoxEffect should not
return the same Tensor object - assert expected is not found - # Returned Tensor should equal to the input Tensor - #self.assertEqual(expected, found) - np.testing.assert_array_almost_equal(expected.numpy(), found.numpy()) - - @parameterized.expand( - load_effects_params("sox_effect_test_args.jsonl"), - name_func=lambda f, i, p: f'{f.__name__}_{i}_{p.args[0]["effects"][0][0]}', - ) - def test_apply_effects(self, args): - """`apply_effects_tensor` should return identical data as sox command""" - effects = args["effects"] - num_channels = args.get("num_channels", 2) - input_sr = args.get("input_sample_rate", 8000) - output_sr = args.get("output_sample_rate") - - input_path = self.get_temp_path("input.wav") - reference_path = self.get_temp_path("reference.wav") - - original = get_sinusoid(frequency=800, sample_rate=input_sr, n_channels=num_channels, dtype="float32") - save_wav(input_path, original, input_sr) - sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr) - - expected, expected_sr = load_wav(reference_path) - found, sr = sox_effects.apply_effects_tensor(original, input_sr, effects) - - assert sr == expected_sr - #self.assertEqual(expected, found) - np.testing.assert_array_almost_equal(expected.numpy(), found.numpy()) - - -class TestSoxEffectsFile(TempDirMixin, unittest.TestCase): - """Test suite for `apply_effects_file` function""" - - @parameterized.expand( - list( - itertools.product( - ["float32", "int32"], - [8000, 16000], - [1, 2, 4, 8], - [False, True], - ) - ), - #name_func=name_func, - ) - def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first): - """`apply_effects_file` without effects should return identical data as input""" - path = self.get_temp_path("input.wav") - expected = get_wav_data(dtype, num_channels, channels_first=channels_first) - save_wav(path, expected, sample_rate, channels_first=channels_first) - - found, output_sample_rate = sox_effects.apply_effects_file( - path, [], normalize=False, channels_first=channels_first - ) - - assert output_sample_rate == sample_rate - #self.assertEqual(expected, found) - np.testing.assert_array_almost_equal(expected.numpy(), found.numpy()) - - @parameterized.expand( - load_effects_params("sox_effect_test_args.jsonl"), - #name_func=lambda f, i, p: f'{f.__name__}_{i}_{p.args[0]["effects"][0][0]}', - ) - def test_apply_effects_str(self, args): - """`apply_effects_file` should return identical data as sox command""" - dtype = "int32" - channels_first = True - effects = args["effects"] - num_channels = args.get("num_channels", 2) - input_sr = args.get("input_sample_rate", 8000) - output_sr = args.get("output_sample_rate") - - input_path = self.get_temp_path("input.wav") - reference_path = self.get_temp_path("reference.wav") - data = get_wav_data(dtype, num_channels, channels_first=channels_first) - save_wav(input_path, data, input_sr, channels_first=channels_first) - sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr) - - expected, expected_sr = load_wav(reference_path) - found, sr = sox_effects.apply_effects_file(input_path, effects, normalize=False, channels_first=channels_first) - - assert sr == expected_sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(expected.numpy(), found.numpy()) - - - def test_apply_effects_path(self): - """`apply_effects_file` should return identical data as sox command when file path is given as a Path Object""" - dtype = "int32" - channels_first = True - effects = 
[["hilbert"]] - num_channels = 2 - input_sr = 8000 - output_sr = 8000 - - input_path = self.get_temp_path("input.wav") - reference_path = self.get_temp_path("reference.wav") - data = get_wav_data(dtype, num_channels, channels_first=channels_first) - save_wav(input_path, data, input_sr, channels_first=channels_first) - sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr) - - expected, expected_sr = load_wav(reference_path) - found, sr = sox_effects.apply_effects_file( - Path(input_path), effects, normalize=False, channels_first=channels_first - ) - - assert sr == expected_sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(expected.numpy(), found.numpy()) - - -class TestFileFormats(TempDirMixin, unittest.TestCase): - """`apply_effects_file` gives the same result as sox on various file formats""" - - @parameterized.expand( - list( - itertools.product( - ["float32", "int32"], - [8000, 16000], - [1, 2], - ) - ), - #name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}', - ) - def test_wav(self, dtype, sample_rate, num_channels): - """`apply_effects_file` works on various wav format""" - channels_first = True - effects = [["band", "300", "10"]] - - input_path = self.get_temp_path("input.wav") - reference_path = self.get_temp_path("reference.wav") - data = get_wav_data(dtype, num_channels, channels_first=channels_first) - save_wav(input_path, data, sample_rate, channels_first=channels_first) - sox_utils.run_sox_effect(input_path, reference_path, effects) - - expected, expected_sr = load_wav(reference_path) - found, sr = sox_effects.apply_effects_file(input_path, effects, normalize=False, channels_first=channels_first) - - assert sr == expected_sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - #not support now - #@parameterized.expand( - #list( - #itertools.product( - #[8000, 16000], - #[1, 2], - #) - #), - ##name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}', - #) - #def test_flac(self, sample_rate, num_channels): - #"""`apply_effects_file` works on various flac format""" - #channels_first = True - #effects = [["band", "300", "10"]] - - #input_path = self.get_temp_path("input.flac") - #reference_path = self.get_temp_path("reference.wav") - #sox_utils.gen_audio_file(input_path, sample_rate, num_channels) - #sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32) - - #expected, expected_sr = load_wav(reference_path) - #found, sr = sox_effects.apply_effects_file(input_path, effects, channels_first=channels_first) - #save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first) - - #assert sr == expected_sr - ##self.assertEqual(found, expected) - #np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - #@parameterized.expand( - #list( - #itertools.product( - #[8000, 16000], - #[1, 2], - #) - #), - ##name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}', - #) - #def test_vorbis(self, sample_rate, num_channels): - #"""`apply_effects_file` works on various vorbis format""" - #channels_first = True - #effects = [["band", "300", "10"]] - - #input_path = self.get_temp_path("input.vorbis") - #reference_path = self.get_temp_path("reference.wav") - #sox_utils.gen_audio_file(input_path, sample_rate, num_channels) - #sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32) - - #expected, expected_sr = 
load_wav(reference_path) - #found, sr = sox_effects.apply_effects_file(input_path, effects, channels_first=channels_first) - #save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first) - - #assert sr == expected_sr - ##self.assertEqual(found, expected) - #np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - -#@skipIfNoExec("sox") -#@skipIfNoSox -class TestFileObject(TempDirMixin, unittest.TestCase): - @parameterized.expand( - [ - ("wav", None), - ] - ) - def test_fileobj(self, ext, compression): - """Applying effects via file object works""" - sample_rate = 16000 - channels_first = True - effects = [["band", "300", "10"]] - input_path = self.get_temp_path(f"input.{ext}") - reference_path = self.get_temp_path("reference.wav") - - #sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression) - data = get_wav_data("int32", 2, channels_first=channels_first) - save_wav(input_path, data, sample_rate, channels_first=channels_first) - - sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32) - expected, expected_sr = load_wav(reference_path) - - with open(input_path, "rb") as fileobj: - found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first) - save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first) - assert sr == expected_sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - @parameterized.expand( - [ - ("wav", None), - ] - ) - def test_bytesio(self, ext, compression): - """Applying effects via BytesIO object works""" - sample_rate = 16000 - channels_first = True - effects = [["band", "300", "10"]] - input_path = self.get_temp_path(f"input.{ext}") - reference_path = self.get_temp_path("reference.wav") - - #sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression) - data = get_wav_data("int32", 2, channels_first=channels_first) - save_wav(input_path, data, sample_rate, channels_first=channels_first) - sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32) - expected, expected_sr = load_wav(reference_path) - - with open(input_path, "rb") as file_: - fileobj = io.BytesIO(file_.read()) - found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first) - save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first) - assert sr == expected_sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - @parameterized.expand( - [ - ("wav", None), - ] - ) - def test_tarfile(self, ext, compression): - """Applying effects to compressed audio via file-like object works""" - sample_rate = 16000 - channels_first = True - effects = [["band", "300", "10"]] - audio_file = f"input.{ext}" - - input_path = self.get_temp_path(audio_file) - reference_path = self.get_temp_path("reference.wav") - archive_path = self.get_temp_path("archive.tar.gz") - data = get_wav_data("int32", 2, channels_first=channels_first) - save_wav(input_path, data, sample_rate, channels_first=channels_first) - - # sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression) - sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32) - - expected, expected_sr = load_wav(reference_path) - - with tarfile.TarFile(archive_path, "w") as tarobj: -
tarobj.add(input_path, arcname=audio_file) - with tarfile.TarFile(archive_path, "r") as tarobj: - fileobj = tarobj.extractfile(audio_file) - found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first) - save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first) - assert sr == expected_sr - #self.assertEqual(found, expected) - np.testing.assert_array_almost_equal(found.numpy(), expected.numpy()) - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/unit/audio/features/base.py b/tests/unit/audio/features/base.py index 4049b61024a4dd6ab62a9751325aa52b3cde6617..d183b72ade749994ab4df5d8b0d421bd8d8ec733 100644 --- a/tests/unit/audio/features/base.py +++ b/tests/unit/audio/features/base.py @@ -17,8 +17,7 @@ import urllib.request import numpy as np import paddle - -from paddlespeech.audio.soundfile_backend import soundfile_load as load +from paddleaudio.backends import soundfile_load as load wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav' diff --git a/tests/unit/audio/features/test_istft.py b/tests/unit/audio/features/test_istft.py index f1e6e4e339a168906ba19b339b649074dfdf9d5f..9cf8cdd65582c0300d59749db621155eebd3faee 100644 --- a/tests/unit/audio/features/test_istft.py +++ b/tests/unit/audio/features/test_istft.py @@ -15,9 +15,9 @@ import unittest import numpy as np import paddle +from paddleaudio.functional.window import get_window from .base import FeatTest -from paddlespeech.audio.functional.window import get_window from paddlespeech.s2t.transform.spectrogram import IStft from paddlespeech.s2t.transform.spectrogram import Stft diff --git a/tests/unit/audio/features/test_kaldi.py b/tests/unit/audio/features/test_kaldi.py deleted file mode 100644 index 2b0ece890b206a7e42525154a3eb5f4ffc02a472..0000000000000000000000000000000000000000 --- a/tests/unit/audio/features/test_kaldi.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import unittest - -import numpy as np -import paddle -import torch -import torchaudio - -import paddlespeech.audio -from .base import FeatTest - - -class TestKaldi(FeatTest): - def initParmas(self): - self.window_size = 1024 - self.dtype = 'float32' - - def test_window(self): - t_hann_window = torch.hann_window( - self.window_size, periodic=False, dtype=eval(f'torch.{self.dtype}')) - t_hamm_window = torch.hamming_window( - self.window_size, - periodic=False, - alpha=0.54, - beta=0.46, - dtype=eval(f'torch.{self.dtype}')) - t_povey_window = torch.hann_window( - self.window_size, periodic=False, - dtype=eval(f'torch.{self.dtype}')).pow(0.85) - - p_hann_window = paddlespeech.audio.functional.window.get_window( - 'hann', - self.window_size, - fftbins=False, - dtype=eval(f'paddle.{self.dtype}')) - p_hamm_window = paddlespeech.audio.functional.window.get_window( - 'hamming', - self.window_size, - fftbins=False, - dtype=eval(f'paddle.{self.dtype}')) - p_povey_window = paddlespeech.audio.functional.window.get_window( - 'hann', - self.window_size, - fftbins=False, - dtype=eval(f'paddle.{self.dtype}')).pow(0.85) - - np.testing.assert_array_almost_equal(t_hann_window, p_hann_window) - np.testing.assert_array_almost_equal(t_hamm_window, p_hamm_window) - np.testing.assert_array_almost_equal(t_povey_window, p_povey_window) - - def test_fbank(self): - ta_features = torchaudio.compliance.kaldi.fbank( - torch.from_numpy(self.waveform.astype(self.dtype))) - pa_features = paddlespeech.audio.compliance.kaldi.fbank( - paddle.to_tensor(self.waveform.astype(self.dtype))) - np.testing.assert_array_almost_equal( - ta_features, pa_features, decimal=4) - - def test_mfcc(self): - ta_features = torchaudio.compliance.kaldi.mfcc( - torch.from_numpy(self.waveform.astype(self.dtype))) - pa_features = paddlespeech.audio.compliance.kaldi.mfcc( - paddle.to_tensor(self.waveform.astype(self.dtype))) - np.testing.assert_array_almost_equal( - ta_features, pa_features, decimal=4) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/audio/features/test_kaldi_feat.py b/tests/unit/audio/features/test_kaldi_feat.py deleted file mode 100644 index e0ca1fa1dbe391375380e98534d554a09e40818c..0000000000000000000000000000000000000000 --- a/tests/unit/audio/features/test_kaldi_feat.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import unittest - -import numpy as np -import paddle - -from paddlespeech.audio.kaldi import fbank as fbank -from paddlespeech.audio.kaldi import pitch as pitch -from kaldiio import ReadHelper - -# the groundtruth feats computed in kaldi command below. 
-#compute-fbank-feats --dither=0 scp:$wav_scp ark,t:fbank_feat.ark -#compute-kaldi-pitch-feats --sample-frequency=16000 scp:$wav_scp ark,t:pitch_feat.ark - -class TestKaldiFbank(unittest.TestCase): - - def test_fbank(self): - fbank_groundtruth = {} - with ReadHelper('ark:testdata/fbank_feat.ark') as reader: - for key, feat in reader: - fbank_groundtruth[key] = feat - - with ReadHelper('ark:testdata/wav.ark') as reader: - for key, wav in reader: - fbank_feat = fbank(wav) - fbank_check = fbank_groundtruth[key] - np.testing.assert_array_almost_equal( - fbank_feat, fbank_check, decimal=4) - - def test_pitch(self): - pitch_groundtruth = {} - with ReadHelper('ark:testdata/pitch_feat.ark') as reader: - for key, feat in reader: - pitch_groundtruth[key] = feat - - with ReadHelper('ark:testdata/wav.ark') as reader: - for key, wav in reader: - pitch_feat = pitch(wav) - pitch_check = pitch_groundtruth[key] - np.testing.assert_array_almost_equal( - pitch_feat, pitch_check, decimal=4) - - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/audio/features/test_librosa.py b/tests/unit/audio/features/test_librosa.py deleted file mode 100644 index ffdec3e788860320faf0ce28d0afb05ddc0238ba..0000000000000000000000000000000000000000 --- a/tests/unit/audio/features/test_librosa.py +++ /dev/null @@ -1,281 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import unittest - -import librosa -import numpy as np -import paddle - -import paddlespeech.audio -from .base import FeatTest -from paddlespeech.audio.functional.window import get_window - - -class TestLibrosa(FeatTest): - def initParmas(self): - self.n_fft = 512 - self.hop_length = 128 - self.n_mels = 40 - self.n_mfcc = 20 - self.fmin = 0.0 - self.window_str = 'hann' - self.pad_mode = 'reflect' - self.top_db = 80.0 - - def test_stft(self): - if len(self.waveform.shape) == 2: # (C, T) - self.waveform = self.waveform.squeeze( - 0) # 1D input for librosa.feature.melspectrogram - - feature_librosa = librosa.core.stft( - y=self.waveform, - n_fft=self.n_fft, - hop_length=self.hop_length, - win_length=None, - window=self.window_str, - center=True, - dtype=None, - pad_mode=self.pad_mode, ) - x = paddle.to_tensor(self.waveform).unsqueeze(0) - window = get_window(self.window_str, self.n_fft, dtype=x.dtype) - feature_paddle = paddle.signal.stft( - x=x, - n_fft=self.n_fft, - hop_length=self.hop_length, - win_length=None, - window=window, - center=True, - pad_mode=self.pad_mode, - normalized=False, - onesided=True, ).squeeze(0) - - np.testing.assert_array_almost_equal( - feature_librosa, feature_paddle, decimal=5) - - def test_istft(self): - if len(self.waveform.shape) == 2: # (C, T) - self.waveform = self.waveform.squeeze( - 0) # 1D input for librosa.feature.melspectrogram - - # Get stft result from librosa. 
- stft_matrix = librosa.core.stft( - y=self.waveform, - n_fft=self.n_fft, - hop_length=self.hop_length, - win_length=None, - window=self.window_str, - center=True, - pad_mode=self.pad_mode, ) - - feature_librosa = librosa.core.istft( - stft_matrix=stft_matrix, - hop_length=self.hop_length, - win_length=None, - window=self.window_str, - center=True, - dtype=None, - length=None, ) - - x = paddle.to_tensor(stft_matrix).unsqueeze(0) - window = get_window( - self.window_str, - self.n_fft, - dtype=paddle.to_tensor(self.waveform).dtype) - feature_paddle = paddle.signal.istft( - x=x, - n_fft=self.n_fft, - hop_length=self.hop_length, - win_length=None, - window=window, - center=True, - normalized=False, - onesided=True, - length=None, - return_complex=False, ).squeeze(0) - - np.testing.assert_array_almost_equal( - feature_librosa, feature_paddle, decimal=5) - - def test_mel(self): - feature_librosa = librosa.filters.mel( - sr=self.sr, - n_fft=self.n_fft, - n_mels=self.n_mels, - fmin=self.fmin, - fmax=None, - htk=False, - norm='slaney', - dtype=self.waveform.dtype, ) - feature_compliance = paddlespeech.audio.compliance.librosa.compute_fbank_matrix( - sr=self.sr, - n_fft=self.n_fft, - n_mels=self.n_mels, - fmin=self.fmin, - fmax=None, - htk=False, - norm='slaney', - dtype=self.waveform.dtype, ) - x = paddle.to_tensor(self.waveform) - feature_functional = paddlespeech.audio.functional.compute_fbank_matrix( - sr=self.sr, - n_fft=self.n_fft, - n_mels=self.n_mels, - f_min=self.fmin, - f_max=None, - htk=False, - norm='slaney', - dtype=x.dtype, ) - - np.testing.assert_array_almost_equal(feature_librosa, - feature_compliance) - np.testing.assert_array_almost_equal(feature_librosa, - feature_functional) - - def test_melspect(self): - if len(self.waveform.shape) == 2: # (C, T) - self.waveform = self.waveform.squeeze( - 0) # 1D input for librosa.feature.melspectrogram - - # librosa: - feature_librosa = librosa.feature.melspectrogram( - y=self.waveform, - sr=self.sr, - n_fft=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - fmin=self.fmin) - - # paddlespeech.audio.compliance.librosa: - feature_compliance = paddlespeech.audio.compliance.librosa.melspectrogram( - x=self.waveform, - sr=self.sr, - window_size=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - fmin=self.fmin, - to_db=False) - - # paddlespeech.audio.features.layer - x = paddle.to_tensor( - self.waveform, dtype=paddle.float64).unsqueeze(0) # Add batch dim. 
- feature_extractor = paddlespeech.audio.features.MelSpectrogram( - sr=self.sr, - n_fft=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - f_min=self.fmin, - dtype=x.dtype) - feature_layer = feature_extractor(x).squeeze(0).numpy() - - np.testing.assert_array_almost_equal( - feature_librosa, feature_compliance, decimal=5) - np.testing.assert_array_almost_equal( - feature_librosa, feature_layer, decimal=5) - - def test_log_melspect(self): - if len(self.waveform.shape) == 2: # (C, T) - self.waveform = self.waveform.squeeze( - 0) # 1D input for librosa.feature.melspectrogram - - # librosa: - feature_librosa = librosa.feature.melspectrogram( - y=self.waveform, - sr=self.sr, - n_fft=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - fmin=self.fmin) - feature_librosa = librosa.power_to_db(feature_librosa, top_db=None) - - # paddlespeech.audio.compliance.librosa: - feature_compliance = paddlespeech.audio.compliance.librosa.melspectrogram( - x=self.waveform, - sr=self.sr, - window_size=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - fmin=self.fmin) - - # paddlespeech.audio.features.layer - x = paddle.to_tensor( - self.waveform, dtype=paddle.float64).unsqueeze(0) # Add batch dim. - feature_extractor = paddlespeech.audio.features.LogMelSpectrogram( - sr=self.sr, - n_fft=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - f_min=self.fmin, - dtype=x.dtype) - feature_layer = feature_extractor(x).squeeze(0).numpy() - - np.testing.assert_array_almost_equal( - feature_librosa, feature_compliance, decimal=5) - np.testing.assert_array_almost_equal( - feature_librosa, feature_layer, decimal=4) - - def test_mfcc(self): - if len(self.waveform.shape) == 2: # (C, T) - self.waveform = self.waveform.squeeze( - 0) # 1D input for librosa.feature.melspectrogram - - # librosa: - feature_librosa = librosa.feature.mfcc( - y=self.waveform, - sr=self.sr, - S=None, - n_mfcc=self.n_mfcc, - dct_type=2, - norm='ortho', - lifter=0, - n_fft=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - fmin=self.fmin) - - # paddlespeech.audio.compliance.librosa: - feature_compliance = paddlespeech.audio.compliance.librosa.mfcc( - x=self.waveform, - sr=self.sr, - n_mfcc=self.n_mfcc, - dct_type=2, - norm='ortho', - lifter=0, - window_size=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - fmin=self.fmin, - top_db=self.top_db) - - # paddlespeech.audio.features.layer - x = paddle.to_tensor( - self.waveform, dtype=paddle.float64).unsqueeze(0) # Add batch dim. 
- feature_extractor = paddlespeech.audio.features.MFCC( - sr=self.sr, - n_mfcc=self.n_mfcc, - n_fft=self.n_fft, - hop_length=self.hop_length, - n_mels=self.n_mels, - f_min=self.fmin, - top_db=self.top_db, - dtype=x.dtype) - feature_layer = feature_extractor(x).squeeze(0).numpy() - - np.testing.assert_array_almost_equal( - feature_librosa, feature_compliance, decimal=4) - np.testing.assert_array_almost_equal( - feature_librosa, feature_layer, decimal=4) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/audio/features/test_log_melspectrogram.py b/tests/unit/audio/features/test_log_melspectrogram.py index 59eb73e8ca3ae1671f36b482cffa98032ba7d9d1..7d56803871c7e1537056517dbea81a38bedb85ed 100644 --- a/tests/unit/audio/features/test_log_melspectrogram.py +++ b/tests/unit/audio/features/test_log_melspectrogram.py @@ -15,8 +15,8 @@ import unittest import numpy as np import paddle +import paddleaudio -import paddlespeech.audio from .base import FeatTest from paddlespeech.s2t.transform.spectrogram import LogMelSpectrogram @@ -33,7 +33,7 @@ class TestLogMelSpectrogram(FeatTest): ps_res = ps_melspect(self.waveform.T).squeeze(1).T x = paddle.to_tensor(self.waveform) - ps_melspect = paddlespeech.audio.features.LogMelSpectrogram( + ps_melspect = paddleaudio.features.LogMelSpectrogram( self.sr, self.n_fft, self.hop_length, diff --git a/tests/unit/audio/features/test_spectrogram.py b/tests/unit/audio/features/test_spectrogram.py index 7d908a7ef5b8a12a950161a5a8ea2997f7ef4275..1774fe61975c4b4ae11b7ff2c9200a4d67499efe 100644 --- a/tests/unit/audio/features/test_spectrogram.py +++ b/tests/unit/audio/features/test_spectrogram.py @@ -15,8 +15,8 @@ import unittest import numpy as np import paddle +import paddleaudio -import paddlespeech.audio from .base import FeatTest from paddlespeech.s2t.transform.spectrogram import Spectrogram @@ -31,7 +31,7 @@ class TestSpectrogram(FeatTest): ps_res = ps_spect(self.waveform.T).squeeze(1).T # Magnitude x = paddle.to_tensor(self.waveform) - pa_spect = paddlespeech.audio.features.Spectrogram( + pa_spect = paddleaudio.features.Spectrogram( self.n_fft, self.hop_length, power=1.0) pa_res = pa_spect(x).squeeze(0).numpy() diff --git a/tests/unit/audio/features/test_stft.py b/tests/unit/audio/features/test_stft.py index 03448ca806abf5a83659f601f46ba91ece862759..58792ffe2477058958a4e31ed122263306e83388 100644 --- a/tests/unit/audio/features/test_stft.py +++ b/tests/unit/audio/features/test_stft.py @@ -15,9 +15,9 @@ import unittest import numpy as np import paddle +from paddleaudio.functional.window import get_window from .base import FeatTest -from paddlespeech.audio.functional.window import get_window from paddlespeech.s2t.transform.spectrogram import Stft diff --git a/tests/unit/common_utils/__init__.py b/tests/unit/common_utils/__init__.py index 7bc718f388f80cef9114bd90f1c6008e290d56e6..70e5331539660a94c87bee21544b38cabcd67532 100644 --- a/tests/unit/common_utils/__init__.py +++ b/tests/unit/common_utils/__init__.py @@ -1,19 +1,15 @@ -from .wav_utils import get_wav_data, load_wav, save_wav, normalize_wav -from .parameterized_utils import nested_params -from .data_utils import get_sinusoid, load_params, load_effects_params -from .case_utils import ( - TempDirMixin, - name_func -) +from .case_utils import name_func +from .case_utils import TempDirMixin +from .data_utils import get_sinusoid +from .data_utils import load_effects_params +from .data_utils import load_params +from .parameterized_utils import nested_params +from .wav_utils import get_wav_data +from 
.wav_utils import load_wav +from .wav_utils import normalize_wav +from .wav_utils import save_wav __all__ = [ - "get_wav_data", - "load_wav", - "save_wav", - "normalize_wav", - "load_params", - "nested_params", - "get_sinusoid", - "name_func", - "load_effects_params" + "get_wav_data", "load_wav", "save_wav", "normalize_wav", "load_params", + "nested_params", "get_sinusoid", "name_func", "load_effects_params" ] diff --git a/tests/unit/common_utils/case_utils.py b/tests/unit/common_utils/case_utils.py index 406d293b6ab29ed985520225085db13a3c1a28c3..65a78c5df0d7de9515954be966236b9fc674527b 100644 --- a/tests/unit/common_utils/case_utils.py +++ b/tests/unit/common_utils/case_utils.py @@ -1,24 +1,13 @@ -import functools import os.path -import shutil -import subprocess -import sys import tempfile -import time -import unittest #code is from:https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/common_utils/case_utils.py -import paddle -from paddlespeech.audio._internal.module_utils import ( - is_kaldi_available, - is_module_available, - is_sox_available, -) def name_func(func, _, params): return f'{func.__name__}_{"_".join(str(arg) for arg in params.args)}' + class TempDirMixin: """Mixin to provide easy access to temp dir""" diff --git a/tests/unit/common_utils/wav_utils.py b/tests/unit/common_utils/wav_utils.py index 25d0b1971c1ca7e6ba0ca4326a0af0793e78ecde..5cae6d8e6f1641f2b8c228de375d359910c2f6c4 100644 --- a/tests/unit/common_utils/wav_utils.py +++ b/tests/unit/common_utils/wav_utils.py @@ -1,8 +1,8 @@ from typing import Optional -import scipy.io.wavfile import paddle -import numpy as np +import scipy.io.wavfile + def normalize_wav(tensor: paddle.Tensor) -> paddle.Tensor: if tensor.dtype == paddle.float32: @@ -23,13 +23,12 @@ def normalize_wav(tensor: paddle.Tensor) -> paddle.Tensor: def get_wav_data( - dtype: str, - num_channels: int, - *, - num_frames: Optional[int] = None, - normalize: bool = True, - channels_first: bool = True, -): + dtype: str, + num_channels: int, + *, + num_frames: Optional[int]=None, + normalize: bool=True, + channels_first: bool=True, ): """Generate linear signal of the given dtype and num_channels Data range is @@ -53,25 +52,26 @@ def get_wav_data( # paddle linspace not support uint8, int8, int16 #if dtype == "uint8": # base = paddle.linspace(0, 255, num_frames, dtype=dtype_) - #dtype_np = getattr(np, dtype) - #base_np = np.linspace(0, 255, num_frames, dtype_np) - #base = paddle.to_tensor(base_np, dtype=dtype_) + #dtype_np = getattr(np, dtype) + #base_np = np.linspace(0, 255, num_frames, dtype_np) + #base = paddle.to_tensor(base_np, dtype=dtype_) #elif dtype == "int8": # base = paddle.linspace(-128, 127, num_frames, dtype=dtype_) - #dtype_np = getattr(np, dtype) - #base_np = np.linspace(-128, 127, num_frames, dtype_np) - #base = paddle.to_tensor(base_np, dtype=dtype_) + #dtype_np = getattr(np, dtype) + #base_np = np.linspace(-128, 127, num_frames, dtype_np) + #base = paddle.to_tensor(base_np, dtype=dtype_) if dtype == "float32": base = paddle.linspace(-1.0, 1.0, num_frames, dtype=dtype_) elif dtype == "float64": base = paddle.linspace(-1.0, 1.0, num_frames, dtype=dtype_) elif dtype == "int32": - base = paddle.linspace(-2147483648, 2147483647, num_frames, dtype=dtype_) + base = paddle.linspace( + -2147483648, 2147483647, num_frames, dtype=dtype_) #elif dtype == "int16": # base = paddle.linspace(-32768, 32767, num_frames, dtype=dtype_) - #dtype_np = getattr(np, dtype) - #base_np = np.linspace(-32768, 32767, num_frames, dtype_np) - #base = 
paddle.to_tensor(base_np, dtype=dtype_) + #dtype_np = getattr(np, dtype) + #base_np = np.linspace(-32768, 32767, num_frames, dtype_np) + #base = paddle.to_tensor(base_np, dtype=dtype_) else: raise NotImplementedError(f"Unsupported dtype {dtype}") data = base.tile([num_channels, 1])
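The commented-out branches in `get_wav_data` note that `paddle.linspace` cannot generate `uint8`, `int8`, or `int16` data directly. As a rough sketch of the NumPy-based workaround those comments hint at (the helper name `_int_wav_data` and the example call are hypothetical, not part of the test utilities):

```python
# A minimal sketch, not part of the original diff: build the linear ramp with
# NumPy, which supports these integer dtypes, and convert to paddle at the end.
import numpy as np
import paddle


def _int_wav_data(dtype: str, num_channels: int, num_frames: int) -> paddle.Tensor:
    """Full-range linear ramp for integer dtypes that paddle.linspace cannot generate."""
    ranges = {
        "uint8": (0, 255),
        "int8": (-128, 127),
        "int16": (-32768, 32767),
    }
    lo, hi = ranges[dtype]
    base = np.linspace(lo, hi, num_frames, dtype=getattr(np, dtype))
    # channels_first layout: (num_channels, num_frames), matching get_wav_data
    data = np.tile(base, (num_channels, 1))
    return paddle.to_tensor(data)  # assumes the paddle build supports this integer dtype


# Example usage: a 2-channel, 8000-frame int16 ramp.
wav = _int_wav_data("int16", 2, 8000)
```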