import json
import logging
import math
import multiprocessing
import os
import subprocess
import tempfile
import warnings

import xmltodict
from joblib import Parallel, delayed

from medacy.tools.unicode_to_ascii import UNICODE_TO_ASCII


class MetaMap:
    """A Python wrapper for MetaMap that includes built-in caching of MetaMap output."""

    def __init__(self, metamap_path, cache_output=False, cache_directory=None, convert_ascii=True, args=""):
        """
        :param metamap_path: the location of the MetaMap executable
            (ex. /home/programs/metamap/2016/public_mm/bin/metamap)
        :param cache_output: whether to cache output as it is run through MetaMap; by default stored in a
            temp directory tmp/medacy*/
        :param cache_directory: alternatively, a directory to cache metamapped files to
        :param convert_ascii: whether to convert the text to ASCII before metamapping
        :param args: additional arguments to pass to the MetaMap executable
        """
        # Set the cache directory to a tmp directory, creating it if it does not exist
        if cache_output and cache_directory is None:
            tmp = tempfile.gettempdir()
            files = [filename for filename in os.listdir(tmp) if filename.startswith("medacy")]
            if files:
                cache_directory = os.path.join(tmp, files[0])
            else:
                # tempfile.mkdtemp already returns an absolute path, so no join is needed
                cache_directory = tempfile.mkdtemp(prefix="medacy")

        self.cache_directory = cache_directory
        self.metamap_path = metamap_path
        self.convert_ascii = convert_ascii
        self.args = args
        # Path to the control script that starts/stops the MetaMap tagger server
        self._program_name = os.path.join(os.path.dirname(self.metamap_path), 'skrmedpostctl')
        self.recent_file = None
        self.metamap_dict = {}

    def activate(self):
        """Activates MetaMap for metamapping files or strings"""
        subprocess.call([self._program_name, 'start'])

    def __enter__(self):
        """Activates MetaMap for metamapping files or strings"""
        self.activate()
        return self

    def deactivate(self):
        """Deactivates MetaMap"""
        subprocess.call([self._program_name, 'stop'])

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Deactivates MetaMap"""
        self.deactivate()

    def map_file(self, file_to_map, max_prune_depth=10):
        """
        Maps a document from a file path and returns a formatted dict
        :param file_to_map: the path of the file to be metamapped
        :param max_prune_depth: see the MetaMap documentation on pruning depth; defaults to 10; set it higher for better results
        :return: a dictionary of MetaMap data
        """
        self.recent_file = file_to_map

        # Look the file up in the cache if caching is enabled; otherwise continue to metamapping
        if self.cache_directory is not None:
            cached_file_path = os.path.join(
                self.cache_directory,
                os.path.splitext(os.path.basename(file_to_map))[0] + ".metamapped"
            )
            if os.path.exists(cached_file_path):
                logging.debug(cached_file_path)
                return self.load(cached_file_path)

        with open(file_to_map, 'r') as f:
            contents = f.read()

        metamap_dict = self._run_metamap('--XMLf --blanklines 0 --silent --prune %i %s' % (max_prune_depth, self.args), contents)

        if self.cache_directory is not None:
            with open(cached_file_path, 'w') as mapped_file:
                try:
                    mapped_file.write(json.dumps(metamap_dict))
                except Exception as e:
                    logging.error(str(e))

        return metamap_dict

    def map_text(self, text, max_prune_depth=10):
        """
        Runs MetaMap over string input
        :param text: a string to run MetaMap over
        :param max_prune_depth: defaults to 10
        :return: a MetaMap dict
        """
        # Include self.args so string input gets the same extra arguments as file input
        self.metamap_dict = self._run_metamap('--XMLf --blanklines 0 --silent --prune %i %s' % (max_prune_depth, self.args), text)
        return self.metamap_dict

    @staticmethod
    def load(file_to_load):
        with open(file_to_load, 'r') as f:
            return json.load(f)

    def _run_metamap(self, args, document):
        """
        Runs MetaMap through bash, feeding in the appropriate arguments
        :param args: arguments to feed into MetaMap
        :param document: the raw text to be metamapped
        :return: a dictionary parsed from MetaMap's XML output
        """
        if self.convert_ascii:
            document, ascii_diff = self._convert_to_ascii(document)

        bash_command = 'bash %s %s' % (self.metamap_path, args)
        process = subprocess.Popen(bash_command, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate(input=bytes(document, 'UTF-8'))

        if not output:
            raise Exception("An error occurred while using MetaMap: %s" % error)

        output = output.decode('utf-8')
        lines = output.split('\n')

        # Lines at index 1 and 2 are the header of the XML output
        xml = ""
        for line in lines[1:3]:
            xml += line + '\n'
        # Wrap the MetaMap-specific XML in a single root tag so it parses - hacky.
        xml += "<metamap>\n"
        for line in lines[3:]:
            # Skip the XML declaration and DOCTYPE lines MetaMap repeats in its output
            if not all(item in line for item in ['DOCTYPE', 'xml']):
                xml += line + '\n'
        xml += "</metamap>"

        metamap_dict = xmltodict.parse(xml)

        if self.convert_ascii:
            document, metamap_dict = self._restore_from_ascii(document, ascii_diff, metamap_dict)

        return metamap_dict
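
    # For reference, a sketch of the parsed structure (inferred from the keys this
    # class consumes; xmltodict collapses single-element lists into plain dicts):
    #   metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance']
    #     -> ['Phrases']['Phrase']
    #     -> ['Mappings']['Mapping']
    #     -> ['MappingCandidates']['Candidate']
    #          with ['ConceptPIs']['ConceptPI'] holding 'StartPos'/'Length'
    #          and ['SemTypes']['SemType'] holding semantic type abbreviations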

    def _item_generator(self, json_input, lookup_key):
        """Recursively yields every value stored under lookup_key anywhere in a nested dict/list structure"""
        if isinstance(json_input, dict):
            for k, v in json_input.items():
                if k == lookup_key:
                    yield v
                else:
                    yield from self._item_generator(v, lookup_key)
        elif isinstance(json_input, list):
            for item in json_input:
                yield from self._item_generator(item, lookup_key)

    def extract_mapped_terms(self, metamap_dict):
        """
        Extracts an array of term dictionaries from a MetaMap dict
        :param metamap_dict: a dictionary containing the MetaMap output
        :return: an array of mapped terms, or None if the MetaMap output is empty
        """
        if metamap_dict['metamap'] is None:
            warnings.warn("MetaMap output is None for a file in the pipeline; returning no terms.")
            return

        all_terms = []
        for term in self._item_generator(metamap_dict, 'Candidate'):
            if isinstance(term, dict):
                all_terms.append(term)
            elif isinstance(term, list):
                all_terms.extend(term)
        return all_terms

    def mapped_terms_to_spacy_ann(self, mapped_terms, entity_label=None):
        """
        Transforms an array of mapped terms into annotation tuples. The label for each annotation
        defaults to the first semantic type in the term's semantic type array
        :param mapped_terms: an array of mapped terms
        :param entity_label: the label to assign to each annotation; defaults to the first semantic type of the mapped term
        :return: a list of annotations formatted to spaCy's specifications
        """
        annotations = []
        for term in mapped_terms:
            for span in self.get_span_by_term(term):  # a single entity can correspond to a disjoint span
                entity_start, entity_end = span
                if entity_label is None:
                    annotations.append((entity_start, entity_end, self.get_semantic_types_by_term(term)[0]))
                else:
                    annotations.append((entity_start, entity_end, entity_label))
        return annotations
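
    # Each annotation is a (start_char, end_char, label) tuple; e.g. a hypothetical
    # term mapped at characters 12-20 with semantic type 'phsu' yields (12, 20, 'phsu').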

    def get_term_by_semantic_type(self, mapped_terms, include=None, exclude=None):
        """
        Returns metamapped utterances that contain every semantic type listed in include
        :param mapped_terms: an array of candidate dictionaries
        :param include: semantic types that a term must all have to be included
        :param exclude: semantic types that disqualify a term when the term has all of them
        :return: the dictionaries whose terms have all the semantic types in include
        """
        include = include if include is not None else []  # avoid a mutable default argument
        if exclude is not None:
            intersection = set(include) & set(exclude)
            if intersection:
                raise Exception("Include and exclude overlap with the following semantic types: " + ", ".join(intersection))

        matches = []
        for term in mapped_terms:
            count = int(term['SemTypes']['@Count'])
            if count == 0:
                continue
            if count == 1:
                found_types = [term['SemTypes']['SemType']]
            else:
                found_types = term['SemTypes']['SemType']

            if exclude is not None and set(exclude) <= set(found_types):
                continue
            if set(include) <= set(found_types):
                matches.append(term)
        return matches

    def get_span_by_term(self, term):
        """
        Takes a given utterance dictionary (term) and extracts the character indices of the utterance
        :param term: the full dictionary corresponding to a MetaMap term
        :return: a list of (start, end) spans of the referenced term in the document
        """
        if isinstance(term['ConceptPIs']['ConceptPI'], list):
            spans = []
            for span in term['ConceptPIs']['ConceptPI']:
                start = int(span['StartPos'])
                length = int(span['Length'])
                spans.append((start, start + length))
            return spans
        else:
            start = int(term['ConceptPIs']['ConceptPI']['StartPos'])
            length = int(term['ConceptPIs']['ConceptPI']['Length'])
            return [(start, start + length)]

    def get_semantic_types_by_term(self, term):
        """
        Returns an array of the semantic types of a given term
        :param term: the full dictionary corresponding to a MetaMap term
        :return: a list of semantic type abbreviations
        """
        if int(term['SemTypes']['@Count']) == 1:
            return [term['SemTypes']['SemType']]
        return term['SemTypes']['SemType']

    def __call__(self, file_path):
        """
        Metamaps a file and returns an array of mapped terms from the file
        :param file_path: the path of the file to map
        :return: an array of mapped terms
        """
        metamap_dict = self.map_file(file_path)
        return self.extract_mapped_terms(metamap_dict)

    def _convert_to_ascii(self, text):
        """Takes in a text string and converts it to ASCII,
        keeping track of each character change.

        The changes are recorded in a list of objects, each object
        detailing the original non-ASCII character and the starting
        index and length of the replacement in the new string (keys
        ``original``, ``start``, and ``length``, respectively).

        Args:
            text (string): The text to be converted

        Returns:
            tuple: tuple containing:

                **text** (*string*): The converted text
                **diff** (*list*): Record of all ASCII conversions
        """
        diff = []
        offset = 0
        for i, char in enumerate(text):
            if ord(char) >= 128:  # non-ASCII
                # Use the known replacement when one exists and differs from the
                # original character; otherwise fall back to '?'
                if char in UNICODE_TO_ASCII and UNICODE_TO_ASCII[char] != char:
                    replacement = UNICODE_TO_ASCII[char]
                else:
                    replacement = '?'
                text = text[:i + offset] + replacement + text[i + 1 + offset:]
                diff.append({
                    'start': i + offset,
                    'length': len(replacement),
                    'original': char
                })
                offset += len(replacement) - len(char)
        return text, diff
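
    # For example (assuming hypothetical UNICODE_TO_ASCII entries), converting
    # "naïve café" could yield "naive cafe" with a diff of
    #   [{'start': 2, 'length': 1, 'original': 'ï'},
    #    {'start': 9, 'length': 1, 'original': 'é'}]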

    def _restore_from_ascii(self, text, diff, metamap_dict):
        """Takes in converted ASCII text and the list of changes made to it by ``_convert_to_ascii()``,
        as well as a dictionary of MetaMap taggings, converts the text back to its original state
        and updates the character spans in the MetaMap dict to match.

        Arguments:
            text (string): Output of ``_convert_to_ascii()``
            diff (list): Output of ``_convert_to_ascii()``
            metamap_dict (dict): Dictionary of MetaMap information obtained from ``text``

        Returns:
            tuple: tuple containing:

                **text** (*string*): The input with all of the changes listed in ``diff`` reversed
                **metamap_dict** (*dict*): The input with all of its character spans updated to reflect the changes to ``text``
        """
        offset = 0
        for conv in diff:  # Go through each recorded change to undo it and update MetaMap character spans accordingly
            conv_start = conv['start'] + offset
            conv_end = conv_start + conv['length'] - 1  # Ending index of the converted span, INCLUSIVE

            # Undo the change to the text (restore the original characters)
            text = text[:conv_start] + conv['original'] + text[conv_end + 1:]
            delta = len(conv['original']) - conv['length']
            offset += delta

            # Walk every MetaMap entry and update its character spans to reflect this change.
            # xmltodict collapses any level of the hierarchy that holds a single element from
            # a list into a plain dict, so each level must be normalized back to a list before
            # iterating or a one-element document crashes the traversal.
            if type(metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance']) is not list:
                metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance'] = [metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance']]
            for utterance in metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance']:
                if int(utterance['Phrases']['@Count']) == 0:  # Ensure this level contains something
                    continue
                if type(utterance['Phrases']['Phrase']) is not list:  # Make sure this entry is a list
                    utterance['Phrases']['Phrase'] = [utterance['Phrases']['Phrase']]
                for phrase in utterance['Phrases']['Phrase']:
                    if int(phrase['Mappings']['@Count']) == 0:  # Ensure this level contains something
                        continue
                    if type(phrase['Mappings']['Mapping']) is not list:  # Make sure this entry is a list
                        phrase['Mappings']['Mapping'] = [phrase['Mappings']['Mapping']]
                    for mapping in phrase['Mappings']['Mapping']:
                        if int(mapping['MappingCandidates']['@Total']) == 0:  # Ensure this level contains something
                            continue
                        if type(mapping['MappingCandidates']['Candidate']) is not list:  # Make sure this entry is a list
                            mapping['MappingCandidates']['Candidate'] = [mapping['MappingCandidates']['Candidate']]
                        # The actual span adjustment happens here, once per candidate
                        for candidate in mapping['MappingCandidates']['Candidate']:
                            if int(candidate['ConceptPIs']['@Count']) == 0:  # Ensure this level contains something
                                continue
                            if type(candidate['ConceptPIs']['ConceptPI']) is not list:  # Make sure this entry is a list
                                candidate['ConceptPIs']['ConceptPI'] = [candidate['ConceptPIs']['ConceptPI']]
                            candidate['MatchedWords']['MatchedWord'] = []
                            for conceptpi in candidate['ConceptPIs']['ConceptPI']:
                                match_start = int(conceptpi['StartPos'])
                                match_length = int(conceptpi['Length'])
                                match_end = match_start + match_length - 1
                                if match_start == conv_start and match_end == conv_end:  # Match equals the conversion (a [conversion] and some text)
                                    match_length += delta
                                elif match_start < conv_start and match_end < conv_end:  # Match intersects the conversion on the left ([a con]version and some text)
                                    match_length += delta + conv_start
                                elif conv_start < match_start and conv_end < match_end:  # Match intersects the conversion on the right (a conver[sion and som]e text)
                                    if conv_end + delta < match_start:
                                        match_start = conv_end + delta + 1
                                        match_length = match_end - conv_end
                                    else:
                                        match_length += delta
                                elif conv_end < match_start:  # Match is entirely to the right of the conversion (a conversion and a [match])
                                    match_start += delta
                                else:  # Match is entirely to the left of the conversion; no action needed (a [match] and a conversion)
                                    pass
                                # Update the MetaMap entry with the new indices
                                candidate['MatchedWords']['MatchedWord'].append(text[match_start:match_end + 1])
                                conceptpi['StartPos'] = str(match_start)
                                conceptpi['Length'] = str(match_length)
        return text, metamap_dict
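
    # Worked example (hypothetical diff entry): if '…' had been replaced by '...',
    # diff holds {'start': 5, 'length': 3, 'original': '…'}, so delta = 1 - 3 = -2
    # and a candidate starting at character 12 (entirely right of the conversion)
    # is shifted back to start at character 10.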

    def metamap_dataset(self, dataset, n_jobs=multiprocessing.cpu_count() - 1, retry_possible_corruptions=True):
        """
        Metamaps the files registered by a Dataset. Attempts to MetaMap with a max prune depth of 30, but on
        failure retries with a lower max prune depth. A lower prune depth roughly equates to decreased MetaMap
        performance. More information can be found in the MetaMap documentation.

        :param dataset: the Dataset to MetaMap
        :param n_jobs: the number of processes to spawn when metamapping; defaults to one less than the number of cores on your machine
        :param retry_possible_corruptions: re-MetaMaps files that are detected as possibly corrupt. Set to False for more control over what gets metamapped or if you are having bugs with metamapping. (default: True)
        :return: None
        """
        if dataset.is_metamapped():
            logging.info(f"The following Dataset has already been metamapped: {repr(dataset)}")
            return

        mm_dir = dataset.data_directory / "metamapped"

        # Make the MetaMap directory if it doesn't exist
        if not os.path.isdir(mm_dir):
            os.makedirs(mm_dir)
        dataset.metamapped_files_directory = mm_dir

        # A file below 200 bytes is likely corrupted output from MetaMap; these should be retried
        if retry_possible_corruptions:
            # Do not metamap files that are already metamapped and above 200 bytes in size
            already_metamapped = [file[:file.find('.')] for file in os.listdir(mm_dir)
                                  if os.path.getsize(os.path.join(mm_dir, file)) > 200]
        else:
            # Do not metamap files that are already metamapped
            already_metamapped = [file[:file.find('.')] for file in os.listdir(mm_dir)]

        files_to_metamap = [data_file for data_file in dataset if data_file.file_name not in already_metamapped]
        logging.info(f"Number of files to MetaMap: {len(files_to_metamap)}")

        Parallel(n_jobs=n_jobs)(delayed(self._parallel_metamap)(file, mm_dir) for file in files_to_metamap)

        if not dataset.is_metamapped():
            raise RuntimeError(f"MetaMapping {dataset} was unsuccessful")

        for data_file in dataset:
            data_file.metamapped_path = os.path.join(
                mm_dir,
                data_file.file_name + ".metamapped"
            )

    def _parallel_metamap(self, data_file, mm_dir):
        """
        Facilitates parallel metamapping by forking off a process to MetaMap each file individually
        :param data_file: a DataFile to metamap
        :param mm_dir: the directory to write metamapped output into
        :return: None
        """
        file_name = data_file.file_name
        file_path = data_file.txt_path
        logging.info("Attempting to MetaMap: %s", file_path)

        mapped_file_location = os.path.join(mm_dir, file_name + ".metamapped")
        with open(mapped_file_location, 'w') as mapped_file:
            max_prune_depth = 30  # the maximum prune depth MetaMap utilizes when concept mapping
            metamap_dict = None
            # Retry with a smaller prune depth while the current one runs the document out of memory
            while metamap_dict is None or metamap_dict['metamap'] is None:
                if max_prune_depth <= 0:
                    logging.critical("Failed to metamap after multiple attempts: %s", file_path)
                    return
                try:
                    metamap_dict = self.map_file(file_path, max_prune_depth=max_prune_depth)  # attempt to metamap
                    if metamap_dict['metamap'] is not None:  # if successful
                        break
                    # Shrink the prune depth geometrically (multiply by e**-0.5, roughly 0.6)
                    max_prune_depth = int(math.e ** (math.log(max_prune_depth) - .5))
                except BaseException as e:
                    metamap_dict = None
                    # Shrink the prune depth geometrically (multiply by e**-0.5, roughly 0.6)
                    max_prune_depth = int(math.e ** (math.log(max_prune_depth) - .5))
                    logging.warning(f"Error metamapping {file_path} after raising {type(e).__name__}: {str(e)}")

            mapped_file.write(json.dumps(metamap_dict))
        logging.info("Successfully MetaMapped: %s", file_path)