'''Dsp module contains functions pertaining to the actual generation,
manipulation, and analysis of sound. This ranges from generating sounds to
calculating signal-to-noise ratio.
'''
###############################################################################
import sys, os
import inspect
import numpy as np
from numpy.fft import fft, rfft, ifft, irfft
from scipy.signal import hamming, hann, resample, iirfilter, lfilter
import librosa
import math
currentdir = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))
packagedir = os.path.dirname(currentdir)
sys.path.insert(0, packagedir)
import soundpy as sp
def generate_sound(freq=200, amplitude=0.4, sr=8000, dur_sec=0.25):
    '''Generates a sine-wave signal with the provided parameters. Signal begins at 0.

    Parameters
    ----------
    freq : int, float
        The frequency in Hz the signal should have (default 200 Hz). This
        pertains to the number of oscillations per second.
    amplitude : int, float
        The parameter controlling how much energy the signal should have.
        (default 0.4)
    sr : int
        The sampling rate of the signal, or how many samples make up the
        signal per second. (default 8000)
    dur_sec : int, float
        The duration of the generated signal in seconds. (default 0.25)

    Returns
    -------
    sound_samples : np.ndarray [size = (num_samples,)]
        The samples of the generated sound
    sr : int
        The sample rate of the generated signal

    Examples
    --------
    >>> sound, sr = generate_sound(freq=5, amplitude=0.5, sr=5, dur_sec=1)
    >>> sound
    array([ 0.000000e+00, 5.000000e-01, 3.061617e-16, -5.000000e-01, -6.123234e-16])
    >>> sr
    5
    '''
    # Evenly spaced measurement times from 0 to `dur_sec`, matching the
    # sample rate (same computation as `get_time_points`).
    num_samples = int(np.floor(dur_sec * sr))
    time_points = np.linspace(0, dur_sec, num_samples)
    # One full cycle of a sine wave spans 2*pi radians:
    # https://www.marksmath.org/visualization/unit_circle/
    radians_per_cycle = 2 * np.pi
    sound_samples = amplitude * np.sin(freq * radians_per_cycle * time_points)
    return sound_samples, sr
def get_time_points(dur_sec, sr):
    '''Get evenly spaced time points from zero to length of `dur_sec`.

    The time points align with the provided sample rate, making it easy
    to plot a signal with a time line in seconds.

    Parameters
    ----------
    dur_sec : int, float
        The amount of time in seconds
    sr : int
        The sample rate relevant for the signal

    Returns
    -------
    time : np.ndarray [size = (num_time_points,)]

    Examples
    --------
    >>> # 50 milliseconds at sample rate of 100 (100 samples per second)
    >>> x = get_time_points(0.05,100)
    >>> x.shape
    (5,)
    >>> x
    array([0.    , 0.0125, 0.025 , 0.0375, 0.05  ])
    '''
    # Number of points implied by the duration and sample rate.
    num_points = int(np.floor(dur_sec * sr))
    return np.linspace(0, dur_sec, num_points)
def generate_noise(num_samples, amplitude=0.025, random_seed=None):
    '''Generates Gaussian (white) noise of a given length and amplitude.

    Useful for adding noise to another signal of length `num_samples`.

    Parameters
    ----------
    num_samples : int
        The number of total samples making up the noise signal.
    amplitude : float
        Allows the noise signal to be louder or quieter. (default 0.025)
    random_seed : int, optional
        Useful for repeating 'random' noise samples.

    Returns
    -------
    noise : np.ndarray [size = (num_samples,)]
        The generated noise samples.

    Examples
    --------
    >>> noise = generate_noise(5, random_seed = 0)
    >>> noise
    array([0.04410131, 0.01000393, 0.02446845, 0.05602233, 0.04668895])
    '''
    if random_seed is not None:
        # Seed the global generator so the "random" draw is repeatable.
        np.random.seed(random_seed)
    gaussian = np.random.randn(num_samples)
    return amplitude * gaussian
def set_signal_length(samples, numsamps):
    '''Sets audio signal to be a certain length. Zeropads if too short.

    Useful for setting signals to be a certain length, regardless of how
    long the audio signal is.

    Parameters
    ----------
    samples : np.ndarray [size = (num_samples, num_channels), or (num_samples,)]
        The array of sample data to be zero padded.
    numsamps : int
        The desired number of samples.

    Returns
    -------
    data : np.ndarray [size = (numsamps, num_channels), or (numsamps,)]
        Copy of samples zeropadded or limited to `numsamps`.

    Examples
    --------
    >>> import numpy as np
    >>> input_samples = np.array([1,2,3,4,5])
    >>> output_samples = set_signal_length(input_samples, numsamps = 8)
    >>> output_samples
    array([1, 2, 3, 4, 5, 0, 0, 0])
    >>> output_samples = set_signal_length(input_samples, numsamps = 4)
    >>> output_samples
    array([1, 2, 3, 4])
    '''
    data = samples.copy()
    if data.shape[0] < numsamps:
        # Zero pad at the end. A single vectorized concatenate replaces the
        # previous per-row Python loop (which was O(n) interpreter work).
        pad_shape = (numsamps - data.shape[0],) + data.shape[1:]
        data = np.concatenate((data, np.zeros(pad_shape)), axis=0)
    elif data.shape[0] > numsamps:
        # Too long: truncate (works for both mono and multi-channel).
        data = data[:numsamps]
    # ensure returned data same dtype as input
    data = sp.utils.match_dtype(data, samples)
    return data
# works for stereo sound (raw signal data)
def scalesound(data, max_val = 1, min_val=None):
    '''Scales the input array to range between `min_val` and `max_val`.

    Parameters
    ----------
    data : np.ndarray [size = (num_samples,) or (num_samples, num_channels)]
        Original samples
    max_val : int, float
        The maximum value the dataset is to range from (default 1)
    min_val : int, float, optional
        The minimum value the dataset is to range from. If set to None,
        will be set to the opposite of `max_val`. E.g. if `max_val` is set to
        0.8, `min_val` will be set to -0.8. (default None)

    Returns
    -------
    samples : np.ndarray [size = (num_samples,) or (num_samples, num_channels)]
        Copy of original data, scaled to the min and max values.

    Examples
    --------
    >>> import numpy as np
    >>> np.random.seed(0)
    >>> input_samples = np.random.random_sample((5,))
    >>> output_samples = scalesound(input_samples)
    >>> output_samples.max()
    1.0
    >>> output_samples.min()
    -1.0
    >>> output_samples = scalesound(input_samples, max_val = 100, min_val = -100)
    >>> output_samples.max()
    100.0
    '''
    if min_val is None:
        # Default: symmetric range around zero.
        min_val = -max_val
    samples = data.copy()
    # Determine the current value range; np.ndarray has min()/max() methods,
    # other sequences (e.g. lists) need the builtins.
    if isinstance(samples, np.ndarray):
        lo, hi = samples.min(), samples.max()
    else:
        lo, hi = min(samples), max(samples)
    # Linearly map [lo, hi] onto [min_val, max_val].
    return np.interp(samples, (lo, hi), (min_val, max_val))
def shape_samps_channels(data):
    '''Returns data in shape (num_samps, num_channels)

    Parameters
    ----------
    data : np.ndarray [size= (num_samples,) or (num_samples, num_channels), or (num_channels, num_samples)]
        The data that needs to be checked for correct format

    Returns
    -------
    data : np.ndarray [size = (num_samples,) or (num_samples, num_channels)]
    '''
    ndim = len(data.shape)
    # Mono data needs no reshaping.
    if ndim == 1:
        return data
    if ndim > 2:
        raise ValueError('Expected 2 dimensional data: (num_samples, num_channels,) not '+\
            'shape {}'.format(data.shape))
    if data.shape[0] < data.shape[1]:
        # assumes number of samples will be greater than number of channels
        data = data.T
        assert data.shape[0] > data.shape[1]
    return data
def resample_audio(samples, sr_original, sr_desired):
    '''Allows audio samples to be resampled to desired sample rate.

    Parameters
    ----------
    samples : np.ndarray [size = (num_samples,)]
        The samples to be resampled.
    sr_original : int
        The original sample rate of the samples.
    sr_desired : int
        The desired sample rate of the samples.

    Returns
    -------
    resampled : np.ndarray [size = (num_samples_resampled,)]
        The resampled samples.
    sr_desired : int
        The newly applied sample rate

    Examples
    --------
    >>> import numpy as np
    >>> input_samples = np.array([0.00e+00, 2.82842712e-01, 4.000e-01, 2.82842712e-01, 4.89858720e-17])
    >>> output_samples, sr = resample_audio(input_samples, sr_original = 100, sr_desired = 80)
    >>> output_samples
    array([-2.22044605e-17, 3.35408001e-01, 3.72022523e-01, 6.51178161e-02])
    '''
    # Duration stays fixed; the new sample count follows from the new rate.
    duration_sec = len(samples) / sr_original
    num_samples_desired = int(duration_sec * sr_desired)
    resampled = resample(samples, num_samples_desired)
    return resampled, sr_desired
def stereo2mono(data):
    '''If sound data has multiple channels, reduces to first channel

    Parameters
    ----------
    data : numpy.ndarray
        The series of sound samples, with 1+ columns/channels

    Returns
    -------
    data_mono : numpy.ndarray
        The series of sound samples, with first column

    Examples
    --------
    >>> import numpy as np
    >>> data = np.linspace(0,20)
    >>> data_2channel = data.reshape(25,2)
    >>> data_mono = stereo2mono(data_2channel)
    >>> data_mono[:5]
    array([0.        , 0.81632653, 1.63265306, 2.44897959, 3.26530612])
    '''
    # Already mono (1-D or a single column): return an untouched copy.
    if len(data.shape) <= 1 or data.shape[1] <= 1:
        return data.copy()
    # ensure data is samples first, channels second
    data_mono = sp.dsp.shape_samps_channels(data.copy())
    # Keep only the first channel.
    return np.take(data_mono, 0, axis=-1)
def add_backgroundsound(audio_main, audio_background, sr, snr = None,
                        pad_mainsound_sec = None, total_len_sec = None,
                        wrap = False, stationary_noise = True,
                        random_seed = None, extend_window_ms = 0,
                        remove_dc = False, mirror_sound = False, clip_at_zero = True,
                        **kwargs):
    '''Adds a sound (i.e. background noise) to a target signal. Stereo sound should work.

    If the sample rates of the two audio samples do not match, the sample
    rate of `audio_main` will be applied. (i.e. the `audio_background` will
    be resampled). If you have issues with clicks at the beginning or end of
    signals, see `soundpy.dsp.clip_at_zero`.

    Parameters
    ----------
    audio_main : str, pathlib.PosixPath, or np.ndarray [size=(num_samples,) or (num_samples, num_channels)]
        Sound file of the main sound (will not be modified; only delayed if
        specified). If not path or string, should be data samples corresponding
        to the provided sample rate.
    audio_background : str, pathlib.PosixPath, or np.ndarray [size=(num_samples,)]
        Sound file of the background sound (will be modified / repeated to match
        or extend the length indicated). If not of type pathlib.PosixPath or
        string, should be data samples corresponding to the provided sample rate.
    sr : int
        The sample rate of sounds to be added together. Note: sr of 44100 or higher
        is suggested.
    snr : int, float, list, tuple
        The sound-to-noise-ratio of the target and background signals. Note: this
        is an approximation and needs further testing and development to be
        used as an official measurement of snr. If no SNR provided, signals
        will be added together as-is. (default None)
    pad_mainsound_sec : int or float, optional
        Length of time in seconds the background sound will pad the main sound.
        For example, if `pad_mainsound_sec` is set to 1, one second of the
        `audio_background` will be played before `audio_main` starts as well as
        after the `main audio` stops. (default None)
    total_len_sec : int or float, optional
        Total length of combined sound in seconds. If none, the sound will end
        after the (padded) target sound ends (default None).
    wrap : bool
        If False, the random selection of sound will be limited to end by
        the end of the audio file. If True, the random selection will wrap to
        beginning of the audio file if extends beyond the end of the audio file.
        (default False)
    stationary_noise : bool
        If False, `soundpy.feats.get_vad_stft` will be applied to noise to get
        energy of the active noise in the signal. Otherwise energy will be
        collected via `soundpy.dsp.get_stft`. (default True)
    random_seed : int
        If provided, the 'random' section of noise will be chosen using this seed.
        (default None)
    extend_window_ms : int or float
        The number of milliseconds the voice activity detected should be padded with.
        This might be useful to ensure sufficient amount of activity is calculated.
        (default 0)
    remove_dc : bool
        If the dc bias should be removed. This aids in the removal of clicks.
        See `soundpy.dsp.remove_dc_bias`. (default False)
    **kwargs : additional keyword arguments
        The keyword arguments for soundpy.files.loadsound

    Returns
    -------
    combined : numpy.ndarray [shape=(num_samples) or (num_samples, num_channels)]
        The samples of the sounds added together
    snr : int, float
        The updated signal-to-noise ratio. Due to the non-stationary state of
        speech and sound in general, this value is only an approximation.

    References
    ----------
    Yi Hu and Philipos C. Loizou : original authors
    Copyright (c) 2006 by Philipos C. Loizou

    SIP-Lab/CNN-VAD/ : GitHub Repo
    Copyright (c) 2019 Signal and Image Processing Lab
    MIT License

    See Also
    --------
    soundpy.files.loadsound
    soundpy.dsp.snr_adjustnoiselevel
    soundpy.feats.get_vad_stft
    soundpy.feats.get_stft
    '''
    if sr < 44100:
        import warnings
        msg = 'Performance of signal to noise analysis is improved with '+\
            'sample rates at or higher than 44100 Hz. Current sample rate '+\
                'set at {}.'.format(sr)
        # BUG FIX: this message was previously constructed but never issued.
        warnings.warn(msg)
    # Load audio if paths were provided; otherwise treat inputs as samples.
    input_type_main = sp.utils.path_or_samples(audio_main)
    input_type_background = sp.utils.path_or_samples(audio_background)
    if 'path' in input_type_main:
        target, sr = sp.loadsound(audio_main, sr = sr,
                                  remove_dc = remove_dc, **kwargs)
    elif 'samples' in input_type_main:
        target, sr = audio_main, sr
    if 'path' in input_type_background:
        sound2add, sr2 = sp.loadsound(audio_background, sr = sr,
                                      remove_dc = remove_dc, **kwargs)
    elif 'samples' in input_type_background:
        sound2add, sr2 = audio_background, sr
    if sr != sr2:
        # The main audio's sample rate wins; resample the background.
        sound2add, sr2 = sp.dsp.resample_audio(sound2add, sr2, sr)
        assert sr2 == sr
    # make background same shape as signal
    if len(target.shape) != len(sound2add.shape):
        # ensure in shape (num_samples,) or (num_samples, num_channels)
        target = sp.dsp.shape_samps_channels(target)
        if len(target.shape) > 1:
            num_channels = target.shape[1]
        else:
            num_channels = 1
        sound2add = sp.dsp.apply_num_channels(sound2add, num_channels)
    if remove_dc:
        target = sp.dsp.remove_dc_bias(target)
        sound2add = sp.dsp.remove_dc_bias(sound2add)
    # Measure target energy only where voice activity is detected.
    target_stft, __ = sp.feats.get_vad_stft(target, sr,
                                            extend_window_ms = extend_window_ms)
    if not target_stft.any():
        import warnings
        msg = '\nNo voice activity detected in target signal.'
        warnings.warn(msg)
        # Fall back to the full-signal STFT.
        target_stft = sp.feats.get_stft(target, sr)
    if stationary_noise:
        noise_stft = sp.feats.get_stft(sound2add, sr)
    else:
        # get energy of noise when active (e.g. car honking)
        noise_stft, __ = sp.feats.get_vad_stft(sound2add, sr,
                                               extend_window_ms = extend_window_ms)
        if not noise_stft.any():
            noise_stft = sp.feats.get_stft(sound2add, sr)
    target_power = np.abs(target_stft)**2
    noise_power = np.abs(noise_stft)**2
    target_energy = np.mean(target_power)
    noise_energy = np.mean(noise_power)
    if snr is not None:
        if isinstance(snr, list) or isinstance(snr, tuple):
            # Pick one SNR value at random from the provided options.
            snr = np.random.choice(snr)
        elif isinstance(snr, int) or isinstance(snr, float) or isinstance(snr, np.int_) \
            or isinstance(snr, np.float_):
            pass
        else:
            raise TypeError('Function `add_backgroundsound` expects parameter '+\
                '`snr` to be an int or float or a list / tuple of ints or floats, '+\
                    'not of type {}.'.format(type(snr)))
        # see soundpy.dsp.snr_adjustnoiselevel
        adjust_sound = (np.sqrt(target_energy/(noise_energy+1e-6) / (10**(snr/10))))
        sound2add *= adjust_sound
    # get SNR where voice activity is detected (assigned unconditionally:
    # the return statement below relies on `new_snr` even when `snr` is None).
    new_snr = sp.dsp.get_vad_snr(target, sound2add, sr=sr)
    if pad_mainsound_sec is None:
        pad_mainsound_sec = 0
    num_padding_samples = int(sr*pad_mainsound_sec)*2 # pad on both sides of sound
    if total_len_sec is not None:
        total_samps = int(sr*total_len_sec)
    else:
        total_samps = len(target) + num_padding_samples
    if total_samps < len(target) + num_padding_samples:
        diff = len(target) + num_padding_samples - total_samps
        import warnings
        warnings.warn('The length of `audio_main` and `pad_mainsound_sec `'+\
            'exceeds `total_len_sec`. {} samples from '.format(diff)+\
                '`audio_main` will be cut off in '+\
                    'the `combined` audio signal.')
    # make the background sound match the length of total samples
    if len(sound2add) < total_samps:
        # if shorter than total_samps, extend the noise
        sound2add = sp.dsp.apply_sample_length(sound2add, total_samps,
                                               mirror_sound = mirror_sound,
                                               clip_at_zero = clip_at_zero)
    else:
        # otherwise, choose random selection of noise
        sound2add = sp.dsp.clip_at_zero(sound2add)[:-1]
        sound2add = sp.dsp.random_selection_samples(sound2add,
                                                    total_samps,
                                                    wrap = wrap,
                                                    random_seed = random_seed)
    # separate samples to add to the target signal
    target_sound = sound2add[num_padding_samples//2:len(target) \
        + num_padding_samples//2]
    # If target is longer than indicated length, shorten it
    if len(target_sound) < len(target):
        target = target[:len(target_sound)]
    combined = target_sound + target
    if remove_dc:
        combined = sp.dsp.remove_dc_bias(combined)
    if pad_mainsound_sec:
        # set aside samples for beginning delay (if there is one)
        beginning_pad = sound2add[:num_padding_samples//2]
        ending_pad = sound2add[num_padding_samples//2+len(target):]
        combined = np.concatenate((beginning_pad, combined, ending_pad))
    if len(combined) > total_samps:
        combined = combined[:total_samps]
    elif len(combined) < total_samps:
        # set aside ending samples for ending (if sound is extended)
        ending_sound = sound2add[len(target)+num_padding_samples:total_samps]
        combined = np.concatenate((combined, ending_sound))
    return combined, new_snr
def hz_to_mel(freq):
    '''Converts frequency to Mel scale

    Parameters
    ----------
    freq : int or float or array like of ints / floats
        The frequency/ies to convert to Mel scale.

    Returns
    -------
    mel : int or float or array of ints / floats
        The frequency/ies in Mel scale.

    References
    ----------
    https://en.wikipedia.org/wiki/Mel_scale#Formula

    Fayek, H. M. (2016). Speech Processing for Machine Learning: Filter banks, Mel-Frequency Cepstral Coefficients (MFCCs) and What's In-Between. https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html
    '''
    # Standard O'Shaughnessy formula: mel = 2595 * log10(1 + f / 700).
    return 2595 * np.log10(1 + freq / 700)
def mel_to_hz(mel):
    '''Converts Mel item or list to frequency/ies.

    Parameters
    ----------
    mel : int, float, or list of ints / floats
        Mel item(s) to be converted to Hz.

    Returns
    -------
    freq : int, float, or list of ints / floats
        The converted frequency/ies

    References
    ----------
    https://en.wikipedia.org/wiki/Mel_scale#Formula

    Fayek, H. M. (2016). Speech Processing for Machine Learning: Filter banks, Mel-Frequency Cepstral Coefficients (MFCCs) and What's In-Between. https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html
    '''
    # Inverse of hz_to_mel: f = 700 * (10^(mel / 2595) - 1).
    return 700 * (10 ** (mel / 2595) - 1)
def fbank_filters(fmin, fmax, num_filters):
    '''Calculates the mel filterbanks given a min and max frequency and `num_filters`.

    Parameters
    ----------
    fmin : int, float
        Minimum frequency relevant in signal.
    fmax : int, float
        Maximum frequency relevant in signal.
    num_filters : int
        The number of evenly spaced filters (according to mel scale) between the
        `fmin` and `fmax` frequencies.

    Returns
    -------
    mel_points : np.ndarray [size=(num_filters + 2,)]
        An array of floats containing evenly spaced filter points (according
        to mel scale), including the two edge points.

    References
    ----------
    Fayek, H. M. (2016). Speech Processing for Machine Learning: Filter banks, Mel-Frequency Cepstral Coefficients (MFCCs) and What's In-Between. https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html
    '''
    # Convert the Hz bounds to mel (same formula as hz_to_mel, inlined).
    if fmin > 0:
        low_freq_mel = 2595 * np.log10(1 + fmin / 700)
    else:
        low_freq_mel = 0
    high_freq_mel = 2595 * np.log10(1 + fmax / 700)
    # num_filters + 2 points: the extra two are the lower / upper edges.
    return np.linspace(low_freq_mel, high_freq_mel, num_filters + 2)
def sinosoidal_liftering(mfccs, cep_lifter = 22):
    '''Reduces influence of higher coefficients; found useful in automatic speech rec.

    Parameters
    ----------
    mfccs : np.ndarray [shape=(num_samples, num_mfcc)]
        The matrix containing mel-frequency cepstral coefficients.
    cep_lifter : int
        The amount to apply `sinosoidal_liftering`. (default 22)

    Returns
    -------
    np.ndarray [shape=(num_samples, num_mfcc)]
        A new array with the lifter applied; the input array is left untouched.

    References
    ----------
    Fayek, H. M. (2016). Speech Processing for Machine Learning: Filter banks, Mel-Frequency Cepstral Coefficients (MFCCs) and What's In-Between. https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html
    '''
    (nframes, ncoeff) = mfccs.shape
    n = np.arange(ncoeff)
    lift = 1 + (cep_lifter / 2) * np.sin(np.pi * n / cep_lifter)
    # BUG FIX: multiply out-of-place. The previous `mfccs *= lift` silently
    # modified the caller's array (and raised a casting error for int dtypes).
    return mfccs * lift
def index_at_zero(samples, num_dec_places=2):
    '''Finds indices of start and end of utterance, given amplitude strength.

    Parameters
    ----------
    samples : numpy.ndarray [size= (num_samples,) or (num_samples, num_channels)]
        The samples to index where the zeros surrounding speech are located.
    num_dec_places : int
        To the number of decimal places the lowest value in `samples` should
        be rounded to. (default 2)

    Returns
    -------
    f_0 : int
        The index of the last occuring zero, right before speech or sound begins.
    l_0 : int
        The index of the first occuring zero, after speech ends.

    Examples
    --------
    >>> signal = np.array([-1, 0, 1, 2, 3, 2, 1, 0, -1, -2, -3, -2, -1, 0, 1])
    >>> zero_1, zero_2 = index_at_zero(signal)
    >>> # +1 to include zero_2 in signal
    >>> signal[zero_1:zero_2+1]
    [ 0 1 2 3 2 1 0 -1 -2 -3 -2 -1 0]
    >>> # does not assume a zero preceeds any sample
    >>> signal = np.array([1, 2, 1, 0, -1, -2, -1, 0, 1, 2, 1])
    >>> zero_1, zero_2 = index_at_zero(signal)
    >>> signal[zero_1:zero_2+1]
    [ 0 -1 -2 -1 0]
    '''
    # Tolerance below which a sample is treated as "zero".
    almost_zero = 1e-1
    # NOTE(review): `original_shape` is only used to detect multi-channel input.
    original_shape = samples.shape
    samps = samples.copy()
    if len(original_shape) > 1:
        # if multiple channels find where it is 0 across all channels
        samps = sp.dsp.average_channels(samps)
    # Index of the sample closest to zero; reused below to read its value.
    min_samp = np.argmin(np.abs(samps))
    # in some instances, stored as numpy array
    if isinstance(samps[min_samp], np.ndarray):
        assert len(samps[min_samp]) == 1
        min_samp = samps[min_samp][0]
    else:
        min_samp = samps[min_samp]
    # If the smallest sample is itself near zero, shift the tolerance by it
    # so a signal with a slight offset still registers "zeros".
    if round(min_samp,num_dec_places) <= almost_zero:
        almost_zero += min_samp
    # find first instance of zero:
    f_0_etc = np.where(np.abs(samps) <= almost_zero)
    if len(f_0_etc[0]) > 0:
        # get index of first zero
        for i, index in enumerate(f_0_etc[0]):
            # if more silence follows, adjust f_0
            if i == len(f_0_etc[0])-1:
                # Reached the end without a break: every sample was "zero".
                import warnings
                warnings.warn('\n\nWarning: Only zeros found in signal.\n\n')
                f_0 = 0
            else:
                # Take the last zero of the leading run of consecutive zeros.
                if index+1 != f_0_etc[0][i+1]:
                    f_0 = index
                    break
    else:
        # no zero found
        f_0 = 0
    # find end of utterance last zero (search the reversed signal)
    l_0_etc = np.where(np.abs(np.flip(samps)) <= almost_zero)
    if len(l_0_etc[0]) > 1:
        # get index of first zero
        for i, index in enumerate(l_0_etc[0]):
            # if more silence follows, adjust l_0
            if i == len(l_0_etc[0])-1:
                # warning should get called for f_0
                #import warnings
                #warnings.warn('\n\nWarning: Only zeros found in signal.\n\n')
                l_0 = 0
            else:
                if index+1 != l_0_etc[0][i+1]:
                    l_0 = index
                    break
    else:
        # no zeros found
        l_0 = 0
    # Translate the reversed-signal index back into a forward index.
    if l_0 != 0:
        l_0 = len(samps) - l_0 - 1
    else:
        l_0 = len(samps) - 1
    try:
        assert f_0 != l_0
    except AssertionError:
        # Only one zero was found; keep whichever side holds more energy.
        import warnings
        warnings.warn('\n\nWarning: only one zero was found. Returning '+\
            'sample indices that encompass more energy.')
        if sum(np.abs(samps[f_0:])) > sum(np.abs(samps[:f_0])):
            f_0, l_0 = 0, l_0
        else:
            f_0, l_0 = f_0, len(samps)-1
    return f_0, l_0
def clip_at_zero(samples, samp_win = None, neg2pos = True, **kwargs):
    '''Clips the signal at samples close to zero.

    The samples where clipping occurs crosses the zero line from negative to
    positive. This clipping process allows for a smoother transition of audio,
    especially if concatenating audio.

    Parameters
    ----------
    samples : np.ndarray [shape = (num_samples, ) or (num_samples, num_channels)]
        The array containing sample data. Should work on stereo sound.
    samp_win : int, optional
        The window of samples to apply when clipping at zero crossings. The zero
        crossings adjacent to the main signal will be used. This is useful to remove
        already existing clicks within the signal, often found at the beginning
        and / or end of signals.
    neg2pos : bool
        If True, the returned array will begin with positive values and end with
        negative values. Otherwise, the array will be returned with the first
        zeros detected, regardless of surrounding positive or negative values.
    kwargs : additional keyword arguments
        Keyword arguments for `soundpy.dsp.index_at_zero`.

    Warning
    -------
    If only one zero found.

    Examples
    --------
    >>> sig = np.array([-2,-1,0,1, 2, 1, 0, -1, -2, -1, 0, 1, 2, 1,0])
    >>> clip_at_zero(sig) # defaults
    [ 0 1 2 1 0 -1 -2 -1 0]
    >>> # finds first and last instance of zeros, regardless of surrounding
    >>> # negative or positive values in signal
    >>> clip_at_zero(sig, neg2pos = False)
    [ 0 1 2 1 0 -1 -2 -1 0 1 2 1 0]
    >>> # avoid clicks at start of signal
    >>> sig = np.array([0,-10,-20,-1,0,1, 2, 1, 0, -1, -2, -1, 0, 1, 2, 1,0])
    >>> clip_at_zero(sig, samp_win = 5)
    [ 0 1 2 1 0 -1 -2 -1 0]
    '''
    # Tolerance below which a sample counts as "zero".
    almost_zero = 1e-1
    original_shape = samples.shape
    samps = samples.copy()
    if samp_win is not None:
        # Only search for zeros within a window at each end of the signal,
        # skipping clicks that may live inside those windows.
        samps_beg = samps[:samp_win]
        samps_end = samps[-samp_win:]
        # find last instance of zero within window at beginning of signal
        __, f_0 = index_at_zero(samps_beg)
        # find first instance of zero within window at end of signal
        l_0, __ = index_at_zero(samps_end)
        # match l_0 to original samples
        l_0 += len(samps)-samp_win
    else:
        f_0, l_0 = index_at_zero(samps)
    # ensure same shape as original_shape
    samps = samples[f_0:l_0+1]
    # ensure beginning of signal starts positive and ends negative
    if not neg2pos:
        return samps
    try:
        # Sign of the signal just after the first zero / just before the last
        # (first column only, for stereo data).
        if len(samps.shape) > 1:
            beg_pos_neg = sum(samps[:3,0])
            end_pos_neg = sum(samps[-4:,0])
        else:
            beg_pos_neg = sum(samps[:3])
            end_pos_neg = sum(samps[-4:])
    except IndexError:
        raise ValueError('Function clip_at_zero can only be applied to arrays '+\
            'longer than 5 samples.\n\n')
    if beg_pos_neg > 0 and end_pos_neg < 0:
        # Already starts positive and ends negative: done.
        return samps
    # try to cut at different zero but only
    # if more than 1 zero left in signal:
    if len(np.where(samps <= almost_zero)[0]) > 1:
        if beg_pos_neg > 0 and end_pos_neg > 0:
            # won't include the last zero
            samps_no_last_zero = samps[:-1]
            f_0, l_0 = index_at_zero(samps_no_last_zero)
            samps = samps_no_last_zero[f_0:l_0+1]
        elif beg_pos_neg < 0:
            if end_pos_neg < 0:
                # won't include the first zero
                samps_no_first_zero = samps[f_0+1::]
                f_0, l_0 = index_at_zero(samps_no_first_zero)
                samps = samps_no_first_zero[f_0:l_0+1]
            else:
                # drop both the first and the last zero
                samps_no_first_last_zero = samps[f_0+1:-1]
                f_0, l_0 = index_at_zero(samps_no_first_last_zero)
                samps = samps_no_first_last_zero[f_0:l_0+1]
    try:
        # Verify the positive-start / negative-end property was achieved.
        if len(samps.shape) > 1:
            assert sum(samps[:2,0]) > 0 and \
                sum(samps[-2:,0]) < 0
        else:
            assert sum(samps[:2]) > 0 and sum(samps[-2:]) < 0
    except AssertionError:
        import warnings
        warnings.warn('\n\nWarning: was not able to clip at zero where '+\
            '`samples` begin positive and end negative.\n\n')
    return samps
def remove_dc_bias(samples, samp_win = None):
    '''Removes DC bias by subtracting mean from sample data.

    Seems to work best without samp_win.

    # TODO add moving average?

    Parameters
    ----------
    samples : np.ndarray [shape=(samples, num_channels) or (samples)]
        The sample data to center around zero. Works on both mono and stereo data.
    samp_win : int, optional
        Apply subtraction of mean at windows - experimental. (default None)

    Returns
    -------
    samps : np.ndarray [shape=(samples, num_channels) or (samples)]
        The `samples` with zero mean.

    References
    ----------
    Lyons, Richard. (2011). Understanding Digital Signal Processing (3rd Edition).
    '''
    samps = samples.copy()
    if samp_win is None:
        # Whole-signal mean removal.
        samps -= np.mean(samps)
        return samps
    # Experimental: subtract the mean per window of `samp_win` samples.
    num_frames = math.ceil(len(samples) / samp_win)
    for frame_i in range(num_frames):
        segment = slice(frame_i * samp_win, frame_i * samp_win + samp_win)
        samps[segment] -= np.mean(samps[segment])
    return samps
def apply_num_channels(sound_data, num_channels):
    '''Ensures `data` has indicated `num_channels`.

    To increase number of channels, the first column will be duplicated. To limit
    channels, channels will simply be removed.

    Parameters
    ----------
    sound_data : np.ndarray [size= (num_samples,) or (num_samples, num_channels)]
        The data to adjust the number of channels
    num_channels : int
        The number of channels desired

    Returns
    -------
    data : np.ndarray [size = (num_samples, num_channels)]
        Note: if `sound_data` already has `num_channels`, it is returned
        unchanged (mono input stays 1-D in that case).

    Examples
    --------
    >>> import numpy as np
    >>> data = np.array([1, 1, 1, 1])
    >>> apply_num_channels(data, 3)
    array([[1, 1, 1],
           [1, 1, 1],
           [1, 1, 1],
           [1, 1, 1]])
    '''
    if len(sound_data.shape) == 1:
        data = np.expand_dims(sound_data, axis=1)
    else:
        data = sound_data
    diff = num_channels - data.shape[1]
    if diff < 0:
        # limit number of channels
        return data[:, :num_channels]
    if diff == 0:
        # no change necessary
        return sound_data
    # add channels: duplicate the first column `diff` times in one
    # concatenate call instead of repeatedly calling np.append (which
    # copied the whole array on every iteration).
    duplicated_data = np.expand_dims(data[:, 0], axis=1)
    return np.concatenate([data] + [duplicated_data] * diff, axis=1)
def apply_sample_length(data, target_len, mirror_sound = False, clip_at_zero = True):
    '''Extends a sound by repeating it until its `target_len`.

    If the `target_len` is shorter than the length of `data`, `data`
    will be shortened to the specified `target_len`. This is perhaps useful
    when working with repetitive or stationary sounds.

    Parameters
    ----------
    data : np.ndarray [size = (num_samples,) or (num_samples, num_channels)]
        The data to be checked or extended in length. If shape
        (num_channels, num_samples), the data will be reshaped to
        (num_samples, num_channels).
    target_len : int
        The length of samples the input `data` should be.
    mirror_sound : bool
        If True, each repetition appends a time-reversed copy of the sound
        instead of the sound itself. (default False)
    clip_at_zero : bool
        If True, the sound is clipped at zero crossings before each
        repetition to reduce clicks at the seams. (default True)

    Returns
    -------
    new_data : np.ndarray [size=(target_len, ) or (target_len, num_channels)]

    Examples
    --------
    >>> import numpy as np
    >>> data = np.array([1,2,3,4])
    >>> sp.dsp.apply_sample_length(data, 12)
    array([1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4])
    '''
    if len(data.shape) > 2:
        raise ValueError('Too many dimensions.')
    if len(data) > target_len:
        # Too long: simply truncate.
        new_data = data[:target_len]
        return new_data
    elif len(data) == target_len:
        new_data = data
        return new_data
    else:
        # Too short: double (or mirror) the sound until it reaches target_len.
        while len(data) < target_len:
            if clip_at_zero:
                data_clipped = sp.dsp.clip_at_zero(data)
                # get rid of last zero
                if len(data_clipped) < len(data):
                    data = data_clipped[:-1]
            if mirror_sound:
                # Append a reversed copy (excluding the final sample) so the
                # seam is continuous.
                data = np.concatenate((data, np.flip(data[:-1])))
            else:
                data = np.concatenate((data, data))
            if len(data) >= target_len:
                break
    if len(data.shape) > 1:
        # ensure stereo in correct format (num_samples, num_channels)
        data = sp.dsp.shape_samps_channels(data)
        num_channels = data.shape[1]
    else:
        # 0 marks mono data for the allocation below.
        num_channels = 0
    if num_channels:
        new_data = np.zeros((target_len, num_channels))
    else:
        new_data = np.zeros((target_len,))
    # Tile the (now >= target_len) data into the output buffer.
    row_id = 0
    while row_id < len(new_data):
        if row_id + len(data) > len(new_data):
            # Final partial copy: only as many samples as still fit.
            diff = row_id + len(data) - len(new_data)
            new_data[row_id:] += data[:-diff]
        else:
            new_data[row_id:row_id+len(data)] = data
        row_id += len(data)
    # Cast back to the dtype of the (possibly concatenated) source data.
    new_data = sp.utils.match_dtype(new_data, data)
    return new_data
# TODO: raise error or only warning if original data cut off?
def zeropad_sound(data, target_len, sr, delay_sec=None):
    '''If the sound data needs to be a certain length, zero pad it.

    Parameters
    ----------
    data : numpy.ndarray [size = (num_samples,) or (num_samples, num_channels)]
        The sound data that needs zero padding.
    target_len : int
        The number of samples the `data` should have
    sr : int
        The samplerate of the `data`
    delay_sec : int, float, optional
        If the data should be zero padded also at the beginning.
        (default None)

    Returns
    -------
    signal_zeropadded : numpy.ndarray [size = (target_len,) or (target_len, num_channels)]
        The data zero padded.

    Examples
    --------
    >>> import numpy as np
    >>> x = np.array([1,2,3,4])
    >>> # with 1 second delay (with sr of 4, that makes 4 sample delay)
    >>> zeropad_sound(x, target_len=10, sr=4, delay_sec=1)
    array([0., 0., 0., 0., 1., 2., 3., 4., 0., 0.])
    >>> # without delay
    >>> zeropad_sound(x, target_len=10, sr=4)
    array([1., 2., 3., 4., 0., 0., 0., 0., 0., 0.])
    '''
    # ensure data follows shape of (num_samples,) or (num_samples, num_channels)
    data = sp.dsp.shape_samps_channels(data)
    num_channels = get_num_channels(data)
    if delay_sec is None:
        delay_sec = 0
    # BUG FIX: cast to int. `delay_sec` is documented as int or float; a float
    # previously produced a float sample count, which crashes when used as a
    # slice bound / index below.
    delay_samps = int(sr * delay_sec)
    if target_len < len(data) + delay_samps:
        import warnings
        # data must be truncated:
        remove_samples = len(data) - len(data[:target_len-delay_samps])
        if remove_samples >= len(data):
            warnings.warn('All data will be lost and replaced with zeros with the '+\
                'provided `target_len` and `delay_sec` settings. Data length is '+\
                    '{}, target_len is {}, and delay samples is {}.'.format(
                        len(data), target_len, delay_samps))
        data = data[:target_len-delay_samps]
        warnings.warn('The `target_len` is shorter than the `data` and `delay_sec`. '+\
            'Therefore the data will be cut off by {} sample(s).'.format(remove_samples))
    if len(data) < target_len:
        diff = target_len - len(data)
        signal_zeropadded = np.zeros((data.shape[0] + int(diff)))
        if num_channels > 1:
            signal_zeropadded = apply_num_channels(signal_zeropadded, num_channels)
            assert signal_zeropadded.shape[1] == data.shape[1]
        # Place the data after the delay in one vectorized assignment
        # (replaces the previous per-row Python loop).
        signal_zeropadded[delay_samps:delay_samps + len(data)] = data
    else:
        import warnings
        warnings.warn('The signal cannot be zeropadded and will instead be truncated '+\
            'as length of `data` is {} and `target_len` is {}.'.format(
                len(data), target_len))
        signal_zeropadded = data[:target_len]
    return signal_zeropadded
def get_num_channels(data):
    '''Return the number of audio channels in `data`.

    A 1-D array, or a 2-D array with a single column, counts as mono (1).
    Otherwise the size of the second axis is the channel count.
    '''
    if data.ndim > 1 and data.shape[1] > 1:
        return data.shape[1]
    return 1
# TODO clarify how length of output array is established
def combine_sounds(file1, file2, match2shortest=True, time_delay_sec=None, total_dur_sec=None):
    '''Combines sounds

    Parameters
    ----------
    file1 : str
        One of two files to be added together
    file2 : str
        Second of two files to be added together
    match2shortest : bool
        If the lengths of the addition should be limited by the shorter sound.
        (default True)
    time_delay_sec : int, float, optional
        The amount of time in seconds before the sounds are added together.
        The longer sound will play for this period of time before the shorter
        sound is added to it. (default None)
    total_dur_sec : int, float, optional
        The total duration in seconds of the combined sounds. If None, no
        extra truncation is applied. (default None)

    Returns
    -------
    added_sound : numpy.ndarray
        The sound samples of the two soundfiles added together
    sr1 : int
        The sample rate of the original signals and added sound
    '''
    data1, sr1 = sp.loadsound(file1)
    data2, sr2 = sp.loadsound(file2)
    # resample second signal to match the first, if needed
    if sr1 != sr2:
        data2, sr2 = resample_audio(data2, sr2, sr1)
    if time_delay_sec:
        num_extra_samples = int(sr1*time_delay_sec)
    else:
        num_extra_samples = 0
    if len(data1) > len(data2):
        data_long = data1
        data_short = data2
    else:
        data_long = data2
        data_short = data1
    dl_copy = data_long.copy()
    ds_copy = data_short.copy()
    if match2shortest:
        # limit combined length to the shorter sound (plus any delay)
        data_short = zeropad_sound(data_short, len(ds_copy) + num_extra_samples, sr1, delay_sec=time_delay_sec)
        data_long = data_long[:len(ds_copy)+num_extra_samples]
    else:
        data_short = zeropad_sound(data_short, len(dl_copy), sr1, delay_sec=time_delay_sec)
    added_sound = data_long + data_short
    if total_dur_sec:
        # Bug fix: cast to int -- a float `total_dur_sec` previously
        # produced a float slice index and raised a TypeError.
        added_sound = added_sound[:int(sr1*total_dur_sec)]
    return added_sound, sr1
def calc_frame_length(dur_frame_millisec, sr):
    """Convert a frame duration in milliseconds into a sample count.

    Parameters
    ----------
    dur_frame_millisec : int or float
        Desired duration of each frame, in milliseconds.
    sr : int
        Sampling rate of the samples to be framed.

    Returns
    -------
    frame_length : int
        Number of samples that fill one frame (floored).

    Examples
    --------
    >>> calc_frame_length(dur_frame_millisec=20, sr=1000)
    20
    >>> calc_frame_length(dur_frame_millisec=20, sr=48000)
    960
    >>> calc_frame_length(dur_frame_millisec=25.5, sr=22500)
    573
    """
    # floor-divide by 1000 ms per second; int() collapses a float result
    # when `dur_frame_millisec` is a float
    return int(sr * dur_frame_millisec // 1000)
def calc_num_overlap_samples(samples_per_frame, percent_overlap):
    """Calculate how many samples two adjacent frames share.

    Parameters
    ----------
    samples_per_frame : int
        Number of samples in each window / frame.
    percent_overlap : int, float
        Either an integer between 0 and 100 or a decimal between 0.0 and 1.0
        indicating the amount of overlap of windows / frames.

    Returns
    -------
    num_overlap_samples : int
        Number of samples in the overlap.

    Examples
    --------
    >>> calc_num_overlap_samples(samples_per_frame=100,percent_overlap=0.10)
    10
    >>> calc_num_overlap_samples(samples_per_frame=100,percent_overlap=10)
    10
    >>> calc_num_overlap_samples(samples_per_frame=960,percent_overlap=0.5)
    480
    >>> calc_num_overlap_samples(samples_per_frame=960,percent_overlap=75)
    720
    """
    # normalize a percentage (e.g. 75) to a fraction (0.75)
    fraction = percent_overlap * 0.01 if percent_overlap > 1 else percent_overlap
    return int(samples_per_frame * fraction)
def calc_num_subframes(tot_samples, frame_length, overlap_samples, zeropad=False):
    """Assigns total frames needed to process entire noise or target series

    This function calculates the number of full frames that can be
    created given the total number of samples, the number of samples in
    each frame, and the number of overlapping samples.

    Parameters
    ----------
    tot_samples : int
        total number of samples in the entire series
    frame_length : int
        total number of samples in each frame / processing window
    overlap_samples : int
        number of samples in overlap between frames
    zeropad : bool, optional
        If False, number of subframes limited to full frames. If True,
        number of subframes extended to zeropad the last partial frame.
        (default False)

    Returns
    -------
    subframes : int
        The number of subframes necessary to fully process the audio samples
        at given `frame_length`, `overlap_samples`, and `zeropad`.

    Examples
    --------
    >>> calc_num_subframes(30,10,5)
    5
    >>> calc_num_subframes(30,20,5)
    3
    """
    # `math` is imported at module level; the redundant local import
    # was removed.
    if overlap_samples == 0:
        # no overlap: frames tile the signal back to back
        if zeropad:
            return int(math.ceil(tot_samples / frame_length))
        return int(tot_samples / frame_length)
    # with overlap, each new frame advances by `overlap_samples`;
    # the first frame consumes `frame_length - overlap_samples` extra
    trim = frame_length - overlap_samples
    totsamps_adjusted = tot_samples - trim
    if zeropad:
        return int(math.ceil(totsamps_adjusted / overlap_samples))
    return int(totsamps_adjusted / overlap_samples)
def create_window(window_type, frame_length):
    """Creates window according to set window type and frame length

    The Hamming window tapers edges to around 0.08 while the Hann window
    tapers edges to 0.0. Both are commonly used in noise filtering.

    Parameters
    ----------
    window_type : str
        Type of window to be applied: 'hamming' or 'hann'.
    frame_length : int
        Number of samples the window should span.

    Returns
    -------
    window : ndarray
        A window of length `frame_length`.

    Raises
    ------
    ValueError
        If `window_type` is not recognized. (Previously an unknown type
        caused an UnboundLocalError at the return statement.)

    Examples
    --------
    >>> #create Hamming window
    >>> hamm_win = create_window('hamming', frame_length=5)
    >>> hamm_win
    array([0.08, 0.54, 1.  , 0.54, 0.08])
    >>> #create Hann window
    >>> hann_win = create_window('hann',frame_length=5)
    >>> hann_win
    array([0. , 0.5, 1. , 0.5, 0. ])
    """
    if window_type.lower() == 'hamming':
        window = hamming(frame_length)
    elif 'hann' in window_type.lower():
        window = hann(frame_length)
    else:
        # explicit error instead of returning an unbound local
        raise ValueError(
            'Window type {} not supported. Use "hamming" or "hann".'.format(
                window_type))
    return window
def apply_window(samples, window, zeropad=False):
    """Applies predefined window to a section of samples. Mono or stereo sound checked.

    The length of the samples must be the same length as the window.

    Parameters
    ----------
    samples : ndarray [shape=(num_samples,) or (num_samples, num_channels)]
        series of samples with the length of input window
    window : ndarray [shape=(num_samples,) or (num_samples, num_channels)]
        window to be applied to the signal. If window does not match number of
        channels of sample data, the missing channels will be applied to the window,
        repeating the first channel.
    zeropad : bool
        If True and `samples` is shorter than `window`, `samples` is
        zero padded to the window's shape before multiplication.
        (default False)

    Returns
    -------
    samples_win : ndarray
        series with tapered sides according to the window provided

    Examples
    --------
    >>> import numpy as np
    >>> input_signal = np.array([ 0.        ,  0.36371897, -0.302721,
    ...                         -0.1117662 ,  0.3957433 ])
    >>> window_hamming = np.array([0.08, 0.54, 1.  , 0.54, 0.08])
    >>> apply_window(input_signal, window_hamming)
    array([ 0.        ,  0.19640824, -0.302721  , -0.06035375,  0.03165946])
    >>> window_hann = np.array([0. , 0.5, 1. , 0.5, 0. ])
    >>> apply_window(input_signal, window_hann)
    array([ 0.        ,  0.18185948, -0.302721  , -0.0558831 ,  0.        ])
    """
    if len(samples.shape) == 1:
        # mono samples: collapse a multi-channel window to its first channel
        if len(window.shape) > 1:
            window = window[:,0]
    elif len(samples.shape) != len(window.shape) or samples.shape[1] != window.shape[1]:
        # stereo samples with mismatched window: replicate window channels
        window = sp.dsp.add_channels(window, samples.shape[1])
        # in the off chance the window has more channels than samples:
        if window.shape[1] > samples.shape[1]:
            window = window[:,:samples.shape[1]]
    if zeropad:
        if samples.shape != window.shape:
            # pad samples with zeros up to the window's shape
            temp_matrix = sp.dsp.create_empty_matrix(
                window.shape)
            temp_matrix[:len(samples)] = samples
            samples = temp_matrix
    # element-wise taper
    samples_win = samples * window
    return samples_win
def add_channels(samples, channels_total):
    '''Copies columns of `samples` to create additional channels.

    Parameters
    ----------
    samples : np.ndarray [shape=(num_samples) or (num_samples,num_channels)]
        The samples to add channels to.
    channels_total : int
        The total number of channels desired. For example, if `samples` already has
        2 channels and you want it to have 3, set `channels_total` to 3.

    Returns
    -------
    x : np.ndarray [shape = (num_samples, channels_total)]
        A copy of `samples` with desired number of channels.

    Examples
    --------
    >>> import numpy as np
    >>> samps_mono = np.array([1,2,3,4,5])
    >>> samps_stereo2 = add_channels(samps_mono, 2)
    >>> samps_stereo2
    array([[1, 1],
    ...    [2, 2],
    ...    [3, 3],
    ...    [4, 4],
    ...    [5, 5]])
    >>> samps_stereo5 = add_channels(samps_stereo2, 5)
    >>> samps_stereo5
    array([[1, 1, 1, 1, 1],
    ...    [2, 2, 2, 2, 2],
    ...    [3, 3, 3, 3, 3],
    ...    [4, 4, 4, 4, 4],
    ...    [5, 5, 5, 5, 5]])

    Warning
    -------
    If `channels_total` is less than or equal to the number of channels already present
    in `samples`. No channels added in those cases.
    '''
    # work on a copy so the caller's array is never mutated
    y = samples.copy()
    if len(y.shape) == 1:
        # mono input: stack the 1-D signal until we have `channels_total`
        # rows, transposed to (num_samples, channels_total) at the end
        for i in range(channels_total):
            if i == 0:
                x = np.vstack((y, y))
            elif i < channels_total-1:
                x = np.vstack((x, y))
    else:
        if y.shape[1] == channels_total:
            import warnings
            msg = '\n\nWarning: provided `samples` already has {} channels.'.format(channels_total)+\
                '\nTo add 1 channel to samples shaped (5, 1), set `channels_total` to 2.' +\
                    '\nNo channels added.\n'
            warnings.warn(msg)
            # already at the requested channel count: return copy unchanged
            return y
        elif y.shape[1] > channels_total:
            import warnings
            msg = '\n\nWarning: function soundpy.dsp.add_channels adds channels. '+\
                '`samples` currently has {} channels and provided`'.format(samples.shape[1]) +\
                    ' `channels_total` is less than that: {}'.format(channels_total) +\
                        '\nTo add 1 channel to samples shaped (5, 2), set `channels_total` to 3.'+\
                            '\nNo channels added.\n'
            warnings.warn(msg)
            # NOTE(review): despite the warning saying 'No channels added',
            # execution continues and the trim below can REDUCE channels
            # to `channels_total` -- confirm whether this is intended.
        # stereo input: stack whole channel groups, then transpose back
        x = y.T
        extra_channels = channels_total//samples.shape[1]
        for i in range(extra_channels):
            x = np.vstack((x, y.T))
    # back to (num_samples, num_channels); trim any overshoot from stacking
    x = x.T
    if x.shape[1] > channels_total:
        x = x[:,:channels_total]
    return x
def average_channels(data):
    '''Averages all channels in a stereo signal into one channel.

    Parameters
    ----------
    data : np.ndarray [size=(num_samples, num_channels)]
        The stereo data to average out. If mono data supplied, mono data is
        returned unchanged.

    Returns
    -------
    data averaged : np.ndarray [size=(num_samples)]
        Copy of `data` averaged into one channel.

    Examples
    --------
    >>> import numpy as np
    >>> input_samples1 = np.array([1,2,3,4,5])
    >>> input_samples2 = np.array([1,1,3,3,5])
    >>> input_2channels = np.vstack((input_samples1, input_samples2)).T
    >>> input_averaged = average_channels(input_2channels)
    >>> input_averaged
    array([1. , 1.5, 3. , 3.5, 5. ])
    '''
    # mono (or single-column) data passes through untouched
    if data.ndim <= 1 or data.shape[1] <= 1:
        return data
    # accumulate the remaining channels onto a copy of the first,
    # then divide by the channel count
    num_channels = data.shape[1]
    total = data[:, 0].copy()
    for ch in range(1, num_channels):
        total += data[:, ch]
    return total / num_channels
def calc_fft(signal_section, real_signal=None, fft_bins=None, **kwargs):
    """Calculates the fast Fourier transform of a time series. Should work with stereo signals.

    The length of the signal_section determines the number of frequency
    bins analyzed if `fft_bins` not set. Therefore, if there are higher
    frequencies in the signal, the length of the `signal_section` should be
    long enough to accommodate those frequencies.

    The frequency bins with energy levels at around zero denote frequencies
    not prevalent in the signal; the frequency bins with prevalent energy
    levels relate to frequencies as well as their amplitudes that are in
    the signal.

    Parameters
    ----------
    signal_section : ndarray [shape = (num_samples) or (num_samples, num_channels)]
        the series that the fft will be applied to. If stereo sound, will
        return a FFT for each channel.
    real_signal : bool
        If True, only half of the fft will be returned (the fft is mirrored).
        Otherwise the full fft will be returned.
    fft_bins : int, optional
        Number of FFT bins (passed as `n` to numpy.fft). If None, the
        length of the input determines the number of bins.
    kwargs : additional keyword arguments
        keyword arguments for numpy.fft.fft or numpy.fft.rfft

    Returns
    -------
    fft_vals : ndarray [shape=(num_fft_bins), or (num_fft_bins, num_channels), dtype=np.complex_]
        the series transformed into the frequency domain with the same
        shape as the input series
    """
    # pick the transform once instead of re-testing `real_signal` per channel
    transform = rfft if real_signal else fft
    if sp.dsp.ismono(signal_section):
        fft_vals = transform(signal_section, n=fft_bins, **kwargs)
    else:
        # Bug fix: the previous pairwise np.stack((x, fft_vals)) broke for
        # signals with 3+ channels (shape mismatch on the third iteration).
        # Collect all channel FFTs first, then stack once along the last axis.
        channel_ffts = [transform(signal_section[:, channel], n=fft_bins, **kwargs)
                        for channel in range(signal_section.shape[1])]
        fft_vals = np.stack(channel_ffts, axis=-1)
    return fft_vals
def ismono(data):
    '''Return True if `data` holds a single audio channel, else False.'''
    # put channels on the last axis before inspecting the shape
    data = sp.dsp.shape_samps_channels(data)
    return data.ndim == 1 or data.shape[1] <= 1
# TODO: https://github.com/biopython/biopython/issues/1496
# Fix numpy array repr for Doctest.
def calc_power(fft_vals):
    '''Calculates the power of fft values

    Parameters
    ----------
    fft_vals : ndarray (complex or floats)
        the fft values of a windowed section of a series

    Returns
    -------
    power_spec : ndarray
        the squared absolute value of the input fft values, normalized by
        the number of rows in `fft_vals`

    Example
    -------
    >>> import numpy as np
    >>> matrix = np.array([[1,1,1],[2j,2j,2j],[-3,-3,-3]],
    ...                     dtype=np.complex_)
    >>> calc_power(matrix)
    array([[0.33333333, 0.33333333, 0.33333333],
           [1.33333333, 1.33333333, 1.33333333],
           [3.        , 3.        , 3.        ]])
    '''
    # |X|^2 scaled by the length of the first axis
    magnitudes = np.abs(fft_vals)
    return np.square(magnitudes) / len(fft_vals)
def calc_average_power(matrix, num_iters):
    '''Divides matrix values by the number of times power values were added.

    This function assumes the power values of n-number of series were
    calculated and added. It divides the values in the input matrix by n,
    i.e. 'num_iters'. Note: float input is divided in place, matching the
    original behavior.

    Parameters
    ----------
    matrix : ndarray
        a collection of floats or ints representing the sum of power
        values across several series sets
    num_iters : int
        an integer denoting the number of times power values were added to
        the input matrix

    Returns
    -------
    matrix : ndarray
        the averaged input matrix

    Examples
    --------
    >>> matrix = np.array([[6,6,6],[3,3,3],[1,1,1]])
    >>> ave_matrix = calc_average_power(matrix, 3)
    >>> ave_matrix
    array([[2.        , 2.        , 2.        ],
           [1.        , 1.        , 1.        ],
           [0.33333333, 0.33333333, 0.33333333]])
    '''
    if matrix.dtype == 'int64':
        # copy to float so true division is possible
        matrix = matrix.astype('float')
    # vectorized in-place division replaces the former row-by-row loop;
    # same values, same in-place mutation semantics for float input
    matrix /= num_iters
    return matrix
def calc_phase(fft_matrix, radians=False):
    '''Calculates phase from complex fft values.

    Parameters
    ----------
    fft_matrix : np.ndarray [shape=(num_frames, num_features), dtype=complex]
        matrix with fft values
    radians : boolean
        False and complex values are returned. True and radians are returned.
        (Default False)

    Returns
    -------
    phase : np.ndarray [shape=(num_frames, num_features)]
        Phase values for fft_matrix. If radians is set to False, dtype = complex.
        If radians is set to True, dtype = float.

    Examples
    --------
    >>> import numpy as np
    >>> frame_length = 10
    >>> time = np.arange(0, 10, 0.1)
    >>> signal = np.sin(time)[:frame_length]
    >>> fft_vals = np.fft.fft(signal)
    >>> phase = calc_phase(fft_vals, radians=False)
    >>> phase[:2]
    array([ 1.        +0.j        , -0.37872566+0.92550898j])
    >>> phase = calc_phase(fft_vals, radians=True)
    >>> phase[:2]
    array([0.        , 1.95921533])
    '''
    if not radians:
        if len(fft_matrix.shape) > 1 and fft_matrix.shape[1] > 1:
            # soundpy works with (num_frames, num_features)
            # librosa works with (num_features, num_frames)
            fft_matrix = fft_matrix.T
        # magphase returns (magnitude, phase); magnitude is discarded
        __, phase = librosa.magphase(fft_matrix)
        if len(phase.shape) > 1 and phase.shape[1] > 1:
            # transpose back to (num_frames, num_features)
            phase = phase.T
    else:
        # in radians
        phase = np.angle(fft_matrix)
    return phase
def reconstruct_whole_spectrum(band_reduced_noise_matrix, n_fft=None):
    '''Reconstruct whole spectrum by mirroring complex conjugate of data.

    NOTE(review): the implementation mirrors the values with np.flip but
    does NOT apply np.conj -- for complex input the right half is a plain
    mirror, not a conjugate as the summary states. Confirm intended behavior.

    Parameters
    ----------
    band_reduced_noise_matrix : np.ndarray [size=(n_fft,), dtype=np.float or np.complex_]
        Matrix with either power or fft values of the left part of the fft. The whole
        fft can be provided; however the right values will be overwritten by a mirrored
        left side.
    n_fft : int, optional
        If None, `n_fft` set to length of `band_reduced_noise_matrix`. `n_fft` defines
        the size of the mirrored vector.

    Returns
    -------
    output_matrix : np.ndarray [size = (n_fft,), dtype=np.float or np.complex_]
        Mirrored vector of input data.

    Examples
    --------
    >>> x = np.array([3.,2.,1.,0.])
    >>> # double the size of x
    >>> x_rec = sp.dsp.reconstruct_whole_spectrum(x, n_fft=int(len(x)*2))
    >>> x_rec
    array([3., 2., 1., 0., 0., 1., 2., 3.])
    >>> # overwrite right side of data
    >>> x = np.array([3.,2.,1.,0.,0.,2.,3.,5.])
    >>> x_rec = sp.dsp.reconstruct_whole_spectrum(x, n_fft=len(x))
    >>> x_rec
    array([3., 2., 1., 0., 0., 1., 2., 3.])
    '''
    # expects 1d data
    if len(band_reduced_noise_matrix.shape) > 1:
        band_reduced_noise_matrix = band_reduced_noise_matrix.reshape((
            band_reduced_noise_matrix.shape[0]))
    if n_fft is None:
        n_fft = len(band_reduced_noise_matrix)
    # preserve complex dtype in the output buffer when input is complex
    if np.issubdtype(band_reduced_noise_matrix.dtype, np.complexfloating):
        complex_vals = True
    else:
        complex_vals = False
    total_rows = n_fft
    output_matrix = create_empty_matrix((total_rows,), complex_vals=complex_vals)
    if band_reduced_noise_matrix.shape[0] < n_fft:
        # zero-extend the input so the mirror spans the full n_fft
        temp_matrix = create_empty_matrix((total_rows,), complex_vals=complex_vals)
        temp_matrix[:len(band_reduced_noise_matrix)] += band_reduced_noise_matrix
        band_reduced_noise_matrix = temp_matrix
    # flip up-down
    flipped_matrix = np.flip(band_reduced_noise_matrix, axis=0)
    # left half (inclusive of Nyquist bin) comes from the input, the
    # right half from the reversed input
    output_matrix[0:n_fft//2+1,] += band_reduced_noise_matrix[0:n_fft//2+1]
    output_matrix[n_fft//2+1:,] += flipped_matrix[n_fft//2+1:]
    assert output_matrix.shape == (n_fft,)
    return output_matrix
# TODO: test with 2d+ dimensions
def apply_original_phase(spectrum, phase):
    '''Multiplies phase to power spectrum

    Parameters
    ----------
    spectrum : np.ndarray [shape=(n,), dtype=np.float or np.complex]
        Magnitude or power spectrum
    phase : np.ndarray [shape=(n,), dtype=np.float or np.complex]
        Phase to be applied to spectrum. Complex values are treated as a
        phase spectrum; real values are treated as radians.

    Returns
    -------
    spectrum_complex : np.ndarray [shape=(n,), dtype = np.complex]
    '''
    # ensure 1d dimensions
    if len(spectrum.shape) > 1:
        spectrum = spectrum.reshape((
            spectrum.shape[0],))
    if len(phase.shape) > 1:
        phase = phase.reshape((
            phase.shape[0],))
    assert spectrum.shape == phase.shape
    # Whether or not phase is represented in radians or a spectrum.
    # Bug fix: `isinstance(phase[0], np.complex)` crashed on NumPy >= 1.24
    # where the deprecated np.complex alias was removed.
    radians = not np.iscomplexobj(phase)
    if not radians:
        spectrum_complex = spectrum * phase
    else:
        # NOTE(review): the 1/2 factor on the cosine term deviates from
        # Euler's formula (cos + i*sin); preserved as-is -- confirm intent.
        # cmath.sqrt(-1) was simplified to the literal 1j.
        phase_prepped = (1/2) * np.cos(phase) + 1j * np.sin(phase)
        spectrum_complex = spectrum**(1/2) * phase_prepped
    return spectrum_complex
def calc_posteri_snr(target_power_spec, noise_power_spec):
    """Calculates and signal to noise ratio of current frame

    Parameters
    ----------
    target_power_spec : ndarray
        matrix of shape with power values of target signal
    noise_power_spec : ndarray
        matrix of shape with power values of noise signal

    Returns
    -------
    posteri_snr : ndarray
        matrix containing the signal to noise ratio

    Examples
    --------
    >>> sig_power = np.array([6,6,6,6])
    >>> noise_power = np.array([2,2,2,2])
    >>> calc_posteri_snr(sig_power, noise_power)
    array([3., 3., 3., 3.])
    """
    # vectorized element-wise division replaces the former per-index loop;
    # numpy true division yields float64 output, matching the original
    # zeros-accumulator behavior
    posteri_snr = np.asarray(target_power_spec) / np.asarray(noise_power_spec)
    return posteri_snr
def get_max_index(matrix):
    '''Returns the index of the row/value with the greatest energy.

    If rows are np.ndarray, the row with the largest sum wins; otherwise
    the scalar with the largest absolute value wins. If not np.ndarray,
    expects real sample data.
    '''
    max_val = 0
    # Bug fix: initialize max_index. Previously, if no row sum or absolute
    # value exceeded 0 (e.g. all zeros or all-negative row sums), the
    # return raised UnboundLocalError.
    max_index = 0
    for i, value in enumerate(matrix):
        if isinstance(value, np.ndarray):
            # hoist the repeated sum() computation
            row_sum = sum(value)
            if row_sum > max_val:
                max_val = row_sum
                max_index = i
        else:
            abs_val = np.abs(value)
            if abs_val > max_val:
                max_val = abs_val
                max_index = i
    return max_index
def get_local_target_high_power(target_samples, sr, local_size_ms=25, min_power_percent=0.25):
    '''Collects high-power rows of the target's power spectrum.

    Builds a matrix sized for `local_size_ms` of audio and fills it with
    rows of the full power spectrum whose summed power is at least
    `min_power_percent` of the most powerful row.

    Parameters
    ----------
    target_samples : np.ndarray
        The samples of the target signal.
    sr : int
        The sample rate of `target_samples`.
    local_size_ms : int or float, optional
        Length in milliseconds that sets the output matrix size.
        None is treated as 25. (default 25)
    min_power_percent : float, optional
        Fraction (0-1) of the maximum row power that a row must reach
        to be kept. None is treated as 0.25. (default 0.25)

    Returns
    -------
    target_high_power : np.ndarray
        Matrix shaped like a `local_size_ms` power spectrum, filled with
        qualifying rows (remaining rows stay zero if fewer qualify).
    '''
    # get the size of power spectrum of length local_size_ms
    if local_size_ms is None:
        local_size_ms = 25
    if min_power_percent is None:
        min_power_percent = 0.25
    # small power spectrum: only used for its shape
    target_power_size = sp.feats.get_feats(target_samples, sr=sr,
                                           feature_type='powspec',
                                           dur_sec = local_size_ms/1000)
    # full power spectrum: source of candidate rows
    target_power = sp.feats.get_feats(target_samples, sr=sr,
                                      feature_type='powspec')
    target_high_power = sp.dsp.create_empty_matrix(target_power_size.shape,
                                                   complex_vals=False)
    # power threshold derived from the most energetic row
    max_index = sp.dsp.get_max_index(target_power)
    max_power = sum(target_power[max_index])
    min_power = max_power * min_power_percent
    index=0
    for row in target_power:
        # only get power values for `local_size_ms`
        if index == len(target_high_power):
            break
        if sum(row) >= min_power:
            target_high_power[index] = row
            # index advances only when a row is kept, so qualifying rows
            # are packed at the top of the output matrix
            index += 1
    return target_high_power
def get_vad_snr(target_samples, noise_samples, sr, extend_window_ms = 0):
    '''Approximates the signal to noise ratio of two sets of power spectrums

    Note: this is a simple implementation and should not be used for
    official/exact measurement of snr.

    Parameters
    ----------
    target_samples : np.ndarray [size = (num_samples, )]
        The samples of the main / speech signal. Only frames with
        higher levels of energy will be used to calculate SNR.
    noise_samples : np.ndarray [size = (num_samples, )]
        The samples of background noise. Expects only noise, no speech.
        Must be the same sample rate as the target_samples
    sr : int
        The sample rate for the audio samples.
    extend_window_ms : int or float
        Milliseconds by which detected voice-activity windows are
        extended before collecting the target STFT. (default 0)

    Returns
    -------
    snr : float
        The approximated signal-to-noise ratio in decibels.

    References
    ----------
    http://www1.icsi.berkeley.edu/Speech/faq/speechSNR.html

    Gomolka, Ryszard. (2017). Re: How to measure signal-to-noise ratio (SNR) in real time?. Retrieved from: https://www.researchgate.net/post/How_to_measure_signal-to-noise_ratio_SNR_in_real_time/586a880f217e2060b65a8853/citation/download.

    https://www.who.int/occupational_health/publications/noise1.pdf
    '''
    # get target power with only high energy values (vad)
    vad_stft, __ = sp.feats.get_vad_stft(target_samples,
                                         sr=sr, extend_window_ms = extend_window_ms)
    if not vad_stft.any():
        import warnings
        msg = '\nNo voice activity found in target signal.'
        # Bug fix: the warning message was previously built but never emitted.
        warnings.warn(msg)
        # fall back to the STFT of the entire target signal
        vad_stft = sp.feats.get_stft(target_samples,
                                     sr=sr)
    target_power = np.abs(vad_stft)**2
    noise_stft = sp.feats.get_stft(noise_samples, sr=sr)
    noise_power = np.abs(noise_stft)**2
    # small epsilon avoids division by zero for silent noise
    snr = 10 * np.log10(np.mean(target_power)/ (np.mean(noise_power) + 1e-6))
    snr = np.mean(snr)
    return snr
# Not having success with this
def snr_adjustnoiselevel(target_samples, noise_samples, sr, snr):
    '''Computes scale factor to adjust noise samples to achieve snr.

    From script addnoise_asl_nseg.m:
    This function adds noise to a file at a specified SNR level. It uses
    the active speech level to compute the speech energy. The
    active speech level is computed as per ITU-T P.56 standard.

    soundpy Note: this functionality was pulled from the MATLAB script:
    addnoise_asl_nseg.m at this GitHub repo:
    https://github.com/SIP-Lab/CNN-VAD/blob/master/Training%20Code/Functions/addnoise_asl_nseg.m

    Parameters
    ----------
    target_samples : np.ndarray [size = (num_samples,)]
        The audio samples of the target / clean signal.
    noise_samples : np.ndarray [size = (num_samples,)]
        The audio samples of the noise signal.
    sr : int
        The sample rate of both `target_samples` and `noise_samples`
    snr : int
        The desired signal-to-noise ratio of the target and noise
        audio signals.

    Returns
    -------
    scale_factor : int, float
        The factor to which noise samples should be multiplied
        before being added to target samples to achieve SNR.

    References
    ----------
    Yi Hu and Philipos C. Loizou : original authors
    Copyright (c) 2006 by Philipos C. Loizou

    SIP-Lab/CNN-VAD/ : GitHub Repo
    Copyright (c) 2019 Signal and Image Processing Lab
    MIT License

    ITU-T (1993). Objective measurement of active speech level. ITU-T
    Recommendation P. 56

    See Also
    --------
    soundpy.dsp.asl_P56
    '''
    # Px is the active speech level ms energy, asl is the active factor,
    # and c0 is the active speech level threshold.
    target_px, asl, c0 = sp.dsp.asl_P56(target_samples, sr)
    # TODO: randomize section of noise
    # mean-square energy of the noise, normalized by the target length
    noise_px = noise_samples.T @ noise_samples / len(target_samples)
    # scale the noise samples to obtain the desired
    # snr = 10 * log10( Px / (sf^2 * Pn))
    # (leftover debug print and unused locals removed)
    scale_factor = (np.sqrt(target_px/noise_px / (10**(snr/10))))
    return scale_factor
def asl_P56(samples, sr, bitdepth=16, smooth_factor=0.03, hangover=0.2, margin_db=15.9):
    '''Computes the active speech level according to ITU-T P.56 standard.

    Port of the MATLAB asl_P56 routine; the exact math follows the
    original script and is intentionally left untouched.

    Parameters
    ----------
    samples : np.ndarray [size = (num_samples, )]
        The audio samples, for example speech samples.
    sr : int
        The sample rate of `samples`.
    bitdepth : int
        The bitdepth of audio. Expects 16. (default 16)
    smooth_factor : float
        Time smoothing factor. (default 0.03)
    hangover : float
        Hangover time in seconds. (default 0.2)
    margin_db : int, float
        Margin in decibels. (default 15.9)

    Returns
    -------
    asl_ms : float
        The active speech level ms energy
    asl : float
        The active factor
    c0 : float
        Active speech level threshold

    May return None when no activity is counted at the lowest
    threshold, or when the first margin check fails.

    References
    ----------
    ITU-T (1993). Objective measurement of active speech level. ITU-T
    Recommendation P. 56

    TODO handle bitdepth variation - what if not 16?
    '''
    thresh_nu = bitdepth -1 #number of thresholds
    # hangover expressed in samples
    I = math.ceil(sr*hangover)
    # smoothing factor in envelope detection, e.g. ~0.99924 at sr=44100
    g = np.exp( -1 /( sr * smooth_factor))
    # vector with thresholds from one quantizing level up to half the maximum
    # code, at a step of 2; for 16-bit samples from 2^-15 to 0.5
    thresholds = np.zeros((len(np.arange(-15,thresh_nu-16),)))
    for i, item in enumerate(np.arange(-15,thresh_nu-16)):
        thresholds[i] = 2.**item
    thresh_nu = len(thresholds)
    # activity counter for each level threshold
    activity_counter = np.zeros((len(thresholds),))
    # hangover counter for each level threshold
    hangover_matrix = np.zeros((len(thresholds,)))
    hangover_matrix[:] = I
    # long-term level square energy of x
    square_energy = samples.T @ samples
    x_len = len(samples)
    # use a 2nd order IIR filter to detect the envelope q
    x_abs = np.abs(samples)
    p, q = iirfilter(2,Wn=[1-g, g])
    iir_2ndorder = lfilter(p, q, x_abs)
    # count, per threshold, how many envelope samples fall below it;
    # thresholds are ascending, so the inner loop breaks at the first
    # threshold the envelope exceeds
    for index in range(x_len):
        for j in range(thresh_nu):
            if iir_2ndorder[index] < thresholds[j]:
                activity_counter[j] += 1
                hangover_matrix[j] += 1
            else:
                break
    asl = 0
    asl_rsm = 0
    # MATLAB eps (machine epsilon for double precision)
    eps = 2**-52
    if activity_counter[0]==0:
        # no activity at the lowest threshold: nothing to measure
        # NOTE(review): returning None here diverges from the documented
        # 3-tuple return -- callers must handle this case.
        return None
    AdB1 = 10 * np.log10(square_energy/activity_counter[0] + eps)
    CdB1 = 20 * np.log10( thresholds[0] + eps)
    if AdB1 - CdB1 < margin_db:
        # margin not reached at the first threshold; see NOTE above
        return None
    AdB = np.zeros((thresh_nu,))
    CdB = np.zeros((thresh_nu,))
    Delta = np.zeros((thresh_nu,))
    AdB[0] = AdB1
    CdB[0] = CdB1
    Delta[0] = AdB1 - CdB1
    # NOTE(review): both loops below step by 2, leaving odd-indexed
    # entries of AdB/CdB/Delta at zero -- confirm against the MATLAB
    # original, which may iterate every threshold.
    for j in np.arange(0, thresh_nu, 2):
        AdB[j] = 10 * np.log10( square_energy / (activity_counter[j] + eps) + eps)
        CdB[j] = 20 * np.log10( thresholds[j] + eps)
    for j in np.arange(0, thresh_nu, 2):
        if activity_counter[j] != 0:
            Delta[j] = AdB[j] - CdB[j]
            if Delta[j] <= margin_db:
                # interpolate to find the asl
                asl_ms_log, cl0 = bin_interp(
                    AdB[j], AdB[j-1], CdB[j], CdB[j-1], margin_db, 0.5)
                asl_ms = 10**(asl_ms_log/10)
                asl = (square_energy/x_len)/asl_ms
                c0 = 10**( cl0/20)
    # NOTE(review): if the loop above never satisfies Delta[j] <= margin_db,
    # asl_ms and c0 are unbound and this return raises NameError -- confirm.
    return asl_ms, asl, c0
def bin_interp(upcount, lwcount, upthr, lwthr, Margin, tol):
    '''Bisection interpolation between two level/threshold pairs.

    Finds the point where (count - threshold) equals `Margin`, to within
    `tol` dB, by repeatedly halving the interval. After 20 iterations the
    tolerance is relaxed by 10% per additional iteration so the search
    always terminates.
    '''
    tol = abs(tol)
    # Either endpoint may already satisfy the margin exactly.
    if abs(upcount - upthr - Margin) < tol:
        return upcount, upthr
    if abs(lwcount - lwthr - Margin) < tol:
        return lwcount, lwthr
    # Start from the midpoint of the given bounds.
    midcount = (upcount + lwcount) / 2.0
    midthr = (upthr + lwthr) / 2.0
    iteration = 0
    # Loop until `diff` falls inside the tolerance band.
    while abs(midcount - midthr - Margin) > tol:
        iteration += 1
        # After 20 iterations, relax tolerance by 10% per iteration.
        if iteration > 20:
            tol *= 1.1
        diff = midcount - midthr - Margin
        if diff > tol:
            # New bounds: upper and middle activities/thresholds.
            midcount = (upcount + midcount) / 2.0
            midthr = (upthr + midthr) / 2.0
        elif diff < -tol:
            # New bounds: middle and lower activities/thresholds.
            midcount = (midcount + lwcount) / 2.0
            midthr = (midthr + lwthr) / 2.0
    # midcount/midthr is the interpolated value within a tol dB tolerance.
    return midcount, midthr
def calc_posteri_prime(posteri_snr):
    """Computes the primed posteriori SNR, clipped at zero.

    Parameters
    ----------
    posteri_snr : ndarray
        The signal-to-noise ratio of the noisey signal, frame by frame.

    Returns
    -------
    posteri_prime : ndarray
        The primed posteri_snr, calculated according to the reference paper.

    References
    ----------
    Scalart, P. and Filho, J. (1996). Speech enhancement based on a priori
    signal to noise estimation. Proc. IEEE Int. Conf. Acoust., Speech, Signal
    Processing, 629-632.
    """
    # Subtract 1 and floor negative values at 0 in a single step.
    return np.maximum(posteri_snr - 1, 0)
def calc_prior_snr(snr, snr_prime, smooth_factor=0.98, first_iter=None, gain=None):
    """Estimates the signal-to-noise ratio of the previous frame.

    The algorithm depends on `first_iter`: when None, the prior snr follows
    Scalart and Filho (1996); when True or False, it follows Loizou (2013).

    Parameters
    ----------
    snr : ndarray
        The sound-to-noise ratio of target vs noise power/energy levels.
    snr_prime : ndarray
        The prime of the snr (see Scalart & Filho (1996))
    smooth_factor : float
        The value applied to smooth the signal. (default 0.98)
    first_iter : None, True, False
        If None, snr prior values are estimated the same, no matter if it is
        the first iteration or not (Scalart & Filho (1996));
        if True, without gain (Loizou 2013); if False, with gain
        (Loizou 2013). (default None)
    gain : None, ndarray
        If None, gain will not be used. If gain, it is a previously
        calculated value from the previous frame. (default None)

    Returns
    -------
    prior_snr : ndarray
        Estimation of signal-to-noise ratio of the previous frame of
        target signal.

    References
    ----------
    C Loizou, P. (2013). Speech Enhancement: Theory and Practice.

    Scalart, P. and Filho, J. (1996). Speech enhancement based on a
    priori signal to noise estimation. Proc. IEEE Int. Conf. Acoust.,
    Speech, Signal Processing, 629-632.
    """
    if first_iter is None:
        # A priori SNR, equation (6) of Scalart & Filho (1996).
        prior_snr = (1 - smooth_factor) * snr_prime + smooth_factor * snr
    elif first_iter is True:
        # Loizou (2013): no previous gain or snr values available yet.
        prior_snr = smooth_factor + (1 - smooth_factor) * snr_prime
    elif first_iter is False:
        # Loizou (2013): previous gain and snr values are available.
        prior_snr = smooth_factor * (gain ** 2) * snr + (1 - smooth_factor) * snr_prime
    return prior_snr
def calc_gain(prior_snr):
    '''Calculates attenuation (gain) values used to reduce noise.

    Parameters
    ----------
    prior_snr : ndarray
        The prior signal-to-noise ratio estimation

    Returns
    -------
    gain : ndarray
        An array of attenuation values to be applied to the signal (stft)
        array at the current frame.

    References
    ----------
    C Loizou, P. (2013). Speech Enhancement: Theory and Practice.

    Scalart, P. and Filho, J. (1996). Speech enhancement based on a
    priori signal to noise estimation. Proc. IEEE Int. Conf. Acoust.,
    Speech, Signal Processing, 629-632.
    '''
    # Wiener-style gain: sqrt(snr / (1 + snr)).
    return np.sqrt(prior_snr / (1 + prior_snr))
def apply_gain_fft(fft_vals, gain):
    '''Attenuates noise by scaling the target signal's fft/stft values.

    Parameters
    ----------
    fft_vals : ndarray(complex)
        Matrix containing complex values (i.e. stft values) of target signal
    gain : ndarray(real)
        Matrix containing calculated attenuation values to apply to 'fft_vals'

    Returns
    -------
    enhanced_fft : ndarray(complex)
        Matrix with attenuated noise in target (stft) values
    '''
    enhanced_fft = gain * fft_vals
    # Elementwise scaling must not change the spectrum's shape.
    assert enhanced_fft.shape == fft_vals.shape
    return enhanced_fft
def postfilter(original_powerspec, noisereduced_powerspec, gain,
               threshold=0.4, scale=10):
    '''Applies a filter that reduces musical noise left by other filters.

    When speech (or the target signal) is estimated to be present, less
    filtering is applied.

    References
    ----------
    T. Esch and P. Vary, "Efficient musical noise suppression for speech enhancement
    system," Proceedings of IEEE International Conference on Acoustics, Speech and
    Signal Processing, Taipei, 2009.
    '''
    ratio = sp.dsp.calc_power_ratio(original_powerspec,
                                    noisereduced_powerspec)
    # Speech likely present once the ratio reaches the threshold;
    # in that case the SNR decision is pinned at 1.
    SNR_decision = ratio if ratio < threshold else 1
    noise_frame_len = sp.dsp.calc_noise_frame_len(SNR_decision, threshold, scale)
    # Smooth the gain curve with a linear impulse (moving-average window).
    postfilter_coeffs = sp.dsp.calc_linear_impulse(
        noise_frame_len,
        num_freq_bins=original_powerspec.shape[0])
    return np.convolve(gain, postfilter_coeffs, mode='valid')
def calc_ifft(signal_section, real_signal=None, norm=False):
    """Calculates the inverse fft of a series of fft values

    The real values of the ifft can be used to be saved as an audiofile.

    Parameters
    ----------
    signal_section : ndarray [shape=(num_freq_bins,)]
        The frame of fft values to apply the inverse fft to
    real_signal : bool, optional
        If truthy, the inverse real fft (`numpy.fft.irfft`) is applied,
        expecting `signal_section` to hold only non-negative frequency
        bins; otherwise the full complex inverse fft is applied.
        (default None)
    norm : bool
        Whether or not the ifft should apply 'ortho' normalization
        (default False)

    Returns
    -------
    ifft_vals : ndarray(complex)
        The inverse Fourier transform of filtered audio data
    """
    # numpy expects norm to be either the string 'ortho' or None.
    norm = 'ortho' if norm else None
    if real_signal:
        ifft_vals = irfft(signal_section, norm=norm)
    else:
        ifft_vals = ifft(signal_section, norm=norm)
    return ifft_vals
def control_volume(samples, max_limit):
    """Keeps max volume of samples to within a specified range.

    Parameters
    ----------
    samples : ndarray
        series of audio samples
    max_limit: float
        maximum boundary of the maximum value of the audio samples

    Returns
    -------
    samples : np.ndarray
        samples with volume adjusted (if need be).

    Examples
    --------
    >>> import numpy as np
    >>> #low volume example: increase volume to desired window
    >>> x = np.array([-0.03, 0.04, -0.05, 0.02])
    >>> x = control_volume(x, max_limit=0.25)
    >>> x
    array([-0.13888889,  0.25      , -0.25      ,  0.13888889])
    >>> #high volume example: decrease volume to desired window
    >>> y = np.array([-0.3, 0.4, -0.5, 0.2])
    >>> y = control_volume(y, max_limit=0.15)
    >>> y
    array([-0.08333333,  0.15      , -0.15      ,  0.08333333])
    """
    # Only rescale when the current peak differs from the target peak.
    if max(samples) != max_limit:
        current_range = (samples.min(), samples.max())
        target_range = (-max_limit, max_limit)
        samples = np.interp(samples, current_range, target_range)
    return samples
######### Functions related to the postfilter ###############
def calc_power_ratio(original_powerspec, noisereduced_powerspec):
    '''Calc. the ratio of original vs noise reduced power spectrum.
    '''
    ratio = sum(noisereduced_powerspec) / sum(original_powerspec)
    # Normalize by the number of bins (per-bin average ratio).
    power_ratio = ratio / len(noisereduced_powerspec)
    return power_ratio
def calc_noise_frame_len(SNR_decision, threshold, scale):
    '''Calc. window length for calculating moving average.

    Note: lower SNRs require larger window.
    '''
    if SNR_decision < 1:
        # Soft decision in [0, 1): the further below threshold, the
        # wider the (odd-length) averaging window.
        soft_decision = 1 - (SNR_decision / threshold)
        return 2 * round(soft_decision * scale) + 1
    return SNR_decision
def calc_linear_impulse(noise_frame_len, num_freq_bins):
    '''Calc. the post filter coefficients to be applied to gain values.
    '''
    # First `noise_frame_len` taps form a moving-average window of
    # height 1/noise_frame_len; remaining taps are zero.
    bins = np.arange(num_freq_bins)
    linear_filter_impulse = np.where(bins < noise_frame_len,
                                     1 / noise_frame_len, 0.0)
    return linear_filter_impulse
def adjust_volume(samples, vol_range):
    '''Linearly rescales `samples` so their extremes span (-vol_range, vol_range).
    '''
    samps = samples.copy()
    extremes = (samps.min(), samps.max())
    return np.interp(samps, extremes, (-vol_range, vol_range))
def spread_volumes(samples, vol_list = None):
    '''Returns samples with a range of volumes.

    This may be useful in applying to training data (transforming data).

    Parameters
    ----------
    samples : ndarray
        Series belonging to acoustic signal.
    vol_list : list, optional
        List of floats or ints representing the volumes the samples
        are to be oriented towards. (default None, which applies
        [0.1, 0.3, 0.5])

    Returns
    -------
    volrange_dict : tuple
        Tuple of `volrange_dict` values containing `samples` at various vols.

    Raises
    ------
    ValueError
        If `samples` is empty/None or not normalized (peak above 1).
    '''
    # Avoid a mutable default argument; None selects the standard spread.
    if vol_list is None:
        vol_list = [0.1, 0.3, 0.5]
    if samples is None or len(samples) == 0:
        raise ValueError('No Audio sample data recognized.')
    max_vol = max(samples)
    if round(max_vol) > 1:
        raise ValueError('Audio data not normalized.')
    return tuple(adjust_volume(samples, vol) for vol in vol_list)
def create_empty_matrix(shape, complex_vals=False):
    '''Allows creation of a matrix filled with real or complex zeros.

    In digital signal processing, complex numbers are common; it is
    important to note that if complex_vals=False and complex values are
    inserted into the matrix, the imaginary part will be removed.

    Parameters
    ----------
    shape : tuple or int
        tuple or int indicating the shape or length of desired matrix or
        vector, respectively
    complex_vals : bool
        indicator of whether or not the matrix will receive real or complex
        values (default False)

    Returns
    ----------
    matrix : ndarray
        a matrix filled with real or complex zeros

    Examples
    ----------
    >>> matrix = create_empty_matrix((3,4))
    >>> matrix
    array([[0., 0., 0., 0.],
           [0., 0., 0., 0.],
           [0., 0., 0., 0.]])
    >>> matrix_complex = create_empty_matrix((3,4),complex_vals=True)
    >>> matrix_complex
    array([[0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j],
           [0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j],
           [0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j]])
    >>> vector = create_empty_matrix(5,)
    >>> vector
    array([0., 0., 0., 0., 0.])
    '''
    # `np.complex_` was removed in NumPy 2.0; the builtin `complex`
    # maps to the same complex128 dtype on all NumPy versions.
    dtype = complex if complex_vals else float
    return np.zeros(shape, dtype=dtype)
# TODO test this in applications (currently not implemented)
def overlap_add(enhanced_matrix, frame_length, overlap, complex_vals=False):
    '''Overlaps and adds windowed sections together to form 1D signal.

    Parameters
    ----------
    enhanced_matrix : np.ndarray [shape=(frame_length, num_frames), dtype=float]
        Matrix with enhance values
    frame_length : int
        Number of samples per frame
    overlap : int
        Number of samples that overlap
    complex_vals : bool
        Ignored; the output dtype is inferred from `enhanced_matrix`.
        Kept for backward compatibility. (default False)

    Returns
    -------
    new_signal : np.ndarray [shape=(frame_length,), dtype=float]
        Length equals (frame_length - overlap) * enhanced_matrix.shape[1] + overlap

    Raises
    ------
    TypeError
        If the first dimension of `enhanced_matrix` does not equal
        `frame_length`.

    Examples
    --------
    >>> import numpy as np
    >>> enhanced_matrix = np.ones((4, 4))
    >>> frame_length = 4
    >>> overlap = 1
    >>> sig = overlap_add(enhanced_matrix, frame_length, overlap)
    >>> sig
    array([1., 1., 1., 2., 1., 1., 2., 1., 1., 2., 1., 1., 1.])
    '''
    # Explicit check instead of `assert` (asserts vanish under `python -O`).
    if enhanced_matrix.shape[0] != frame_length:
        raise TypeError('The first dimension of the enhance matrix should '+ \
            'match the frame length. {} does not match frame length {}'.format(
                enhanced_matrix.shape[0], frame_length))
    # Output dtype follows the input; the `complex_vals` argument is ignored.
    if np.issubdtype(enhanced_matrix.dtype, np.complexfloating):
        dtype = complex
    else:
        dtype = float
    increments = frame_length - overlap
    expected_len = increments * enhanced_matrix.shape[1] + overlap
    new_signal = np.zeros((expected_len,), dtype=dtype)
    # First frame is copied whole; each later frame adds its overlapping
    # head onto the previous tail, then appends its remainder.
    new_signal[:frame_length] = enhanced_matrix[:frame_length, 0]
    start = increments
    for i in range(1, enhanced_matrix.shape[1]):
        mid = start + overlap
        stop = start + frame_length
        new_signal[start:mid] += enhanced_matrix[:overlap, i]
        new_signal[mid:stop] += enhanced_matrix[overlap:frame_length, i]
        start += increments
    return new_signal
def random_selection_samples(samples, len_section_samps, wrap=False, random_seed=None, axis=0):
    '''Selects a section of samples, starting at random.

    Parameters
    ----------
    samples : np.ndarray [shape = (num_samples, )]
        The array of sample data
    len_section_samps : int
        How many samples should be randomly selected
    wrap : bool
        If False, the selected noise will not be wrapped from end to
        beginning; if True, the random selection may take a sound sample
        that is wrapped from the end to the beginning. (default False)
    random_seed : int, optional
        If replicated randomization desired. (default None)

    Examples
    --------
    >>> import numpy as np
    >>> # no wrap:
    >>> x = np.array([1,2,3,4,5,6,7,8,9,10])
    >>> n = sp.dsp.random_selection_samples(x, len_section_samps = 7,
    ...                                     wrap = False, random_seed = 40)
    >>> n
    array([3, 4, 5, 6, 7, 8, 9])
    >>> # with wrap:
    >>> n = sp.dsp.random_selection_samples(x, len_section_samps = 7,
    ...                                     wrap = True, random_seed = 40)
    >>> n
    array([ 7,  8,  9, 10,  1,  2,  3])
    '''
    if not isinstance(samples, np.ndarray):
        samples = np.array(samples)
    # Seed only when one was explicitly supplied (0 is a valid seed).
    if random_seed is not None and random_seed is not False:
        np.random.seed(random_seed)
    if wrap:
        start_index = np.random.choice(range(len(samples)))
        end_index = start_index + len_section_samps
        if end_index > len(samples):
            # Wrap: take the tail, then continue from the head.
            overflow = end_index - len(samples)
            return np.concatenate((samples[start_index:], samples[:overflow]),
                                  axis=axis)
        return samples[start_index:end_index]
    # No wrap: pick a start that leaves a full section, if possible.
    max_index_start = len(samples) - len_section_samps + 1
    if max_index_start > 0:
        start_index = np.random.choice(range(max_index_start))
    else:
        start_index = 0
    return samples[start_index:start_index + len_section_samps]
# TODO: remove? function get_mean_freq is much better
def get_pitch(sound, sr=16000, win_size_ms = 50, percent_overlap = 0.5,
              real_signal = False, fft_bins = 1024,
              window = 'hann', **kwargs):
    '''Approximates pitch by collecting dominant frequencies of signal.
    '''
    import warnings
    warnings.warn('\n\nWarning: `soundpy.dsp.get_pitch` is experimental at best.'
                  ' \nPerhaps try `soundpy.dsp.get_mean_freq`, which is still experimental '
                  'but a bit more reliable.\n\n')
    if isinstance(sound, np.ndarray):
        data = sound
    else:
        data, sr2 = sp.loadsound(sound, sr=sr)
        assert sr2 == sr
    frame_length = sp.dsp.calc_frame_length(win_size_ms, sr)
    num_overlap_samples = int(frame_length * percent_overlap)
    num_subframes = sp.dsp.calc_num_subframes(len(data),
                                              frame_length = frame_length,
                                              overlap_samples = num_overlap_samples,
                                              zeropad = True)
    # Dominant frequency search is limited to the Nyquist frequency.
    max_freq = sr // 2
    if not max_freq % 2 == 0:
        # keep an even bin count
        max_freq += 1
    # one dominant-frequency value per frame
    freq_matrix = sp.dsp.create_empty_matrix((num_subframes,),
                                             complex_vals = False)
    window_frame = sp.dsp.create_window(window, frame_length)
    hop = frame_length - num_overlap_samples
    for row in range(num_subframes):
        start = row * hop
        section = data[start:start + frame_length]
        section = sp.dsp.apply_window(section, window_frame, zeropad = True)
        section_fft = sp.dsp.calc_fft(section,
                                      real_signal = real_signal,
                                      fft_bins = fft_bins)
        # restrict to bins below Nyquist before picking the dominant one
        section_power = sp.dsp.calc_power(section_fft[:max_freq])
        freq_matrix[row] = sp.dsp.get_dom_freq(section_power)
    return freq_matrix
# TODO consolidate into VAD? get_vad_stft, or get_vad_samples? Avoid extra processing?
# Perhaps as class attribute..
def get_mean_freq(sound, sr=16000, win_size_ms = 50, percent_overlap = 0.5,
                  real_signal = False, fft_bins = 1024,
                  window = 'hann', percent_vad=0.75):
    '''Takes the mean of dominant frequencies of voice activated regions in a signal.

    Note: Silences discarded.

    The average fundamental frequency for a male voice is 125Hz; for a female
    voice it's 200Hz; and for a child's voice, 300Hz. (Russell, J., 2020)

    Parameters
    ----------
    sound : np.ndarray or str/pathlike
        Audio samples, or the path of an audio file to load.
    sr : int
        Sample rate of `sound` / desired rate when loading. (default 16000)
    win_size_ms : int, float
        Analysis window length in milliseconds. (default 50)
    percent_overlap : float
        Fraction of overlap between windows. (default 0.5)
    real_signal : bool
        Whether the real fft should be applied. (default False)
    fft_bins : int
        Number of fft bins. (default 1024)
    window : str
        Window type applied to each frame. (default 'hann')
    percent_vad : float
        Minimum fraction of VAD-positive subframes for a frame to count
        as speech. (default 0.75)

    Returns
    -------
    float
        Mean dominant frequency of speech frames, rounded to 2 decimals.
        NaN if no speech frames were found.

    References
    ----------
    Russell, James (2020) The Human Voice and the Frequency Range.
    Retrieved from:
    https://blog.accusonus.com/pro-audio-production/human-voice-frequency-range/
    '''
    if isinstance(sound, np.ndarray):
        data = sound
    else:
        data, sr2 = sp.loadsound(sound, sr=sr)
        assert sr2 == sr
    frame_length = sp.dsp.calc_frame_length(win_size_ms, sr)
    num_overlap_samples = int(frame_length * percent_overlap)
    num_subframes = sp.dsp.calc_num_subframes(len(data),
                                              frame_length = frame_length,
                                              overlap_samples = num_overlap_samples,
                                              zeropad = True)
    max_freq = sr//2
    # ensure even
    if not max_freq % 2 == 0:
        max_freq += 1
    total_rows = fft_bins
    # initialize empty matrix for dominant frequency values of speech frames
    freq_matrix = sp.dsp.create_empty_matrix((num_subframes,),
                                             complex_vals = False)
    section_start = 0
    extra_rows = 0
    e_min = None
    f_min = None
    sfm_min = None
    window_frame = sp.dsp.create_window(window, frame_length)
    row = 0
    for frame in range(num_subframes):
        section = data[section_start:section_start+frame_length]
        section_vad, values_vad = sp.dsp.vad(section,
                                             sr=sr,
                                             win_size_ms = 10,
                                             percent_overlap = 0.5,
                                             min_energy = e_min,
                                             min_freq = f_min,
                                             min_sfm = sfm_min)
        # vad() may return three or four settings depending on version
        if len(values_vad) == 3:
            e, f, sfm = values_vad
        elif len(values_vad) == 4:
            sr, e, f, sfm = values_vad
        # carry the lowest noise-floor measurements across frames
        if e_min is None or e < e_min:
            e_min = e
        if f_min is None or f < f_min:
            f_min = f
        if sfm_min is None or sfm < sfm_min:
            sfm_min = sfm
        if sum(section_vad)/len(section_vad) < percent_vad:
            # not enough voice activity in this frame; skip it
            extra_rows += 1
            section_start += (frame_length - num_overlap_samples)
            continue
        # otherwise calculate frequency info
        section = sp.dsp.apply_window(section,
                                      window_frame,
                                      zeropad = True)
        section_fft = sp.dsp.calc_fft(section,
                                      real_signal = real_signal,
                                      fft_bins = total_rows,
                                      )
        # limit exploration of dominant frequency to max frequency (Nyquist Theorem)
        section_fft = section_fft[:max_freq]
        section_power = sp.dsp.calc_power(section_fft)
        dom_f = sp.dsp.get_dom_freq(section_power)
        if dom_f > 0:
            freq_matrix[row] = dom_f
            row += 1
        else:
            extra_rows += 1
        section_start += (frame_length - num_overlap_samples)
    # Bug fix: only trim skipped rows when there are any. The original
    # always sliced with `[:-extra_rows]`, and `[:-0]` empties the array,
    # turning the mean into NaN even when speech frames were found.
    if extra_rows:
        freq_matrix = freq_matrix[:-extra_rows]
    fund_freq = np.mean(freq_matrix)
    return round(fund_freq, 2)
# TODO: finicky
# Seems that removing the dc offset helps, as does having a higher sample rate
# still exploring influence of window size and percent overlap
# NOTE: good settings for VAD in SNR calc:
# set percent_overlap at 0.5, win_size_ms at 300
# (and padding 100 ms to the identified VAD start and end)
# TODO: fix ERROR issue: frame == 0, measure_noise_frames == 0
#vad_matrix, vad_settings = sp.dsp.vad(audio, sr,
#File "/home/airos/Projects/gitlab/family-language-tracker/soundpy/dsp.py", line 2877, in vad
#if ste - min_energy > thresh_e:
#UnboundLocalError: local variable 'thresh_e' referenced before assignment
def vad(sound, sr, win_size_ms = 50, percent_overlap = 0,
        real_signal = False, fft_bins = None, window = 'hann',
        energy_thresh = 40, freq_thresh = 185, sfm_thresh = 5,
        min_energy = None, min_freq = None, min_sfm = None, use_beg_ms = 120):
    '''Frame-wise voice activity detection via energy, dominant frequency, and flatness.

    Warning: this VAD works best with sample rates above 44100 Hz.

    Parameters
    ----------
    energy_thresh : int, float
        The minimum amount of energy for speech detection.
    freq_thresh : int, float
        The maximum frequency threshold.
    sfm_thresh : int, float
        The spectral flatness measure threshold.
    min_energy, min_freq, min_sfm : int, float, optional
        Pre-computed noise-floor minimums; when None, they are measured
        from the first `use_beg_ms` milliseconds of the signal.
    use_beg_ms : int
        How many initial milliseconds are assumed to contain background
        noise. (default 120)

    Returns
    -------
    vad_matrix : np.ndarray
        One value per frame: 1 for detected speech, 0 for silence.
    (sr, min_energy, min_freq, min_sfm) : tuple
        Measured settings, reusable for subsequent chunks of the signal.

    References
    ----------
    M. H. Moattar and M. M. Homayounpour, "A simple but efficient real-time Voice Activity Detection algorithm," 2009 17th European Signal Processing Conference, Glasgow, 2009, pp. 2549-2553.
    '''
    # if real signal, divide freq_thresh in half, as only half freq_bins are used
    if real_signal:
        freq_thresh /= 2
    if fft_bins is None:
        fft_bins = sr // 2
    if fft_bins % 2 != 0:
        fft_bins += 1
    max_freq = sr//2
    # ensure even
    if not max_freq % 2 == 0:
        max_freq += 1
    if isinstance(sound, np.ndarray):
        data = sound
        if sr < 44100:
            import warnings
            msg = '\nWarning: VAD works best with sample rates above 44100 Hz.'
            warnings.warn(msg)
    else:
        if sr < 44100:
            import warnings
            msg = '\nWarning: VAD works best with sample rates above 44100 Hz.'
            warnings.warn(msg)
        data, sr2 = sp.loadsound(sound, sr=sr)
        assert sr2 == sr
    # first scale samples to be between -1 and 1
    data = sp.dsp.scalesound(data, max_val = 1)
    frame_length = sp.dsp.calc_frame_length(win_size_ms, sr)
    num_overlap_samples = int(frame_length * percent_overlap)
    num_subframes = sp.dsp.calc_num_subframes(len(data),
                                              frame_length = frame_length,
                                              overlap_samples = num_overlap_samples,
                                              zeropad = True)
    # limit number of beginning frames to measure background noise:
    measure_noise_samples = sp.dsp.calc_frame_length(use_beg_ms, sr)
    measure_noise_frames = sp.dsp.calc_num_subframes(measure_noise_samples,
                                                     frame_length = frame_length,
                                                     overlap_samples = num_overlap_samples,
                                                     zeropad = True)
    # initialize empty matrix to store vad values
    vad_matrix = sp.dsp.create_empty_matrix((num_subframes,),
                                            complex_vals = False)
    section_start = 0
    speech = 0
    silence = 0
    window_frame = sp.dsp.create_window(window, frame_length)
    for frame in range(num_subframes):
        samples = data[section_start:section_start+frame_length]
        samples_windowed = sp.dsp.apply_window(samples, window_frame, zeropad = True)
        # apply dft to large window - increase frequency resolution during warping
        section_fft = sp.dsp.calc_fft(samples_windowed,
                                      real_signal = real_signal,
                                      fft_bins = fft_bins,
                                      )
        # limit exploration of dominant frequency to max frequency (Nyquist Theorem)
        section_fft = section_fft[:max_freq]
        section_power = sp.dsp.calc_power(section_fft)
        # set minimum values if not yet set; keep updating the noise
        # floor during the initial background-noise frames
        if min_energy is None and frame == 0:
            min_energy = sp.dsp.short_term_energy(samples)
        ste = sp.dsp.short_term_energy(samples)
        if ste < min_energy and frame < measure_noise_frames:
            min_energy = ste
        if min_freq is None and frame == 0:
            min_freq = sp.dsp.get_dom_freq(section_power)
        f = sp.dsp.get_dom_freq(section_power)
        if f < min_freq and frame < measure_noise_frames:
            min_freq = f
        # TODO fix when NAN value
        if min_sfm is None and frame == 0:
            try:
                min_sfm = sp.dsp.spectral_flatness_measure(section_fft)
            except TypeError:
                min_sfm = 0
        # TODO fix when NAN value
        try:
            sfm = sp.dsp.spectral_flatness_measure(section_fft)
        except TypeError:
            if frame == 0:
                sfm = 0
            else:
                pass # keep the previous frame's sfm
        if sfm < min_sfm and frame < measure_noise_frames:
            min_sfm = sfm
        # decide if speech or silence
        if frame < measure_noise_frames:
            thresh_e = energy_thresh * np.log(min_energy)
        else:
            # covers frame == 0 with measure_noise_frames == 0;
            # still samples to process
            thresh_e = energy_thresh # TODO: test this fix
        counter = 0
        if ste - min_energy > thresh_e:
            counter += 1
        if f - min_freq > freq_thresh:
            counter += 1
        if sfm - min_sfm > sfm_thresh:
            counter += 1
        if counter >= 1:
            vad_matrix[frame] += 1
            speech += 1
            # set silence back to 0 if at least 5 speech frames
            if speech > 5:
                silence = 0
        else:
            vad_matrix[frame] += 0
            # update min energy and silence count
            silence += 1
            if frame < measure_noise_frames:
                # Moattar & Homayounpour (2009):
                # Min_E = (silence * Min_E + E) / (silence + 1).
                # Fixed operator precedence: the original computed
                # `... / silence + 1` (divide first, then add one).
                min_energy = ((silence * min_energy) + ste) / (silence + 1)
            # if silence has been longer than 10 frames, set speech to 0
            if silence > 10:
                speech = 0
            if frame < measure_noise_frames:
                thresh_e = energy_thresh * np.log(min_energy)
        section_start += (frame_length - num_overlap_samples)
    return vad_matrix, (sr, min_energy, min_freq, min_sfm)
def suspended_energy(speech_energy, speech_energy_mean, row, start):
    '''Checks whether energy stays above the mean in the frames near `row`.

    Looks at the three frames after `row` (when `start` is True) or the
    three frames before it (when `start` is False) and returns True only
    if all three exceed `speech_energy_mean`. Returns None when the
    condition is not met or `row` is too close to the boundary, and
    False on an unexpected IndexError.
    '''
    try:
        if start == True:
            # need three frames ahead of `row`
            if row <= len(speech_energy)-4:
                # Bug fix: the original chained `a and b and c > mean`,
                # which only compared the LAST value against the mean
                # (the first two were mere truthiness checks).
                if (speech_energy[row+1] > speech_energy_mean
                        and speech_energy[row+2] > speech_energy_mean
                        and speech_energy[row+3] > speech_energy_mean):
                    return True
        else:
            # need three frames behind `row`
            if row >= 3:
                if (speech_energy[row-1] > speech_energy_mean
                        and speech_energy[row-2] > speech_energy_mean
                        and speech_energy[row-3] > speech_energy_mean):
                    return True
    except IndexError as ie:
        return False
def sound_index(speech_energy,speech_energy_mean,start = True):
    '''Identifies the index of where speech or energy starts or ends.

    Scans `speech_energy` forwards (start=True) or backwards (start=False)
    for the first frame above `speech_energy_mean` that is confirmed by
    `suspended_energy`, then pads the found index outward by two frames.

    Returns
    -------
    (int, bool)
        The (padded) index and True when sustained energy was found;
        otherwise the scan's starting index and False.
    '''
    if start == True:
        # scan from the first frame forwards
        side = 1
        beg = 0
        end = len(speech_energy)
    else:
        # scan from the last frame backwards
        side = -1
        beg = len(speech_energy)-1
        end = -1
    for row in range(beg,end,side):
        if speech_energy[row] > speech_energy_mean:
            # require the energy to be sustained, not a one-frame spike
            if suspended_energy(speech_energy, speech_energy_mean, row,start=start):
                if start==True:
                    #to catch plosive sounds
                    # NOTE: the unconditional `break` means this loop runs
                    # exactly once — effectively `row = max(row - 2, 0)`
                    while row >= 0:
                        row -= 1
                        row -= 1
                        if row < 0:
                            row = 0
                        break
                    return row, True
                else:
                    #to catch quiet consonant endings
                    # NOTE: single-pass loop as above — effectively
                    # `row = min(row + 2, len(speech_energy))`
                    while row <= len(speech_energy):
                        row += 1
                        row += 1
                        if row > len(speech_energy):
                            row = len(speech_energy)
                        break
                    return row, True
        else:
            #print("No Speech Detected")
            pass
    # no sustained energy found anywhere in the scan direction
    return beg, False
def get_energy(stft):
    '''Returns per-row RMS energy of an stft matrix as a list.'''
    num_cols = stft.shape[1]
    rms_list = []
    for row_vals in stft:
        # root of mean squared magnitude across the row
        rms_list.append(np.sqrt(sum(np.abs(row_vals) ** 2) / num_cols))
    return rms_list
def get_energy_mean(rms_energy):
    '''Returns the arithmetic mean of the RMS energy values.'''
    total = sum(rms_energy)
    return total / len(rms_energy)
# TODO test
def spectral_flatness_measure(spectrum):
    '''Spectral flatness: geometric over arithmetic mean of the magnitude spectrum, in dB.

    Values near 0 dB indicate a flat (noise-like) spectrum; strongly
    negative values indicate a tonal spectrum.

    Raises
    ------
    TypeError
        If the arithmetic mean of the magnitude spectrum is not finite
        (e.g. NaN input); callers such as `vad` catch this to skip frames.
    '''
    # Import the submodule explicitly: a bare `import scipy` does not
    # guarantee that `scipy.stats` is loaded.
    import scipy.stats
    spectrum = np.abs(spectrum)
    g = scipy.stats.gmean(spectrum)
    a = np.mean(spectrum)
    if not np.isfinite(a).all():
        raise TypeError(a)
    elif not np.isfinite(a).any():
        raise ValueError(a)
    # small epsilon guards against division by zero
    sfm = 10 * np.log10(g/(a+1e-6))
    return sfm
def get_dom_freq(power_values):
    '''If real_signal (i.e. half fft bins), might mess up values.
    '''
    # index of the highest-power bin is the dominant frequency bin
    dominant_bin = np.argmax(power_values)
    return dominant_bin
def short_term_energy(signal_windowed):
    '''Sum of squared magnitudes of a frame.

    Expects `signal` to be scaled (-1, 1) as well as windowed.

    References
    ----------
    http://vlab.amrita.edu/?sub=3&brch=164&sim=857&cnt=1
    '''
    magnitudes = np.abs(signal_windowed)
    return sum(magnitudes ** 2)
def bilinear_warp(fft_value, alpha):
    '''Bilinear frequency warp used in vocal tract length perturbation.

    See Also
    --------
    soundpy.augment.vtlp

    References
    ----------
    Kim, C., Shin, M., Garg, A., & Gowda, D. (2019). Improved vocal tract length perturbation
    for a state-of-the-art end-to-end speech recognition system. Interspeech. September 15-19,
    Graz, Austria.
    '''
    beta = 1 - alpha
    numerator = beta * np.sin(fft_value)
    denominator = 1 - beta * np.cos(fft_value)
    # small epsilons guard against division by zero
    return fft_value + 2 * np.arctan(numerator / (denominator + 1e-6) + 1e-6)
def piecewise_linear_warp(fft_value, alpha, max_freq):
    '''Piecewise-linear frequency warp for vocal tract length perturbation.

    Frequencies up to the breakpoint `max_freq * min(alpha, 1) / alpha`
    are scaled linearly by `alpha`; frequencies above it are mapped so the
    warp still ends at pi (Kim et al., 2019).

    Bug fix: the original tested `fft_value.all() <= ...`, which reduced
    the whole array to a single boolean and applied one branch to every
    bin. The warp is now applied elementwise with `np.where`.

    See Also
    --------
    soundpy.augment.vtlp

    References
    ----------
    Kim, C., Shin, M., Garg, A., & Gowda, D. (2019). Improved vocal tract length perturbation
    for a state-of-the-art end-to-end speech recognition system. Interspeech. September 15-19,
    Graz, Austria.
    '''
    fft_value = np.asarray(fft_value)
    # epsilon guards against division by zero when alpha == 0
    breakpoint_freq = max_freq * (min(alpha, 1) / (alpha + 1e-6))
    lower = fft_value * alpha
    numerator = np.pi - max_freq * (min(alpha, 1))
    denominator = np.pi - breakpoint_freq
    upper = np.pi - (numerator / denominator + 1e-6) * (np.pi - fft_value)
    return np.where(fft_value <= breakpoint_freq, lower, upper)
# TODO: remove? function get_mean_freq is much better
def f0_approximation(sound, sr, low_freq = 50, high_freq = 300, **kwargs):
    '''Approximates fundamental frequency.

    Limits the stft of voice active sections to frequencies to between
    `low_freq` and `high_freq` and takes mean of the dominant frequencies
    within that range. Defaults are set at 50 and 300 as most human speech
    frequencies occur between 85 and 255 Hz.

    References
    ----------
    https://en.wikipedia.org/wiki/Voice_frequency
    '''
    import warnings
    warnings.warn('\n\nWarning: `soundpy.dsp.f0_approximation` is experimental at'
                  ' best. \nPerhaps try `soundpy.dsp.get_mean_freq`, which is still '
                  'experimental but a bit more reliable.\n\n')
    import scipy
    if isinstance(sound, np.ndarray):
        data = sound
    else:
        data, sr2 = sp.loadsound(sound, sr=sr)
        assert sr2 == sr
    stft = sp.feats.get_vad_stft(data, sr=sr, **kwargs)
    # newer get_vad_stft returns (stft, vad_matrix); older returned stft only
    if isinstance(stft, tuple):
        stft = stft[0]
    # keep only the bins inside the expected voice range
    voice_band = stft[:, low_freq:high_freq]
    power = np.abs(voice_band) ** 2
    dominant = [sp.dsp.get_dom_freq(frame) for frame in power]
    return np.mean(dominant)
if __name__ == '__main__':
    # Run the module's doctest examples when executed directly.
    import doctest
    doctest.testmod()