[de07e6]: / src / DataLoader / download.py

Download this file

193 lines (163 with data), 7.1 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import requests
import sys
import xml.etree.ElementTree as ET
import os
import time
import joblib
from tqdm.auto import tqdm
import numpy as np
from tenacity import retry, wait_random_exponential, stop_after_attempt
# # Open the log file
# log_file = open('../logs/download.log', 'w')
# # Redirect standard output to the log file
# sys.stdout = log_file
def normalize_whitespace(s):
    """Collapse every run of whitespace in *s* to a single space and strip the ends."""
    tokens = s.split()
    return " ".join(tokens)
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(10))
def get_cancer_trials_list(max_trials=15000):
    """Collect up to ``max_trials`` NCT IDs for interventional cancer trials.

    Pages through the (legacy) ClinicalTrials.gov ``full_studies`` API and
    returns the de-duplicated IDs as a list. Network errors raise and are
    retried by the ``tenacity`` decorator; HTTP error statuses and empty
    result pages terminate the loop early with a message.
    """
    base_url = "https://clinicaltrials.gov/api/query/full_studies"
    trials_set = set()
    page_size = 100  # max records per page allowed by the full_studies endpoint
    current_rank = 1
    trials_fetched = 0
    while trials_fetched < max_trials:
        search_params = {
            "expr": "((cancer) OR (neoplasm)) AND ((interventional) OR (treatment)) AND ((mutation) OR (variant))",
            "min_rnk": current_rank,
            "max_rnk": current_rank + page_size - 1,
            "fmt": "json",
            "fields": "NCTId",
        }
        # Timeout so a hung connection fails fast and tenacity can retry
        # instead of blocking the whole pipeline indefinitely.
        response = requests.get(base_url, params=search_params, timeout=30)
        if response.status_code != 200:
            print("Failed to retrieve data. Status code:", response.status_code)
            break
        trials_data = response.json()
        if "FullStudiesResponse" not in trials_data:
            print("No trials found matching the criteria.")
            break
        # .get with a default avoids a KeyError when the response carries
        # no "FullStudies" entry (e.g. the expression matched zero studies).
        studies = trials_data["FullStudiesResponse"].get("FullStudies", [])
        if not studies:
            break  # No more studies found, exit the loop
        for study in studies:
            trials_set.add(study["Study"]["ProtocolSection"]["IdentificationModule"]["NCTId"])
            trials_fetched += 1
            if trials_fetched == max_trials:
                break
        current_rank += page_size
    return list(trials_set)  # Convert set to list for output
def download_study_info(nct_id):
    """Download or refresh the cached ClinicalTrials.gov XML record for ``nct_id``.

    If a parseable local copy exists under ``../data/trials_xmls/``, the online
    record is fetched and the cached file is rewritten only when any of the
    watched sections (eligibility, brief_title, overall_status, location)
    differ after whitespace normalization. A missing or corrupt local file is
    (re-)downloaded from scratch.

    Returns an empty list on every path — kept for compatibility with
    ``parallel_downloader``, which stacks the per-trial results.

    Bug fixes vs. the previous revision: every error path now returns early,
    so ``local_root`` / ``response`` / ``online_root`` can no longer be used
    while undefined (NameError), and a corrupt local file is re-downloaded
    after removal instead of crashing.
    """
    local_file_path = f"../data/trials_xmls/{nct_id}.xml"
    url = f"https://clinicaltrials.gov/ct2/show/{nct_id}?displayxml=true"

    # Parse the cached copy if we have one; None means "treat as fresh download".
    local_root = None
    if os.path.exists(local_file_path):
        with open(local_file_path, "r") as f:
            local_xml_content = f.read()
        try:
            local_root = ET.fromstring(local_xml_content)
        except ET.ParseError as e:
            print(f"Error parsing XML for trial {nct_id}: {e}")
            # Corrupt cache entry: drop it and fall through to a fresh download.
            os.remove(local_file_path)

    # Fetch the online record; bail out early on any transport/HTTP/parse error.
    try:
        response = requests.get(url, timeout=30)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching XML for trial {nct_id}: {e}")
        return []
    if response.status_code != 200:
        print(f"Error: received status code {response.status_code} when fetching XML for trial {nct_id}")
        return []
    try:
        online_root = ET.fromstring(response.text)
    except ET.ParseError as e:
        print(f"Error parsing online XML for trial {nct_id}: {e}")
        return []

    if local_root is None:
        # No usable local copy: write the freshly downloaded record.
        with open(local_file_path, "w") as f:
            f.write(ET.tostring(online_root, encoding='unicode'))
        print(f"Study information downloaded for {nct_id}")
        return []

    # Compare only the sections we care about; elements missing on either
    # side are skipped rather than counted as a difference.
    to_check = ["eligibility", "brief_title", "overall_status", "location"]
    local_version = []
    online_version = []
    for tag in to_check:
        local_elem = local_root.find(".//%s" % tag)
        online_elem = online_root.find(".//%s" % tag)
        if local_elem is not None and online_elem is not None:
            local_version.append(local_elem)
            online_version.append(online_elem)
    is_updated = any(
        normalize_whitespace(ET.tostring(a, encoding='unicode').strip())
        != normalize_whitespace(ET.tostring(b, encoding='unicode').strip())
        for a, b in zip(local_version, online_version)
    )
    if is_updated:
        # Update the local XML with the online version
        with open(local_file_path, "w") as f:
            f.write(ET.tostring(online_root, encoding='unicode'))
        print(f"Updated eligibility criteria for {nct_id}")
    else:
        print(f"No changes in eligibility criteria for {nct_id}.")
    return []
# joblib on-disk cache rooted at the current directory.
# NOTE(review): `memory` is never referenced in this file — presumably kept
# for callers that import it; confirm before removing.
memory = joblib.Memory(".")
def ParallelExecutor(use_bar="tqdm", **joblib_args):
    """Utility for tqdm progress bar in joblib.Parallel.

    ``use_bar`` selects the iterator wrapper ("tqdm" shows a progress bar;
    "False"/"None" iterate silently); ``joblib_args`` may carry ``n_jobs``
    (default 10) for the underlying ``joblib.Parallel`` pool.
    """
    wrapper_factories = {
        "tqdm": lambda opts: (lambda it: tqdm(it, **opts)),
        "False": lambda opts: iter,
        "None": lambda opts: iter,
    }

    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            key = str(bar)
            if key not in wrapper_factories:
                raise ValueError("Value %s not supported as bar type" % bar)
            wrap = wrapper_factories[key](tq_args)
            # n_jobs comes from the outer joblib_args, defaulting to 10 workers.
            pool = joblib.Parallel(n_jobs=joblib_args.get("n_jobs", 10))
            return pool(wrap(op_iter))

        return tmp

    return aprun
def parallel_downloader(n_jobs, nct_ids):
    """Fetch/refresh every trial in ``nct_ids`` with ``n_jobs`` parallel workers.

    Wraps ``download_study_info`` in a tqdm-tracked joblib pool and returns
    the per-trial results stacked and flattened into a 1-D numpy array.
    """
    run = ParallelExecutor(n_jobs=n_jobs)(total=len(nct_ids))
    results = run(
        joblib.delayed(download_study_info)(nct_id) for nct_id in nct_ids
    )
    return np.vstack(results).flatten()
class Downloader:
    """Downloads/refreshes a batch of clinical-trial records in parallel."""

    def __init__(self, id_list, n_jobs):
        # NCT IDs to fetch and the number of parallel workers to use.
        self.id_list = id_list
        self.n_jobs = n_jobs

    def download_and_update_trials(self):
        """Run the parallel downloader over ``id_list`` and report wall-clock time."""
        started = time.time()
        updated_cts = parallel_downloader(self.n_jobs, self.id_list)
        elapsed_time = time.time() - started
        print(f"Elapsed time: {elapsed_time} seconds")
        return updated_cts
# Template entry point: both placeholders below are literal Ellipsis values
# and MUST be replaced before running, or the downloader will fail at runtime.
if __name__ == "__main__":
    id_list = [...] # Replace [...] with your list of IDs
    n_jobs = ... # Replace ... with the number of parallel jobs
    downloader = Downloader(id_list, n_jobs)
    downloader.download_and_update_trials()