|
a |
|
b/information_extract.py |
|
|
1 |
import pymongo |
|
|
2 |
from dotenv import load_dotenv |
|
|
3 |
import os |
|
|
4 |
import json |
|
|
5 |
from med_terms import LABS, VITALS |
|
|
6 |
import glob |
|
|
7 |
|
|
|
8 |
# Load settings from a local .env file; the credentials read further down
# come from os.environ.
load_dotenv()

# Paths of every plain-text patient record under ./data/text.
text_data = glob.glob("./data/text/*.txt")

# Eagerly read the first record into ``t``.  A ``with`` block closes the
# handle on every path; the previous try/finally raised NameError from
# ``txt.close()`` whenever ``open()`` itself failed, hiding the real error.
# An empty ``text_data`` still raises IndexError here, as before.
with open(text_data[0], encoding='utf-8') as txt:
    t = txt.read()
|
|
17 |
|
|
|
18 |
|
|
|
19 |
def isnumber(num):
    """Return True when ``float(num)`` succeeds, otherwise False.

    Anything ``float`` accepts counts as a number, including strings with
    surrounding whitespace, exponent notation and the special values
    ``"nan"`` / ``"inf"``.

    Args:
        num: Value to probe, typically a string token taken from a record.

    Returns:
        bool: False for non-numeric strings and for objects ``float``
        cannot convert at all (``None``, lists, ...).
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable string; TypeError: unsupported type.
        return False
    return True
|
|
25 |
|
|
|
26 |
|
|
|
27 |
def get_allergies(raw_allergies):
    """Parse the lines of the allergies section into a list of dicts.

    Args:
        raw_allergies: Section lines; the first element is the section
            header and is ignored.

    Returns:
        ``None`` when the first entry is exactly ``'No Known Allergies'``;
        an empty list when the section holds nothing but its header;
        otherwise one ``{"to": ..., "type": ...}`` dict per entry.  The
        type is the text inside the trailing parentheses, or
        ``'medicine'`` when the entry has no `` (`` part.

    Raises:
        IndexError: If an entry does not contain ``": "``.
    """
    entries = raw_allergies[1:]
    if not entries:
        # Header-only section: nothing to parse (used to raise IndexError).
        return []
    if entries[0] == 'No Known Allergies':
        return None

    allergies = []
    for entry in entries:
        # Text after the first ": ", split into "<substance>" and "<type>)".
        parts = entry.split(": ")[1].strip().split(" (")
        if len(parts) > 1:
            kind = parts[1][:-1]  # drop the last character (the closing ")")
        else:
            kind = 'medicine'
        allergies.append({"to": parts[0], "type": kind})
    return allergies
|
|
38 |
|
|
|
39 |
|
|
|
40 |
def get_name(filepath):
    """Return the first line of the file at *filepath*, stripped.

    Only the first line is read instead of loading the whole file.

    Args:
        filepath: Path of a UTF-8 encoded text record.

    Returns:
        str: The first line without surrounding whitespace; an empty
        string when the file is empty.

    Raises:
        OSError: If the file cannot be opened (for example
            FileNotFoundError).  The previous try/finally masked this with
            an UnboundLocalError raised from ``txt.close()``.
    """
    with open(filepath, encoding='utf-8') as txt:
        return txt.readline().strip()
|
|
46 |
|
|
|
47 |
|
|
|
48 |
def get_medications(raw_medications):
    """Collect the medications flagged ``[CURRENT]`` in a section.

    Args:
        raw_medications: Section lines; the first element (the header) is
            skipped.

    Returns:
        list[dict]: One ``{'medicine': ..., 'from': ...}`` per current
        entry.  ``'medicine'`` is the field after the first ``" : "`` and
        ``'from'`` is characters 2-11 of the field before it (presumably
        two indentation characters followed by a 10-character date;
        verify against the input files).

    Raises:
        IndexError: If a ``[CURRENT]`` line has no ``" : "`` separator.
    """
    current_entries = (
        line.split(" : ")
        for line in raw_medications[1:]
        if '[CURRENT]' in line
    )
    return [
        {'medicine': fields[1], 'from': fields[0][2:12]}
        for fields in current_entries
    ]
|
|
55 |
|
|
|
56 |
|
|
|
57 |
def get_conditions(raw_conditions):
    """Parse the conditions section, leaving out ``(finding)`` entries.

    Args:
        raw_conditions: Section lines; the first element (the header) is
            skipped.

    Returns:
        list[dict]: One ``{'condition', 'from', 'to'}`` dict per kept
        line.  The text before the first ``" : "`` is split on ``" - "``
        into the stripped ``from`` / ``to`` values, and the field after it
        is the condition name.

    Raises:
        IndexError: If a kept line lacks the ``" : "`` or ``" - "``
            separator.
    """
    conditions = []
    for line in raw_conditions[1:]:
        if '(finding)' in line:
            continue
        fields = line.split(" : ")
        span = fields[0].split(" - ")
        conditions.append({
            'condition': fields[1],
            'from': span[0].strip(),
            'to': span[1].strip(),
        })
    return conditions
|
|
69 |
|
|
|
70 |
def get_care_plans(raw_care_plans):
    """Group care-plan lines into one dict per plan.

    A line containing ``[CURRENT]`` or ``[STOPPED]`` starts a new plan.
    Following lines that start with ``"Reason: "`` or ``"Activity: "``
    (after stripping) are attached to the most recently started plan.
    Any other line is ignored.

    Args:
        raw_care_plans: Section lines; the first element (the header) is
            skipped.

    Returns:
        list[dict]: Dicts with ``'careplan'``, ``'date'`` (first ten
        characters of the text before ``" : "``), ``'activities'`` and
        ``'status'`` (the rest of that text, so it keeps whatever follows
        the tenth character, leading space included), plus ``'reason'``
        when a reason line was seen.

    Raises:
        IndexError: If a reason/activity line precedes the first plan, or
            a plan line has no ``" : "`` separator.
    """
    plans = []
    for raw_line in raw_care_plans[1:]:
        line = raw_line.strip()
        if '[CURRENT]' in line or '[STOPPED]' in line:
            fields = line.split(" : ")
            stamp = fields[0]
            plans.append({
                'careplan': fields[1],
                'date': stamp[:10],
                'activities': [],
                'status': stamp[10:],
            })
        elif line.startswith("Reason: "):
            plans[-1]['reason'] = line.replace("Reason: ", '')
        elif line.startswith("Activity: "):
            plans[-1]['activities'].append(line.replace("Activity: ", ''))
    return plans
|
|
83 |
|
|
|
84 |
|
|
|
85 |
def get_vitals(raw_vitals, category='VITALS'):
    """Extract the first numeric reading for each known vital or lab term.

    For every line and every catalogue term whose ``"DESCRIPTION"`` occurs
    in the line, the candidate value is the last whitespace-separated
    token before the first occurrence of the term's ``"UNITS"`` text.
    Only the first numeric reading per description is kept; later lines
    for the same description are ignored.

    Args:
        raw_vitals: Section lines; the first element (the header) is
            skipped.
        category: ``'VITALS'`` selects the ``VITALS`` catalogue; any other
            value selects ``LABS``.

    Returns:
        list[dict]: ``{'description', 'units', 'value'}`` dicts in order
        of first appearance.  ``'value'`` stays the original string token.
    """
    # NOTE(review): assumes each catalogue entry is a dict with string
    # "DESCRIPTION" and non-empty "UNITS" values - confirm in med_terms.
    catalogue = VITALS if category == 'VITALS' else LABS
    vitals = []
    seen = set()  # descriptions already recorded; replaces a list re-scan
    for line in raw_vitals[1:]:
        line = line.strip()
        for term in catalogue:
            description = term["DESCRIPTION"]
            if description in seen or description not in line:
                continue
            units = term["UNITS"]
            tokens = line.split(units)[0].strip().split()
            # Nothing precedes the units text, or the token is not a number.
            if not tokens or not isnumber(tokens[-1]):
                continue
            seen.add(description)
            vitals.append(
                {'description': description, 'units': units,
                 'value': tokens[-1]})
    return vitals
|
|
106 |
|
|
|
107 |
|
|
|
108 |
def get_immunization(raw_immunization):
    """Parse the immunization section into ``{'immunization', 'date'}`` dicts.

    Each entry is stripped and split on the first ``" : "`` only, so a
    description that itself contains ``" : "`` is kept whole instead of
    failing the two-name unpack.

    Args:
        raw_immunization: Section lines; the first element (the header) is
            skipped.

    Returns:
        list[dict]: One dict per entry, in input order.

    Raises:
        ValueError: If an entry has no ``" : "`` separator.
    """
    immunizations = []
    for entry in raw_immunization[1:]:
        date, immunization = entry.strip().split(" : ", 1)
        immunizations.append({'immunization': immunization, 'date': date})
    return immunizations
|
|
115 |
|
|
|
116 |
|
|
|
117 |
def get_imaging_studies(raw_imaging):
    """Parse the imaging section into ``{'imaging', 'date'}`` dicts.

    Each entry is stripped and split on the first ``" : "`` only, so a
    study description that itself contains ``" : "`` is kept whole instead
    of failing the two-name unpack.

    Args:
        raw_imaging: Section lines; the first element (the header) is
            skipped.

    Returns:
        list[dict]: One dict per entry, in input order.

    Raises:
        ValueError: If an entry has no ``" : "`` separator.
    """
    imagings = []
    for entry in raw_imaging[1:]:
        date, imaging = entry.strip().split(" : ", 1)
        imagings.append({'imaging': imaging, 'date': date})
    return imagings
|
|
124 |
|
|
|
125 |
|
|
|
126 |
def _section_lines(section):
    """Trim a raw section and return its lines (section header first)."""
    return section.strip().split("\n")


def wrangle(filepath: str):
    """Parse one plain-text patient record into a dictionary.

    The text is split on a run of ``=`` as long as the patient name; the
    part after the first such run is then split on the dashed separator
    into sections that are handed to the ``get_*`` helpers by position.

    Args:
        filepath: Path of a UTF-8 encoded patient record.

    Returns:
        dict: Keys ``patient_name``, ``demographics``, ``allergies``,
        ``medications``, ``conditions``, ``care_plans``, ``vitals``,
        ``labs``, ``immunization`` and ``imaging_studies``.

    Raises:
        OSError: If the file cannot be opened.
        IndexError, ValueError: If the record does not have the expected
            layout (missing separators or sections).
    """
    data = {}
    # The name is looked up once (it used to be read twice).
    data["patient_name"] = get_name(filepath)

    # ``with`` closes the handle even when reading fails.
    with open(filepath, encoding='utf-8') as txt:
        t = txt.read()

    split1 = t.split("=" * len(data["patient_name"]))
    split2 = split1[1].split(
        '--------------------------------------------------------------------------------')

    # Demographics: "Key: value" lines -> {"key": "value"}; only the text
    # between the first and second ":" is kept as the value.
    demographic_pairs = (line.split(':') for line in _section_lines(split2[0]))
    data["demographics"] = {
        pair[0].lower(): pair[1].strip() for pair in demographic_pairs}

    data["allergies"] = get_allergies(_section_lines(split2[1]))
    data["medications"] = get_medications(_section_lines(split2[2]))
    data["conditions"] = get_conditions(_section_lines(split2[3]))
    data["care_plans"] = get_care_plans(_section_lines(split2[4]))

    # Vitals and labs are both searched in sections 5 and 6 combined.
    raw_vitals = _section_lines(split2[5])
    raw_vitals.extend(_section_lines(split2[6]))
    data["vitals"] = get_vitals(raw_vitals)
    data["labs"] = get_vitals(raw_vitals, category='LABS')

    # NOTE(review): sections 7 and 9 are never read - presumably record
    # parts this script does not need; confirm against a sample file.
    data["immunization"] = get_immunization(_section_lines(split2[8]))
    data["imaging_studies"] = get_imaging_studies(_section_lines(split2[10]))

    return data
|
|
185 |
|
|
|
186 |
|
|
|
187 |
# Parse every record exactly once.  Each failure is reported (message, then
# the index of the offending file) so all bad files are listed in one run.
# The first failure is re-raised afterwards, which keeps the script from
# writing an incomplete data set - the same outcome as before, without
# parsing every file a second time.
data_list = []
first_error = None
for i, path in enumerate(text_data):
    try:
        data_list.append(wrangle(path))
    except Exception as e:
        print(e)
        print(i)
        if first_error is None:
            first_error = e
if first_error is not None:
    raise first_error

# Persist the parsed records as a single JSON array.
with open('./data/output/data.json', 'w') as fp:
    json.dump(data_list, fp)

# NOTE(review): USERNAME is often already defined by the operating system
# and load_dotenv() does not override existing variables by default -
# confirm the intended value is picked up.
username = os.environ.get("USERNAME")
password = os.environ.get("PASSWORD")

# Credentials are passed as keyword options instead of being formatted into
# the URI: inside a URI they would have to be percent-escaped, and a
# password containing characters such as "@" or "/" would break parsing.
client = pymongo.MongoClient(
    "mongodb+srv://cluster0.lxsbb.mongodb.net/?retryWrites=true&w=majority",
    username=username,
    password=password,
)
db = client.medical_record
collection = db["report"]

# Reload the exported file so the upload uses exactly what was written.
with open('./data/output/data.json', 'r') as fp:
    data_json = json.load(fp)

# Uncomment the line below to insert the parsed records into the collection.
# collection.insert_many(data_json)