Diff of /caa_sheet_src/patient.py [000000] .. [302778]

Switch to side-by-side view

--- a
+++ b/caa_sheet_src/patient.py
@@ -0,0 +1,210 @@
+import numpy as np
+import re
+import os
+import usaddress
+import pandas as pd
+import nltk
+import sys
+import copy
+import request_handling_aws
+
+US_STATES = {"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA",
+          "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
+          "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
+          "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
+          "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"}
+
+US_CITY_CORRECTS = {"st louis":"saint louis"}
+TERMINOLOGY_CORRECTS = {"street": "st", "road":"rd"}
+
+class Patient:
+    def __init__(self,block_markers):
+
+        self.fields = {"SEX", "AGE", "DOB", "PHONE_OR_FAX", "EMAIL", "ADDRESS", "NAME", "po_box", "address","street"}
+        self.pat_dic = {}
+        self.insurance_df = pd.read_excel('./Insurance Companies_Updated.xlsx')
+        self.insurance_alias = {'uhc':'united healthcare',}
+        self.sheet_name = "Advocate Illinois Masonic Medical Center"
+        self.coverage_blocks = {'PRIMARY PLAN NAME/ADDRESS'}
+        self.update_keys(block_markers)
+
+
+    def process_gen_info(self,text_block):
+        for key, value in text_block.items():
+            # print (key, value)
+            if len(value) > 1:
+                if key == '<START>':
+                    self.pat_dic['START_name'] = self.sheet_name
+                    block = self.clean_block(value)
+                    print("START ADDRESS:\n")
+                    self.get_address(value,'START')
+
+                elif key == 'PATIENT NAME/ADDRESS':
+                    # block = self.clean_block(value)
+                    self.process_spaced(value,key)
+                    key_map = request_handling_aws.get_comprehend(value)
+                    self.process_comprehend_dic(key_map, key)
+                    self.get_address(value, key)
+
+                elif key == 'PRIMARY PLAN NAME/ADDRESS':
+                    block = self.clean_block(value)
+
+                    print("PRIMARY PLAN CLEAN", block)
+                    key_map = request_handling_aws.get_comprehend(block)
+                    self.process_comprehend_dic(key_map, key)
+                    self.get_address(block,key)
+
+                elif key == 'SUBSCRIBER NAME/ADDRESS':
+                    block = self.clean_block(value)
+
+                    print("SUBSCRIBER CLEAN", block)
+                    key_map = request_handling_aws.get_comprehend(block)
+                    self.process_comprehend_dic(key_map, key)
+                    self.process_spaced(block, key)
+                    self.get_address(block,key)
+
+        self.get_insurance_medcode()
+
+
+    def process_coloned(self,text_block,key):
+        for line in text_block:
+            curr_line = line.strip().lower().split(':')
+
+            if any(field == curr_line[0] for field in self.fields) and len(curr_line)>1 and '' not in curr_line:
+                if key != '<START>':
+                    self.pat_dic[key+ '_' + curr_line[0].strip()] = curr_line[1].strip()
+                else:
+                    self.pat_dic[curr_line[0].strip()] = curr_line[1].strip()
+
+    def process_spaced(self,text_block,key):
+        for line in text_block:
+            curr_line = copy.deepcopy(line)
+
+            while len(curr_line.split())>1:
+                if any(field in curr_line for field in self.fields):
+                    for field in self.fields:
+                        if field in curr_line:
+                            val_start = curr_line.find(field) + len(field)
+                            if val_start < len(curr_line):
+                                field_val = curr_line[val_start:].split()[0]
+                                self.pat_dic[key + '_' + field.lower()] = field_val
+                                curr_line = curr_line[(curr_line.find(field_val) + len(field_val) + 1):]
+                            else:
+                                field_val = ""
+                                curr_line = ""
+                else:
+                    field_val = ""
+                    curr_line = ""
+
+
+    def process_comprehend_dic(self, comprehend_dict, key):
+        # checked = set()
+        # for field in self.fields:
+        #     if field in comprehend_dict.keys() and field not in checked:
+        #         self.pat_dic[key+ '_' + field.lower()] = comprehend_dict[field]
+        #         checked.add(field)
+        # print(comprehend_dict)
+        types = set([entity['Type'] for entity in comprehend_dict])
+        count_map = {type:0 for type in types}
+        for entity in comprehend_dict:
+            for field in self.fields:
+                if field in entity["Type"] and count_map[entity["Type"]] == 0:
+                    self.pat_dic[key+ '_' + field.lower()] = entity["Text"]
+                    count_map[entity["Type"]] += 1
+                if field in entity["Type"] and count_map[entity["Type"]] > 0:
+                    self.pat_dic[key+ '_' + field.lower() + "_" + str(count_map[entity["Type"]])] = entity["Text"]
+                    count_map[entity["Type"]] += 1
+
+        print(comprehend_dict)
+
+    def get_address(self,text_block,key):
+        block_string = ' '.join(text_block).lower()
+        po_pattern = re.compile(r'(po box)\s*\d+')
+        po_box = re.search(po_pattern, block_string)
+        if po_box != None:
+            self.pat_dic[key+ '_' + 'po_box'] = po_box[0].split()[-1]
+
+        add_pattern = re.compile(r'([A-Z,a-z,0-9][^.!\-:;,\s]+)[,|\s]+([A-Z,a-z][^.!\-:;]+?)\s*(\d{5})')
+
+        addresses = []
+
+        for line in text_block:
+            addresses.append(re.findall(add_pattern, line.lower()))
+
+        for matches in addresses:
+            if len(matches) > 0:
+                try:
+                    tags = usaddress.tag(' '.join(matches[0]))[0]
+                    if 'PlaceName' in tags.keys() and 'StateName' in tags.keys() and tags['StateName'].upper() in US_STATES:
+                        self.pat_dic[key+ '_' + 'address'] = ' '.join(matches[0])
+                        self.pat_dic[key+'_' + 'PlaceName'] = tags['PlaceName']
+                        self.pat_dic[key+'_' + 'StateName'] = tags['StateName']
+                        self.pat_dic[key+'_' + 'ZipCode'] = tags['ZipCode']
+
+                except:
+                    print ("Unexpected error:", sys.exc_info()[0])
+
+        for matches in text_block:
+            if len(matches) > 0:
+                try:
+                    main_tags = usaddress.tag(matches.lower())
+                    tags = main_tags[0]
+                    if len(main_tags) > 0:
+                        if "StreetName" in tags.keys() and "AddressNumber" in tags.keys() and main_tags[1] == 'Street Address' and ('SubaddressType' not in tags.keys() and 'Recipient' not in tags.keys()):
+                            if tags["AddressNumber"].isdigit():
+                                print(tags)
+                                self.pat_dic[key+ '_' + 'street'] = matches.lower()
+
+                except:
+                    print ("Unexpected error:", sys.exc_info()[0])
+
+    def update_keys(self,block_markers):
+        for key in self.fields:
+            for pref in block_markers:
+                if pref != '<START>':
+                    self.pat_dic[pref+ '_' + key] = None
+                else:
+                    self.pat_dic[key] = None
+
+    def get_insurance_medcode(self):
+        for cov_block in self.coverage_blocks:
+            print (cov_block, self.pat_dic[cov_block+ '_' + 'po_box'],self.pat_dic[cov_block + '_' + 'address'])
+            if self.pat_dic[cov_block + '_' + 'address'] != None and (self.pat_dic[cov_block+ '_' + 'po_box'] != None or self.pat_dic[cov_block+ '_' + 'street'] != None):
+
+                tags_add = usaddress.tag(self.pat_dic[cov_block + '_' + 'address'])[0]
+
+                for word, replacement in US_CITY_CORRECTS.items():
+                    tags_add['PlaceName'] = tags_add['PlaceName'].replace(word, replacement)
+
+                print(tags_add)
+                if self.pat_dic[cov_block+ '_' + 'po_box'] != None:
+                    companies_df = self.insurance_df.loc[(self.insurance_df['Address'] == "PO BOX " + self.pat_dic[cov_block + '_' + 'po_box']) &
+                    (self.insurance_df['City'] == tags_add['PlaceName'].upper()) &
+                    (self.insurance_df['St'] == tags_add['StateName'].upper())]
+                elif self.pat_dic[cov_block+ '_' + 'street'] != None:
+                    for word, replacement in TERMINOLOGY_CORRECTS.items():
+                        self.pat_dic[cov_block + '_' + 'street'] = self.pat_dic[cov_block + '_' + 'street'].replace(word, replacement)
+
+                    companies_df = self.insurance_df.loc[(self.insurance_df['Address'] == self.pat_dic[cov_block + '_' + 'street'].upper()) &
+                    (self.insurance_df['City'] == tags_add['PlaceName'].upper()) &
+                    (self.insurance_df['St'] == tags_add['StateName'].upper())]
+
+                if not companies_df.empty:
+                    self.pat_dic[cov_block + "_mednetcode"] = companies_df.iloc[0]['MedNetCode']
+
+                else:
+                    self.pat_dic[cov_block + "_mednetcode"] = None
+            else:
+                self.pat_dic[cov_block + "_mednetcode"] = None
+                # print(self.pat_dic[cov_block + "_mednetcode"])
+
+    def clean_block(self, text_block):
+        cleaned_block = []
+        for line in text_block:
+            if len(line) > 3:
+                cleaned_block.append(line)
+        return cleaned_block
+
+
+    def csv_rep(self):
+        return self.pat_dic