[8c54ae]: / src / ai / preprocess.py

Download this file

263 lines (209 with data), 11.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import os
import json
import xml.etree.ElementTree as ET
from datetime import datetime
import requests
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from dotenv import load_dotenv
# Prefix-to-URI mapping passed to every ElementTree find()/findall() call so
# that 'hl7:'-prefixed paths resolve against the HL7 v3 namespace.
ns = {
    'hl7': 'urn:hl7-org:v3',
}

# Section names to extract; a document section is selected when its title
# contains one of these names.
sections = [
    'Allergies and Adverse Reactions',
    'Medications',
    'Diagnostic Results',
    'Problems',
    'Surgeries',
    'Vital Signs',
    'Immunizations',
]
def calculate_age(birth_time_str):
    """Calculate the age in whole years from an HL7-style birth timestamp.

    Only the leading ``YYYYMMDD`` part of the value is needed to compute an
    age, so the time of day and anything after it (fractional seconds, UTC
    offset) is ignored. This lets date-only values such as ``'19800131'``
    work as well as full ``'YYYYMMDDHHMMSS'`` values.

    Args:
        birth_time_str (str): The birth time, starting with 'YYYYMMDD' and
            optionally followed by 'HHMMSS' and/or a UTC offset.

    Returns:
        int: The age in completed years as of today's local date.

    Raises:
        ValueError: If the first eight characters are not a valid date.
    """
    # Slice to the date portion so lower-precision or offset-suffixed
    # timestamps do not fail the strict 14-digit format.
    birth_date = datetime.strptime(birth_time_str[:8], '%Y%m%d')
    today = datetime.today()
    # Subtract one year when this year's birthday has not happened yet.
    birthday_pending = (today.month, today.day) < (birth_date.month, birth_date.day)
    return today.year - birth_date.year - (1 if birthday_pending else 0)
def calculate_duration(start_date_str, stop_date_str):
    """Describe the inclusive day span between two timestamps.

    Args:
        start_date_str (str): Start timestamp formatted as 'YYYY-MM-DDTHH:MM:SSZ'.
        stop_date_str (str): Stop timestamp formatted as 'YYYY-MM-DDTHH:MM:SSZ'.

    Returns:
        str: '<n> days' when both values parse, 'Invalid date format' when
        either value does not match the expected format, or None when
        either value is missing/empty.
    """
    # Nothing to compute unless both endpoints are present.
    if not (start_date_str and stop_date_str):
        return None

    timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
    try:
        began = datetime.strptime(start_date_str, timestamp_format)
        ended = datetime.strptime(stop_date_str, timestamp_format)
    except ValueError:
        return "Invalid date format"

    # +1 so that both the start day and the stop day are counted.
    span_days = (ended - began).days + 1
    return f"{span_days} days"
def calculate_last_usage(stop_date_str):
    """Describe how long ago an item stopped, or that it is still in use.

    Args:
        stop_date_str (str): Stop timestamp formatted as 'YYYY-MM-DDTHH:MM:SSZ',
            or a missing/empty value when there is no stop date.

    Returns:
        str: 'Currently used' when no stop date is given, '<n> days ago'
        when the stop date parses, or 'Invalid date' when it does not match
        the expected format.
    """
    # A missing stop date is reported as ongoing use.
    if not stop_date_str:
        return "Currently used"

    try:
        ended = datetime.strptime(stop_date_str, '%Y-%m-%dT%H:%M:%SZ')
    except ValueError:
        return "Invalid date"

    elapsed = datetime.today() - ended
    return f"{elapsed.days} days ago"
def extract_patient_details(root):
    """Collect the patient's demographic fields from the document header.

    Args:
        root (Element): Root element of the parsed XML document.

    Returns:
        dict: Patient details found under recordTarget/patientRole. Possible
        keys are 'Patient ID', 'Given Name', 'Gender', 'Birth Time', 'Age',
        'Race', 'Ethnic Group' and 'Language'. The dict is empty when the
        document has no patientRole element.
    """
    details = {}
    role = root.find('hl7:recordTarget/hl7:patientRole', ns)
    if role is None:
        return details

    # Patient identifier (the 'extension' attribute of <id>).
    id_node = role.find('hl7:id', ns)
    if id_node is not None:
        details['Patient ID'] = id_node.get('extension')

    # Given name is always recorded, as None when the element is absent.
    given_node = role.find('.//hl7:name/hl7:given', ns)
    details['Given Name'] = None if given_node is None else given_node.text

    # Administrative gender code.
    gender_node = role.find('hl7:patient/hl7:administrativeGenderCode', ns)
    if gender_node is not None:
        details['Gender'] = gender_node.get('code')

    # Raw birth timestamp plus the age derived from it.
    birth_node = role.find('hl7:patient/hl7:birthTime', ns)
    if birth_node is not None:
        birth_value = birth_node.get('value')
        details['Birth Time'] = birth_value
        details['Age'] = calculate_age(birth_value)

    # Remaining optional fields: (output key, element path, attribute name).
    optional_fields = (
        ('Race', 'hl7:patient/hl7:raceCode', 'displayName'),
        ('Ethnic Group', 'hl7:patient/hl7:ethnicGroupCode', 'displayName'),
        ('Language', 'hl7:patient/hl7:languageCommunication/hl7:languageCode', 'code'),
    )
    for key, path, attribute in optional_fields:
        node = role.find(path, ns)
        if node is not None:
            details[key] = node.get(attribute)

    return details
def extract_section_data(section_title, section, patient_data):
    """Read a section's table rows and append them to patient_data.

    Each ``tbody/tr`` row under the section becomes one dict appended to the
    list stored at ``patient_data[section_title]`` (created if missing).
    Columns 1-3 are read as Start, Stop and Description. 'Vital Signs' rows
    additionally take column 5 as 'Value'; 'Medications' rows add
    'Duration of Usage' and 'Last Usage'; every other section adds
    'Duration' and 'Last'.

    Args:
        section_title (str): Title of the section; also used as the dict key.
        section (Element): The XML section element to read rows from.
        patient_data (dict): Dictionary updated in place with the rows.
    """
    print(f"Extracting Section: {section_title}")

    # The section key is created even when the section has no rows.
    entries = patient_data.setdefault(section_title, [])

    def cell_text(row, position):
        # Text of the position-th <td> in the row, or None if it is absent.
        cell = row.find(f'hl7:td[{position}]', ns)
        return cell.text if cell is not None else None

    for row in section.findall('.//hl7:tbody/hl7:tr', ns):
        start = cell_text(row, 1)
        stop = cell_text(row, 2)
        description = cell_text(row, 3)

        if section_title == 'Vital Signs':
            record = {
                'Start': start,
                'Stop': stop,
                'Description': description,
                'Value': cell_text(row, 5),
            }
        elif section_title == 'Medications':
            record = {
                'Start': start,
                'Stop': stop,
                'Description': description,
                'Duration of Usage': calculate_duration(start, stop),
                'Last Usage': calculate_last_usage(stop),
            }
        else:
            record = {
                'Start': start,
                'Stop': stop,
                'Description': description,
                'Duration': calculate_duration(start, stop),
                'Last': calculate_last_usage(stop),
            }

        entries.append(record)
def extract_all_sections(root, patient_data):
    """Extract every section of interest from the document body.

    Walks the ``component/section`` children of the first ``structuredBody``
    element and, for each section whose title contains one of the names in
    the module-level ``sections`` list, hands the section to
    ``extract_section_data``.

    Args:
        root (Element): Root element of the parsed XML document.
        patient_data (dict): Dictionary updated in place with the extracted
            sections.
    """
    structured_body = root.find('.//hl7:structuredBody', ns)
    if structured_body is None:
        return

    for component in structured_body.findall('hl7:component', ns):
        section = component.find('hl7:section', ns)
        if section is None:
            continue

        title = section.find('hl7:title', ns)
        # An empty <title/> element has text None; skip it instead of
        # failing on .strip().
        if title is None or not title.text:
            continue

        section_title = title.text.strip()
        # any() extracts a section at most once, even if its title happens
        # to contain more than one of the configured names.
        if any(section_name in section_title for section_name in sections):
            extract_section_data(section_title, section, patient_data)
def process_xml_files(xml_directory, output_directory):
    """Convert every XML file in a directory into a per-patient JSON file.

    For each entry in xml_directory whose name ends with '.xml', the file is
    parsed, the patient details and the sections of interest are extracted,
    and the result is written to '<patient id>_data.json' in
    output_directory ('unknown' is used when no patient ID was found).

    Args:
        xml_directory (str): Directory containing the XML files to process.
        output_directory (str): Directory the JSON files are written to.

    Returns:
        None: Output is written directly to the file system.

    Raises:
        FileNotFoundError: If xml_directory does not exist.
        ET.ParseError: If an XML file is not well-formed.
    """
    xml_names = (name for name in os.listdir(xml_directory) if name.endswith('.xml'))

    for name in xml_names:
        source_path = os.path.join(xml_directory, name)
        document_root = ET.parse(source_path).getroot()

        # Start each file from a fresh record: header details first, then
        # the clinical sections.
        record = dict(extract_patient_details(document_root))
        extract_all_sections(document_root, record)

        # The output file is named after the patient ID.
        patient_id = record.get('Patient ID', 'unknown')
        target_path = os.path.join(output_directory, f"{patient_id}_data.json")

        with open(target_path, 'w') as handle:
            json.dump(record, handle, indent=4)

        print(f"Processed {source_path} and saved to {target_path}")
# Script entry point: skipped when the module is imported.
if __name__ == "__main__":
    # Input (raw XML records) and output (processed JSON) locations.
    xml_directory = "/Users/bharathbeeravelly/Desktop/patient-trials-matching/data/raw/patients_ehr"
    output_directory = "/Users/bharathbeeravelly/Desktop/patient-trials-matching/data/processed/patients_json"

    # Create the output directory up front; no error if it already exists.
    os.makedirs(output_directory, exist_ok=True)

    # Convert every XML file found in the input directory.
    process_xml_files(xml_directory, output_directory)