dsp.py 2.9 KB
Newer Older
H
hypox64 已提交
1 2 3 4
import scipy.signal
import scipy.fftpack as fftpack
import numpy as np

H
hypox64 已提交
5 6 7
def sin(f,fs,time):
    x = np.linspace(0, 2*np.pi*f*time, fs*time)
    return np.sin(x)
H
hypox64 已提交
8

H
hypox64 已提交
9
def downsample(signal,fs1=0,fs2=0,alpha=0,mod = 'just_down'):
H
hypox64 已提交
10
    if alpha == 0:
H
hypox64 已提交
11 12 13 14 15 16 17 18
        alpha = int(fs1/fs2)
    if mod == 'just_down':
        return signal[::alpha]
    elif mod == 'avg':
        result = np.zeros(int(len(signal)/alpha))
        for i in range(int(len(signal)/alpha)):
            result[i] = np.mean(signal[i*alpha:(i+1)*alpha])
        return result       
H
hypox64 已提交
19

H
hypox64 已提交
20 21
def medfilt(signal,x):
    return scipy.signal.medfilt(signal,x)
H
hypox64 已提交
22

H
hypox64 已提交
23 24 25 26 27 28
def cleanoffset(signal):
    return signal - np.mean(signal)

def bpf_fir(signal,fs,fc1,fc2,numtaps=101):
    b=scipy.signal.firwin(numtaps, [fc1, fc2], pass_zero=False,fs=fs)
    result = scipy.signal.lfilter(b, 1, signal)
H
hypox64 已提交
29
    return result
H
hypox64 已提交
30

H
hypox64 已提交
31 32 33 34 35 36 37 38 39
def fft_filter(signal,fs,fc=[],type = 'bandpass'):
    '''
    signal: Signal
    fs: Sampling frequency
    fc: [fc1,fc2...] Cut-off frequency 
    type: bandpass | bandstop
    '''
    k = []
    N=len(signal)#get N
H
hypox64 已提交
40

H
hypox64 已提交
41 42
    for i in range(len(fc)):
        k.append(int(fc[i]*N/fs))
H
hypox64 已提交
43

H
hypox64 已提交
44 45 46
    #FFT
    signal_fft=scipy.fftpack.fft(signal)
    #Frequency truncation
H
hypox64 已提交
47

H
hypox64 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60
    if type == 'bandpass':
        a = np.zeros(N)
        for i in range(int(len(fc)/2)):
            a[k[2*i]:k[2*i+1]] = 1
            a[N-k[2*i+1]:N-k[2*i]] = 1
    elif type == 'bandstop':
        a = np.ones(N)
        for i in range(int(len(fc)/2)):
            a[k[2*i]:k[2*i+1]] = 0
            a[N-k[2*i+1]:N-k[2*i]] = 0
    signal_fft = a*signal_fft
    signal_ifft=scipy.fftpack.ifft(signal_fft)
    result = signal_ifft.real
H
hypox64 已提交
61 62
    return result

H
hypox64 已提交
63 64 65
def rms(signal):
    signal = signal.astype('float64')
    return np.mean((signal*signal))**0.5
H
hypox64 已提交
66

H
hypox64 已提交
67 68 69 70 71 72 73 74 75
def energy(signal,kernel_size,stride,padding = 0):
    _signal = np.zeros(len(signal)+padding)
    _signal[0:len(signal)] = signal
    signal = _signal
    out_len = int((len(signal)+1-kernel_size)/stride)
    energy = np.zeros(out_len)
    for i in range(out_len):
        energy[i] = rms(signal[i*stride:i*stride+kernel_size]) 
    return energy
H
hypox64 已提交
76

H
hypox64 已提交
77
def signal2spectrum(data,window_size, stride, n_downsample=1, log = True, log_alpha = 0.1):
H
hypox64 已提交
78
    # window : ('tukey',0.5) hann
H
hypox64 已提交
79 80
    if n_downsample != 1:
        data = downsample(data,alpha=n_downsample)
H
hypox64 已提交
81

H
hypox64 已提交
82 83 84 85 86 87
    zxx = scipy.signal.stft(data, window='hann', nperseg=window_size,noverlap=window_size-stride)[2]
    spectrum = np.abs(zxx)

    if log:
        spectrum = np.log1p(spectrum)
        h = window_size//2+1
H
hypox64 已提交
88 89 90
        x = np.linspace(h*log_alpha, h-1,num=h+1,dtype=np.int64)
        index = (np.log1p(x)-np.log1p(h*log_alpha))/(np.log1p(h)-np.log1p(h*log_alpha))*h

H
hypox64 已提交
91
        spectrum_new = np.zeros_like(spectrum)
H
hypox64 已提交
92
        for i in range(h):
H
hypox64 已提交
93 94
            spectrum_new[int(index[i]):int(index[i+1])] = spectrum[i]
        spectrum = spectrum_new
H
hypox64 已提交
95 96 97 98
        spectrum = (spectrum-0.05)/0.25

    else:
        spectrum = (spectrum-0.02)/0.05
H
hypox64 已提交
99

H
hypox64 已提交
100
    return spectrum