|
a |
|
b/src/main.py |
|
|
1 |
# main.py |
|
|
2 |
""" |
|
|
3 |
Clinical Trial Processing Application |
|
|
4 |
|
|
|
5 |
This module serves as the entry point for the clinical trial processing application. |
|
|
6 |
It provides a command-line interface for selecting and processing clinical trials. |
|
|
7 |
|
|
|
8 |
The application can: |
|
|
9 |
1. Process trials specified manually by NCT ID |
|
|
10 |
2. Process all cancer trials from the CHIA dataset, or all cancer trials listed on clinicaltrials.gov |
|
|
11 |
3. Exit the application |
|
|
12 |
|
|
|
13 |
Each trial is processed to extract and structure its eligibility criteria. |
|
|
14 |
""" |
|
|
15 |
import logging |
|
|
16 |
import os |
|
|
17 |
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
18 |
from typing import Generator |
|
|
19 |
|
|
|
20 |
from src.services.trial_manager import process_trial |
|
|
21 |
from src.utils.config import DEFAULT_OUTPUT_DIR, setup_logging |
|
|
22 |
from src.utils.helpers import curl_with_status_check |
|
|
23 |
|
|
|
24 |
# Configure application logging |
|
|
25 |
setup_logging(log_to_file=True, log_level=logging.DEBUG) |
|
|
26 |
|
|
|
27 |
|
|
|
28 |
def getChiaCancerTrials() -> list[str]:
    """
    Collect the NCT IDs of the cancer trials stored in the CHIA dataset folder.

    The IDs are derived from the names of the ``.json`` files found in
    ``../Trials/CHIA/OnlyCancerTrials`` (a relative path, so it is resolved
    against the current working directory). For each such file the ID is the
    part of the file name that precedes its first ``.``.

    Returns:
        list[str]: One entry per ``.json`` file, in the order reported by
        ``os.listdir``.
    """
    cancer_dir = os.path.join("..", "Trials", "CHIA", "OnlyCancerTrials")

    nct_ids = []
    for entry in os.listdir(cancer_dir):
        if not entry.endswith(".json"):
            continue
        # Keep only the text before the first dot of the file name.
        stem, _, _ = entry.partition(".")
        nct_ids.append(stem)
    return nct_ids
|
|
41 |
|
|
|
42 |
|
|
|
43 |
def getTrialsFromUser() -> list[str]:
    """
    Interactively collect trial NCT IDs typed by the user.

    The user is prompted repeatedly; each answer is stripped of surrounding
    whitespace and stored. The first empty (or whitespace-only) answer ends
    the loop.

    Returns:
        list[str]: The entered NCT IDs, in the order they were typed.
    """
    prompt = "Enter the NCT ID of the trial you want to process (or press Enter to finish): "
    collected: list[str] = []
    while True:
        entry = input(prompt).strip()
        if not entry:
            # A blank answer is the "finished" signal.
            return collected
        collected.append(entry)
|
|
56 |
|
|
|
57 |
|
|
|
58 |
def get_all_nct_ids_from_folder(folder_path: str) -> list[str]:
    """
    List the NCT IDs encoded in the JSON file names of a folder.

    Every directory entry whose name ends with ``.json`` contributes the
    first 11 characters of its name (the whole name if it is shorter).

    Args:
        folder_path (str): Path to the folder containing JSON files.

    Returns:
        list[str]: The extracted IDs, in the order reported by ``os.listdir``.
    """
    nct_ids = []
    for entry in os.listdir(folder_path):
        if not entry.endswith(".json"):
            continue
        # Slice off the leading 11 characters of the file name.
        nct_ids.append(entry[:11])
    return nct_ids
|
|
73 |
|
|
|
74 |
|
|
|
75 |
def getAllCancerTrials(n: int) -> Generator[str, None, None]:
    """
    Lazily yield NCT IDs of cancer trials listed on clinicaltrials.gov.

    The clinicaltrials.gov v2 ``studies`` endpoint is queried page by page,
    following ``nextPageToken`` until the response no longer carries one.
    IDs that already have a ``.json`` file in the ``allTrials/logical`` folder
    under ``DEFAULT_OUTPUT_DIR`` are skipped (presumably those are trials
    handled by an earlier run - confirm against ``process_trial``).

    Args:
        n (int): Value sent as the ``pageSize`` query parameter, i.e. how many
            studies are requested per HTTP call.

    Yields:
        str: NCT ID for a cancer trial that has no file in the output folder.
    """
    url = f"https://clinicaltrials.gov/api/v2/studies?query.cond=cancer&query.term=cancer&query.titles=Cancer&fields=NCTId&pageSize={n}"

    folder_path = os.path.join(DEFAULT_OUTPUT_DIR, "allTrials", "logical")
    try:
        # A set keeps the per-study membership test O(1) instead of scanning
        # a list for every study on every page.
        known_ids = set(get_all_nct_ids_from_folder(folder_path))
    except FileNotFoundError:
        # The output folder does not exist yet (e.g. first run), so there is
        # nothing to skip.
        known_ids = set()

    page_url = url
    while True:
        # NOTE(review): assumes curl_with_status_check returns the decoded
        # JSON body as a dict - confirm against src.utils.helpers.
        response = curl_with_status_check(page_url)

        for study in response.get("studies", []):
            nct_id = study["protocolSection"]["identificationModule"]["nctId"]
            if nct_id not in known_ids:
                yield nct_id

        next_token = response.get("nextPageToken", "")
        if not next_token:
            break

        # Always append the token to the base URL so tokens never accumulate.
        page_url = f"{url}&pageToken={next_token}"
|
|
105 |
|
|
|
106 |
|
|
|
107 |
def get_trials(n: int = 100) -> list[str] | Generator[str] | None: |
|
|
108 |
""" |
|
|
109 |
Present options to the user for selecting trials to process. |
|
|
110 |
|
|
|
111 |
Returns: |
|
|
112 |
list[str] | None: List of NCT IDs to process, or None if user chooses to quit. |
|
|
113 |
""" |
|
|
114 |
while True: |
|
|
115 |
user_choice = ( |
|
|
116 |
input( |
|
|
117 |
"Please choose one of the following\n'm' for manual input\n'c' to process all cancer trials from CHIA\n'a' for all cancer trials\n'q' to quit: " |
|
|
118 |
) |
|
|
119 |
.strip() |
|
|
120 |
.lower() |
|
|
121 |
) |
|
|
122 |
|
|
|
123 |
if user_choice == "m": |
|
|
124 |
return getTrialsFromUser() |
|
|
125 |
elif user_choice == "c": |
|
|
126 |
return getChiaCancerTrials() |
|
|
127 |
elif user_choice == "a": |
|
|
128 |
return getAllCancerTrials(n) |
|
|
129 |
elif user_choice == "q": |
|
|
130 |
return None |
|
|
131 |
else: |
|
|
132 |
print("Invalid choice. Please try again.") |
|
|
133 |
|
|
|
134 |
|
|
|
135 |
def process_trial_wrapper(nct_id: str) -> None:
    """
    Process a single trial, logging the outcome instead of raising.

    Calls ``process_trial`` with the ``allTrials`` folder under
    ``DEFAULT_OUTPUT_DIR`` as the output location. Any ``Exception`` is caught
    and logged so that one failing trial does not propagate out of the worker.

    Args:
        nct_id (str): NCT ID of the trial to process.
    """
    logger = logging.getLogger(__name__)
    try:
        process_trial(nct_id, os.path.join(DEFAULT_OUTPUT_DIR, "allTrials"))
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which the plain error message alone would discard.
        logger.exception("Failed to process trial %s: %s", nct_id, e)
    else:
        logger.info("Successfully processed trial %s", nct_id)
|
|
148 |
|
|
|
149 |
|
|
|
150 |
def main():
    """
    Run the application: ask which trials to process, then process them.

    The selection comes from ``get_trials``. ``None`` or an empty list ends
    the run immediately. Otherwise every selected NCT ID is handed to
    ``process_trial_wrapper`` on a thread pool, and any exception surfacing
    from a finished task is logged.
    """
    log = logging.getLogger(__name__)
    log.info("Application started...")
    worker_count = 100

    # The same number is used as the page size for the 'a' option and as the
    # upper bound on worker threads.
    selected = get_trials(worker_count)
    if not selected:
        log.info("No trials selected, exiting...")
        return

    # A generator (the 'a' option) has no length, so only lists are reported
    # and used to cap the number of workers.
    if isinstance(selected, list):
        log.info(f"Selected {len(selected)} trials for processing")
        log.info("These are the trials selected: %s", selected)
        worker_count = min(worker_count, len(selected))

    # Fan the trials out over a thread pool and collect results as they finish.
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        pending = {}
        for trial_id in selected:
            pending[pool.submit(process_trial_wrapper, trial_id)] = trial_id

        for finished in as_completed(pending):
            trial_id = pending[finished]
            try:
                finished.result()
            except Exception as err:
                log.error(f"Error processing trial {trial_id}: {str(err)}")

    log.info("Processing complete")
|
|
181 |
|
|
|
182 |
|
|
|
183 |
if __name__ == "__main__": |
|
|
184 |
main() |