import sys
import os
import numpy as np
import librosa
import multiprocessing as mp


from util import LRU
from cnnmodel.feature_extraction import mfcc_extraction
from cnnmodel.feature_extraction import non_mfcc_extraction

# Frame width is .025 s with a stride of .010 s, so a duration of 0.115 s
# yields exactly 10 analysis frames.
OPTIMAL_DURATION = 0.115


class Phoneme:
    """One phoneme occurrence: a wav clip path plus its utterance metadata."""

    def __init__(self, path, id_, word, phoneme):
        self.path = path          # wav path, relative to the wav root
        self.id_ = id_            # utterance identifier
        self.word = word          # word this phoneme belongs to
        self.phoneme = phoneme    # phoneme label; a trailing digit marks a vowel's stress


class SampleExtraction:
    """Builds (MFCC, non-MFCC) training samples for every vowel phoneme.

    For each vowel, the features of the previous / anchor / next vowel are
    stacked and written as a pair of .npy files into the output sub-directory
    named after the vowel's stress label ('0', '1' or '2').
    """

    def __init__(self, wav_root, alignment_file, out_dir):
        self.wav_root = wav_root
        self.alignment_file = alignment_file
        self.out_dir = out_dir

        # Worker pool for per-word sample generation; excluded from pickling
        # (see __getstate__) because pool objects cannot cross process borders.
        self.pool = mp.Pool(mp.cpu_count())
        self.make_directories()

    def make_directories(self):
        """Create one output directory per stress label (0, 1, 2)."""
        os.makedirs(self.out_dir + '/0', exist_ok=True)
        os.makedirs(self.out_dir + '/1', exist_ok=True)
        os.makedirs(self.out_dir + '/2', exist_ok=True)
        print('Created directories for each label in path: {}'.format(self.out_dir))

    def get_phoneme_features(self, index, n, vowel_phonemes, features_cache):
        """Return (mfcc, non_mfcc) features for vowel_phonemes[index].

        Out-of-range indices (the neighbours of the first/last vowel) yield
        zero tensors of the same shapes: (1, 13, 30) and (6,).
        Results are memoised in *features_cache*, keyed by the Phoneme object,
        so each wav is loaded and analysed at most once per word.
        """
        if index < 0 or index >= n:
            return np.zeros(shape=(1, 13, 30), dtype=np.float32), np.zeros(6, dtype=np.float32)

        phoneme = vowel_phonemes[index]

        if phoneme not in features_cache:
            signal, samplerate = librosa.load(self.wav_root + '/' + phoneme.path, sr=None)
            optimal_signal_len = int(samplerate * OPTIMAL_DURATION)

            # Centre-crop or zero-pad the signal to exactly the optimal length
            # before MFCC extraction; the padding is split as evenly as
            # possible between the two ends.
            signal_len = len(signal)
            excess = signal_len - optimal_signal_len
            left_pad = abs(excess // 2)
            right_pad = abs(excess) - left_pad

            if signal_len > optimal_signal_len:
                signal_mfcc = signal[left_pad:-right_pad]
            elif signal_len < optimal_signal_len:
                signal_mfcc = np.concatenate([np.zeros(left_pad), signal, np.zeros(right_pad)], axis=0)
            else:
                signal_mfcc = signal

            # MFCC features: matrix of shape (13, 30); add a channel axis so
            # the cached value has shape (1, 13, 30).
            mfcc_features = mfcc_extraction.get_mfcc(signal_mfcc, samplerate)
            mfcc_features = mfcc_features[np.newaxis, :, :]

            # Non-MFCC features are computed on the raw (uncropped) signal;
            # expected to be a vector of shape (6,).
            non_mfcc_features = non_mfcc_extraction.get_non_mfcc(signal, samplerate)

            features_cache[phoneme] = (mfcc_features, non_mfcc_features)

        return features_cache[phoneme]

    def generate_samples(self, vowel_phonemes):
        """Write one (mfcc, other) .npy pair per vowel in *vowel_phonemes*.

        Runs inside a worker process. The LRU cache keeps only a handful of
        entries because each vowel is needed at most three times (as
        predecessor, anchor and successor) by consecutive iterations.
        """
        # Words with no vowel phonemes produce an empty list; previously this
        # crashed on vowel_phonemes[0] in the summary print below.
        if not vowel_phonemes:
            return

        n = len(vowel_phonemes)
        features_cache = LRU(size=5)
        for i in range(n):
            phoneme = vowel_phonemes[i]
            # Trailing stress digit doubles as the class label / directory name.
            label = phoneme.phoneme[-1]

            pre_mfcc, pre_non_mfcc = self.get_phoneme_features(i - 1, n, vowel_phonemes, features_cache)
            anchor_mfcc, anchor_non_mfcc = self.get_phoneme_features(i, n, vowel_phonemes, features_cache)
            suc_mfcc, suc_non_mfcc = self.get_phoneme_features(i + 1, n, vowel_phonemes, features_cache)

            # Stack neighbour features: (3, 13, 30) tensor and (18,) vector.
            mfcc_tensor = np.concatenate([pre_mfcc, anchor_mfcc, suc_mfcc], axis=0)
            non_mfcc_vector = np.concatenate([pre_non_mfcc, anchor_non_mfcc, suc_non_mfcc], axis=0)
            file_name = phoneme.id_ + '_' + phoneme.word + '_' + phoneme.phoneme
            np.save(self.out_dir + '/' + label + '/' + file_name + '_mfcc.npy', mfcc_tensor)
            np.save(self.out_dir + '/' + label + '/' + file_name + '_other.npy', non_mfcc_vector)

        print('finished writing {} samples for id: {}, word: {}'.
              format(n, vowel_phonemes[0].id_, vowel_phonemes[0].word))

    def extract_features(self):
        """Stream the alignment file, dispatching one job per (id, word) group.

        The alignment file is tab-separated: path, id, word, phoneme — one
        phoneme per line, grouped by word. Vowels (phonemes ending in a
        stress digit) are collected per word and handed to the pool.
        """
        with open(self.alignment_file, 'r') as phoneme_alignment_file:
            current_word = None
            curr_vowels = []
            for line in phoneme_alignment_file:
                # rstrip('\n') instead of line[:-1]: a final line without a
                # trailing newline must not lose its last character.
                path, id_, word, phoneme = line.rstrip('\n').split('\t')
                phoneme = Phoneme(path, id_, word, phoneme)
                if not current_word:
                    current_word = (id_, word)
                    if phoneme.phoneme[-1].isnumeric():
                        curr_vowels.append(phoneme)

                elif current_word == (id_, word):
                    if phoneme.phoneme[-1].isnumeric():
                        curr_vowels.append(phoneme)

                else:
                    # New word encountered: create training samples from the
                    # completed vowel list.
                    # NOTE(review): apply_async results are discarded, so an
                    # exception inside generate_samples is silently lost;
                    # keep the AsyncResults and .get() them if failures
                    # should surface.
                    self.pool.apply_async(self.generate_samples, args=[curr_vowels])

                    # Start collecting the next word's vowels.
                    current_word = (id_, word)
                    curr_vowels = []
                    if phoneme.phoneme[-1].isnumeric():
                        curr_vowels.append(phoneme)

            # Flush the final word (synchronously, matching original behavior);
            # generate_samples is a no-op for an empty list.
            self.pool.apply(self.generate_samples, args=[curr_vowels])

        self.pool.close()
        self.pool.join()

    def __getstate__(self):
        # Pool objects are not picklable; drop the pool when this instance is
        # shipped to worker processes.
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)


def main(wav_root, alignment_file, out_dir):
    """Run the full extraction pipeline for one wav root / alignment file."""
    sample_extraction = SampleExtraction(wav_root, alignment_file, out_dir)
    sample_extraction.extract_features()


if __name__ == '__main__':
    # script needs three command line arguments
    # 1. root path of the folder with wav files split into phonemes
    # 2. tab separated file with phoneme info
    # 3. output path where npy files will be generated
    main(sys.argv[1], sys.argv[2], sys.argv[3])