Diff of /src/main.py [000000] .. [96a5a0]

Switch to unified view

a b/src/main.py
1
# main.py
2
"""
3
Clinical Trial Processing Application
4
5
This module serves as the entry point for the clinical trial processing application.
6
It provides a command-line interface for selecting and processing clinical trials.
7
8
The application can:
9
1. Process trials specified manually by NCT ID
10
2. Process all cancer trials from the CHIA dataset
11
3. Exit the application
12
13
Each trial is processed to extract and structure its eligibility criteria.
14
"""
15
import logging
16
import os
17
from concurrent.futures import ThreadPoolExecutor, as_completed
18
from typing import Generator
19
20
from src.services.trial_manager import process_trial
21
from src.utils.config import DEFAULT_OUTPUT_DIR, setup_logging
22
from src.utils.helpers import curl_with_status_check
23
24
# Configure application logging
25
setup_logging(log_to_file=True, log_level=logging.DEBUG)
26
27
28
def getChiaCancerTrials() -> list[str]:
29
    """
30
    Retrieve a list of all cancer trial NCT IDs from the CHIA dataset.
31
32
    Returns:
33
        list[str]: List of NCT IDs for cancer trials.
34
    """
35
    onlyCancerFolder = os.path.join("..", "Trials", "CHIA", "OnlyCancerTrials")
36
    return [
37
        file.split(".")[0]
38
        for file in os.listdir(onlyCancerFolder)
39
        if file.endswith(".json")
40
    ]
41
42
43
def getTrialsFromUser() -> list[str]:
44
    """
45
    Prompt the user to enter trial NCT IDs manually.
46
47
    Returns:
48
        list[str]: List of user-specified NCT IDs.
49
    """
50
    trials = []
51
    while nct_id := input(
52
        "Enter the NCT ID of the trial you want to process (or press Enter to finish): "
53
    ).strip():
54
        trials.append(nct_id)
55
    return trials
56
57
58
def get_all_nct_ids_from_folder(folder_path: str) -> list[str]:
59
    """
60
    Retrieve all NCT IDs from the first 11 characters of JSON file names in the specified folder.
61
62
    Args:
63
        folder_path (str): Path to the folder containing JSON files.
64
65
    Returns:
66
        list[str]: List of NCT IDs.
67
    """
68
    return [
69
        file_name[:11]
70
        for file_name in os.listdir(folder_path)
71
        if file_name.endswith(".json")
72
    ]
73
74
75
def getAllCancerTrials(n: int) -> Generator[str, None, None]:
76
    """
77
    Retrieve a generator of all cancer trial NCT IDs available in clinicaltrials.gov.
78
79
    Yields:
80
        str: NCT ID for a cancer trial.
81
    """
82
    url = f"https://clinicaltrials.gov/api/v2/studies?query.cond=cancer&query.term=cancer&query.titles=Cancer&fields=NCTId&pageSize={n}"
83
84
    folder_path = os.path.join(DEFAULT_OUTPUT_DIR, "allTrials", "logical")
85
    nct_ids = get_all_nct_ids_from_folder(folder_path)
86
    response = curl_with_status_check(url)
87
    studies = response.get("studies", [])
88
    nextToken = response.get("nextPageToken", "")
89
90
    while True:
91
        for study in [
92
            s
93
            for s in studies
94
            if s["protocolSection"]["identificationModule"]["nctId"] not in nct_ids
95
        ]:
96
            yield study["protocolSection"]["identificationModule"]["nctId"]
97
98
        if not nextToken:
99
            break
100
101
        next_url = f"{url}&pageToken={nextToken}"
102
        response = curl_with_status_check(next_url)
103
        studies = response.get("studies", [])
104
        nextToken = response.get("nextPageToken", "")
105
106
107
def get_trials(n: int = 100) -> list[str] | Generator[str] | None:
108
    """
109
    Present options to the user for selecting trials to process.
110
111
    Returns:
112
        list[str] | None: List of NCT IDs to process, or None if user chooses to quit.
113
    """
114
    while True:
115
        user_choice = (
116
            input(
117
                "Please choose one of the following\n'm' for manual input\n'c' to process all cancer trials from CHIA\n'a' for all cancer trials\n'q' to quit: "
118
            )
119
            .strip()
120
            .lower()
121
        )
122
123
        if user_choice == "m":
124
            return getTrialsFromUser()
125
        elif user_choice == "c":
126
            return getChiaCancerTrials()
127
        elif user_choice == "a":
128
            return getAllCancerTrials(n)
129
        elif user_choice == "q":
130
            return None
131
        else:
132
            print("Invalid choice. Please try again.")
133
134
135
def process_trial_wrapper(nct_id: str):
136
    """
137
    Wrapper function to process a trial and handle exceptions.
138
139
    Args:
140
        nct_id (str): NCT ID of the trial to process.
141
    """
142
    logger = logging.getLogger(__name__)
143
    try:
144
        process_trial(nct_id, os.path.join(DEFAULT_OUTPUT_DIR, "allTrials"))
145
        logger.info(f"Successfully processed trial {nct_id}")
146
    except Exception as e:
147
        logger.error(f"Failed to process trial {nct_id}: {str(e)}")
148
149
150
def main():
151
    """
152
    Main application function. Handles user interaction and processes selected trials.
153
    """
154
    logger = logging.getLogger(__name__)
155
    logger.info("Application started...")
156
    parallelMultiplier = 100
157
158
    trials = get_trials(parallelMultiplier)
159
    if not trials:
160
        logger.info("No trials selected, exiting...")
161
        return
162
163
    if isinstance(trials, list):
164
        logger.info(f"Selected {len(trials)} trials for processing")
165
        logger.info("These are the trials selected: %s", trials)
166
        parallelMultiplier = min(parallelMultiplier, len(trials))
167
168
    # Process each trial in parallel using ThreadPoolExecutor
169
    with ThreadPoolExecutor(max_workers=parallelMultiplier) as executor:
170
        future_to_nct_id = {
171
            executor.submit(process_trial_wrapper, nct_id): nct_id for nct_id in trials
172
        }
173
        for future in as_completed(future_to_nct_id):
174
            nct_id = future_to_nct_id[future]
175
            try:
176
                future.result()
177
            except Exception as e:
178
                logger.error(f"Error processing trial {nct_id}: {str(e)}")
179
180
    logger.info("Processing complete")
181
182
183
if __name__ == "__main__":
184
    main()