--- a +++ b/src/DataLoader/download.py @@ -0,0 +1,192 @@ +import requests +import sys +import xml.etree.ElementTree as ET +import os +import time +import joblib +from tqdm.auto import tqdm +import numpy as np +from tenacity import retry, wait_random_exponential, stop_after_attempt + +# # Open the log file +# log_file = open('../logs/download.log', 'w') +# # Redirect standard output to the log file +# sys.stdout = log_file + +def normalize_whitespace(s): + return ' '.join(s.split()) + + +@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(10)) +def get_cancer_trials_list(max_trials=15000): + base_url = "https://clinicaltrials.gov/api/query/full_studies" + trials_set = set() + page_size = 100 # Number of trials per page + current_rank = 1 + trials_fetched = 0 + while trials_fetched < max_trials: + search_params = { + "expr": "((cancer) OR (neoplasm)) AND ((interventional) OR (treatment)) AND ((mutation) OR (variant))", + "min_rnk": current_rank, + "max_rnk": current_rank + page_size - 1, + "fmt": "json", + "fields": "NCTId" + } + response = requests.get(base_url, params=search_params) + if response.status_code == 200: + trials_data = response.json() + if "FullStudiesResponse" in trials_data: + studies = trials_data["FullStudiesResponse"]["FullStudies"] + if not studies: + break # No more studies found, exit the loop + for study in studies: + trials_set.add(study["Study"]["ProtocolSection"]["IdentificationModule"]["NCTId"]) + trials_fetched += 1 + if trials_fetched == max_trials: + break + current_rank += page_size + else: + print("No trials found matching the criteria.") + break + else: + print("Failed to retrieve data. Status code:", response.status_code) + break + + return list(trials_set) # Convert set to list for output + + +def download_study_info(nct_id): + local_file_path = f"../data/trials_xmls/{nct_id}.xml" + + if os.path.exists(local_file_path): + # Read the content of the existing local XML file + with open(local_file_path, "r") as f: + local_xml_content = f.read() + try: + local_root = ET.fromstring(local_xml_content) + except ET.ParseError as e: + print(f"Error parsing XML for trial {nct_id}: {e}") + os.remove(local_file_path) + # Download the online version of the XML + url = f"https://clinicaltrials.gov/ct2/show/{nct_id}?displayxml=true" + try: + response = requests.get(url) + except requests.exceptions.RequestException as e: + print(f"Error fetching XML for trial {nct_id}: {e}") + + if response.status_code == 200: + online_xml_content = response.text + # Parse the XML content + try: + online_root = ET.fromstring(online_xml_content) + except ET.ParseError as e: + print(f"Error parsing online XML for trial {nct_id}: {e}") + + else: + print(f"Error: received status code {response.status_code} when fetching XML for trial {nct_id}") + + to_check = ["eligibility", "brief_title", "overall_status", "location"] + + local_version = [] + online_version = [] + + for s in to_check: + local_elem = local_root.find(".//%s" % s) + online_elem = online_root.find(".//%s" % s) + + # Check if the element exists in both versions + if local_elem is not None and online_elem is not None: + local_version.append(local_elem) + online_version.append(online_elem) + else: + continue + + is_updated = any([normalize_whitespace(ET.tostring(a, encoding='unicode').strip()) != + normalize_whitespace(ET.tostring(b, encoding='unicode').strip()) + for a, b in zip(local_version, online_version)]) + + if is_updated: + # Update the local XML with the online version + with open(local_file_path, "w") as f: + f.write(ET.tostring(online_root, encoding='unicode')) + print(f"Updated eligibility criteria for {nct_id}") + else: + print(f"No changes in eligibility criteria for {nct_id}.") + else: + # If the local file doesn't exist, download the online version + url = f"https://clinicaltrials.gov/ct2/show/{nct_id}?displayxml=true" + try: + response = requests.get(url) + except requests.exceptions.RequestException as e: + print(f"Error fetching XML for trial {nct_id}: {e}") + + if response.status_code == 200: + try: + root = ET.fromstring(response.text) + with open(local_file_path, "w") as f: + f.write(ET.tostring(root, encoding='unicode')) + print(f"Study information downloaded for {nct_id}") + except ET.ParseError as e: + print(f"Error parsing online XML for trial {nct_id}: {e}") + else: + print(f"Error: received status code {response.status_code} when fetching XML for trial {nct_id}") + return [] + + + +memory = joblib.Memory(".") +def ParallelExecutor(use_bar="tqdm", **joblib_args): + """Utility for tqdm progress bar in joblib.Parallel""" + all_bar_funcs = { + "tqdm": lambda args: lambda x: tqdm(x, **args), + "False": lambda args: iter, + "None": lambda args: iter, + } + def aprun(bar=use_bar, **tq_args): + def tmp(op_iter): + if str(bar) in all_bar_funcs.keys(): + bar_func = all_bar_funcs[str(bar)](tq_args) + else: + raise ValueError("Value %s not supported as bar type" % bar) + + # Pass n_jobs from joblib_args + return joblib.Parallel(n_jobs=joblib_args.get("n_jobs", 10))(bar_func(op_iter)) + + return tmp + return aprun + +def parallel_downloader( + n_jobs, + nct_ids, +): + parallel_runner = ParallelExecutor(n_jobs=n_jobs)(total=len(nct_ids)) + X = parallel_runner( + joblib.delayed(download_study_info)( + nct_id, + ) + for nct_id in nct_ids + ) + updated_cts = np.vstack(X).flatten() + return updated_cts + + +class Downloader: + def __init__(self, id_list, n_jobs): + self.id_list = id_list + self.n_jobs = n_jobs + + def download_and_update_trials(self): + start_time = time.time() + updated_cts = parallel_downloader(self.n_jobs, self.id_list) + end_time = time.time() + elapsed_time = end_time - start_time + print(f"Elapsed time: {elapsed_time} seconds") + return updated_cts + + +if __name__ == "__main__": + id_list = [...] # Replace [...] with your list of IDs + n_jobs = ... # Replace ... with the number of parallel jobs + downloader = Downloader(id_list, n_jobs) + downloader.download_and_update_trials() +