# Source code for soundpy.models.dataprep
'''The models.dataprep module covers functionality for feeding features to models.
'''
import os, sys
import inspect
currentdir = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))
packagedir = os.path.dirname(currentdir)
sys.path.insert(0, packagedir)
import numpy as np
import math
import random
import soundpy as sp
import librosa
###############################################################################
#feed data to models
class Generator:
    '''Feeds pre-extracted feature data to a model, one sample per batch.

    This generator pulls data out in sections (i.e. batch sizes). Prepared
    for 3 dimensional data. Note: Keras adds a dimension to input to
    represent the "Tensor" that handles the input. This means that sometimes
    you have to add a shape of (1,) to the shape of the data.
    '''

    def __init__(self, data_matrix1, data_matrix2=None, timestep=None,
                 axis_timestep=0, normalize=True, apply_log=False,
                 context_window=None, axis_context_window=-2,
                 labeled_data=False, gray2color=False, zeropad=True,
                 desired_input_shape=None, combine_axes_0_1=False):
        '''
        Parameters
        ----------
        data_matrix1 : np.ndarray [size=(num_samples, batch_size, num_frames, num_features) or (num_samples, num_frames, num_features+label_column)]
            The training data. This can contain the feature and label data or
            just the input feature data.
        data_matrix2 : np.ndarray [size = (num_samples, ) `data_matrix1`.shape], optional
            Either label data for `data_matrix1` or, for example, the clean
            version of `data_matrix1` if training an autoencoder. (default None)
        timestep : int
            The number of frames to constitute a timestep.
        axis_timestep : int
            The axis to apply the `timestep` to. (default 0)
        normalize : bool
            If False, the data has already been normalized and won't be
            normalized by the generator. (default True)
        apply_log : bool
            If True, log will be applied to the data.
        context_window : int
            The size of `context_window` or number of samples padding a
            central frame. This may be useful for models training on small
            changes occurring in the signal, e.g. to break up the image of
            sound into smaller parts.
        axis_context_window : int
            The axis to apply `context_window` to, if `context_window` is not
            None. Ideally should be the axis preceding the feature column.
            (default -2)
        labeled_data : bool
            If True, `data_matrix2` holds label data. (default False)
        gray2color : bool
            If True, grayscale data will be copied into 3 color channels
            before being fed to the model. (default False)
        zeropad : bool
            If features should be zeropadded in reshaping functions.
        desired_input_shape : int or tuple, optional
            The desired number of features or shape of data to feed a neural
            network. If type int, only the last column of features will be
            adjusted (zeropadded or limited). If tuple, the entire data shape
            will be adjusted (all columns). If the int or shape is larger
            than that of the data provided, data will be zeropadded. If the
            int or shape is smaller, the data will be restricted.
            (default None)
        combine_axes_0_1 : bool
            If True, each batch's first two axes are collapsed into one
            before being fed to the model. (default False)
        '''
        self.batch_size = 1
        self.samples_per_epoch = data_matrix1.shape[0]
        self.number_of_batches = self.samples_per_epoch / self.batch_size
        self.counter = 0
        self.datax = data_matrix1
        self.datay = data_matrix2
        self.normalize = normalize
        self.apply_log = apply_log
        self.timestep = timestep
        self.axis_timestep = axis_timestep
        self.context_window = context_window
        self.axis_context = axis_context_window
        self.zeropad = zeropad
        # if need to change grayscale data to rgb
        self.gray2color = gray2color
        if self.datay is None:
            # separate the label from the feature data
            self.datax, self.datay = sp.feats.separate_dependent_var(self.datax)
            # bug fix: the second comparison used to test complex64 twice;
            # now both single and double precision complex labels are caught
            if self.datay.dtype == np.complex64 or self.datay.dtype == np.complex128:
                self.datay = self.datay.astype(float)
            # assumes last column of features is the label column
            self.num_feats = self.datax.shape[-1]
            self.labels = True
        else:
            self.labels = None
        if labeled_data:
            self.labels = True
        self.desired_shape = desired_input_shape
        self.combine_axes_0_1 = combine_axes_0_1

    def generator(self):
        '''Shapes, norms, and feeds data depending on labeled or non-labeled data.
        '''
        while 1:
            # will be size (batch_size, num_frames, num_features)
            batch_x = self.datax[self.counter]
            batch_y = self.datay[self.counter]
            if self.labels:
                # ensure label is shape (1,)
                if isinstance(batch_y, np.ndarray) and len(batch_y) > 1:
                    # bug fix: was `batch_y[:0]`, which produced an empty array
                    batch_y = batch_y[:1]
                if not isinstance(batch_y, np.ndarray):
                    batch_y = np.expand_dims(batch_y, axis=0)
            # TODO: is there a difference between taking log of stft before
            # or after normalization?
            # bug fix: `np.complex_` (removed in numpy 2, and only matching
            # complex128) replaced with np.iscomplexobj, which also catches
            # complex64 data
            if self.normalize or np.iscomplexobj(batch_x):
                # if complex data, power spectrum will be extracted
                # power spectrum = np.abs(complex_data)**2
                batch_x = sp.feats.normalize(batch_x)
                if self.labels is None:
                    batch_y = sp.feats.normalize(batch_y)
            # apply log if specified
            if self.apply_log:
                batch_x = np.log(np.abs(batch_x))
                # don't need to touch label data
                if self.labels is None:
                    batch_y = np.log(np.abs(batch_y))
            # reshape features to allow for timestep / subsection features
            if self.timestep is not None:
                batch_x = sp.feats.apply_new_subframe(
                    batch_x,
                    new_frame_size=self.timestep,
                    zeropad=self.zeropad,
                    axis=self.axis_timestep)
                if self.labels is None:
                    batch_y = sp.feats.apply_new_subframe(
                        batch_y,
                        new_frame_size=self.timestep,
                        zeropad=self.zeropad,
                        axis=self.axis_timestep)
            # reshape features to allow for context window / subsection features
            if self.context_window is not None:
                batch_x = sp.feats.apply_new_subframe(
                    batch_x,
                    new_frame_size=self.context_window * 2 + 1,
                    zeropad=self.zeropad,
                    axis=self.axis_context)
                if self.labels is None:
                    # bug fix: was an unqualified `apply_new_subframe`,
                    # which raised NameError
                    batch_y = sp.feats.apply_new_subframe(
                        batch_y,
                        new_frame_size=self.context_window * 2 + 1,
                        zeropad=self.zeropad,
                        axis=self.axis_context)
            if self.gray2color:
                # expects colorscale to be rgb (i.e. 3)
                # will copy first channel into the other color channels
                batch_x = sp.feats.grayscale2color(batch_x, colorscale=3)
                if self.labels is None:
                    batch_y = sp.feats.grayscale2color(batch_y, colorscale=3)
            if self.labels:
                if batch_y.dtype == np.complex64 or batch_y.dtype == np.complex128:
                    batch_y = batch_y.astype(int)
            # TODO test
            # if need greater number of features --> zero padding
            # could this be applied to including both narrowband and wideband data?
            # check to ensure batches match desired input shape
            if self.combine_axes_0_1 is True:
                batch_x = batch_x.reshape(
                    (batch_x.shape[0] * batch_x.shape[1],) + batch_x.shape[2:])
                if self.labels is None:
                    batch_y = batch_y.reshape(
                        (batch_y.shape[0] * batch_y.shape[1],) + batch_y.shape[2:])
            if self.desired_shape is not None:
                # can add dimensions of length 1 to first and last axis:
                try:
                    batch_x = sp.feats.adjust_shape(batch_x, self.desired_shape)
                    if self.labels is None:
                        batch_y = sp.feats.adjust_shape(batch_y, self.desired_shape)
                except ValueError:
                    # bug fix: the message strings used to be concatenated
                    # *outside* the ValueError call, so the raise statement
                    # itself raised a TypeError
                    raise ValueError(
                        'Data batch with shape {}'.format(batch_x.shape) +
                        ' cannot be reshaped to match `desired_input_shape` of '
                        '{}. Perhaps try setting '.format(self.desired_shape) +
                        'parameter `combine_axes_0_1` to True or False. '
                        '(default is False)')
            # send the batched and reshaped data to model
            self.counter += 1
            yield batch_x, batch_y
            # restart counter to yield data in the next epoch as well
            if self.counter >= self.number_of_batches:
                self.counter = 0
class GeneratorFeatExtraction(Generator):
    '''Extracts features from audio files on the fly and feeds them to a model.'''

    def __init__(self, datalist, datalist2=None, model_name=None,
                 normalize=True, apply_log=False, randomize=True,
                 random_seed=None, desired_input_shape=None,
                 timestep=None, axis_timestep=0, context_window=None,
                 axis_context_window=-2, batch_size=1,
                 gray2color=False, visualize=False,
                 vis_every_n_items=50, visuals_dir=None,
                 decode_dict=None, dataset='train',
                 augment_dict=None, label_silence=False,
                 vad_start_end=False, **kwargs):
        '''
        Parameters
        ----------
        datalist : list
            List of audiofile pathways for feature extraction and model training.
            If labeled data, expects pathway and encoded label (i.e. int) to be
            paired together in a tuple (a list of tuples).
        datalist2 : list, optional
            List of audiofile pathways or labels for feature extraction and
            model training. This list might contain clean versions of
            `datalist`. These will be assigned as the 'label' or expected
            output of the input features.
        random_seed : int, optional
            Seed used when shuffling `datalist` (and `datalist2`). If None
            and `randomize` is True, a seed is drawn at random.
        vad_start_end : bool
            If True, VAD will be applied only to the beginning and end of the
            signal, to clip off the silences. If False, VAD will be applied to
            the entire signal; however, this is potentially finicky.
        **kwargs : additional keyword arguments
            Keyword arguments for soundpy.feats.get_feats

        Raises
        ------
        ValueError
            If neither `desired_input_shape` nor `dur_sec` is supplied.
        KeyError
            If `feature_type` or `sr` is missing from `kwargs`.
        '''
        if desired_input_shape is None and 'dur_sec' not in kwargs:
            raise ValueError('No information pertaining to amount of audio data '+\
                'to be extracted is supplied. Please specify `sample_length`, '+\
                '`desired_input_shape`, or `dur_sec`.')
        if randomize:
            # to ensure each iteration data is randomized
            # bug fix: a user-supplied `random_seed` used to be overwritten
            # unconditionally; it is now respected, making shuffles reproducible
            if random_seed is None:
                random_seed = np.random.choice(range(100))
            random.seed(random_seed)
            random.shuffle(datalist)
            if datalist2 is not None:
                # re-seed so both lists are shuffled identically and keep
                # matching (input, target) pairs post-randomization
                random.seed(random_seed)
                random.shuffle(datalist2)
        self.dataset = dataset
        self.label_silence = label_silence
        self.vad_start_end = vad_start_end
        self.model_name = model_name
        self.batch_size = batch_size
        self.samples_per_epoch = len(datalist)
        self.number_of_batches = self.samples_per_epoch // batch_size
        self.counter = 0
        self.audiolist = datalist
        self.audiolist2 = datalist2
        self.context_window = context_window
        self.axis_context = axis_context_window
        self.timestep = timestep
        self.axis_timestep = axis_timestep
        self.normalize = normalize
        self.apply_log = apply_log
        self.desired_input_shape = desired_input_shape
        self.gray2color = gray2color
        self.visualize = visualize
        self.vis_every_n_items = vis_every_n_items
        self.visuals_dir = visuals_dir
        if decode_dict is None:
            decode_dict = dict()
        self.decode_dict = decode_dict
        if label_silence:
            if 'silence' not in decode_dict.values():
                raise ValueError('Cannot apply `silence` label if not included in '+\
                    '`decode_dict`.')
        if augment_dict is None:
            augment_dict = dict()
        self.augment_dict = augment_dict
        self.kwargs = kwargs
        # Ensure `feature_type` and `sr` are provided in **kwargs
        if 'feature_type' not in kwargs:
            raise KeyError('Feature type not indicated. '+\
                'Please set `feature_type` to one of the following: '+\
                '\nfbank\nstft\npowspec\nsignal\nmfcc\n')
        if 'sr' not in kwargs:
            raise KeyError('Sample rate is not indicated. '+\
                'Please set `sr` (e.g. sr = 22050)')
[docs] def generator(self):
'''Extracts features and feeds them to model according to `desired_input_shape`.
'''
while 1:
augmentation = ''
audioinfo = self.audiolist[self.counter]
# does the list contain label audiofile pairs?
if isinstance(audioinfo, tuple):
if len(audioinfo) != 2:
raise ValueError('Expected tuple containing audio file path and label. '+\
'Instead received tuple of length: \n{}'.format(len(audioinfo)))
# if label is a string digit, int, or float - turn to int
if isinstance(audioinfo[0], int) or isinstance(audioinfo[0], float) or \
isinstance(audioinfo[0], str) and audioinfo[0].isdigit():
label = int(audioinfo[0])
audiopath = audioinfo[1]
elif isinstance(audioinfo[1], int) or isinstance(audioinfo[1], float) or \
isinstance(audioinfo[1], str) and audioinfo[1].isdigit():
label = int(audioinfo[1])
audiopath = audioinfo[1]
else:
raise ValueError('Expected tuple to contain an integer label '+\
'and audio pathway. Received instead tuple with types '+\
'{} and {}.'.format(type(audioinfo[0]), type(audioinfo[1])))
# otherwise list of audiofiles
else:
audiopath = audioinfo
label = None
if self.audiolist2 is not None:
# expects audiolist2 to be either integer labels or audiofile pathways
audioinfo2 = self.audiolist2[self.counter]
if isinstance(audioinfo2, int) or isinstance(audioinfo2, str) and \
audioinfo2.isdigit():
if label is None:
label = audioinfo2
else:
if label == int(audioinfo2):
pass
else:
raise ValueError('Provided conflicting labels for '+\
'current audiofile: {}.'.format(audiopath) +\
'\nReceived both label {} and {} .'.format(
label, int(audioinfo2)))
audiopath2 = None
else:
audiopath2 = audioinfo2
else:
audiopath2 = None
if label is not None:
labeled_data = True
if self.decode_dict is not None:
try:
label_pic = self.decode_dict[label].upper()
except KeyError:
# dictionary keys might be string type, not int type
label_pic = self.decode_dict[str(int(label))].upper()
else:
label_pic = label
else:
labeled_data = False
label_pic = None
# ensure audio is valid:
y, sr = sp.loadsound(audiopath, self.kwargs['sr'])
if audiopath2:
y2, sr2 = sp.loadsound(audiopath2, self.kwargs['sr'])
else:
y2, sr2 = None, None
if self.label_silence:
if self.vad_start_end:
y_stft, vad = sp.dsp.get_stft_clipped(y, sr=sr,
win_size_ms = 50,
percent_overlap = 0.5)
else:
y_stft, __ = sp.feats.get_vad_stft(y, sr=sr,
win_size_ms = 50,
percent_overlap = 0.5,
use_beg_ms = 120,
energy_thresh = 40,
freq_thresh = 185,
sfm_thresh = 5)
if not y_stft.any():
label = len(self.decode_dict)-1
print('\nNo voice activity detected in {}'.format(audiopath))
print('Label {} adjusted to {}.'.format(label_pic,self.decode_dict[label]))
label_pic = self.decode_dict[label]
# augment_data
if self.augment_dict is not None:
aug_dict = randomize_augs(self.augment_dict)
augmented_data, augmentation = augment_features(y,
self.kwargs['sr'],
**aug_dict)
if audiopath2:
# remove 'add_white_noise' if in aug_dict
aug_dict2 = {}
for key, value in aug_dict.items():
if key != 'add_white_noise':
aug_dict2[key] = value
augmented_data2, augmentation2 = augment_features(y2,
self.kwargs['sr'],
**aug_dict2)
else:
augmented_data, augmentation = y, ''
aug_dict = dict()
augmented_data2, augmentation2 = y2, ''
aug_dict2 = dict()
# extract features
# will be shape (num_frames, num_features)
if 'vtlp' in aug_dict and aug_dict['vtlp']:
sr = self.kwargs['sr']
win_size_ms = sp.utils.restore_dictvalue(self.kwargs['win_size_ms'])
percent_overlap = sp.utils.restore_dictvalue(self.kwargs['percent_overlap'])
fft_bins = sp.utils.restore_dictvalue(self.kwargs['fft_bins'])
window = sp.utils.restore_dictvalue(self.kwargs['window'])
real_signal = sp.utils.restore_dictvalue(self.kwargs['real_signal'])
feature_type_vtlp = 'stft'
dur_sec = sp.utils.restore_dictvalue(self.kwargs['dur_sec'])
zeropad = sp.utils.restore_dictvalue(self.kwargs['zeropad'])
# need to tell vtlp the size of fft we need, in order to
# be able to extract fbank and mfcc features as well
expected_stft_shape, __ = sp.feats.get_feature_matrix_shape(
sr = sr,
dur_sec = dur_sec,
feature_type = feature_type_vtlp,
win_size_ms = win_size_ms,
percent_overlap = percent_overlap,
fft_bins = fft_bins,
zeropad = zeropad,
real_signal = real_signal)
# TODO bug fix: oversize_factor higher than 1:
# how to reduce dimension back to `expected_stft_shape` without
# shaving off data?
oversize_factor = 16
augmented_data, alpha = sp.augment.vtlp(
augmented_data, sr,
win_size_ms = win_size_ms,
percent_overlap = percent_overlap,
fft_bins = fft_bins,
window = window,
real_signal = real_signal,
expected_shape = expected_stft_shape,
oversize_factor = oversize_factor,
visualize=False)
# vtlp was last augmentation to be added to `augmentation` string
# add the value that was applied
augmentation += '_vtlp'+str(alpha)
# need to be able to set alpha
augmented_data2, alpha2 = sp.augment.vtlp(
augmented_data2, sr,
a = alpha,
win_size_ms = win_size_ms,
percent_overlap = percent_overlap,
fft_bins = fft_bins,
window = window,
real_signal = real_signal,
expected_shape = expected_stft_shape,
oversize_factor = oversize_factor,
visualize=False)
try:
assert alpha == alpha2
except AssertionError:
raise ValueError('The alpha value for vtlp application '+\
'does not match for the X and y audio: '+\
'X alpha is {} and y alpha is {}'.format(alpha, alpha2))
# vtlp was last augmentation to be added to `augmentation` string
# add the value that was applied
augmentation2 += '_vtlp'+str(alpha)
if 'vtlp' in aug_dict and aug_dict['vtlp']:
if 'stft' in self.kwargs['feature_type'] or \
'powspec' in self.kwargs['feature_type']:
if 'stft' in self.kwargs['feature_type'] and oversize_factor > 1:
import warnings
msg = '\nWARNING: due to resizing of STFT matrix due to '+\
' `oversize_factor` {}, converted to '.format(oversize_factor)+\
'power spectrum. Phase information has been removed.'
warnings.warn(msg)
feats = augmented_data
if audiopath2:
feats2 = augmented_data2
if 'powspec' in self.kwargs['feature_type'] and oversize_factor == 1:
# otherwise already a power spectrum
feats = sp.dsp.calc_power(feats)
if audiopath2:
feats2 = sp.dsp.calc_power(feats2)
elif 'stft'in self.kwargs['feature_type'] or \
'powspec' in self.kwargs['feature_type']:
feats = sp.feats.get_stft(
augmented_data,
sr = self.kwargs['sr'],
win_size_ms = self.kwargs['win_size_ms'],
percent_overlap = self.kwargs['percent_overlap'],
real_signal = self.kwargs['real_signal'],
fft_bins = self.kwargs['fft_bins'],
rate_of_change = self.kwargs['rate_of_change'],
rate_of_acceleration = self.kwargs['rate_of_acceleration'],
window = self.kwargs['window'],
zeropad = self.kwargs['zeropad']
)
if audiopath2:
feats2 = sp.feats.get_stft(
augmented_data2,
sr = self.kwargs['sr'],
win_size_ms = self.kwargs['win_size_ms'],
percent_overlap = self.kwargs['percent_overlap'],
real_signal = self.kwargs['real_signal'],
fft_bins = self.kwargs['fft_bins'],
rate_of_change = self.kwargs['rate_of_change'],
rate_of_acceleration = self.kwargs['rate_of_acceleration'],
window = self.kwargs['window'],
zeropad = self.kwargs['zeropad']
)
if 'powspec' in self.kwargs['feature_type']:
feats = sp.dsp.calc_power(feats)
if audiopath2:
feats2 = sp.dsp.calc_power(feats2)
if 'fbank' in self.kwargs['feature_type']:
feats = sp.feats.get_fbank(
augmented_data,
sr = self.kwargs['sr'],
num_filters = self.kwargs['num_filters'],
win_size_ms = self.kwargs['win_size_ms'],
percent_overlap = self.kwargs['percent_overlap'],
real_signal = self.kwargs['real_signal'],
fft_bins = self.kwargs['fft_bins'],
rate_of_change = self.kwargs['rate_of_change'],
rate_of_acceleration = self.kwargs['rate_of_acceleration'],
window = self.kwargs['window'],
zeropad = self.kwargs['zeropad']
)
if audiopath2:
feats2 = sp.feats.get_fbank(
augmented_data2,
sr = self.kwargs['sr'],
num_filters = self.kwargs['num_filters'],
win_size_ms = self.kwargs['win_size_ms'],
percent_overlap = self.kwargs['percent_overlap'],
real_signal = self.kwargs['real_signal'],
fft_bins = self.kwargs['fft_bins'],
rate_of_change = self.kwargs['rate_of_change'],
rate_of_acceleration = self.kwargs['rate_of_acceleration'],
window = self.kwargs['window'],
zeropad = self.kwargs['zeropad']
)
elif 'mfcc' in self.kwargs['feature_type']:
feats = sp.feats.get_mfcc(
augmented_data,
sr = self.kwargs['sr'],
num_mfcc = self.kwargs['num_mfcc'],
num_filters = self.kwargs['num_filters'],
win_size_ms = self.kwargs['win_size_ms'],
percent_overlap = self.kwargs['percent_overlap'],
real_signal = self.kwargs['real_signal'],
fft_bins = self.kwargs['fft_bins'],
rate_of_change = self.kwargs['rate_of_change'],
rate_of_acceleration = self.kwargs['rate_of_acceleration'],
window = self.kwargs['window'],
zeropad = self.kwargs['zeropad']
)
if audiopath2:
feats2 = sp.feats.get_mfcc(
augmented_data2,
sr = self.kwargs['sr'],
num_mfcc = self.kwargs['num_mfcc'],
num_filters = self.kwargs['num_filters'],
win_size_ms = self.kwargs['win_size_ms'],
percent_overlap = self.kwargs['percent_overlap'],
real_signal = self.kwargs['real_signal'],
fft_bins = self.kwargs['fft_bins'],
rate_of_change = self.kwargs['rate_of_change'],
rate_of_acceleration = self.kwargs['rate_of_acceleration'],
window = self.kwargs['window'],
zeropad = self.kwargs['zeropad']
)
if self.apply_log:
# TODO test
if feats[0].any() < 0:
feats = np.abs(feats)
feats = np.log(feats)
if self.normalize:
feats = sp.feats.normalize(feats)
if audiopath2:
if self.apply_log:
# TODO test
if feats2[0].any() < 0:
feats2 = np.abs(feats2)
feats2 = np.log(feats2)
if self.normalize:
feats2 = sp.feats.normalize(feats2)
else:
feats2 = None
# Save visuals if desired
if self.visualize:
if self.counter % self.vis_every_n_items == 0:
# make augmentation string more legible.
augments_vis = augmentation[1:].split('_')
if len(augments_vis) > 1:
augs1 = augments_vis[:len(augments_vis)//2]
augs2 = augments_vis[len(augments_vis)//2:]
augs1 = ', '.join(augs1)
augs2 = ', '.join(augs2)
else:
augs1 = augments_vis[0]
augs2 = ''
if self.visuals_dir is not None:
save_visuals_path = sp.check_dir(self.visuals_dir, make=True)
else:
save_visuals_path = sp.check_dir('./training_images/', make=True)
save_visuals_path = save_visuals_path.joinpath(
'{}_label{}_training_{}_{}_{}.png'.format(
self.dataset,
label_pic,
self.model_name,
augmentation,
sp.utils.get_date()))
feature_type = self.kwargs['feature_type']
sr = self.kwargs['sr']
win_size_ms = self.kwargs['win_size_ms']
percent_overlap = self.kwargs['percent_overlap']
if 'stft' in feature_type or 'powspec' in feature_type or 'fbank' \
in feature_type:
energy_scale = 'power_to_db'
else:
energy_scale = None
sp.feats.plot(
feature_matrix = feats,
feature_type = feature_type,
sr = sr,
win_size_ms = win_size_ms, percent_overlap = percent_overlap,
energy_scale = energy_scale, save_pic = True,
name4pic = save_visuals_path,
title = '"{}" {} Aug: {}-\n{}'.format(
label_pic,
feature_type.upper(),
augs1,
augs2),
subprocess=True) #use Agg backend for plotting
if feats2 is not None:
# add '_2' to pathway
p = sp.utils.string2pathlib(save_visuals_path)
p2 = p.name.stem
save_visuals_path2 = p.parent.joinpath(p2+'_2'+p.name.suffix)
sp.feats.plot(
feature_matrix = feats2,
feature_type = feature_type,
sr = sr,
win_size_ms = win_size_ms, percent_overlap = percent_overlap,
energy_scale = energy_scale, save_pic = True,
name4pic = save_visuals_path2,
title = 'Output {} features {}'.format(
label_pic, feature_type),
subprocess=True)
batch_x = feats
batch_y = feats2
# reshape features to allow for timestep / subsection features
if self.timestep is not None:
batch_x = sp.feats.apply_new_subframe(
batch_x,
new_frame_size = self.timestep,
zeropad = self.kwargs['zeropad'],
axis = self.axis_timestep)
if batch_y is not None:
batch_y = sp.feats.apply_new_subframe(
batch_y,
new_frame_size = self.timestep,
zeropad = self.kwargs['zeropad'],
axis = self.axis_timestep)
# reshape features to allow for context window / subsection features
if self.context_window is not None:
batch_x = sp.feats.apply_new_subframe(
batch_x,
new_frame_size = self.context_window * 2 + 1,
zeropad = self.kwargs['zeropad'],
axis = self.axis_context)
if batch_y is not None:
batch_y = apply_new_subframe(
batch_y,
new_frame_size = self.context_window * 2 + 1,
zeropad = self.kwargs['zeropad'],
axis = self.axis_context)
# grayscale 2 color
if self.gray2color:
batch_x = sp.feats.grayscale2color(batch_x,
colorscale = 3) # default colorscale is 3
if batch_y is not None:
batch_y = sp.feats.grayscale2color(batch_y,
colorscale = 3)
# reshape to input shape. Will be zeropadded or limited to this shape.
# tensor dimensions on either side can be added here as well.
if self.desired_input_shape is not None:
batch_x = sp.feats.adjust_shape(batch_x, self.desired_input_shape)
if batch_y is not None:
batch_y = sp.feats.adjust_shape(batch_y, self.desired_input_shape)
# prepare data to be fed to network:
if labeled_data:
# has to be at least (1,)
batch_y = np.expand_dims(np.array(label), axis=0)
elif batch_y is not None:
pass
else:
raise ValueError('No independent variable provided.')
self.counter += 1
yield batch_x, batch_y
#restart counter to yield data in the next epoch as well
if self.counter >= self.number_of_batches:
self.counter = 0
def check4na(numpyarray):
    '''Returns True (printing a warning) if the array holds any NaN/inf values.'''
    has_na = not np.isfinite(numpyarray).all()
    if has_na:
        print('NAN present.')
    return has_na
def randomize_augs(aug_dict, random_seed=None):
    '''Creates copy of dict and chooses which augs applied randomly.

    Can apply random seed for number of augmentations applied and shuffling
    order of possible augmentations. Conflicting pairs (speed_increase vs
    speed_decrease, pitch_increase vs pitch_decrease) are resolved by
    randomly dropping one of the pair and, when possible, replacing it with
    an augmentation that was left out.

    Parameters
    ----------
    aug_dict : dict or None
        Maps augmentation names to booleans; True marks an augmentation that
        may be applied.
    random_seed : int, optional
        Seed for numpy (number of augmentations) and random (shuffle order).

    Returns
    -------
    dict
        Copy of `aug_dict` where only a random subset of the True entries
        stays True. Empty dict if `aug_dict` is None.
    '''
    # robustness fix: `dict(None)` used to raise TypeError
    if aug_dict is None:
        return dict()
    possible_augs = [key for key, value in aug_dict.items() if value == True]
    if random_seed is not None:
        np.random.seed(random_seed)
    num_augs = np.random.choice(range(len(possible_augs) + 1))
    if num_augs == 0:
        # no augmentations applied: flip every True entry to False
        new_dict = dict(aug_dict)
        for key, value in new_dict.items():
            if value == True:
                new_dict[key] = False
        return new_dict
    if random_seed is not None:
        random.seed(random_seed)
    random.shuffle(possible_augs)
    augs = possible_augs[:num_augs]
    # bug fix: leftovers were sliced from `augs` (always empty); slice the
    # full shuffled list instead so conflicting picks can be replaced
    augs_leftover = possible_augs[num_augs:]
    for first, second in (('speed_increase', 'speed_decrease'),
                          ('pitch_increase', 'pitch_decrease')):
        if first in augs and second in augs:
            # randomly drop one of the two conflicting augmentations
            indices = [augs.index(first), augs.index(second)]
            random.shuffle(indices)
            augs.pop(indices[0])
            if augs_leftover:
                # keep the chosen number of augmentations if possible
                augs.append(augs_leftover.pop(0))
    new_dict = dict(aug_dict)
    for key, value in new_dict.items():
        if value == True and key not in augs:
            new_dict[key] = False
    return new_dict
def augment_features(sound,
                     sr,
                     add_white_noise = False,
                     snr = [5,10,20],
                     speed_increase = False,
                     speed_decrease = False,
                     speed_perc = 0.15,
                     time_shift = False,
                     shufflesound = False,
                     num_subsections = 3,
                     harmonic_distortion = False,
                     pitch_increase = False,
                     pitch_decrease = False,
                     num_semitones = 2,
                     vtlp = False,
                     bilinear_warp = True,
                     augment_settings_dict = None,
                     random_seed = None,
                     ):
    '''Randomly applies augmentations to audio. If no `augment_settings_dict`, defaults applied.
    '''
    # work on a copy of the provided settings so the caller's dict stays intact
    settings = dict(augment_settings_dict) if augment_settings_dict is not None else None
    if speed_increase and speed_decrease:
        raise ValueError('Cannot have both speed_increase and speed_decrease'+\
            ' as augmentation options. Set just one to True.')
    if pitch_increase and pitch_decrease:
        raise ValueError('Cannot have both pitch_increase and pitch_decrease'+\
            ' as augmentation options. Set just one to True.')
    # accept raw sample data or a pathway to an audio file
    if isinstance(sound, np.ndarray):
        audio = sound
    else:
        audio, sr2 = sp.loadsound(sound, sr=sr)
        assert sr2 == sr
    original_length = len(audio)
    augmented = audio.copy()
    applied = ''
    if add_white_noise:
        # allow default settings to be used/overwritten
        if settings is not None:
            noise_kwargs = settings['add_white_noise']
            if isinstance(noise_kwargs['snr'], str):
                noise_kwargs['snr'] = sp.utils.restore_dictvalue(noise_kwargs['snr'])
            # if a list of snr values: choose randomly
            if isinstance(noise_kwargs['snr'], list):
                snr = np.random.choice(noise_kwargs['snr'])
        else:
            snr = np.random.choice(snr)
        augmented = sp.augment.add_white_noise(augmented, sr=sr, snr=snr)
        applied += '_whitenoise{}SNR'.format(snr)
    if speed_increase:
        speed_kwargs = settings['speed_increase'] if settings is not None \
            else dict(perc=speed_perc)
        augmented = sp.augment.speed_increase(augmented, sr=sr, **speed_kwargs)
        applied += '_speedincrease{}'.format(speed_kwargs['perc'])
    elif speed_decrease:
        speed_kwargs = settings['speed_decrease'] if settings is not None \
            else dict(perc=speed_perc)
        augmented = sp.augment.speed_decrease(augmented, sr=sr, **speed_kwargs)
        applied += '_speeddecrease{}'.format(speed_kwargs['perc'])
    if time_shift:
        augmented = sp.augment.time_shift(augmented, sr=sr)
        applied += '_randtimeshift'
    if shufflesound:
        shuffle_kwargs = settings['shufflesound'] if settings is not None \
            else dict(num_subsections=num_subsections)
        augmented = sp.augment.shufflesound(augmented, sr=sr, **shuffle_kwargs)
        applied += '_randshuffle{}sections'.format(shuffle_kwargs['num_subsections'])
    if harmonic_distortion:
        augmented = sp.augment.harmonic_distortion(augmented, sr=sr)
        applied += '_harmonicdistortion'
    if pitch_increase:
        pitch_kwargs = settings['pitch_increase'] if settings is not None \
            else dict(num_semitones=num_semitones)
        augmented = sp.augment.pitch_increase(augmented, sr=sr, **pitch_kwargs)
        applied += '_pitchincrease{}semitones'.format(pitch_kwargs['num_semitones'])
    elif pitch_decrease:
        pitch_kwargs = settings['pitch_decrease'] if settings is not None \
            else dict(num_semitones=num_semitones)
        augmented = sp.augment.pitch_decrease(augmented, sr=sr, **pitch_kwargs)
        applied += '_pitchdecrease{}semitones'.format(pitch_kwargs['num_semitones'])
    # all augmentation techniques return sample data except for vtlp
    # therefore vtlp will be handled outside of this function (returns stft or powspec)
    if vtlp:
        pass
    augmented = sp.dsp.set_signal_length(augmented, original_length)
    return augmented, applied
# TODO: add default values?
# does real_signal influence shape??
def get_input_shape(kwargs_get_feats, labeled_data = False,
                    frames_per_sample = None, use_librosa = True, mode = 'reflect'):
    '''Calculates the feature matrix shape a model will receive.

    Note: fills missing optional keys of `kwargs_get_feats` in place with
    defaults (callers may rely on this side effect).

    Parameters
    ----------
    kwargs_get_feats : dict
        Keyword arguments for soundpy.feats.get_feats. Must contain
        `feature_type`, `dur_sec` and `num_filters`.
    labeled_data : bool
        If True, one label column is assumed appended to the features.
    frames_per_sample : int, optional
        If set, features are sectioned into windows of this many frames.
    use_librosa : bool
        NOTE(review): only the librosa path is implemented; with False the
        shape variables are never computed and a NameError results.
    mode : str
        Padding mode used to mimic librosa's centering of samples.

    Returns
    -------
    tuple
        The expected input shape (without the label column).
    '''
    # required keys
    if 'feature_type' not in kwargs_get_feats:
        raise ValueError('Missing `feature_type` key and value.')
    feature_type = kwargs_get_feats['feature_type']
    if 'dur_sec' not in kwargs_get_feats:
        raise ValueError('Missing `dur_sec` key and value.')
    dur_sec = kwargs_get_feats['dur_sec']
    if 'num_filters' not in kwargs_get_feats:
        raise ValueError('Missing `num_filters` key and value.')
    num_filters = kwargs_get_feats['num_filters']
    # optional keys: set defaults if not provided (mutates dict in place,
    # as the original code did)
    # bug fix: the default sample rate was 441000 (441 kHz) — a typo for
    # the standard CD-quality 44100 Hz
    sr = kwargs_get_feats.setdefault('sr', 44100)
    win_size_ms = kwargs_get_feats.setdefault('win_size_ms', 25)
    percent_overlap = kwargs_get_feats.setdefault('percent_overlap', 0.5)
    fft_bins = kwargs_get_feats.setdefault('fft_bins', None)
    center = kwargs_get_feats.setdefault('center', True)
    num_mfcc = kwargs_get_feats.setdefault('num_mfcc', None)
    real_signal = kwargs_get_feats.setdefault('real_signal', True)
    # figure out shape of data:
    total_samples = sp.dsp.calc_frame_length(dur_sec * 1000, sr=sr)
    if use_librosa:
        frame_length = sp.dsp.calc_frame_length(win_size_ms, sr)
        win_shift_ms = win_size_ms - (win_size_ms * percent_overlap)
        hop_length = int(win_shift_ms * 0.001 * sr)
        if fft_bins is None:
            fft_bins = int(win_size_ms * sr // 1000)
        # librosa centers samples by default, slightly adjusting total
        # number of samples
        if center:
            y_zeros = np.zeros((total_samples,))
            y_centered = np.pad(y_zeros, int(fft_bins // 2), mode=mode)
            total_samples = len(y_centered)
        # each audio file
        if 'signal' in feature_type:
            # don't apply fft to signal (not sectioned into overlapping windows)
            total_rows_per_wav = total_samples // frame_length
        else:
            # do apply fft to signal (via Librosa) - (will be sectioned into
            # overlapping windows)
            total_rows_per_wav = int(1 + (total_samples - fft_bins) // hop_length)
    # set defaults to num_feats if set as None:
    if num_filters is None:
        if 'mfcc' in feature_type:
            num_feats = 40 if num_mfcc is None else num_mfcc
        elif 'fbank' in feature_type:
            num_feats = 40
        elif 'powspec' in feature_type or 'stft' in feature_type:
            num_feats = int(1 + fft_bins / 2)
        elif 'signal' in feature_type:
            num_feats = frame_length
        else:
            # bug fix: `list_available_features` was unqualified and raised
            # NameError; qualify with the soundpy feats module
            raise ValueError('Feature type "{}" '.format(feature_type)+\
                'not understood.\nMust include one of the following: \n'+\
                ', '.join(sp.feats.list_available_features()))
    else:
        if 'signal' in feature_type:
            num_feats = frame_length
        elif 'stft' in feature_type or 'powspec' in feature_type:
            num_feats = int(1 + fft_bins / 2)
        else:
            num_feats = num_filters
    if frames_per_sample is not None:
        # want smaller windows, e.g. autoencoder denoiser or speech recognition
        batch_size = math.ceil(total_rows_per_wav / frames_per_sample)
        if labeled_data:
            orig_shape = (batch_size, frames_per_sample, num_feats + 1)
            input_shape = (orig_shape[0] * orig_shape[1], orig_shape[2] - 1)
        else:
            orig_shape = (batch_size, frames_per_sample, num_feats)
            input_shape = (orig_shape[0] * orig_shape[1], orig_shape[2])
    else:
        if labeled_data:
            orig_shape = (int(total_rows_per_wav), num_feats + 1)
            input_shape = (orig_shape[0], orig_shape[1] - 1)
        else:
            orig_shape = (int(total_rows_per_wav), num_feats)
            input_shape = orig_shape
    return input_shape
def make_gen_callable(_gen):
    '''Prepares Python generator for `tf.data.Dataset.from_generator`

    Bug fix: Python generator fails to work in Tensorflow 2.2.0 +

    Parameters
    ----------
    _gen : generator
        The generator function to feed to a deep neural network.

    Returns
    -------
    x : np.ndarray [shape=(batch_size, num_frames, num_features, 1)]
        The feature data
    y : np.ndarray [shape=(1,1)]
        The label for the feature data.

    References
    ----------
    Shu, Nicolas (2020) https://stackoverflow.com/a/62186572
    CC BY-SA 4.0
    '''
    def gen():
        # re-yield each (features, label) pair from the wrapped generator
        for features, label in _gen:
            yield features, label
    return gen