Diff of /information_extract.py [000000] .. [ab5bfb]

Switch to unified view

a b/information_extract.py
1
import pymongo
2
from dotenv import load_dotenv
3
import os
4
import json
5
from med_terms import LABS, VITALS
6
import glob
7
8
# Load variables from a .env file (python-dotenv) into the process environment;
# the USERNAME / PASSWORD lookups further down read them via os.environ.
load_dotenv()

# Paths of every .txt file under ./data/text.
text_data = glob.glob("./data/text/*.txt")

# Keep the raw text of the first file in `t`.
# The context manager closes the handle even when reading fails, and a failing
# open() (or an empty `text_data`) now surfaces as its own error instead of
# being masked by a NameError from closing a handle that was never bound.
with open(text_data[0], encoding='utf-8') as txt:
    t = txt.read()
17
18
19
def isnumber(num):
    """Return True when ``float(num)`` succeeds, False otherwise.

    Args:
        num: Any object; typically a string token taken from a report line.

    Returns:
        bool: True if ``num`` is accepted by ``float()``. Note that ``float()``
        also accepts spellings such as ``"nan"`` and ``"inf"``.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable text; TypeError: None, lists and other
        # objects float() cannot convert at all.
        return False
    return True
25
26
27
def get_allergies(raw_allergies):
    """Parse the lines of an allergies section into a list of dicts.

    Args:
        raw_allergies: Lines of the section. Element 0 is skipped (presumably
            the section heading). Every following line must contain ``": "``;
            the text after it is read as ``<name>`` or ``<name> (<type>)``.

    Returns:
        None when the section has no entries or its first entry is exactly
        ``'No Known Allergies'``; otherwise a list of
        ``{"to": <name>, "type": <type>}`` dicts, where ``type`` falls back to
        ``'medicine'`` if the line carries no parenthesised qualifier.

    Raises:
        IndexError: If an entry line does not contain ``": "``.
    """
    entries = raw_allergies[1:]
    # A section consisting of the heading only is treated like an explicit
    # "No Known Allergies" instead of failing on a missing index.
    if not entries or entries[0] == 'No Known Allergies':
        return None

    allergies = []
    for allergy in entries:
        name, *qualifier = allergy.split(": ")[1].strip().split(" (")
        # qualifier[0] looks like "substance)"; drop the closing parenthesis.
        kind = qualifier[0][:-1] if qualifier else 'medicine'
        allergies.append({"to": name, "type": kind})
    return allergies
38
39
40
def get_name(filepath):
    """Return the first line of the file at ``filepath``, stripped.

    Args:
        filepath: Path of a UTF-8 text file.

    Returns:
        str: The first line without surrounding whitespace.

    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError). This
            is no longer hidden behind a NameError raised while closing a
            handle that was never created.
        IndexError: If the file is completely empty (kept for compatibility
            with the previous ``readlines()[0]`` behaviour).
    """
    with open(filepath, encoding='utf-8') as txt:
        # Only the first line is needed, so avoid loading the whole file.
        first_line = txt.readline()
    if not first_line:
        raise IndexError(f"{filepath} is empty; no first line to read")
    return first_line.strip()
46
47
48
def get_medications(raw_medications):
    """Collect the entries marked ``[CURRENT]`` from a medications section.

    Args:
        raw_medications: Lines of the section; element 0 is skipped
            (presumably the section heading).

    Returns:
        list[dict]: One ``{'medicine': ..., 'from': ...}`` dict per line that
        contains ``'[CURRENT]'``. ``medicine`` is the second ``" : "``-separated
        field; ``from`` is characters 2-11 of the first field (presumably a
        10-character date after a two-character indent -- confirm against the
        input files).

    Raises:
        IndexError: If a ``[CURRENT]`` line has no ``" : "`` separator.
    """
    current_lines = (line for line in raw_medications[1:] if '[CURRENT]' in line)
    split_lines = (line.split(" : ") for line in current_lines)
    return [
        {'medicine': fields[1], 'from': fields[0][2:12]}
        for fields in split_lines
    ]
55
56
57
def get_conditions(raw_conditions):
    """Parse a conditions section, leaving out ``(finding)`` entries.

    Args:
        raw_conditions: Lines of the section; element 0 is skipped
            (presumably the section heading).

    Returns:
        list[dict]: ``{'condition', 'from', 'to'}`` dicts. The first
        ``" : "``-separated field is split on ``" - "`` into ``from`` and
        ``to`` (both stripped); the second field is the condition text.

    Raises:
        IndexError: If a retained line lacks ``" : "`` or its first field
            lacks ``" - "``.
    """
    conditions = []
    for line in raw_conditions[1:]:
        if '(finding)' in line:
            continue
        fields = line.split(" : ")
        name = fields[1]
        span = fields[0].split(" - ")
        conditions.append({
            'condition': name,
            'from': span[0].strip(),
            'to': span[1].strip(),
        })
    return conditions
69
70
def get_care_plans(raw_care_plans):
    """Group care-plan lines with their ``Reason:`` / ``Activity:`` details.

    Args:
        raw_care_plans: Lines of the section; element 0 is skipped
            (presumably the section heading).

    Returns:
        list[dict]: One dict per line containing ``[CURRENT]`` or
        ``[STOPPED]`` with keys ``careplan``, ``date``, ``activities`` and
        ``status``; a ``reason`` key is added only when a ``Reason: `` line
        follows. ``status`` is everything after the first 10 characters of
        the first field, so it keeps its leading space and brackets
        (e.g. ``' [CURRENT]'``).

    Raises:
        IndexError: If a plan line lacks ``" : "``, or a ``Reason: `` /
            ``Activity: `` line appears before any plan line.
    """
    care_plans = []
    for raw_line in raw_care_plans[1:]:
        line = raw_line.strip()

        if '[CURRENT]' in line or '[STOPPED]' in line:
            fields = line.split(" : ")
            header = fields[0]
            care_plans.append({
                'careplan': fields[1],
                'date': header[:10],
                'activities': [],
                'status': header[10:],
            })
            continue

        # Detail lines attach to the most recently started plan.
        if line.startswith("Reason: "):
            care_plans[-1]['reason'] = line.replace("Reason: ", '')
        elif line.startswith("Activity: "):
            care_plans[-1]['activities'].append(line.replace("Activity: ", ''))
    return care_plans
83
84
85
def get_vitals(raw_vitals, category='VITALS'):
    """Extract the first numeric reading per known measurement.

    Args:
        raw_vitals: Report lines to scan; element 0 is skipped.
        category: ``'VITALS'`` selects the ``VITALS`` catalogue, any other
            value selects ``LABS``. Catalogue entries are mappings with
            ``"DESCRIPTION"`` and ``"UNITS"`` keys.

    Returns:
        list[dict]: ``{'description', 'units', 'value'}`` dicts in order of
        first appearance. ``value`` is the (string) token immediately before
        the first occurrence of the units text in the line, kept only when
        ``isnumber`` accepts it. Later readings of an already recorded
        description are ignored.

    Raises:
        IndexError: If nothing but whitespace precedes the units text on a
            matching line.
    """
    catalogue = VITALS if category == 'VITALS' else LABS
    vitals = []
    recorded = set()  # descriptions already emitted; replaces a list rescan
    for vital in raw_vitals[1:]:
        vital = vital.strip()
        for V in catalogue:
            description = V["DESCRIPTION"]
            if description not in vital:
                continue
            units = V["UNITS"]
            # Last whitespace-separated token before the units text.
            val = vital.split(units)[0].strip().split()[-1]
            if not isnumber(val) or description in recorded:
                continue
            recorded.add(description)
            vitals.append(
                {'description': description, 'units': units, 'value': val})
    return vitals
106
107
108
def get_immunization(raw_immunization):
    """Parse an immunizations section into ``{'immunization', 'date'}`` dicts.

    Args:
        raw_immunization: Lines of the section; element 0 is skipped
            (presumably the section heading). Each entry is read as
            ``<date> : <description>``.

    Returns:
        list[dict]: One dict per non-blank entry line, in input order.

    Raises:
        ValueError: If a non-blank entry line contains no ``" : "``.
    """
    immunizations = []
    for imm in raw_immunization[1:]:
        imm = imm.strip()
        if not imm:
            # Blank lines carry no entry; skip them instead of failing to unpack.
            continue
        # Split on the first separator only so a description that itself
        # contains " : " stays intact.
        date, immunization = imm.split(" : ", 1)
        immunizations.append({'immunization': immunization, 'date': date})
    return immunizations
115
116
117
def get_imaging_studies(raw_imaging):
    """Parse an imaging-studies section into ``{'imaging', 'date'}`` dicts.

    Args:
        raw_imaging: Lines of the section; element 0 is skipped (presumably
            the section heading). Each entry is read as
            ``<date> : <description>``.

    Returns:
        list[dict]: One dict per non-blank entry line, in input order.

    Raises:
        ValueError: If a non-blank entry line contains no ``" : "``.
    """
    imagings = []
    for line in raw_imaging[1:]:
        line = line.strip()
        if not line:
            # Blank lines carry no entry; skip them instead of failing to unpack.
            continue
        # Split on the first separator only so a description that itself
        # contains " : " stays intact.
        date, imaging = line.split(" : ", 1)
        imagings.append({'imaging': imaging, 'date': date})
    return imagings
124
125
126
def wrangle(filepath: str):
    """Parse one patient text report into a dictionary.

    The file's first line is taken as the patient name. The text is split on
    a run of ``=`` as long as that name, and what follows is split on the
    80-dash separator line into positional sections:

        0  demographics (``key: value`` lines)
        1  allergies          2  medications
        3  conditions         4  care plans
        5, 6  scanned together for vitals and labs
        8  immunizations      10 imaging studies

    Sections 7 and 9 are not used.

    Args:
        filepath: Path of the UTF-8 report file.

    Returns:
        dict: Keys ``patient_name``, ``demographics``, ``allergies``,
        ``medications``, ``conditions``, ``care_plans``, ``vitals``, ``labs``,
        ``immunization`` and ``imaging_studies``.

    Raises:
        OSError: If the file cannot be opened.
        IndexError, ValueError: If the file does not follow the expected
            layout (missing sections or malformed lines).
    """
    data = {}
    data["patient_name"] = get_name(filepath)

    # The context manager closes the file even if reading fails.
    with open(filepath, encoding='utf-8') as txt:
        t = txt.read()

    # The name is underlined with '=' of the same length; everything after
    # that underline is the body of the report.
    split1 = t.split("=" * len(data["patient_name"]))
    split2 = split1[1].split(
        '--------------------------------------------------------------------------------')

    def section_lines(index):
        # Lines of one section; only the section's outer whitespace is
        # trimmed, so inner lines keep their indentation.
        return split2[index].strip().split("\n")

    # Demographics
    data["demographics"] = {
        d.split(':')[0].lower(): d.split(':')[1].strip()
        for d in section_lines(0)
    }

    # Allergies
    data["allergies"] = get_allergies(section_lines(1))

    # Medications
    data["medications"] = get_medications(section_lines(2))

    # Conditions
    data["conditions"] = get_conditions(section_lines(3))

    # Care plans
    data["care_plans"] = get_care_plans(section_lines(4))

    # Vitals and labs are both looked up in sections 5 and 6 combined; only
    # the first line of section 5 is skipped by get_vitals.
    raw_vitals = section_lines(5)
    raw_vitals.extend(section_lines(6))
    data["vitals"] = get_vitals(raw_vitals)
    data["labs"] = get_vitals(raw_vitals, category='LABS')

    # Immunization
    data["immunization"] = get_immunization(section_lines(8))

    # Imaging
    data["imaging_studies"] = get_imaging_studies(section_lines(10))

    return data
185
186
187
# Parse every report exactly once (previously each file was parsed in a
# diagnostic loop and then again to build the list).
data_list = []
first_error = None
for i, path in enumerate(text_data):
    try:
        data_list.append(wrangle(path))
    except Exception as e:
        # Report the error and the index of the offending file, then keep
        # going so every bad file is listed in a single run.
        print(e)
        print(i)
        if first_error is None:
            first_error = e

# As before, a file that cannot be parsed aborts the script before any
# output is written; the first failure is re-raised with its traceback.
if first_error is not None:
    raise first_error

with open('./data/output/data.json', 'w') as fp:
    json.dump(data_list, fp)
202
203
# Credentials come from the environment (populated by load_dotenv() above).
# NOTE(review): USERNAME is often predefined by the OS/shell, and load_dotenv()
# does not override existing variables by default -- confirm the .env value is
# the one actually used.
username = os.getenv("USERNAME")
password = os.getenv("PASSWORD")

# NOTE(review): the credentials are inserted into the URI verbatim, so they
# presumably must already be percent-encoded in the environment -- confirm.
mongo_uri = f"mongodb+srv://{username}:{password}@cluster0.lxsbb.mongodb.net/?retryWrites=true&w=majority"
client = pymongo.MongoClient(mongo_uri)
db = client["medical_record"]
collection = db["report"]

# Read back the JSON written above so the upload uses exactly what is on disk.
with open('./data/output/data.json') as fp:
    data_json = json.load(fp)

# Uncomment the line below.
# collection.insert_many(data_json)