未验证 提交 2100df8b 编写于 作者: K KP 提交者: GitHub

Merge branch 'develop' into add_photopen_module

# kwmlp_speech_commands
|模型名称|kwmlp_speech_commands|
| :--- | :---: |
|类别|语音-语言识别|
|网络|Keyword-MLP|
|数据集|Google Speech Commands V2|
|是否支持Fine-tuning|否|
|模型大小|1.6MB|
|最新更新日期|2022-01-04|
|数据指标|ACC 97.56%|
## 一、模型基本信息
### 模型介绍
kwmlp_speech_commands采用了 [Keyword-MLP](https://arxiv.org/pdf/2110.07749v1.pdf) 的轻量级模型结构,并在 [Google Speech Commands V2](https://arxiv.org/abs/1804.03209) 数据集上进行了预训练,在其测试集的测试结果为 ACC 97.56%。
<p align="center">
<img src="https://d3i71xaburhd42.cloudfront.net/fa690a97f76ba119ca08fb02fa524a546c47f031/2-Figure1-1.png" hspace='10' height="550"/> <br />
</p>
更多详情请参考
- [Speech Commands: A Dataset for Limited-Vocabulary Speech Recognition](https://arxiv.org/abs/1804.03209)
- [ATTENTION-FREE KEYWORD SPOTTING](https://arxiv.org/pdf/2110.07749v1.pdf)
- [Keyword-MLP](https://github.com/AI-Research-BD/Keyword-MLP)
## 二、安装
- ### 1、环境依赖
- paddlepaddle >= 2.2.0
- paddlehub >= 2.2.0 | [如何安装PaddleHub](../../../../docs/docs_ch/get_start/installation.rst)
- ### 2、安装
- ```shell
$ hub install kwmlp_speech_commands
```
- 如您安装时遇到问题,可参考:[零基础windows安装](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [零基础Linux安装](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [零基础MacOS安装](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## 三、模型API预测
- ### 1、预测代码示例
```python
import paddlehub as hub
model = hub.Module(
name='kwmlp_speech_commands',
version='1.0.0')
# 通过下列链接可下载示例音频
# https://paddlehub.bj.bcebos.com/paddlehub_dev/go.wav
# Keyword spotting
score, label = model.keyword_recognize('no.wav')
print(score, label)
# [0.89498246] no
score, label = model.keyword_recognize('go.wav')
print(score, label)
# [0.8997176] go
score, label = model.keyword_recognize('one.wav')
print(score, label)
# [0.88598305] one
```
- ### 2、API
- ```python
def keyword_recognize(
wav: os.PathLike,
)
```
- 检测音频中包含的关键词。
- **参数**
- `wav`:输入的包含关键词的音频文件,格式为`*.wav`。
- **返回**
- 输出结果的得分和对应的关键词标签。
## 四、更新历史
* 1.0.0
初始发布
```shell
$ hub install kwmlp_speech_commands
```
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle
import paddleaudio
def create_dct(n_mfcc: int, n_mels: int, norm: str = 'ortho'):
n = paddle.arange(float(n_mels))
k = paddle.arange(float(n_mfcc)).unsqueeze(1)
dct = paddle.cos(math.pi / float(n_mels) * (n + 0.5) * k) # size (n_mfcc, n_mels)
if norm is None:
dct *= 2.0
else:
assert norm == "ortho"
dct[0] *= 1.0 / math.sqrt(2.0)
dct *= math.sqrt(2.0 / float(n_mels))
return dct.t()
def compute_mfcc(
x: paddle.Tensor,
sr: int = 16000,
n_mels: int = 40,
n_fft: int = 480,
win_length: int = 480,
hop_length: int = 160,
f_min: float = 0.0,
f_max: float = None,
center: bool = False,
top_db: float = 80.0,
norm: str = 'ortho',
):
fbank = paddleaudio.features.spectrum.MelSpectrogram(
sr=sr,
n_mels=n_mels,
n_fft=n_fft,
win_length=win_length,
hop_length=hop_length,
f_min=0.0,
f_max=f_max,
center=center)(x) # waveforms batch ~ (B, T)
log_fbank = paddleaudio.features.spectrum.power_to_db(fbank, top_db=top_db)
dct_matrix = create_dct(n_mfcc=n_mels, n_mels=n_mels, norm=norm)
mfcc = paddle.matmul(log_fbank.transpose((0, 2, 1)), dct_matrix).transpose((0, 2, 1)) # (B, n_mels, L)
return mfcc
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
class Residual(nn.Layer):
def __init__(self, fn):
super().__init__()
self.fn = fn
def forward(self, x):
return self.fn(x) + x
class PreNorm(nn.Layer):
def __init__(self, dim, fn):
super().__init__()
self.fn = fn
self.norm = nn.LayerNorm(dim)
def forward(self, x, **kwargs):
x = self.norm(x)
return self.fn(x, **kwargs)
class PostNorm(nn.Layer):
def __init__(self, dim, fn):
super().__init__()
self.norm = nn.LayerNorm(dim)
self.fn = fn
def forward(self, x, **kwargs):
return self.norm(self.fn(x, **kwargs))
class SpatialGatingUnit(nn.Layer):
def __init__(self, dim, dim_seq, act=nn.Identity(), init_eps=1e-3):
super().__init__()
dim_out = dim // 2
self.norm = nn.LayerNorm(dim_out)
self.proj = nn.Conv1D(dim_seq, dim_seq, 1)
self.act = act
init_eps /= dim_seq
def forward(self, x):
res, gate = x.split(2, axis=-1)
gate = self.norm(gate)
weight, bias = self.proj.weight, self.proj.bias
gate = F.conv1d(gate, weight, bias)
return self.act(gate) * res
class gMLPBlock(nn.Layer):
def __init__(self, *, dim, dim_ff, seq_len, act=nn.Identity()):
super().__init__()
self.proj_in = nn.Sequential(nn.Linear(dim, dim_ff), nn.GELU())
self.sgu = SpatialGatingUnit(dim_ff, seq_len, act)
self.proj_out = nn.Linear(dim_ff // 2, dim)
def forward(self, x):
x = self.proj_in(x)
x = self.sgu(x)
x = self.proj_out(x)
return x
class Rearrange(nn.Layer):
def __init__(self):
super().__init__()
def forward(self, x):
x = x.transpose([0, 1, 3, 2]).squeeze(1)
return x
class Reduce(nn.Layer):
def __init__(self, axis=1):
super().__init__()
self.axis = axis
def forward(self, x):
x = x.mean(axis=self.axis, keepdim=False)
return x
class KW_MLP(nn.Layer):
"""Keyword-MLP."""
def __init__(self,
input_res=[40, 98],
patch_res=[40, 1],
num_classes=35,
dim=64,
depth=12,
ff_mult=4,
channels=1,
prob_survival=0.9,
pre_norm=False,
**kwargs):
super().__init__()
image_height, image_width = input_res
patch_height, patch_width = patch_res
assert (image_height % patch_height) == 0 and (
image_width % patch_width) == 0, 'image height and width must be divisible by patch size'
num_patches = (image_height // patch_height) * (image_width // patch_width)
P_Norm = PreNorm if pre_norm else PostNorm
dim_ff = dim * ff_mult
self.to_patch_embed = nn.Sequential(Rearrange(), nn.Linear(channels * patch_height * patch_width, dim))
self.prob_survival = prob_survival
self.layers = nn.LayerList(
[Residual(P_Norm(dim, gMLPBlock(dim=dim, dim_ff=dim_ff, seq_len=num_patches))) for i in range(depth)])
self.to_logits = nn.Sequential(nn.LayerNorm(dim), Reduce(axis=1), nn.Linear(dim, num_classes))
def forward(self, x):
x = self.to_patch_embed(x)
layers = self.layers
x = nn.Sequential(*layers)(x)
return self.to_logits(x)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import paddle
import paddleaudio
from .feature import compute_mfcc
from .kwmlp import KW_MLP
from paddlehub.module.module import moduleinfo
from paddlehub.utils.log import logger
@moduleinfo(
name="kwmlp_speech_commands",
version="1.0.0",
summary="",
author="paddlepaddle",
author_email="",
type="audio/language_identification")
class KWS(paddle.nn.Layer):
def __init__(self):
super(KWS, self).__init__()
ckpt_path = os.path.join(self.directory, 'assets', 'model.pdparams')
label_path = os.path.join(self.directory, 'assets', 'label.txt')
self.label_list = []
with open(label_path, 'r') as f:
for l in f:
self.label_list.append(l.strip())
self.sr = 16000
model_conf = {
'input_res': [40, 98],
'patch_res': [40, 1],
'num_classes': 35,
'channels': 1,
'dim': 64,
'depth': 12,
'pre_norm': False,
'prob_survival': 0.9,
}
self.model = KW_MLP(**model_conf)
self.model.set_state_dict(paddle.load(ckpt_path))
self.model.eval()
def load_audio(self, wav):
wav = os.path.abspath(os.path.expanduser(wav))
assert os.path.isfile(wav), 'Please check wav file: {}'.format(wav)
waveform, _ = paddleaudio.load(wav, sr=self.sr, mono=True, normal=False)
return waveform
def keyword_recognize(self, wav):
waveform = self.load_audio(wav)
# fix_length to 1s
if len(waveform) > self.sr:
waveform = waveform[:self.sr]
else:
waveform = np.pad(waveform, (0, self.sr - len(waveform)))
logits = self(paddle.to_tensor(waveform)).reshape([-1])
probs = paddle.nn.functional.softmax(logits)
idx = paddle.argmax(probs)
return probs[idx].numpy(), self.label_list[idx]
def forward(self, x):
if len(x.shape) == 1: # x: waveform tensors with (B, T) shape
x = x.unsqueeze(0)
mfcc = compute_mfcc(x).unsqueeze(1) # (B, C, n_mels, L)
logits = self.model(mfcc).squeeze(1)
return logits
# ecapa_tdnn_common_language
|模型名称|ecapa_tdnn_common_language|
| :--- | :---: |
|类别|语音-语言识别|
|网络|ECAPA-TDNN|
|数据集|CommonLanguage|
|是否支持Fine-tuning|否|
|模型大小|79MB|
|最新更新日期|2021-12-30|
|数据指标|ACC 84.9%|
## 一、模型基本信息
### 模型介绍
ecapa_tdnn_common_language采用了[ECAPA-TDNN](https://arxiv.org/abs/2005.07143)的模型结构,并在[CommonLanguage](https://zenodo.org/record/5036977/)数据集上进行了预训练,在其测试集的测试结果为 ACC 84.9%。
<p align="center">
<img src="https://d3i71xaburhd42.cloudfront.net/9609f4817a7e769f5e3e07084db35e46696e82cd/3-Figure2-1.png" hspace='10' height="550"/> <br />
</p>
更多详情请参考
- [CommonLanguage](https://zenodo.org/record/5036977#.Yc19b5Mzb0o)
- [ECAPA-TDNN: Emphasized Channel Attention, Propagation and Aggregation in TDNN Based Speaker Verification](https://arxiv.org/pdf/2005.07143.pdf)
- [The SpeechBrain Toolkit](https://github.com/speechbrain/speechbrain)
## 二、安装
- ### 1、环境依赖
- paddlepaddle >= 2.2.0
- paddlehub >= 2.2.0 | [如何安装PaddleHub](../../../../docs/docs_ch/get_start/installation.rst)
- ### 2、安装
- ```shell
$ hub install ecapa_tdnn_common_language
```
- 如您安装时遇到问题,可参考:[零基础windows安装](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [零基础Linux安装](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [零基础MacOS安装](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## 三、模型API预测
- ### 1、预测代码示例
```python
import paddlehub as hub
model = hub.Module(
name='ecapa_tdnn_common_language',
version='1.0.0')
# 通过下列链接可下载示例音频
# https://paddlehub.bj.bcebos.com/paddlehub_dev/zh.wav
# https://paddlehub.bj.bcebos.com/paddlehub_dev/en.wav
# https://paddlehub.bj.bcebos.com/paddlehub_dev/it.wav
# Language Identification
score, label = model.speaker_verify('zh.wav')
print(score, label)
# array([0.6214552], dtype=float32), 'Chinese_China'
score, label = model.speaker_verify('en.wav')
print(score, label)
# array([0.37193954], dtype=float32), 'English'
score, label = model.speaker_verify('it.wav')
print(score, label)
# array([0.46913534], dtype=float32), 'Italian'
```
- ### 2、API
- ```python
def language_identify(
wav: os.PathLike,
)
```
- 判断输入人声音频的语言类别。
- **参数**
- `wav`:输入的说话人的音频文件,格式为`*.wav`。
- **返回**
- 输出结果的得分和对应的语言类别。
## 四、更新历史
* 1.0.0
初始发布
```shell
$ hub install ecapa_tdnn_common_language
```
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
def length_to_mask(length, max_len=None, dtype=None):
assert len(length.shape) == 1
if max_len is None:
max_len = length.max().astype('int').item() # using arange to generate mask
mask = paddle.arange(max_len, dtype=length.dtype).expand((len(length), max_len)) < length.unsqueeze(1)
if dtype is None:
dtype = length.dtype
mask = paddle.to_tensor(mask, dtype=dtype)
return mask
class Conv1d(nn.Layer):
def __init__(
self,
in_channels,
out_channels,
kernel_size,
stride=1,
padding="same",
dilation=1,
groups=1,
bias=True,
padding_mode="reflect",
):
super(Conv1d, self).__init__()
self.kernel_size = kernel_size
self.stride = stride
self.dilation = dilation
self.padding = padding
self.padding_mode = padding_mode
self.conv = nn.Conv1D(
in_channels,
out_channels,
self.kernel_size,
stride=self.stride,
padding=0,
dilation=self.dilation,
groups=groups,
bias_attr=bias,
)
def forward(self, x):
if self.padding == "same":
x = self._manage_padding(x, self.kernel_size, self.dilation, self.stride)
else:
raise ValueError("Padding must be 'same'. Got {self.padding}")
return self.conv(x)
def _manage_padding(self, x, kernel_size: int, dilation: int, stride: int):
L_in = x.shape[-1] # Detecting input shape
padding = self._get_padding_elem(L_in, stride, kernel_size, dilation) # Time padding
x = F.pad(x, padding, mode=self.padding_mode, data_format="NCL") # Applying padding
return x
def _get_padding_elem(self, L_in: int, stride: int, kernel_size: int, dilation: int):
if stride > 1:
n_steps = math.ceil(((L_in - kernel_size * dilation) / stride) + 1)
L_out = stride * (n_steps - 1) + kernel_size * dilation
padding = [kernel_size // 2, kernel_size // 2]
else:
L_out = (L_in - dilation * (kernel_size - 1) - 1) // stride + 1
padding = [(L_in - L_out) // 2, (L_in - L_out) // 2]
return padding
class BatchNorm1d(nn.Layer):
def __init__(
self,
input_size,
eps=1e-05,
momentum=0.9,
weight_attr=None,
bias_attr=None,
data_format='NCL',
use_global_stats=None,
):
super(BatchNorm1d, self).__init__()
self.norm = nn.BatchNorm1D(
input_size,
epsilon=eps,
momentum=momentum,
weight_attr=weight_attr,
bias_attr=bias_attr,
data_format=data_format,
use_global_stats=use_global_stats,
)
def forward(self, x):
x_n = self.norm(x)
return x_n
class TDNNBlock(nn.Layer):
def __init__(
self,
in_channels,
out_channels,
kernel_size,
dilation,
activation=nn.ReLU,
):
super(TDNNBlock, self).__init__()
self.conv = Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
dilation=dilation,
)
self.activation = activation()
self.norm = BatchNorm1d(input_size=out_channels)
def forward(self, x):
return self.norm(self.activation(self.conv(x)))
class Res2NetBlock(nn.Layer):
def __init__(self, in_channels, out_channels, scale=8, dilation=1):
super(Res2NetBlock, self).__init__()
assert in_channels % scale == 0
assert out_channels % scale == 0
in_channel = in_channels // scale
hidden_channel = out_channels // scale
self.blocks = nn.LayerList(
[TDNNBlock(in_channel, hidden_channel, kernel_size=3, dilation=dilation) for i in range(scale - 1)])
self.scale = scale
def forward(self, x):
y = []
for i, x_i in enumerate(paddle.chunk(x, self.scale, axis=1)):
if i == 0:
y_i = x_i
elif i == 1:
y_i = self.blocks[i - 1](x_i)
else:
y_i = self.blocks[i - 1](x_i + y_i)
y.append(y_i)
y = paddle.concat(y, axis=1)
return y
class SEBlock(nn.Layer):
def __init__(self, in_channels, se_channels, out_channels):
super(SEBlock, self).__init__()
self.conv1 = Conv1d(in_channels=in_channels, out_channels=se_channels, kernel_size=1)
self.relu = paddle.nn.ReLU()
self.conv2 = Conv1d(in_channels=se_channels, out_channels=out_channels, kernel_size=1)
self.sigmoid = paddle.nn.Sigmoid()
def forward(self, x, lengths=None):
L = x.shape[-1]
if lengths is not None:
mask = length_to_mask(lengths * L, max_len=L)
mask = mask.unsqueeze(1)
total = mask.sum(axis=2, keepdim=True)
s = (x * mask).sum(axis=2, keepdim=True) / total
else:
s = x.mean(axis=2, keepdim=True)
s = self.relu(self.conv1(s))
s = self.sigmoid(self.conv2(s))
return s * x
class AttentiveStatisticsPooling(nn.Layer):
def __init__(self, channels, attention_channels=128, global_context=True):
super().__init__()
self.eps = 1e-12
self.global_context = global_context
if global_context:
self.tdnn = TDNNBlock(channels * 3, attention_channels, 1, 1)
else:
self.tdnn = TDNNBlock(channels, attention_channels, 1, 1)
self.tanh = nn.Tanh()
self.conv = Conv1d(in_channels=attention_channels, out_channels=channels, kernel_size=1)
def forward(self, x, lengths=None):
C, L = x.shape[1], x.shape[2] # KP: (N, C, L)
def _compute_statistics(x, m, axis=2, eps=self.eps):
mean = (m * x).sum(axis)
std = paddle.sqrt((m * (x - mean.unsqueeze(axis)).pow(2)).sum(axis).clip(eps))
return mean, std
if lengths is None:
lengths = paddle.ones([x.shape[0]])
# Make binary mask of shape [N, 1, L]
mask = length_to_mask(lengths * L, max_len=L)
mask = mask.unsqueeze(1)
# Expand the temporal context of the pooling layer by allowing the
# self-attention to look at global properties of the utterance.
if self.global_context:
total = mask.sum(axis=2, keepdim=True).astype('float32')
mean, std = _compute_statistics(x, mask / total)
mean = mean.unsqueeze(2).tile((1, 1, L))
std = std.unsqueeze(2).tile((1, 1, L))
attn = paddle.concat([x, mean, std], axis=1)
else:
attn = x
# Apply layers
attn = self.conv(self.tanh(self.tdnn(attn)))
# Filter out zero-paddings
attn = paddle.where(mask.tile((1, C, 1)) == 0, paddle.ones_like(attn) * float("-inf"), attn)
attn = F.softmax(attn, axis=2)
mean, std = _compute_statistics(x, attn)
# Append mean and std of the batch
pooled_stats = paddle.concat((mean, std), axis=1)
pooled_stats = pooled_stats.unsqueeze(2)
return pooled_stats
class SERes2NetBlock(nn.Layer):
def __init__(
self,
in_channels,
out_channels,
res2net_scale=8,
se_channels=128,
kernel_size=1,
dilation=1,
activation=nn.ReLU,
):
super(SERes2NetBlock, self).__init__()
self.out_channels = out_channels
self.tdnn1 = TDNNBlock(
in_channels,
out_channels,
kernel_size=1,
dilation=1,
activation=activation,
)
self.res2net_block = Res2NetBlock(out_channels, out_channels, res2net_scale, dilation)
self.tdnn2 = TDNNBlock(
out_channels,
out_channels,
kernel_size=1,
dilation=1,
activation=activation,
)
self.se_block = SEBlock(out_channels, se_channels, out_channels)
self.shortcut = None
if in_channels != out_channels:
self.shortcut = Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
)
def forward(self, x, lengths=None):
residual = x
if self.shortcut:
residual = self.shortcut(x)
x = self.tdnn1(x)
x = self.res2net_block(x)
x = self.tdnn2(x)
x = self.se_block(x, lengths)
return x + residual
class ECAPA_TDNN(nn.Layer):
def __init__(
self,
input_size,
lin_neurons=192,
activation=nn.ReLU,
channels=[512, 512, 512, 512, 1536],
kernel_sizes=[5, 3, 3, 3, 1],
dilations=[1, 2, 3, 4, 1],
attention_channels=128,
res2net_scale=8,
se_channels=128,
global_context=True,
):
super(ECAPA_TDNN, self).__init__()
assert len(channels) == len(kernel_sizes)
assert len(channels) == len(dilations)
self.channels = channels
self.blocks = nn.LayerList()
self.emb_size = lin_neurons
# The initial TDNN layer
self.blocks.append(TDNNBlock(
input_size,
channels[0],
kernel_sizes[0],
dilations[0],
activation,
))
# SE-Res2Net layers
for i in range(1, len(channels) - 1):
self.blocks.append(
SERes2NetBlock(
channels[i - 1],
channels[i],
res2net_scale=res2net_scale,
se_channels=se_channels,
kernel_size=kernel_sizes[i],
dilation=dilations[i],
activation=activation,
))
# Multi-layer feature aggregation
self.mfa = TDNNBlock(
channels[-1],
channels[-1],
kernel_sizes[-1],
dilations[-1],
activation,
)
# Attentive Statistical Pooling
self.asp = AttentiveStatisticsPooling(
channels[-1],
attention_channels=attention_channels,
global_context=global_context,
)
self.asp_bn = BatchNorm1d(input_size=channels[-1] * 2)
# Final linear transformation
self.fc = Conv1d(
in_channels=channels[-1] * 2,
out_channels=self.emb_size,
kernel_size=1,
)
def forward(self, x, lengths=None):
xl = []
for layer in self.blocks:
try:
x = layer(x, lengths=lengths)
except TypeError:
x = layer(x)
xl.append(x)
# Multi-layer feature aggregation
x = paddle.concat(xl[1:], axis=1)
x = self.mfa(x)
# Attentive Statistical Pooling
x = self.asp(x, lengths=lengths)
x = self.asp_bn(x)
# Final linear transformation
x = self.fc(x)
return x
class Classifier(nn.Layer):
def __init__(self, backbone, num_class, dtype=paddle.float32):
super(Classifier, self).__init__()
self.backbone = backbone
self.params = nn.ParameterList(
[paddle.create_parameter(shape=[num_class, self.backbone.emb_size], dtype=dtype)])
def forward(self, x):
emb = self.backbone(x.transpose([0, 2, 1])).transpose([0, 2, 1])
logits = F.linear(F.normalize(emb.squeeze(1)), F.normalize(self.params[0]).transpose([1, 0]))
return logits
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddleaudio
from paddleaudio.features.spectrum import hz_to_mel
from paddleaudio.features.spectrum import mel_to_hz
from paddleaudio.features.spectrum import power_to_db
from paddleaudio.features.spectrum import Spectrogram
from paddleaudio.features.window import get_window
def compute_fbank_matrix(sample_rate: int = 16000,
n_fft: int = 400,
n_mels: int = 80,
f_min: int = 0.0,
f_max: int = 8000.0):
mel = paddle.linspace(hz_to_mel(f_min, htk=True), hz_to_mel(f_max, htk=True), n_mels + 2, dtype=paddle.float32)
hz = mel_to_hz(mel, htk=True)
band = hz[1:] - hz[:-1]
band = band[:-1]
f_central = hz[1:-1]
n_stft = n_fft // 2 + 1
all_freqs = paddle.linspace(0, sample_rate // 2, n_stft)
all_freqs_mat = all_freqs.tile([f_central.shape[0], 1])
f_central_mat = f_central.tile([all_freqs_mat.shape[1], 1]).transpose([1, 0])
band_mat = band.tile([all_freqs_mat.shape[1], 1]).transpose([1, 0])
slope = (all_freqs_mat - f_central_mat) / band_mat
left_side = slope + 1.0
right_side = -slope + 1.0
fbank_matrix = paddle.maximum(paddle.zeros_like(left_side), paddle.minimum(left_side, right_side))
return fbank_matrix
def compute_log_fbank(
x: paddle.Tensor,
sample_rate: int = 16000,
n_fft: int = 400,
hop_length: int = 160,
win_length: int = 400,
n_mels: int = 80,
window: str = 'hamming',
center: bool = True,
pad_mode: str = 'constant',
f_min: float = 0.0,
f_max: float = None,
top_db: float = 80.0,
):
if f_max is None:
f_max = sample_rate / 2
spect = Spectrogram(
n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, center=center, pad_mode=pad_mode)(x)
fbank_matrix = compute_fbank_matrix(
sample_rate=sample_rate,
n_fft=n_fft,
n_mels=n_mels,
f_min=f_min,
f_max=f_max,
)
fbank = paddle.matmul(fbank_matrix, spect)
log_fbank = power_to_db(fbank, top_db=top_db).transpose([0, 2, 1])
return log_fbank
def compute_stats(x: paddle.Tensor, mean_norm: bool = True, std_norm: bool = False, eps: float = 1e-10):
if mean_norm:
current_mean = paddle.mean(x, axis=0)
else:
current_mean = paddle.to_tensor([0.0])
if std_norm:
current_std = paddle.std(x, axis=0)
else:
current_std = paddle.to_tensor([1.0])
current_std = paddle.maximum(current_std, eps * paddle.ones_like(current_std))
return current_mean, current_std
def normalize(
x: paddle.Tensor,
global_mean: paddle.Tensor = None,
global_std: paddle.Tensor = None,
):
for i in range(x.shape[0]): # (B, ...)
if global_mean is None and global_std is None:
mean, std = compute_stats(x[i])
x[i] = (x[i] - mean) / std
else:
x[i] = (x[i] - global_mean) / global_std
return x
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
from typing import List
from typing import Union
import numpy as np
import paddle
import paddleaudio
from .ecapa_tdnn import Classifier
from .ecapa_tdnn import ECAPA_TDNN
from .feature import compute_log_fbank
from .feature import normalize
from paddlehub.module.module import moduleinfo
from paddlehub.utils.log import logger
@moduleinfo(
name="ecapa_tdnn_common_language",
version="1.0.0",
summary="",
author="paddlepaddle",
author_email="",
type="audio/language_identification")
class LanguageIdentification(paddle.nn.Layer):
def __init__(self):
super(LanguageIdentification, self).__init__()
ckpt_path = os.path.join(self.directory, 'assets', 'model.pdparams')
label_path = os.path.join(self.directory, 'assets', 'label.txt')
self.label_list = []
with open(label_path, 'r') as f:
for l in f:
self.label_list.append(l.strip())
self.sr = 16000
model_conf = {
'input_size': 80,
'channels': [1024, 1024, 1024, 1024, 3072],
'kernel_sizes': [5, 3, 3, 3, 1],
'dilations': [1, 2, 3, 4, 1],
'attention_channels': 128,
'lin_neurons': 192
}
self.model = Classifier(
backbone=ECAPA_TDNN(**model_conf),
num_class=45,
)
self.model.set_state_dict(paddle.load(ckpt_path))
self.model.eval()
def load_audio(self, wav):
wav = os.path.abspath(os.path.expanduser(wav))
assert os.path.isfile(wav), 'Please check wav file: {}'.format(wav)
waveform, _ = paddleaudio.load(wav, sr=self.sr, mono=True, normal=False)
return waveform
def language_identify(self, wav):
waveform = self.load_audio(wav)
logits = self(paddle.to_tensor(waveform)).reshape([-1])
idx = paddle.argmax(logits)
return logits[idx].numpy(), self.label_list[idx]
def forward(self, x):
if len(x.shape) == 1:
x = x.unsqueeze(0)
fbank = compute_log_fbank(x) # x: waveform tensors with (B, T) shape
norm_fbank = normalize(fbank)
logits = self.model(norm_fbank).squeeze(1)
return logits
# ecapa_tdnn_voxceleb
|模型名称|ecapa_tdnn_voxceleb|
| :--- | :---: |
|类别|语音-声纹识别|
|网络|ECAPA-TDNN|
|数据集|VoxCeleb|
|是否支持Fine-tuning|否|
|模型大小|79MB|
|最新更新日期|2021-12-30|
|数据指标|EER 0.69%|
## 一、模型基本信息
### 模型介绍
ecapa_tdnn_voxceleb采用了[ECAPA-TDNN](https://arxiv.org/abs/2005.07143)的模型结构,并在[VoxCeleb](http://www.robots.ox.ac.uk/~vgg/data/voxceleb/)数据集上进行了预训练,在VoxCeleb1的声纹识别测试集([veri_test.txt](https://www.robots.ox.ac.uk/~vgg/data/voxceleb/meta/veri_test.txt))上的测试结果为 EER 0.69%,达到了该数据集的SOTA。
<p align="center">
<img src="https://d3i71xaburhd42.cloudfront.net/9609f4817a7e769f5e3e07084db35e46696e82cd/3-Figure2-1.png" hspace='10' height="550"/> <br />
</p>
更多详情请参考
- [VoxCeleb: a large-scale speaker identification dataset](https://www.robots.ox.ac.uk/~vgg/publications/2017/Nagrani17/nagrani17.pdf)
- [ECAPA-TDNN: Emphasized Channel Attention, Propagation and Aggregation in TDNN Based Speaker Verification](https://arxiv.org/pdf/2005.07143.pdf)
- [The SpeechBrain Toolkit](https://github.com/speechbrain/speechbrain)
## 二、安装
- ### 1、环境依赖
- paddlepaddle >= 2.2.0
- paddlehub >= 2.2.0 | [如何安装PaddleHub](../../../../docs/docs_ch/get_start/installation.rst)
- ### 2、安装
- ```shell
$ hub install ecapa_tdnn_voxceleb
```
- 如您安装时遇到问题,可参考:[零基础windows安装](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [零基础Linux安装](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [零基础MacOS安装](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## 三、模型API预测
- ### 1、预测代码示例
```python
import paddlehub as hub
model = hub.Module(
name='ecapa_tdnn_voxceleb',
threshold=0.25,
version='1.0.0')
# 通过下列链接可下载示例音频
# https://paddlehub.bj.bcebos.com/paddlehub_dev/sv1.wav
# https://paddlehub.bj.bcebos.com/paddlehub_dev/sv2.wav
# Speaker Embedding
embedding = model.speaker_embedding('sv1.wav')
print(embedding.shape)
# (192,)
# Speaker Verification
score, pred = model.speaker_verify('sv1.wav', 'sv2.wav')
print(score, pred)
# [0.16354457], [False]
```
- ### 2、API
- ```python
def __init__(
threshold: float,
)
```
- 初始化声纹模型,确定判别阈值。
- **参数**
- `threshold`:设定模型判别声纹相似度的得分阈值,默认为 0.25。
- ```python
def speaker_embedding(
wav: os.PathLike,
)
```
- 获取输入音频的声纹特征
- **参数**
- `wav`:输入的说话人的音频文件,格式为`*.wav`。
- **返回**
- 输出纬度为 (192,) 的声纹特征向量。
- ```python
def speaker_verify(
wav1: os.PathLike,
wav2: os.PathLike,
)
```
- 对比两段音频,分别计算其声纹特征的相似度得分,并判断是否为同一说话人。
- **参数**
- `wav1`:输入的说话人1的音频文件,格式为`*.wav`。
- `wav2`:输入的说话人2的音频文件,格式为`*.wav`。
- **返回**
- 返回声纹相似度得分[-1, 1]和预测结果。
## 四、更新历史
* 1.0.0
初始发布
```shell
$ hub install ecapa_tdnn_voxceleb
```
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
def length_to_mask(length, max_len=None, dtype=None):
assert len(length.shape) == 1
if max_len is None:
max_len = length.max().astype('int').item() # using arange to generate mask
mask = paddle.arange(max_len, dtype=length.dtype).expand((len(length), max_len)) < length.unsqueeze(1)
if dtype is None:
dtype = length.dtype
mask = paddle.to_tensor(mask, dtype=dtype)
return mask
class Conv1d(nn.Layer):
def __init__(
self,
in_channels,
out_channels,
kernel_size,
stride=1,
padding="same",
dilation=1,
groups=1,
bias=True,
padding_mode="reflect",
):
super(Conv1d, self).__init__()
self.kernel_size = kernel_size
self.stride = stride
self.dilation = dilation
self.padding = padding
self.padding_mode = padding_mode
self.conv = nn.Conv1D(
in_channels,
out_channels,
self.kernel_size,
stride=self.stride,
padding=0,
dilation=self.dilation,
groups=groups,
bias_attr=bias,
)
def forward(self, x):
if self.padding == "same":
x = self._manage_padding(x, self.kernel_size, self.dilation, self.stride)
else:
raise ValueError("Padding must be 'same'. Got {self.padding}")
return self.conv(x)
def _manage_padding(self, x, kernel_size: int, dilation: int, stride: int):
L_in = x.shape[-1] # Detecting input shape
padding = self._get_padding_elem(L_in, stride, kernel_size, dilation) # Time padding
x = F.pad(x, padding, mode=self.padding_mode, data_format="NCL") # Applying padding
return x
def _get_padding_elem(self, L_in: int, stride: int, kernel_size: int, dilation: int):
if stride > 1:
n_steps = math.ceil(((L_in - kernel_size * dilation) / stride) + 1)
L_out = stride * (n_steps - 1) + kernel_size * dilation
padding = [kernel_size // 2, kernel_size // 2]
else:
L_out = (L_in - dilation * (kernel_size - 1) - 1) // stride + 1
padding = [(L_in - L_out) // 2, (L_in - L_out) // 2]
return padding
class BatchNorm1d(nn.Layer):
def __init__(
self,
input_size,
eps=1e-05,
momentum=0.9,
weight_attr=None,
bias_attr=None,
data_format='NCL',
use_global_stats=None,
):
super(BatchNorm1d, self).__init__()
self.norm = nn.BatchNorm1D(
input_size,
epsilon=eps,
momentum=momentum,
weight_attr=weight_attr,
bias_attr=bias_attr,
data_format=data_format,
use_global_stats=use_global_stats,
)
def forward(self, x):
x_n = self.norm(x)
return x_n
class TDNNBlock(nn.Layer):
def __init__(
self,
in_channels,
out_channels,
kernel_size,
dilation,
activation=nn.ReLU,
):
super(TDNNBlock, self).__init__()
self.conv = Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
dilation=dilation,
)
self.activation = activation()
self.norm = BatchNorm1d(input_size=out_channels)
def forward(self, x):
return self.norm(self.activation(self.conv(x)))
class Res2NetBlock(nn.Layer):
def __init__(self, in_channels, out_channels, scale=8, dilation=1):
super(Res2NetBlock, self).__init__()
assert in_channels % scale == 0
assert out_channels % scale == 0
in_channel = in_channels // scale
hidden_channel = out_channels // scale
self.blocks = nn.LayerList(
[TDNNBlock(in_channel, hidden_channel, kernel_size=3, dilation=dilation) for i in range(scale - 1)])
self.scale = scale
def forward(self, x):
y = []
for i, x_i in enumerate(paddle.chunk(x, self.scale, axis=1)):
if i == 0:
y_i = x_i
elif i == 1:
y_i = self.blocks[i - 1](x_i)
else:
y_i = self.blocks[i - 1](x_i + y_i)
y.append(y_i)
y = paddle.concat(y, axis=1)
return y
class SEBlock(nn.Layer):
def __init__(self, in_channels, se_channels, out_channels):
super(SEBlock, self).__init__()
self.conv1 = Conv1d(in_channels=in_channels, out_channels=se_channels, kernel_size=1)
self.relu = paddle.nn.ReLU()
self.conv2 = Conv1d(in_channels=se_channels, out_channels=out_channels, kernel_size=1)
self.sigmoid = paddle.nn.Sigmoid()
def forward(self, x, lengths=None):
L = x.shape[-1]
if lengths is not None:
mask = length_to_mask(lengths * L, max_len=L)
mask = mask.unsqueeze(1)
total = mask.sum(axis=2, keepdim=True)
s = (x * mask).sum(axis=2, keepdim=True) / total
else:
s = x.mean(axis=2, keepdim=True)
s = self.relu(self.conv1(s))
s = self.sigmoid(self.conv2(s))
return s * x
class AttentiveStatisticsPooling(nn.Layer):
def __init__(self, channels, attention_channels=128, global_context=True):
super().__init__()
self.eps = 1e-12
self.global_context = global_context
if global_context:
self.tdnn = TDNNBlock(channels * 3, attention_channels, 1, 1)
else:
self.tdnn = TDNNBlock(channels, attention_channels, 1, 1)
self.tanh = nn.Tanh()
self.conv = Conv1d(in_channels=attention_channels, out_channels=channels, kernel_size=1)
def forward(self, x, lengths=None):
C, L = x.shape[1], x.shape[2] # KP: (N, C, L)
def _compute_statistics(x, m, axis=2, eps=self.eps):
mean = (m * x).sum(axis)
std = paddle.sqrt((m * (x - mean.unsqueeze(axis)).pow(2)).sum(axis).clip(eps))
return mean, std
if lengths is None:
lengths = paddle.ones([x.shape[0]])
# Make binary mask of shape [N, 1, L]
mask = length_to_mask(lengths * L, max_len=L)
mask = mask.unsqueeze(1)
# Expand the temporal context of the pooling layer by allowing the
# self-attention to look at global properties of the utterance.
if self.global_context:
total = mask.sum(axis=2, keepdim=True).astype('float32')
mean, std = _compute_statistics(x, mask / total)
mean = mean.unsqueeze(2).tile((1, 1, L))
std = std.unsqueeze(2).tile((1, 1, L))
attn = paddle.concat([x, mean, std], axis=1)
else:
attn = x
# Apply layers
attn = self.conv(self.tanh(self.tdnn(attn)))
# Filter out zero-paddings
attn = paddle.where(mask.tile((1, C, 1)) == 0, paddle.ones_like(attn) * float("-inf"), attn)
attn = F.softmax(attn, axis=2)
mean, std = _compute_statistics(x, attn)
# Append mean and std of the batch
pooled_stats = paddle.concat((mean, std), axis=1)
pooled_stats = pooled_stats.unsqueeze(2)
return pooled_stats
class SERes2NetBlock(nn.Layer):
def __init__(
self,
in_channels,
out_channels,
res2net_scale=8,
se_channels=128,
kernel_size=1,
dilation=1,
activation=nn.ReLU,
):
super(SERes2NetBlock, self).__init__()
self.out_channels = out_channels
self.tdnn1 = TDNNBlock(
in_channels,
out_channels,
kernel_size=1,
dilation=1,
activation=activation,
)
self.res2net_block = Res2NetBlock(out_channels, out_channels, res2net_scale, dilation)
self.tdnn2 = TDNNBlock(
out_channels,
out_channels,
kernel_size=1,
dilation=1,
activation=activation,
)
self.se_block = SEBlock(out_channels, se_channels, out_channels)
self.shortcut = None
if in_channels != out_channels:
self.shortcut = Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
)
def forward(self, x, lengths=None):
residual = x
if self.shortcut:
residual = self.shortcut(x)
x = self.tdnn1(x)
x = self.res2net_block(x)
x = self.tdnn2(x)
x = self.se_block(x, lengths)
return x + residual
class ECAPA_TDNN(nn.Layer):
def __init__(
self,
input_size,
lin_neurons=192,
activation=nn.ReLU,
channels=[512, 512, 512, 512, 1536],
kernel_sizes=[5, 3, 3, 3, 1],
dilations=[1, 2, 3, 4, 1],
attention_channels=128,
res2net_scale=8,
se_channels=128,
global_context=True,
):
super(ECAPA_TDNN, self).__init__()
assert len(channels) == len(kernel_sizes)
assert len(channels) == len(dilations)
self.channels = channels
self.blocks = nn.LayerList()
self.emb_size = lin_neurons
# The initial TDNN layer
self.blocks.append(TDNNBlock(
input_size,
channels[0],
kernel_sizes[0],
dilations[0],
activation,
))
# SE-Res2Net layers
for i in range(1, len(channels) - 1):
self.blocks.append(
SERes2NetBlock(
channels[i - 1],
channels[i],
res2net_scale=res2net_scale,
se_channels=se_channels,
kernel_size=kernel_sizes[i],
dilation=dilations[i],
activation=activation,
))
# Multi-layer feature aggregation
self.mfa = TDNNBlock(
channels[-1],
channels[-1],
kernel_sizes[-1],
dilations[-1],
activation,
)
# Attentive Statistical Pooling
self.asp = AttentiveStatisticsPooling(
channels[-1],
attention_channels=attention_channels,
global_context=global_context,
)
self.asp_bn = BatchNorm1d(input_size=channels[-1] * 2)
# Final linear transformation
self.fc = Conv1d(
in_channels=channels[-1] * 2,
out_channels=self.emb_size,
kernel_size=1,
)
def forward(self, x, lengths=None):
xl = []
for layer in self.blocks:
try:
x = layer(x, lengths=lengths)
except TypeError:
x = layer(x)
xl.append(x)
# Multi-layer feature aggregation
x = paddle.concat(xl[1:], axis=1)
x = self.mfa(x)
# Attentive Statistical Pooling
x = self.asp(x, lengths=lengths)
x = self.asp_bn(x)
# Final linear transformation
x = self.fc(x)
return x
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddleaudio
from paddleaudio.features.spectrum import hz_to_mel
from paddleaudio.features.spectrum import mel_to_hz
from paddleaudio.features.spectrum import power_to_db
from paddleaudio.features.spectrum import Spectrogram
from paddleaudio.features.window import get_window
def compute_fbank_matrix(sample_rate: int = 16000,
n_fft: int = 400,
n_mels: int = 80,
f_min: int = 0.0,
f_max: int = 8000.0):
mel = paddle.linspace(hz_to_mel(f_min, htk=True), hz_to_mel(f_max, htk=True), n_mels + 2, dtype=paddle.float32)
hz = mel_to_hz(mel, htk=True)
band = hz[1:] - hz[:-1]
band = band[:-1]
f_central = hz[1:-1]
n_stft = n_fft // 2 + 1
all_freqs = paddle.linspace(0, sample_rate // 2, n_stft)
all_freqs_mat = all_freqs.tile([f_central.shape[0], 1])
f_central_mat = f_central.tile([all_freqs_mat.shape[1], 1]).transpose([1, 0])
band_mat = band.tile([all_freqs_mat.shape[1], 1]).transpose([1, 0])
slope = (all_freqs_mat - f_central_mat) / band_mat
left_side = slope + 1.0
right_side = -slope + 1.0
fbank_matrix = paddle.maximum(paddle.zeros_like(left_side), paddle.minimum(left_side, right_side))
return fbank_matrix
def compute_log_fbank(
x: paddle.Tensor,
sample_rate: int = 16000,
n_fft: int = 400,
hop_length: int = 160,
win_length: int = 400,
n_mels: int = 80,
window: str = 'hamming',
center: bool = True,
pad_mode: str = 'constant',
f_min: float = 0.0,
f_max: float = None,
top_db: float = 80.0,
):
if f_max is None:
f_max = sample_rate / 2
spect = Spectrogram(
n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, center=center, pad_mode=pad_mode)(x)
fbank_matrix = compute_fbank_matrix(
sample_rate=sample_rate,
n_fft=n_fft,
n_mels=n_mels,
f_min=f_min,
f_max=f_max,
)
fbank = paddle.matmul(fbank_matrix, spect)
log_fbank = power_to_db(fbank, top_db=top_db).transpose([0, 2, 1])
return log_fbank
def compute_stats(x: paddle.Tensor, mean_norm: bool = True, std_norm: bool = False, eps: float = 1e-10):
if mean_norm:
current_mean = paddle.mean(x, axis=0)
else:
current_mean = paddle.to_tensor([0.0])
if std_norm:
current_std = paddle.std(x, axis=0)
else:
current_std = paddle.to_tensor([1.0])
current_std = paddle.maximum(current_std, eps * paddle.ones_like(current_std))
return current_mean, current_std
def normalize(
x: paddle.Tensor,
global_mean: paddle.Tensor = None,
global_std: paddle.Tensor = None,
):
for i in range(x.shape[0]): # (B, ...)
if global_mean is None and global_std is None:
mean, std = compute_stats(x[i])
x[i] = (x[i] - mean) / std
else:
x[i] = (x[i] - global_mean) / global_std
return x
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
from typing import List
from typing import Union
import numpy as np
import paddle
import paddleaudio
from .ecapa_tdnn import ECAPA_TDNN
from .feature import compute_log_fbank
from .feature import normalize
from paddlehub.module.module import moduleinfo
from paddlehub.utils.log import logger
@moduleinfo(
name="ecapa_tdnn_voxceleb",
version="1.0.0",
summary="",
author="paddlepaddle",
author_email="",
type="audio/speaker_recognition")
class SpeakerRecognition(paddle.nn.Layer):
def __init__(self, threshold=0.25):
super(SpeakerRecognition, self).__init__()
global_stats_path = os.path.join(self.directory, 'assets', 'global_embedding_stats.npy')
ckpt_path = os.path.join(self.directory, 'assets', 'model.pdparams')
self.sr = 16000
self.threshold = threshold
model_conf = {
'input_size': 80,
'channels': [1024, 1024, 1024, 1024, 3072],
'kernel_sizes': [5, 3, 3, 3, 1],
'dilations': [1, 2, 3, 4, 1],
'attention_channels': 128,
'lin_neurons': 192
}
self.model = ECAPA_TDNN(**model_conf)
self.model.set_state_dict(paddle.load(ckpt_path))
self.model.eval()
global_embedding_stats = np.load(global_stats_path, allow_pickle=True)
self.global_emb_mean = paddle.to_tensor(global_embedding_stats.item().get('global_emb_mean'))
self.global_emb_std = paddle.to_tensor(global_embedding_stats.item().get('global_emb_std'))
self.similarity = paddle.nn.CosineSimilarity(axis=-1, eps=1e-6)
def load_audio(self, wav):
wav = os.path.abspath(os.path.expanduser(wav))
assert os.path.isfile(wav), 'Please check wav file: {}'.format(wav)
waveform, _ = paddleaudio.load(wav, sr=self.sr, mono=True, normal=False)
return waveform
def speaker_embedding(self, wav):
waveform = self.load_audio(wav)
embedding = self(paddle.to_tensor(waveform)).reshape([-1])
return embedding.numpy()
def speaker_verify(self, wav1, wav2):
waveform1 = self.load_audio(wav1)
embedding1 = self(paddle.to_tensor(waveform1)).reshape([-1])
waveform2 = self.load_audio(wav2)
embedding2 = self(paddle.to_tensor(waveform2)).reshape([-1])
score = self.similarity(embedding1, embedding2).numpy()
return score, score > self.threshold
def forward(self, x):
if len(x.shape) == 1:
x = x.unsqueeze(0)
fbank = compute_log_fbank(x) # x: waveform tensors with (B, T) shape
norm_fbank = normalize(fbank)
embedding = self.model(norm_fbank.transpose([0, 2, 1])).transpose([0, 2, 1])
norm_embedding = normalize(x=embedding, global_mean=self.global_emb_mean, global_std=self.global_emb_std)
return norm_embedding
# styleganv2_mixing
|模型名称|styleganv2_mixing|
| :--- | :---: |
|类别|图像 - 图像生成|
|网络|StyleGAN V2|
|数据集|-|
|是否支持Fine-tuning|否|
|模型大小|190MB|
|最新更新日期|2021-12-23|
|数据指标|-|
## 一、模型基本信息
- ### 应用效果展示
- 样例结果示例:
<p align="center">
<img src="https://user-images.githubusercontent.com/22424850/147241001-3babb1bd-98d4-4a9c-a61d-2298fca041e1.jpg" width = "40%" hspace='10'/>
<br />
输入图像1
<br />
<img src="https://user-images.githubusercontent.com/22424850/147241006-0bc2cda8-d271-4cfd-8a0d-e6feea7bf167.jpg" width = "40%" hspace='10'/>
<br />
输入图像2
<br />
<img src="https://user-images.githubusercontent.com/22424850/147241020-f4420729-c489-4661-b43f-c929c62c0ce7.png" width = "40%" hspace='10'/>
<br />
输出图像
<br />
</p>
- ### 模型介绍
- StyleGAN V2 的任务是使用风格向量进行image generation,而Mixing模块则是利用其风格向量实现两张生成图像不同层次不同比例的混合。
## 二、安装
- ### 1、环境依赖
- paddlepaddle >= 2.1.0
- paddlehub >= 2.1.0 | [如何安装PaddleHub](../../../../docs/docs_ch/get_start/installation.rst)
- ### 2、安装
- ```shell
$ hub install styleganv2_mixing
```
- 如您安装时遇到问题,可参考:[零基础windows安装](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [零基础Linux安装](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [零基础MacOS安装](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## 三、模型API预测
- ### 1、命令行预测
- ```shell
# Read from a file
$ hub run styleganv2_mixing --image1 "/PATH/TO/IMAGE1" --image2 "/PATH/TO/IMAGE2"
```
- 通过命令行方式实现人脸融合模型的调用,更多请见 [PaddleHub命令行指令](../../../../docs/docs_ch/tutorial/cmd_usage.rst)
- ### 2、预测代码示例
- ```python
import paddlehub as hub
module = hub.Module(name="styleganv2_mixing")
input_path = ["/PATH/TO/IMAGE"]
# Read from a file
module.generate(paths=input_path, direction_name = 'age', direction_offset = 5, output_dir='./editing_result/', use_gpu=True)
```
- ### 3、API
- ```python
generate(self, images=None, paths=None, weights = [0.5] * 18, output_dir='./mixing_result/', use_gpu=False, visualization=True)
```
- 人脸融合生成API。
- **参数**
- images (list[dict]): data of images, 每一个元素都为一个 dict,有关键字 image1, image2, 相应取值为:
- image1 (numpy.ndarray): 待融合的图片1,shape 为 \[H, W, C\],BGR格式;<br/>
- image2 (numpy.ndarray) : 待融合的图片2,shape为 \[H, W, C\],BGR格式;<br/>
- paths (list[str]): paths to images, 每一个元素都为一个dict, 有关键字 image1, image2, 相应取值为:
- image1 (str): 待融合的图片1的路径;<br/>
- image2 (str) : 待融合的图片2的路径;<br/>
- weights (list(float)): 融合的权重
- images (list\[numpy.ndarray\]): 图片数据 <br/>
- paths (list\[str\]): 图片路径;<br/>
- output\_dir (str): 结果保存的路径; <br/>
- use\_gpu (bool): 是否使用 GPU;<br/>
- visualization(bool): 是否保存结果到本地文件夹
## 四、服务部署
- PaddleHub Serving可以部署一个在线人脸融合服务。
- ### 第一步:启动PaddleHub Serving
- 运行启动命令:
- ```shell
$ hub serving start -m styleganv2_mixing
```
- 这样就完成了一个人脸融合的在线服务API的部署,默认端口号为8866。
- **NOTE:** 如使用GPU预测,则需要在启动服务之前,请设置CUDA\_VISIBLE\_DEVICES环境变量,否则不用设置。
- ### 第二步:发送预测请求
- 配置好服务端,以下数行代码即可实现发送预测请求,获取预测结果
- ```python
import requests
import json
import cv2
import base64
def cv2_to_base64(image):
data = cv2.imencode('.jpg', image)[1]
return base64.b64encode(data.tostring()).decode('utf8')
# 发送HTTP请求
data = {'images':[{'image1': cv2_to_base64(cv2.imread("/PATH/TO/IMAGE1")),'image2': cv2_to_base64(cv2.imread("/PATH/TO/IMAGE2"))}]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/styleganv2_mixing"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# 打印预测结果
print(r.json()["results"])
## 五、更新历史
* 1.0.0
初始发布
- ```shell
$ hub install styleganv2_mixing==1.0.0
```
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import numpy as np
import paddle
from ppgan.models.generators import StyleGANv2Generator
from ppgan.utils.download import get_path_from_url
from ppgan.utils.visual import make_grid, tensor2img, save_image
model_cfgs = {
'ffhq-config-f': {
'model_urls': 'https://paddlegan.bj.bcebos.com/models/stylegan2-ffhq-config-f.pdparams',
'size': 1024,
'style_dim': 512,
'n_mlp': 8,
'channel_multiplier': 2
},
'animeface-512': {
'model_urls': 'https://paddlegan.bj.bcebos.com/models/stylegan2-animeface-512.pdparams',
'size': 512,
'style_dim': 512,
'n_mlp': 8,
'channel_multiplier': 2
}
}
@paddle.no_grad()
def get_mean_style(generator):
mean_style = None
for i in range(10):
style = generator.mean_latent(1024)
if mean_style is None:
mean_style = style
else:
mean_style += style
mean_style /= 10
return mean_style
@paddle.no_grad()
def sample(generator, mean_style, n_sample):
image = generator(
[paddle.randn([n_sample, generator.style_dim])],
truncation=0.7,
truncation_latent=mean_style,
)[0]
return image
@paddle.no_grad()
def style_mixing(generator, mean_style, n_source, n_target):
source_code = paddle.randn([n_source, generator.style_dim])
target_code = paddle.randn([n_target, generator.style_dim])
resolution = 2**((generator.n_latent + 2) // 2)
images = [paddle.ones([1, 3, resolution, resolution]) * -1]
source_image = generator([source_code], truncation_latent=mean_style, truncation=0.7)[0]
target_image = generator([target_code], truncation_latent=mean_style, truncation=0.7)[0]
images.append(source_image)
for i in range(n_target):
image = generator(
[target_code[i].unsqueeze(0).tile([n_source, 1]), source_code],
truncation_latent=mean_style,
truncation=0.7,
)[0]
images.append(target_image[i].unsqueeze(0))
images.append(image)
images = paddle.concat(images, 0)
return images
class StyleGANv2Predictor:
def __init__(self,
output_path='output_dir',
weight_path=None,
model_type=None,
seed=None,
size=1024,
style_dim=512,
n_mlp=8,
channel_multiplier=2):
self.output_path = output_path
if weight_path is None:
if model_type in model_cfgs.keys():
weight_path = get_path_from_url(model_cfgs[model_type]['model_urls'])
size = model_cfgs[model_type].get('size', size)
style_dim = model_cfgs[model_type].get('style_dim', style_dim)
n_mlp = model_cfgs[model_type].get('n_mlp', n_mlp)
channel_multiplier = model_cfgs[model_type].get('channel_multiplier', channel_multiplier)
checkpoint = paddle.load(weight_path)
else:
raise ValueError('Predictor need a weight path or a pretrained model type')
else:
checkpoint = paddle.load(weight_path)
self.generator = StyleGANv2Generator(size, style_dim, n_mlp, channel_multiplier)
self.generator.set_state_dict(checkpoint)
self.generator.eval()
if seed is not None:
paddle.seed(seed)
random.seed(seed)
np.random.seed(seed)
def run(self, n_row=3, n_col=5):
os.makedirs(self.output_path, exist_ok=True)
mean_style = get_mean_style(self.generator)
img = sample(self.generator, mean_style, n_row * n_col)
save_image(tensor2img(make_grid(img, nrow=n_col)), f'{self.output_path}/sample.png')
for j in range(2):
img = style_mixing(self.generator, mean_style, n_col, n_row)
save_image(tensor2img(make_grid(img, nrow=n_col + 1)), f'{self.output_path}/sample_mixing_{j}.png')
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import cv2
import numpy as np
import paddle
from .basemodel import StyleGANv2Predictor
def make_image(tensor):
return (((tensor.detach() + 1) / 2 * 255).clip(min=0, max=255).transpose((0, 2, 3, 1)).numpy().astype('uint8'))
class StyleGANv2MixingPredictor(StyleGANv2Predictor):
@paddle.no_grad()
def run(self, latent1, latent2, weights=[0.5] * 18):
latent1 = paddle.to_tensor(latent1).unsqueeze(0)
latent2 = paddle.to_tensor(latent2).unsqueeze(0)
assert latent1.shape[1] == latent2.shape[1] == len(
weights), 'latents and their weights should have the same level nums.'
mix_latent = []
for i, weight in enumerate(weights):
mix_latent.append(latent1[:, i:i + 1] * weight + latent2[:, i:i + 1] * (1 - weight))
mix_latent = paddle.concat(mix_latent, 1)
latent_n = paddle.concat([latent1, latent2, mix_latent], 0)
generator = self.generator
img_gen, _ = generator([latent_n], input_is_latent=True, randomize_noise=False)
imgs = make_image(img_gen)
src_img1 = imgs[0]
src_img2 = imgs[1]
dst_img = imgs[2]
return src_img1, src_img2, dst_img
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
import copy
import paddle
import paddlehub as hub
from paddlehub.module.module import moduleinfo, runnable, serving
import numpy as np
import cv2
from skimage.io import imread
from skimage.transform import rescale, resize
from .model import StyleGANv2MixingPredictor
from .util import base64_to_cv2
@moduleinfo(
name="styleganv2_mixing",
type="CV/style_transfer",
author="paddlepaddle",
author_email="",
summary="",
version="1.0.0")
class styleganv2_mixing:
def __init__(self):
self.pretrained_model = os.path.join(self.directory, "stylegan2-ffhq-config-f.pdparams")
self.network = StyleGANv2MixingPredictor(weight_path=self.pretrained_model, model_type='ffhq-config-f')
self.pixel2style2pixel_module = hub.Module(name='pixel2style2pixel')
def generate(self,
images=None,
paths=None,
weights=[0.5] * 18,
output_dir='./mixing_result/',
use_gpu=False,
visualization=True):
'''
images (list[dict]): data of images, each element is a dict,the keys are as below:
- image1 (numpy.ndarray): image1 to be mixed,shape is \[H, W, C\],BGR format;<br/>
- image2 (numpy.ndarray) : image2 to be mixed,shape is \[H, W, C\],BGR format;<br/>
paths (list[str]): paths to images, each element is a dict,the keys are as below:
- image1 (str): path to image1;<br/>
- image2 (str) : path to image2;<br/>
weights (list(float)): weight for mixing
output_dir: the dir to save the results
use_gpu: if True, use gpu to perform the computation, otherwise cpu.
visualization: if True, save results in output_dir.
'''
results = []
paddle.disable_static()
place = 'gpu:0' if use_gpu else 'cpu'
place = paddle.set_device(place)
if images == None and paths == None:
print('No image provided. Please input an image or a image path.')
return
if images != None:
for image_dict in images:
image1 = image_dict['image1'][:, :, ::-1]
image2 = image_dict['image2'][:, :, ::-1]
_, latent1 = self.pixel2style2pixel_module.network.run(image1)
_, latent2 = self.pixel2style2pixel_module.network.run(image2)
results.append(self.network.run(latent1, latent2, weights))
if paths != None:
for path_dict in paths:
path1 = path_dict['image1']
path2 = path_dict['image2']
image1 = cv2.imread(path1)[:, :, ::-1]
image2 = cv2.imread(path2)[:, :, ::-1]
_, latent1 = self.pixel2style2pixel_module.network.run(image1)
_, latent2 = self.pixel2style2pixel_module.network.run(image2)
results.append(self.network.run(latent1, latent2, weights))
if visualization == True:
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
for i, out in enumerate(results):
if out is not None:
cv2.imwrite(os.path.join(output_dir, 'src_{}_image1.png'.format(i)), out[0][:, :, ::-1])
cv2.imwrite(os.path.join(output_dir, 'src_{}_image2.png'.format(i)), out[1][:, :, ::-1])
cv2.imwrite(os.path.join(output_dir, 'dst_{}.png'.format(i)), out[2][:, :, ::-1])
return results
@runnable
def run_cmd(self, argvs: list):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options", description="Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
self.args = self.parser.parse_args(argvs)
results = self.generate(
paths=[{
'image1': self.args.image1,
'image2': self.args.image2
}],
weights=self.args.weights,
output_dir=self.args.output_dir,
use_gpu=self.args.use_gpu,
visualization=self.args.visualization)
return results
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = copy.deepcopy(images)
for image in images_decode:
image['image1'] = base64_to_cv2(image['image1'])
image['image2'] = base64_to_cv2(image['image2'])
results = self.generate(images_decode, **kwargs)
tolist = [result.tolist() for result in results]
return tolist
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument('--use_gpu', action='store_true', help="use GPU or not")
self.arg_config_group.add_argument(
'--output_dir', type=str, default='mixing_result', help='output directory for saving result.')
self.arg_config_group.add_argument('--visualization', type=bool, default=False, help='save results or not.')
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument('--image1', type=str, help="path to input image1.")
self.arg_input_group.add_argument('--image2', type=str, help="path to input image2.")
self.arg_input_group.add_argument(
"--weights",
type=float,
nargs="+",
default=[0.5] * 18,
help="different weights at each level of two latent codes")
import base64
import cv2
import numpy as np
def base64_to_cv2(b64str):
data = base64.b64decode(b64str.encode('utf8'))
data = np.fromstring(data, np.uint8)
data = cv2.imdecode(data, cv2.IMREAD_COLOR)
return data
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册