Unverified commit 7a28aaad authored by K KP, committed by GitHub

Add voice cloning module: lstm_tacotron2

Parent 933284b0
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
from warnings import warn
import struct
from scipy.ndimage import binary_dilation
import numpy as np
import librosa
try:
import webrtcvad
except ModuleNotFoundError:
warn("Unable to import 'webrtcvad'." "This package enables noise removal and is recommended.")
webrtcvad = None
INT16_MAX = (2**15) - 1
def normalize_volume(wav, target_dBFS, increase_only=False, decrease_only=False):
# this function implements Loudness normalization, instead of peak
# normalization, See https://en.wikipedia.org/wiki/Audio_normalization
# dBFS: Decibels relative to full scale
# See https://en.wikipedia.org/wiki/DBFS for more details
# for 16Bit PCM audio, minimal level is -96dB
# compute the mean dBFS and adjust to target dBFS, with by increasing
# or decreasing
if increase_only and decrease_only:
raise ValueError("Both increase only and decrease only are set")
dBFS_change = target_dBFS - 10 * np.log10(np.mean(wav**2))
if ((dBFS_change < 0 and increase_only) or (dBFS_change > 0 and decrease_only)):
return wav
gain = 10**(dBFS_change / 20)
return wav * gain
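# A quick numeric check (illustrative, not part of the original module): a full-scale
# sine has mean(wav**2) == 0.5, i.e. about -3.01 dBFS, so normalizing it to -30 dBFS
# applies a gain of 10 ** ((-30 - (-3.01)) / 20) ≈ 0.045:
#
#   t = np.linspace(0, 1, 16000, endpoint=False)
#   sine = np.sin(2 * np.pi * 440 * t)
#   normalized = normalize_volume(sine, target_dBFS=-30)
#   print(10 * np.log10(np.mean(normalized ** 2)))  # ≈ -30.0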
def trim_long_silences(wav, vad_window_length: int, vad_moving_average_width: int, vad_max_silence_length: int,
sampling_rate: int):
"""
    Ensures that segments without voice in the waveform remain no longer than a
    threshold determined by the VAD parameters.
    :param wav: the raw waveform as a numpy array of floats
    :param vad_window_length: length of the VAD analysis window in milliseconds
    :param vad_moving_average_width: width (in windows) of the moving average used to smooth the VAD flags
    :param vad_max_silence_length: maximum number of consecutive silent windows kept between voiced regions
    :param sampling_rate: sampling rate of the waveform in Hz
    :return: the same waveform with silences trimmed away (length <= original wav length)
"""
# Compute the voice detection window size
samples_per_window = (vad_window_length * sampling_rate) // 1000
# Trim the end of the audio to have a multiple of the window size
wav = wav[:len(wav) - (len(wav) % samples_per_window)]
# Convert the float waveform to 16-bit mono PCM
pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * INT16_MAX)).astype(np.int16))
# Perform voice activation detection
voice_flags = []
vad = webrtcvad.Vad(mode=3)
for window_start in range(0, len(wav), samples_per_window):
window_end = window_start + samples_per_window
voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2], sample_rate=sampling_rate))
voice_flags = np.array(voice_flags)
# Smooth the voice detection with a moving average
def moving_average(array, width):
array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
ret = np.cumsum(array_padded, dtype=float)
ret[width:] = ret[width:] - ret[:-width]
return ret[width - 1:] / width
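    # e.g. moving_average([0, 1, 1, 1, 0], width=3) -> [1/3, 2/3, 1, 2/3, 1/3],
    # which np.round() below turns back into a crisp 0/1 mask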
audio_mask = moving_average(voice_flags, vad_moving_average_width)
    audio_mask = np.round(audio_mask).astype(bool)
# Dilate the voiced regions
audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1))
audio_mask = np.repeat(audio_mask, samples_per_window)
return wav[audio_mask]
def compute_partial_slices(n_samples: int,
partial_utterance_n_frames: int,
hop_length: int,
min_pad_coverage: float = 0.75,
overlap: float = 0.5):
"""
Computes where to split an utterance waveform and its corresponding mel spectrogram to obtain
partial utterances of <partial_utterance_n_frames> each. Both the waveform and the mel
spectrogram slices are returned, so as to make each partial utterance waveform correspond to
    its spectrogram. This function assumes that the mel spectrogram is computed with the
    given hop_length.
The returned ranges may be indexing further than the length of the waveform. It is
recommended that you pad the waveform with zeros up to wave_slices[-1].stop.
:param n_samples: the number of samples in the waveform
    :param partial_utterance_n_frames: the number of mel spectrogram frames in each partial
    utterance
    :param hop_length: the hop length (in samples) used to compute the mel spectrogram
:param min_pad_coverage: when reaching the last partial utterance, it may or may not have
enough frames. If at least <min_pad_coverage> of <partial_utterance_n_frames> are present,
then the last partial utterance will be considered, as if we padded the audio. Otherwise,
it will be discarded, as if we trimmed the audio. If there aren't enough frames for 1 partial
utterance, this parameter is ignored so that the function always returns at least 1 slice.
:param overlap: by how much the partial utterance should overlap. If set to 0, the partial
utterances are entirely disjoint.
:return: the waveform slices and mel spectrogram slices as lists of array slices. Index
respectively the waveform and the mel spectrogram with these slices to obtain the partial
utterances.
"""
assert 0 <= overlap < 1
assert 0 < min_pad_coverage <= 1
# librosa's function to compute num_frames from num_samples
n_frames = int(np.ceil((n_samples + 1) / hop_length))
    # frame shift between adjacent partials
frame_step = max(1, int(np.round(partial_utterance_n_frames * (1 - overlap))))
# Compute the slices
wav_slices, mel_slices = [], []
steps = max(1, n_frames - partial_utterance_n_frames + frame_step + 1)
for i in range(0, steps, frame_step):
mel_range = np.array([i, i + partial_utterance_n_frames])
wav_range = mel_range * hop_length
mel_slices.append(slice(*mel_range))
wav_slices.append(slice(*wav_range))
# Evaluate whether extra padding is warranted or not
last_wav_range = wav_slices[-1]
coverage = (n_samples - last_wav_range.start) / (last_wav_range.stop - last_wav_range.start)
if coverage < min_pad_coverage and len(mel_slices) > 1:
mel_slices = mel_slices[:-1]
wav_slices = wav_slices[:-1]
return wav_slices, mel_slices
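# Worked example (illustrative; these values match the 16 kHz speaker-encoder settings
# used by the module: hop_length=160, partial_utterance_n_frames=160, overlap=0.5):
#
#   wav_slices, mel_slices = compute_partial_slices(45000, 160, 160)
#   # n_frames = ceil(45001 / 160) = 282 and frame_step = 80, so partials start at frames 0, 80, 160:
#   # mel_slices -> [slice(0, 160), slice(80, 240), slice(160, 320)]
#   # wav_slices -> [slice(0, 25600), slice(12800, 38400), slice(25600, 51200)]
#   # The last partial covers (45000 - 25600) / 25600 ≈ 0.76 >= min_pad_coverage, so it is kept
#   # and the caller is expected to zero-pad the waveform up to sample 51200.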
class SpeakerVerificationPreprocessor(object):
def __init__(self,
sampling_rate: int,
audio_norm_target_dBFS: float,
vad_window_length,
vad_moving_average_width,
vad_max_silence_length,
mel_window_length,
mel_window_step,
n_mels,
partial_n_frames: int,
min_pad_coverage: float = 0.75,
partial_overlap_ratio: float = 0.5):
self.sampling_rate = sampling_rate
self.audio_norm_target_dBFS = audio_norm_target_dBFS
self.vad_window_length = vad_window_length
self.vad_moving_average_width = vad_moving_average_width
self.vad_max_silence_length = vad_max_silence_length
self.n_fft = int(mel_window_length * sampling_rate / 1000)
self.hop_length = int(mel_window_step * sampling_rate / 1000)
self.n_mels = n_mels
self.partial_n_frames = partial_n_frames
self.min_pad_coverage = min_pad_coverage
self.partial_overlap_ratio = partial_overlap_ratio
def preprocess_wav(self, fpath_or_wav, source_sr=None):
# Load the wav from disk if needed
if isinstance(fpath_or_wav, (str, Path)):
wav, source_sr = librosa.load(str(fpath_or_wav), sr=None)
else:
wav = fpath_or_wav
# Resample if numpy.array is passed and sr does not match
if source_sr is not None and source_sr != self.sampling_rate:
            wav = librosa.resample(wav, orig_sr=source_sr, target_sr=self.sampling_rate)
# loudness normalization
wav = normalize_volume(wav, self.audio_norm_target_dBFS, increase_only=True)
# trim long silence
if webrtcvad:
wav = trim_long_silences(wav, self.vad_window_length, self.vad_moving_average_width,
self.vad_max_silence_length, self.sampling_rate)
return wav
def melspectrogram(self, wav):
        mel = librosa.feature.melspectrogram(y=wav,
                                             sr=self.sampling_rate,
                                             n_fft=self.n_fft,
                                             hop_length=self.hop_length,
                                             n_mels=self.n_mels)
mel = mel.astype(np.float32).T
return mel
def extract_mel_partials(self, wav):
wav_slices, mel_slices = compute_partial_slices(len(wav), self.partial_n_frames, self.hop_length,
self.min_pad_coverage, self.partial_overlap_ratio)
# pad audio if needed
max_wave_length = wav_slices[-1].stop
if max_wave_length >= len(wav):
wav = np.pad(wav, (0, max_wave_length - len(wav)), "constant")
# Split the utterance into partials
frames = self.melspectrogram(wav)
frames_batch = np.array([frames[s] for s in mel_slices])
return frames_batch # [B, T, C]
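if __name__ == "__main__":
    # Minimal end-to-end sketch of the preprocessing pipeline (not part of the original
    # file). The hyperparameters mirror the ones the VoiceCloner module passes to this
    # class, and "speaker.wav" is a placeholder path.
    processor = SpeakerVerificationPreprocessor(sampling_rate=16000,
                                                audio_norm_target_dBFS=-30,
                                                vad_window_length=30,
                                                vad_moving_average_width=8,
                                                vad_max_silence_length=6,
                                                mel_window_length=25,
                                                mel_window_step=10,
                                                n_mels=40,
                                                partial_n_frames=160)
    wav = processor.preprocess_wav("speaker.wav")  # load, resample, normalize, VAD-trim
    partials = processor.extract_mel_partials(wav)  # [n_partials, 160, 40]
    print(partials.shape)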
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Tuple
from pypinyin import lazy_pinyin, Style
from .preprocess_transcription import split_syllable
def convert_to_pinyin(text: str) -> List[str]:
"""convert text into list of syllables, other characters that are not chinese, thus
cannot be converted to pinyin are splited.
"""
syllables = lazy_pinyin(text, style=Style.TONE3, neutral_tone_with_five=True)
return syllables
def convert_sentence(text: str) -> List[Tuple[str]]:
"""convert a sentence into two list: phones and tones"""
syllables = convert_to_pinyin(text)
phones = []
tones = []
for syllable in syllables:
p, t = split_syllable(syllable)
phones.extend(p)
tones.extend(t)
return phones, tones
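# Quick sanity check (illustrative; run it from inside the installed module because of the
# relative import above, and note that the exact output depends on pypinyin):
#
#   >>> convert_sentence("你好")
#   (['n', 'i', 'h', 'ao'], ['0', '3', '0', '3'])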
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib.util
import os
from typing import List
import numpy as np
import paddle
import paddle.nn as nn
from paddlehub.env import MODULE_HOME
from paddlehub.module.module import moduleinfo
from paddlehub.utils.log import logger
from paddlenlp.data import Pad
import soundfile as sf
if not importlib.util.find_spec('parakeet'):
raise ImportError('The module requires additional dependencies: "parakeet".\n'
'You can install parakeet via "git clone https://github.com'
'/PaddlePaddle/Parakeet -b release/v0.3 && pip install -e Parakeet"')
from parakeet.models import ConditionalWaveFlow, Tacotron2
from parakeet.models.lstm_speaker_encoder import LSTMSpeakerEncoder
from .audio_processor import SpeakerVerificationPreprocessor
from .chinese_g2p import convert_sentence
from .preprocess_transcription import voc_phones, voc_tones, phone_pad_token, tone_pad_token
@moduleinfo(
name="lstm_tacotron2",
version="1.0.0",
summary="",
author="Baidu",
author_email="",
type="audio/voice_cloning",
)
class VoiceCloner(nn.Layer):
def __init__(self, speaker_audio: str = None, output_dir: str = './'):
super(VoiceCloner, self).__init__()
self.sample_rate = 22050 # Hyper params for the following model ckpts.
speaker_encoder_ckpt = os.path.join(MODULE_HOME, 'lstm_tacotron2', 'assets',
'ge2e_ckpt_0.3/step-3000000.pdparams')
synthesizer_ckpt = os.path.join(MODULE_HOME, 'lstm_tacotron2', 'assets',
'tacotron2_aishell3_ckpt_0.3/step-450000.pdparams')
vocoder_ckpt = os.path.join(MODULE_HOME, 'lstm_tacotron2', 'assets',
'waveflow_ljspeech_ckpt_0.3/step-2000000.pdparams')
# Speaker encoder
self.speaker_processor = SpeakerVerificationPreprocessor(sampling_rate=16000,
audio_norm_target_dBFS=-30,
vad_window_length=30,
vad_moving_average_width=8,
vad_max_silence_length=6,
mel_window_length=25,
mel_window_step=10,
n_mels=40,
partial_n_frames=160,
min_pad_coverage=0.75,
partial_overlap_ratio=0.5)
self.speaker_encoder = LSTMSpeakerEncoder(n_mels=40, num_layers=3, hidden_size=256, output_size=256)
self.speaker_encoder.set_state_dict(paddle.load(speaker_encoder_ckpt))
self.speaker_encoder.eval()
# Voice synthesizer
self.synthesizer = Tacotron2(vocab_size=68,
n_tones=10,
d_mels=80,
d_encoder=512,
encoder_conv_layers=3,
encoder_kernel_size=5,
d_prenet=256,
d_attention_rnn=1024,
d_decoder_rnn=1024,
attention_filters=32,
attention_kernel_size=31,
d_attention=128,
d_postnet=512,
postnet_kernel_size=5,
postnet_conv_layers=5,
reduction_factor=1,
p_encoder_dropout=0.5,
p_prenet_dropout=0.5,
p_attention_dropout=0.1,
p_decoder_dropout=0.1,
p_postnet_dropout=0.5,
d_global_condition=256,
use_stop_token=False)
self.synthesizer.set_state_dict(paddle.load(synthesizer_ckpt))
self.synthesizer.eval()
# Vocoder
self.vocoder = ConditionalWaveFlow(upsample_factors=[16, 16],
n_flows=8,
n_layers=8,
n_group=16,
channels=128,
n_mels=80,
kernel_size=[3, 3])
self.vocoder.set_state_dict(paddle.load(vocoder_ckpt))
self.vocoder.eval()
        # Speaker embedding
self._speaker_embedding = None
if speaker_audio is None or not os.path.isfile(speaker_audio):
speaker_audio = os.path.join(MODULE_HOME, 'lstm_tacotron2', 'assets', 'voice_cloning.wav')
            logger.warning(f'No speaker audio was specified, so the speaker encoder will use the default '
                           f'waveform ({speaker_audio}) to extract the speaker embedding. You can use the '
                           '"set_speaker_embedding()" method to set a speaker audio for voice cloning.')
self.set_speaker_embedding(speaker_audio)
self.output_dir = os.path.abspath(output_dir)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
def get_speaker_embedding(self):
return self._speaker_embedding.numpy()
def set_speaker_embedding(self, speaker_audio: str):
        assert os.path.exists(speaker_audio), f'Speaker audio file: {speaker_audio} does not exist.'
mel_sequences = self.speaker_processor.extract_mel_partials(
self.speaker_processor.preprocess_wav(speaker_audio))
self._speaker_embedding = self.speaker_encoder.embed_utterance(paddle.to_tensor(mel_sequences))
logger.info(f'Speaker embedding has been set from file: {speaker_audio}')
def forward(self, phones: paddle.Tensor, tones: paddle.Tensor, speaker_embeddings: paddle.Tensor):
outputs = self.synthesizer.infer(phones, tones=tones, global_condition=speaker_embeddings)
mel_input = paddle.transpose(outputs["mel_outputs_postnet"], [0, 2, 1])
waveforms = self.vocoder.infer(mel_input)
return waveforms
def _convert_text_to_input(self, text: str):
"""
Convert input string to phones and tones.
"""
phones, tones = convert_sentence(text)
phones = np.array([voc_phones.lookup(item) for item in phones], dtype=np.int64)
tones = np.array([voc_tones.lookup(item) for item in tones], dtype=np.int64)
return phones, tones
def _batchify(self, data: List[str], batch_size: int):
"""
Generate input batches.
"""
phone_pad_func = Pad(voc_phones.lookup(phone_pad_token))
tone_pad_func = Pad(voc_tones.lookup(tone_pad_token))
def _parse_batch(batch_data):
phones, tones = zip(*batch_data)
speaker_embeddings = paddle.expand(self._speaker_embedding, shape=(len(batch_data), -1))
return phone_pad_func(phones), tone_pad_func(tones), speaker_embeddings
examples = [] # [(phones, tones), ...]
for text in data:
examples.append(self._convert_text_to_input(text))
        # Separate data into batches.
one_batch = []
for example in examples:
one_batch.append(example)
if len(one_batch) == batch_size:
yield _parse_batch(one_batch)
one_batch = []
if one_batch:
yield _parse_batch(one_batch)
def generate(self, data: List[str], batch_size: int = 1, use_gpu: bool = False):
        assert self._speaker_embedding is not None, 'Set speaker embedding before voice cloning.'
paddle.set_device('gpu') if use_gpu else paddle.set_device('cpu')
batches = self._batchify(data, batch_size)
results = []
for batch in batches:
phones, tones, speaker_embeddings = map(paddle.to_tensor, batch)
waveforms = self(phones, tones, speaker_embeddings).numpy()
results.extend(list(waveforms))
files = []
for idx, waveform in enumerate(results):
output_wav = os.path.join(self.output_dir, f'{idx+1}.wav')
sf.write(output_wav, waveform, samplerate=self.sample_rate)
files.append(output_wav)
return files
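# Typical usage after `hub install lstm_tacotron2` (an illustrative sketch: the paths and the
# input sentence are placeholders, and it assumes PaddleHub forwards the extra keyword
# arguments to VoiceCloner.__init__):
#
#   import paddlehub as hub
#
#   model = hub.Module(name='lstm_tacotron2', speaker_audio='/path/to/speaker.wav', output_dir='./outputs')
#   wav_files = model.generate(['语音克隆测试'], batch_size=1, use_gpu=False)
#   print(wav_files)  # absolute paths of the generated wav files, sampled at 22050 Hz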
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from pathlib import Path
import pickle
import re
from parakeet.frontend import Vocab
import tqdm
import yaml
zh_pattern = re.compile("[\u4e00-\u9fa5]")
_tones = {'<pad>', '<s>', '</s>', '0', '1', '2', '3', '4', '5'}
_pauses = {'%', '$'}
_initials = {
'b',
'p',
'm',
'f',
'd',
't',
'n',
'l',
'g',
'k',
'h',
'j',
'q',
'x',
'zh',
'ch',
'sh',
'r',
'z',
'c',
's',
}
_finals = {
'ii',
'iii',
'a',
'o',
'e',
'ea',
'ai',
'ei',
'ao',
'ou',
'an',
'en',
'ang',
'eng',
'er',
'i',
'ia',
'io',
'ie',
'iai',
'iao',
'iou',
'ian',
'ien',
'iang',
'ieng',
'u',
'ua',
'uo',
'uai',
'uei',
'uan',
'uen',
'uang',
'ueng',
'v',
've',
'van',
'ven',
'veng',
}
_ernized_symbol = {'&r'}
_specials = {'<pad>', '<unk>', '<s>', '</s>'}
_phones = _initials | _finals | _ernized_symbol | _specials | _pauses
phone_pad_token = '<pad>'
tone_pad_token = '<pad>'
voc_phones = Vocab(sorted(list(_phones)))
voc_tones = Vocab(sorted(list(_tones)))
def is_zh(word):
global zh_pattern
match = zh_pattern.search(word)
return match is not None
def ernized(syllable):
return syllable[:2] != "er" and syllable[-2] == 'r'
def convert(syllable):
# expansion of o -> uo
syllable = re.sub(r"([bpmf])o$", r"\1uo", syllable)
# syllable = syllable.replace("bo", "buo").replace("po", "puo").replace("mo", "muo").replace("fo", "fuo")
# expansion for iong, ong
syllable = syllable.replace("iong", "veng").replace("ong", "ueng")
# expansion for ing, in
syllable = syllable.replace("ing", "ieng").replace("in", "ien")
# expansion for un, ui, iu
syllable = syllable.replace("un", "uen").replace("ui", "uei").replace("iu", "iou")
# rule for variants of i
syllable = syllable.replace("zi", "zii").replace("ci", "cii").replace("si", "sii")\
.replace("zhi", "zhiii").replace("chi", "chiii").replace("shi", "shiii")\
.replace("ri", "riii")
# rule for y preceding i, u
syllable = syllable.replace("yi", "i").replace("yu", "v").replace("y", "i")
# rule for w
syllable = syllable.replace("wu", "u").replace("w", "u")
# rule for v following j, q, x
syllable = syllable.replace("ju", "jv").replace("qu", "qv").replace("xu", "xv")
return syllable
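# Illustrative examples (not in the original file) of the expansion rules above, with the
# tone digit already stripped:
#   convert("bo") -> "buo"   # o after b/p/m/f expands to uo
#   convert("wo") -> "uo"    # w becomes u
#   convert("yu") -> "v"     # yu becomes v
#   convert("xu") -> "xv"    # u after j/q/x is actually v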
def split_syllable(syllable: str):
"""Split a syllable in pinyin into a list of phones and a list of tones.
Initials have no tone, represented by '0', while finals have tones from
'1,2,3,4,5'.
e.g.
zhang -> ['zh', 'ang'], ['0', '1']
"""
if syllable in _pauses:
# syllable, tone
return [syllable], ['0']
tone = syllable[-1]
syllable = convert(syllable[:-1])
phones = []
tones = []
global _initials
if syllable[:2] in _initials:
phones.append(syllable[:2])
tones.append('0')
phones.append(syllable[2:])
tones.append(tone)
elif syllable[0] in _initials:
phones.append(syllable[0])
tones.append('0')
phones.append(syllable[1:])
tones.append(tone)
else:
phones.append(syllable)
tones.append(tone)
return phones, tones
def load_aishell3_transcription(line: str):
sentence_id, pinyin, text = line.strip().split("|")
syllables = pinyin.strip().split()
results = []
for syllable in syllables:
if syllable in _pauses:
results.append(syllable)
elif not ernized(syllable):
results.append(syllable)
else:
results.append(syllable[:-2] + syllable[-1])
results.append('&r5')
phones = []
tones = []
for syllable in results:
p, t = split_syllable(syllable)
phones.extend(p)
tones.extend(t)
for p in phones:
assert p in _phones, p
return {"sentence_id": sentence_id, "text": text, "syllables": results, "phones": phones, "tones": tones}
def process_aishell3(dataset_root, output_dir):
dataset_root = Path(dataset_root).expanduser()
output_dir = Path(output_dir).expanduser()
output_dir.mkdir(parents=True, exist_ok=True)
prosody_label_path = dataset_root / "label_train-set.txt"
with open(prosody_label_path, 'rt') as f:
lines = [line.strip() for line in f]
records = lines[5:]
processed_records = []
for record in tqdm.tqdm(records):
new_record = load_aishell3_transcription(record)
processed_records.append(new_record)
print(new_record)
with open(output_dir / "metadata.pickle", 'wb') as f:
pickle.dump(processed_records, f)
with open(output_dir / "metadata.yaml", 'wt', encoding="utf-8") as f:
yaml.safe_dump(processed_records, f, default_flow_style=None, allow_unicode=True)
print("metadata done!")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Preprocess transcription of AiShell3 and save them in a compact file(yaml and pickle).")
parser.add_argument("--input",
type=str,
default="~/datasets/aishell3/train",
help="path of the training dataset,(contains a label_train-set.txt).")
parser.add_argument("--output",
type=str,
help="the directory to save the processed transcription."
"If not provided, it would be the same as the input.")
args = parser.parse_args()
if args.output is None:
args.output = args.input
process_aishell3(args.input, args.output)
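# Example invocation (paths are placeholders for a local AISHELL-3 download):
#   python preprocess_transcription.py --input ~/datasets/aishell3/train --output ~/datasets/aishell3/train
# This writes metadata.pickle and metadata.yaml into the output directory.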
librosa
nltk
pypinyin
scipy
soundfile
webrtcvad
yaml