Source code for soundpy.models.dataprep

'''The models.dataprep module covers functionality for feeding features to models.
'''

import os, sys
import inspect
currentdir = os.path.dirname(os.path.abspath(
    inspect.getfile(inspect.currentframe())))
packagedir = os.path.dirname(currentdir)
sys.path.insert(0, packagedir)
import numpy as np
import math
import random
import soundpy as sp
import librosa


###############################################################################

# feed data to models
class Generator:
    def __init__(self, data_matrix1, data_matrix2=None, timestep=None,
                 axis_timestep=0, normalize=True, apply_log=False,
                 context_window=None, axis_context_window=-2,
                 labeled_data=False, gray2color=False, zeropad=True,
                 desired_input_shape=None, combine_axes_0_1=False):
        '''This generator pulls data out in sections (i.e. batch sizes).

        Prepared for 3 dimensional data. Note: Keras adds a dimension to the
        input to represent the "Tensor" that handles the input. This means
        that sometimes you have to add a shape of (1,) to the shape of the
        data.

        Parameters
        ----------
        data_matrix1 : np.ndarray [size=(num_samples, batch_size, num_frames, num_features) or (num_samples, num_frames, num_features+label_column)]
            The training data. This can contain the feature and label data or
            just the input feature data.
        data_matrix2 : np.ndarray [size = `data_matrix1`.shape], optional
            Either label data for `data_matrix1` or, for example, the clean
            version of `data_matrix1` if training an autoencoder.
            (default None)
        normalize : bool
            If False, the data has already been normalized and won't be
            normalized by the generator. (default True)
        apply_log : bool
            If True, log will be applied to the data.
        timestep : int
            The number of frames to constitute a timestep.
        axis_timestep : int
            The axis to apply the `timestep` to. (default 0)
        context_window : int
            The size of the `context_window`, i.e. the number of samples
            padding a central frame. This may be useful for models training
            on small changes occurring in the signal, e.g. to break up the
            image of sound into smaller parts.
        axis_context_window : int
            The axis to apply the `context_window` to, if `context_window`
            is not None. Ideally the axis preceding the feature column.
            (default -2)
        labeled_data : bool
            If True, the data is expected to include label data.
            (default False)
        gray2color : bool
            If True, grayscale data will be repeated across three channels
            to mimic rgb data. (default False)
        zeropad : bool
            If True, features will be zeropadded in reshaping functions.
            (default True)
        desired_input_shape : int or tuple, optional
            The desired number of features or shape of data to feed a neural
            network. If type int, only the last column of features will be
            adjusted (zeropadded or limited). If tuple, the entire data shape
            will be adjusted (all columns). If the int or shape is larger
            than that of the data provided, data will be zeropadded. If the
            int or shape is smaller, the data will be restricted.
            (default None)
        combine_axes_0_1 : bool
            If True, the first two axes of each batch will be combined into
            a single axis. (default False)
        '''
        self.batch_size = 1
        self.samples_per_epoch = data_matrix1.shape[0]
        self.number_of_batches = self.samples_per_epoch / self.batch_size
        self.counter = 0
        self.datax = data_matrix1
        self.datay = data_matrix2
        self.normalize = normalize
        self.apply_log = apply_log
        self.timestep = timestep
        self.axis_timestep = axis_timestep
        self.context_window = context_window
        self.axis_context = axis_context_window
        self.zeropad = zeropad
        # if grayscale data needs to be changed to rgb:
        self.gray2color = gray2color
        if self.datay is None:
            # separate the label from the feature data;
            # assumes the last column of features is the label column
            self.datax, self.datay = sp.feats.separate_dependent_var(self.datax)
            if self.datay.dtype == np.complex64 or self.datay.dtype == np.complex128:
                self.datay = self.datay.astype(float)
            self.num_feats = self.datax.shape[-1]
            self.labels = True
        else:
            self.labels = None
        if labeled_data:
            self.labels = True
        self.desired_shape = desired_input_shape
        self.combine_axes_0_1 = combine_axes_0_1
    def generator(self):
        '''Shapes, norms, and feeds data depending on labeled or non-labeled data.
        '''
        while 1:
            # will be size (batch_size, num_frames, num_features)
            batch_x = self.datax[self.counter]
            batch_y = self.datay[self.counter]
            # ensure label is shape (1,)
            if self.labels:
                if isinstance(batch_y, np.ndarray) and len(batch_y) > 1:
                    batch_y = batch_y[:1]
                if not isinstance(batch_y, np.ndarray):
                    batch_y = np.expand_dims(batch_y, axis=0)

            # TODO: is there a difference between taking log of stft before
            # or after normalization?
            if self.normalize or np.iscomplexobj(self.datax):
                # if complex data, the power spectrum will be extracted:
                # power spectrum = np.abs(complex_data)**2
                batch_x = sp.feats.normalize(batch_x)
                if self.labels is None:
                    batch_y = sp.feats.normalize(batch_y)

            # apply log if specified
            if self.apply_log:
                batch_x = np.log(np.abs(batch_x))
                # don't need to touch label data
                if self.labels is None:
                    batch_y = np.log(np.abs(batch_y))

            # reshape features to allow for timestep / subsection features
            if self.timestep is not None:
                batch_x = sp.feats.apply_new_subframe(
                    batch_x,
                    new_frame_size=self.timestep,
                    zeropad=self.zeropad,
                    axis=self.axis_timestep)
                if self.labels is None:
                    batch_y = sp.feats.apply_new_subframe(
                        batch_y,
                        new_frame_size=self.timestep,
                        zeropad=self.zeropad,
                        axis=self.axis_timestep)

            # reshape features to allow for context window / subsection features
            if self.context_window is not None:
                batch_x = sp.feats.apply_new_subframe(
                    batch_x,
                    new_frame_size=self.context_window * 2 + 1,
                    zeropad=self.zeropad,
                    axis=self.axis_context)
                if self.labels is None:
                    batch_y = sp.feats.apply_new_subframe(
                        batch_y,
                        new_frame_size=self.context_window * 2 + 1,
                        zeropad=self.zeropad,
                        axis=self.axis_context)

            if self.gray2color:
                # expects colorscale to be rgb (i.e. 3);
                # will copy the first channel into the other color channels
                batch_x = sp.feats.grayscale2color(batch_x, colorscale=3)
                if self.labels is None:
                    batch_y = sp.feats.grayscale2color(batch_y, colorscale=3)

            if self.labels:
                if batch_y.dtype == np.complex64 or batch_y.dtype == np.complex128:
                    batch_y = batch_y.astype(int)

            # TODO test
            # if a greater number of features is needed --> zero padding
            # could this be applied to including both narrowband and wideband data?
            # check to ensure batches match the desired input shape
            if self.combine_axes_0_1 is True:
                batch_x = batch_x.reshape(
                    (batch_x.shape[0] * batch_x.shape[1],) + batch_x.shape[2:])
                if self.labels is None:
                    batch_y = batch_y.reshape(
                        (batch_y.shape[0] * batch_y.shape[1],) + batch_y.shape[2:])
            if self.desired_shape is not None:
                # can add dimensions of length 1 to the first and last axis:
                try:
                    batch_x = sp.feats.adjust_shape(batch_x, self.desired_shape)
                    if self.labels is None:
                        batch_y = sp.feats.adjust_shape(batch_y, self.desired_shape)
                except ValueError:
                    raise ValueError(
                        'Data batch with shape {}'.format(batch_x.shape) +
                        ' cannot be reshaped to match `desired_input_shape` of ' +
                        '{}. Perhaps try setting '.format(self.desired_shape) +
                        'parameter `combine_axes_0_1` to True or False. ' +
                        '(default is False)')

            # send the batched and reshaped data to the model
            self.counter += 1
            yield batch_x, batch_y

            # restart the counter to yield data in the next epoch as well
            if self.counter >= self.number_of_batches:
                self.counter = 0
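
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# How `Generator` might feed pre-extracted, labeled features to a Keras-style
# model. The shapes and the `model` argument are illustrative assumptions,
# not soundpy requirements.
def _example_generator_usage(model):
    '''A minimal sketch: the last feature column holds the labels.'''
    # 10 samples, 20 frames each, 40 features plus 1 label column:
    feats_plus_labels = np.random.rand(10, 20, 41)
    gen = Generator(feats_plus_labels,
                    desired_input_shape=(20, 40, 1))  # zeropad/limit to this shape
    model.fit(gen.generator(),
              steps_per_epoch=gen.number_of_batches,
              epochs=1)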
class GeneratorFeatExtraction(Generator):
    def __init__(self, datalist, datalist2=None, model_name=None,
                 normalize=True, apply_log=False, randomize=True,
                 random_seed=None, desired_input_shape=None, timestep=None,
                 axis_timestep=0, context_window=None, axis_context_window=-2,
                 batch_size=1, gray2color=False, visualize=False,
                 vis_every_n_items=50, visuals_dir=None, decode_dict=None,
                 dataset='train', augment_dict=None, label_silence=False,
                 vad_start_end=False, **kwargs):
        '''
        Parameters
        ----------
        datalist : list
            List of audiofile pathways for feature extraction and model
            training. If labeled data, expects the pathway and the encoded
            label (i.e. int) to be paired together in a tuple (a list of
            tuples).
        datalist2 : list, optional
            List of audiofile pathways or labels for feature extraction and
            model training. This list might contain clean versions of
            `datalist`. These will be assigned as the 'label' or expected
            output of the input features.
        vad_start_end : bool
            If True, VAD will be applied only to the beginning and end of the
            signal, to clip off the silences. If False, VAD will be applied
            to the entire signal; however, this is potentially finicky.
        **kwargs : additional keyword arguments
            Keyword arguments for soundpy.feats.get_feats.
        '''
        if desired_input_shape is None and 'dur_sec' not in kwargs.keys():
            raise ValueError('No information pertaining to the amount of audio data ' +
                             'to be extracted is supplied. Please specify `sample_length`, ' +
                             '`desired_input_shape`, or `dur_sec`.')
        if randomize:
            # to ensure the data is randomized at each iteration
            random_seed = np.random.choice(range(100))
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(datalist)
        if datalist2 is not None:
            if random_seed is not None:
                random.seed(random_seed)
            else:
                raise ValueError('If two audio file lists are provided, ' +
                                 'a `random_seed` is necessary to ensure they still match ' +
                                 'post randomization.')
            random.shuffle(datalist2)
        self.dataset = dataset
        self.label_silence = label_silence
        self.vad_start_end = vad_start_end
        self.model_name = model_name
        self.batch_size = batch_size
        self.samples_per_epoch = len(datalist)
        self.number_of_batches = self.samples_per_epoch // batch_size
        self.counter = 0
        self.audiolist = datalist
        self.audiolist2 = datalist2
        self.context_window = context_window
        self.axis_context = axis_context_window
        self.timestep = timestep
        self.axis_timestep = axis_timestep
        self.normalize = normalize
        self.apply_log = apply_log
        self.desired_input_shape = desired_input_shape
        self.gray2color = gray2color
        self.visualize = visualize
        self.vis_every_n_items = vis_every_n_items
        self.visuals_dir = visuals_dir
        if decode_dict is None:
            decode_dict = dict()
        self.decode_dict = decode_dict
        if label_silence:
            if 'silence' not in decode_dict.values():
                raise ValueError('Cannot apply `silence` label if not included in ' +
                                 '`decode_dict`.')
        if augment_dict is None:
            augment_dict = dict()
        self.augment_dict = augment_dict
        self.kwargs = kwargs
        # ensure `feature_type` and `sr` are provided in **kwargs
        try:
            feature_type = kwargs['feature_type']
        except KeyError:
            raise KeyError('Feature type not indicated. ' +
                           'Please set `feature_type` to one of the following: ' +
                           '\nfbank\nstft\npowspec\nsignal\nmfcc\n')
        try:
            sr = kwargs['sr']
        except KeyError:
            raise KeyError('Sample rate is not indicated. ' +
                           'Please set `sr` (e.g. sr = 22050)')
    def generator(self):
        '''Extracts features and feeds them to the model according to `desired_input_shape`.
        '''
        while 1:
            augmentation = ''
            audioinfo = self.audiolist[self.counter]
            # does the list contain label-audiofile pairs?
            if isinstance(audioinfo, tuple):
                if len(audioinfo) != 2:
                    raise ValueError('Expected tuple containing audio file path and label. ' +
                                     'Instead received tuple of length: \n{}'.format(
                                         len(audioinfo)))
                # if label is a string digit, int, or float - turn to int
                if isinstance(audioinfo[0], int) or isinstance(audioinfo[0], float) or \
                        isinstance(audioinfo[0], str) and audioinfo[0].isdigit():
                    label = int(audioinfo[0])
                    audiopath = audioinfo[1]
                elif isinstance(audioinfo[1], int) or isinstance(audioinfo[1], float) or \
                        isinstance(audioinfo[1], str) and audioinfo[1].isdigit():
                    label = int(audioinfo[1])
                    audiopath = audioinfo[0]
                else:
                    raise ValueError('Expected tuple to contain an integer label ' +
                                     'and audio pathway. Received instead tuple with types ' +
                                     '{} and {}.'.format(type(audioinfo[0]), type(audioinfo[1])))
            # otherwise a list of audiofiles
            else:
                audiopath = audioinfo
                label = None
            if self.audiolist2 is not None:
                # expects audiolist2 to be either integer labels or audiofile pathways
                audioinfo2 = self.audiolist2[self.counter]
                if isinstance(audioinfo2, int) or isinstance(audioinfo2, str) and \
                        audioinfo2.isdigit():
                    if label is None:
                        label = audioinfo2
                    elif label != int(audioinfo2):
                        raise ValueError('Provided conflicting labels for ' +
                                         'current audiofile: {}.'.format(audiopath) +
                                         '\nReceived both label {} and {} .'.format(
                                             label, int(audioinfo2)))
                    audiopath2 = None
                else:
                    audiopath2 = audioinfo2
            else:
                audiopath2 = None
            if label is not None:
                labeled_data = True
                if self.decode_dict is not None:
                    try:
                        label_pic = self.decode_dict[label].upper()
                    except KeyError:
                        # dictionary keys might be string type, not int type
                        label_pic = self.decode_dict[str(int(label))].upper()
                else:
                    label_pic = label
            else:
                labeled_data = False
                label_pic = None

            # ensure audio is valid:
            y, sr = sp.loadsound(audiopath, self.kwargs['sr'])
            if audiopath2:
                y2, sr2 = sp.loadsound(audiopath2, self.kwargs['sr'])
            else:
                y2, sr2 = None, None

            if self.label_silence:
                if self.vad_start_end:
                    y_stft, vad = sp.dsp.get_stft_clipped(
                        y, sr=sr, win_size_ms=50, percent_overlap=0.5)
                else:
                    y_stft, __ = sp.feats.get_vad_stft(
                        y, sr=sr, win_size_ms=50, percent_overlap=0.5,
                        use_beg_ms=120, energy_thresh=40, freq_thresh=185,
                        sfm_thresh=5)
                if not y_stft.any():
                    label = len(self.decode_dict) - 1
                    print('\nNo voice activity detected in {}'.format(audiopath))
                    print('Label {} adjusted to {}.'.format(
                        label_pic, self.decode_dict[label]))
                    label_pic = self.decode_dict[label]

            # augment data
            if self.augment_dict is not None:
                aug_dict = randomize_augs(self.augment_dict)
                augmented_data, augmentation = augment_features(
                    y, self.kwargs['sr'], **aug_dict)
                if audiopath2:
                    # remove 'add_white_noise' if in aug_dict
                    aug_dict2 = {}
                    for key, value in aug_dict.items():
                        if key != 'add_white_noise':
                            aug_dict2[key] = value
                    augmented_data2, augmentation2 = augment_features(
                        y2, self.kwargs['sr'], **aug_dict2)
            else:
                augmented_data, augmentation = y, ''
                aug_dict = dict()
                augmented_data2, augmentation2 = y2, ''
                aug_dict2 = dict()

            # extract features
            # will be shape (num_frames, num_features)
            if 'vtlp' in aug_dict and aug_dict['vtlp']:
                sr = self.kwargs['sr']
                win_size_ms = sp.utils.restore_dictvalue(self.kwargs['win_size_ms'])
                percent_overlap = sp.utils.restore_dictvalue(self.kwargs['percent_overlap'])
                fft_bins = sp.utils.restore_dictvalue(self.kwargs['fft_bins'])
                window = sp.utils.restore_dictvalue(self.kwargs['window'])
                real_signal = sp.utils.restore_dictvalue(self.kwargs['real_signal'])
                feature_type_vtlp = 'stft'
                dur_sec = sp.utils.restore_dictvalue(self.kwargs['dur_sec'])
                zeropad = sp.utils.restore_dictvalue(self.kwargs['zeropad'])
                # need to tell vtlp the size of fft we need, in order to
                # be able to extract fbank and mfcc features as well
                expected_stft_shape, __ = sp.feats.get_feature_matrix_shape(
                    sr=sr,
                    dur_sec=dur_sec,
                    feature_type=feature_type_vtlp,
                    win_size_ms=win_size_ms,
                    percent_overlap=percent_overlap,
                    fft_bins=fft_bins,
                    zeropad=zeropad,
                    real_signal=real_signal)
                # TODO bug fix for oversize_factor higher than 1:
                # how to reduce the dimension back to `expected_stft_shape`
                # without shaving off data?
                oversize_factor = 16
                augmented_data, alpha = sp.augment.vtlp(
                    augmented_data, sr,
                    win_size_ms=win_size_ms,
                    percent_overlap=percent_overlap,
                    fft_bins=fft_bins,
                    window=window,
                    real_signal=real_signal,
                    expected_shape=expected_stft_shape,
                    oversize_factor=oversize_factor,
                    visualize=False)
                # vtlp was the last augmentation added to the `augmentation`
                # string; add the alpha value that was applied
                augmentation += '_vtlp' + str(alpha)
                # need to be able to set alpha
                augmented_data2, alpha2 = sp.augment.vtlp(
                    augmented_data2, sr,
                    a=alpha,
                    win_size_ms=win_size_ms,
                    percent_overlap=percent_overlap,
                    fft_bins=fft_bins,
                    window=window,
                    real_signal=real_signal,
                    expected_shape=expected_stft_shape,
                    oversize_factor=oversize_factor,
                    visualize=False)
                try:
                    assert alpha == alpha2
                except AssertionError:
                    raise ValueError('The alpha value for vtlp application ' +
                                     'does not match for the X and y audio: ' +
                                     'X alpha is {} and y alpha is {}'.format(alpha, alpha2))
                # add the alpha value to the second `augmentation` string as well
                augmentation2 += '_vtlp' + str(alpha)

            if 'vtlp' in aug_dict and aug_dict['vtlp']:
                if 'stft' in self.kwargs['feature_type'] or \
                        'powspec' in self.kwargs['feature_type']:
                    if 'stft' in self.kwargs['feature_type'] and oversize_factor > 1:
                        import warnings
                        msg = '\nWARNING: due to resizing the STFT matrix with ' + \
                            '`oversize_factor` {}, it was converted to a '.format(
                                oversize_factor) + \
                            'power spectrum. Phase information has been removed.'
                        warnings.warn(msg)
                    feats = augmented_data
                    if audiopath2:
                        feats2 = augmented_data2
                    if 'powspec' in self.kwargs['feature_type'] and oversize_factor == 1:
                        # otherwise already a power spectrum
                        feats = sp.dsp.calc_power(feats)
                        if audiopath2:
                            feats2 = sp.dsp.calc_power(feats2)
            elif 'stft' in self.kwargs['feature_type'] or \
                    'powspec' in self.kwargs['feature_type']:
                feats = sp.feats.get_stft(
                    augmented_data,
                    sr=self.kwargs['sr'],
                    win_size_ms=self.kwargs['win_size_ms'],
                    percent_overlap=self.kwargs['percent_overlap'],
                    real_signal=self.kwargs['real_signal'],
                    fft_bins=self.kwargs['fft_bins'],
                    rate_of_change=self.kwargs['rate_of_change'],
                    rate_of_acceleration=self.kwargs['rate_of_acceleration'],
                    window=self.kwargs['window'],
                    zeropad=self.kwargs['zeropad'])
                if audiopath2:
                    feats2 = sp.feats.get_stft(
                        augmented_data2,
                        sr=self.kwargs['sr'],
                        win_size_ms=self.kwargs['win_size_ms'],
                        percent_overlap=self.kwargs['percent_overlap'],
                        real_signal=self.kwargs['real_signal'],
                        fft_bins=self.kwargs['fft_bins'],
                        rate_of_change=self.kwargs['rate_of_change'],
                        rate_of_acceleration=self.kwargs['rate_of_acceleration'],
                        window=self.kwargs['window'],
                        zeropad=self.kwargs['zeropad'])
                if 'powspec' in self.kwargs['feature_type']:
                    feats = sp.dsp.calc_power(feats)
                    if audiopath2:
                        feats2 = sp.dsp.calc_power(feats2)
            if 'fbank' in self.kwargs['feature_type']:
                feats = sp.feats.get_fbank(
                    augmented_data,
                    sr=self.kwargs['sr'],
                    num_filters=self.kwargs['num_filters'],
                    win_size_ms=self.kwargs['win_size_ms'],
                    percent_overlap=self.kwargs['percent_overlap'],
                    real_signal=self.kwargs['real_signal'],
                    fft_bins=self.kwargs['fft_bins'],
                    rate_of_change=self.kwargs['rate_of_change'],
                    rate_of_acceleration=self.kwargs['rate_of_acceleration'],
                    window=self.kwargs['window'],
                    zeropad=self.kwargs['zeropad'])
                if audiopath2:
                    feats2 = sp.feats.get_fbank(
                        augmented_data2,
                        sr=self.kwargs['sr'],
                        num_filters=self.kwargs['num_filters'],
                        win_size_ms=self.kwargs['win_size_ms'],
                        percent_overlap=self.kwargs['percent_overlap'],
                        real_signal=self.kwargs['real_signal'],
                        fft_bins=self.kwargs['fft_bins'],
                        rate_of_change=self.kwargs['rate_of_change'],
                        rate_of_acceleration=self.kwargs['rate_of_acceleration'],
                        window=self.kwargs['window'],
                        zeropad=self.kwargs['zeropad'])
            elif 'mfcc' in self.kwargs['feature_type']:
                feats = sp.feats.get_mfcc(
                    augmented_data,
                    sr=self.kwargs['sr'],
                    num_mfcc=self.kwargs['num_mfcc'],
                    num_filters=self.kwargs['num_filters'],
                    win_size_ms=self.kwargs['win_size_ms'],
                    percent_overlap=self.kwargs['percent_overlap'],
                    real_signal=self.kwargs['real_signal'],
                    fft_bins=self.kwargs['fft_bins'],
                    rate_of_change=self.kwargs['rate_of_change'],
                    rate_of_acceleration=self.kwargs['rate_of_acceleration'],
                    window=self.kwargs['window'],
                    zeropad=self.kwargs['zeropad'])
                if audiopath2:
                    feats2 = sp.feats.get_mfcc(
                        augmented_data2,
                        sr=self.kwargs['sr'],
                        num_mfcc=self.kwargs['num_mfcc'],
                        num_filters=self.kwargs['num_filters'],
                        win_size_ms=self.kwargs['win_size_ms'],
                        percent_overlap=self.kwargs['percent_overlap'],
                        real_signal=self.kwargs['real_signal'],
                        fft_bins=self.kwargs['fft_bins'],
                        rate_of_change=self.kwargs['rate_of_change'],
                        rate_of_acceleration=self.kwargs['rate_of_acceleration'],
                        window=self.kwargs['window'],
                        zeropad=self.kwargs['zeropad'])

            if self.apply_log:
                # TODO test
                if (feats < 0).any():
                    feats = np.abs(feats)
                feats = np.log(feats)
            if self.normalize:
                feats = sp.feats.normalize(feats)
            if audiopath2:
                if self.apply_log:
                    # TODO test
                    if (feats2 < 0).any():
                        feats2 = np.abs(feats2)
                    feats2 = np.log(feats2)
                if self.normalize:
                    feats2 = sp.feats.normalize(feats2)
            else:
                feats2 = None

            # save visuals if desired
            if self.visualize:
                if self.counter % self.vis_every_n_items == 0:
                    # make the augmentation string more legible
                    augments_vis = augmentation[1:].split('_')
                    if len(augments_vis) > 1:
                        augs1 = augments_vis[:len(augments_vis) // 2]
                        augs2 = augments_vis[len(augments_vis) // 2:]
                        augs1 = ', '.join(augs1)
                        augs2 = ', '.join(augs2)
                    else:
                        augs1 = augments_vis[0]
                        augs2 = ''
                    if self.visuals_dir is not None:
                        save_visuals_path = sp.check_dir(self.visuals_dir, make=True)
                    else:
                        save_visuals_path = sp.check_dir('./training_images/', make=True)
                    save_visuals_path = save_visuals_path.joinpath(
                        '{}_label{}_training_{}_{}_{}.png'.format(
                            self.dataset, label_pic, self.model_name,
                            augmentation, sp.utils.get_date()))
                    feature_type = self.kwargs['feature_type']
                    sr = self.kwargs['sr']
                    win_size_ms = self.kwargs['win_size_ms']
                    percent_overlap = self.kwargs['percent_overlap']
                    if 'stft' in feature_type or 'powspec' in feature_type or \
                            'fbank' in feature_type:
                        energy_scale = 'power_to_db'
                    else:
                        energy_scale = None
                    sp.feats.plot(
                        feature_matrix=feats,
                        feature_type=feature_type,
                        sr=sr,
                        win_size_ms=win_size_ms,
                        percent_overlap=percent_overlap,
                        energy_scale=energy_scale,
                        save_pic=True,
                        name4pic=save_visuals_path,
                        title='"{}" {} Aug: {}-\n{}'.format(
                            label_pic, feature_type.upper(), augs1, augs2),
                        subprocess=True)  # use Agg backend for plotting
                    if feats2 is not None:
                        # add '_2' to the pathway
                        p = sp.utils.string2pathlib(save_visuals_path)
                        p2 = p.stem
                        save_visuals_path2 = p.parent.joinpath(p2 + '_2' + p.suffix)
                        sp.feats.plot(
                            feature_matrix=feats2,
                            feature_type=feature_type,
                            sr=sr,
                            win_size_ms=win_size_ms,
                            percent_overlap=percent_overlap,
                            energy_scale=energy_scale,
                            save_pic=True,
                            name4pic=save_visuals_path2,
                            title='Output {} features {}'.format(
                                label_pic, feature_type),
                            subprocess=True)

            batch_x = feats
            batch_y = feats2

            # reshape features to allow for timestep / subsection features
            if self.timestep is not None:
                batch_x = sp.feats.apply_new_subframe(
                    batch_x,
                    new_frame_size=self.timestep,
                    zeropad=self.kwargs['zeropad'],
                    axis=self.axis_timestep)
                if batch_y is not None:
                    batch_y = sp.feats.apply_new_subframe(
                        batch_y,
                        new_frame_size=self.timestep,
                        zeropad=self.kwargs['zeropad'],
                        axis=self.axis_timestep)

            # reshape features to allow for context window / subsection features
            if self.context_window is not None:
                batch_x = sp.feats.apply_new_subframe(
                    batch_x,
                    new_frame_size=self.context_window * 2 + 1,
                    zeropad=self.kwargs['zeropad'],
                    axis=self.axis_context)
                if batch_y is not None:
                    batch_y = sp.feats.apply_new_subframe(
                        batch_y,
                        new_frame_size=self.context_window * 2 + 1,
                        zeropad=self.kwargs['zeropad'],
                        axis=self.axis_context)

            # grayscale 2 color
            if self.gray2color:
                batch_x = sp.feats.grayscale2color(batch_x, colorscale=3)
                if batch_y is not None:
                    batch_y = sp.feats.grayscale2color(batch_y, colorscale=3)

            # reshape to input shape. Will be zeropadded or limited to this
            # shape. Tensor dimensions on either side can be added here as well.
            if self.desired_input_shape is not None:
                batch_x = sp.feats.adjust_shape(batch_x, self.desired_input_shape)
                if batch_y is not None:
                    batch_y = sp.feats.adjust_shape(batch_y, self.desired_input_shape)

            # prepare data to be fed to the network:
            if labeled_data:
                # has to be at least shape (1,)
                batch_y = np.expand_dims(np.array(label), axis=0)
            elif batch_y is not None:
                pass
            else:
                raise ValueError('No independent variable provided.')
            self.counter += 1
            yield batch_x, batch_y

            # restart the counter to yield data in the next epoch as well
            if self.counter >= self.number_of_batches:
                self.counter = 0
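
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# How `GeneratorFeatExtraction` might extract fbank features on the fly from
# (label, pathway) pairs. The file names, shapes, and keyword values are
# illustrative assumptions; the keyword arguments mirror
# `soundpy.feats.get_feats` and are accessed directly by `generator`.
def _example_featextraction_usage(model):
    '''A minimal sketch: train on two hypothetical labeled audio files.'''
    labeled_audio = [(0, 'audio/cat1.wav'), (1, 'audio/dog1.wav')]
    gen = GeneratorFeatExtraction(
        labeled_audio,
        model_name='example_cnn',
        random_seed=40,
        feature_type='fbank',  # required keyword argument
        sr=22050,              # required keyword argument
        dur_sec=1,
        win_size_ms=25,
        percent_overlap=0.5,
        num_filters=40,
        fft_bins=None,
        window='hann',
        real_signal=False,
        rate_of_change=False,
        rate_of_acceleration=False,
        zeropad=True,
        desired_input_shape=(86, 40, 1))  # illustrative target shape
    model.fit(gen.generator(),
              steps_per_epoch=gen.number_of_batches,
              epochs=1)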
def check4na(numpyarray):
    '''Returns True and prints a warning if NaN or other non-finite values are present.'''
    if not np.isfinite(numpyarray).all():
        print('NAN present.')
        return True
    return False
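
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# `check4na` flags arrays containing NaN (or other non-finite) values.
def _example_check4na():
    assert check4na(np.array([0., np.nan])) is True
    assert check4na(np.ones(3)) is False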
def randomize_augs(aug_dict, random_seed=None):
    '''Creates a copy of the dict and randomly chooses which augs get applied.

    A random seed can be applied, both for the number of augmentations
    applied and for shuffling the order of possible augmentations.
    '''
    possible_augs = []
    num_possible_aug = 0
    if aug_dict is not None:
        for key, value in aug_dict.items():
            if value == True:
                num_possible_aug += 1
                possible_augs.append(key)
    if random_seed is not None:
        np.random.seed(random_seed)
    num_augs = np.random.choice(range(num_possible_aug + 1))
    if num_augs == 0:
        # no augmentations applied:
        new_dict = dict(aug_dict)
        for key, value in new_dict.items():
            if value == True:
                new_dict[key] = False
        return new_dict
    if random_seed is not None:
        random.seed(random_seed)
    random.shuffle(possible_augs)
    augs = possible_augs[:num_augs]
    # the leftovers come from the shuffled pool of candidates:
    augs_leftover = possible_augs[num_augs:]
    if 'speed_increase' in augs and 'speed_decrease' in augs:
        i1 = augs.index('speed_increase')
        i2 = augs.index('speed_decrease')
        x = [i1, i2]
        random.shuffle(x)
        speed2remove = augs.pop(x[0])
        if augs_leftover:
            aug_added = augs_leftover.pop(0)
            augs.append(aug_added)
    if 'pitch_increase' in augs and 'pitch_decrease' in augs:
        i1 = augs.index('pitch_increase')
        i2 = augs.index('pitch_decrease')
        x = [i1, i2]
        random.shuffle(x)
        pitch2remove = augs.pop(x[0])
        if augs_leftover:
            aug_added = augs_leftover.pop(0)
            augs.append(aug_added)
    new_dict = dict(aug_dict)
    for key, value in new_dict.items():
        if value == True and key not in augs:
            new_dict[key] = False
    return new_dict
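
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# `randomize_augs` takes a dict of candidate augmentations set to True and
# randomly turns a subset of them off, never leaving both a speed (or pitch)
# increase and decrease on at once. The candidate values here are illustrative.
def _example_randomize_augs():
    '''A minimal sketch: draw a reproducible random augmentation subset.'''
    candidates = dict(add_white_noise=True,
                      speed_increase=True,
                      speed_decrease=True,
                      pitch_increase=False)
    chosen = randomize_augs(candidates, random_seed=40)
    # e.g. {'add_white_noise': True, 'speed_increase': False, ...};
    # exactly which entries stay True depends on the seed.
    return chosen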
def augment_features(sound,
                     sr,
                     add_white_noise=False,
                     snr=[5, 10, 20],
                     speed_increase=False,
                     speed_decrease=False,
                     speed_perc=0.15,
                     time_shift=False,
                     shufflesound=False,
                     num_subsections=3,
                     harmonic_distortion=False,
                     pitch_increase=False,
                     pitch_decrease=False,
                     num_semitones=2,
                     vtlp=False,
                     bilinear_warp=True,
                     augment_settings_dict=None,
                     random_seed=None,
                     ):
    '''Randomly applies augmentations to audio. If no `augment_settings_dict`, defaults are applied.
    '''
    if augment_settings_dict is not None:
        aug_settings = dict(augment_settings_dict)
    else:
        aug_settings = augment_settings_dict
    if speed_increase and speed_decrease:
        raise ValueError('Cannot have both speed_increase and speed_decrease' +
                         ' as augmentation options. Set just one to True.')
    if pitch_increase and pitch_decrease:
        raise ValueError('Cannot have both pitch_increase and pitch_decrease' +
                         ' as augmentation options. Set just one to True.')
    if isinstance(sound, np.ndarray):
        data = sound
    else:
        data, sr2 = sp.loadsound(sound, sr=sr)
        assert sr2 == sr
    samples = data.copy()
    samples_augmented = samples.copy()
    augmentation = ''
    if add_white_noise:
        # allow default settings to be used / overwritten
        if aug_settings is not None:
            kwargs_aug = aug_settings['add_white_noise']
            if isinstance(kwargs_aug['snr'], str):
                kwargs_aug['snr'] = sp.utils.restore_dictvalue(kwargs_aug['snr'])
            # if a list of snr values: choose randomly
            if isinstance(kwargs_aug['snr'], list):
                snr = np.random.choice(kwargs_aug['snr'])
            else:
                snr = kwargs_aug['snr']
        else:
            snr = np.random.choice(snr)
        samples_augmented = sp.augment.add_white_noise(
            samples_augmented, sr=sr, snr=snr)
        augmentation += '_whitenoise{}SNR'.format(snr)
    if speed_increase:
        if aug_settings is not None:
            kwargs_aug = aug_settings['speed_increase']
        else:
            kwargs_aug = dict([('perc', speed_perc)])
        samples_augmented = sp.augment.speed_increase(
            samples_augmented, sr=sr, **kwargs_aug)
        augmentation += '_speedincrease{}'.format(kwargs_aug['perc'])
    elif speed_decrease:
        if aug_settings is not None:
            kwargs_aug = aug_settings['speed_decrease']
        else:
            kwargs_aug = dict([('perc', speed_perc)])
        samples_augmented = sp.augment.speed_decrease(
            samples_augmented, sr=sr, **kwargs_aug)
        augmentation += '_speeddecrease{}'.format(kwargs_aug['perc'])
    if time_shift:
        samples_augmented = sp.augment.time_shift(samples_augmented, sr=sr)
        augmentation += '_randtimeshift'
    if shufflesound:
        if aug_settings is not None:
            kwargs_aug = aug_settings['shufflesound']
        else:
            kwargs_aug = dict([('num_subsections', num_subsections)])
        samples_augmented = sp.augment.shufflesound(
            samples_augmented, sr=sr, **kwargs_aug)
        augmentation += '_randshuffle{}sections'.format(kwargs_aug['num_subsections'])
    if harmonic_distortion:
        samples_augmented = sp.augment.harmonic_distortion(samples_augmented, sr=sr)
        augmentation += '_harmonicdistortion'
    if pitch_increase:
        if aug_settings is not None:
            kwargs_aug = aug_settings['pitch_increase']
        else:
            kwargs_aug = dict([('num_semitones', num_semitones)])
        samples_augmented = sp.augment.pitch_increase(
            samples_augmented, sr=sr, **kwargs_aug)
        augmentation += '_pitchincrease{}semitones'.format(kwargs_aug['num_semitones'])
    elif pitch_decrease:
        if aug_settings is not None:
            kwargs_aug = aug_settings['pitch_decrease']
        else:
            kwargs_aug = dict([('num_semitones', num_semitones)])
        samples_augmented = sp.augment.pitch_decrease(
            samples_augmented, sr=sr, **kwargs_aug)
        augmentation += '_pitchdecrease{}semitones'.format(kwargs_aug['num_semitones'])
    # all augmentation techniques return sample data except for vtlp;
    # vtlp is therefore handled outside of this function (it returns an
    # stft or powspec matrix)
    if vtlp:
        pass
    samples_augmented = sp.dsp.set_signal_length(samples_augmented, len(samples))
    return samples_augmented, augmentation
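
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# `augment_features` applies the requested augmentations to raw samples and
# returns them together with a string logging what was applied (useful for
# visuals and filenames). The audio below is synthetic.
def _example_augment_features():
    '''A minimal sketch: add white noise at a random SNR from the default list.'''
    sr = 16000
    samples = np.random.uniform(-0.5, 0.5, sr)  # one second of noise-like audio
    augmented, applied = augment_features(samples, sr, add_white_noise=True)
    # `applied` might look like '_whitenoise10SNR'
    return augmented, applied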
# TODO: add default values?
# Does `real_signal` influence the shape?
def get_input_shape(kwargs_get_feats, labeled_data=False, frames_per_sample=None,
                    use_librosa=True, mode='reflect'):
    # set defaults if not provided
    try:
        feature_type = kwargs_get_feats['feature_type']
    except KeyError:
        raise ValueError('Missing `feature_type` key and value.')
    try:
        dur_sec = kwargs_get_feats['dur_sec']
    except KeyError:
        raise ValueError('Missing `dur_sec` key and value.')
    try:
        sr = kwargs_get_feats['sr']
    except KeyError:
        kwargs_get_feats['sr'] = 44100
        sr = kwargs_get_feats['sr']
    try:
        win_size_ms = kwargs_get_feats['win_size_ms']
    except KeyError:
        kwargs_get_feats['win_size_ms'] = 25
        win_size_ms = kwargs_get_feats['win_size_ms']
    try:
        percent_overlap = kwargs_get_feats['percent_overlap']
    except KeyError:
        kwargs_get_feats['percent_overlap'] = 0.5
        percent_overlap = kwargs_get_feats['percent_overlap']
    try:
        fft_bins = kwargs_get_feats['fft_bins']
    except KeyError:
        kwargs_get_feats['fft_bins'] = None
        fft_bins = kwargs_get_feats['fft_bins']
    try:
        center = kwargs_get_feats['center']
    except KeyError:
        kwargs_get_feats['center'] = True
        center = kwargs_get_feats['center']
    try:
        num_filters = kwargs_get_feats['num_filters']
    except KeyError:
        raise ValueError('Missing `num_filters` key and value.')
    try:
        num_mfcc = kwargs_get_feats['num_mfcc']
    except KeyError:
        kwargs_get_feats['num_mfcc'] = None
        num_mfcc = kwargs_get_feats['num_mfcc']
    try:
        real_signal = kwargs_get_feats['real_signal']
    except KeyError:
        kwargs_get_feats['real_signal'] = True
        real_signal = kwargs_get_feats['real_signal']

    # figure out the shape of the data:
    total_samples = sp.dsp.calc_frame_length(dur_sec * 1000, sr=sr)
    if use_librosa:
        frame_length = sp.dsp.calc_frame_length(win_size_ms, sr)
        win_shift_ms = win_size_ms - (win_size_ms * percent_overlap)
        hop_length = int(win_shift_ms * 0.001 * sr)
        if fft_bins is None:
            fft_bins = int(win_size_ms * sr // 1000)
        # librosa centers samples by default, slightly adjusting the total
        # number of samples
        if center:
            y_zeros = np.zeros((total_samples,))
            y_centered = np.pad(y_zeros, int(fft_bins // 2), mode=mode)
            total_samples = len(y_centered)
        # for each audio file:
        if 'signal' in feature_type:
            # don't apply fft to the signal (not sectioned into overlapping windows)
            total_rows_per_wav = total_samples // frame_length
        else:
            # do apply fft to the signal (via librosa) - it will be sectioned
            # into overlapping windows
            total_rows_per_wav = int(1 + (total_samples - fft_bins) // hop_length)

    # set default number of features if `num_filters` is set as None:
    if num_filters is None:
        if 'mfcc' in feature_type:
            if num_mfcc is None:
                num_feats = 40
            else:
                num_feats = num_mfcc
        elif 'fbank' in feature_type:
            num_feats = 40
        elif 'powspec' in feature_type or 'stft' in feature_type:
            num_feats = int(1 + fft_bins / 2)
        elif 'signal' in feature_type:
            num_feats = frame_length
        else:
            raise ValueError('Feature type "{}" '.format(feature_type) +
                             'not understood.\nMust include one of the following: \n' +
                             ', '.join(sp.feats.list_available_features()))
    else:
        if 'signal' in feature_type:
            num_feats = frame_length
        elif 'stft' in feature_type or 'powspec' in feature_type:
            num_feats = int(1 + fft_bins / 2)
        else:
            num_feats = num_filters

    if frames_per_sample is not None:
        # want smaller windows, e.g. for an autoencoder denoiser or
        # for speech recognition
        batch_size = math.ceil(total_rows_per_wav / frames_per_sample)
        if labeled_data:
            orig_shape = (batch_size, frames_per_sample, num_feats + 1)
            input_shape = (orig_shape[0] * orig_shape[1], orig_shape[2] - 1)
        else:
            orig_shape = (batch_size, frames_per_sample, num_feats)
            input_shape = (orig_shape[0] * orig_shape[1], orig_shape[2])
    else:
        if labeled_data:
            orig_shape = (int(total_rows_per_wav), num_feats + 1)
            input_shape = (orig_shape[0], orig_shape[1] - 1)
        else:
            orig_shape = (int(total_rows_per_wav), num_feats)
            input_shape = orig_shape
    return input_shape
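
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# `get_input_shape` infers the (frames, features) shape a generator will
# produce from the same keyword arguments used for feature extraction.
# The values below are illustrative.
def _example_get_input_shape():
    '''A minimal sketch: expected shape for 1 second of fbank features.'''
    kwargs_feats = dict(feature_type='fbank',
                        dur_sec=1,
                        sr=16000,
                        win_size_ms=25,
                        percent_overlap=0.5,
                        num_filters=40)
    input_shape = get_input_shape(kwargs_feats, labeled_data=True)
    # e.g. (num_frames, 40): the label column is counted, then removed.
    return input_shape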
def make_gen_callable(_gen):
    '''Prepares a Python generator for `tf.data.Dataset.from_generator`.

    Bug fix: a plain Python generator fails to work in TensorFlow 2.2.0+.

    Parameters
    ----------
    _gen : generator
        The generator function to feed to a deep neural network.

    Returns
    -------
    gen : callable
        A zero-argument callable that yields (x, y) pairs from `_gen`:
        x : np.ndarray [shape=(batch_size, num_frames, num_features, 1)]
            The feature data.
        y : np.ndarray [shape=(1, 1)]
            The label for the feature data.

    References
    ----------
    Shu, Nicolas (2020) https://stackoverflow.com/a/62186572 CC BY-SA 4.0
    '''
    def gen():
        for x, y in _gen:
            yield x, y
    return gen
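
# --- Usage sketch (added for illustration; not from the original soundpy source) ---
# Wiring a generator into `tf.data.Dataset.from_generator`, as referenced in
# the docstring above. The dtypes are assumptions and must match what the
# wrapped generator actually yields.
def _example_tf_dataset(gen_instance):
    '''A minimal sketch: wrap a Generator instance for TensorFlow 2.2+.'''
    import tensorflow as tf
    gen_callable = make_gen_callable(gen_instance.generator())
    dataset = tf.data.Dataset.from_generator(
        gen_callable,
        output_types=(tf.float32, tf.float32))
    return dataset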