cmvn.py 5.3 KB
Newer Older
H
Hui Zhang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
H
Hui Zhang 已提交
14
# Modified from espnet(https://github.com/espnet/espnet)
15 16 17 18 19 20 21 22 23 24 25
import io

import h5py
import kaldiio
import numpy as np


class CMVN():
    "Apply Global/Spk CMVN/iverserCMVN."

    def __init__(
H
Hui Zhang 已提交
26 27 28 29 30 31 32 33 34
            self,
            stats,
            norm_means=True,
            norm_vars=False,
            filetype="mat",
            utt2spk=None,
            spk2utt=None,
            reverse=False,
            std_floor=1.0e-20, ):
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
        self.stats_file = stats
        self.norm_means = norm_means
        self.norm_vars = norm_vars
        self.reverse = reverse

        if isinstance(stats, dict):
            stats_dict = dict(stats)
        else:
            # Use for global CMVN
            if filetype == "mat":
                stats_dict = {None: kaldiio.load_mat(stats)}
            # Use for global CMVN
            elif filetype == "npy":
                stats_dict = {None: np.load(stats)}
            # Use for speaker CMVN
            elif filetype == "ark":
                self.accept_uttid = True
                stats_dict = dict(kaldiio.load_ark(stats))
            # Use for speaker CMVN
            elif filetype == "hdf5":
                self.accept_uttid = True
                stats_dict = h5py.File(stats)
            else:
                raise ValueError("Not supporting filetype={}".format(filetype))

        if utt2spk is not None:
            self.utt2spk = {}
            with io.open(utt2spk, "r", encoding="utf-8") as f:
                for line in f:
                    utt, spk = line.rstrip().split(None, 1)
                    self.utt2spk[utt] = spk
        elif spk2utt is not None:
            self.utt2spk = {}
            with io.open(spk2utt, "r", encoding="utf-8") as f:
                for line in f:
                    spk, utts = line.rstrip().split(None, 1)
                    for utt in utts.split():
                        self.utt2spk[utt] = spk
        else:
            self.utt2spk = None

        # Kaldi makes a matrix for CMVN which has a shape of (2, feat_dim + 1),
        # and the first vector contains the sum of feats and the second is
        # the sum of squares. The last value of the first, i.e. stats[0,-1],
        # is the number of samples for this statistics.
        self.bias = {}
        self.scale = {}
        for spk, stats in stats_dict.items():
            assert len(stats) == 2, stats.shape

            count = stats[0, -1]

            # If the feature has two or more dimensions
            if not (np.isscalar(count) or isinstance(count, (int, float))):
                # The first is only used
                count = count.flatten()[0]

            mean = stats[0, :-1] / count
            # V(x) = E(x^2) - (E(x))^2
            var = stats[1, :-1] / count - mean * mean
            std = np.maximum(np.sqrt(var), std_floor)
            self.bias[spk] = -mean
            self.scale[spk] = 1 / std

    def __repr__(self):
H
Hui Zhang 已提交
100 101 102 103 104 105 106 107
        return ("{name}(stats_file={stats_file}, "
                "norm_means={norm_means}, norm_vars={norm_vars}, "
                "reverse={reverse})".format(
                    name=self.__class__.__name__,
                    stats_file=self.stats_file,
                    norm_means=self.norm_means,
                    norm_vars=self.norm_vars,
                    reverse=self.reverse, ))
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133

    def __call__(self, x, uttid=None):
        if self.utt2spk is not None:
            spk = self.utt2spk[uttid]
        else:
            spk = uttid

        if not self.reverse:
            # apply cmvn
            if self.norm_means:
                x = np.add(x, self.bias[spk])
            if self.norm_vars:
                x = np.multiply(x, self.scale[spk])

        else:
            # apply reverse cmvn
            if self.norm_vars:
                x = np.divide(x, self.scale[spk])
            if self.norm_means:
                x = np.subtract(x, self.bias[spk])

        return x


class UtteranceCMVN():
    "Apply Utterance CMVN"
H
Hui Zhang 已提交
134

135 136 137 138 139 140 141 142 143
    def __init__(self, norm_means=True, norm_vars=False, std_floor=1.0e-20):
        self.norm_means = norm_means
        self.norm_vars = norm_vars
        self.std_floor = std_floor

    def __repr__(self):
        return "{name}(norm_means={norm_means}, norm_vars={norm_vars})".format(
            name=self.__class__.__name__,
            norm_means=self.norm_means,
H
Hui Zhang 已提交
144
            norm_vars=self.norm_vars, )
145 146 147

    def __call__(self, x, uttid=None):
        # x: [Time, Dim]
H
Hui Zhang 已提交
148
        square_sums = (x**2).sum(axis=0)
149 150 151 152 153 154
        mean = x.mean(axis=0)

        if self.norm_means:
            x = np.subtract(x, mean)

        if self.norm_vars:
H
Hui Zhang 已提交
155
            var = square_sums / x.shape[0] - mean**2
156 157 158
            std = np.maximum(np.sqrt(var), self.std_floor)
            x = np.divide(x, std)

H
Hui Zhang 已提交
159
        return x