[8d2107]: / icd_transformer.py

Download this file

116 lines (94 with data), 4.4 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import datetime
import numpy as np
from sklearn.base import TransformerMixin
import loader
import extract_data
from icd9.icd9_categories import get_diagnosis_categories, get_max_diagnosis_info
from structured_data_extractor import get_diagnoses, get_chronic_diagnoses
import language_processing
class ICD9_Transformer(TransformerMixin):
"""
"""
def __init__(self, depth=4):
self.categories = get_diagnosis_categories()
self.category_maxes = get_max_diagnosis_info()
self.depth = min(depth, len(self.category_maxes))
def fit(self, X, y=None, **fit_params):
return self
def transform(self, X, **transform_params):
transformed_X = map(self.get_icd9_vector, X)
return transformed_X
def get_icd9_vector(self, empi):
patient = loader.get_patient_by_EMPI(empi)
operation_date = extract_data.get_operation_date(patient)
diagnoses = get_diagnoses(empi)
# Build a diagnosis history vector for the whole patient
diagnosis_history = None
for (date, code_type, code, diagnosis_name) in diagnoses:
if date < operation_date:
if code_type == 'ICD9':
cleaned_code = code.replace('.','')
if cleaned_code in self.categories:
diagnosis_cat = self.categories[cleaned_code]
diagnosis_vec = self.get_vector_from_category(diagnosis_cat)
if diagnosis_history is None:
diagnosis_history = diagnosis_vec
else:
diagnosis_history += diagnosis_vec
else:
#print "Non-ICD9 Code for Patient: " + empi
pass
# Normalize array to be 1's or zeros, not counts
if diagnosis_history is not None:
diagnosis_history = np.array(map(int, diagnosis_history > 0))
return diagnosis_history
def get_vector_from_category(self, category):
'''
Turns an ICD9 Category into a vector of concatenated vectors where each
vector is a one-hot vector representing the presence of a diagnosis in
that level of the hierarchy
'''
# Normalize the category array with 0s so they are all the same length
category_array = map(int, category.split('.'))
while len(category_array) < self.depth:
category_array.append(0)
# Build a shape array specifying dimensionality of each category
shape = []
for i in range(self.depth):
# Codes are 1 indexed- zeroth element represents missing
shape.append(self.category_maxes[i]+1)
# Build an array at each level in the hierarchy
vectors = []
for i in range(1,self.depth+1):
vec = np.zeros(shape[:i])
subvec = vec
for j in range(i-1):
subvec = subvec[category_array[j]]
subvec[category_array[i-1]] = 1
vectors.append(vec)
return reduce(lambda a,b: np.concatenate((a.flatten(), b.flatten())), vectors)
def get_feature_names(self):
# Build a shape array specifying dimensionality of each category
shape = []
for i in range(self.depth):
# Codes are 1 indexed- zeroth element represents missing
shape.append(self.category_maxes[i]+1)
code_to_description = get_diagnosis_categories(return_descriptions=True)
vectors = []
for i in range(1,self.depth+1):
vec = np.empty(shape[:i], 'U30')
vectors.append(self.label_vector(code_to_description, vec))
return reduce(lambda a,b: np.concatenate((a.flatten(), b.flatten())), vectors)
def label_vector(self, code_to_description, vector, root=""):
for i in range(len(vector)):
element = vector[i]
# Recursive Case
if isinstance(element, np.ndarray):
vector[i] = self.label_vector(code_to_description, vector[i], root + "." + str(i))
# Base Case
else:
desc = (root + "." + str(i))[1:]
if desc in code_to_description:
desc = desc + '_' + code_to_description[desc]
vector[i] = desc
return vector