'''The soundpy.builtin module includes more complex functions that pull from several
other functions to complete fairly complex tasks, such as dataset formatting,
filtering signals, and extracting features for neural networks.
'''
import time
import pathlib
import random
import numpy as np
import soundfile as sf
from scipy.io.wavfile import write
# in order to import soundpy
import os, sys
import inspect
currentdir = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))
packagedir = os.path.dirname(currentdir)
sys.path.insert(0, packagedir)
import soundpy as sp
def filtersignal(audiofile,
                 sr = None,
                 noise_file=None,
                 filter_type='wiener', # 'band_specsub'
                 filter_scale=1,
                 apply_postfilter=False,
                 duration_noise_ms=120,
                 real_signal=False,
                 phase_radians=True,
                 num_bands=None,
                 visualize=False,
                 visualize_every_n_windows=50,
                 max_vol = 0.4,
                 min_vol = 0.15,
                 save2wav=False,
                 output_filename=None,
                 overwrite=False,
                 use_scipy=False,
                 remove_dc=True,
                 control_vol = False,
                 **kwargs):
    """Apply Wiener or band spectral subtraction filter to signal using noise.

    The noise can be provided as a separate file / samples, or it can be taken
    from the beginning of the provided audio. How much noise is measured can be
    set in the parameter `duration_noise_ms`.

    Parameters
    ----------
    audiofile : str, np.ndarray [size=(num_samples,) or (num_samples, num_channels)]
        Filename or the audio data of the signal to be filtered.
    sr : int
        The sample rate of the audio. If `audiofile` is type np.ndarray, sr is
        required. (default None)
    noise_file : str, tuple, optional
        Path to either noise audiofile or .npy file containing average power
        spectrum values. If tuple, must include samples and sr.
        If None, the beginning of the `audiofile` will be used for noise data.
        (default None)
    filter_type : str
        Type of filter to apply. Options 'wiener' or 'band_specsub'.
    filter_scale : int or float
        The scale at which the filter should be applied. This value will be multiplied
        to the noise levels thereby increasing or decreasing the filter strength.
        (default 1)
    apply_postfilter : bool
        Whether or not the post filter should be applied. The post filter
        reduces musical noise (i.e. distortion) in the signal as a byproduct
        of filtering.
    duration_noise_ms : int or float
        The amount of time in milliseconds to use from noise to apply the
        Welch's method to. In other words, how much of the noise to use
        when approximating the average noise power spectrum.
    real_signal : bool
        If True, only half of the (mirrored) fast Fourier transform will be used
        during filtering. For audio, there is no difference. This is visible in the
        plots, however, if you are interested. (default False)
    phase_radians : bool
        Relevant for band spectral subtraction: whether phase should be calculated in
        radians or complex values/ power spectrum. (default True)
    num_bands : int
        Relevant for band spectral subtraction: the number of bands to section frequencies
        into. By grouping sections of frequencies during spectral subtraction filtering,
        musical noise or distortion should be reduced. (defaults to 6)
    visualize : bool
        If True, plots of the windows and filtered signal will be made. (default False)
    visualize_every_n_windows : int
        If `visualize` is set to True, this controls how often plots are made: every 50
        windows, for example. (default 50)
    max_vol : int or float
        The maximum volume level of the filtered signal. This is useful if you know you
        do not want the signal to be louder than a certain value. Ears are important
        (default 0.4) TODO improve on matching volume to original signal? At least use
        objective measures.
    min_vol : int or float
        The minimum volume level of the filtered signal. (default 0.15)
        TODO improve on matching volume to original signal.
    save2wav : bool
        If True, will save the filtered signal as a .wav file
    output_filename : str, pathlib.PosixPath, optional
        path and name the filtered signal is to be saved. (default None) If no filename
        provided, will save under date.
    overwrite : bool
        If True and an audiofile by the same name exists, that file will be overwritten.
    use_scipy : bool
        If False, audiofiles will be loaded using librosa. Otherwise, scipy.io.wavfile.
        (default False)
    remove_dc : bool
        It True, the DC bias ('direct current' bias) will be removed. In other words, the
        mean amplitude will be made to equal 0.
    control_vol : bool
        If True, the volume of the enhanced signal will be checked against the
        filter's max / min volume settings and adjusted if necessary. (default False)
    **kwargs : additional keyword arguments
        Keyword arguments for `soundpy.filters.WienerFilter` or
        'soundpy.filters.BandSubtraction` (depending on `filter_type`).

    Returns
    -------
    enhanced_signal : np.ndarray [size = (num_samples, )]
        The enhanced signal in raw sample form. Stereo audio has not yet been tested.
    sr : int
        The sample rate of the enhanced/ filtered signal.

    References
    ----------
    Kamath, S. and Loizou, P. (2002). A multi-band spectral subtraction
    method for enhancing speech corrupted by colored noise. Proc. IEEE Int.
    Conf. Acoust.,Speech, Signal Processing

    Kamath, S. and Loizou, P. (2006). mband.m MATLAB code from the book:

    C Loizou, P. (2013). Speech Enhancement: Theory and Practice.
    """
    if sr is None:
        sr = 48000
    if 'wiener' in filter_type:
        if sr == 22050:
            import warnings
            warnings.warn('\n\nWARNING: sample rate 22050 may have some '+\
                'missing frames within the filtered signal. \nIf possible, '+\
                'perhaps use 8000, 16000, 41000, or 48000 sample rate instead.\n')
        fil = sp.WienerFilter(sr = sr, **kwargs)
    elif 'band' in filter_type:
        # at this point, band spectral subtraction only works with 48 kHz audio
        if sr != 48000:
            import warnings
            warnings.warn('\n\nWARNING: Band spectral subtraction requires a sample rate'+\
                ' of 48 kHz. Sample rate adjusted from {} to 48000.\n'.format(sr))
            sr = 48000
        fil = sp.BandSubtraction(sr=48000,
                                 **kwargs)
    if visualize:
        frame_subtitle = 'frame size {}ms, window shift {}ms'.format(
            fil.frame_dur, int(fil.percent_overlap*fil.frame_dur))
    # load signal (to be filtered)
    if not isinstance(audiofile, np.ndarray):
        samples_orig, sr = sp.loadsound(audiofile, fil.sr, dur_sec=None,
                                        use_scipy=use_scipy, remove_dc=remove_dc)
    else:
        samples_orig, sr = audiofile, sr
        if remove_dc:
            samples_orig = sp.dsp.remove_dc_bias(samples_orig)
    if sr != fil.sr:
        samples_orig, sr = sp.dsp.resample_audio(samples_orig,
                                                 sr_original = sr,
                                                 sr_desired = fil.sr)
    assert fil.sr == sr
    # TODO improve on volume control, improve SNR
    # set volume max and min, or based on original sample data
    fil.set_volume(samples_orig, max_vol = max_vol, min_vol = min_vol)
    # set how many subframes are needed to process entire target signal
    fil.set_num_subframes(len(samples_orig), is_noise=False, zeropad=fil.zeropad)
    # prepare noise
    # set up how noise will be considered: either as audiofile, averaged
    # power values, or the first section of the target audiofile (i.e. None)
    samples_noise = None
    # initialize noise_power so the type check below cannot raise NameError
    # when `noise_file` is neither a tuple nor a recognized pathway
    noise_power = None
    if noise_file:
        if isinstance(noise_file, tuple):
            # tuple must contain samples and sampling rate
            samples_noise, sr_noise = noise_file
            if remove_dc:
                samples_noise = sp.dsp.remove_dc_bias(samples_noise)
            if sr_noise != fil.sr:
                samples_noise, sr_noise = sp.dsp.resample_audio(samples_noise,
                                                                sr_noise,
                                                                fil.sr)
                assert sr_noise == fil.sr
        # ensure string objects converted to pathlib.Path objects
        # (pathlib.Path, not PosixPath, so this also works on Windows):
        elif not isinstance(noise_file, pathlib.Path) and isinstance(noise_file, str):
            noise_file = pathlib.Path(noise_file)
        # find out path information
        if isinstance(noise_file, pathlib.Path):
            extension = noise_file.suffix
            if '.npy' in extension:
                # if noise power spectrum already calculated or not
                if 'powspec' in noise_file.stem or 'powerspectrum' in noise_file.stem:
                    noise_power = fil.load_power_vals(noise_file)
                    samples_noise = None
                elif 'stft' in noise_file.stem:
                    noise_power = np.load(noise_file)
                    noise_power = np.abs(noise_power)**2
            else:
                # assume audio pathway
                if duration_noise_ms is not None:
                    dur_sec = duration_noise_ms/1000
                else:
                    dur_sec = None
                samples_noise, sr_noise = sp.loadsound(noise_file,
                                                       fil.sr,
                                                       dur_sec=dur_sec,
                                                       use_scipy=use_scipy,
                                                       remove_dc = remove_dc)
                assert sr_noise == fil.sr
        if samples_noise is None and noise_power is None:
            raise TypeError('Expected one of the following: '+\
                '\ntype tuple containing (samples, samplerate) of noise data'+\
                '\naudiofile pathway to noise file'+\
                '\n.npy file with powerspectrum values for noise'+\
                '\n\nDid not expect {} as input.'.format(noise_file))
    else:
        # no noise provided: use the beginning of the target audio as noise
        starting_noise_len = sp.dsp.calc_frame_length(fil.sr,
                                                      duration_noise_ms)
        samples_noise = samples_orig[:starting_noise_len]
    # if noise samples have been collected...
    # TODO improve snr / volume measurements
    if samples_noise is not None:
        # set how many subframes are needed to process entire noise signal
        fil.set_num_subframes(len(samples_noise), is_noise=True, zeropad=fil.zeropad)
        if visualize:
            sp.feats.plot(samples_orig, 'signal',
                          title='Signal to filter'.upper(), sr = fil.sr)
            sp.feats.plot(samples_noise, 'signal',
                          title= 'Noise samples to filter out'.upper(), sr=fil.sr)
        # prepare noise power matrix (if it's not loaded already)
        if fil.noise_subframes:
            if real_signal:
                # only the first half of fft (+1)
                total_rows = fil.num_fft_bins//2+1
            else:
                total_rows = fil.num_fft_bins
            noise_power = sp.dsp.create_empty_matrix((total_rows,))
            section = 0
            for frame in range(fil.noise_subframes):
                noise_section = samples_noise[section:section+fil.frame_length]
                noise_w_win = sp.dsp.apply_window(noise_section, fil.get_window(),
                                                  zeropad=fil.zeropad)
                noise_fft = sp.dsp.calc_fft(noise_w_win, real_signal=real_signal)
                noise_power_frame = sp.dsp.calc_power(noise_fft)
                noise_power += noise_power_frame
                section += fil.overlap_length
            # welch's method: take average of power that has been collected
            # in windows
            noise_power = sp.dsp.calc_average_power(noise_power,
                                                    fil.noise_subframes)
            assert section == fil.noise_subframes * fil.overlap_length
    if visualize:
        sp.feats.plot(noise_power, 'stft',
                      title='Average noise power spectrum'.upper()+'\n{}'.format(frame_subtitle),
                      energy_scale='power_to_db',
                      sr = fil.sr)
    # prepare target power matrix
    increment_length = int(fil.frame_length * fil.percent_overlap)
    total_rows = increment_length + increment_length * fil.target_subframes
    filtered_sig = sp.dsp.create_empty_matrix(
        (total_rows,), complex_vals=True)
    section = 0
    row = 0
    target_power_baseline = 0
    # increase/decrease noise values to increase strength of filter
    if filter_scale is None:
        filter_scale = 1
    noise_power *= filter_scale
    try:
        for frame in range(fil.target_subframes):
            target_section = samples_orig[section:section+fil.frame_length]
            target_w_window = sp.dsp.apply_window(target_section,
                                                  fil.get_window(),
                                                  zeropad=fil.zeropad)
            if visualize and frame % visualize_every_n_windows == 0:
                sp.feats.plot(target_section,'signal', title='Signal'.upper()+' \nframe {}: {}'.format( frame+1,frame_subtitle),sr = fil.sr)
                sp.feats.plot(target_w_window,'signal', title='Signal with {} window'.format(fil.window_type).upper()+'\nframe {}: {}'.format( frame+1,frame_subtitle),sr = fil.sr)
            target_fft = sp.dsp.calc_fft(target_w_window, real_signal=real_signal)
            target_power = sp.dsp.calc_power(target_fft)
            # now start filtering!!
            # initialize SNR matrix
            if visualize and frame % visualize_every_n_windows == 0:
                sp.feats.plot(target_power,'stft', title='Signal power spectrum'.upper()+'\nframe {}: {}'.format( frame+1,frame_subtitle), energy_scale='power_to_db', sr = fil.sr)
            if 'wiener' in filter_type:
                enhanced_fft = fil.apply_wienerfilter(frame,
                                                      target_fft,
                                                      target_power,
                                                      noise_power)
                if apply_postfilter:
                    enhanced_fft = fil.apply_postfilter(enhanced_fft,
                                                        target_fft,
                                                        target_power)
            elif 'band' in filter_type:
                target_phase = sp.dsp.calc_phase(target_fft,
                                                 radians=phase_radians)
                enhanced_fft = fil.apply_bandspecsub(target_power,
                                                     target_phase,
                                                     noise_power)
                if apply_postfilter:
                    enhanced_fft = fil.apply_postfilter(enhanced_fft,
                                                        target_fft,
                                                        target_power,
                                                        noise_power)
            enhanced_ifft = sp.dsp.calc_ifft(enhanced_fft,
                                             real_signal=real_signal)
            try:
                filtered_sig[row:row+fil.frame_length] += enhanced_ifft
            except ValueError:
                # with sample rate 22050, had some problems... zeropad missing frames
                if len(filtered_sig[row:row+fil.frame_length]) < fil.frame_length:
                    diff = fil.frame_length - len(filtered_sig[row:row+fil.frame_length])
                    filtered_sig[row:row+fil.frame_length] += \
                        enhanced_ifft[:fil.frame_length-diff]
                elif len(enhanced_ifft) < fil.frame_length:
                    diff = fil.frame_length - len(enhanced_ifft)
                    filtered_sig[row:row+fil.frame_length-diff] += enhanced_ifft
            if visualize and frame % visualize_every_n_windows == 0:
                sp.feats.plot(filtered_sig,'signal', title='Filtered signal'.upper()+'\nup to frame {}: {}'.format(frame+1,frame_subtitle), sr = fil.sr)
            row += fil.overlap_length
            section += fil.overlap_length
    except ValueError as e:
        raise e
    assert row == fil.target_subframes * fil.overlap_length
    assert section == fil.target_subframes * fil.overlap_length
    # make enhanced_ifft values real
    enhanced_signal = filtered_sig.real
    if visualize:
        rows = len(filtered_sig)//increment_length
        cols = increment_length
        sp.feats.plot(np.abs(filtered_sig.reshape((
            rows,cols,)))**2,
            'stft', title='Final filtered signal power spectrum'.upper()+'\n{}: {}'.format(filter_type,frame_subtitle), energy_scale='power_to_db')
        sp.feats.plot(enhanced_signal,'signal', title='Final filtered signal'.upper()+'\n{}'.format(filter_type), sr = fil.sr)
    if control_vol:
        enhanced_signal = fil.check_volume(enhanced_signal)
    if len(enhanced_signal) > len(samples_orig):
        enhanced_signal = enhanced_signal[:len(samples_orig)]
    # for backwards compatibility
    if output_filename is not None or save2wav:
        if output_filename is None:
            output_filename = sp.utils.get_date()+'.wav'
        saved_filename = sp.savesound(str(output_filename),
                                      enhanced_signal,
                                      sr=fil.sr,
                                      overwrite=overwrite,
                                      remove_dc=remove_dc)
    return enhanced_signal, fil.sr
def dataset_logger(audiofile_dir = None, recursive=True):
    '''Logs name, format, bitdepth, sr, duration of audiofiles, num_channels

    Parameters
    ----------
    audiofile_dir : str or pathlib.PosixPath
        The directory where audiofiles of interest are. If no directory
        provided, the current working directory will be used.
    recursive : bool
        If True, all audiofiles will be analyzed, also in nested directories.
        Otherwise, only the audio files in the immediate directory will be
        analyzed. (default True)

    Returns
    -------
    audiofile_dict : dict
        Dictionary within a dictionary, holding the formats of the audiofiles in the
        directory/ies.

    Examples
    --------
    >>> audio_info = dataset_logger()
    >>> # look at three audio files:
    >>> count = 0
    >>> for key, value in audio_info.items():
    ...:     for k, v in value.items():
    ...:         print(k, ' : ', v)
    ...:         count += 1
    ...:     print()
    ...:     if count > 2:
    ...:         break
    audio : audiodata/dogbark_2channels.wav
    sr : 48000
    num_channels : 2
    dur_sec : 0.389
    format_type : WAV
    bitdepth : PCM_16
    <BLANKLINE>
    audio : audiodata/python_traffic_pf.wav
    sr : 48000
    num_channels : 1
    dur_sec : 1.86
    format_type : WAV
    bitdepth : DOUBLE
    <BLANKLINE>
    audio : audiodata/259672__nooc__this-is-not-right.wav
    sr : 44100
    num_channels : 1
    dur_sec : 2.48453514739229
    format_type : WAV
    bitdepth : PCM_16

    See Also
    --------
    soundfile.available_subtypes
        The subtypes available with the package SoundFile
    soundfile.available_formats
        The formats available with the package SoundFile
    '''
    # ensure audio directory exists:
    if audiofile_dir is None:
        audiofile_dir = './'
    audiofile_dir = sp.utils.check_dir(audiofile_dir)
    audiofiles = sp.files.collect_audiofiles(audiofile_dir,
                                             recursive = recursive)
    audiofile_dict = dict()
    for i, audio in enumerate(audiofiles):
        # set sr to None to get audio file's sr
        # set mono to False to see if mono or stereo sound
        y, sr = sp.loadsound(audio, sr=None, mono=False)
        # see number of channels
        if len(y.shape) > 1:
            num_channels = y.shape[1]
        else:
            num_channels = 1
        dur_sec = len(y)/sr
        try:
            # use a context manager so the file handle is always closed
            # (previously the SoundFile object was left open - resource leak)
            with sf.SoundFile(audio) as so:
                bitdepth = so.subtype
                format_type = so.format
        except RuntimeError:
            # SoundFile could not read the file: fall back to the file suffix
            if isinstance(audio, str):
                audio = pathlib.Path(audio)
            format_type = audio.suffix.upper()[1:] # remove starting dot
            bitdepth = 'unknown'
        # ensure audio is string: if pathlib.PosixPath, it saves
        # the PurePath in the string and makes it difficult to deal
        # with later.
        audio = str(audio)
        curr_audio_dict = dict(audio = audio,
                               sr = sr,
                               num_channels = num_channels,
                               dur_sec = dur_sec,
                               format_type = format_type,
                               bitdepth = bitdepth)
        audiofile_dict[audio] = curr_audio_dict
        sp.utils.print_progress(i, len(audiofiles), task='logging audio file details')
    return audiofile_dict
# TODO speed this up, e.g. preload noise data?
def create_denoise_data(cleandata_dir, noisedata_dir, trainingdata_dir, limit=None,
                        snr_levels = None, pad_mainsound_sec = None,
                        random_seed = None, overwrite = False, **kwargs):
    '''Applies noise to clean audio; saves clean and noisy audio to `trainingdata_dir`.

    Parameters
    ----------
    cleandata_dir : str, pathlib.PosixPath
        Name of folder containing clean audio data for autoencoder. E.g. 'clean_speech'
    noisedata_dir : str, pathlib.PosixPath
        Name of folder containing noise to add to clean data. E.g. 'noise'
    trainingdata_dir : str, pathlib.PosixPath
        Directory to save newly created train, validation, and test data
    limit : int, optional
        Limit in number of audiofiles used for training data
    snr_levels : list of ints, optional
        List of varying signal-to-noise ratios to apply to noise levels.
        (default None)
    pad_mainsound_sec : int, float, optional
        Amount in seconds the main sound should be padded. In other words, in seconds how
        long the background sound should play before the clean / main / target audio starts.
        The same amount of noise will be appended at the end.
        (default None)
    random_seed : int
        A value to allow random order of audiofiles to be predictable.
        (default None). If None, the order of audiofiles will not be predictable.
    overwrite : bool
        If True, a new dataset will be created regardless of whether or not a matching
        directory already exists. (default False)
    **kwargs : additional keyword arguments
        The keyword arguments for soundpy.files.loadsound

    Returns
    -------
    saveinput_path : pathlib.PosixPath
        Path to where noisy audio files are located
    saveoutput_path : pathlib.PosixPath
        Path to where clean audio files are located

    See Also
    --------
    soundpy.files.loadsound
        Loads audiofiles.
    soundpy.dsp.add_backgroundsound
        Add background sound / noise to signal at a determined signal-to-noise ratio.
    '''
    start = time.time()
    # check to ensure clean and noisy data are there
    # and turn them into pathlib.PosixPath objects:
    cleandata_dir = sp.utils.check_dir(cleandata_dir, make=False)
    noisedata_dir = sp.utils.check_dir(noisedata_dir, make=False)
    trainingdata_dir = sp.utils.string2pathlib(trainingdata_dir)
    cleandata_folder = 'clean'
    noisedata_folder = 'noisy'
    if limit is not None:
        cleandata_folder += '_limit'+str(limit)
        noisedata_folder += '_limit'+str(limit)
    newdata_clean_dir = trainingdata_dir.joinpath(cleandata_folder)
    newdata_noisy_dir = trainingdata_dir.joinpath(noisedata_folder)
    # See if databases already exist:
    if not overwrite:
        try:
            newdata_clean_dir = sp.utils.check_dir(newdata_clean_dir, make=False,
                                                   append = False)
            newdata_noisy_dir = sp.utils.check_dir(newdata_noisy_dir, make=False,
                                                   append = False)
        except FileExistsError:
            raise FileExistsError('Datasets already exist at this location. Set '+\
                '`overwrite` to True or designate a new directory.')
        except FileNotFoundError:
            pass
    # create directory to save new data (if not exist)
    newdata_clean_dir = sp.utils.check_dir(newdata_clean_dir, make = True)
    newdata_noisy_dir = sp.utils.check_dir(newdata_noisy_dir, make = True)
    # collect audiofiles (not limited to .wav files)
    cleanaudio = sorted(sp.files.collect_audiofiles(cleandata_dir,
                                                    hidden_files = False,
                                                    wav_only = False,
                                                    recursive = False))
    noiseaudio = sorted(sp.files.collect_audiofiles(noisedata_dir,
                                                    hidden_files = False,
                                                    wav_only = False,
                                                    recursive = False))
    if random_seed is not None:
        random.seed(random_seed)
    random.shuffle(cleanaudio)
    if limit is not None:
        cleanaudio = cleanaudio[:limit]
    # ensure snr_levels is array-like
    if snr_levels is not None:
        if not isinstance(snr_levels, list) and not isinstance(snr_levels, np.ndarray):
            snr_levels = list(snr_levels)
    for i, wavefile in enumerate(cleanaudio):
        sp.utils.print_progress(iteration=i,
                                total_iterations=len(cleanaudio),
                                task='clean and noisy audio data generation')
        # no random seed applied here:
        # each choice would be the same for each iteration
        noise = random.choice(noiseaudio)
        if snr_levels is not None:
            snr = random.choice(snr_levels)
        else:
            snr = None
        clean_stem = wavefile.stem
        noise_stem = noise.stem
        # load clean data to get duration
        if 'sr' not in kwargs:
            # set at high sr for measuring noise in signals,
            # necessary in applying noise at specific SNR level
            kwargs['sr'] = 44100
        else:
            if int(kwargs['sr']) < 44100:
                import warnings
                msg = 'The measuring of signal to noise ratio is '+\
                    'improved if the sample rate is at or above 44100 Hz.'+\
                    '\nConsider changing sr = {} to sr = 44100.'.format(
                        kwargs['sr'])
                warnings.warn(msg)
        clean_data, sr = sp.loadsound(wavefile, **kwargs)
        noise_data, sr = sp.loadsound(noise, **kwargs)
        # makes adding of sounds smoother:
        clean_data = sp.dsp.remove_dc_bias(clean_data)
        noise_data = sp.dsp.remove_dc_bias(noise_data)
        # incase any weird clicks at beginning / end of signals:
        clean_data = sp.dsp.clip_at_zero(clean_data, samp_win = 10)
        noise_data = sp.dsp.clip_at_zero(noise_data, samp_win = 10)
        noisy_data, snr_appx = sp.dsp.add_backgroundsound(
            audio_main = clean_data,
            audio_background = noise_data,
            snr = snr,
            pad_mainsound_sec = pad_mainsound_sec,
            wrap = False,
            **kwargs)
        if pad_mainsound_sec:
            # pad clean the same way as noisy so they are the same length
            num_pad_samps = sp.dsp.calc_frame_length(pad_mainsound_sec * 1000, sr)
            padding = np.zeros(num_pad_samps)
            clean_data = np.concatenate((padding, clean_data, padding))
        # ensure length of clean and noisy data match
        assert len(clean_data) == len(noisy_data)
        # ensure both noisy and clean files have same beginning to filename (i.e. clean filename)
        noisydata_filename = newdata_noisy_dir.joinpath(clean_stem+'_'+noise_stem\
            +'_snr'+str(snr)+'.wav')
        cleandata_filename = newdata_clean_dir.joinpath(clean_stem+'.wav')
        # bug fix: save at the loaded sample rate `sr`
        # (previously referenced an undefined name `sr_down` -> NameError)
        write(noisydata_filename, sr, noisy_data)
        write(cleandata_filename, sr, clean_data)
    end = time.time()
    total_time, units = sp.utils.adjust_time_units(end-start)
    print('Data creation took a total of {} {}.'.format(
        round(total_time, 2),
        units))
    return newdata_noisy_dir, newdata_clean_dir
def envclassifier_feats(
    data_dir,
    data_features_dir = None,
    perc_train = 0.8,
    ignore_label_marker = None,
    **kwargs):
    '''Environment Classifier: feature extraction of scene audio into train, val, & test datasets.

    Saves extracted feature datasets (train, val, test datasets) as well as
    feature extraction settings in the directory `data_features_dir`.

    Parameters
    ----------
    data_dir : str or pathlib.PosixPath
        The directory with scene subfolders (e.g. 'air_conditioner', 'traffic') that
        contain audio files belonging to that scene (e.g. 'air_conditioner/ac1.wav',
        'air_conditioner/ac2.wav', 'traffic/t1.wav').
    data_features_dir : str or pathlib.PosixPath, optional
        The directory where feature extraction related to the dataset will be stored.
        Within this directory, a unique subfolder will be created each time features are
        extracted. This allows several versions of extracted features on the same dataset
        without overwriting files.
    perc_train : float
        The amount of data to be set aside for train data. The rest will be divided into
        validation and test datasets.
    ignore_label_marker : str
        A string to look for in the labels if the "label" should not be included.
        For example, '__' to ignore a subdirectory titled "__noise" or "not__label".
    kwargs : additional keyword arguments
        Keyword arguments for `soundpy.feats.save_features_datasets` and
        `soundpy.feats.get_feats`.

    Returns
    -------
    feat_extraction_dir : pathlib.PosixPath
        The pathway to where all feature extraction files can be found, including datasets.

    See Also
    --------
    soundpy.feats.get_feats
        Extract features from audio file or audio data.
    soundpy.feats.save_features_datasets
        Preparation of acoustic features in train, validation and test datasets.
    '''
    if data_features_dir is None:
        data_features_dir = './audiodata/example_feats_models/envclassifier/'
    feat_extraction_dir = 'features_' + sp.utils.get_date()
    # collect labels
    labels = []
    data_dir = sp.utils.string2pathlib(data_dir)
    for label in data_dir.glob('*/'):
        if label.suffix:
            # avoid adding unwanted files in the directory
            # want only directory names
            continue
        if ignore_label_marker is not None:
            if ignore_label_marker in label.stem:
                continue
        # ignores hidden directories
        if label.stem[0] == '.':
            continue
        labels.append(label.stem)
    labels = set(labels)
    if len(labels) == 0:
        raise ValueError('No subdirectories found to offer as labels. Ensure this path'+\
            ' contains the data:\n{}'.format(data_dir))
    # create paths for what we need to save:
    data_features_dir = sp.utils.check_dir(data_features_dir)
    feat_extraction_dir = data_features_dir.joinpath(feat_extraction_dir)
    feat_extraction_dir = sp.utils.check_dir(feat_extraction_dir, make=True)
    # dictionaries containing encoding and decoding labels:
    dict_encode_path = feat_extraction_dir.joinpath('dict_encode.csv')
    dict_decode_path = feat_extraction_dir.joinpath('dict_decode.csv')
    # dictionary for which audio paths are assigned to which labels:
    dict_encdodedlabel2audio_path = feat_extraction_dir.joinpath('dict_encdodedlabel2audio.csv')
    # designate where to save train, val, and test data
    data_train_path = feat_extraction_dir.joinpath('{}_data.npy'.format('train'))
    data_val_path = feat_extraction_dir.joinpath('{}_data.npy'.format('val'))
    data_test_path = feat_extraction_dir.joinpath('{}_data.npy'.format('test'))
    # create and save encoding/decoding labels dicts
    dict_encode, dict_decode = sp.datasets.create_dicts_labelsencoded(labels)
    dict_encode_path = sp.utils.save_dict(
        filename = dict_encode_path,
        dict2save = dict_encode,
        overwrite=False)
    # bug fix: save the *decode* dict to the decode path
    # (previously dict_encode was saved to both files)
    dict_decode_path = sp.utils.save_dict(
        filename = dict_decode_path,
        dict2save = dict_decode,
        overwrite=False)
    # save audio paths to each label in dict
    paths_list = sp.files.collect_audiofiles(data_dir, recursive=True)
    paths_list = sorted(paths_list)
    dict_encodedlabel2audio = sp.datasets.create_encodedlabel2audio_dict(dict_encode,
                                                                         paths_list)
    dict_encdodedlabel2audio_path = sp.utils.save_dict(
        dict2save = dict_encodedlabel2audio,
        filename = dict_encdodedlabel2audio_path,
        overwrite = False)
    # assign audiofiles into train, validation, and test datasets
    train, val, test = sp.datasets.audio2datasets(
        dict_encdodedlabel2audio_path,
        perc_train = perc_train,
        limit = None,
        seed = 40)
    # save audiofiles for each dataset to dict and save
    dataset_dict = dict([('train',train),('val', val),('test',test)])
    dataset_dict_path = feat_extraction_dir.joinpath('dataset_audiofiles.csv')
    dataset_dict_path = sp.utils.save_dict(
        dict2save = dataset_dict,
        filename = dataset_dict_path,
        overwrite=True)
    # save paths to where extracted features of each dataset will be saved to dict w same keys
    datasets_path2save_dict = dict([('train',data_train_path),
                                    ('val', data_val_path),
                                    ('test',data_test_path)])
    # extract features
    start = time.time()
    dataset_dict, datasets_path2save_dict = sp.feats.save_features_datasets(
        datasets_dict = dataset_dict,
        datasets_path2save_dict = datasets_path2save_dict,
        labeled_data = True,
        decode_dict = dict_decode,
        **kwargs)
    end = time.time()
    total_dur_sec = end-start
    total_dur, units = sp.utils.adjust_time_units(total_dur_sec)
    print('\nFinished! Total duration: {} {}.'.format(round(total_dur,2), units))
    # save which audiofiles were extracted for each dataset
    # save where extracted data were saved
    # save how long feature extraction took
    dataprep_settings = dict(dataset_dict = dataset_dict,
                             datasets_path2save_dict = datasets_path2save_dict,
                             total_dur_sec = total_dur_sec)
    dataprep_settings_path = sp.utils.save_dict(
        dict2save = dataprep_settings,
        filename = feat_extraction_dir.joinpath('dataset_audio_assignments.csv'))
    return feat_extraction_dir
def denoiser_feats(
    data_clean_dir,
    data_noisy_dir,
    data_features_dir = None,
    limit = None,
    perc_train = 0.8,
    **kwargs):
    '''Autoencoder Denoiser: feature extraction of clean & noisy audio into train, val, & test datasets.

    Saves extracted feature datasets (train, val, test datasets) as well as
    feature extraction settings in the directory `data_features_dir`.

    Parameters
    ----------
    data_clean_dir : str or pathlib.PosixPath
        The directory with clean audio files.
    data_noisy_dir : str or pathlib.PosixPath
        The directory with noisy audio files. These should be the same as the clean audio,
        except noise has been added.
    data_features_dir : str or pathlib.PosixPath, optional
        The directory where feature extraction related to the dataset will be stored.
        Within this directory, a unique subfolder will be created each time features are
        extracted. This allows several versions of extracted features on the same dataset
        without overwriting files.
    limit : int, optional
        The limit of audio files for feature extraction. (default None)
    kwargs : additional keyword arguments
        Keyword arguments for `soundpy.feats.save_features_datasets` and
        `soundpy.feats.get_feats`.

    Returns
    -------
    feat_extraction_dir : pathlib.PosixPath
        The pathway to where all feature extraction files can be found, including datasets.

    See Also
    --------
    soundpy.datasets.create_denoise_data
        Applies noise at specified SNR levels to clean audio files.
    soundpy.feats.get_feats
        Extract features from audio file or audio data.
    soundpy.feats.save_features_datasets
        Preparation of acoustic features in train, validation and test datasets.
    '''
    if data_features_dir is None:
        data_features_dir = './audiodata/example_feats_models/denoiser/'
    # create unique directory for feature extraction session:
    feat_extraction_dir = 'features_' + sp.utils.get_date()
    # 1) Ensure clean and noisy data directories exist
    audio_clean_path = sp.utils.check_dir(data_clean_dir, make=False)
    audio_noisy_path = sp.utils.check_dir(data_noisy_dir, make=False)
    # 2) create paths for what we need to save:
    denoise_data_path = sp.utils.check_dir(data_features_dir, make=True)
    feat_extraction_dir = denoise_data_path.joinpath(feat_extraction_dir)
    feat_extraction_dir = sp.utils.check_dir(feat_extraction_dir, make=True)
    # Noisy and clean train, val, and test data paths:
    data_train_noisy_path = feat_extraction_dir.joinpath('{}_data_{}.npy'.format('train',
                                                                                 'noisy'))
    data_val_noisy_path = feat_extraction_dir.joinpath('{}_data_{}.npy'.format('val',
                                                                               'noisy'))
    data_test_noisy_path = feat_extraction_dir.joinpath('{}_data_{}.npy'.format('test',
                                                                                'noisy'))
    data_train_clean_path = feat_extraction_dir.joinpath('{}_data_{}.npy'.format('train',
                                                                                 'clean'))
    data_val_clean_path = feat_extraction_dir.joinpath('{}_data_{}.npy'.format('val',
                                                                               'clean'))
    data_test_clean_path = feat_extraction_dir.joinpath('{}_data_{}.npy'.format('test',
                                                                                'clean'))
    # 3) collect audiofiles and divide them into train, val, and test datasets
    # noisy data
    noisyaudio = sp.files.collect_audiofiles(audio_noisy_path,
                                             hidden_files = False,
                                             wav_only = False,
                                             recursive = False)
    # sort audio (can compare if noisy and clean datasets are compatible)
    noisyaudio = sorted(noisyaudio)
    if limit is not None:
        noisyaudio = noisyaudio[:limit]
    # clean data
    cleanaudio = sp.files.collect_audiofiles(audio_clean_path,
                                             hidden_files = False,
                                             wav_only = False,
                                             recursive = False)
    cleanaudio = sorted(cleanaudio)
    if limit is not None:
        cleanaudio = cleanaudio[:limit]
    # check if they match up: (expects clean file name to be in noisy file name)
    for i, audiofile in enumerate(noisyaudio):
        if not sp.utils.check_noisy_clean_match(audiofile, cleanaudio[i]):
            raise ValueError('The noisy and clean audio datasets do not appear to match.')
    # save collected audiofiles for noisy and clean datasets to dictionary
    noisy_audio_dict = dict([('noisy', noisyaudio)])
    clean_audio_dict = dict([('clean', cleanaudio)])
    noisy_audio_dict_path = feat_extraction_dir.joinpath('noisy_audio.csv')
    noisy_audio_dict_path = sp.utils.save_dict(
        dict2save = noisy_audio_dict,
        filename = noisy_audio_dict_path,
        overwrite=False)
    clean_audio_dict_path = feat_extraction_dir.joinpath('clean_audio.csv')
    clean_audio_dict_path = sp.utils.save_dict(
        dict2save = clean_audio_dict,
        filename = clean_audio_dict_path,
        overwrite=False)
    # separate into datasets
    train_noisy, val_noisy, test_noisy = sp.datasets.audio2datasets(
        noisy_audio_dict_path, perc_train = perc_train, seed=40)
    train_clean, val_clean, test_clean = sp.datasets.audio2datasets(
        clean_audio_dict_path,
        perc_train = perc_train,
        seed=40)
    # save train, val, test dataset assignments to dict
    dataset_dict_noisy = dict([('train', train_noisy),('val', val_noisy),('test', test_noisy)])
    dataset_dict_clean = dict([('train', train_clean),('val', val_clean),('test', test_clean)])
    # keep track of paths to save data
    dataset_paths_noisy_dict = dict([('train',data_train_noisy_path),
                                     ('val', data_val_noisy_path),
                                     ('test',data_test_noisy_path)])
    dataset_paths_clean_dict = dict([('train',data_train_clean_path),
                                     ('val', data_val_clean_path),
                                     ('test',data_test_clean_path)])
    path2_noisy_datasets = feat_extraction_dir.joinpath('audiofiles_datasets_noisy.csv')
    path2_clean_datasets = feat_extraction_dir.joinpath('audiofiles_datasets_clean.csv')
    # save dicts to .csv files
    path2_noisy_datasets = sp.utils.save_dict(
        dict2save = dataset_dict_noisy,
        filename = path2_noisy_datasets,
        overwrite=False)
    path2_clean_datasets = sp.utils.save_dict(
        dict2save = dataset_dict_clean,
        filename = path2_clean_datasets,
        overwrite=False)
    # 5) extract features
    # ensure the noisy and clean values match up:
    for key, value in dataset_dict_noisy.items():
        for j, audiofile in enumerate(value):
            if not sp.utils.check_noisy_clean_match(audiofile,
                                                    dataset_dict_clean[key][j]):
                # bug fix: `audiofile` is the noisy file and the clean file is
                # indexed with the current loop index `j` (previously the message
                # used a stale index `i` and swapped the noisy/clean files)
                raise ValueError('There is a mismatch between noisy and clean audio. '+\
                    '\nThe noisy file:\n{}'.format(audiofile)+\
                    '\ndoes not seem to match the clean file:\n{}'.format(
                        dataset_dict_clean[key][j]))
    start = time.time()
    # first clean data
    dataset_dict_clean, dataset_paths_clean_dict = sp.feats.save_features_datasets(
        datasets_dict = dataset_dict_clean,
        datasets_path2save_dict = dataset_paths_clean_dict,
        **kwargs)
    # then noisy data
    dataset_dict_noisy, dataset_paths_noisy_dict = sp.feats.save_features_datasets(
        datasets_dict = dataset_dict_noisy,
        datasets_path2save_dict = dataset_paths_noisy_dict,
        **kwargs)
    end = time.time()
    total_dur_sec = round(end-start,2)
    total_dur, units = sp.utils.adjust_time_units(total_dur_sec)
    print('\nFinished! Total duration: {} {}.'.format(total_dur, units))
    # save which audiofiles were extracted for each dataset
    # save where extracted data were saved
    # save total duration of feature extraction
    dataprep_settings = dict(dataset_dict_noisy = dataset_dict_noisy,
                             dataset_paths_noisy_dict = dataset_paths_noisy_dict,
                             dataset_dict_clean = dataset_dict_clean,
                             dataset_paths_clean_dict = dataset_paths_clean_dict,
                             total_dur_sec = total_dur_sec,
                             limit = limit,
                             perc_train = perc_train)
    dataprep_settings_path = sp.utils.save_dict(
        dict2save = dataprep_settings,
        filename = feat_extraction_dir.joinpath('dataset_audio_assignments.csv'))
    return feat_extraction_dir