Diff of /information_extract.py [000000] .. [ab5bfb]

Switch to side-by-side view

--- a
+++ b/information_extract.py
@@ -0,0 +1,217 @@
+import pymongo
+from dotenv import load_dotenv
+import os
+import json
+from med_terms import LABS, VITALS
+import glob
+
+load_dotenv()
+
+text_data = glob.glob("./data/text/*.txt")
+
+try:
+    txt = open(text_data[0], encoding='utf-8')
+    t = txt.read()
+finally:
+    txt.close()
+
+
+def isnumber(num):
+    try:
+        float(num)
+        return True
+    except ValueError:
+        return False
+
+
+def get_allergies(raw_allergies):
+    if raw_allergies[1] == 'No Known Allergies':
+        return None
+    allergies = []
+    for allergy in raw_allergies[1:]:
+        A = allergy.split(": ")[1].strip().split(" (")
+        try:
+            allergies.append({"to": A[0], "type": A[1][:-1]})
+        except Exception:
+            allergies.append({"to": A[0], "type": 'medicine'})
+    return allergies
+
+
+def get_name(filepath):
+    try:
+        txt = open(filepath, encoding='utf-8')
+        return txt.readlines()[0].strip()
+    finally:
+        txt.close()
+
+
+def get_medications(raw_medications):
+    medications = []
+    for medication in raw_medications[1:]:
+        if '[CURRENT]' in medication:
+            M = medication.split(" : ")
+            medications.append({'medicine': M[1], 'from': M[0][2:12]})
+    return medications
+
+
+def get_conditions(raw_conditions):
+    conditions = []
+    for condition in raw_conditions[1:]:
+        if not '(finding)' in condition:
+            C = condition.split(" : ")
+            condition = C[1]
+            date = C[0].split(" - ")
+            conditions.append(
+                {'condition': condition,
+                    'from': date[0].strip(), 'to': date[1].strip()}
+            )
+    return conditions
+
+def get_care_plans(raw_care_plans):
+    care_plans = []
+    for care_plan in raw_care_plans[1:]:
+        care_plan = care_plan.strip()
+        if '[CURRENT]' in care_plan or '[STOPPED]' in care_plan:
+            care_plans.append({'careplan': care_plan.split(" : ")[1], 'date': care_plan.split(
+                " : ")[0][:10], 'activities': [], 'status': care_plan.split(" : ")[0][10:]})
+        elif care_plan.strip().startswith("Reason: "):
+            care_plans[-1]['reason'] = care_plan.replace("Reason: ", '')
+        elif care_plan.strip().startswith("Activity: "):
+            care_plans[-1]['activities'].append(
+                care_plan.replace("Activity: ", ''))
+    return care_plans
+
+
+def get_vitals(raw_vitals, category='VITALS'):
+    A = VITALS if category == 'VITALS' else LABS
+    vitals = []
+    for vital in raw_vitals[1:]:
+        vital = vital.strip()
+        for V in A:
+            if V["DESCRIPTION"] in vital:
+                val = vital.split(V["UNITS"])[0].strip().split()[-1]
+                if isnumber(val):
+                    try:
+                        if not any([V["DESCRIPTION"] == v['description'] for v in vitals]):
+                            vitals.append(
+                                {'description': V["DESCRIPTION"], 'units': V["UNITS"]})
+                            vitals[-1]["value"] = vital.split(
+                                V["UNITS"])[0].strip().split()[-1]
+                    except KeyError:
+                        vitals.append(
+                            {'description': V["DESCRIPTION"], 'units': V["UNITS"]})
+                        vitals[-1]["value"] = vital.split(
+                            V["UNITS"])[0].strip().split()[-1]
+    return vitals
+
+
+def get_immunization(raw_immunization):
+    immunizations = []
+    for imm in raw_immunization[1:]:
+        imm = imm.strip()
+        date, immunization = imm.split(" : ")
+        immunizations.append({'immunization': immunization, 'date': date})
+    return immunizations
+
+
+def get_imaging_studies(raw_imaging):
+    imagings = []
+    for imm in raw_imaging[1:]:
+        imm = imm.strip()
+        date, imaging = imm.split(" : ")
+        imagings.append({'imaging': imaging, 'date': date})
+    return imagings
+
+
+def wrangle(filepath: str):
+    data = {}
+    data["patient_name"] = get_name(filepath)
+    try:
+        txt = open(filepath, encoding='utf-8')
+
+        # Name
+        data["patient_name"] = get_name(filepath)
+
+        t = txt.read()
+        split1 = t.split("="*len(data["patient_name"]))
+        split2 = split1[1].split(
+            '--------------------------------------------------------------------------------')
+        # print(split2[0].strip().split("\n"))
+        # Demographics
+        data["demographics"] = {d.split(':')[0].lower(): d.split(
+            ':')[1].strip() for d in split2[0].strip().split("\n")}
+
+        # Allergies
+        # print(split2[1].strip().split("\n"))
+        data["allergies"] = get_allergies(split2[1].strip().split("\n"))
+
+        # Medications
+        # print(split2[2].strip().split("\n"))
+        data["medications"] = get_medications(split2[2].strip().split("\n"))
+
+        # Conditions
+        # assert split2[3].strip().split("\n")[0] == 'CONDITIONS:'
+        # print(split2[3].strip().split("\n"))
+        data["conditions"] = get_conditions(split2[3].strip().split("\n"))
+
+        # Care plans
+        # print(split2[4].strip().split("\n"))
+        data["care_plans"] = get_care_plans(split2[4].strip().split("\n"))
+
+        # Vitals
+        raw_vitals = split2[5].strip().split("\n")
+        raw_vitals.extend(split2[6].strip().split("\n"))
+
+        data["vitals"] = get_vitals(raw_vitals)
+        # assert all([isnumber(v["value"]) for v in data["vitals"]])
+
+        # Lab
+        data["labs"] = get_vitals(raw_vitals, category='LABS')
+        # assert all([isnumber(v["value"]) for v in data["labs"]])
+
+        # Immunization
+        raw_immunization = split2[8].strip().split("\n")
+        # print(raw_immunization)
+        data["immunization"] = get_immunization(raw_immunization)
+
+        # Imaging
+        raw_imaging = split2[10].strip().split("\n")
+        # print(raw_imaging)
+        data["imaging_studies"] = get_imaging_studies(raw_imaging)
+
+        return data
+    finally:
+        txt.close()
+
+
+for i in range(len(text_data)):
+    try:
+        r = wrangle(text_data[i])
+    except Exception as e:
+        print(e)
+        print(i)
+
+
+data_list = [wrangle(data) for data in text_data]
+
+
+with open('./data/output/data.json', 'w') as fp:
+    json.dump(data_list, fp)
+
+json.dumps(data_list[0])
+
+username = os.environ.get("USERNAME")
+password = os.environ.get("PASSWORD")
+
+
+client = pymongo.MongoClient(
+    f"mongodb+srv://{username}:{password}@cluster0.lxsbb.mongodb.net/?retryWrites=true&w=majority")
+db = client.medical_record
+collection = db["report"]
+
+
+with open('./data/output/data.json', 'r') as fp:
+    data_json = json.load(fp)
+
+# Uncomment the line below.
+# collection.insert_many(data_json)