'''The datasets module contains functions related to organizing datasets.
'''
import numpy as np
import random
import collections
import math
import pathlib
import os
import sys
import inspect
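# make the parent soundpy package importable when this module is run directly: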
currentdir = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))
packagedir = os.path.dirname(currentdir)
sys.path.insert(0, packagedir)
import soundpy as sp
###############################################################################
def create_encodedlabel2audio_dict(dict_encodelabels, paths_list, limit=None, seed=40):
'''Creates dictionary with audio labels as keys and filename lists as values.
If no label is found in the filename path, the label is not included
in the returned dictionary: labels are only included if corresponding
paths are present.
Parameters
----------
dict_encodelabels : dict
Dictionary containing the labels as keys and their encoded values as values.
    paths_list : set, list
        List containing pathlib.PosixPath objects (i.e. paths) of all audio
        files; the audio files are expected to reside in directories named
        after their audio class
    limit : int, optional
        The integer indicating a limit to the number of audio files per class. This
        may be useful if one wants to ensure a balanced dataset (default None)
seed : int, optional
The seed for pseudorandomizing the wavfiles, if a limit is requested.
If `seed` is set to None, the randomized order of the limited wavfiles cannot
be repeated. (default 40)
Returns
-------
label_waves_dict : OrderedDict
A dictionary with encoded audio labels as keys with values being the audio files
corresponding to that label
Examples
--------
>>> from pathlib import Path
>>> labels = dict([('vacuum',2),('fridge',0),('wind',1)])
>>> paths = [Path('data/audio/vacuum/vacuum1.wav'),
... Path('data/audio/fridge/fridge1.wav'),
... Path('data/audio/vacuum/vacuum2.wav'),
... Path('data/audio/wind/wind1.wav')]
>>> label_waves_dict = create_encodedlabel2audio_dict(labels, paths)
>>> label_waves_dict
OrderedDict([(0, [PosixPath('data/audio/fridge/fridge1.wav')]), \
(2, [PosixPath('data/audio/vacuum/vacuum1.wav'), \
PosixPath('data/audio/vacuum/vacuum2.wav')]), \
(1, [PosixPath('data/audio/wind/wind1.wav')])])
>>> #to set a limit on number of audiofiles per class:
>>> create_encodedlabel2audio_dict(labels, paths, limit=1, seed=40)
OrderedDict([(0, [PosixPath('data/audio/fridge/fridge1.wav')]), \
(2, [PosixPath('data/audio/vacuum/vacuum2.wav')]), \
(1, [PosixPath('data/audio/wind/wind1.wav')])])
>>> #change the limited pathways chosen:
>>> create_encodedlabel2audio_dict(labels, paths, limit=1, seed=10)
OrderedDict([(0, [PosixPath('data/audio/fridge/fridge1.wav')]), \
(2, [PosixPath('data/audio/vacuum/vacuum1.wav')]), \
(1, [PosixPath('data/audio/wind/wind1.wav')])])
'''
if not isinstance(dict_encodelabels, dict):
raise TypeError(
'Expected dict_encodelabels to be type dict, not type {}'.format(type(
dict_encodelabels)))
if not isinstance(paths_list, set) and not isinstance(paths_list, list):
raise TypeError(
'Expected paths list as type set or list, not type {}'.format(type(
paths_list)))
label_waves_dict = collections.OrderedDict()
# get labels from dict_encodelabels:
    labels_set = set(dict_encodelabels.keys())
for label in sorted(labels_set):
        # expects the folder name in the path to match the label
label_folder = pathlib.Path('/'+label+'/')
label_paths = [path for path in paths_list if str(label_folder).lower() \
in str(path).lower()]
if label_paths:
if isinstance(limit, int):
                if seed is not None:
np.random.seed(seed=seed)
rand_idx = np.random.choice(range(len(label_paths)),
len(label_paths),
replace=False)
paths_idx = rand_idx[:limit]
label_paths = list(np.array(label_paths)[paths_idx])
# encode label in the label_waves_dict
label_waves_dict[dict_encodelabels[label]] = sorted(label_paths)
if not label_waves_dict:
raise ValueError('No matching labels found in paths list.')
return label_waves_dict
def create_dicts_labelsencoded(labels_class, add_extra_label = False, extra_label='silence'):
'''Encodes audio class labels and saves in dictionaries.
The labels are alphabetized and encoded under their index. If `add_extra_label`,
the `extra_label` is added as the last entry in the dictionary.
    This is useful if performing voice activity detection and one wants to label
    non-voice activated sections as silence rather than as some speech label.
Parameters
----------
labels_class : set, list
Set or list containing the labels of all audio classes.
add_extra_label : bool
If True, `extra_label` added to dictionary.
extra_label : str
The extra label to add. (default 'silence').
Returns
-------
dict_label2int : dict
Dictionary where the keys are the string labels and the
values are the encoded integers
dict_int2label : dict
Dictionary where the keys are the encoded integers and the
values are the string labels
Examples
--------
>>> labels = {'wind','air_conditioner','fridge'}
>>> label2int, int2label = create_dicts_labelsencoded(labels)
>>> label2int
{'air_conditioner': 0, 'fridge': 1, 'wind': 2}
>>> int2label
{0: 'air_conditioner', 1: 'fridge', 2: 'wind'}
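    >>> # with an extra 'silence' label appended as the last entry:
    >>> label2int, int2label = create_dicts_labelsencoded(labels, add_extra_label=True)
    >>> label2int
    {'air_conditioner': 0, 'fridge': 1, 'wind': 2, 'silence': 3}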
'''
if not isinstance(labels_class, set) and not isinstance(labels_class, list):
raise TypeError(
'Expected inputs as type set or list, not type {}'.format(type(
labels_class)))
labels_sorted = sorted(set(labels_class))
dict_label2int = {}
dict_int2label = {}
for i, label in enumerate(labels_sorted):
dict_label2int[label] = i
dict_int2label[i] = label
if add_extra_label:
dict_label2int[extra_label] = i + 1
dict_int2label[i+1] = extra_label
return dict_label2int, dict_int2label
# TODO change name to audiolist2dataset?
def waves2dataset(audiolist, perc_train=0.8, seed=40, train=True, val=True, test=True):
'''Organizes audio files list into train, validation and test datasets.
    If only two or one dataset(s) are to be prepared, they will be assigned to train and
    val, or simply to train, respectively. The remaining 'datasets' will be returned empty.
Parameters
----------
audiolist : list
List containing paths to audio files
perc_train : float, int
Percentage of data to be in the training dataset (default 0.8)
    seed : int, None, optional
        Set seed for the generation of pseudorandom train, validation,
        and test datasets. Useful for reproducing results. (default 40)
    train : bool
        If True, the training dataset will be prepared. (default True)
    val : bool
        If True, the validation dataset will be prepared. (default True)
    test : bool
        If True, the test dataset will be prepared. (default True)
Returns
-------
train_waves : list
List of audio files for the training dataset
val_waves : list
List of audio files for the validation dataset
test_waves : list
List of audio files for the test dataset
Examples
--------
>>> #Using a list of numbers instead of filenames
>>> audiolist = [1,2,3,4,5,6,7,8,9,10]
>>> #default settings:
>>> waves2dataset(audiolist)
([5, 4, 9, 2, 3, 10, 1, 6], [8], [7])
>>> #perc_train set to 50% instead of 80%:
>>> waves2dataset(audiolist, perc_train=50)
([5, 4, 9, 2, 3, 10], [1, 6], [8, 7])
>>> #change seed number
>>> waves2dataset(audiolist, seed=0)
([7, 1, 2, 5, 6, 9, 10, 8], [4], [3])
'''
# set the dataset assignments to strings
if isinstance(train, bool) and train:
train = 'train'
if isinstance(val, bool) and val:
val = 'val'
if isinstance(test, bool) and test:
test = 'test'
# ensure percent train is between 0 and 1
if perc_train > 1:
perc_train /= 100.
if perc_train > 1:
raise ValueError('The percentage value of train data exceeds 100%')
# assign amount of data for train, validation, and test datasets
# three datasets
if train and val and test:
num_datasets = 3
perc_valtest = (1-perc_train)/2.
        if perc_valtest*2 > perc_train:
            raise ValueError(
                'The percentage of train data is too small: {}'
                '\nPlease check your values.'.format(perc_train))
# only two
elif train and val or train and test or val and test:
num_datasets = 2
perc_valtest = 1-perc_train
# only one
else:
print('Only one dataset to be prepared.')
num_datasets = 1
perc_valtest = 0
perc_train = 1.
    # if fewer datasets are requested, fill train first, then val
if train:
pass
if val:
if not train:
train = val
val = ''
if test:
if not train:
train = test
test = ''
elif not val:
val = test
test = ''
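    # determine how many samples each dataset receives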
num_waves = len(audiolist)
num_train = int(num_waves * perc_train)
num_val_test = int(num_waves * perc_valtest)
    while num_datasets > 1 and num_val_test < num_datasets-1:
        num_val_test += 1
        num_train -= 1
if num_datasets == 3 and num_train + 2*num_val_test < num_waves:
diff = num_waves - num_train - 2*num_val_test
num_train += diff
elif num_datasets == 1 and num_train < num_waves:
num_train = num_waves
    if seed is not None:
np.random.seed(seed=seed)
rand_idx = np.random.choice(range(num_waves),
num_waves,
replace=False)
train_idx = rand_idx[:num_train]
val_test_idx = rand_idx[num_train:]
if num_datasets == 3:
val_idx = val_test_idx[:num_val_test]
test_idx = val_test_idx[num_val_test:]
elif num_datasets == 2:
val_idx = val_test_idx
test_idx = []
else:
val_idx = val_test_idx # should be empty
test_idx = val_test_idx # should be empty
train_waves = list(np.array(audiolist)[train_idx])
val_waves = list(np.array(audiolist)[val_idx])
test_waves = list(np.array(audiolist)[test_idx])
try:
assert len(train_waves)+len(val_waves)+len(test_waves) == len(audiolist)
except AssertionError:
        print('mismatched dataset lengths:')
print(len(train_waves))
print(len(val_waves))
print(len(test_waves))
print(test_waves)
print(len(audiolist))
return train_waves, val_waves, test_waves
# TODO rename to audioclasses2datasets?
def audio2datasets(audiodata, perc_train=0.8, limit=None, seed = None,
                   audio_only = True, **kwargs):
'''Organizes all audio in audio class directories into datasets (randomized).
The validation and test datasets are halved between what isn't train data. For
example, if `perc_train` is 0.8, validation data will be 0.1 and test data
will be 0.1.
Parameters
----------
    audiodata : str, pathlib.PosixPath, dict, list, or set
        If the data has multiple labels, the path to the dictionary where audio class
        labels and the paths of all audio files belonging to each class are or will
        be stored. The dictionary with the labels and their encoded values
        can also be supplied directly. If the data does not have labels, a list or
        set of audiofiles can be provided to be placed in train, val, and test datasets.
    perc_train : float, optional
        Percentage of data to be assigned to the training dataset; the
        remainder is halved between the validation and test datasets. (default 0.8)
    limit : int, optional
        The total number of samples to limit the datasets to, applied
        proportionally to the train, val, and test datasets. (default None)
    seed : int, optional
        A value to allow the random order of audiofiles to be predictable.
        (default None). If None, the order of audiofiles will not be predictable.
    audio_only : bool
        If True, the list will be checked to contain only audio files
        (e.g. with extensions .wav, .flac, etc.); otherwise no check is performed.
        (default True)
**kwargs : additional keyword arguments
Keyword arguments for soundpy.datasets.waves2dataset
Returns
-------
dataset_audio : tuple
Named tuple including three lists / datasets of audiofiles or
label-audiofile pairs: the train, validation, and test lists, respectively.
        The label-audiofile pairs are saved as tuples within the lists and contain
        the encoded label integer (e.g. 0 instead of 'air_conditioner') and the
        audio paths associated with that class and dataset.
Raises
------
ValueError
If `perc_train` is set too high for the amount of data or there are
simply too few data. Specifically, if the percentage of train data cannot
be upheld while also ensuring the validation and test datasets have more
than 1 sample.
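    Examples
    --------
    A sketch of typical usage; the dictionary path below is hypothetical
    (hence not run as a doctest) and assumed loadable via `soundpy.utils.load_dict`:
    >>> datasets = audio2datasets('features/encoded_label2audio.csv', seed=40)  # doctest: +SKIP
    >>> train_data, val_data, test_data = datasets  # doctest: +SKIP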
'''
    if isinstance(audiodata, (dict, list, set)):
waves = audiodata
else:
# it is a string or pathlib.PosixPath
waves = sp.utils.load_dict(audiodata)
    if isinstance(waves, (list, set)) or len(waves) == 1:
multiple_labels = False
else:
multiple_labels = True
train_list = []
val_list = []
test_list = []
if multiple_labels:
for key, value in waves.items():
if isinstance(value, str):
audiolist = sp.utils.restore_dictvalue(value)
if audio_only:
                    # check that all items are audio files and none were lost
audiolist = sp.files.ensure_only_audiofiles(audiolist)
key = int(key)
else:
audiolist = value
train_waves, val_waves, test_waves = waves2dataset(sorted(audiolist), seed=seed,
**kwargs)
            for wave in train_waves:
                train_list.append((key, wave))
            for wave in val_waves:
                val_list.append((key, wave))
            for wave in test_waves:
                test_list.append((key, wave))
else:
# data has all same label, can be in a simple list, not paired with a label
if isinstance(waves, dict):
for i, key in enumerate(waves):
if i >= 1:
raise ValueError('Expected only 1 key, not {}.'.format(len(waves)))
audiolist = waves[key]
if isinstance(audiolist, str):
                # check that all items are audio files and none were lost
audiolist = sp.utils.restore_dictvalue(audiolist)
if audio_only:
audiolist = sp.files.ensure_only_audiofiles(audiolist)
else:
audiolist = waves
# sort to ensure a consistent order of audio; otherwise cannot control randomization
train_waves, val_waves, test_waves = waves2dataset(sorted(audiolist), seed=seed,
**kwargs)
        for wave in train_waves:
            train_list.append(wave)
        for wave in val_waves:
            val_list.append(wave)
        for wave in test_waves:
            test_list.append(wave)
    # be sure the classes are not in any certain order
    for dataset_list in (train_list, val_list, test_list):
        if seed is not None:
            random.seed(seed)
        random.shuffle(dataset_list)
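    # optionally limit the total number of samples, split proportionally across the datasets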
if limit is not None:
num_train = limit * perc_train
num_val = limit * (1-perc_train) // 2
num_test = limit * (1-perc_train) // 2
train_list = train_list[:int(num_train)]
val_list = val_list[:int(num_val)+1]
test_list = test_list[:int(num_test)+1]
    # ensure the number of training samples makes up `perc_train` of all available audiodata:
    if len(train_list) < math.ceil((len(train_list)+len(val_list)+len(test_list))*perc_train):
raise sp.errors.notsufficientdata_error(len(train_list),
len(val_list),
len(test_list),
math.ceil(
(len(train_list)+len(val_list)+len(test_list))*perc_train))
TrainingData = collections.namedtuple('TrainingData',
['train_data', 'val_data', 'test_data'])
dataset_audio = TrainingData(
train_data = train_list, val_data = val_list, test_data = test_list)
return dataset_audio
def separate_train_val_test_files(list_of_files):
'''Checks that file(s) exist, then sorts file(s) into train, val, test lists.
    If 'nois' or 'clean' appear in the filenames, two paths lists per dataset
    will be generated; otherwise just one. These paths lists are useful if multiple
    training files are available for training a model (e.g. for large datasets).
Parameters
----------
list_of_files : list, str, or pathlib.PosixPath
The feature files (format: .npy) for training a model.
Returns
-------
(train_paths_list, val_paths_list, test_paths_list) : tuple
Tuple comprised of paths lists to train, validation, and test data files.
        If noisy and clean data files are found, each tuple item will itself be a tuple
        comprised of two lists: a noisy file paths list and a clean file paths list.
Examples
--------
>>> features_files = ['train1.npy', 'train2.npy', 'val.npy', 'test.npy']
>>> datasets = separate_train_val_test_files(features_files)
>>> datasets.train
[PosixPath('train1.npy'), PosixPath('train2.npy')]
>>> datasets.val
[PosixPath('val.npy')]
>>> datasets.test
[PosixPath('test.npy')]
>>> # try with noisy and clean data
>>> features_files = ['train_noisy.npy', 'train_clean.npy', 'val_noisy.npy', \
'val_clean.npy', 'test_noisy.npy', 'test_clean.npy']
>>> datasets = separate_train_val_test_files(features_files)
>>> datasets.train.noisy
[PosixPath('train_noisy.npy')]
>>> datasets.train.clean
[PosixPath('train_clean.npy')]
>>> datasets.val.noisy
[PosixPath('val_noisy.npy')]
>>> datasets.val.clean
[PosixPath('val_clean.npy')]
>>> datasets.test.noisy
[PosixPath('test_noisy.npy')]
>>> datasets.test.clean
[PosixPath('test_clean.npy')]
'''
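    # 'input' lists collect the noisy (or only) feature files; 'output' lists collect the clean targets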
train_data_input = []
train_data_output = []
val_data_input = []
val_data_output = []
test_data_input = []
test_data_output = []
    if isinstance(list_of_files, (str, pathlib.PosixPath)):
        list_of_files = [list_of_files]
for f in list_of_files:
if isinstance(f, str):
f = pathlib.Path(f)
        # make sure the data file exists:
if not os.path.exists(f):
raise FileNotFoundError('Feature file {} not found.'.format(f))
if 'train' in f.stem:
if 'nois' in f.stem:
train_data_input.append(f)
elif 'clean' in f.stem:
train_data_output.append(f)
else:
# non noisy vs clean data
train_data_input.append(f)
elif 'val' in f.stem:
if 'nois' in f.stem:
val_data_input.append(f)
elif 'clean' in f.stem:
val_data_output.append(f)
else:
# non noisy vs clean data
val_data_input.append(f)
elif 'test' in f.stem:
if 'nois' in f.stem:
test_data_input.append(f)
elif 'clean' in f.stem:
test_data_output.append(f)
else:
# non noisy vs clean data
test_data_input.append(f)
TrainingData = collections.namedtuple('TrainingData',
['train', 'val', 'test'])
NoisyCleanData = collections.namedtuple('NoisyCleanData',
['noisy', 'clean'])
if train_data_output:
train_paths_list = NoisyCleanData(noisy = train_data_input,
clean = train_data_output)
else:
train_paths_list = train_data_input
if val_data_output:
val_paths_list = NoisyCleanData(noisy = val_data_input,
clean = val_data_output)
else:
val_paths_list = val_data_input
if test_data_output:
test_paths_list = NoisyCleanData(noisy = test_data_input,
clean = test_data_output)
else:
test_paths_list = test_data_input
return TrainingData(train = train_paths_list,
val = val_paths_list,
test = test_paths_list)
def section_data(dataset_dict, dataset_paths_dict, divide_factor=None):
    '''Sections long dataset value lists into smaller sections with new keys and paths.
    The keys of `dataset_dict` and `dataset_paths_dict` are expected to match.
    Parameters
    ----------
    dataset_dict : dict
        Dictionary with dataset names as keys (e.g. 'train') and lists of data
        samples as values.
    dataset_paths_dict : dict
        Dictionary with matching keys and, as values, the paths where the
        extracted data will be saved.
    divide_factor : int, optional
        The factor by which to divide the largest dataset; shorter datasets are
        divided only as much as needed to stay within the new maximum length.
        (default None, which results in a divide factor of 2)
    Returns
    -------
    updated_dataset_dict : dict
        The dataset dictionary with sectioned value lists under keys suffixed
        with '__1', '__2', etc.
    updated_dataset_paths_dict : dict
        The paths dictionary with matching suffixed keys and adjusted paths.
Examples
--------
>>> import pathlib
>>> # train is longer than val and test
>>> d = {'train': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],\
'val': [1, 2, 3, 4, 5],\
'test': [1, 2, 3, 4, 5]}
>>> # dictionary: paths to where extracted data will be saved
>>> dp = {'train': pathlib.PosixPath('train_data.npy'),\
'val': pathlib.PosixPath('val_data.npy'),\
'test': pathlib.PosixPath('test_data.npy')}
>>> d2, dp2 = section_data(d, dp, divide_factor = 3)
    >>> # val and test not touched (too small)
>>> d2
{'train__1': [1, 2, 3, 4, 5], \
'train__2': [6, 7, 8, 9, 10], \
'train__3': [11, 12, 13, 14, 15], \
'val': [1, 2, 3, 4, 5], \
'test': [1, 2, 3, 4, 5]}
>>> dp2
{'train__1': PosixPath('train_data__1.npy'), \
'train__2': PosixPath('train_data__2.npy'), \
'train__3': PosixPath('train_data__3.npy'), \
'val': PosixPath('val_data.npy'), \
'test': PosixPath('test_data.npy')}
>>> # repeat: now val and test as long as train
>>> # default divide_factor is 2
>>> d3, dp3 = section_data(d2, dp2)
>>> d3
{'train__1': [1, 2], \
'train__2': [3, 4, 5], \
'train__3': [6, 7], \
'train__4': [8, 9, 10], \
'train__5': [11, 12], \
'train__6': [13, 14, 15], \
'val__1': [1, 2], \
'val__2': [3, 4, 5], \
'test__1': [1, 2], \
'test__2': [3, 4, 5]}
>>> dp3
{'train__1': PosixPath('train_data__1.npy'), \
'train__2': PosixPath('train_data__2.npy'), \
'train__3': PosixPath('train_data__3.npy'), \
'train__4': PosixPath('train_data__4.npy'), \
'train__5': PosixPath('train_data__5.npy'), \
'train__6': PosixPath('train_data__6.npy'), \
'val__1': PosixPath('val_data__1.npy'), \
'val__2': PosixPath('val_data__2.npy'), \
'test__1': PosixPath('test_data__1.npy'), \
'test__2': PosixPath('test_data__2.npy')}
'''
if divide_factor is None:
divide_factor = 2
# find max length:
maxlen = 0
for key, value in dataset_dict.items():
if len(value) > maxlen:
maxlen = len(value)
    # the maximum length any sectioned list should have;
    # value lists shorter than this don't need to be sectioned.
new_max_len = int(maxlen/divide_factor)
try:
new_key_list = []
updated_dataset_dict = dict()
updated_dataset_paths_dict = dict()
for key, value in dataset_dict.items():
if len(value) <= new_max_len:
updated_dataset_dict[key] = dataset_dict[key]
updated_dataset_paths_dict[key] = dataset_paths_dict[key]
else:
# don't need to divide smaller datasets more than necessary
curr_divide_factor = 2
while True:
if len(value)//curr_divide_factor > new_max_len:
curr_divide_factor += 1
else:
break
# separate value into sections
divided_values = {}
len_new_values = int(len(value)/curr_divide_factor)
if len_new_values < 1:
len_new_values = 1
index = 0
for i in range(curr_divide_factor):
if i == curr_divide_factor - 1:
# to ensure all values are included
vals = value[index:]
else:
vals = value[index:index+len_new_values]
divided_values[i] = vals
index += len_new_values
# assign new keys for each divided section of data
divided_values_keys = {}
# assign new paths to each section of data:
divided_values_paths = {}
path_orig = dataset_paths_dict[key]
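                # derive a unique '__N' suffixed key and matching save path for each section: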
for i in range(curr_divide_factor):
if not key[-1].isdigit():
key_stem = key
version_num = 1
new_key = key_stem + '__' + str(version_num)
else:
key_stem, version_num = key.split('__')
version_num = int(version_num)
new_key = key
unique_key = False
while unique_key is False:
if new_key not in new_key_list:
unique_key = True
new_key_list.append(new_key)
divided_values_keys[i] = new_key
break
else:
version_num += 1
new_key = key_stem + '__'+ str(version_num)
if not isinstance(path_orig, pathlib.PosixPath):
path_orig = pathlib.Path(path_orig)
# convert to pathlib.PosixPath
dataset_paths_dict[key] = path_orig
stem_orig = path_orig.stem
if stem_orig[-1].isdigit():
stem, vers = stem_orig.split('__')
else:
stem = stem_orig
new_stem = stem + '__' + str(version_num)
new_path = path_orig.parent.joinpath(new_stem+path_orig.suffix)
divided_values_paths[i] = new_path
# apply newly divided data and keys to new dictionaries
for i in range(curr_divide_factor):
# only if the list of divided values has values in it
if len(divided_values[i]) > 0:
new_key = divided_values_keys[i]
updated_dataset_dict[new_key] = divided_values[i]
updated_dataset_paths_dict[new_key] = divided_values_paths[i]
    except ValueError:
        raise ValueError('Expected only one instance of "__" to '
                         'be in the dictionary keys. Multiple found.')
return updated_dataset_dict, updated_dataset_paths_dict
if __name__ == '__main__':
import doctest
doctest.testmod()