[c3444c]: / coderpp / test / load_umls.py

Download this file

121 lines (106 with data), 4.5 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
from tqdm import tqdm
import re
from random import shuffle
import pickle
import ahocorasick
#import ipdb
def byLineReader(filename):
    """Yield the lines of a UTF-8 text file one at a time.

    The file is opened lazily (on the first ``next()``) and closed when the
    generator is exhausted or garbage-collected, so large files are never held
    in memory as a whole.

    Args:
        filename: Path of the file to read.

    Yields:
        Each line of the file as ``str``, with its trailing newline (if any)
        left in place. Blank lines are yielded too.
    """
    with open(filename, "r", encoding="utf-8") as f:
        # A text file object is already a line iterator; delegate to it rather
        # than driving readline() by hand.
        yield from f
class UMLS(object):
    """In-memory view of an MRCONSO file, restricted to a phrase vocabulary.

    Construction loads two pickled lookup tables, detects which MRCONSO file
    is present under ``umls_path`` and immediately parses it (see ``load``).

    Attributes set by ``__init__``:
        umls_path, source_range, lang_range: the constructor arguments.
        phrase2idx, idx2phrase: objects unpickled from the given paths.
        type: "RRF" or "txt", the extension of the MRCONSO file to read.

    Attributes set by ``load``:
        lui_set: set of LUI values of the rows that were accepted.
        cui2str: dict mapping a CUI to the set of phrase indices seen for it.
        str2cui: dict mapping a string (raw, lower-cased and cleaned forms)
            to the set of CUIs it occurred with.
        code2cui: dict mapping a source code to the CUI of the last accepted
            row carrying that code.
        stridx_list: list of the distinct phrase indices that were accepted.
        cui: list of the keys of ``cui2str`` in shuffled order.
        cui_count: ``len(self.cui)``.
    """

    def __init__(self, umls_path, phrase2idx_path, idx2phrase_path, source_range=None, lang_range=("ENG",), only_load_dict=False):
        """Load the vocabulary pickles and parse MRCONSO.

        Args:
            umls_path: Directory containing ``MRCONSO.RRF`` or ``MRCONSO.txt``.
            phrase2idx_path: Pickle file holding the phrase -> index mapping.
                phrase2idx is the dict of our NER vocab. It is used to exclude
                those phrases in MRCONSO but not in our NER vocab.
            idx2phrase_path: Pickle file holding the reverse mapping.
            source_range: Container of accepted source values, or ``None`` to
                accept every source.
            lang_range: Container of accepted language values, or ``None`` to
                accept every language. Defaults to English only.
            only_load_dict: Accepted for interface compatibility.
                NOTE(review): this flag is never consulted anywhere in this
                class; MRCONSO is always loaded.
        """
        self.umls_path = umls_path
        self.source_range = source_range
        self.lang_range = lang_range
        self.phrase2idx = self._load_pickle(phrase2idx_path)
        self.idx2phrase = self._load_pickle(idx2phrase_path)
        self.detect_type()
        self.load()

    def _load_pickle(self, path):
        """Return the object stored in the pickle file at ``path``."""
        # SECURITY: unpickling can execute arbitrary code. Only point this at
        # files produced by a trusted pipeline.
        with open(path, 'rb') as f:
            return pickle.load(f)

    def transform(self, phrase):
        """Map a cleaned phrase to its vocabulary index.

        Returns:
            ``self.phrase2idx[phrase]`` when the phrase is in the vocabulary
            and is longer than three characters, otherwise ``None``.
        """
        if phrase in self.phrase2idx and len(phrase) > 3:
            return self.phrase2idx[phrase]
        return None

    def detect_type(self):
        """Set ``self.type`` to "RRF" if ``MRCONSO.RRF`` exists, else "txt"."""
        rrf_path = os.path.join(self.umls_path, "MRCONSO.RRF")
        self.type = "RRF" if os.path.exists(rrf_path) else "txt"

    def load(self):
        """Parse the MRCONSO file and build the lookup tables.

        A row is accepted when its source and language pass the configured
        filters, its LUI has not been accepted before, and its cleaned string
        maps to a vocabulary index through ``transform``. Rows that are blank
        or have too few columns are skipped. Summary counts are printed at
        the end.
        """
        reader = byLineReader(os.path.join(self.umls_path, "MRCONSO." + self.type))
        self.lui_set = set()
        self.cui2str = {}
        self.str2cui = {}
        self.code2cui = {}
        self.stridx_list = set()
        read_count = 0

        for line in tqdm(reader, ascii=True):
            if self.type == "txt":
                # NOTE(review): plain comma split - a quoted field that itself
                # contains a comma would shift the columns; confirm the .txt
                # export never produces one.
                fields = [t.replace("\"", "") for t in line.split(",")]
            else:
                fields = line.strip().split("|")

            # The highest column read below is index 14; a blank or truncated
            # line would otherwise raise IndexError.
            if len(fields) <= 14:
                continue

            cui = fields[0]
            lang = fields[1]
            lui = fields[3]
            source = fields[11]
            code = fields[13]
            string = fields[14]

            if self.source_range is not None and source not in self.source_range:
                continue
            if self.lang_range is not None and lang not in self.lang_range:
                continue
            if lui in self.lui_set:
                continue

            clean_string = self.clean(string)
            idx = self.transform(clean_string)
            if idx is None:
                # Not in the vocabulary: the LUI is deliberately NOT recorded,
                # so a later row with the same LUI still gets a chance.
                continue

            read_count += 1
            # Index the CUI under the raw, lower-cased and cleaned spellings.
            for key in (string, string.lower(), clean_string):
                self.str2cui.setdefault(key, set()).add(cui)
            self.cui2str.setdefault(cui, set()).add(idx)
            self.stridx_list.add(idx)
            self.code2cui[code] = cui
            self.lui_set.add(lui)

        self.cui = list(self.cui2str.keys())
        shuffle(self.cui)
        self.cui_count = len(self.cui)
        self.stridx_list = list(self.stridx_list)
        print("cui count:", self.cui_count)
        print("str2cui count:", len(self.str2cui))
        print("MRCONSO count:", read_count)
        print("str count:", len(self.stridx_list))

    def clean(self, term, lower=True, clean_NOS=True, clean_bracket=True, clean_dash=True):
        """Normalise a term string.

        Args:
            term: The string to normalise.
            lower: Lower-case the term first.
            clean_NOS: Remove the stand-alone token "NOS" / "nos".
            clean_bracket: Remove every "(...)" group (shortest match).
            clean_dash: Replace "-" with a space.

        Returns:
            The normalised term with runs of whitespace collapsed to single
            spaces and no leading or trailing whitespace.
        """
        # Pad with spaces so a leading or trailing "NOS" matches as a token.
        term = " " + term + " "
        if lower:
            term = term.lower()
        if clean_NOS:
            term = term.replace(" NOS ", " ").replace(" nos ", " ")
        if clean_bracket:
            term = re.sub(r"\(.*?\)", "", term)
        if clean_dash:
            term = term.replace("-", " ")
        # str.split() with no argument already drops empty pieces.
        return " ".join(term.split())