Source code for soundpy.models.builtin

'''The soundpy.models.builtin module includes example functions that train neural
networks on sound data.
''' 
import time
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
import librosa
import collections

import os, sys
import inspect
currentdir = os.path.dirname(os.path.abspath(
    inspect.getfile(inspect.currentframe())))
packagedir = os.path.dirname(currentdir)
sys.path.insert(0, packagedir)
import soundpy as sp
from soundpy import models as spdl


def denoiser_train(feature_extraction_dir,
                   model_name = 'model_autoencoder_denoise',
                   feature_type = None,
                   use_generator = True,
                   normalize = True,
                   patience = 10,
                   **kwargs):
    '''Collects training features and trains an autoencoder denoiser.

    Parameters
    ----------
    feature_extraction_dir : str or pathlib.PosixPath
        Directory where extracted feature files are located (format .npy).

    model_name : str
        The name for the model. This can be quite generic as the date up to
        the millisecond will be added to ensure a unique name for each trained
        model. (default 'model_autoencoder_denoise')

    feature_type : str, optional
        The type of features that will be used to train the model. This is
        only for the purposes of naming the model. If set to None, it will
        not be included in the model name.

    use_generator : bool
        If True, a generator will be used to feed training data to the model.
        Otherwise the entire training data will be used to train the model
        all at once. (default True)

    normalize : bool
        If True, the data will be normalized before feeding to the model.
        (default True)

    patience : int
        Number of epochs to train without improvement before early stopping.
        (default 10)

    **kwargs : additional keyword arguments
        The keyword arguments for keras.fit(). Note, the keyword arguments
        differ for validation data so be sure to use the correct keyword
        arguments, depending on if you use the generator or not.

    Returns
    -------
    model_dir : pathlib.PosixPath
        The directory where the model and associated files can be found.
    history : tf.keras.callbacks.History
        The training history of the last training session.

    See Also
    --------
    soundpy.datasets.separate_train_val_test_files
        Generates paths lists for train, validation, and test files. Useful
        for noisy vs clean datasets and also for multiple training files.

    soundpy.models.generator
        The generator function that feeds data to the model.

    soundpy.models.modelsetup.setup_callbacks
        The function that sets up callbacks (e.g. logging, save best model,
        early stopping, etc.)

    soundpy.models.template_models.autoencoder_denoise
        Template model architecture for basic autoencoder denoiser.
    '''
    if use_generator is False:
        import warnings
        msg = '\nWARNING: It is advised to set `use_generator` to True '+\
            'as memory issues are avoided and training is more reliable. '+\
            'There may be bugs in the functionality set to False. '
        # FIX: the message was previously constructed but never emitted.
        warnings.warn(msg)
    dataset_path = sp.utils.check_dir(feature_extraction_dir, make=False)
    # designate where to save model and related files
    if feature_type:
        model_name += '_'+feature_type + '_' + sp.utils.get_date()
    else:
        model_name += '_' + sp.utils.get_date()
    model_dir = dataset_path.joinpath(model_name)
    model_dir = sp.utils.check_dir(model_dir, make=True)
    model_name += '.h5'
    model_path = model_dir.joinpath(model_name)

    # prepare features files to load for training
    features_files = list(dataset_path.glob('*.npy'))
    # NamedTuple: 'datasets.train.noisy', 'datasets.train.clean', etc.
    datasets = sp.datasets.separate_train_val_test_files(
        features_files)

    if not datasets.train:
        # perhaps data files located in subdirectories
        features_files = list(dataset_path.glob('**/*.npy'))
        datasets = sp.datasets.separate_train_val_test_files(
            features_files)
        if not datasets.train:
            raise FileNotFoundError('Could not locate train, validation, or test '+\
                '.npy files in the provided directory: \n{}'.format(dataset_path) +\
                '\nThis program expects "train", "val", or "test" to be '+\
                'included in each filename (not parent directory/ies) names.')

    # only need train and val feature data for autoencoder
    train_paths_noisy, train_paths_clean = datasets.train.noisy, datasets.train.clean
    val_paths_noisy, val_paths_clean = datasets.val.noisy, datasets.val.clean

    # make sure both dataset pathways match in length and order:
    try:
        assert len(train_paths_noisy) == len(train_paths_clean)
        assert len(val_paths_noisy) == len(val_paths_clean)
    except AssertionError:
        raise ValueError('Noisy and Clean datasets do not match in length. '+\
            'They must be the same length.')
    train_paths_noisy = sorted(train_paths_noisy)
    train_paths_clean = sorted(train_paths_clean)
    val_paths_noisy = sorted(val_paths_noisy)
    val_paths_clean = sorted(val_paths_clean)

    # load smaller dataset to determine input size:
    data_val_noisy = np.load(val_paths_noisy[0])
    # expect shape (num_audiofiles, batch_size, num_frames, num_features)
    # but don't need batch size for denoiser... combine w num_frames in
    # generator with 'combine_axes_0_1' = True
    if len(data_val_noisy.shape) == 4:
        input_shape = (data_val_noisy.shape[1] * data_val_noisy.shape[2],
                       data_val_noisy.shape[3], 1)
        combine_axes_0_1 = True
    # expect shape (num_audiofiles, num_frames, num_features)
    elif len(data_val_noisy.shape) == 3:
        input_shape = data_val_noisy.shape[1:] + (1,)
        combine_axes_0_1 = False
    else:
        # FIX: previously fell through leaving `input_shape` unbound,
        # which raised a confusing NameError further below.
        raise ValueError('Expected feature data with 3 or 4 dimensions, '+\
            'received shape {}.'.format(data_val_noisy.shape))
    del data_val_noisy

    # setup model
    denoiser, settings_dict = spdl.autoencoder_denoise(
        input_shape = input_shape)

    # FIX: honor user-provided callbacks. Previously, if 'callbacks' was in
    # kwargs, the local `callbacks` variable was never assigned (NameError)
    # and the keyword would also be passed twice to fit() via **kwargs.
    if 'callbacks' in kwargs:
        callbacks = kwargs.pop('callbacks')
        user_callbacks = True
    else:
        callbacks = spdl.setup_callbacks(patience = patience,
                                         best_modelname = model_path,
                                         log_filename = model_dir.joinpath('log.csv'))
        user_callbacks = False
    adm = tf.keras.optimizers.Adam(learning_rate=0.0001)
    denoiser.compile(optimizer=adm, loss='binary_crossentropy')

    # save variables that are not too large:
    local_variables = locals()
    global_variables = globals()
    sp.utils.save_dict(
        dict2save = local_variables,
        filename = model_dir.joinpath('local_variables_{}.csv'.format(
            model_name)),
        overwrite=True)
    sp.utils.save_dict(
        dict2save = global_variables,
        filename = model_dir.joinpath('global_variables_{}.csv'.format(
            model_name)),
        overwrite = True)

    # start training
    start = time.time()
    for i, train_path in enumerate(train_paths_noisy):
        if i == 0:
            if 'epochs' in kwargs:
                epochs = kwargs['epochs']
            else:
                epochs = 10  # default in Keras
            kwargs['epochs'] = epochs
            total_epochs = epochs * len(train_paths_noisy)
            print('\n\nThe model will be trained {} epochs per '.format(epochs)+\
                'training session. \nTotal possible epochs: {}\n\n'.format(total_epochs))
        start_session = time.time()
        data_train_noisy_path = train_path
        data_train_clean_path = train_paths_clean[i]
        # just use first validation data file
        data_val_noisy_path = val_paths_noisy[0]
        data_val_clean_path = val_paths_clean[0]

        print('\nTRAINING SESSION ', i+1)
        print("Training on: ")
        print(data_train_noisy_path)
        print(data_train_clean_path)
        print()

        data_train_noisy = np.load(data_train_noisy_path)
        data_train_clean = np.load(data_train_clean_path)
        data_val_noisy = np.load(data_val_noisy_path)
        data_val_clean = np.load(data_val_clean_path)

        # reinitiate 'callbacks' for additional training sessions, as the
        # callback objects (e.g. early stopping) are stateful
        if i > 0 and not user_callbacks:
            callbacks = spdl.setup_callbacks(
                patience = patience,
                best_modelname = model_path,
                log_filename = model_dir.joinpath('log.csv'))

        tensor = (1,)
        if use_generator:
            train_generator = spdl.Generator(
                data_matrix1 = data_train_noisy,
                data_matrix2 = data_train_clean,
                normalize = normalize,
                desired_input_shape = tensor + input_shape,
                combine_axes_0_1 = combine_axes_0_1)  # don't need batchsize / context window
            val_generator = spdl.Generator(
                data_matrix1 = data_val_noisy,
                data_matrix2 = data_val_clean,
                normalize = normalize,
                desired_input_shape = tensor + input_shape,
                combine_axes_0_1 = combine_axes_0_1)  # don't need batchsize / context window

            feats_noisy, feats_clean = next(train_generator.generator())
            ds_train = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(train_generator.generator()),
                output_types=(feats_noisy.dtype, feats_clean.dtype),
                output_shapes=(feats_noisy.shape, feats_clean.shape))
            ds_val = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(val_generator.generator()),
                output_types=(feats_noisy.dtype, feats_clean.dtype),
                output_shapes=(feats_noisy.shape, feats_clean.shape))
            print(ds_train)
            print(ds_val)
            try:
                history = denoiser.fit(
                    ds_train,
                    steps_per_epoch = data_train_noisy.shape[0],
                    callbacks = callbacks,
                    validation_data = ds_val,
                    validation_steps = data_val_noisy.shape[0],
                    **kwargs)
            except ValueError as e:
                print('\nValueError: ', e)
                raise ValueError('Try setting changing the parameter '+\
                    '`add_tensor_last` (in function '+\
                    '`soundpy.models.dataprep.Generator`)'+\
                    ' to either True, False, or None.')
        else:
            # reshape to mix samples and batchsizes:
            # if batch sizes are prevalent
            # need better way to distinguish this
            if len(data_train_noisy.shape) > 3:
                train_shape = (data_train_noisy.shape[0]*data_train_noisy.shape[1],)+\
                    data_train_noisy.shape[2:] + (1,)
                val_shape = (data_val_noisy.shape[0]*data_val_noisy.shape[1],)+\
                    data_val_noisy.shape[2:] + (1,)
            else:
                train_shape = data_train_noisy.shape + (1,)
                val_shape = data_val_noisy.shape + (1,)

            if normalize:
                data_train_noisy = sp.feats.normalize(data_train_noisy)
                data_train_clean = sp.feats.normalize(data_train_clean)
                data_val_noisy = sp.feats.normalize(data_val_noisy)
                data_val_clean = sp.feats.normalize(data_val_clean)

            X_train = data_train_noisy.reshape(train_shape)
            y_train = data_train_clean.reshape(train_shape)
            X_val = data_val_noisy.reshape(val_shape)
            y_val = data_val_clean.reshape(val_shape)

            history = denoiser.fit(
                X_train, y_train,
                batch_size = data_train_noisy.shape[1],
                callbacks = callbacks,
                validation_data = (X_val, y_val),
                **kwargs)

        end_session = time.time()
        total_dur_sec_session = round(end_session-start_session,2)
        model_features_dict = dict(
            model_path = model_path,
            data_train_noisy_path = data_train_noisy_path,
            data_val_noisy_path = data_val_noisy_path,
            data_train_clean_path = data_train_clean_path,
            data_val_clean_path = data_val_clean_path,
            total_dur_sec_session = total_dur_sec_session,
            use_generator = use_generator,
            kwargs = kwargs)
        model_features_dict.update(settings_dict)
        if i == len(train_paths_noisy)-1:
            end = time.time()
            total_duration_seconds = round(end-start,2)
            time_dict = dict(total_duration_seconds=total_duration_seconds)
            model_features_dict.update(time_dict)

        model_features_dict_path = model_dir.joinpath('info_{}_{}.csv'.format(
            model_name, i))
        model_features_dict_path = sp.utils.save_dict(
            dict2save = model_features_dict,
            filename = model_features_dict_path)
    print('\nFinished training the model. The model and associated files can be '+\
        'found here: \n{}'.format(model_dir))
    return model_dir, history
############################################################################### # TODO include example extraction data in feature_extraction_dir?
def envclassifier_train(feature_extraction_dir,
                        model_name = 'model_cnn_classifier',
                        feature_type = None,
                        use_generator = True,
                        normalize = True,
                        patience = 15,
                        add_tensor_last = True,
                        num_layers = 3,
                        **kwargs):
    '''Collects training features and trains cnn environment classifier.

    This model may be applied to any speech and label scenario, for example,
    male vs female speech, clinical vs healthy speech, simple speech / word
    recognition, as well as noise / scene / environment classification.

    Parameters
    ----------
    feature_extraction_dir : str or pathlib.PosixPath
        Directory where extracted feature files are located (format .npy).

    model_name : str
        The name for the model. This can be quite generic as the date up to
        the millisecond will be added to ensure a unique name for each
        trained model. (default 'model_cnn_classifier')

    feature_type : str, optional
        The type of features that will be used to train the model. This is
        only for the purposes of naming the model. If set to None, it will
        not be included in the model name.

    use_generator : bool
        If True, a generator will be used to feed training data to the model.
        Otherwise the entire training data will be used to train the model
        all at once. (default True)

    normalize : bool
        If True, the data will be normalized before feeding to the model.
        (default True)

    patience : int
        Number of epochs to train without improvement before early stopping.
        (default 15)

    add_tensor_last : bool
        Kept for backwards compatibility; not used within this function.

    num_layers : int
        The number of convolutional neural network layers desired.
        (default 3)

    **kwargs : additional keyword arguments
        The keyword arguments for keras.fit(). Note, the keyword arguments
        differ for validation data so be sure to use the correct keyword
        arguments, depending on if you use the generator or not.

    Returns
    -------
    model_dir : pathlib.PosixPath
        The directory where the model and associated files can be found.
    history : tf.keras.callbacks.History
        The training history of the last training session.

    See Also
    --------
    soundpy.datasets.separate_train_val_test_files
        Generates paths lists for train, validation, and test files. Useful
        for noisy vs clean datasets and also for multiple training files.

    soundpy.models.generator
        The generator function that feeds data to the model.

    soundpy.models.modelsetup.setup_callbacks
        The function that sets up callbacks (e.g. logging, save best model,
        early stopping, etc.)

    soundpy.models.template_models.cnn_classifier
        Template model architecture for a low-computational CNN sound
        classifier.
    '''
    # ensure feature_extraction_folder exists:
    if feature_extraction_dir is None:
        feature_extraction_dir = './audiodata/example_feats_models/envclassifier/'+\
            'features_fbank_6m20d0h18m11s123ms/'
    dataset_path = sp.utils.check_dir(feature_extraction_dir, make=False)
    # designate where to save model and related files
    if feature_type:
        model_name += '_'+feature_type + '_' + sp.utils.get_date()
    else:
        model_name += '_' + sp.utils.get_date()
    model_dir = dataset_path.joinpath(model_name)
    model_dir = sp.utils.check_dir(model_dir, make=True)
    model_name += '.h5'
    model_path = model_dir.joinpath(model_name)

    # prepare features files to load for training
    features_files = list(dataset_path.glob('*.npy'))
    # NamedTuple: 'datasets.train', 'datasets.val', 'datasets.test'
    datasets = sp.datasets.separate_train_val_test_files(
        features_files)

    if not datasets.train:
        # perhaps data files located in subdirectories
        features_files = list(dataset_path.glob('**/*.npy'))
        datasets = sp.datasets.separate_train_val_test_files(
            features_files)
        if not datasets.train:
            raise FileNotFoundError('Could not locate train, validation, or test '+\
                '.npy files in the provided directory: \n{}'.format(dataset_path) +\
                '\nThis program expects "train", "val", or "test" to be '+\
                'included in each filename (not parent directory/ies) names.')

    train_paths = datasets.train
    val_paths = datasets.val
    test_paths = datasets.test

    # need dictionary for decoding labels:
    dict_decode_path = dataset_path.joinpath('dict_decode.csv')
    if not os.path.exists(dict_decode_path):
        raise FileNotFoundError('Could not find {}.'.format(dict_decode_path))
    dict_decode = sp.utils.load_dict(dict_decode_path)
    num_labels = len(dict_decode)

    # load smaller dataset to determine input size:
    data_val = np.load(val_paths[0])
    # expect shape (num_audiofiles, batch_size, num_frames, num_features + label_column)
    if len(data_val.shape) == 4:
        input_shape = (data_val.shape[2], data_val.shape[3] - 1, 1)
    # expect shape (num_audiofiles, num_frames, num_features + label_column)
    elif len(data_val.shape) == 3:
        input_shape = (data_val.shape[1], data_val.shape[2] - 1, 1)
    else:
        # FIX: previously fell through leaving `input_shape` unbound,
        # which raised a confusing NameError further below.
        raise ValueError('Expected feature data with 3 or 4 dimensions, '+\
            'received shape {}.'.format(data_val.shape))
    # remove unneeded variable
    del data_val

    # setup model
    feature_maps, kernels = spdl.setup_layers(num_features = input_shape[-2],
                                              num_layers = num_layers)
    envclassifier, settings_dict = spdl.cnn_classifier(
        feature_maps = feature_maps,
        kernel_size = kernels,
        input_shape = input_shape,
        num_labels = num_labels)
    if envclassifier is None:
        raise sp.errors.numfeatures_incompatible_templatemodel()

    # FIX: honor user-provided callbacks. Previously, if 'callbacks' was in
    # kwargs, the local `callbacks` variable was unbound at the first
    # training session (NameError) and the keyword would also be passed
    # twice to fit() via **kwargs.
    if 'callbacks' in kwargs:
        callbacks = kwargs.pop('callbacks')
        user_callbacks = True
    else:
        callbacks = spdl.setup_callbacks(patience = patience,
                                         best_modelname = model_path,
                                         log_filename = model_dir.joinpath('log.csv'))
        user_callbacks = False
    optimizer = 'adam'
    loss = 'sparse_categorical_crossentropy'
    metrics = ['accuracy']
    envclassifier.compile(optimizer = optimizer,
                          loss = loss,
                          metrics = metrics)

    # save variables that are not too large:
    local_variables = locals()
    global_variables = globals()
    sp.utils.save_dict(
        dict2save = local_variables,
        filename = model_dir.joinpath('local_variables_{}.csv'.format(
            model_name)),
        overwrite=True)
    sp.utils.save_dict(
        dict2save = global_variables,
        filename = model_dir.joinpath('global_variables_{}.csv'.format(
            model_name)),
        overwrite = True)

    # start training
    start = time.time()
    for i, train_path in enumerate(train_paths):
        if i == 0:
            if 'epochs' in kwargs:
                epochs = kwargs['epochs']
            else:
                epochs = 10  # default in Keras
            total_epochs = epochs * len(train_paths)
            print('\n\nThe model will be trained {} epochs per '.format(epochs)+\
                'training session. \nTotal possible epochs: {}\n\n'.format(total_epochs))
        start_session = time.time()
        data_train_path = train_path
        # just use first validation data file
        data_val_path = val_paths[0]
        # just use first test data file
        data_test_path = test_paths[0]

        print('\nTRAINING SESSION ', i+1)
        print("Training on: ")
        print(data_train_path)
        print()

        data_train = np.load(data_train_path)
        data_val = np.load(data_val_path)
        data_test = np.load(data_test_path)

        # reinitiate 'callbacks' for additional training sessions, as the
        # callback objects (e.g. early stopping) are stateful
        if i > 0 and not user_callbacks:
            callbacks = spdl.setup_callbacks(patience = patience,
                                             best_modelname = model_path,
                                             log_filename = model_dir.joinpath('log.csv'))

        # might need to add tensor dimension to `desired_input_shape`
        tensor = (1,)
        if use_generator:
            train_generator = spdl.Generator(
                data_matrix1 = data_train,
                data_matrix2 = None,
                normalize = normalize,
                desired_input_shape = tensor + input_shape)
            val_generator = spdl.Generator(
                data_matrix1 = data_val,
                data_matrix2 = None,
                normalize = normalize,
                desired_input_shape = tensor + input_shape)
            test_generator = spdl.Generator(
                data_matrix1 = data_test,
                data_matrix2 = None,
                normalize = normalize,
                desired_input_shape = tensor + input_shape)

            # resource: https://www.tensorflow.org/guide/data
            feats, label = next(train_generator.generator())
            ds_train = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(train_generator.generator()),
                output_types=(feats.dtype, label.dtype),
                output_shapes=(feats.shape, label.shape))
            ds_val = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(val_generator.generator()),
                output_types=(feats.dtype, label.dtype),
                output_shapes=(feats.shape, label.shape))
            ds_test = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(test_generator.generator()),
                output_types=(feats.dtype, label.dtype),
                output_shapes=(feats.shape, label.shape))
            print(ds_train)
            print(ds_val)
            print(ds_test)

            history = envclassifier.fit(
                ds_train,
                steps_per_epoch = data_train.shape[0],
                callbacks = callbacks,
                validation_data = ds_val,
                validation_steps = data_val.shape[0],
                **kwargs)

            score = envclassifier.evaluate(ds_test, steps=500)
        else:
            # TODO make scaling data optional?
            # data is separated and shaped for this classifier in scale_X_y
            X_train, y_train, scalars = sp.feats.scale_X_y(data_train,
                                                           is_train=True)
            X_val, y_val, __ = sp.feats.scale_X_y(data_val,
                                                  is_train=False,
                                                  scalars=scalars)
            X_test, y_test, __ = sp.feats.scale_X_y(data_test,
                                                    is_train=False,
                                                    scalars=scalars)

            history = envclassifier.fit(X_train, y_train,
                                        callbacks = callbacks,
                                        validation_data = (X_val, y_val),
                                        **kwargs)

            score = envclassifier.evaluate(X_test, y_test)

        print('Test loss:', score[0])
        print('Test accuracy:', score[1])
        end_session = time.time()
        total_dur_sec_session = round(end_session-start_session,2)
        model_features_dict = dict(
            model_path = model_path,
            data_train_path = data_train_path,
            data_val_path = data_val_path,
            data_test_path = data_test_path,
            total_dur_sec_session = total_dur_sec_session,
            use_generator = use_generator,
            score = score,
            kwargs = kwargs)
        model_features_dict.update(settings_dict)
        if i == len(train_paths)-1:
            end = time.time()
            total_duration_seconds = round(end-start,2)
            time_dict = dict(total_duration_seconds=total_duration_seconds)
            model_features_dict.update(time_dict)

        model_features_dict_path = model_dir.joinpath('info_{}_{}.csv'.format(
            model_name, i))
        model_features_dict_path = sp.utils.save_dict(
            filename = model_features_dict_path,
            dict2save = model_features_dict)
    print('\nFinished training the model. The model and associated files can be '+\
        'found here: \n{}'.format(model_dir))
    return model_dir, history
def denoiser_run(model, new_audio, feat_settings_dict, remove_dc=True):
    '''Implements a pre-trained denoiser on `new_audio`.

    Parameters
    ----------
    model : str or pathlib.PosixPath
        The path to the denoising model.

    new_audio : str, pathlib.PosixPath, or np.ndarray
        The path to the noisy audiofile.

    feat_settings_dict : dict
        Dictionary containing necessary settings for how the features were
        extracted for training the model. Expected keys: 'feature_type',
        'win_size_ms', 'percent_overlap', 'sr', 'window',
        'frames_per_sample', 'input_shape', 'desired_shape', 'dur_sec',
        'num_feats'.

    remove_dc : bool
        If True, the dc bias is removed when loading `new_audio` from file.
        (default True)

    Returns
    -------
    cleaned_audio : np.ndarray [shape = (num_samples, )]
        The cleaned audio samples ready for playing or saving as audio file.
        If librosa is unable to convert the cleaned features back to raw
        samples, the cleaned features themselves are returned instead.
    sr : int
        The sample rate of `cleaned_audio`.

    See Also
    --------
    soundpy.feats.get_feats
        How features are extracted.

    soundpy.feats.feats2audio
        How features are transformed back into audio samples.
    '''
    featsettings = sp.feats.load_feat_settings(feat_settings_dict)
    feats = sp.feats.get_feats(
        new_audio,
        sr = featsettings.sr,
        feature_type = featsettings.feature_type,
        win_size_ms = featsettings.win_size_ms,
        percent_overlap = featsettings.percent_overlap,
        window = featsettings.window,
        dur_sec = featsettings.dur_sec,
        num_filters = featsettings.num_feats,
        num_mfcc = featsettings.num_mfcc,
        fft_bins = featsettings.fft_bins,
        remove_first_coefficient = featsettings.remove_first_coefficient,
        sinosoidal_liftering = featsettings.sinosoidal_liftering,
        mono = featsettings.mono,
        rate_of_change = featsettings.rate_of_change,
        rate_of_acceleration = featsettings.rate_of_acceleration,
        subtract_mean = featsettings.subtract_mean,
        real_signal = featsettings.real_signal,
        fmin = featsettings.fmin,
        fmax = featsettings.fmax,
        zeropad = featsettings.zeropad)

    # are phase data still present? (only in stft features)
    # FIX: `np.complex` was removed in NumPy 1.24 and ordering comparisons
    # on complex arrays raise TypeError; use np.iscomplexobj instead.
    if np.iscomplexobj(feats):
        original_phase = sp.dsp.calc_phase(feats,
                                           radians=False)
    # FIX: previously referenced the undefined name `feature_type`
    # (NameError); the setting lives on `featsettings`.
    elif 'stft' in featsettings.feature_type or 'powspec' in featsettings.feature_type:
        feats_stft = sp.feats.get_feats(
            new_audio,
            sr = featsettings.sr,
            feature_type = 'stft',
            win_size_ms = featsettings.win_size_ms,
            percent_overlap = featsettings.percent_overlap,
            window = featsettings.window,
            dur_sec = featsettings.dur_sec,
            fft_bins = featsettings.fft_bins,
            mono = featsettings.mono)
        original_phase = sp.dsp.calc_phase(feats_stft,
                                           radians = False)
    else:
        original_phase = None

    if 'signal' in featsettings.feature_type:
        feats_zeropadded = np.zeros(featsettings.base_shape)
        feats_zeropadded = feats_zeropadded.flatten()
        if len(feats.shape) > 1:
            feats_zeropadded = feats_zeropadded.reshape(
                feats_zeropadded.shape[0],
                feats.shape[1])
        if len(feats) > len(feats_zeropadded):
            feats = feats[:len(feats_zeropadded)]
        feats_zeropadded[:len(feats)] += feats
        # reshape here to avoid memory issues if total # samples is large
        feats = feats_zeropadded.reshape(featsettings.base_shape)

    # tensor dimension already included in `input_shape`
    feats = sp.feats.prep_new_audiofeats(feats,
                                         featsettings.base_shape,
                                         featsettings.input_shape)
    # ensure same shape as feats
    if original_phase is not None:
        original_phase = sp.feats.prep_new_audiofeats(original_phase,
                                                      featsettings.base_shape,
                                                      featsettings.input_shape)

    feats_normed = sp.feats.normalize(feats)
    denoiser = load_model(model)
    if len(feats_normed.shape) >= 3:
        batch_size = feats_normed.shape[0]
        # newer version soundpy 0.1.0a3
        feats_normed = feats_normed.reshape((1,) + feats_normed.shape)
        try:
            cleaned_feats = denoiser.predict(feats_normed,
                                             batch_size = batch_size)
        except ValueError:
            # newer version soundpy 0.1.0a3
            import warnings
            msg = '\nWARNING: adjustments to feature extraction in a more recent'+\
                ' SoundPy version may result in imperfect feature alignmnet '+\
                'with a model trained with features generated with a previous'+\
                ' SoundPy version. Sincerest apologies!'
            warnings.warn(msg)
            feats_normed = feats_normed.reshape(feats_normed.shape[1:])
            cleaned_feats = denoiser.predict(feats_normed,
                                             batch_size = batch_size)
    else:
        feats_normed = feats_normed.reshape((1,)+feats_normed.shape)
        cleaned_feats = denoiser.predict(feats_normed)
    try:
        # need to change shape back to 2D
        # current shape is (batch_size, num_frames, num_features, 1)
        # need (num_frames, num_features)
        # remove last tensor dimension
        if feats_normed.shape[-1] == 1:
            feats_normed = feats_normed.reshape(feats_normed.shape[:-1])
        feats_flattened = feats_normed.reshape(-1,
                                               feats_normed.shape[-1])
        audio_shape = (feats_flattened.shape)
        cleaned_feats = cleaned_feats.reshape(audio_shape)
        if original_phase is not None:
            original_phase = original_phase.reshape(audio_shape)
        # now combine them to create audio samples:
        cleaned_audio = sp.feats.feats2audio(
            cleaned_feats,
            feature_type = featsettings.feature_type,
            sr = featsettings.sr,
            win_size_ms = featsettings.win_size_ms,
            percent_overlap = featsettings.percent_overlap,
            phase = original_phase)
        if not isinstance(new_audio, np.ndarray):
            noisy_audio, __ = sp.loadsound(new_audio,
                                           sr = featsettings.sr,
                                           remove_dc = remove_dc)
        else:
            noisy_audio = new_audio
        if len(cleaned_audio) > len(noisy_audio):
            cleaned_audio = cleaned_audio[:len(noisy_audio)]
        max_energy_original = np.max(noisy_audio)
        # match the scale of the original audio:
        cleaned_audio = sp.dsp.scalesound(cleaned_audio,
                                          max_val = max_energy_original)
    except librosa.ParameterError as e:
        import warnings
        msg = '\nlibrosa.ParameterError: {}'.format(e)+\
            '\nUnable to convert cleaned features to raw audio samples.'+\
            '\nReturning cleaned audio in {} features.'.format(featsettings.feature_type)
        warnings.warn(msg)
        cleaned_audio = cleaned_feats
    return cleaned_audio, featsettings.sr
def envclassifier_run(model, new_audio, feat_settings_dict, dict_decode):
    '''Implement a convnet model with `new_audio`.

    Parameters
    ----------
    model : str, pathlib.PosixPath
        The pathway to the pre-trained model.

    new_audio : str, pathlib.PosixPath
        The pathway to the audio file to be classified.

    feat_settings_dict : dict
        Dictionary containing necessary settings for feature extraction,
        such as sample rate, feature type, etc.

    dict_decode : dict
        Dictionary containing encoded labels as keys and string labels as
        values. For example {0:'office', 1:'traffic', 2:'park'}.

    Returns
    -------
    label : int
        The encoded label applied to the `new_audio`.
    label_string : str
        The string label applied to the `new_audio`.
    strength : float
        The confidence of the model's assignment. For example, 0.99 would
        be very confident, 0.51 would not be very confident.
    '''
    featsettings = sp.feats.load_feat_settings(feat_settings_dict)
    feats = sp.feats.get_feats(
        new_audio,
        sr = featsettings.sr,
        feature_type = featsettings.feature_type,
        win_size_ms = featsettings.win_size_ms,
        percent_overlap = featsettings.percent_overlap,
        window = featsettings.window,
        dur_sec = featsettings.dur_sec,
        num_filters = featsettings.num_feats,
        num_mfcc = featsettings.num_mfcc,
        fft_bins = featsettings.fft_bins,
        remove_first_coefficient = featsettings.remove_first_coefficient,
        sinosoidal_liftering = featsettings.sinosoidal_liftering,
        mono = featsettings.mono,
        rate_of_change = featsettings.rate_of_change,
        rate_of_acceleration = featsettings.rate_of_acceleration,
        subtract_mean = featsettings.subtract_mean,
        real_signal = featsettings.real_signal,
        fmin = featsettings.fmin,
        fmax = featsettings.fmax,
        zeropad = featsettings.zeropad)

    # load info csv with model input shape
    model_path = sp.utils.string2pathlib(model)
    # FIX: previously globbed on `model`, which may be a plain string with
    # no `.parent` attribute; use the pathlib object created above.
    model_info_path = model_path.parent.glob('*.csv')
    model_info_path = [i for i in model_info_path if 'info' in i.stem][0]
    model_info = sp.utils.load_dict(model_info_path)
    # restore values saved as strings (e.g. tuples) to their python objects
    for key, value in model_info.items():
        model_info[key] = sp.utils.restore_dictvalue(value)
    input_shape = model_info['input_shape']
    feats = sp.feats.prep_new_audiofeats(feats,
                                         featsettings.base_shape,
                                         input_shape)
    feats_normed = sp.feats.normalize(feats)
    envclassifier = load_model(model)
    # add a batch/tensor dimension for the model
    tensor = (1,)
    feats_normed = feats_normed.reshape(tensor + feats_normed.shape)
    prediction = envclassifier.predict(feats_normed)
    label = np.argmax(prediction)
    strength = prediction[0][label]
    try:
        label_string = dict_decode[label]
    except KeyError:
        # labels loaded from csv may have string keys
        label_string = dict_decode[str(int(label))]
    return label, label_string, strength
def collect_classifier_settings(feature_extraction_dir):
    '''Gathers model-relevant information from files in the feature directory.

    The files read here were generated by
    `soundpy.models.builtin.envclassifier_train`.

    Parameters
    ----------
    feature_extraction_dir : str, pathlib.PosixPath
        The directory where extracted files are located, including .npy and
        .csv log files.

    Returns
    -------
    datasets : NamedTuple
        A named tuple containing train, val, and test data.
    num_labels : int
        The number of labels used for the data.
    feat_shape : tuple
        The initial shape of the features when they were extracted, i.e.
        without labels or context window applied.
    num_feats : int
        The number of features used to train the pre-trained model.
    feature_type : str
        The `feature_type` used to train the pre-trained model. For example,
        'fbank', 'mfcc', 'stft', 'signal', 'powspec'.

    See Also
    --------
    soundpy.models.builtin.envclassifier_train
        The builtin functionality for training a simple
        scene/environment/speech classifier. This function generates the
        files expected by this function.
    '''
    # the feature directory must already exist:
    feature_dir = sp.utils.check_dir(feature_extraction_dir, make=False)

    # look for .npy feature files at the top level first; fall back to
    # searching subdirectories if none categorize as training data
    npy_files = list(feature_dir.glob('*.npy'))
    datasets = sp.datasets.separate_train_val_test_files(npy_files)
    if not datasets.train:
        npy_files = list(feature_dir.glob('**/*.npy'))
        datasets = sp.datasets.separate_train_val_test_files(npy_files)
        if not datasets.train:
            raise FileNotFoundError('Could not locate train, validation, or test '+\
                '.npy files in the provided directory: \n{}'.format(feature_dir) +\
                '\nThis program expects "train", "val", or "test" to be '+\
                'included in each filename (not parent directory/ies) names.')

    # the label-decoding dictionary determines the number of labels:
    decode_path = feature_dir.joinpath('dict_decode.csv')
    if not os.path.exists(decode_path):
        raise FileNotFoundError('Could not find {}.'.format(decode_path))
    num_labels = len(sp.utils.load_dict(decode_path))

    settings_dict = sp.utils.load_dict(
        feature_dir.joinpath('log_extraction_settings.csv'))
    if 'kwargs' in settings_dict:
        extraction_kwargs = sp.utils.restore_dictvalue(settings_dict['kwargs'])
        settings_dict.update(extraction_kwargs)

    # feature shape excludes the label column; key name differs between
    # extraction-log versions
    if 'desired_shape' in settings_dict:
        feat_shape = sp.utils.restore_dictvalue(settings_dict['desired_shape'])
    else:
        feat_shape = sp.utils.restore_dictvalue(settings_dict['feat_base_shape'])

    if 'num_feats' in settings_dict:
        num_feats = sp.utils.restore_dictvalue(settings_dict['num_feats'])
    else:
        num_feats = feat_shape[-1]

    if 'feat_type' in settings_dict:
        feature_type = settings_dict['feat_type']
    else:
        feature_type = settings_dict['feature_type']

    return datasets, num_labels, feat_shape, num_feats, feature_type
# TODO cleanup # TODO test
def cnnlstm_train(feature_extraction_dir,
                  model_name = 'model_cnnlstm_classifier',
                  use_generator = True,
                  normalize = True,
                  patience = 15,
                  timesteps = 10,
                  context_window = 5,
                  frames_per_sample = None,
                  colorscale = 1,
                  total_training_sessions = None,
                  add_tensor_last = False,
                  **kwargs):
    '''Example implementation of a Convnet+LSTM model for speech recognition.

    Note: improvements must still be made, for example with the `context_window`.
    However, this still may be useful as an example of a simple CNN and LSTM model.

    Parameters
    ----------
    feature_extraction_dir : str, pathlib.PosixPath
        The directory where previously extracted feature files (.npy) and
        log files are located.
    model_name : str
        The name of the model. (default 'model_cnnlstm_classifier')
    use_generator : bool
        If True, data will be fed to the model via generator.
        (default True)
    normalize : bool
        If True, the data will be normalized before being fed to the model.
        (default True)
    patience : int
        The number of epochs to allow with no improvement in either
        val accuracy or loss. (default 15)
    timesteps : int
        The frames dedicated to each subsection of each sample. This
        allows the long-short term memory model to process each
        subsection consecutively. (default 10)
    context_window : int
        The number of frames surrounding a central frame that make up
        sound context. Must not be None. (default 5)
    frames_per_sample : int
        Deprecated: raises `DeprecationWarning` if set. Use
        `context_window` instead. (default None)
    colorscale : int
        The colorscale relevant for the convolutional neural network.
        (default 1)
    total_training_sessions : int
        Option to limit the number of audiofiles used for training. If None,
        all train files are used. (default None)
    add_tensor_last : bool
        No longer used in the code. Irrelevant.
    **kwargs : additional keyword arguments
        Keyword arguments for `keras.model.fit`. A 'callbacks' entry, if
        provided, replaces the default callbacks for all training sessions.

    Returns
    -------
    model_dir : pathlib.PosixPath
        The directory where model and log files are saved.
    history : tf.keras.callbacks.History
        Contains training and validation accuracy and loss of the final
        training session.

    Raises
    ------
    DeprecationWarning
        If `frames_per_sample` is set.
    ValueError
        If `context_window` is None (previously this caused a NameError).

    References
    ----------
    Kim, Myungjong & Cao, Beiming & An, Kwanghoon & Wang, Jun. (2018).
    Dysarthric Speech Recognition Using Convolutional LSTM Neural Network.
    10.21437/interspeech.2018-2250.
    '''
    datasets, num_labels, feat_shape, num_feats, feature_type = \
        collect_classifier_settings(feature_extraction_dir)
    train_paths = datasets.train
    val_paths = datasets.val
    test_paths = datasets.test

    # Save model directory inside feature directory; a timestamp keeps each
    # trained model's directory unique.
    dataset_path = train_paths[0].parent
    if feature_type:
        model_name += '_' + feature_type + '_' + sp.utils.get_date()
    else:
        model_name += '_' + sp.utils.get_date()
    model_dir = dataset_path.joinpath(model_name)
    model_dir = sp.utils.check_dir(model_dir, make=True)
    model_name += '.h5'
    model_path = model_dir.joinpath(model_name)

    if frames_per_sample is not None:
        raise DeprecationWarning('In future versions, the `frames_per_sample` '+\
            'parameter will be no longer used.\n'+\
            'Instead features can be segmented in generator functions using the '+\
            'parameter `context_window`: `soundpy.models.dataprep.Generator`.')
    if context_window is None:
        # bug fix: `frame_width` was previously left undefined in this case,
        # causing a NameError when building `input_shape`.
        raise ValueError('`context_window` must be set (cannot be None) to '+\
            'build the model input shape.')
    # context window on both sides of a central frame
    frame_width = context_window * 2 + 1
    input_shape = (timesteps, frame_width, num_feats, colorscale)
    model, settings = spdl.cnnlstm_classifier(num_labels = num_labels,
                                              input_shape = input_shape,
                                              lstm_cells = num_feats)

    # bug fix: pop 'callbacks' out of kwargs so that (a) user-supplied callbacks
    # are honored from the very first training session (previously `callbacks`
    # was undefined in session 0 when provided via kwargs), and (b) model.fit()
    # does not receive the 'callbacks' keyword twice.
    user_callbacks = kwargs.pop('callbacks', None)
    if user_callbacks is not None:
        callbacks = user_callbacks
    else:
        callbacks = spdl.setup_callbacks(
            patience = patience,
            best_modelname = model_path,
            log_filename = model_dir.joinpath('log.csv'))
    optimizer = 'adam'
    loss = 'sparse_categorical_crossentropy'
    metrics = ['accuracy']
    model.compile(optimizer = optimizer,
                  loss = loss,
                  metrics = metrics)
    # update settings with optimizer etc. for logging
    additional_settings = dict(optimizer = optimizer,
                               loss = loss,
                               metrics = metrics,
                               kwargs = kwargs)
    settings.update(additional_settings)

    # start training: one "session" per training feature file
    start = time.time()
    for i, train_path in enumerate(train_paths):
        if i == 0:
            if 'epochs' in kwargs:
                epochs = kwargs['epochs']
            else:
                epochs = 10 # default in Keras
            total_epochs = epochs * len(train_paths)
            print('\n\nThe model will be trained {} epochs per '.format(epochs)+\
                'training session. \nTotal possible epochs: {}\n\n'.format(
                    total_epochs))
        start_session = time.time()
        data_train_path = train_path
        # just use first validation data file
        data_val_path = val_paths[0]
        # just use first test data file
        data_test_path = test_paths[0]
        print('\nTRAINING SESSION ', i+1)
        print("Training on: ")
        print(data_train_path)
        print()
        data_train = np.load(data_train_path)
        data_val = np.load(data_val_path)
        data_test = np.load(data_test_path)
        print('\ntrain data shape: ', data_train.shape)
        print()
        # shuffle data_train, just to ensure random order
        np.random.shuffle(data_train)
        # re-initiate default callbacks for additional training sessions so
        # early-stopping / checkpoint state starts fresh each session
        if i > 0 and user_callbacks is None:
            callbacks = spdl.setup_callbacks(
                patience = patience,
                best_modelname = model_path,
                log_filename = model_dir.joinpath('log.csv'))
        if use_generator:
            train_generator = spdl.Generator(
                data_matrix1 = data_train,
                data_matrix2 = None,
                normalize = normalize,
                timestep = timesteps,
                axis_timestep = 0,
                context_window = context_window,
                axis_context_window = -2,
                desired_input_shape = (1,) + input_shape)
            val_generator = spdl.Generator(
                data_matrix1 = data_val,
                data_matrix2 = None,
                normalize = normalize,
                timestep = timesteps,
                axis_timestep = 0,
                context_window = context_window,
                axis_context_window = -2,
                desired_input_shape = (1,) + input_shape)
            test_generator = spdl.Generator(
                data_matrix1 = data_test,
                data_matrix2 = None,
                normalize = normalize,
                timestep = timesteps,
                axis_timestep = 0,
                context_window = context_window,
                axis_context_window = -2,
                desired_input_shape = (1,) + input_shape)
            # peek at one sample to fix the dtype/shape for tf.data
            feats, label = next(train_generator.generator())
            print('generator items:')
            print('feature shape')
            print(feats.shape)
            print('label')
            print(label)
            ds_train = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(train_generator.generator()),
                output_types = (feats.dtype, label.dtype),
                output_shapes = (feats.shape, label.shape))
            ds_val = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(val_generator.generator()),
                output_types = (feats.dtype, label.dtype),
                output_shapes = (feats.shape, label.shape))
            ds_test = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(test_generator.generator()),
                output_types = (feats.dtype, label.dtype),
                output_shapes = (feats.shape, label.shape))
            print(ds_train)
            print(ds_val)
            print(ds_test)
            history = model.fit(
                ds_train,
                steps_per_epoch = data_train.shape[0],
                callbacks = callbacks,
                validation_data = ds_val,
                validation_steps = data_val.shape[0],
                **kwargs)
            score = model.evaluate(ds_test, steps=500)
        else:
            # TODO make scaling data optional?
            # TODO remove option for non-generator fed data..?
            # data is separated and shaped for this classifier in scale_X_y..
            X_train, y_train, scalars = sp.feats.scale_X_y(
                data_train, is_train=True)
            X_val, y_val, __ = sp.feats.scale_X_y(
                data_val, is_train=False, scalars=scalars)
            X_test, y_test, __ = sp.feats.scale_X_y(
                data_test, is_train=False, scalars=scalars)
            X_train = sp.feats.adjust_shape(
                X_train, (X_train.shape[0],) + input_shape, change_dims=True)
            X_val = sp.feats.adjust_shape(
                X_val, (X_val.shape[0],) + input_shape, change_dims=True)
            X_test = sp.feats.adjust_shape(
                X_test, (X_test.shape[0],) + input_shape, change_dims=True)
            # randomize train data
            rand_idx = np.random.choice(range(len(X_train)),
                                        len(X_train),
                                        replace=False)
            X_train = X_train[rand_idx]
            history = model.fit(X_train, y_train,
                                callbacks = callbacks,
                                validation_data = (X_val, y_val),
                                **kwargs)
            score = model.evaluate(X_test, y_test)
        print('Test loss:', score[0])
        print('Test accuracy:', score[1])
        end_session = time.time()
        total_dur_sec_session = round(end_session - start_session, 2)
        # log this session's settings and results alongside the model
        model_features_dict = dict(model_path = model_path,
                                   data_train_path = data_train_path,
                                   data_val_path = data_val_path,
                                   data_test_path = data_test_path,
                                   total_dur_sec_session = total_dur_sec_session,
                                   use_generator = use_generator,
                                   score = score,
                                   kwargs = kwargs)
        model_features_dict.update(settings)
        model_features_dict_path = model_dir.joinpath('info_{}_{}.csv'.format(
            model_name, i))
        model_features_dict_path = sp.utils.save_dict(
            filename = model_features_dict_path,
            dict2save = model_features_dict)
        if total_training_sessions is None:
            total_training_sessions = len(train_paths)
        if i == total_training_sessions - 1:
            # final session: add total duration and overwrite the log
            end = time.time()
            total_duration_seconds = round(end - start, 2)
            time_dict = dict(total_duration_seconds = total_duration_seconds)
            model_features_dict.update(time_dict)
            model_features_dict_path = model_dir.joinpath('info_{}_{}.csv'.format(
                model_name, i))
            model_features_dict_path = sp.utils.save_dict(
                filename = model_features_dict_path,
                dict2save = model_features_dict,
                overwrite = True)
            print('\nFinished training the model. The model and associated files can be '+\
                'found here: \n{}'.format(model_dir))
            model.save(model_dir.joinpath('final_not_best_model.h5'))
            return model_dir, history
# TODO cleanup # TODO test
def resnet50_train(feature_extraction_dir,
                   model_name = 'model_resnet50_classifier',
                   use_generator = True,
                   normalize = True,
                   patience = 15,
                   colorscale = 3,
                   total_training_sessions = None,
                   **kwargs):
    '''Continue training a pre-trained resnet50 model for speech recogntion
    or other sound classification.

    Parameters
    ----------
    feature_extraction_dir : str or pathlib.PosixPath
        The directory where previously extracted feature files (.npy) and
        log files are located.
    model_name : str
        The name for the model. (default 'model_resnet50_classifier')
    use_generator : bool
        If True, data will be fed to the model via generator.
        (default True)
    normalize : bool
        If True, the data will be normalized before being fed to the model.
        (default True)
    patience : int
        The number of epochs to allow with no improvement in either
        val accuracy or loss. (default 15)
    colorscale : int
        The colorscale relevant for the convolutional neural network.
        ResNet50 expects 3 channels; grayscale features are converted.
        (default 3)
    total_training_sessions : int
        Option to limit the number of audiofiles used for training. If None,
        all train files are used. (default None)
    **kwargs : additional keyword arguments
        Keyword arguments for `keras.model.fit`. A 'callbacks' entry, if
        provided, replaces the default callbacks for all training sessions.

    Returns
    -------
    model_dir : pathlib.PosixPath
        The directory where model and log files are saved.
    history : tf.keras.callbacks.History
        Contains training and validation accuracy and loss of the final
        training session.
    '''
    datasets, num_labels, feat_shape, num_feats, feature_type = \
        collect_classifier_settings(feature_extraction_dir)
    train_paths = datasets.train
    val_paths = datasets.val
    test_paths = datasets.test

    # Save model directory inside feature directory; a timestamp keeps each
    # trained model's directory unique.
    dataset_path = train_paths[0].parent
    if feature_type:
        model_name += '_' + feature_type + '_' + sp.utils.get_date()
    else:
        model_name += '_' + sp.utils.get_date()
    model_dir = dataset_path.joinpath(model_name)
    model_dir = sp.utils.check_dir(model_dir, make=True)
    model_name += '.h5'
    model_path = model_dir.joinpath(model_name)

    input_shape = (feat_shape[0], num_feats, colorscale)
    model, settings = spdl.resnet50_classifier(num_labels = num_labels,
                                               input_shape = input_shape)

    # bug fix: pop 'callbacks' out of kwargs so that (a) user-supplied callbacks
    # are honored from the very first training session (previously `callbacks`
    # was undefined in session 0 when provided via kwargs), and (b) model.fit()
    # does not receive the 'callbacks' keyword twice.
    user_callbacks = kwargs.pop('callbacks', None)
    if user_callbacks is not None:
        callbacks = user_callbacks
    else:
        callbacks = spdl.setup_callbacks(
            patience = patience,
            best_modelname = model_path,
            log_filename = model_dir.joinpath('log.csv'))
    # `lr` is a deprecated alias in TF2 Keras; use `learning_rate`
    optimizer = Adam(learning_rate=0.0001)
    loss = 'sparse_categorical_crossentropy'
    metrics = ['accuracy']
    model.compile(optimizer = optimizer,
                  loss = loss,
                  metrics = metrics)
    # update settings with optimizer etc. for logging
    additional_settings = dict(optimizer = optimizer,
                               loss = loss,
                               metrics = metrics,
                               kwargs = kwargs)
    settings.update(additional_settings)

    # start training: one "session" per training feature file
    start = time.time()
    for i, train_path in enumerate(train_paths):
        if i == 0:
            if 'epochs' in kwargs:
                epochs = kwargs['epochs']
            else:
                epochs = 10 # default in Keras
            total_epochs = epochs * len(train_paths)
            print('\n\nThe model will be trained {} epochs per '.format(epochs)+\
                'training session. \nTotal possible epochs: {}\n\n'.format(
                    total_epochs))
        start_session = time.time()
        data_train_path = train_path
        # just use first validation data file
        data_val_path = val_paths[0]
        # just use first test data file
        data_test_path = test_paths[0]
        print('\nTRAINING SESSION ', i+1)
        print("Training on: ")
        print(data_train_path)
        print()
        data_train = np.load(data_train_path)
        data_val = np.load(data_val_path)
        data_test = np.load(data_test_path)
        # shuffle data_train, just to ensure random order
        np.random.shuffle(data_train)
        # re-initiate default callbacks for additional training sessions so
        # early-stopping / checkpoint state starts fresh each session
        if i > 0 and user_callbacks is None:
            callbacks = spdl.setup_callbacks(
                patience = patience,
                best_modelname = model_path,
                log_filename = model_dir.joinpath('log.csv'))
        tensor = (1,)
        if use_generator:
            # gray2color=True: replicate grayscale features to 3 channels
            train_generator = spdl.Generator(
                data_matrix1 = data_train,
                data_matrix2 = None,
                normalize = normalize,
                desired_input_shape = tensor + input_shape,
                gray2color = True)
            val_generator = spdl.Generator(
                data_matrix1 = data_val,
                data_matrix2 = None,
                normalize = normalize,
                desired_input_shape = tensor + input_shape,
                gray2color = True)
            test_generator = spdl.Generator(
                data_matrix1 = data_test,
                data_matrix2 = None,
                normalize = normalize,
                desired_input_shape = tensor + input_shape,
                gray2color = True)
            # peek at one sample to fix the dtype/shape for tf.data
            feats, label = next(train_generator.generator())
            ds_train = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(train_generator.generator()),
                output_types = (feats.dtype, label.dtype),
                output_shapes = (feats.shape, label.shape))
            ds_val = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(val_generator.generator()),
                output_types = (feats.dtype, label.dtype),
                output_shapes = (feats.shape, label.shape))
            ds_test = tf.data.Dataset.from_generator(
                spdl.make_gen_callable(test_generator.generator()),
                output_types = (feats.dtype, label.dtype),
                output_shapes = (feats.shape, label.shape))
            print(ds_train)
            print(ds_val)
            print(ds_test)
            history = model.fit(
                ds_train,
                steps_per_epoch = data_train.shape[0],
                callbacks = callbacks,
                validation_data = ds_val,
                validation_steps = data_val.shape[0],
                **kwargs)
            score = model.evaluate(ds_test, steps=500)
        else:
            # TODO make scaling data optional?
            # data is separated and shaped for this classifier in scale_X_y..
            X_train, y_train, scalars = sp.feats.scale_X_y(
                data_train, is_train=True)
            X_val, y_val, __ = sp.feats.scale_X_y(
                data_val, is_train=False, scalars=scalars)
            X_test, y_test, __ = sp.feats.scale_X_y(
                data_test, is_train=False, scalars=scalars)
            print(X_train.shape)
            X_train = sp.feats.adjust_shape(
                X_train, (X_train.shape[0],) + input_shape, change_dims=True)
            print(X_train.shape)
            X_val = sp.feats.adjust_shape(
                X_val, (X_val.shape[0],) + input_shape, change_dims=True)
            X_test = sp.feats.adjust_shape(
                X_test, (X_test.shape[0],) + input_shape, change_dims=True)
            # randomize train data
            rand_idx = np.random.choice(range(len(X_train)),
                                        len(X_train),
                                        replace=False)
            X_train = X_train[rand_idx]
            # make grayscale to colorscale (ResNet50 expects 3 channels)
            X_train = sp.feats.grayscale2color(X_train, colorscale = 3)
            X_val = sp.feats.grayscale2color(X_val, colorscale = 3)
            X_test = sp.feats.grayscale2color(X_test, colorscale = 3)
            print(X_train.shape)
            history = model.fit(X_train, y_train,
                                callbacks = callbacks,
                                validation_data = (X_val, y_val),
                                **kwargs)
            score = model.evaluate(X_test, y_test)
        print('Test loss:', score[0])
        print('Test accuracy:', score[1])
        end_session = time.time()
        total_dur_sec_session = round(end_session - start_session, 2)
        # log this session's settings and results alongside the model
        model_features_dict = dict(model_path = model_path,
                                   data_train_path = data_train_path,
                                   data_val_path = data_val_path,
                                   data_test_path = data_test_path,
                                   total_dur_sec_session = total_dur_sec_session,
                                   use_generator = use_generator,
                                   score = score,
                                   kwargs = kwargs)
        model_features_dict.update(settings)
        model_features_dict_path = model_dir.joinpath('info_{}_{}.csv'.format(
            model_name, i))
        model_features_dict_path = sp.utils.save_dict(
            filename = model_features_dict_path,
            dict2save = model_features_dict)
        if total_training_sessions is None:
            total_training_sessions = len(train_paths)
        if i == total_training_sessions - 1:
            # final session: add total duration and overwrite the log
            end = time.time()
            total_duration_seconds = round(end - start, 2)
            time_dict = dict(total_duration_seconds = total_duration_seconds)
            model_features_dict.update(time_dict)
            model_features_dict_path = model_dir.joinpath('info_{}_{}.csv'.format(
                model_name, i))
            model_features_dict_path = sp.utils.save_dict(
                filename = model_features_dict_path,
                dict2save = model_features_dict,
                overwrite = True)
            print('\nFinished training the model. The model and associated files can be '+\
                'found here: \n{}'.format(model_dir))
            model.save(model_dir.joinpath('final_not_best_model.h5'))
            return model_dir, history
# TODO cleanup # TODO test # TODO continue docstrings
def envclassifier_extract_train(
    model_name = 'env_classifier',
    augment_dict = None,
    audiodata_path = None,
    features_dir = None,
    save_new_files_dir = None,
    labeled_data = True,
    ignore_label_marker = None,
    batch_size = 10,
    epochs = 5,
    patience = 15,
    callbacks = None,
    random_seed = None,
    visualize = False,
    vis_every_n_items = 50,
    label_silence = False,
    val_data = None,
    test_data = None,
    append_model_dir = False,
    **kwargs):
    '''Extract and augment features during training of a scene/environment/speech classifier

    Parameters
    ----------
    model_name : str
        Name of the model. No extension (will save as .h5 file)
        (default 'env_classifier')
    augment_dict : dict, optional
        Dictionary containing keys (e.g. 'add_white_noise'). See
        `soundpy.augment.list_augmentations`and corresponding True or False
        values. If the value is True, the key / augmentation gets implemented
        at random, each epoch. (default None)
    audiodata_path : str, pathlib.PosixPath
        Where audio data can be found, if no `features_dir` where previously
        extracted and prepared files are located. (default None)
    features_dir : str, pathlib.PosixPath
        The feature directory where previously extracted validation and test
        data are located, as well as the relevant log files.
    save_new_files_dir : str, pathlib.PosixPath
        Where new files (logging, model(s), etc.) will be saved. If None, will
        be set in a unique directory within the current working directory.
        (default None)
    labeled_data : bool
        Useful in determining shape of data. If True, expected label column to
        exist at the end of the feature column of feature data. Note: this may
        be removed in future versions.
    ignore_label_marker : str
        When collecting labels from subdirectory names, this allows a subfolder
        name to be ignored. For example, if `ignore_label_marker` is set as
        '__', the folder name '__test__' will not be included as a label while
        a folder name 'dog_barking' will.
    batch_size : int
        Batch size handed to the training feature-extraction generator.
        (default 10)
    epochs : int
        Number of training epochs. (default 5)
    patience : int
        Epochs without improvement before early stopping; if None, set to
        `epochs`. (default 15)
    callbacks : optional
        NOTE(review): currently ignored — this function always builds its own
        callbacks via `spdl.setup_callbacks` below.
    random_seed : int, optional
        Seed used when assigning and shuffling the train/val/test audio lists.
    visualize : bool
        If True, the training generator periodically saves feature images.
        (default False)
    vis_every_n_items : int
        How often (in items) the training generator visualizes features.
        (default 50)
    label_silence : bool
        If True, an extra 'silence' label is added to the label dictionaries.
        (default False)
    val_data, test_data : str or pathlib.PosixPath, optional
        Paths to previously extracted .npy validation / test data; only used
        (and required) when `features_dir` is provided.
    append_model_dir : bool
        Passed to `sp.utils.check_dir` when creating the model directory, to
        avoid overwriting an already trained model. (default False)
    **kwargs : additional keyword arguments
        Keyword arguments for `soundpy.feats.get_feats`. Ignored (overwritten)
        if `features_dir` is provided. Must include `feature_type` and
        `dur_sec`.

    Returns
    -------
    model_dir : pathlib.PosixPath
        The directory where the model and associated files are saved.
    history : tf.keras.callbacks.History
        Training and validation accuracy and loss throughout training.

    Raises
    ------
    ValueError
        If `feature_type` or `dur_sec` is missing from `kwargs`, or if both
        `features_dir` and `audiodata_path` are None.
    '''
    # Case 1: feature settings were already extracted — load and reuse them.
    if features_dir is not None:
        features_dir = sp.utils.string2pathlib(features_dir)
        feat_settings_file = features_dir.joinpath('log_extraction_settings.csv')
        feat_settings_dict = sp.utils.load_dict(feat_settings_file)
        # should be a dict
        feat_kwargs = sp.utils.restore_dictvalue(feat_settings_dict['kwargs'])
        print(feat_kwargs)
        # load decode dictionary for labeled data
        dict_decode_path = features_dir.joinpath('dict_decode.csv')
        dict_decode = sp.utils.load_dict(dict_decode_path)
        dict_encode = None
        # ensure items in dictionaries original type (csv loads as strings)
        for key, value in feat_kwargs.items():
            feat_kwargs[key] = sp.utils.restore_dictvalue(value)
        for key, value in feat_settings_dict.items():
            feat_settings_dict[key] = sp.utils.restore_dictvalue(value)
        for key, value in dict_decode.items():
            # expects key to be integer
            dict_decode[key] = sp.utils.restore_dictvalue(value)
        # update kwargs with loaded feature kwargs (caller's kwargs replaced)
        kwargs = dict(feat_kwargs)
    # require 'feature_type' to be indicated
    if 'feature_type' not in kwargs:
        raise ValueError('Function `envclassifier_extract_train` expects the '+ \
            'parameter `feature_type` to be set as one of the following:\n'+ \
            '- signal\n- stft\n- powspec\n- fbank\n- mfcc\n')
    #if 'stft' not in kwargs['feature_type'] and 'powspec' not in kwargs['feature_type']:
        #raise ValueError('Function `envclassifier_extract_train` can only reliably '+\
            #'work if `feature_type` parameter is set to "stft" or "powspec".'+\
                #' In future versions the other feature types will be made available.')
    # ensure defaults are set if not included in kwargs:
    if 'win_size_ms' not in kwargs:
        kwargs['win_size_ms'] = 20
    if 'percent_overlap' not in kwargs:
        kwargs['percent_overlap'] = 0.5
    if 'rate_of_change' not in kwargs:
        kwargs['rate_of_change'] = False
    if 'rate_of_acceleration' not in kwargs:
        kwargs['rate_of_acceleration'] = False
    if 'dur_sec' not in kwargs:
        raise ValueError('Function `envclassifier_extract_train``requires ' +\
            'the keyword argument `dur_sec` to be set. How many seconds of audio '+\
                'from each audio file would you like to use for training?')
    if 'sr' not in kwargs:
        kwargs['sr'] = 22050
    if 'fft_bins' not in kwargs:
        import warnings
        # default fft_bins: samples per window (win_size_ms is in milliseconds)
        fft_bins = int(kwargs['win_size_ms'] * kwargs['sr'] // 1000)
        msg = '\nWARNING: `fft_bins` was not set. Setting it to {}'.format(fft_bins)
        warnings.warn(msg)
        kwargs['fft_bins'] = fft_bins
    if 'real_signal' not in kwargs:
        kwargs['real_signal'] = True
    if 'window' not in kwargs:
        kwargs['window'] = 'hann'
    if 'zeropad' not in kwargs:
        kwargs['zeropad'] = True
    if 'num_filters' not in kwargs:
        kwargs['num_filters'] = 40
    if 'num_mfcc' not in kwargs:
        kwargs['num_mfcc'] = 40
    # training will fail if patience set to a non-integer type
    if patience is None:
        patience = epochs
    if features_dir is None:
        # Set up directory to save new files:
        # will not raise error if not exists: instead makes the directory
        if save_new_files_dir is None:
            save_new_files_dir = './example_feats_models/envclassifer/'
        dataset_path = sp.check_dir(save_new_files_dir, make = True)
        # create unique timestamped directory to save new files
        # to avoid overwriting issues:
        dataset_path = dataset_path.joinpath(
            'features_{}_{}'.format(kwargs['feature_type'],
                                    sp.utils.get_date()))
        # create that new directory as well
        dataset_path = sp.check_dir(dataset_path, make=True)
    else:
        dataset_path = features_dir
    # set up datasets if no dataset_dict provided:
    if features_dir is None:
        if audiodata_path is None:
            raise ValueError('Function `envclassifier_extract_train` expects either:\n'+\
                '1) a `dataset_dict` with audiofile pathways assigned to datasets OR'+\
                    '\n2) a `audiodata_path` indicating where audiofiles for'+\
                        'training are located.\n**Both cannot be None.')
        # sp.check_dir:
        # raises error if this path doesn't exist (make = False)
        # if does exist, returns path as pathlib.PosixPath object
        data_dir = sp.check_dir(audiodata_path, make = False)
        # collect labels from subdirectory names
        labels = []
        for label in data_dir.glob('*/'):
            if label.suffix:
                # avoid adding unwanted files in the directory
                # want only directory names
                continue
            if ignore_label_marker is not None:
                if ignore_label_marker in label.stem:
                    continue
            # ignores hidden directories
            if label.stem[0] == '.':
                continue
            labels.append(label.stem)
        labels = set(labels)
        # create encoding and decoding dictionaries of labels:
        dict_encode, dict_decode = sp.datasets.create_dicts_labelsencoded(
            labels,
            add_extra_label = label_silence,
            extra_label = 'silence')
        # save labels and their encodings
        dict_encode_path = dataset_path.joinpath('dict_encode.csv')
        dict_decode_path = dataset_path.joinpath('dict_decode.csv')
        sp.utils.save_dict(dict2save = dict_encode,
                           filename = dict_encode_path,
                           overwrite=True)
        dict_decode_path = sp.utils.save_dict(dict2save = dict_decode,
                                              filename = dict_decode_path,
                                              overwrite=True)
        # get audio pathways and assign them their encoded labels:
        paths_list = sp.files.collect_audiofiles(data_dir, recursive=True)
        paths_list = sorted(paths_list)
        dict_encodedlabel2audio = sp.datasets.create_encodedlabel2audio_dict(
            dict_encode, paths_list)
        # path for saving dict for which audio paths are assigned to which labels:
        # NOTE(review): 'encdoded' typo kept — the filename is part of the
        # on-disk format other functions expect.
        dict_encdodedlabel2audio_path = dataset_path.joinpath(
            'dict_encdodedlabel2audio.csv')
        sp.utils.save_dict(dict2save = dict_encodedlabel2audio,
                           filename = dict_encdodedlabel2audio_path,
                           overwrite=True)
        # assign audio files int train, validation, and test datasets
        train, val, test = sp.datasets.audio2datasets(
            dict_encdodedlabel2audio_path,
            perc_train=0.8,
            limit=None,
            seed=random_seed)
        # shuffle each dataset (re-seeding before each shuffle keeps the
        # shuffles reproducible when random_seed is set)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(train)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(val)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(test)
        # save audiofiles for each dataset to dict and save
        # for logging purposes
        dataset_dict = dict([('train', train), ('val', val), ('test', test)])
        dataset_dict_path = dataset_path.joinpath('dataset_audiofiles.csv')
        dataset_dict_path = sp.utils.save_dict(
            dict2save = dataset_dict,
            filename = dataset_dict_path,
            overwrite=True)
        feat_base_shape, shape_with_label = sp.feats.get_feature_matrix_shape(
            labeled_data = labeled_data,
            **kwargs)
        # val and test features are extracted up-front; train features are
        # extracted on the fly by the training generator below
        extracted_data_dict = dict([('val', dataset_dict['val']),
                                    ('test', dataset_dict['test'])])
        val_path = dataset_path.joinpath('val_data.npy')
        test_path = dataset_path.joinpath('test_data.npy')
        extracted_data_path_dict = dict([('val', val_path),
                                         ('test', test_path)])
        # extract test data
        print('\nExtracting validation data for use in training:')
        extracted_data_dict, extracted_data_path_dict = sp.feats.save_features_datasets(
            extracted_data_dict,
            extracted_data_path_dict,
            labeled_data = labeled_data,
            **kwargs)
        val_data = np.load(extracted_data_path_dict['val'])
        test_data = np.load(extracted_data_path_dict['test'])
    else:
        # Case 1 continued: reuse shapes and audio lists from the log files
        feat_base_shape = feat_settings_dict['feat_base_shape']
        shape_with_label = feat_settings_dict['feat_model_shape']
        # use pre-collected dataset dict
        dataset_dict_path = dataset_path.joinpath('dataset_audiofiles.csv')
        dataset_dict = sp.utils.load_dict(dataset_dict_path)
        for key, value in dataset_dict.items():
            dataset_dict[key] = sp.utils.restore_dictvalue(value)
        # `val_data` / `test_data` parameters must point at .npy files here
        val_data = np.load(val_data)
        test_data = np.load(test_data)
    if 'fbank' in kwargs['feature_type'] or 'mfcc' in kwargs['feature_type']:
        kwargs['fmax'] = kwargs['sr'] / 2.0 # Niquist theorem
    # extract validation data (must already be extracted)
    color_dimension = (1,) # our data is in grayscale
    input_shape = feat_base_shape + color_dimension
    num_labels = len(dict_decode) # otherwise should arleady be specified
    if augment_dict is None:
        augment_dict = dict()
    # designate where to save model and related files
    model_name += '_' + kwargs['feature_type']
    model_dir = dataset_path.joinpath(model_name)
    # don't want to overwrite already trained model and logs
    model_dir = sp.utils.check_dir(model_dir, make=True,
                                   append=append_model_dir)
    model_path = model_dir.joinpath(model_name)
    # setup model
    envclassifier, settings_dict = spdl.cnn_classifier(
        input_shape = input_shape,
        num_labels = num_labels)
    optimizer = 'adam'
    loss = 'sparse_categorical_crossentropy'
    metrics = ['accuracy']
    envclassifier.compile(optimizer = optimizer,
                          loss = loss,
                          metrics = metrics)
    # should randomly apply augmentations in generator
    # items that need to be called with each iteration:
    # save best model for each iteration - don't want to be overwritten
    # with worse model
    # NOTE(review): this overwrites the `callbacks` parameter — any
    # user-supplied callbacks are ignored.
    best_modelname = str(model_path) + '.h5'
    callbacks = spdl.setup_callbacks(
        patience = patience,
        best_modelname = best_modelname,
        log_filename = model_dir.joinpath('log.csv'),
        append = True)
    normalize = True
    tensor = (1,)
    # training features are extracted (and augmented) on the fly
    train_generator = spdl.GeneratorFeatExtraction(
        datalist = dataset_dict['train'],
        model_name = model_name,
        normalize = normalize,
        apply_log = False,
        randomize = True, # want the data order to be different for each iteration
        random_seed = None,
        desired_input_shape = tensor + input_shape,
        batch_size = batch_size,
        gray2color = False,
        visualize = visualize,
        vis_every_n_items = vis_every_n_items,
        visuals_dir = model_dir.joinpath('images'),
        decode_dict = dict_decode,
        dataset = 'train',
        augment_dict = augment_dict,
        label_silence = label_silence,
        **kwargs)
    # val / test data were extracted up-front: simple generators suffice
    val_generator = spdl.Generator(
        data_matrix1 = val_data,
        desired_input_shape = tensor + input_shape)
    test_generator = spdl.Generator(
        data_matrix1 = test_data,
        desired_input_shape = tensor + input_shape)
    # energy scale only relevant for plotting the spectral feature types
    if 'stft' in kwargs['feature_type'] or 'fbank' in kwargs['feature_type'] \
        or 'powspec' in kwargs['feature_type']:
        energy_scale = 'power_to_db'
    else:
        energy_scale = None
    # peek at one sample per dataset: fixes dtype/shape for tf.data and
    # saves an example feature plot for visual sanity-checking
    feats_train, label_train = next(train_generator.generator())
    try:
        label_train_vis = dict_decode[label_train[0]]
    except KeyError:
        # decode dict keys may have been loaded as strings
        label_train_vis = dict_decode[str(int(label_train[0]))]
    feats_vis = feats_train.reshape((feats_train.shape[1],
                                     feats_train.shape[2]))
    sp.feats.plot(feature_matrix = feats_vis,
                  feature_type=kwargs['feature_type'],
                  title='Train: {} features label "{}"'.format(
                      kwargs['feature_type'], label_train_vis),
                  name4pic='train_feats{}.png'.format(sp.utils.get_date()),
                  subprocess=True,
                  energy_scale = energy_scale)
    feats_val, label_val = next(val_generator.generator())
    try:
        label_val_vis = dict_decode[label_val[0]]
    except KeyError:
        label_val_vis = dict_decode[str(int(label_val[0]))]
    feats_vis = feats_val.reshape((feats_val.shape[1],
                                   feats_val.shape[2]))
    sp.feats.plot(feature_matrix = feats_vis,
                  feature_type=kwargs['feature_type'],
                  title='Val: {} features label "{}"'.format(
                      kwargs['feature_type'], label_val_vis),
                  name4pic='val_feats{}.png'.format(sp.utils.get_date()),
                  subprocess=True,
                  energy_scale = energy_scale)
    feats_test, label_test = next(test_generator.generator())
    try:
        label_test_vis = dict_decode[label_test[0]]
    except KeyError:
        label_test_vis = dict_decode[str(int(label_test[0]))]
    feats_vis = feats_test.reshape((feats_test.shape[1],
                                    feats_test.shape[2]))
    sp.feats.plot(feature_matrix = feats_vis,
                  feature_type=kwargs['feature_type'],
                  title='Test: {} features label "{}"'.format(
                      kwargs['feature_type'], label_test_vis),
                  name4pic='test_feats{}.png'.format(sp.utils.get_date()),
                  subprocess=True,
                  energy_scale = energy_scale)
    # wrap the generators for tf.data
    ds_train = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(train_generator.generator()),
        output_types=(feats_train.dtype, label_train.dtype),
        output_shapes=(feats_train.shape, label_train.shape))
    ds_val = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(val_generator.generator()),
        output_types=(feats_val.dtype, label_val.dtype),
        output_shapes=(feats_val.shape, label_val.shape))
    ds_test = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(test_generator.generator()),
        output_types=(feats_test.dtype, label_test.dtype),
        output_shapes=(feats_test.shape, label_test.shape))
    print('\nShapes of X and y data from the train, val, and test generators:')
    print(ds_train)
    print(ds_val)
    print(ds_test)
    print()
    print('-'*79)
    # report which augmentations (if any) will be applied at random
    if augment_dict:
        print('\nAugmentation(s) applied (at random): \n')
        for key, value in augment_dict.items():
            if value == True:
                print('{}'.format(key).upper())
                try:
                    settings = augment_dict['augment_settings_dict'][key]
                    print('- Settings: {}'.format(settings))
                except KeyError:
                    pass
        print()
    else:
        print('\nNo augmentations applied.\n')
    print('-'*79)
    # start training
    start = time.time()
    history = envclassifier.fit(
        ds_train,
        steps_per_epoch = len(dataset_dict['train']),
        callbacks = callbacks,
        epochs = epochs,
        validation_data = ds_val,
        validation_steps = val_data.shape[0]
        )
    # log model settings, datasets, augmentations, and timing
    model_features_dict = dict(model_path = model_path,
                               dataset_dict = dataset_dict,
                               augment_dict = augment_dict)
    model_features_dict.update(settings_dict)
    model_features_dict.update(augment_dict)
    end = time.time()
    total_duration_seconds = round(end-start,2)
    time_dict = dict(total_duration_seconds=total_duration_seconds)
    model_features_dict.update(time_dict)
    model_features_dict_path = model_dir.joinpath('info_{}.csv'.format(
        model_name))
    model_features_dict_path = sp.utils.save_dict(
        filename = model_features_dict_path,
        dict2save = model_features_dict)
    print('\nFinished training the model. The model and associated files can be '+\
        'found here: \n{}'.format(model_dir))
    score = envclassifier.evaluate(ds_test, steps=1000)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    finished_time = time.time()
    total_total_duration = finished_time - start
    time_new_units, units = sp.utils.adjust_time_units(total_total_duration)
    print('\nEntire program took {} {}.\n\n'.format(time_new_units, units))
    print('-'*79)
    return model_dir, history
def cnnlstm_extract_train(
    model_name = 'cnnlstm_classifier',
    dataset_dict = None,
    num_labels = None,
    augment_dict = None,
    audiodata_path = None,
    save_new_files_dir = None,
    labeled_data = True,
    ignore_label_marker = None,
    context_window = 5,
    batch_size = 10,
    epochs = 5,
    patience = 15,
    callbacks = None,
    random_seed = None,
    visualize = False,
    vis_every_n_items = 50,
    label_silence = False,
    **kwargs):
    '''Extract and augment features during training of a scene/environment/speech classifier.

    Features are extracted on the fly for the training set (with optional random
    augmentation each epoch) while validation and test features are extracted
    up front and saved as .npy files.

    Parameters
    ----------
    model_name : str
        Name of the model. No extension (will save as .h5 file).
        NOTE(review): this argument is currently overwritten below with
        ``'audioaugment_' + feature_type`` and therefore has no effect on the
        saved model directory — confirm whether that is intended.
    dataset_dict : dict, optional
        A dictionary including datasets as keys, and audio file lists (with or
        without labels) as values. If None, will be created based on
        `audiodata_path`. (default None)
    num_labels : int, optional
        Number of labels; required if a pre-made `dataset_dict` is supplied
        (labels cannot be re-derived from it here). (default None)
    augment_dict : dict, optional
        Dictionary containing keys (e.g. 'add_white_noise'). See
        `soundpy.augment.list_augmentations` and corresponding True or False
        values. If the value is True, the key / augmentation gets implemented
        at random, each epoch. (default None)
    audiodata_path : str, pathlib.PosixPath
        Where audio data can be found, if no `dataset_dict` provided. (default None)
    save_new_files_dir : str, pathlib.PosixPath
        Where new files (logging, model(s), etc.) will be saved. If None, will
        be set in a unique directory within the current working directory.
        (default None)
    labeled_data : bool
        If True, a label column is expected at the end of each feature matrix.
    ignore_label_marker : str, optional
        Subdirectory-name marker to exclude from the collected labels.
    context_window : int
        Number of frames on either side of a central frame to group into
        subframes fed to the LSTM. (default 5)
    batch_size : int
        Training batch size. (default 10)
    epochs : int
        Number of training epochs. (default 5)
    patience : int
        Early-stopping patience; if None, set to `epochs`. (default 15)
    callbacks : list, optional
        Ignored; callbacks are built internally via `spdl.setup_callbacks`.
    random_seed : int, optional
        Seed for the train/val/test split and shuffling. (default None)
    visualize : bool
        If True, periodically save feature images during extraction.
    vis_every_n_items : int
        How often (in items) to save a feature image. (default 50)
    label_silence : bool
        If True, an extra 'silence' label is added to the encoding dicts.
    **kwargs : additional keyword arguments
        Keyword arguments for `soundpy.feats.get_feats`.

    Returns
    -------
    model_dir : pathlib.PosixPath
        Directory containing the trained model and associated files.
    history : tf.keras.callbacks.History
        Training history returned by `Model.fit`.

    Raises
    ------
    ValueError
        If `feature_type` or `dur_sec` is missing from `kwargs`, if both
        `dataset_dict` and `audiodata_path` are None, or if `dataset_dict`
        is given without `num_labels`.
    '''
    # require 'feature_type' to be indicated
    if 'feature_type' not in kwargs:
        raise ValueError('Function `cnnlstm_extract_train` expects the ' +
                         'parameter `feature_type` to be set as one of the following:\n' +
                         '- signal\n- stft\n- powspec\n- fbank\n- mfcc\n')
    # ensure defaults are set if not included in kwargs:
    kwargs.setdefault('win_size_ms', 20)
    kwargs.setdefault('percent_overlap', 0.5)
    kwargs.setdefault('rate_of_change', False)
    kwargs.setdefault('rate_of_acceleration', False)
    if 'dur_sec' not in kwargs:
        raise ValueError('Function `cnnlstm_extract_train` requires ' +
                         'the keyword argument `dur_sec` to be set. How many seconds of audio ' +
                         'from each audio file would you like to use for training?')
    kwargs.setdefault('sr', 22050)
    if 'fft_bins' not in kwargs:
        import warnings
        # default fft_bins = samples per window frame
        fft_bins = int(kwargs['win_size_ms'] * kwargs['sr'] // 1000)
        msg = '\nWARNING: `fft_bins` was not set. Setting it to {}'.format(fft_bins)
        warnings.warn(msg)
        kwargs['fft_bins'] = fft_bins
    kwargs.setdefault('real_signal', True)
    kwargs.setdefault('window', 'hann')
    kwargs.setdefault('zeropad', True)
    kwargs.setdefault('num_filters', 40)
    kwargs.setdefault('num_mfcc', 40)
    # training will fail if patience set to a non-integer type
    if patience is None:
        patience = epochs

    # Set up directory to save new files:
    # will not raise error if not exists: instead makes the directory
    if save_new_files_dir is None:
        save_new_files_dir = './example_feats_models/envclassifer/'
    dataset_path = sp.check_dir(save_new_files_dir, make = True)
    # create unique timestamped directory to save new files
    # to avoid overwriting issues:
    dataset_path = dataset_path.joinpath(
        'features_{}_{}'.format(kwargs['feature_type'], sp.utils.get_date()))
    # create that new directory as well
    dataset_path = sp.check_dir(dataset_path, make=True)

    # set up datasets if no dataset_dict provided:
    if dataset_dict is None:
        if audiodata_path is None:
            raise ValueError('Function `cnnlstm_extract_train` expects either:\n' +
                             '1) a `dataset_dict` with audiofile pathways assigned to datasets OR' +
                             '\n2) a `audiodata_path` indicating where audiofiles for' +
                             'training are located.\n**Both cannot be None.')
        # sp.check_dir:
        # raises error if this path doesn't exist (make = False)
        # if does exist, returns path as pathlib.PosixPath object
        data_dir = sp.check_dir(audiodata_path, make = False)
        # collect labels from subdirectory names
        labels = []
        for label in data_dir.glob('*/'):
            if label.suffix:
                # avoid adding unwanted files in the directory
                # want only directory names
                continue
            if ignore_label_marker is not None:
                if ignore_label_marker in label.stem:
                    continue
            # ignores hidden directories
            if label.stem[0] == '.':
                continue
            labels.append(label.stem)
        labels = set(labels)
        # create encoding and decoding dictionaries of labels:
        dict_encode, dict_decode = sp.datasets.create_dicts_labelsencoded(
            labels,
            add_extra_label = label_silence,
            extra_label = 'silence')
        # save labels and their encodings
        dict_encode_path = dataset_path.joinpath('dict_encode.csv')
        dict_decode_path = dataset_path.joinpath('dict_decode.csv')
        sp.utils.save_dict(dict2save = dict_encode,
                           filename = dict_encode_path,
                           overwrite=True)
        dict_decode_path = sp.utils.save_dict(dict2save = dict_decode,
                                              filename = dict_decode_path,
                                              overwrite=True)
        # get audio pathways and assign them their encoded labels:
        paths_list = sp.files.collect_audiofiles(data_dir, recursive=True)
        paths_list = sorted(paths_list)
        dict_encodedlabel2audio = sp.datasets.create_encodedlabel2audio_dict(
            dict_encode, paths_list)
        # path for saving dict for which audio paths are assigned to which labels:
        dict_encdodedlabel2audio_path = dataset_path.joinpath(
            'dict_encdodedlabel2audio.csv')
        sp.utils.save_dict(dict2save = dict_encodedlabel2audio,
                           filename = dict_encdodedlabel2audio_path,
                           overwrite=True)
        # assign audio files int train, validation, and test datasets
        train, val, test = sp.datasets.audio2datasets(
            dict_encdodedlabel2audio_path,
            perc_train=0.8,
            limit=None,
            seed=random_seed)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(train)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(val)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(test)
        # save audiofiles for each dataset to dict and save
        # for logging purposes
        dataset_dict = dict([('train', train), ('val', val), ('test', test)])
        dataset_dict_path = dataset_path.joinpath('dataset_audiofiles.csv')
        dataset_dict_path = sp.utils.save_dict(
            dict2save = dataset_dict,
            filename = dataset_dict_path,
            overwrite=True)
    else:
        if num_labels is None:
            raise ValueError('Function `cnnlstm_extract_train` requires ' +
                             '`num_labels` to be provided if a pre-made `dataset_dict` is provided.')
        # use pre-collected dataset dict
        dataset_dict = sp.utils.load_dict(dataset_dict)
        # don't have the label data available
        dict_encode, dict_decode = None, None

    feat_base_shape, shape_with_label = sp.feats.get_feature_matrix_shape(
        labeled_data = labeled_data,
        **kwargs)
    color_dimension = (1,)  # our data is in grayscale
    if context_window:
        # group frames into subframes (context windows) for the LSTM
        feat_base_shape = sp.feats.featshape_new_subframe(feat_base_shape,
                                                          context_window,
                                                          zeropad=True,
                                                          axis=0,
                                                          include_dim_size_1=True)
    input_shape = feat_base_shape + color_dimension
    if 'fbank' in kwargs['feature_type'] or 'mfcc' in kwargs['feature_type']:
        kwargs['fmax'] = kwargs['sr'] / 2.0  # Niquist theorem

    # extract validation data (must already be extracted)
    extracted_data_dict = dict([('val', dataset_dict['val']),
                                ('test', dataset_dict['test'])])
    val_path = dataset_path.joinpath('val_data.npy')
    test_path = dataset_path.joinpath('test_data.npy')
    extracted_data_path_dict = dict([('val', val_path),
                                     ('test', test_path)])
    # extract test data
    print('\nExtracting validation data for use in training:')
    extracted_data_dict, extracted_data_path_dict = sp.feats.save_features_datasets(
        extracted_data_dict,
        extracted_data_path_dict,
        labeled_data = labeled_data,
        **kwargs)
    val_data = np.load(extracted_data_path_dict['val'])
    test_data = np.load(extracted_data_path_dict['test'])

    # start training
    start = time.time()
    if dict_encode is not None:
        num_labels = len(dict_encode)
    # otherwise should arleady be specified
    if augment_dict is None:
        augment_dict = dict()
    # designate where to save model and related files
    # NOTE(review): this discards the `model_name` parameter — confirm intended.
    model_name = 'audioaugment_' + kwargs['feature_type']
    model_dir = dataset_path.joinpath(model_name)
    model_dir = sp.utils.check_dir(model_dir, make=True)
    model_path = model_dir.joinpath(model_name)
    # setup model
    envclassifier, settings_dict = spdl.cnnlstm_classifier(
        input_shape = input_shape,
        num_labels = num_labels,
        lstm_cells = 40)  # need to fix for other kinds of features
    optimizer = 'adam'
    loss = 'sparse_categorical_crossentropy'
    metrics = ['accuracy']
    envclassifier.compile(optimizer = optimizer,
                          loss = loss,
                          metrics = metrics)

    # should randomly apply augmentations in generator
    # items that need to be called with each iteration:
    # save best model for each iteration - don't want to be overwritten
    # with worse model
    best_modelname = str(model_path) + '.h5'
    callbacks = spdl.setup_callbacks(
        patience = patience,
        best_modelname = best_modelname,
        log_filename = model_dir.joinpath('log.csv'),
        append = True)
    normalize = True
    tensor = (1,)
    train_generator = spdl.GeneratorFeatExtraction(
        datalist = dataset_dict['train'],
        model_name = model_name,
        normalize = normalize,
        apply_log = False,
        randomize = True,  # want the data order to be different for each iteration
        random_seed = None,
        desired_input_shape = tensor + input_shape,
        batch_size = batch_size,
        gray2color = False,
        visualize = visualize,
        vis_every_n_items = vis_every_n_items,
        visuals_dir = model_dir.joinpath('images'),
        decode_dict = dict_decode,
        dataset = 'train',
        augment_dict = augment_dict,
        label_silence = label_silence,
        context_window = context_window,
        **kwargs)
    val_generator = spdl.Generator(
        data_matrix1 = val_data,
        desired_input_shape = tensor + input_shape,
        context_window = context_window)
    test_generator = spdl.Generator(
        data_matrix1 = test_data,
        desired_input_shape = tensor + input_shape,
        context_window = context_window)

    if 'stft' in kwargs['feature_type'] or 'fbank' in kwargs['feature_type'] \
            or 'powspec' in kwargs['feature_type']:
        energy_scale = 'power_to_db'
    else:
        energy_scale = None

    # pull one sample from each generator to discover dtypes/shapes
    # for tf.data.Dataset.from_generator below
    feats_train, label_train = next(train_generator.generator())
    feats_val, label_val = next(val_generator.generator())
    feats_test, label_test = next(test_generator.generator())
    ds_train = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(train_generator.generator()),
        output_types=(feats_train.dtype, label_train.dtype),
        output_shapes=(feats_train.shape, label_train.shape))
    ds_val = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(val_generator.generator()),
        output_types=(feats_val.dtype, label_val.dtype),
        output_shapes=(feats_val.shape, label_val.shape))
    ds_test = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(test_generator.generator()),
        output_types=(feats_test.dtype, label_test.dtype),
        output_shapes=(feats_test.shape, label_test.shape))
    print('\nShapes of X and y data from the train, val, and test generators:')
    print(ds_train)
    print(ds_val)
    print(ds_test)
    print()
    print('-'*79)
    if augment_dict:
        print('\nAugmentation(s) applied (at random): \n')
        for key, value in augment_dict.items():
            if value == True:
                print('{}'.format(key).upper())
                try:
                    settings = augment_dict['augment_settings_dict'][key]
                    print('- Settings: {}'.format(settings))
                except KeyError:
                    pass
        print()
    else:
        print('\nNo augmentations applied.\n')
    print('-'*79)

    history = envclassifier.fit(
        ds_train,
        steps_per_epoch = len(dataset_dict['train']),
        callbacks = callbacks,
        epochs = epochs,
        validation_data = ds_val,
        validation_steps = val_data.shape[0])

    # log model settings, datasets, augmentations and timing
    model_features_dict = dict(model_path = model_path,
                               dataset_dict = dataset_dict,
                               augment_dict = augment_dict)
    model_features_dict.update(settings_dict)
    model_features_dict.update(augment_dict)
    end = time.time()
    total_duration_seconds = round(end - start, 2)
    time_dict = dict(total_duration_seconds=total_duration_seconds)
    model_features_dict.update(time_dict)
    model_features_dict_path = model_dir.joinpath('info_{}.csv'.format(
        model_name))
    model_features_dict_path = sp.utils.save_dict(
        filename = model_features_dict_path,
        dict2save = model_features_dict)
    print('\nFinished training the model. The model and associated files can be ' +
          'found here: \n{}'.format(model_dir))
    score = envclassifier.evaluate(ds_test, steps=1000)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    finished_time = time.time()
    total_total_duration = finished_time - start
    time_new_units, units = sp.utils.adjust_time_units(total_total_duration)
    print('\nEntire program took {} {}.\n\n'.format(time_new_units, units))
    print('-'*79)
    return model_dir, history
# TODO: clean up this module
# TODO: add tests
# TODO: finish docstrings
def denoiser_extract_train(
    model_name = 'denoiser',
    augment_dict = None,
    audiodata_clean_path = None,
    audiodata_noisy_path = None,
    features_dir = None,
    save_new_files_dir = None,
    labeled_data = False,
    ignore_label_marker = None,
    batch_size = 10,
    epochs = 5,
    patience = 15,
    callbacks = None,
    random_seed = 20,
    visualize = False,
    vis_every_n_items = 50,
    label_silence = False,
    val_data = None,
    test_data = None,
    append_model_dir = False,
    **kwargs):
    '''Extract and augment features during training of an autoencoder denoiser.

    Noisy features are fed as input and the corresponding clean features as
    targets; features are extracted (and optionally augmented) on the fly.

    Parameters
    ----------
    model_name : str
        Name of the model. No extension (will save as .h5 file).
        (default 'denoiser')
    augment_dict : dict, optional
        Dictionary containing keys (e.g. 'add_white_noise'). See
        `soundpy.augment.list_augmentations` and corresponding True or False
        values. If the value is True, the key / augmentation gets implemented
        at random, each epoch. (default None)
    audiodata_clean_path : str, pathlib.PosixPath
        Where clean (target) audio data can be found, if no `features_dir`
        with previously extracted and prepared files is provided. (default None)
    audiodata_noisy_path : str, pathlib.PosixPath
        Where noisy (input) audio data can be found, if no `features_dir`
        is provided. (default None)
    features_dir : str, pathlib.PosixPath
        The feature directory where previously extracted validation and test
        data are located, as well as the relevant log files.
        NOTE(review): when `features_dir` is supplied, `train_noisy` /
        `train_clean` are never defined below and training will raise a
        NameError — confirm intended workflow before relying on this path.
    save_new_files_dir : str, pathlib.PosixPath
        Where new files (logging, model(s), etc.) will be saved. If None, will
        be set in a unique directory within the current working directory.
        (default None)
    labeled_data : bool
        Useful in determining shape of data. If True, expected label column to
        exist at the end of the feature column of feature data.
        NOTE(review): this parameter is forced to False below.
    ignore_label_marker : str
        When collecting labels from subdirectory names, this allows a
        subfolder name to be ignored. (Currently unused in this function.)
    val_data, test_data : optional
        NOTE(review): currently unused by this function.
    append_model_dir : bool
        Passed to `sp.utils.check_dir` so an existing model directory is not
        overwritten. (default False)
    **kwargs : additional keyword arguments
        Keyword arguments for `soundpy.feats.get_feats`.

    Returns
    -------
    model_dir : pathlib.PosixPath
        Directory containing the trained model and associated files.
    history : tf.keras.callbacks.History
        Training history returned by `Model.fit`.

    Raises
    ------
    ValueError
        If `feature_type` or `dur_sec` is missing from `kwargs`, or if both
        `features_dir` and `audiodata_clean_path` are None.
    '''
    if features_dir is not None:
        # restore the feature-extraction settings used previously
        features_dir = sp.utils.string2pathlib(features_dir)
        feat_settings_file = features_dir.joinpath('log_extraction_settings.csv')
        feat_settings_dict = sp.utils.load_dict(feat_settings_file)
        # should be a dict
        feat_kwargs = sp.utils.restore_dictvalue(feat_settings_dict['kwargs'])
        print(feat_kwargs)
        # load decode dictionary for labeled data
        dict_decode_path = features_dir.joinpath('dict_decode.csv')
        dict_decode = sp.utils.load_dict(dict_decode_path)
        dict_encode = None
        # ensure items in dictionaries original type
        for key, value in feat_kwargs.items():
            feat_kwargs[key] = sp.utils.restore_dictvalue(value)
        for key, value in feat_settings_dict.items():
            feat_settings_dict[key] = sp.utils.restore_dictvalue(value)
        for key, value in dict_decode.items():
            # expects key to be integer
            dict_decode[key] = sp.utils.restore_dictvalue(value)
        # update kwargs with loaded feature kwargs
        kwargs = dict(feat_kwargs)

    # require 'feature_type' to be indicated
    if 'feature_type' not in kwargs:
        raise ValueError('Function `denoiser_extract_train` expects the ' +
                         'parameter `feature_type` to be set as one of the following:\n' +
                         '- signal\n- stft\n- powspec\n- fbank\n- mfcc\n')
    # ensure defaults are set if not included in kwargs:
    kwargs.setdefault('win_size_ms', 20)
    kwargs.setdefault('percent_overlap', 0.5)
    kwargs.setdefault('rate_of_change', False)
    kwargs.setdefault('rate_of_acceleration', False)
    if 'dur_sec' not in kwargs:
        raise ValueError('Function `denoiser_extract_train` requires ' +
                         'the keyword argument `dur_sec` to be set. How many seconds of audio ' +
                         'from each audio file would you like to use for training?')
    kwargs.setdefault('sr', 22050)
    if 'fft_bins' not in kwargs:
        import warnings
        # default fft_bins = samples per window frame
        fft_bins = int(kwargs['win_size_ms'] * kwargs['sr'] // 1000)
        msg = '\nWARNING: `fft_bins` was not set. Setting it to {}'.format(fft_bins)
        warnings.warn(msg)
        kwargs['fft_bins'] = fft_bins
    kwargs.setdefault('real_signal', True)
    kwargs.setdefault('window', 'hann')
    kwargs.setdefault('zeropad', True)
    kwargs.setdefault('num_filters', 40)
    kwargs.setdefault('num_mfcc', 40)
    # training will fail if patience set to a non-integer type
    if patience is None:
        patience = epochs

    if features_dir is None:
        # Set up directory to save new files:
        # will not raise error if not exists: instead makes the directory
        if save_new_files_dir is None:
            save_new_files_dir = './example_feats_models/denoiser/'
        dataset_path = sp.check_dir(save_new_files_dir, make = True)
        # create unique timestamped directory to save new files
        # to avoid overwriting issues:
        dataset_path = dataset_path.joinpath(
            'features_{}_{}'.format(kwargs['feature_type'], sp.utils.get_date()))
        # create that new directory as well
        dataset_path = sp.check_dir(dataset_path, make=True)
    else:
        dataset_path = features_dir

    # designate where to save model and related files
    model_name += '_' + kwargs['feature_type']
    model_dir = dataset_path.joinpath(model_name)
    # don't want to overwrite already trained model and logs
    model_dir = sp.utils.check_dir(model_dir, make=True, append=append_model_dir)
    model_path = model_dir.joinpath(model_name + '.h5')

    if features_dir is None:
        if audiodata_clean_path is None:
            raise ValueError('Function `denoiser_extract_train` expects either:\n' +
                             '1) a `dataset_dict` with audiofile pathways assigned to datasets OR' +
                             '\n2) `audiodata_clean_path` and `audiodata_noisy_path` indicating where audiofiles for' +
                             'training are located.\n**Both cannot be None.')
        # sp.check_dir:
        # raises error if this path doesn't exist (make = False)
        # if does exist, returns path as pathlib.PosixPath object
        data_clean_dir = sp.check_dir(audiodata_clean_path, make = False)
        data_noisy_dir = sp.check_dir(audiodata_noisy_path, make = False)
        # sort so clean and noisy files line up pairwise
        paths_list_clean = sp.files.collect_audiofiles(data_clean_dir, recursive=False)
        paths_list_clean = sorted(paths_list_clean)
        paths_list_noisy = sp.files.collect_audiofiles(data_noisy_dir, recursive=False)
        paths_list_noisy = sorted(paths_list_noisy)
        # for now not using any test data: too small a dataset
        # can test from greater dataset
        train_clean, test_clean, __ = sp.datasets.waves2dataset(
            audiolist = paths_list_clean,
            perc_train=1,
            seed=40,
            train=True,
            val=False,
            test=False)
        train_noisy, test_noisy, __ = sp.datasets.waves2dataset(
            audiolist = paths_list_noisy,
            perc_train=1,
            seed=40,
            train=True,
            val=False,
            test=False)
        # shuffle clean and noisy with the same seed so pairs stay aligned
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(train_clean)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(train_noisy)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(test_clean)
        if random_seed is not None:
            random.seed(random_seed)
        random.shuffle(test_noisy)
        # print a sample of the file pairings for a manual sanity check
        for i in range(10):
            try:
                print(train_clean[i])
                print()
            except IndexError:
                pass
            try:
                print(train_noisy[i])
                print()
                print()
            except IndexError:
                pass
            try:
                print(test_clean[i])
            except IndexError:
                pass
            try:
                print(test_noisy[i])
            except IndexError:
                pass

    # NOTE(review): overrides the `labeled_data` parameter — denoiser data
    # carries no label column.
    labeled_data = False
    feat_base_shape, shape_for_model = sp.feats.get_feature_matrix_shape(
        labeled_data = labeled_data,
        **kwargs)
    if 'fbank' in kwargs['feature_type'] or 'mfcc' in kwargs['feature_type']:
        kwargs['fmax'] = kwargs['sr'] / 2.0  # Niquist theorem
    color_dimension = (1,)  # our data is in grayscale
    input_shape = feat_base_shape + color_dimension
    if augment_dict is None:
        augment_dict = dict()

    # setup model
    denoiser, settings_dict = spdl.autoencoder_denoise(
        input_shape = input_shape)
    adm = tf.keras.optimizers.Adam(learning_rate=0.0001)
    denoiser.compile(optimizer=adm, loss='binary_crossentropy')

    # should randomly apply augmentations in generator
    # items that need to be called with each iteration:
    # save best model for each iteration - don't want to be overwritten
    # with worse model
    # model_path already carries the '.h5' extension; appending another
    # '.h5' here produced 'name.h5.h5' (fixed).
    best_modelname = str(model_path)
    callbacks = spdl.setup_callbacks(
        patience = patience,
        early_stop = False,  # don't have validation data
        save_bestmodel = False,
        best_modelname = best_modelname,  # won't be used (no validation data)
        log_filename = model_dir.joinpath('log.csv'),
        append = True)
    normalize = True
    tensor = (1,)
    train_generator = spdl.GeneratorFeatExtraction(
        datalist = train_noisy,
        datalist2 = train_clean,
        model_name = model_name,
        normalize = normalize,
        apply_log = False,
        randomize = True,  # want the data order to be different for each iteration
        random_seed = 50,
        desired_input_shape = tensor + input_shape,
        batch_size = batch_size,
        gray2color = False,
        visualize = visualize,
        vis_every_n_items = vis_every_n_items,
        visuals_dir = model_dir.joinpath('images'),
        decode_dict = None,
        dataset = 'train',
        augment_dict = augment_dict,
        label_silence = label_silence,
        **kwargs)

    if 'stft' in kwargs['feature_type'] or 'fbank' in kwargs['feature_type'] \
            or 'powspec' in kwargs['feature_type']:
        energy_scale = 'power_to_db'
    else:
        energy_scale = None

    # pull one (noisy, clean) pair to discover dtypes/shapes for tf.data
    feats_noisy, feats_clean = next(train_generator.generator())
    # visualize the features
    feats_vis_noisy = feats_noisy.reshape((feats_noisy.shape[1], feats_noisy.shape[2]))
    sp.feats.plot(feature_matrix = feats_vis_noisy,
                  feature_type=kwargs['feature_type'],
                  title='Train: {} features label "{}"'.format(kwargs['feature_type'], 'noisy'),
                  name4pic='feats_noisy{}.png'.format(sp.utils.get_date()),
                  subprocess=True,
                  energy_scale = energy_scale)
    feats_vis_clean = feats_clean.reshape((feats_clean.shape[1], feats_clean.shape[2]))
    sp.feats.plot(feature_matrix = feats_vis_clean,
                  feature_type=kwargs['feature_type'],
                  title='Train: {} features label "{}"'.format(kwargs['feature_type'], 'clean'),
                  name4pic='feats_clean{}.png'.format(sp.utils.get_date()),
                  subprocess=True,
                  energy_scale = energy_scale)

    ds_train = tf.data.Dataset.from_generator(
        spdl.make_gen_callable(train_generator.generator()),
        output_types=(feats_noisy.dtype, feats_clean.dtype),
        output_shapes=(feats_noisy.shape, feats_clean.shape))
    print('\nShapes of X and y data from the train generator:')
    print(ds_train)
    print('-'*79)
    if augment_dict:
        print('\nAugmentation(s) applied (at random): \n')
        for key, value in augment_dict.items():
            if value == True:
                print('{}'.format(key).upper())
                try:
                    settings = augment_dict['augment_settings_dict'][key]
                    print('- Settings: {}'.format(settings))
                except KeyError:
                    pass
        print()
    else:
        print('\nNo augmentations applied.\n')
    print('-'*79)

    # start training
    start = time.time()
    history = denoiser.fit(
        ds_train,
        steps_per_epoch = len(train_noisy),
        callbacks = callbacks,
        epochs = epochs)
    denoiser.save(model_path)

    # save this info for when implementing model
    kwargs['input_shape'] = input_shape
    sp.utils.save_dict(model_dir.joinpath('log_extraction_settings.csv'),
                       kwargs)
    model_features_dict = dict(model_path = model_path,
                               augment_dict = augment_dict)
    model_features_dict.update(settings_dict)
    model_features_dict.update(augment_dict)
    end = time.time()
    total_duration_seconds = round(end - start, 2)
    time_dict = dict(total_duration_seconds = total_duration_seconds)
    model_features_dict.update(time_dict)
    model_features_dict_path = model_dir.joinpath('info_{}.csv'.format(
        model_name))
    model_features_dict_path = sp.utils.save_dict(
        filename = model_features_dict_path,
        dict2save = model_features_dict)
    print('\nFinished training the model. The model and associated files can be ' +
          'found here: \n{}'.format(model_dir))
    finished_time = time.time()
    total_total_duration = finished_time - start
    time_new_units, units = sp.utils.adjust_time_units(total_total_duration)
    print('\nEntire program took {} {}.\n\n'.format(time_new_units, units))
    print('-'*79)
    return model_dir, history