"""Extract structured patient records from plain-text medical reports.

Every ``./data/text/*.txt`` report is parsed into a dictionary, the
dictionaries are written to ``./data/output/data.json`` and a MongoDB
collection handle is prepared for an optional bulk insert.
"""
import glob
import json
import os
from urllib.parse import quote_plus

import pymongo
from dotenv import load_dotenv

from med_terms import LABS, VITALS

load_dotenv()

text_data = glob.glob("./data/text/*.txt")

# Line that separates the sections of a report.
_SECTION_SEPARATOR = (
    '--------------------------------------------------------------------------------')

# Contents of the first report, kept as a module-level sample.  ``with``
# guarantees the handle is closed and, unlike ``try/finally: txt.close()``,
# does not hide the real error behind a NameError when opening fails.
with open(text_data[0], encoding='utf-8') as txt:
    t = txt.read()


def isnumber(num) -> bool:
    """Return True when ``float(num)`` accepts the value.

    Only ``ValueError`` is treated as "not a number"; any other exception
    raised by ``float`` (for example ``TypeError`` for ``None``) propagates.
    """
    try:
        float(num)
    except ValueError:
        return False
    return True


def get_allergies(raw_allergies):
    """Parse the lines of the allergies section.

    Args:
        raw_allergies: Section lines; the first one is the section header.

    Returns:
        ``None`` when the first entry is exactly ``'No Known Allergies'``,
        otherwise a list of ``{"to": ..., "type": ...}`` dictionaries
        (empty when the section holds nothing but its header).

    Raises:
        IndexError: If an entry does not contain ``": "``.
    """
    # Slice comparison instead of indexing so a header-only section does
    # not raise.
    if raw_allergies[1:2] == ['No Known Allergies']:
        return None

    allergies = []
    for entry in raw_allergies[1:]:
        parts = entry.split(": ")[1].strip().split(" (")
        # "<substance> (<type>)": drop the closing parenthesis.  Entries
        # without a parenthesised type fall back to 'medicine'.
        kind = parts[1][:-1] if len(parts) > 1 else 'medicine'
        allergies.append({"to": parts[0], "type": kind})
    return allergies


def get_name(filepath):
    """Return the stripped first line of the file at ``filepath``.

    Raises:
        IndexError: If the file is empty.
    """
    with open(filepath, encoding='utf-8') as report:
        first_line = report.readline()
    if not first_line:
        # Same exception type the previous ``readlines()[0]`` produced.
        raise IndexError(f"{filepath} is empty")
    return first_line.strip()


def get_medications(raw_medications):
    """Parse the medications section, keeping only ``[CURRENT]`` entries.

    Args:
        raw_medications: Section lines; the first one is the section header.

    Returns:
        A list of ``{'medicine': ..., 'from': ...}`` dictionaries.
    """
    medications = []
    for line in raw_medications[1:]:
        if '[CURRENT]' not in line:
            continue
        fields = line.split(" : ")
        # Characters 2-11 of the left-hand field are stored as the start
        # date (presumably a two-character indent followed by YYYY-MM-DD).
        medications.append({'medicine': fields[1], 'from': fields[0][2:12]})
    return medications


def get_conditions(raw_conditions):
    """Parse the conditions section, skipping ``(finding)`` entries.

    Args:
        raw_conditions: Section lines; the first one is the section header.

    Returns:
        A list of ``{'condition': ..., 'from': ..., 'to': ...}``
        dictionaries.

    Raises:
        IndexError: If a kept line lacks ``" : "`` or its left-hand field
            lacks ``" - "``.
    """
    conditions = []
    for line in raw_conditions[1:]:
        if '(finding)' in line:
            continue
        fields = line.split(" : ")
        name = fields[1]
        period = fields[0].split(" - ")
        conditions.append(
            {'condition': name,
             'from': period[0].strip(), 'to': period[1].strip()}
        )
    return conditions


def get_care_plans(raw_care_plans):
    """Parse the care-plans section.

    A line containing ``[CURRENT]`` or ``[STOPPED]`` starts a new plan;
    following ``Reason: `` and ``Activity: `` lines are attached to the
    most recently started plan.

    Args:
        raw_care_plans: Section lines; the first one is the section header.

    Returns:
        A list of dictionaries with the keys ``careplan``, ``date``,
        ``activities`` and ``status``, plus ``reason`` when one was given.

    Raises:
        IndexError: If a reason/activity line precedes every plan line, or
            a plan line lacks ``" : "``.
    """
    care_plans = []
    for line in raw_care_plans[1:]:
        line = line.strip()
        if '[CURRENT]' in line or '[STOPPED]' in line:
            fields = line.split(" : ")
            care_plans.append({
                'careplan': fields[1],
                # First ten characters are stored as the date, the rest of
                # the left-hand field as the status.
                'date': fields[0][:10],
                'activities': [],
                'status': fields[0][10:],
            })
        elif line.startswith("Reason: "):
            care_plans[-1]['reason'] = line.replace("Reason: ", '')
        elif line.startswith("Activity: "):
            care_plans[-1]['activities'].append(
                line.replace("Activity: ", ''))
    return care_plans


def get_vitals(raw_vitals, category='VITALS'):
    """Extract the numeric observations known to ``VITALS`` or ``LABS``.

    For each line, every catalogue term whose ``DESCRIPTION`` occurs in the
    line is tried; the token immediately before the term's ``UNITS`` is
    taken as the value and kept only if it is numeric.  When a description
    occurs on several lines, the value from the last such line is kept.

    Args:
        raw_vitals: Section lines; the first one is the section header.
        category: ``'VITALS'`` selects the ``VITALS`` catalogue, any other
            value selects ``LABS``.

    Returns:
        A list of ``{'description': ..., 'units': ..., 'value': ...}``
        dictionaries in order of first appearance; ``value`` is the
        original string token.
    """
    catalogue = VITALS if category == 'VITALS' else LABS
    vitals = []
    # description -> entry in ``vitals``.  The value is written to the
    # entry it belongs to rather than to whichever entry happens to be
    # last in the list.
    by_description = {}
    for line in raw_vitals[1:]:
        line = line.strip()
        for term in catalogue:
            description = term["DESCRIPTION"]
            units = term["UNITS"]
            if description not in line:
                continue
            value = line.split(units)[0].strip().split()[-1]
            if not isnumber(value):
                continue
            entry = by_description.get(description)
            if entry is None:
                entry = {'description': description, 'units': units}
                by_description[description] = entry
                vitals.append(entry)
            entry["value"] = value
    return vitals


def _parse_dated_entries(raw_lines, key):
    """Turn ``"<date> : <text>"`` lines (header skipped) into dictionaries."""
    entries = []
    for line in raw_lines[1:]:
        # Exactly one " : " is expected; anything else raises ValueError.
        date, text = line.strip().split(" : ")
        entries.append({key: text, 'date': date})
    return entries


def get_immunization(raw_immunization):
    """Parse the immunizations section.

    Args:
        raw_immunization: Section lines; the first one is the header.

    Returns:
        A list of ``{'immunization': ..., 'date': ...}`` dictionaries.

    Raises:
        ValueError: If a line does not contain exactly one ``" : "``.
    """
    return _parse_dated_entries(raw_immunization, 'immunization')


def get_imaging_studies(raw_imaging):
    """Parse the imaging-studies section.

    Args:
        raw_imaging: Section lines; the first one is the header.

    Returns:
        A list of ``{'imaging': ..., 'date': ...}`` dictionaries.

    Raises:
        ValueError: If a line does not contain exactly one ``" : "``.
    """
    return _parse_dated_entries(raw_imaging, 'imaging')


def wrangle(filepath: str):
    """Parse one report file into a dictionary.

    The first line of the file is the patient name.  The text after the
    ``=`` underline (as long as the name) is split on the dashed separator
    line, and the resulting sections are read by position.

    Args:
        filepath: Path of the report to parse.

    Returns:
        A dictionary with the keys ``patient_name``, ``demographics``,
        ``allergies``, ``medications``, ``conditions``, ``care_plans``,
        ``vitals``, ``labs``, ``immunization`` and ``imaging_studies``.

    Raises:
        IndexError, ValueError: If the file does not follow the expected
            layout.
        OSError: If the file cannot be read.
    """
    data = {"patient_name": get_name(filepath)}

    with open(filepath, encoding='utf-8') as report:
        text = report.read()

    after_name = text.split("=" * len(data["patient_name"]))[1]
    sections = after_name.split(_SECTION_SEPARATOR)

    def section_lines(index):
        """Return the lines of section ``index`` (header line included)."""
        return sections[index].strip().split("\n")

    # Demographics: "Key: value" lines; only the text between the first
    # and second colon is kept as the value.
    demographics = {}
    for line in section_lines(0):
        fields = line.split(':')
        demographics[fields[0].lower()] = fields[1].strip()
    data["demographics"] = demographics

    data["allergies"] = get_allergies(section_lines(1))
    data["medications"] = get_medications(section_lines(2))
    data["conditions"] = get_conditions(section_lines(3))
    data["care_plans"] = get_care_plans(section_lines(4))

    # Vitals and labs are both searched for in sections 5 and 6 combined.
    raw_vitals = section_lines(5)
    raw_vitals.extend(section_lines(6))
    data["vitals"] = get_vitals(raw_vitals)
    data["labs"] = get_vitals(raw_vitals, category='LABS')

    # Sections 7 and 9 are not used.
    data["immunization"] = get_immunization(section_lines(8))
    data["imaging_studies"] = get_imaging_studies(section_lines(10))

    return data


# Parse every report once.  Each failure is reported with the index of the
# offending file; once all files have been tried the first failure is
# re-raised so an incomplete data set is never written.
data_list = []
first_error = None
for i, path in enumerate(text_data):
    try:
        data_list.append(wrangle(path))
    except Exception as e:
        print(e)
        print(i)
        if first_error is None:
            first_error = e

if first_error is not None:
    raise first_error


with open('./data/output/data.json', 'w') as fp:
    json.dump(data_list, fp)

username = os.environ.get("USERNAME")
password = os.environ.get("PASSWORD")

# Credentials must be percent-escaped before they are placed in a MongoDB
# URI, otherwise characters such as '@', ':' or '/' corrupt it.  ``str``
# keeps the previous "None" text when a variable is unset.
client = pymongo.MongoClient(
    f"mongodb+srv://{quote_plus(str(username))}:{quote_plus(str(password))}"
    "@cluster0.lxsbb.mongodb.net/?retryWrites=true&w=majority")
db = client.medical_record
collection = db["report"]


with open('./data/output/data.json', 'r') as fp:
    data_json = json.load(fp)

# Uncomment the line below to upload the records.
# collection.insert_many(data_json)