未验证 提交 e7916a5a 编写于 作者: 小湉湉's avatar 小湉湉 提交者: GitHub

Merge pull request #1582 from KPatr1ck/docs

[Audio][Doc]Add paddleaudio doc.
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import warnings import warnings
from typing import Optional from typing import Optional
from typing import Tuple from typing import Tuple
...@@ -19,7 +20,6 @@ from typing import Union ...@@ -19,7 +20,6 @@ from typing import Union
import numpy as np import numpy as np
import resampy import resampy
import soundfile as sf import soundfile as sf
from numpy import ndarray as array
from scipy.io import wavfile from scipy.io import wavfile
from ..utils import ParameterError from ..utils import ParameterError
...@@ -38,12 +38,20 @@ RESAMPLE_MODES = ['kaiser_best', 'kaiser_fast'] ...@@ -38,12 +38,20 @@ RESAMPLE_MODES = ['kaiser_best', 'kaiser_fast']
EPS = 1e-8 EPS = 1e-8
def resample(y: array, src_sr: int, target_sr: int, def resample(y: np.ndarray,
mode: str='kaiser_fast') -> array: src_sr: int,
""" Audio resampling target_sr: int,
This function is the same as using resampy.resample(). mode: str='kaiser_fast') -> np.ndarray:
Notes: """Audio resampling.
The default mode is kaiser_fast. For better audio quality, use mode = 'kaiser_best'
Args:
y (np.ndarray): Input waveform array in 1D or 2D.
src_sr (int): Source sample rate.
target_sr (int): Target sample rate.
mode (str, optional): The resampling filter to use. Defaults to 'kaiser_fast'.
Returns:
np.ndarray: `y` resampled to `target_sr`
""" """
if mode == 'kaiser_best': if mode == 'kaiser_best':
...@@ -53,7 +61,7 @@ def resample(y: array, src_sr: int, target_sr: int, ...@@ -53,7 +61,7 @@ def resample(y: array, src_sr: int, target_sr: int,
if not isinstance(y, np.ndarray): if not isinstance(y, np.ndarray):
raise ParameterError( raise ParameterError(
'Only support numpy array, but received y in {type(y)}') 'Only support numpy np.ndarray, but received y in {type(y)}')
if mode not in RESAMPLE_MODES: if mode not in RESAMPLE_MODES:
raise ParameterError(f'resample mode must in {RESAMPLE_MODES}') raise ParameterError(f'resample mode must in {RESAMPLE_MODES}')
...@@ -61,9 +69,17 @@ def resample(y: array, src_sr: int, target_sr: int, ...@@ -61,9 +69,17 @@ def resample(y: array, src_sr: int, target_sr: int,
return resampy.resample(y, src_sr, target_sr, filter=mode) return resampy.resample(y, src_sr, target_sr, filter=mode)
def to_mono(y: array, merge_type: str='average') -> array: def to_mono(y: np.ndarray, merge_type: str='average') -> np.ndarray:
""" convert stereo audio to mono """Convert stereo audio to mono.
Args:
y (np.ndarray): Input waveform array in 1D or 2D.
merge_type (str, optional): Merge type to generate mono waveform. Defaults to 'average'.
Returns:
np.ndarray: `y` with mono channel.
""" """
if merge_type not in MERGE_TYPES: if merge_type not in MERGE_TYPES:
raise ParameterError( raise ParameterError(
f'Unsupported merge type {merge_type}, available types are {MERGE_TYPES}' f'Unsupported merge type {merge_type}, available types are {MERGE_TYPES}'
...@@ -101,18 +117,34 @@ def to_mono(y: array, merge_type: str='average') -> array: ...@@ -101,18 +117,34 @@ def to_mono(y: array, merge_type: str='average') -> array:
return y_out return y_out
def _safe_cast(y: array, dtype: Union[type, str]) -> array: def _safe_cast(y: np.ndarray, dtype: Union[type, str]) -> np.ndarray:
""" data type casting in a safe way, i.e., prevent overflow or underflow """Data type casting in a safe way, i.e., prevent overflow or underflow.
This function is used internally.
Args:
y (np.ndarray): Input waveform array in 1D or 2D.
dtype (Union[type, str]): Data type of waveform.
Returns:
np.ndarray: `y` after safe casting.
""" """
return np.clip(y, np.iinfo(dtype).min, np.iinfo(dtype).max).astype(dtype) if 'float' in str(y.dtype):
return np.clip(y, np.finfo(dtype).min,
np.finfo(dtype).max).astype(dtype)
else:
return np.clip(y, np.iinfo(dtype).min,
np.iinfo(dtype).max).astype(dtype)
def depth_convert(y: array, dtype: Union[type, str], def depth_convert(y: np.ndarray, dtype: Union[type, str]) -> np.ndarray:
dithering: bool=True) -> array: """Convert audio array to target dtype safely. This function converts audio waveform to a target dtype, with additional steps of
"""Convert audio array to target dtype safely
This function converts audio waveform to a target dtype, with additional steps of
preventing overflow/underflow and preserving audio range. preventing overflow/underflow and preserving audio range.
Args:
y (np.ndarray): Input waveform array in 1D or 2D.
dtype (Union[type, str]): Data type of waveform.
Returns:
np.ndarray: `y` after safe casting.
""" """
SUPPORT_DTYPE = ['int16', 'int8', 'float32', 'float64'] SUPPORT_DTYPE = ['int16', 'int8', 'float32', 'float64']
...@@ -157,14 +189,20 @@ def depth_convert(y: array, dtype: Union[type, str], ...@@ -157,14 +189,20 @@ def depth_convert(y: array, dtype: Union[type, str],
return y return y
def sound_file_load(file: str, def sound_file_load(file: os.PathLike,
offset: Optional[float]=None, offset: Optional[float]=None,
dtype: str='int16', dtype: str='int16',
duration: Optional[int]=None) -> Tuple[array, int]: duration: Optional[int]=None) -> Tuple[np.ndarray, int]:
"""Load audio using soundfile library """Load audio using soundfile library. This function loads an audio file using libsndfile.
This function loads an audio file using libsndfile.
Reference: Args:
http://www.mega-nerd.com/libsndfile/#Features file (os.PathLike): File of waveform.
offset (Optional[float], optional): Offset to the start of waveform. Defaults to None.
dtype (str, optional): Data type of waveform. Defaults to 'int16'.
duration (Optional[int], optional): Duration of waveform to read. Defaults to None.
Returns:
Tuple[np.ndarray, int]: Waveform in ndarray and its samplerate.
""" """
with sf.SoundFile(file) as sf_desc: with sf.SoundFile(file) as sf_desc:
sr_native = sf_desc.samplerate sr_native = sf_desc.samplerate
...@@ -179,9 +217,17 @@ def sound_file_load(file: str, ...@@ -179,9 +217,17 @@ def sound_file_load(file: str,
return y, sf_desc.samplerate return y, sf_desc.samplerate
def normalize(y: array, norm_type: str='linear', def normalize(y: np.ndarray, norm_type: str='linear',
mul_factor: float=1.0) -> array: mul_factor: float=1.0) -> np.ndarray:
""" normalize an input audio with additional multiplier. """Normalize an input audio with additional multiplier.
Args:
y (np.ndarray): Input waveform array in 1D or 2D.
norm_type (str, optional): Type of normalization. Defaults to 'linear'.
mul_factor (float, optional): Scaling factor. Defaults to 1.0.
Returns:
np.ndarray: `y` after normalization.
""" """
if norm_type == 'linear': if norm_type == 'linear':
...@@ -199,12 +245,13 @@ def normalize(y: array, norm_type: str='linear', ...@@ -199,12 +245,13 @@ def normalize(y: array, norm_type: str='linear',
return y return y
def save(y: array, sr: int, file: str) -> None: def save(y: np.ndarray, sr: int, file: os.PathLike) -> None:
"""Save audio file to disk. """Save audio file to disk. This function saves audio to disk using scipy.io.wavfile, with additional step to convert input waveform to int16.
This function saves audio to disk using scipy.io.wavfile, with additional step
to convert input waveform to int16 unless it already is int16 Args:
Notes: y (np.ndarray): Input waveform array in 1D or 2D.
It only support raw wav format. sr (int): Sample rate.
file (os.PathLike): Path of audio file to save.
""" """
if not file.endswith('.wav'): if not file.endswith('.wav'):
raise ParameterError( raise ParameterError(
...@@ -226,7 +273,7 @@ def save(y: array, sr: int, file: str) -> None: ...@@ -226,7 +273,7 @@ def save(y: array, sr: int, file: str) -> None:
def load( def load(
file: str, file: os.PathLike,
sr: Optional[int]=None, sr: Optional[int]=None,
mono: bool=True, mono: bool=True,
merge_type: str='average', # ch0,ch1,random,average merge_type: str='average', # ch0,ch1,random,average
...@@ -236,11 +283,24 @@ def load( ...@@ -236,11 +283,24 @@ def load(
offset: float=0.0, offset: float=0.0,
duration: Optional[int]=None, duration: Optional[int]=None,
dtype: str='float32', dtype: str='float32',
resample_mode: str='kaiser_fast') -> Tuple[array, int]: resample_mode: str='kaiser_fast') -> Tuple[np.ndarray, int]:
"""Load audio file from disk. """Load audio file from disk. This function loads audio from disk using an audio backend.
This function loads audio from disk using an audio backend.
Parameters: Args:
Notes: file (os.PathLike): Path of audio file to load.
sr (Optional[int], optional): Sample rate of loaded waveform. Defaults to None.
mono (bool, optional): Return waveform with mono channel. Defaults to True.
merge_type (str, optional): Merge type of multi-channels waveform. Defaults to 'average'.
normal (bool, optional): Waveform normalization. Defaults to True.
norm_type (str, optional): Type of normalization. Defaults to 'linear'.
norm_mul_factor (float, optional): Scaling factor. Defaults to 1.0.
offset (float, optional): Offset to the start of waveform. Defaults to 0.0.
duration (Optional[int], optional): Duration of waveform to read. Defaults to None.
dtype (str, optional): Data type of waveform. Defaults to 'float32'.
resample_mode (str, optional): The resampling filter to use. Defaults to 'kaiser_fast'.
Returns:
Tuple[np.ndarray, int]: Waveform in ndarray and its samplerate.
""" """
y, r = sound_file_load(file, offset=offset, dtype=dtype, duration=duration) y, r = sound_file_load(file, offset=offset, dtype=dtype, duration=duration)
......
...@@ -220,7 +220,7 @@ def spectrogram(waveform: Tensor, ...@@ -220,7 +220,7 @@ def spectrogram(waveform: Tensor,
"""Compute and return a spectrogram from a waveform. The output is identical to Kaldi's. """Compute and return a spectrogram from a waveform. The output is identical to Kaldi's.
Args: Args:
waveform (Tensor): A waveform tensor with shape [C, T]. waveform (Tensor): A waveform tensor with shape `(C, T)`.
blackman_coeff (float, optional): Coefficient for Blackman window. Defaults to 0.42. blackman_coeff (float, optional): Coefficient for Blackman window. Defaults to 0.42.
channel (int, optional): Select the channel of waveform. Defaults to -1. channel (int, optional): Select the channel of waveform. Defaults to -1.
dither (float, optional): Dithering constant. Defaults to 0.0. dither (float, optional): Dithering constant. Defaults to 0.0.
...@@ -239,7 +239,7 @@ def spectrogram(waveform: Tensor, ...@@ -239,7 +239,7 @@ def spectrogram(waveform: Tensor,
window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY. window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY.
Returns: Returns:
Tensor: A spectrogram tensor with shape (m, padded_window_size // 2 + 1) where m is the number of frames Tensor: A spectrogram tensor with shape `(m, padded_window_size // 2 + 1)` where m is the number of frames
depends on frame_length and frame_shift. depends on frame_length and frame_shift.
""" """
dtype = waveform.dtype dtype = waveform.dtype
...@@ -422,7 +422,7 @@ def fbank(waveform: Tensor, ...@@ -422,7 +422,7 @@ def fbank(waveform: Tensor,
"""Compute and return filter banks from a waveform. The output is identical to Kaldi's. """Compute and return filter banks from a waveform. The output is identical to Kaldi's.
Args: Args:
waveform (Tensor): A waveform tensor with shape [C, T]. waveform (Tensor): A waveform tensor with shape `(C, T)`.
blackman_coeff (float, optional): Coefficient for Blackman window. Defaults to 0.42. blackman_coeff (float, optional): Coefficient for Blackman window. Defaults to 0.42.
channel (int, optional): Select the channel of waveform. Defaults to -1. channel (int, optional): Select the channel of waveform. Defaults to -1.
dither (float, optional): Dithering constant. Defaults to 0.0. dither (float, optional): Dithering constant. Defaults to 0.0.
...@@ -451,7 +451,7 @@ def fbank(waveform: Tensor, ...@@ -451,7 +451,7 @@ def fbank(waveform: Tensor,
window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY. window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY.
Returns: Returns:
Tensor: A filter banks tensor with shape (m, n_mels). Tensor: A filter banks tensor with shape `(m, n_mels)`.
""" """
dtype = waveform.dtype dtype = waveform.dtype
...@@ -542,7 +542,7 @@ def mfcc(waveform: Tensor, ...@@ -542,7 +542,7 @@ def mfcc(waveform: Tensor,
identical to Kaldi's. identical to Kaldi's.
Args: Args:
waveform (Tensor): A waveform tensor with shape [C, T]. waveform (Tensor): A waveform tensor with shape `(C, T)`.
blackman_coeff (float, optional): Coefficient for Blackman window. Defaults to 0.42. blackman_coeff (float, optional): Coefficient for Blackman window. Defaults to 0.42.
cepstral_lifter (float, optional): Scaling of output mfccs. Defaults to 22.0. cepstral_lifter (float, optional): Scaling of output mfccs. Defaults to 22.0.
channel (int, optional): Select the channel of waveform. Defaults to -1. channel (int, optional): Select the channel of waveform. Defaults to -1.
...@@ -571,7 +571,7 @@ def mfcc(waveform: Tensor, ...@@ -571,7 +571,7 @@ def mfcc(waveform: Tensor,
window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY. window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY.
Returns: Returns:
Tensor: A mel frequency cepstral coefficients tensor with shape (m, n_mfcc). Tensor: A mel frequency cepstral coefficients tensor with shape `(m, n_mfcc)`.
""" """
assert n_mfcc <= n_mels, 'n_mfcc cannot be larger than n_mels: %d vs %d' % ( assert n_mfcc <= n_mels, 'n_mfcc cannot be larger than n_mels: %d vs %d' % (
n_mfcc, n_mels) n_mfcc, n_mels)
......
...@@ -17,6 +17,7 @@ from typing import Optional ...@@ -17,6 +17,7 @@ from typing import Optional
from typing import Union from typing import Union
import paddle import paddle
from paddle import Tensor
__all__ = [ __all__ = [
'hz_to_mel', 'hz_to_mel',
...@@ -29,19 +30,20 @@ __all__ = [ ...@@ -29,19 +30,20 @@ __all__ = [
] ]
def hz_to_mel(freq: Union[paddle.Tensor, float], def hz_to_mel(freq: Union[Tensor, float],
htk: bool=False) -> Union[paddle.Tensor, float]: htk: bool=False) -> Union[Tensor, float]:
"""Convert Hz to Mels. """Convert Hz to Mels.
Parameters:
freq: the input tensor of arbitrary shape, or a single floating point number. Args:
htk: use HTK formula to do the conversion. freq (Union[Tensor, float]): The input tensor with arbitrary shape.
The default value is False. htk (bool, optional): Use htk scaling. Defaults to False.
Returns: Returns:
The frequencies represented in Mel-scale. Union[Tensor, float]: Frequency in mels.
""" """
if htk: if htk:
if isinstance(freq, paddle.Tensor): if isinstance(freq, Tensor):
return 2595.0 * paddle.log10(1.0 + freq / 700.0) return 2595.0 * paddle.log10(1.0 + freq / 700.0)
else: else:
return 2595.0 * math.log10(1.0 + freq / 700.0) return 2595.0 * math.log10(1.0 + freq / 700.0)
...@@ -58,7 +60,7 @@ def hz_to_mel(freq: Union[paddle.Tensor, float], ...@@ -58,7 +60,7 @@ def hz_to_mel(freq: Union[paddle.Tensor, float],
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels) min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = math.log(6.4) / 27.0 # step size for log region logstep = math.log(6.4) / 27.0 # step size for log region
if isinstance(freq, paddle.Tensor): if isinstance(freq, Tensor):
target = min_log_mel + paddle.log( target = min_log_mel + paddle.log(
freq / min_log_hz + 1e-10) / logstep # prevent nan with 1e-10 freq / min_log_hz + 1e-10) / logstep # prevent nan with 1e-10
mask = (freq > min_log_hz).astype(freq.dtype) mask = (freq > min_log_hz).astype(freq.dtype)
...@@ -71,14 +73,16 @@ def hz_to_mel(freq: Union[paddle.Tensor, float], ...@@ -71,14 +73,16 @@ def hz_to_mel(freq: Union[paddle.Tensor, float],
return mels return mels
def mel_to_hz(mel: Union[float, paddle.Tensor], def mel_to_hz(mel: Union[float, Tensor],
htk: bool=False) -> Union[float, paddle.Tensor]: htk: bool=False) -> Union[float, Tensor]:
"""Convert mel bin numbers to frequencies. """Convert mel bin numbers to frequencies.
Parameters:
mel: the mel frequency represented as a tensor of arbitrary shape, or a floating point number. Args:
htk: use HTK formula to do the conversion. mel (Union[float, Tensor]): The mel frequency represented as a tensor with arbitrary shape.
htk (bool, optional): Use htk scaling. Defaults to False.
Returns: Returns:
The frequencies represented in hz. Union[float, Tensor]: Frequencies in Hz.
""" """
if htk: if htk:
return 700.0 * (10.0**(mel / 2595.0) - 1.0) return 700.0 * (10.0**(mel / 2595.0) - 1.0)
...@@ -90,7 +94,7 @@ def mel_to_hz(mel: Union[float, paddle.Tensor], ...@@ -90,7 +94,7 @@ def mel_to_hz(mel: Union[float, paddle.Tensor],
min_log_hz = 1000.0 # beginning of log region (Hz) min_log_hz = 1000.0 # beginning of log region (Hz)
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels) min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = math.log(6.4) / 27.0 # step size for log region logstep = math.log(6.4) / 27.0 # step size for log region
if isinstance(mel, paddle.Tensor): if isinstance(mel, Tensor):
target = min_log_hz * paddle.exp(logstep * (mel - min_log_mel)) target = min_log_hz * paddle.exp(logstep * (mel - min_log_mel))
mask = (mel > min_log_mel).astype(mel.dtype) mask = (mel > min_log_mel).astype(mel.dtype)
freqs = target * mask + freqs * ( freqs = target * mask + freqs * (
...@@ -106,16 +110,18 @@ def mel_frequencies(n_mels: int=64, ...@@ -106,16 +110,18 @@ def mel_frequencies(n_mels: int=64,
f_min: float=0.0, f_min: float=0.0,
f_max: float=11025.0, f_max: float=11025.0,
htk: bool=False, htk: bool=False,
dtype: str=paddle.float32): dtype: str='float32') -> Tensor:
"""Compute mel frequencies. """Compute mel frequencies.
Parameters:
n_mels(int): number of Mel bins. Args:
f_min(float): the lower cut-off frequency, below which the filter response is zero. n_mels (int, optional): Number of mel bins. Defaults to 64.
f_max(float): the upper cut-off frequency, above which the filter response is zero. f_min (float, optional): Minimum frequency in Hz. Defaults to 0.0.
htk(bool): whether to use htk formula. f_max (float, optional): Maximum frequency in Hz. Defaults to 11025.0.
dtype(str): the datatype of the return frequencies. htk (bool, optional): Use htk scaling. Defaults to False.
dtype (str, optional): The data type of the return frequencies. Defaults to 'float32'.
Returns: Returns:
The frequencies represented in Mel-scale Tensor: Tensor of n_mels frequencies in Hz with shape `(n_mels,)`.
""" """
# 'Center freqs' of mel bands - uniformly spaced between limits # 'Center freqs' of mel bands - uniformly spaced between limits
min_mel = hz_to_mel(f_min, htk=htk) min_mel = hz_to_mel(f_min, htk=htk)
...@@ -125,14 +131,16 @@ def mel_frequencies(n_mels: int=64, ...@@ -125,14 +131,16 @@ def mel_frequencies(n_mels: int=64,
return freqs return freqs
def fft_frequencies(sr: int, n_fft: int, dtype: str=paddle.float32): def fft_frequencies(sr: int, n_fft: int, dtype: str='float32') -> Tensor:
"""Compute fourier frequencies. """Compute fourier frequencies.
Parameters:
sr(int): the audio sample rate. Args:
n_fft(float): the number of fft bins. sr (int): Sample rate.
dtype(str): the datatype of the return frequencies. n_fft (int): Number of fft bins.
dtype (str, optional): The data type of the return frequencies. Defaults to 'float32'.
Returns: Returns:
The frequencies represented in hz. Tensor: FFT frequencies in Hz with shape `(n_fft//2 + 1,)`.
""" """
return paddle.linspace(0, float(sr) / 2, int(1 + n_fft // 2), dtype=dtype) return paddle.linspace(0, float(sr) / 2, int(1 + n_fft // 2), dtype=dtype)
...@@ -144,23 +152,21 @@ def compute_fbank_matrix(sr: int, ...@@ -144,23 +152,21 @@ def compute_fbank_matrix(sr: int,
f_max: Optional[float]=None, f_max: Optional[float]=None,
htk: bool=False, htk: bool=False,
norm: Union[str, float]='slaney', norm: Union[str, float]='slaney',
dtype: str=paddle.float32): dtype: str='float32') -> Tensor:
"""Compute fbank matrix. """Compute fbank matrix.
Parameters:
sr(int): the audio sample rate. Args:
n_fft(int): the number of fft bins. sr (int): Sample rate.
n_mels(int): the number of Mel bins. n_fft (int): Number of fft bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero. n_mels (int, optional): Number of mel bins. Defaults to 64.
f_max(float): the upper cut-off frequency, above which the filter response is zero. f_min (float, optional): Minimum frequency in Hz. Defaults to 0.0.
htk: whether to use htk formula. f_max (Optional[float], optional): Maximum frequency in Hz. Defaults to None.
return_complex(bool): whether to return complex matrix. If True, the matrix will htk (bool, optional): Use htk scaling. Defaults to False.
be complex type. Otherwise, the real and image part will be stored in the last norm (Union[str, float], optional): Type of normalization. Defaults to 'slaney'.
axis of returned tensor. dtype (str, optional): The data type of the return matrix. Defaults to 'float32'.
dtype(str): the datatype of the returned fbank matrix.
Returns: Returns:
The fbank matrix of shape (n_mels, int(1+n_fft//2)). Tensor: Mel transform matrix with shape `(n_mels, n_fft//2 + 1)`.
Shape:
output: (n_mels, int(1+n_fft//2))
""" """
if f_max is None: if f_max is None:
...@@ -199,27 +205,20 @@ def compute_fbank_matrix(sr: int, ...@@ -199,27 +205,20 @@ def compute_fbank_matrix(sr: int,
return weights return weights
def power_to_db(magnitude: paddle.Tensor, def power_to_db(spect: Tensor,
ref_value: float=1.0, ref_value: float=1.0,
amin: float=1e-10, amin: float=1e-10,
top_db: Optional[float]=None) -> paddle.Tensor: top_db: Optional[float]=None) -> Tensor:
"""Convert a power spectrogram (amplitude squared) to decibel (dB) units. """Convert a power spectrogram (amplitude squared) to decibel (dB) units. The function computes the scaling `10 * log10(x / ref)` in a numerically stable way.
The function computes the scaling ``10 * log10(x / ref)`` in a numerically
stable way. Args:
Parameters: spect (Tensor): STFT power spectrogram.
magnitude(Tensor): the input magnitude tensor of any shape. ref_value (float, optional): The reference value. If smaller than 1.0, the db level of the signal will be pulled up accordingly. Otherwise, the db level is pushed down. Defaults to 1.0.
ref_value(float): the reference value. If smaller than 1.0, the db level amin (float, optional): Minimum threshold. Defaults to 1e-10.
of the signal will be pulled up accordingly. Otherwise, the db level top_db (Optional[float], optional): Threshold the output at `top_db` below the peak. Defaults to None.
is pushed down.
amin(float): the minimum value of input magnitude, below which the input
magnitude is clipped(to amin).
top_db(float): the maximum db value of resulting spectrum, above which the
spectrum is clipped(to top_db).
Returns: Returns:
The spectrogram in log-scale. Tensor: Power spectrogram in db scale.
shape:
input: any shape
output: same as input
""" """
if amin <= 0: if amin <= 0:
raise Exception("amin must be strictly positive") raise Exception("amin must be strictly positive")
...@@ -227,8 +226,8 @@ def power_to_db(magnitude: paddle.Tensor, ...@@ -227,8 +226,8 @@ def power_to_db(magnitude: paddle.Tensor,
if ref_value <= 0: if ref_value <= 0:
raise Exception("ref_value must be strictly positive") raise Exception("ref_value must be strictly positive")
ones = paddle.ones_like(magnitude) ones = paddle.ones_like(spect)
log_spec = 10.0 * paddle.log10(paddle.maximum(ones * amin, magnitude)) log_spec = 10.0 * paddle.log10(paddle.maximum(ones * amin, spect))
log_spec -= 10.0 * math.log10(max(ref_value, amin)) log_spec -= 10.0 * math.log10(max(ref_value, amin))
if top_db is not None: if top_db is not None:
...@@ -242,15 +241,17 @@ def power_to_db(magnitude: paddle.Tensor, ...@@ -242,15 +241,17 @@ def power_to_db(magnitude: paddle.Tensor,
def create_dct(n_mfcc: int, def create_dct(n_mfcc: int,
n_mels: int, n_mels: int,
norm: Optional[str]='ortho', norm: Optional[str]='ortho',
dtype: Optional[str]=paddle.float32) -> paddle.Tensor: dtype: str='float32') -> Tensor:
"""Create a discrete cosine transform(DCT) matrix. """Create a discrete cosine transform(DCT) matrix.
Parameters: Args:
n_mfcc (int): Number of mel frequency cepstral coefficients. n_mfcc (int): Number of mel frequency cepstral coefficients.
n_mels (int): Number of mel filterbanks. n_mels (int): Number of mel filterbanks.
norm (str, optional): Normalization type. Defaults to 'ortho'. norm (Optional[str], optional): Normalization type. Defaults to 'ortho'.
dtype (str, optional): The data type of the return matrix. Defaults to 'float32'.
Returns: Returns:
Tensor: The DCT matrix with shape (n_mels, n_mfcc). Tensor: The DCT matrix with shape `(n_mels, n_mfcc)`.
""" """
n = paddle.arange(n_mels, dtype=dtype) n = paddle.arange(n_mels, dtype=dtype)
k = paddle.arange(n_mfcc, dtype=dtype).unsqueeze(1) k = paddle.arange(n_mfcc, dtype=dtype).unsqueeze(1)
......
...@@ -20,24 +20,11 @@ from paddle import Tensor ...@@ -20,24 +20,11 @@ from paddle import Tensor
__all__ = [ __all__ = [
'get_window', 'get_window',
# windows
'taylor',
'hamming',
'hann',
'tukey',
'kaiser',
'gaussian',
'exponential',
'triang',
'bohman',
'blackman',
'cosine',
] ]
def _cat(a: List[Tensor], data_type: str) -> Tensor: def _cat(x: List[Tensor], data_type: str) -> Tensor:
l = [paddle.to_tensor(_a, data_type) for _a in a] l = [paddle.to_tensor(_, data_type) for _ in x]
return paddle.concat(l) return paddle.concat(l)
...@@ -48,7 +35,7 @@ def _acosh(x: Union[Tensor, float]) -> Tensor: ...@@ -48,7 +35,7 @@ def _acosh(x: Union[Tensor, float]) -> Tensor:
def _extend(M: int, sym: bool) -> bool: def _extend(M: int, sym: bool) -> bool:
"""Extend window by 1 sample if needed for DFT-even symmetry""" """Extend window by 1 sample if needed for DFT-even symmetry. """
if not sym: if not sym:
return M + 1, True return M + 1, True
else: else:
...@@ -56,7 +43,7 @@ def _extend(M: int, sym: bool) -> bool: ...@@ -56,7 +43,7 @@ def _extend(M: int, sym: bool) -> bool:
def _len_guards(M: int) -> bool: def _len_guards(M: int) -> bool:
"""Handle small or incorrect window lengths""" """Handle small or incorrect window lengths. """
if int(M) != M or M < 0: if int(M) != M or M < 0:
raise ValueError('Window length M must be a non-negative integer') raise ValueError('Window length M must be a non-negative integer')
...@@ -64,14 +51,14 @@ def _len_guards(M: int) -> bool: ...@@ -64,14 +51,14 @@ def _len_guards(M: int) -> bool:
def _truncate(w: Tensor, needed: bool) -> Tensor: def _truncate(w: Tensor, needed: bool) -> Tensor:
"""Truncate window by 1 sample if needed for DFT-even symmetry""" """Truncate window by 1 sample if needed for DFT-even symmetry. """
if needed: if needed:
return w[:-1] return w[:-1]
else: else:
return w return w
def general_gaussian(M: int, p, sig, sym: bool=True, def _general_gaussian(M: int, p, sig, sym: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Compute a window with a generalized Gaussian shape. """Compute a window with a generalized Gaussian shape.
This function is consistent with scipy.signal.windows.general_gaussian(). This function is consistent with scipy.signal.windows.general_gaussian().
...@@ -86,7 +73,7 @@ def general_gaussian(M: int, p, sig, sym: bool=True, ...@@ -86,7 +73,7 @@ def general_gaussian(M: int, p, sig, sym: bool=True,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def general_cosine(M: int, a: float, sym: bool=True, def _general_cosine(M: int, a: float, sym: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Compute a generic weighted sum of cosine terms window. """Compute a generic weighted sum of cosine terms window.
This function is consistent with scipy.signal.windows.general_cosine(). This function is consistent with scipy.signal.windows.general_cosine().
...@@ -101,15 +88,15 @@ def general_cosine(M: int, a: float, sym: bool=True, ...@@ -101,15 +88,15 @@ def general_cosine(M: int, a: float, sym: bool=True,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def general_hamming(M: int, alpha: float, sym: bool=True, def _general_hamming(M: int, alpha: float, sym: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Compute a generalized Hamming window. """Compute a generalized Hamming window.
This function is consistent with scipy.signal.windows.general_hamming() This function is consistent with scipy.signal.windows.general_hamming()
""" """
return general_cosine(M, [alpha, 1. - alpha], sym, dtype=dtype) return _general_cosine(M, [alpha, 1. - alpha], sym, dtype=dtype)
def taylor(M: int, def _taylor(M: int,
nbar=4, nbar=4,
sll=30, sll=30,
norm=True, norm=True,
...@@ -118,14 +105,6 @@ def taylor(M: int, ...@@ -118,14 +105,6 @@ def taylor(M: int,
"""Compute a Taylor window. """Compute a Taylor window.
The Taylor window taper function approximates the Dolph-Chebyshev window's The Taylor window taper function approximates the Dolph-Chebyshev window's
constant sidelobe level for a parameterized number of near-in sidelobes. constant sidelobe level for a parameterized number of near-in sidelobes.
Parameters:
M(int): window size
nbar, sil, norm: the window-specific parameter.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
if _len_guards(M): if _len_guards(M):
return paddle.ones((M, ), dtype=dtype) return paddle.ones((M, ), dtype=dtype)
...@@ -171,46 +150,25 @@ def taylor(M: int, ...@@ -171,46 +150,25 @@ def taylor(M: int,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def hamming(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def _hamming(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a Hamming window. """Compute a Hamming window.
The Hamming window is a taper formed by using a raised cosine with The Hamming window is a taper formed by using a raised cosine with
non-zero endpoints, optimized to minimize the nearest side lobe. non-zero endpoints, optimized to minimize the nearest side lobe.
Parameters:
M(int): window size
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
return general_hamming(M, 0.54, sym, dtype=dtype) return _general_hamming(M, 0.54, sym, dtype=dtype)
def hann(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def _hann(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a Hann window. """Compute a Hann window.
The Hann window is a taper formed by using a raised cosine or sine-squared The Hann window is a taper formed by using a raised cosine or sine-squared
with ends that touch zero. with ends that touch zero.
Parameters:
M(int): window size
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
return general_hamming(M, 0.5, sym, dtype=dtype) return _general_hamming(M, 0.5, sym, dtype=dtype)
def tukey(M: int, alpha=0.5, sym: bool=True, dtype: str='float64') -> Tensor: def _tukey(M: int, alpha=0.5, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a Tukey window. """Compute a Tukey window.
The Tukey window is also known as a tapered cosine window. The Tukey window is also known as a tapered cosine window.
Parameters:
M(int): window size
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
if _len_guards(M): if _len_guards(M):
return paddle.ones((M, ), dtype=dtype) return paddle.ones((M, ), dtype=dtype)
...@@ -237,32 +195,18 @@ def tukey(M: int, alpha=0.5, sym: bool=True, dtype: str='float64') -> Tensor: ...@@ -237,32 +195,18 @@ def tukey(M: int, alpha=0.5, sym: bool=True, dtype: str='float64') -> Tensor:
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def kaiser(M: int, beta: float, sym: bool=True, dtype: str='float64') -> Tensor: def _kaiser(M: int, beta: float, sym: bool=True,
dtype: str='float64') -> Tensor:
"""Compute a Kaiser window. """Compute a Kaiser window.
The Kaiser window is a taper formed by using a Bessel function. The Kaiser window is a taper formed by using a Bessel function.
Parameters:
M(int): window size.
beta(float): the window-specific parameter.
sym(bool):whether to return symmetric window.
The default value is True
Returns:
Tensor: the window tensor
""" """
raise NotImplementedError() raise NotImplementedError()
def gaussian(M: int, std: float, sym: bool=True, def _gaussian(M: int, std: float, sym: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Compute a Gaussian window. """Compute a Gaussian window.
The Gaussian window has a Gaussian shape defined by the standard deviation (std). The Gaussian window has a Gaussian shape defined by the standard deviation (std).
Parameters:
M(int): window size.
std(float): the window-specific parameter.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
if _len_guards(M): if _len_guards(M):
return paddle.ones((M, ), dtype=dtype) return paddle.ones((M, ), dtype=dtype)
...@@ -275,21 +219,12 @@ def gaussian(M: int, std: float, sym: bool=True, ...@@ -275,21 +219,12 @@ def gaussian(M: int, std: float, sym: bool=True,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def exponential(M: int, def _exponential(M: int,
center=None, center=None,
tau=1., tau=1.,
sym: bool=True, sym: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Compute an exponential (or Poisson) window. """Compute an exponential (or Poisson) window. """
Parameters:
M(int): window size.
tau(float): the window-specific parameter.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
"""
if sym and center is not None: if sym and center is not None:
raise ValueError("If sym==True, center must be None.") raise ValueError("If sym==True, center must be None.")
if _len_guards(M): if _len_guards(M):
...@@ -305,15 +240,8 @@ def exponential(M: int, ...@@ -305,15 +240,8 @@ def exponential(M: int,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def triang(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def _triang(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a triangular window. """Compute a triangular window.
Parameters:
M(int): window size.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
if _len_guards(M): if _len_guards(M):
return paddle.ones((M, ), dtype=dtype) return paddle.ones((M, ), dtype=dtype)
...@@ -330,16 +258,9 @@ def triang(M: int, sym: bool=True, dtype: str='float64') -> Tensor: ...@@ -330,16 +258,9 @@ def triang(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def bohman(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def _bohman(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a Bohman window. """Compute a Bohman window.
The Bohman window is the autocorrelation of a cosine window. The Bohman window is the autocorrelation of a cosine window.
Parameters:
M(int): window size.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
if _len_guards(M): if _len_guards(M):
return paddle.ones((M, ), dtype=dtype) return paddle.ones((M, ), dtype=dtype)
...@@ -353,32 +274,18 @@ def bohman(M: int, sym: bool=True, dtype: str='float64') -> Tensor: ...@@ -353,32 +274,18 @@ def bohman(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def blackman(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def _blackman(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a Blackman window. """Compute a Blackman window.
The Blackman window is a taper formed by using the first three terms of The Blackman window is a taper formed by using the first three terms of
a summation of cosines. It was designed to have close to the minimal a summation of cosines. It was designed to have close to the minimal
leakage possible. It is close to optimal, only slightly worse than a leakage possible. It is close to optimal, only slightly worse than a
Kaiser window. Kaiser window.
Parameters:
M(int): window size.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
return general_cosine(M, [0.42, 0.50, 0.08], sym, dtype=dtype) return _general_cosine(M, [0.42, 0.50, 0.08], sym, dtype=dtype)
def cosine(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def _cosine(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a window with a simple cosine shape. """Compute a window with a simple cosine shape.
Parameters:
M(int): window size.
sym(bool):whether to return symmetric window.
The default value is True
dtype(str): the datatype of returned tensor.
Returns:
Tensor: the window tensor
""" """
if _len_guards(M): if _len_guards(M):
return paddle.ones((M, ), dtype=dtype) return paddle.ones((M, ), dtype=dtype)
...@@ -388,19 +295,20 @@ def cosine(M: int, sym: bool=True, dtype: str='float64') -> Tensor: ...@@ -388,19 +295,20 @@ def cosine(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
## factory function
def get_window(window: Union[str, Tuple[str, float]], def get_window(window: Union[str, Tuple[str, float]],
win_length: int, win_length: int,
fftbins: bool=True, fftbins: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Return a window of a given length and type. """Return a window of a given length and type.
Parameters:
window(str|(str,float)): the type of window to create. Args:
win_length(int): the number of samples in the window. window (Union[str, Tuple[str, float]]): The window function applied to the signal before the Fourier transform. Supported window functions: 'hamming', 'hann', 'kaiser', 'gaussian', 'exponential', 'triang', 'bohman', 'blackman', 'cosine', 'tukey', 'taylor'.
fftbins(bool): If True, create a "periodic" window. Otherwise, win_length (int): Number of samples.
create a "symmetric" window, for use in filter design. fftbins (bool, optional): If True, create a "periodic" window. Otherwise, create a "symmetric" window, for use in filter design. Defaults to True.
dtype (str, optional): The data type of the return window. Defaults to 'float64'.
Returns: Returns:
The window represented as a tensor. Tensor: The window represented as a tensor.
""" """
sym = not fftbins sym = not fftbins
...@@ -420,7 +328,7 @@ def get_window(window: Union[str, Tuple[str, float]], ...@@ -420,7 +328,7 @@ def get_window(window: Union[str, Tuple[str, float]],
str(type(window))) str(type(window)))
try: try:
winfunc = eval(winstr) winfunc = eval('_' + winstr)
except KeyError as e: except KeyError as e:
raise ValueError("Unknown window type.") from e raise ValueError("Unknown window type.") from e
......
...@@ -20,9 +20,7 @@ __all__ = [ ...@@ -20,9 +20,7 @@ __all__ = [
def dtw_distance(xs: np.ndarray, ys: np.ndarray) -> float: def dtw_distance(xs: np.ndarray, ys: np.ndarray) -> float:
"""dtw distance """Dynamic Time Warping.
Dynamic Time Warping.
This function keeps a compact matrix, not the full warping paths matrix. This function keeps a compact matrix, not the full warping paths matrix.
Uses dynamic programming to compute: Uses dynamic programming to compute:
......
...@@ -178,7 +178,8 @@ class BaseExecutor(ABC): ...@@ -178,7 +178,8 @@ class BaseExecutor(ABC):
Returns: Returns:
bool: return `True` for job input, `False` otherwise. bool: return `True` for job input, `False` otherwise.
""" """
return input_ and os.path.isfile(input_) and input_.endswith('.job') return input_ and os.path.isfile(input_) and (input_.endswith('.job') or
input_.endswith('.txt'))
def _get_job_contents( def _get_job_contents(
self, job_input: os.PathLike) -> Dict[str, Union[str, os.PathLike]]: self, job_input: os.PathLike) -> Dict[str, Union[str, os.PathLike]]:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册