[96a5a0]: / src / main.py

Download this file

185 lines (147 with data), 5.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# main.py
"""
Clinical Trial Processing Application
This module serves as the entry point for the clinical trial processing application.
It provides a command-line interface for selecting and processing clinical trials.
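The application can:
1. Process trials specified manually by NCT ID
2. Process all cancer trials from the CHIA dataset
3. Process all cancer trials listed on clinicaltrials.gov that are not already in the output folder
4. Exit the application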
Each trial is processed to extract and structure its eligibility criteria.
"""
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Generator
from src.services.trial_manager import process_trial
from src.utils.config import DEFAULT_OUTPUT_DIR, setup_logging
from src.utils.helpers import curl_with_status_check
# Configure application logging as an import-time side effect: DEBUG level with
# the log_to_file option enabled (handler details are defined by setup_logging).
setup_logging(log_to_file=True, log_level=logging.DEBUG)
def getChiaCancerTrials() -> list[str]:
    """
    List the cancer trial NCT IDs found in the CHIA dataset folder.

    The folder ``../Trials/CHIA/OnlyCancerTrials`` is resolved relative to the
    current working directory. Every entry ending in ``.json`` contributes the
    part of its file name that precedes the first dot.

    Returns:
        list[str]: One identifier per ``.json`` file, in directory-listing order.
    """
    cancer_dir = os.path.join("..", "Trials", "CHIA", "OnlyCancerTrials")
    trial_ids: list[str] = []
    for entry in os.listdir(cancer_dir):
        if not entry.endswith(".json"):
            continue
        # Keep only the text before the first "." (e.g. "NCT123.json" -> "NCT123").
        trial_ids.append(entry.partition(".")[0])
    return trial_ids
def getTrialsFromUser() -> list[str]:
    """
    Collect trial NCT IDs typed interactively by the user.

    The user is prompted repeatedly; each answer is stripped of surrounding
    whitespace and stored. The first empty (or whitespace-only) answer ends
    the prompt loop.

    Returns:
        list[str]: The entered NCT IDs, in the order they were typed.
    """
    prompt = "Enter the NCT ID of the trial you want to process (or press Enter to finish): "
    collected: list[str] = []
    while True:
        entry = input(prompt).strip()
        if not entry:
            # Blank answer: the user is done entering IDs.
            return collected
        collected.append(entry)
def get_all_nct_ids_from_folder(folder_path: str) -> list[str]:
    """
    Collect NCT IDs from the JSON file names in a folder.

    For every directory entry whose name ends in ``.json``, the first 11
    characters of the name are taken as the NCT ID (shorter names are
    returned whole).

    Args:
        folder_path (str): Path to the folder containing JSON files.

    Returns:
        list[str]: One ID per ``.json`` file, in directory-listing order.
    """
    id_length = 11  # number of leading file-name characters used as the ID
    json_names = (
        entry for entry in os.listdir(folder_path) if entry.endswith(".json")
    )
    return [entry[:id_length] for entry in json_names]
def getAllCancerTrials(n: int) -> Generator[str, None, None]:
    """
    Lazily yield cancer trial NCT IDs from the clinicaltrials.gov v2 API.

    Results are fetched page by page, following ``nextPageToken`` until the
    API stops returning one. IDs that already appear as JSON file names in
    ``DEFAULT_OUTPUT_DIR/allTrials/logical`` are skipped.

    Args:
        n (int): Page size sent to the API as the ``pageSize`` query parameter.

    Yields:
        str: NCT ID of a cancer trial that is not yet in the output folder.
    """
    url = f"https://clinicaltrials.gov/api/v2/studies?query.cond=cancer&query.term=cancer&query.titles=Cancer&fields=NCTId&pageSize={n}"
    folder_path = os.path.join(DEFAULT_OUTPUT_DIR, "allTrials", "logical")
    # A set gives O(1) membership checks; the folder listing can grow large and
    # is consulted once for every study returned by the API.
    processed_ids = set(get_all_nct_ids_from_folder(folder_path))

    page_url = url
    while True:
        # curl_with_status_check is used here as returning a dict-like parsed
        # response (it is queried with .get below).
        response = curl_with_status_check(page_url)
        for study in response.get("studies", []):
            nct_id = study["protocolSection"]["identificationModule"]["nctId"]
            if nct_id not in processed_ids:
                yield nct_id

        next_token = response.get("nextPageToken", "")
        if not next_token:
            # No token means this was the last page.
            break
        page_url = f"{url}&pageToken={next_token}"
def get_trials(n: int = 100) -> list[str] | Generator[str] | None:
"""
Present options to the user for selecting trials to process.
Returns:
list[str] | None: List of NCT IDs to process, or None if user chooses to quit.
"""
while True:
user_choice = (
input(
"Please choose one of the following\n'm' for manual input\n'c' to process all cancer trials from CHIA\n'a' for all cancer trials\n'q' to quit: "
)
.strip()
.lower()
)
if user_choice == "m":
return getTrialsFromUser()
elif user_choice == "c":
return getChiaCancerTrials()
elif user_choice == "a":
return getAllCancerTrials(n)
elif user_choice == "q":
return None
else:
print("Invalid choice. Please try again.")
def process_trial_wrapper(nct_id: str) -> None:
    """
    Process one trial and log the outcome instead of raising.

    Any exception raised by process_trial is caught and logged, so one failing
    trial does not propagate out of the worker that runs this function.

    Args:
        nct_id (str): NCT ID of the trial to process.
    """
    logger = logging.getLogger(__name__)
    try:
        process_trial(nct_id, os.path.join(DEFAULT_OUTPUT_DIR, "allTrials"))
    except Exception as e:
        # exc_info=True attaches the traceback to the log record; the message
        # alone is rarely enough to diagnose a failure inside a worker thread.
        logger.error("Failed to process trial %s: %s", nct_id, e, exc_info=True)
    else:
        logger.info("Successfully processed trial %s", nct_id)
def main():
    """
    Application entry point: ask which trials to process, then process them.

    The selection comes from get_trials. When nothing is selected the function
    logs that fact and returns. Otherwise every selected NCT ID is submitted
    to a thread pool running process_trial_wrapper, and the function waits for
    all submissions to finish.
    """
    log = logging.getLogger(__name__)
    log.info("Application started...")

    worker_cap = 100  # also passed to get_trials as the API page size
    selected = get_trials(worker_cap)
    if not selected:
        log.info("No trials selected, exiting...")
        return

    if isinstance(selected, list):
        log.info(f"Selected {len(selected)} trials for processing")
        log.info("These are the trials selected: %s", selected)
        # No point in starting more workers than there are trials.
        worker_cap = min(worker_cap, len(selected))

    # Process each trial in parallel using ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=worker_cap) as pool:
        pending = {}
        for trial_id in selected:
            pending[pool.submit(process_trial_wrapper, trial_id)] = trial_id

        for finished in as_completed(pending):
            trial_id = pending[finished]
            try:
                finished.result()
            except Exception as exc:
                log.error(f"Error processing trial {trial_id}: {str(exc)}")

    log.info("Processing complete")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()