Unverified commit 880829fe, authored by qingen, committed via GitHub

Merge pull request #1681 from qingen/cluster

[vec][score] add plda model
-# Copyright (c) 2022 SpeechBrain Authors. All Rights Reserved.
+# Copyright (c) 2022 PaddlePaddle and SpeechBrain Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -18,12 +18,14 @@ This script has an optional dependency on open source sklearn library.
A few sklearn functions are modified in this script as per requirement.
"""
import argparse
import copy
import warnings
from distutils.util import strtobool
import numpy as np
import scipy
import sklearn
from scipy import linalg
from scipy import sparse
from scipy.sparse.csgraph import connected_components
from scipy.sparse.csgraph import laplacian as csgraph_laplacian
......@@ -346,6 +348,8 @@ class EmbeddingMeta:
---------
segset : list
List of session IDs as an array of strings.
modelset : list
List of model IDs as an array of strings.
stats : tensor
An ndarray of float64. Each line contains embedding
from the corresponding session.
......@@ -354,15 +358,20 @@ class EmbeddingMeta:
def __init__(
self,
segset=None,
modelset=None,
stats=None, ):
if segset is None:
-            self.segset = numpy.empty(0, dtype="|O")
-            self.stats = numpy.array([], dtype=np.float64)
+            self.segset = np.empty(0, dtype="|O")
+            self.modelset = np.empty(0, dtype="|O")
+            self.stats = np.array([], dtype=np.float64)
else:
self.segset = segset
self.modelset = modelset
self.stats = stats
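# Zero-order statistics: one occupancy count per embedding. With a single
# distribution per segment this is simply a column of ones of shape
# (n_embeddings, 1).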
self.stat0 = np.array([[1.0]] * self.stats.shape[0])
def norm_stats(self):
"""
Divide all first-order statistics by their Euclidean norm.
......@@ -371,6 +380,188 @@ class EmbeddingMeta:
vect_norm = np.clip(np.linalg.norm(self.stats, axis=1), 1e-08, np.inf)
self.stats = (self.stats.transpose() / vect_norm).transpose()
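# e.g. a row [3.0, 4.0] (Euclidean norm 5.0) becomes [0.6, 0.8] after this
# normalization; norms are clipped at 1e-08 to avoid division by zero.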
def get_mean_stats(self):
"""
Return the mean of first order statistics.
"""
mu = np.mean(self.stats, axis=0)
return mu
def get_total_covariance_stats(self):
"""
Compute and return the total covariance matrix of the first-order statistics.
"""
C = self.stats - self.stats.mean(axis=0)
return np.dot(C.transpose(), C) / self.stats.shape[0]
def get_model_stat0(self, mod_id):
"""Return zero-order statistics of a given model
Arguments
---------
mod_id : str
ID of the model which stat0 will be returned.
"""
S = self.stat0[self.modelset == mod_id, :]
return S
def get_model_stats(self, mod_id):
"""Return first-order statistics of a given model.
Arguments
---------
mod_id : str
ID of the model which stat1 will be returned.
"""
return self.stats[self.modelset == mod_id, :]
def sum_stat_per_model(self):
"""
Sum the zero- and first-order statistics per model and store them
in a new EmbeddingMeta.
Returns an EmbeddingMeta object with the statistics summed per model
and a numpy array with the number of sessions per model.
"""
sts_per_model = EmbeddingMeta()
sts_per_model.modelset = np.unique(
self.modelset) # nd: get uniq spkr ids
sts_per_model.segset = copy.deepcopy(sts_per_model.modelset)
sts_per_model.stat0 = np.zeros(
(sts_per_model.modelset.shape[0], self.stat0.shape[1]),
dtype=np.float64, )
sts_per_model.stats = np.zeros(
(sts_per_model.modelset.shape[0], self.stats.shape[1]),
dtype=np.float64, )
session_per_model = np.zeros(np.unique(self.modelset).shape[0])
# For each model sum the stats
for idx, model in enumerate(sts_per_model.modelset):
sts_per_model.stat0[idx, :] = self.get_model_stat0(model).sum(
axis=0)
sts_per_model.stats[idx, :] = self.get_model_stats(model).sum(
axis=0)
session_per_model[idx] += self.get_model_stats(model).shape[0]
return sts_per_model, session_per_model
def center_stats(self, mu):
"""
Center first order statistics.
Arguments
---------
mu : array
Array to center on.
"""
dim = self.stats.shape[1] // self.stat0.shape[1]
index_map = np.repeat(np.arange(self.stat0.shape[1]), dim)
self.stats = self.stats - (self.stat0[:, index_map] *
mu.astype(np.float64))
def rotate_stats(self, R):
"""
Rotate first-order statistics by a right-product.
Arguments
---------
R : ndarray
Matrix to use for right product on the first order statistics.
"""
self.stats = np.dot(self.stats, R)
def whiten_stats(self, mu, sigma, isSqrInvSigma=False):
"""
Whiten first-order statistics
If sigma.ndim == 1, case of a diagonal covariance.
If sigma.ndim == 2, case of a single Gaussian with full covariance.
If sigma.ndim == 3, case of a full covariance UBM.
Arguments
---------
mu : array
Mean vector to be subtracted from the statistics.
sigma : ndarray
Co-variance matrix or covariance super-vector.
isSqrInvSigma : bool
True if the input Sigma matrix is the inverse of the square root of a covariance matrix.
"""
if sigma.ndim == 1:
self.center_stats(mu)
self.stats = self.stats / np.sqrt(sigma.astype(np.float64))
elif sigma.ndim == 2:
# Compute the inverse square root of the co-variance matrix Sigma
sqr_inv_sigma = sigma
if not isSqrInvSigma:
# eigen_values, eigen_vectors = scipy.linalg.eigh(sigma)
eigen_values, eigen_vectors = linalg.eigh(sigma)
ind = eigen_values.real.argsort()[::-1]
eigen_values = eigen_values.real[ind]
eigen_vectors = eigen_vectors.real[:, ind]
sqr_inv_eval_sigma = 1 / np.sqrt(eigen_values.real)
sqr_inv_sigma = np.dot(eigen_vectors,
np.diag(sqr_inv_eval_sigma))
else:
pass
# Whitening of the first-order statistics
self.center_stats(mu) # CENTERING
self.rotate_stats(sqr_inv_sigma)
elif sigma.ndim == 3:
# we assume that sigma is a 3D ndarray of size D x n x n
# where D is the number of distributions and n is the dimension of a single distribution
n = self.stats.shape[1] // self.stat0.shape[1]
sess_nb = self.stat0.shape[0]
self.center_stats(mu)
self.stats = (np.einsum("ikj,ikl->ilj",
self.stats.T.reshape(-1, n, sess_nb), sigma)
.reshape(-1, sess_nb).T)
else:
raise Exception("Wrong dimension of Sigma, must be 1 or 2")
def align_models(self, model_list):
"""
Align models of the current EmbeddingMeta to match a list of models
provided as an input parameter. The size of the EmbeddingMeta might be
reduced to match the input list of models.
Arguments
---------
model_list : ndarray of strings
List of models to match.
"""
indx = np.array(
[np.argwhere(self.modelset == v)[0][0] for v in model_list])
self.segset = self.segset[indx]
self.modelset = self.modelset[indx]
self.stat0 = self.stat0[indx, :]
self.stats = self.stats[indx, :]
def align_segments(self, segment_list):
"""
Align segments of the current EmbeddingMeta to match a list of segments
provided as an input parameter. The size of the EmbeddingMeta might be
reduced to match the input list of segments.
Arguments
---------
segment_list : ndarray of strings
List of segments to match.
"""
indx = np.array(
[np.argwhere(self.segset == v)[0][0] for v in segment_list])
self.segset = self.segset[indx]
self.modelset = self.modelset[indx]
self.stat0 = self.stat0[indx, :]
self.stats = self.stats[indx, :]
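# ---------------------------------------------------------------------------
# Minimal usage sketch for EmbeddingMeta (illustrative only; the data and
# variable names below are hypothetical and not part of this diff):
#
#     rng = np.random.default_rng(0)
#     embeddings = rng.standard_normal((8, 4))             # 8 segments, dim 4
#     meta = EmbeddingMeta(
#         segset=np.array(["utt%d" % i for i in range(8)], dtype="|O"),
#         modelset=np.array(["spk1"] * 4 + ["spk2"] * 4, dtype="|O"),
#         stats=embeddings, )
#     meta.norm_stats()                                     # unit-length rows
#     per_spk, n_sessions = meta.sum_stat_per_model()       # one row per speaker
#     meta.whiten_stats(meta.get_mean_stats(),
#                       meta.get_total_covariance_stats())
# ---------------------------------------------------------------------------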
class SpecClustUnorm:
"""
......
This diff has been collapsed.
......@@ -26,14 +26,14 @@ from paddleaudio.compliance.librosa import mfcc
class meta_info:
"""the audio meta info in the vector JSONDataset
Args:
-        id (str): the segment name
+        utt_id (str): the segment name
duration (float): segment time
wav (str): wav file path
start (int): start point in the original wav file
stop (int): stop point in the original wav file
lab_id (str): the record id
"""
-    id: str
+    utt_id: str
duration: float
wav: str
start: int
......