未验证 提交 16e60160 编写于 作者: H Hui Zhang 提交者: GitHub

Kaldi (#839)

* can do frames, real stft

* format

* stft complex, powspec, magspec

* add common utils

* add window process func

* using frames and matmul as stft

* read with 2d; window process

* test with dither, remove dc offset, preermphs

* add doc string

* more frontend utils

* add logspec

* fix typing

* add delpoy mergify label
上级 a75be257
......@@ -76,19 +76,19 @@ class ManifestDataset(Dataset):
Args:
manifest_path (str): manifest josn file path
max_input_len ([type], optional): maximum output seq length,
max_input_len ([type], optional): maximum output seq length,
in seconds for raw wav, in frame numbers for feature data. Defaults to float('inf').
min_input_len (float, optional): minimum input seq length,
min_input_len (float, optional): minimum input seq length,
in seconds for raw wav, in frame numbers for feature data. Defaults to 0.0.
max_output_len (float, optional): maximum input seq length,
max_output_len (float, optional): maximum input seq length,
in modeling units. Defaults to 500.0.
min_output_len (float, optional): minimum input seq length,
min_output_len (float, optional): minimum input seq length,
in modeling units. Defaults to 0.0.
max_output_input_ratio (float, optional): maximum output seq length/output seq length ratio.
max_output_input_ratio (float, optional): maximum output seq length/output seq length ratio.
Defaults to 10.0.
min_output_input_ratio (float, optional): minimum output seq length/output seq length ratio.
Defaults to 0.05.
"""
super().__init__()
......
from typing import Tuple
import numpy as np
import paddle
from paddle import Tensor
from paddle import nn
from paddle.nn import functional as F
def frame(x: Tensor,
num_samples: Tensor,
win_length: int,
hop_length: int,
clip: bool = True) -> Tuple[Tensor, Tensor]:
"""Extract frames from audio.
Parameters
----------
x : Tensor
Shape (N, T), batched waveform.
num_samples : Tensor
Shape (N, ), number of samples of each waveform.
win_length : int
Window length.
hop_length : int
Number of samples shifted between ajancent frames.
clip : bool, optional
Whether to clip audio that does not fit into the last frame, by
default True
Returns
-------
frames : Tensor
Shape (N, T', win_length).
num_frames : Tensor
Shape (N, ) number of valid frames
"""
assert hop_length <= win_length
num_frames = (num_samples - win_length) // hop_length
padding = (0, 0)
if not clip:
num_frames += 1
# NOTE: pad hop_length - 1 to the right to ensure that there is at most
# one frame dangling to the righe edge
padding = (0, hop_length - 1)
weight = paddle.eye(win_length).unsqueeze(1)
frames = F.conv1d(x.unsqueeze(1),
weight,
padding=padding,
stride=(hop_length, ))
return frames, num_frames
class STFT(nn.Layer):
"""A module for computing stft transformation in a differentiable way.
Parameters
------------
n_fft : int
Number of samples in a frame.
hop_length : int
Number of samples shifted between adjacent frames.
win_length : int
Length of the window.
clip: bool
Whether to clip audio is necesaary.
"""
def __init__(self,
n_fft: int,
hop_length: int,
win_length: int,
window_type: str = None,
clip: bool = True):
super().__init__()
self.hop_length = hop_length
self.n_bin = 1 + n_fft // 2
self.n_fft = n_fft
self.clip = clip
# calculate window
if window_type is None:
window = np.ones(win_length)
elif window_type == "hann":
window = np.hanning(win_length)
elif window_type == "hamming":
window = np.hamming(win_length)
else:
raise ValueError("Not supported yet!")
if win_length < n_fft:
window = F.pad(window, (0, n_fft - win_length))
elif win_length > n_fft:
window = window[:n_fft]
# (n_bins, n_fft) complex
kernel_size = min(n_fft, win_length)
weight = np.fft.fft(np.eye(n_fft))[:self.n_bin, :kernel_size]
w_real = weight.real
w_imag = weight.imag
# (2 * n_bins, kernel_size)
w = np.concatenate([w_real, w_imag], axis=0)
w = w * window
# (2 * n_bins, 1, kernel_size) # (C_out, C_in, kernel_size)
w = np.expand_dims(w, 1)
weight = paddle.cast(paddle.to_tensor(w), paddle.get_default_dtype())
self.register_buffer("weight", weight)
def forward(self, x: Tensor, num_samples: Tensor) -> Tuple[Tensor, Tensor]:
"""Compute the stft transform.
Parameters
------------
x : Tensor [shape=(B, T)]
The input waveform.
num_samples : Tensor
Number of samples of each waveform.
Returns
------------
D : Tensor
Shape(N, T', n_bins, 2) Spectrogram.
num_frames: Tensor
Shape (N,) number of samples of each spectrogram
"""
num_frames = (num_samples - self.win_length) // self.hop_length
padding = (0, 0)
if not self.clip:
num_frames += 1
padding = (0, self.hop_length - 1)
batch_size, _, _ = paddle.shape(x)
x = x.unsqueeze(-1)
D = F.conv1d(self.weight,
x,
stride=(self.hop_length, ),
padding=padding,
data_format="NLC")
D = paddle.reshape(D, [batch_size, -1, self.n_bin, 2])
return D, num_frames
import paddle
import numpy as np
from typing import Tuple, Optional, Union
# https://github.com/kaldi-asr/kaldi/blob/cbed4ff688/src/feat/feature-window.cc#L109
def povey_window(frame_len:int) -> np.ndarray:
win = np.empty(frame_len)
a = 2 * np.pi / (frame_len -1)
for i in range(frame_len):
win[i] = (0.5 - 0.5 * np.cos(a * i) )**0.85
return win
def hann_window(frame_len:int) -> np.ndarray:
win = np.empty(frame_len)
a = 2 * np.pi / (frame_len -1)
for i in range(frame_len):
win[i] = 0.5 - 0.5 * np.cos(a * i)
return win
def sine_window(frame_len:int) -> np.ndarray:
win = np.empty(frame_len)
a = 2 * np.pi / (frame_len -1)
for i in range(frame_len):
win[i] = np.sin(0.5 * a * i)
return win
def hamm_window(frame_len:int) -> np.ndarray:
win = np.empty(frame_len)
a = 2 * np.pi / (frame_len -1)
for i in range(frame_len):
win[i] = 0.54 - 0.46 * np.cos(a * i)
return win
def get_window(wintype:Optional[str], winlen:int) -> np.ndarray:
"""get window function
Args:
wintype (Optional[str]): window type.
winlen (int): window length in samples.
Raises:
ValueError: not support window.
Returns:
np.ndarray: window coeffs.
"""
# calculate window
if not wintype or wintype == 'rectangular':
window = np.ones(winlen)
elif wintype == "hann":
window = hann_window(winlen)
elif wintype == "hamm":
window = hamm_window(winlen)
elif wintype == "povey":
window = povey_window(winlen)
else:
msg = f"{wintype} Not supported yet!"
raise ValueError(msg)
return window
def dft_matrix(n_fft:int, winlen:int=None, n_bin:int=None) -> Tuple[np.ndarray, np.ndarray, int]:
# https://en.wikipedia.org/wiki/Discrete_Fourier_transform
# (n_bins, n_fft) complex
if n_bin is None:
n_bin = 1 + n_fft // 2
if winlen is None:
winlen = n_bin
# https://github.com/numpy/numpy/blob/v1.20.0/numpy/fft/_pocketfft.py#L49
kernel_size = min(n_fft, winlen)
n = np.arange(0, n_fft, 1.)
wsin = np.empty((n_bin, kernel_size)) #[Cout, kernel_size]
wcos = np.empty((n_bin, kernel_size)) #[Cout, kernel_size]
for k in range(n_bin): # Only half of the bins contain useful info
wsin[k,:] = -np.sin(2*np.pi*k*n/n_fft)[:kernel_size]
wcos[k,:] = np.cos(2*np.pi*k*n/n_fft)[:kernel_size]
w_real = wcos
w_imag = wsin
return w_real, w_imag, kernel_size
def dft_matrix_fast(n_fft:int, winlen:int=None, n_bin:int=None) -> Tuple[np.ndarray, np.ndarray, int]:
# (n_bins, n_fft) complex
if n_bin is None:
n_bin = 1 + n_fft // 2
if winlen is None:
winlen = n_bin
# https://github.com/numpy/numpy/blob/v1.20.0/numpy/fft/_pocketfft.py#L49
kernel_size = min(n_fft, winlen)
# https://en.wikipedia.org/wiki/DFT_matrix
# https://ccrma.stanford.edu/~jos/st/Matrix_Formulation_DFT.html
weight = np.fft.fft(np.eye(n_fft))[:self.n_bin, :kernel_size]
w_real = weight.real
w_imag = weight.imag
return w_real, w_imag, kernel_size
def bin2hz(bin:Union[List[int], np.ndarray], N:int, sr:int)->List[float]:
"""FFT bins to Hz.
http://practicalcryptography.com/miscellaneous/machine-learning/intuitive-guide-discrete-fourier-transform/
Args:
bins (List[int] or np.ndarray): bin index.
N (int): the number of samples, or FFT points.
sr (int): sampling rate.
Returns:
List[float]: Hz's.
"""
hz = bin * float(sr) / N
def hz2mel(hz):
"""Convert a value in Hertz to Mels
:param hz: a value in Hz. This can also be a numpy array, conversion proceeds element-wise.
:returns: a value in Mels. If an array was passed in, an identical sized array is returned.
"""
return 1127 * np.log(1+hz/700.0)
def mel2hz(mel):
"""Convert a value in Mels to Hertz
:param mel: a value in Mels. This can also be a numpy array, conversion proceeds element-wise.
:returns: a value in Hertz. If an array was passed in, an identical sized array is returned.
"""
return 700 * (np.exp(mel/1127.0)-1)
def rms_to_db(rms: float):
"""Root Mean Square to dB.
Args:
rms ([float]): root mean square
Returns:
float: dB
"""
return 20.0 * math.log10(max(1e-16, rms))
def rms_to_dbfs(rms: float):
"""Root Mean Square to dBFS.
https://fireattack.wordpress.com/2017/02/06/replaygain-loudness-normalization-and-applications/
Audio is mix of sine wave, so 1 amp sine wave's Full scale is 0.7071, equal to -3.0103dB.
dB = dBFS + 3.0103
dBFS = db - 3.0103
e.g. 0 dB = -3.0103 dBFS
Args:
rms ([float]): root mean square
Returns:
float: dBFS
"""
return rms_to_db(rms) - 3.0103
def max_dbfs(sample_data: np.ndarray):
"""Peak dBFS based on the maximum energy sample.
Args:
sample_data ([np.ndarray]): float array, [-1, 1].
Returns:
float: dBFS
"""
# Peak dBFS based on the maximum energy sample. Will prevent overdrive if used for normalization.
return rms_to_dbfs(max(abs(np.min(sample_data)), abs(np.max(sample_data))))
def mean_dbfs(sample_data):
"""Peak dBFS based on the RMS energy.
Args:
sample_data ([np.ndarray]): float array, [-1, 1].
Returns:
float: dBFS
"""
return rms_to_dbfs(
math.sqrt(np.mean(np.square(sample_data, dtype=np.float64))))
def gain_db_to_ratio(gain_db: float):
"""dB to ratio
Args:
gain_db (float): gain in dB
Returns:
float: scale in amp
"""
return math.pow(10.0, gain_db / 20.0)
\ No newline at end of file
from typing import Tuple
import numpy as np
import paddle
from paddle import Tensor
from paddle import nn
from paddle.nn import functional as F
import soundfile as sf
from .common import get_window
from .common import dft_matrix
def read(wavpath:str, sr:int = None, start=0, stop=None, dtype='int16', always_2d=True)->Tuple[int, np.ndarray]:
"""load wav file.
Args:
wavpath (str): wav path.
sr (int, optional): expect sample rate. Defaults to None.
dtype (str, optional): wav data bits. Defaults to 'int16'.
Returns:
Tuple[int, np.ndarray]: sr (int), wav (int16) [T, C].
"""
wav, r_sr = sf.read(wavpath, start=start, stop=stop, dtype=dtype, always_2d=always_2d)
if sr:
assert sr == r_sr
return r_sr, wav
def write(wavpath:str, wav:np.ndarray, sr:int, dtype='PCM_16'):
"""write wav file.
Args:
wavpath (str): file path to save.
wav (np.ndarray): wav data.
sr (int): data samplerate.
dtype (str, optional): wav bit format. Defaults to 'PCM_16'.
"""
sf.write(wavpath, wav, sr, subtype=dtype)
def frames(x: Tensor,
num_samples: Tensor,
sr: int,
win_length: float,
stride_length: float,
clip: bool = False) -> Tuple[Tensor, Tensor]:
"""Extract frames from audio.
Parameters
----------
x : Tensor
Shape (B, T), batched waveform.
num_samples : Tensor
Shape (B, ), number of samples of each waveform.
sr: int
Sampling Rate.
win_length : float
Window length in ms.
stride_length : float
Stride length in ms.
clip : bool, optional
Whether to clip audio that does not fit into the last frame, by
default True
Returns
-------
frames : Tensor
Shape (B, T', win_length).
num_frames : Tensor
Shape (B, ) number of valid frames
"""
assert stride_length <= win_length
stride_length = int(stride_length * sr)
win_length = int(win_length * sr)
num_frames = (num_samples - win_length) // stride_length
padding = (0, 0)
if not clip:
num_frames += 1
need_samples = num_frames * stride_length + win_length
padding = (0, need_samples - num_samples - 1)
weight = paddle.eye(win_length).unsqueeze(1) #[win_length, 1, win_length]
frames = F.conv1d(x.unsqueeze(-1),
weight,
padding=padding,
stride=(stride_length, ),
data_format='NLC')
return frames, num_frames
def dither(signal:Tensor, dither_value=1.0)->Tensor:
"""dither frames for log compute.
Args:
signal (Tensor): [B, T, D]
dither_value (float, optional): [scalar]. Defaults to 1.0.
Returns:
Tensor: [B, T, D]
"""
D = paddle.shape(signal)[-1]
signal += paddle.normal(shape=[1, 1, D]) * dither_value
return signal
def remove_dc_offset(signal:Tensor)->Tensor:
"""remove dc.
Args:
signal (Tensor): [B, T, D]
Returns:
Tensor: [B, T, D]
"""
signal -= paddle.mean(signal, axis=-1, keepdim=True)
return signal
def preemphasis(signal:Tensor, coeff=0.97)->Tensor:
"""perform preemphasis on the input signal.
Args:
signal (Tensor): [B, T, D], The signal to filter.
coeff (float, optional): [scalar].The preemphasis coefficient. 0 is no filter, Defaults to 0.97.
Returns:
Tensor: [B, T, D]
"""
return paddle.concat([
(1-coeff)*signal[:, :, 0:1],
signal[:, :, 1:] - coeff * signal[:, :, :-1]
], axis=-1)
class STFT(nn.Layer):
"""A module for computing stft transformation in a differentiable way.
http://practicalcryptography.com/miscellaneous/machine-learning/intuitive-guide-discrete-fourier-transform/
Parameters
------------
n_fft : int
Number of samples in a frame.
sr: int
Number of Samplilng rate.
stride_length : float
Number of samples shifted between adjacent frames.
win_length : float
Length of the window.
clip: bool
Whether to clip audio is necesaary.
"""
def __init__(self,
n_fft: int,
sr: int,
win_length: float,
stride_length: float,
dither:float=0.0,
preemph_coeff:float=0.97,
remove_dc_offset:bool=True,
window_type: str = 'povey',
clip: bool = False):
super().__init__()
self.sr = sr
self.win_length = win_length
self.stride_length = stride_length
self.dither = dither
self.preemph_coeff = preemph_coeff
self.remove_dc_offset = remove_dc_offset
self.window_type = window_type
self.clip = clip
self.n_fft = n_fft
self.n_bin = 1 + n_fft // 2
w_real, w_imag, kernel_size = dft_matrix(
self.n_fft, int(self.win_length * self.sr), self.n_bin
)
# calculate window
window = get_window(window_type, kernel_size)
# (2 * n_bins, kernel_size)
w = np.concatenate([w_real, w_imag], axis=0)
w = w * window
# (kernel_size, 2 * n_bins)
w = np.transpose(w)
weight = paddle.cast(paddle.to_tensor(w), paddle.get_default_dtype())
self.register_buffer("weight", weight)
def forward(self, x: Tensor, num_samples: Tensor) -> Tuple[Tensor, Tensor]:
"""Compute the stft transform.
Parameters
------------
x : Tensor [shape=(B, T)]
The input waveform.
num_samples : Tensor [shape=(B,)]
Number of samples of each waveform.
Returns
------------
C : Tensor
Shape(B, T', n_bins, 2) Spectrogram.
num_frames: Tensor
Shape (B,) number of samples of each spectrogram
"""
batch_size = paddle.shape(num_samples)
F, nframe = frames(x, num_samples, self.sr, self.win_length, self.stride_length, clip=self.clip)
if self.dither:
F = dither(F, self.dither)
if self.remove_dc_offset:
F = remove_dc_offset(F)
if self.preemph_coeff:
F = preemphasis(F)
C = paddle.matmul(F, self.weight) # [B, T, K] [K, 2 * n_bins]
C = paddle.reshape(C, [batch_size, -1, 2, self.n_bin])
C = C.transpose([0, 1, 3, 2])
return C, nframe
def powspec(C:Tensor) -> Tensor:
"""Compute the power spectrum |X_k|^2.
Args:
C (Tensor): [B, T, C, 2]
Returns:
Tensor: [B, T, C]
"""
real, imag = paddle.chunk(C, 2, axis=-1)
return paddle.square(real.squeeze(-1)) + paddle.square(imag.squeeze(-1))
def magspec(C: Tensor, eps=1e-10) -> Tensor:
"""Compute the magnitude spectrum |X_k|.
Args:
C (Tensor): [B, T, C, 2]
eps (float): epsilon.
Returns:
Tensor: [B, T, C]
"""
pspec = powspec(C)
return paddle.sqrt(pspec + eps)
def logspec(C: Tensor, eps=1e-10) -> Tensor:
"""Compute log-spectrum 20log10∣X_k∣.
Args:
C (Tensor): [description]
eps ([type], optional): [description]. Defaults to 1e-10.
Returns:
Tensor: [description]
"""
spec = magspec(C)
return 20 * paddle.log10(spec + eps)
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册