--- /dev/null
+++ b/deepheart/parser.py
@@ -0,0 +1,285 @@
+import os
+import pickle
+from collections import namedtuple
+
+import numpy as np
+from scipy.io import wavfile
+from scipy.fftpack import fft
+from scipy.signal import butter, lfilter
+from sklearn.preprocessing import normalize
+from sklearn.model_selection import train_test_split
+from sklearn.utils import check_random_state
+
+
+class PCG:
+    """
+    PCG is a container for loading phonocardiogram (PCG) data for the
+    [2016 physionet challenge](http://physionet.org/challenge/2016).
+    Raw wav files are parsed into features, class labels are extracted
+    from header files, and data is split into training and testing groups.
+    """
+    def __init__(self, basepath, random_state=42):
+        self.basepath = basepath
+        self.class_name_to_id = {"normal": 0, "abnormal": 1}
+        self.nclasses = len(self.class_name_to_id)
+
+        self.train = None
+        self.test = None
+
+        self.n_samples = 0
+
+        self.X = None
+        self.y = None
+
+        self.random_state = random_state
+
+    def initialize_wav_data(self):
+        """
+        Load the original wav files and extract features. Warning: this can
+        take a while due to slow FFTs.
+
+        Parameters
+        ----------
+        None
+
+        Returns
+        -------
+        None
+        """
+        self.__load_wav_file()
+        self.__split_train_test()
+        # TODO: check if directory exists
+        self.save("/tmp")
+
+    def save(self, save_path):
+        """
+        Persist the PCG class to disk.
+
+        Parameters
+        ----------
+        save_path: str
+            Location on disk to store the parsed PCG metadata
+
+        Returns
+        -------
+        None
+        """
+        np.save(os.path.join(save_path, "X.npy"), self.X)
+        np.save(os.path.join(save_path, "y.npy"), self.y)
+        # Pickle requires binary mode
+        with open(os.path.join(save_path, "meta"), "wb") as fout:
+            pickle.dump((self.basepath, self.class_name_to_id, self.nclasses,
+                         self.n_samples, self.random_state), fout)
+
+    def load(self, load_path):
+        """
+        Load a previously stored PCG class.
+
+        Parameters
+        ----------
+        load_path: str
+            Location on disk to load parsed PCG data
+
+        Returns
+        -------
+        None
+        """
+        self.X = np.load(os.path.join(load_path, "X.npy"))
+        self.y = np.load(os.path.join(load_path, "y.npy"))
+        with open(os.path.join(load_path, "meta"), "rb") as fin:
+            (self.basepath, self.class_name_to_id, self.nclasses,
+             self.n_samples, self.random_state) = pickle.load(fin)
+        self.__split_train_test()
+
+    def __load_wav_file(self, doFFT=True):
+        """
+        Loads the physio 2016 challenge dataset from self.basepath by
+        crawling the path. For each discovered wav file:
+
+        * Attempt to parse the header file for the class label
+        * Attempt to load the wav file
+        * Calculate features from the wav file. If doFFT, features are
+          the Fourier transform of the original signal. Else, features
+          are the raw signal itself, truncated to a fixed length.
+
+        Parameters
+        ----------
+        doFFT: bool
+            True if features to be calculated are the FFT of the original
+            signal
+
+        Returns
+        -------
+        None
+        """
+        # First pass to calculate the number of samples and ensure each
+        # wav file has an associated and parsable header file
+        wav_file_names = []
+        class_labels = []
+        for root, dirs, files in os.walk(self.basepath):
+            # Ignore validation for now!
+            if "validation" in root:
+                continue
+            for file in files:
+                if file.endswith('.wav'):
+                    try:
+                        # Note: rstrip(".wav") would strip a trailing
+                        # *character set*, not the suffix, so use splitext
+                        base_file_name = os.path.splitext(file)[0]
+                        label_file_name = os.path.join(root, base_file_name + ".hea")
+
+                        class_label = self.__parse_class_label(label_file_name)
+                        class_labels.append(self.class_name_to_id[class_label])
+                        wav_file_names.append(os.path.join(root, file))
+
+                        self.n_samples += 1
+                    except InvalidHeaderFileException as e:
+                        print(e)
+
+        if doFFT:
+            fft_embedding_size = 400
+            lowpass_embedding_size = 200
+            X = np.zeros([self.n_samples,
+                          fft_embedding_size + lowpass_embedding_size])
+        else:
+            # Truncate each wav file to the minimum file size (10611).
+            # (Note: this is bad and causes loss of information!)
+            embedding_size = 10611
+            X = np.zeros([self.n_samples, embedding_size])
+
+        for idx, wavfname in enumerate(wav_file_names):
+            rate, wf = wavfile.read(wavfname)
+            wf = normalize(wf.reshape(1, -1))
+
+            if doFFT:
+                # We only care about the magnitude of each frequency
+                wf_fft = np.abs(fft(wf))
+                wf_fft = wf_fft[:, :fft_embedding_size].reshape(-1)
+
+                # Filter out high frequencies with a low-pass Butterworth
+                # filter. The human heart maxes out around 150 bpm = 2.5 Hz,
+                # so filter out any frequency significantly above this.
+                nyquist = 0.5 * rate
+                cutoff_freq = 4.0  # Hz
+                b, a = butter(5, cutoff_freq / nyquist, btype='low', analog=False)
+                wf_low_pass = lfilter(b, a, wf)
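+                # For intuition: butter() takes its cutoff as a fraction of
+                # the Nyquist frequency. At the 2000 Hz sample rate shown in
+                # the example header in __parse_class_label below, nyquist
+                # is 1000 Hz, so the normalized cutoff is 4.0 / 1000 = 0.004.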
+ if "validation" in root: + continue + for file in files: + if file.endswith('.wav'): + try: + base_file_name = file.rstrip(".wav") + label_file_name = os.path.join(root, base_file_name + ".hea") + + class_label = self.__parse_class_label(label_file_name) + class_labels.append(self.class_name_to_id[class_label]) + wav_file_names.append(os.path.join(root, file)) + + self.n_samples += 1 + except InvalidHeaderFileException as e: + print e + + if doFFT: + fft_embedding_size = 400 + highpass_embedding_size = 200 + X = np.zeros([self.n_samples, fft_embedding_size + highpass_embedding_size]) + else: + # Truncating the length of each wav file to the + # min file size (10611) (Note: this is bad + # And causes loss of information!) + embedding_size = 10611 + X = np.zeros([self.n_samples, embedding_size]) + + for idx, wavfname in enumerate(wav_file_names): + rate, wf = wavfile.read(wavfname) + wf = normalize(wf.reshape(1, -1)) + + if doFFT: + # We only care about the magnitude of each frequency + wf_fft = np.abs(fft(wf)) + wf_fft = wf_fft[:, :fft_embedding_size].reshape(-1) + + # Filter out high frequencies via Butter transform + # The human heart maxes out around 150bpm = 2.5Hz + # Let's filter out any frequency significantly above this + nyquist = 0.5 * rate + cutoff_freq = 4.0 # Hz + w0, w1 = butter(5, cutoff_freq / nyquist, btype='low', analog=False) + wf_low_pass = lfilter(w0, w1, wf) + + # FFT the filtered signal + wf_low_pass_fft = np.abs(fft(wf_low_pass)) + wf_low_pass_fft = wf_low_pass_fft[:, :highpass_embedding_size].reshape(-1) + + features = np.concatenate((wf_fft, wf_low_pass_fft)) + else: + features = wf[:embedding_size] + + X[idx, :] = features + idx += 1 + + self.X = X + + class_labels = np.array(class_labels) + + # Map from dense to one hot + self.y = np.eye(self.nclasses)[class_labels] + + def __parse_class_label(self, label_file_name): + """ + Parses physio bank header files, where the class label + is located in the last line of the file. An example header + file could contain: + + f0112 1 2000 60864 + f0112.wav 16+44 1 16 0 0 0 0 PCG + # Normal + + + Parameters + ---------- + label_file_name: str + Path to a specific header file + + Returns + ------- + class_label: str + One of `normal` or `abnormal` + """ + with open(label_file_name, 'r') as fin: + header = fin.readlines() + + comments = [line for line in header if line.startswith("#")] + if not len(comments) == 1: + raise InvalidHeaderFileException("Invalid label file %s" % label_file_name) + + class_label = str(comments[0]).lstrip("#").rstrip("\r").strip().lower() + + if not class_label in self.class_name_to_id.keys(): + raise InvalidHeaderFileException("Invalid class label %s" % class_label) + + return class_label + + def __split_train_test(self): + """ + Splits internal features (self.X) and class labels (self.y) into + balanced training and test sets using sklearn's helper function. + + Notes: + * if self.random_state is None, splits will be randomly seeded + otherwise, self.random_state defines the random seed to deterministicly + split training and test data + * For now, class balancing is done by subsampling the overrepresented class. + Ideally this would be pushed down to the cost function in TensorFlow. 
+
+    def __parse_class_label(self, label_file_name):
+        """
+        Parses physio bank header files, where the class label is located
+        in the last line of the file. An example header file could contain:
+
+            f0112 1 2000 60864
+            f0112.wav 16+44 1 16 0 0 0 0 PCG
+            # Normal
+
+        Parameters
+        ----------
+        label_file_name: str
+            Path to a specific header file
+
+        Returns
+        -------
+        class_label: str
+            One of `normal` or `abnormal`
+        """
+        with open(label_file_name, 'r') as fin:
+            header = fin.readlines()
+
+        comments = [line for line in header if line.startswith("#")]
+        if len(comments) != 1:
+            raise InvalidHeaderFileException("Invalid label file %s" % label_file_name)
+
+        class_label = comments[0].lstrip("#").strip().lower()
+
+        if class_label not in self.class_name_to_id:
+            raise InvalidHeaderFileException("Invalid class label %s" % class_label)
+
+        return class_label
+
+    def __split_train_test(self):
+        """
+        Splits internal features (self.X) and class labels (self.y) into
+        balanced training and test sets using sklearn's helper function.
+
+        Notes:
+        * If self.random_state is None, splits will be randomly seeded;
+          otherwise, self.random_state defines the random seed used to
+          deterministically split training and test data.
+        * For now, class balancing is done by subsampling the
+          over-represented class. Ideally this would be pushed down into
+          the cost function in TensorFlow.
+
+        Returns
+        -------
+        None
+        """
+        MLData = namedtuple('MLData', 'X y')
+
+        num_normal, num_abnormal = np.sum(self.y, axis=0)
+
+        # Rebalance classes by keeping each over-represented (normal)
+        # sample with probability num_abnormal / num_normal
+        # TODO: push this down into the cost function
+        undersample_rate = num_abnormal / num_normal
+        over_represented_idxs = self.y[:, 1] == 0
+        under_represented_idxs = self.y[:, 1] == 1
+        rng = check_random_state(self.random_state)
+        random_keep_mask = rng.rand(self.y.shape[0]) < undersample_rate
+        sample_idxs = ((over_represented_idxs & random_keep_mask) |
+                       under_represented_idxs)
+
+        X_balanced = self.X[sample_idxs, :]
+        y_balanced = self.y[sample_idxs, :]
+
+        X_train, X_test, y_train, y_test = train_test_split(
+            X_balanced, y_balanced, test_size=0.25,
+            random_state=self.random_state)
+
+        self.train = MLData(X=X_train, y=y_train)
+        self.test = MLData(X=X_test, y=y_test)
+
+    def get_mini_batch(self, batch_size):
+        """
+        Helper function for sampling mini-batches from the training set.
+        Note: the random state here needs to be None, or the same mini-batch
+        will be sampled eternally!
+
+        Parameters
+        ----------
+        batch_size: int
+            Number of elements to return in the mini batch
+
+        Returns
+        -------
+        X: np.ndarray
+            A feature matrix subsampled from self.train
+
+        y: np.ndarray
+            A one-hot matrix of class labels subsampled from self.train
+        """
+        # Deliberately unseeded (see note above), not self.random_state
+        random_state = check_random_state(None)
+        n_training_samples = self.train.X.shape[0]
+        # randint's upper bound is exclusive, so pass n_training_samples,
+        # not n_training_samples - 1
+        minibatch_indices = random_state.randint(0, n_training_samples, batch_size)
+
+        return self.train.X[minibatch_indices, :], self.train.y[minibatch_indices, :]
+
+
+class InvalidHeaderFileException(Exception):
+    def __init__(self, *args, **kwargs):
+        super(InvalidHeaderFileException, self).__init__(*args, **kwargs)
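A minimal usage sketch (the data path and batch size are hypothetical; it
assumes the challenge wav/header files have been unpacked on disk):

    from deepheart.parser import PCG

    pcg = PCG("data/physionet", random_state=42)
    pcg.initialize_wav_data()  # parse wavs, split train/test, save to /tmp
    X_batch, y_batch = pcg.get_mini_batch(batch_size=64)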