# Switch to unified view
#
# a b/moline_sheet_src/patient.py
1
import numpy as np
2
import re
3
import os
4
import usaddress
5
import pandas as pd
6
import nltk
7
import sys
8
9
# Two-letter abbreviations accepted as a valid StateName when an address is
# parsed (the 50 states plus DC).
US_STATES = {
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA",
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
}

# Lower-case city spellings rewritten (key -> value) before a parsed city is
# compared against the insurance sheet.
US_CITY_CORRECTS = {"st louis": "saint louis"}
16
17
class Patient:
    """Accumulates the fields parsed from one patient's text blocks.

    Parsed values are stored in ``pat_dic``. Keys are either a bare field name
    (for the ``'<START>'`` block) or ``'<block marker>_<field>'``. Relies on
    the module-level ``usaddress``, ``nltk``, ``US_STATES`` and
    ``US_CITY_CORRECTS`` names.
    """

    # Compiled once for the class instead of on every get_address() call.
    # The PO-box number is captured explicitly so "po box123" yields "123".
    _PO_BOX_RE = re.compile(r'po box\s*(\d+)')
    _ADDRESS_RE = re.compile(r'([A-Z,a-z,0-9][^!\-:;,]+)[,|\s]+([A-Z,a-z][^.!\-:;]+?)\s*(\d{5})')

    def __init__(self, block_markers, insurance_df=None,
                 insurance_path='./Insurance Companies_Updated.xlsx'):
        """Create an empty record with one ``None`` slot per (marker, field).

        Args:
            block_markers: iterable of block names used as key prefixes;
                ``'<START>'`` produces un-prefixed keys.
            insurance_df: optional pre-loaded insurance sheet. It is indexed
                by the columns 'Address', 'City', 'St', 'MedNetCode' and
                'Insurance Company Name'. Passing it avoids re-reading the
                workbook for every patient.
            insurance_path: workbook read with ``pd.read_excel`` when
                ``insurance_df`` is not supplied.
        """
        self.fields = {'address', 'adm diagnosis', 'admitting physician', 'attending physician', 'bed', 'city',
                       'coded procedure', 'contact serial #', 'dob', 'encounter date', 'guarantor', 'guarantor employer', 'guarantor id',
                       'home phone', 'hospital account', 'hospital service', 'mrn', 'name', 'patient class', 'payor', 'po_box', 'primary care provider',
                       'primary phone', 'race', 'relation to patient', 'sex', 'status', 'unit', 'street', 'PlaceName', 'StateName', 'ZipCode'}
        self.pat_dic = {}
        if insurance_df is None:
            insurance_df = pd.read_excel(insurance_path)
        self.insurance_df = insurance_df
        self.insurance_alias = {'uhc': 'united healthcare'}
        # Stays empty until process_coverage_info() runs, so that
        # get_insurance_medcode() is safe when there is no COVERAGE block.
        self.coverage_blocks = set()

        self.update_keys(block_markers)

    def process_gen_info(self, text_block):
        """Parse every block of a document, then resolve insurance codes.

        Args:
            text_block: mapping of block marker -> list of text lines.
                'COVERAGE' is split into insurance sub-blocks, '<START>' also
                yields ``START_name`` (its first line) and an address; every
                other block is scanned for ``field: value`` lines.
        """
        for key, value in text_block.items():
            if key == 'COVERAGE':
                self.process_coverage_info(value)
            elif key == '<START>':
                try:
                    self.pat_dic['START_name'] = value[0]
                except (IndexError, KeyError, TypeError):
                    print("No Start Name")
                print("START ADDRESS:\n")
                self.get_address(value, 'START')
                self.process_coloned(value, key)
            else:
                self.process_coloned(value, key)
        self.get_insurance_medcode()

    def process_coverage_info(self, text_block):
        """Split coverage lines into primary/secondary blocks and parse each.

        A line whose edit distance to a block heading is below 3 starts that
        block; lines seen before the first heading are discarded.
        """
        self.coverage_blocks = {'PRIMARY INSURANCE', 'SECONDARY INSURANCE'}
        self.update_keys(self.coverage_blocks)
        blocks = {marker: [] for marker in self.coverage_blocks}
        curr_marker = ''

        for line in text_block:
            heading = line.strip()
            # One distance computation per heading; the closest heading wins.
            closest, distance = min(
                [(marker, nltk.edit_distance(heading, marker)) for marker in self.coverage_blocks],
                key=lambda pair: pair[1])
            if distance < 3:
                curr_marker = closest
                continue
            if curr_marker == '':
                continue

            blocks[curr_marker].append(line)

        for key, value in blocks.items():
            self.process_coloned(value, key)
            print("\n" + key)
            print(value)
            self.get_address(value, key)

    def process_coloned(self, text_block, key):
        """Store every ``field: value`` line whose field is a known field.

        Lines are lower-cased. The field is the text before the first colon
        and the value is everything after it (so values such as ``10:30`` are
        kept whole); both are stripped. Lines with no colon, an unknown field
        or an empty value are ignored.

        Args:
            text_block: iterable of text lines.
            key: block marker; ``'<START>'`` stores under the bare field name,
                anything else under ``key + '_' + field``.
        """
        for line in text_block:
            field, sep, value = line.strip().lower().partition(':')
            field = field.strip()
            value = value.strip()
            if not sep or not value or field not in self.fields:
                continue
            if key != '<START>':
                self.pat_dic[key + '_' + field] = value
            else:
                self.pat_dic[field] = value

    def get_address(self, text_block, key):
        """Extract PO box, city/state/zip and street line from a block.

        Results are written to ``pat_dic`` under ``key + '_po_box'``,
        ``'_address'``, ``'_PlaceName'``, ``'_StateName'``, ``'_ZipCode'`` and
        ``'_street'``. When several lines qualify, the last one wins. Parsing
        is best-effort: an error on one line is printed and the line skipped.

        Args:
            text_block: list of text lines.
            key: prefix for the stored keys.
        """
        block_string = ' '.join(text_block).lower()
        po_box = self._PO_BOX_RE.search(block_string)
        if po_box is not None:
            self.pat_dic[key + '_' + 'po_box'] = po_box.group(1)

        addresses = [self._ADDRESS_RE.findall(line.lower()) for line in text_block]

        print(addresses)
        for matches in addresses:
            if not matches:
                continue
            try:
                # Only the first regex hit on a line is considered.
                candidate = ' '.join(matches[0]).replace('.', '')
                tags = usaddress.tag(candidate)[0]
                if ('PlaceName' in tags and 'StateName' in tags
                        and tags['StateName'].upper() in US_STATES):
                    self.pat_dic[key + '_' + 'address'] = candidate
                    self.pat_dic[key + '_' + 'PlaceName'] = tags['PlaceName']
                    self.pat_dic[key + '_' + 'StateName'] = tags['StateName']
                    self.pat_dic[key + '_' + 'ZipCode'] = tags['ZipCode']
            except Exception:
                print("Unexpected error:", sys.exc_info()[0])

        for line in text_block:
            if not line:
                continue
            try:
                main_tags = usaddress.tag(line.lower())
                tags, address_type = main_tags[0], main_tags[1]
                # A street line has a numeric house number and street name and
                # is neither a sub-address nor addressed to a recipient.
                if ("StreetName" in tags and "AddressNumber" in tags
                        and address_type == 'Street Address'
                        and 'SubaddressType' not in tags
                        and 'Recipient' not in tags
                        and tags["AddressNumber"].isdigit()):
                    print(tags)
                    self.pat_dic[key + '_' + 'street'] = line.lower()
            except Exception:
                print("Unexpected error:", sys.exc_info()[0])

    def update_keys(self, block_markers):
        """Set the slot for every (field, marker) pair to ``None``.

        ``'<START>'`` maps to the bare field name; any other marker maps to
        ``marker + '_' + field``. Existing values under those keys are reset.
        """
        for key in self.fields:
            for pref in block_markers:
                slot = key if pref == '<START>' else pref + '_' + key
                self.pat_dic[slot] = None

    def get_insurance_medcode(self):
        """Look up ``<block>_mednetcode`` for each coverage block.

        A block needs a parsed address plus a PO box or street line (the PO
        box takes precedence). Rows of the insurance sheet matching address,
        city and state are candidates; with several candidates and a known
        payor, the row whose company name has the smallest edit distance to
        the payor is chosen (the later row wins ties), otherwise the first
        row. The code is ``None`` when nothing matches. Does nothing when no
        coverage block has been processed.
        """
        for cov_block in self.coverage_blocks:
            po_box = self.pat_dic[cov_block + '_' + 'po_box']
            address = self.pat_dic[cov_block + '_' + 'address']
            street = self.pat_dic[cov_block + '_' + 'street']
            print(cov_block, po_box, address)

            code = None
            if address is not None and (po_box is not None or street is not None):
                tags_add = usaddress.tag(address)[0]

                for word, replacement in US_CITY_CORRECTS.items():
                    tags_add['PlaceName'] = tags_add['PlaceName'].replace(word, replacement)

                print(tags_add)
                if po_box is not None:
                    sheet_address = "PO BOX " + po_box
                else:
                    sheet_address = street.upper()

                sheet = self.insurance_df
                companies_df = sheet.loc[(sheet['Address'] == sheet_address) &
                                         (sheet['City'] == tags_add['PlaceName'].upper()) &
                                         (sheet['St'] == tags_add['StateName'].upper())]

                if not companies_df.empty:
                    print(companies_df)
                    payor = self.pat_dic[cov_block + "_payor"]
                    if len(companies_df.index) > 1 and payor is not None:
                        print(payor)
                        company_payor = payor
                        for word, replacement in self.insurance_alias.items():
                            company_payor = company_payor.replace(word, replacement)

                        best_index, best_distance = 0, 10000
                        for index, row in companies_df.iterrows():
                            # str() guards against a blank (NaN) name cell.
                            name = str(row["Insurance Company Name"]).lower()
                            distance = nltk.edit_distance(company_payor, name)
                            if distance <= best_distance:
                                best_index, best_distance = index, distance

                        chosen = companies_df[companies_df.index == best_index]
                        code = chosen['MedNetCode'].item()

                        print("MULTIPLE", chosen['Insurance Company Name'])
                    else:
                        code = companies_df.iloc[0]['MedNetCode']

            self.pat_dic[cov_block + "_mednetcode"] = code

    def csv_rep(self):
        """Return the dictionary of parsed values (the live object, not a copy)."""
        return self.pat_dic