--- a
+++ b/cnnmodel/feature_extraction/sample_genration.py
@@ -0,0 +1,144 @@
+import sys
+import os
+import numpy as np
+import librosa
+import multiprocessing as mp
+
+
+from util import LRU
+from cnnmodel.feature_extraction import mfcc_extraction
+from cnnmodel.feature_extraction import non_mfcc_extraction
+
+OPTIMAL_DURATION = 0.115  # with a frame width of 0.025 s and a stride of 0.010 s, a 0.115 s clip yields exactly 10 frames
+
+
+class Phoneme:
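+    """One phoneme occurrence: wav clip path, utterance id, word, and phoneme label.
+
+    Instances are also used as cache keys; the default identity-based hashing is
+    enough because generate_samples reuses the same objects between iterations.
+    """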
+    def __init__(self, path, id_, word, phoneme):
+        self.path = path
+        self.id_ = id_
+        self.word = word
+        self.phoneme = phoneme
+
+
+class SampleExtraction:
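+    """Turns phoneme-aligned wav clips into training samples: for each vowel, a
+    stacked MFCC tensor and an auxiliary feature vector (covering the previous,
+    current, and next vowel) are saved as .npy files under the directory named
+    after the vowel's stress label."""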
+    def __init__(self, wav_root, alignment_file, out_dir):
+        self.wav_root = wav_root
+        self.alignment_file = alignment_file
+        self.out_dir = out_dir
+
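+        # worker pool for writing samples in parallel; __getstate__ below strips
+        # the pool so bound methods of this object can be pickled for workers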
+        self.pool = mp.Pool(mp.cpu_count())
+        self.make_directories()
+
+    def make_directories(self):
+        # one output sub-directory per stress label
+        for label in ('0', '1', '2'):
+            os.makedirs(self.out_dir + '/' + label, exist_ok=True)
+        print('Created directories for each label in path: {}'.format(self.out_dir))
+
+    def get_phoneme_features(self, index, n, vowel_phonemes, features_cache):
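+        # returns (mfcc_features, non_mfcc_features) for vowel_phonemes[index],
+        # computing them on first use and serving repeats from features_cache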
+        # neighbours that fall outside the word (before the first vowel or
+        # after the last) contribute all-zero features of the usual shapes
+        if index < 0 or index >= n:
+            return np.zeros(shape=(1, 13, 30), dtype=np.float32), np.zeros(6, dtype=np.float32)
+
+        phoneme = vowel_phonemes[index]
+
+        if phoneme not in features_cache:
+            # sr=None loads the clip at its native sample rate
+            signal, samplerate = librosa.load(self.wav_root + '/' + phoneme.path, sr=None)
+            optimal_signal_len = int(samplerate * OPTIMAL_DURATION)
+
+            # centre the clip: trim or zero-pad to exactly optimal_signal_len samples
+            signal_len = len(signal)
+            excess = signal_len - optimal_signal_len
+            left_pad = abs(excess) // 2
+            right_pad = abs(excess) - left_pad
+
+            if signal_len > optimal_signal_len:
+                # excess > 0 guarantees right_pad >= 1, so the slice below is safe
+                signal_mfcc = signal[left_pad:-right_pad]
+            elif signal_len < optimal_signal_len:
+                # pad both sides with zeros, keeping the signal's dtype
+                signal_mfcc = np.concatenate(
+                    [np.zeros(left_pad, dtype=signal.dtype), signal, np.zeros(right_pad, dtype=signal.dtype)], axis=0)
+            else:
+                signal_mfcc = signal
+
+            # extract MFCC features: get_mfcc returns a (13, 30) matrix;
+            # a leading channel axis makes it (1, 13, 30) for stacking
+            mfcc_features = mfcc_extraction.get_mfcc(signal_mfcc, samplerate)
+            mfcc_features = mfcc_features[np.newaxis, :, :]
+
+            # extract non-MFCC features, a vector of shape (6,), from the full untrimmed signal
+            non_mfcc_features = non_mfcc_extraction.get_non_mfcc(signal, samplerate)
+
+            features_cache[phoneme] = (mfcc_features, non_mfcc_features)
+
+        return features_cache[phoneme]
+
+    def generate_samples(self, vowel_phonemes):
+        n = len(vowel_phonemes)
+        if n == 0:
+            # a word may contain no vowel phonemes; nothing to write
+            return
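+        # at any step only the predecessor, anchor, and successor are needed, so
+        # a small LRU cache avoids recomputing features shared between iterations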
+        features_cache = LRU(size=5)
+        for i in range(n):
+            phoneme = vowel_phonemes[i]
+            label = phoneme.phoneme[-1]  # trailing stress digit: '0', '1', or '2'
+
+            pre_mfcc, pre_non_mfcc = self.get_phoneme_features(i - 1, n, vowel_phonemes, features_cache)
+            anchor_mfcc, anchor_non_mfcc = self.get_phoneme_features(i, n, vowel_phonemes, features_cache)
+            suc_mfcc, suc_non_mfcc = self.get_phoneme_features(i + 1, n, vowel_phonemes, features_cache)
+
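+            # stacked shapes: mfcc_tensor is (3, 13, 30); non_mfcc_vector is (18,)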
+            mfcc_tensor = np.concatenate([pre_mfcc, anchor_mfcc, suc_mfcc], axis=0)
+            non_mfcc_vector = np.concatenate([pre_non_mfcc, anchor_non_mfcc, suc_non_mfcc], axis=0)
+            file_name = phoneme.id_ + '_' + phoneme.word + '_' + phoneme.phoneme
+            np.save(self.out_dir + '/' + label + '/' + file_name + '_mfcc.npy', mfcc_tensor)
+            np.save(self.out_dir + '/' + label + '/' + file_name + '_other.npy', non_mfcc_vector)
+
+        print('Finished writing {} samples for id: {}, word: {}'.
+              format(n, vowel_phonemes[0].id_, vowel_phonemes[0].word))
+
+    def extract_features(self):
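+        # each tab-separated line of the alignment file is expected to hold:
+        # wav path (relative to wav_root), utterance id, word, phoneme label,
+        # e.g. (illustrative): clips/0001.wav<TAB>spk1-0001<TAB>above<TAB>AH0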
+        phoneme_alignment_file = open(self.alignment_file, 'r')
+        current_word = None
+        curr_vowels = []
+        for line in phoneme_alignment_file:
+            path, id_, word, phoneme = line.rstrip('\n').split('\t')
+            phoneme = Phoneme(path, id_, word, phoneme)
+            if current_word is None:
+                current_word = (id_, word)
+            elif current_word != (id_, word):
+                # new word encountered: hand the finished word's vowels to a worker
+                self.pool.apply_async(self.generate_samples, args=[curr_vowels])
+                current_word = (id_, word)
+                curr_vowels = []
+
+            # only vowel phonemes (those ending in a stress digit) become samples
+            if phoneme.phoneme[-1].isnumeric():
+                curr_vowels.append(phoneme)
+
+        # flush the last word; apply (not apply_async) blocks until it is written
+        self.pool.apply(self.generate_samples, args=[curr_vowels])
+        phoneme_alignment_file.close()
+        self.pool.close()
+        self.pool.join()
+
+    def __getstate__(self):
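+        # multiprocessing.Pool objects cannot be pickled; drop the pool from the
+        # state so instance methods can be dispatched to worker processes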
+        self_dict = self.__dict__.copy()
+        del self_dict['pool']
+        return self_dict
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+
+
+def main(wav_root, alignment_file, out_dir):
+    sample_extraction = SampleExtraction(wav_root, alignment_file, out_dir)
+    sample_extraction.extract_features()
+
+
+if __name__ == '__main__':
+    # script needs three command line arguments
+    # 1. root path of the folder with wav files split into phonemes
+    # 2. tab separated file with phoneme info
+    # 3. output path where npy files will be generated
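+    # illustrative invocation (example paths), assuming the project root is on
+    # sys.path, e.g.:
+    #   python -m cnnmodel.feature_extraction.sample_genration \
+    #       data/phoneme_wavs data/alignments.tsv data/samples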
+    main(sys.argv[1], sys.argv[2], sys.argv[3])