a b/src/services/trial_manager.py
1
# services/trial_manager.py
2
"""
3
Clinical Trial Data Management Service
4
5
This module is responsible for retrieving, processing, and managing clinical trial data.
6
It handles fetching trial data from ClinicalTrials.gov, identifying and structuring criteria,
7
and persisting the processed data.
8
9
The trial management process follows these steps:
10
1. Fetch raw trial data from ClinicalTrials.gov API
11
2. Extract and normalize inclusion, exclusion, and miscellaneous criteria
12
3. Process trial data through identification and logical structuring pipelines
13
4. Store the processed data as JSON files
14
15
Functions:
16
    get_extra_criteria: Extract additional criteria from the eligibility module.
17
    convert_std_ages_to_numerical_ages: Convert standardized age groups to numerical ranges.
18
    get_trial_data: Retrieve raw trial data from ClinicalTrials.gov API.
19
    remove_pesky_slash: Clean backslashes from text content.
20
    process_trial: Process a trial through the entire pipeline.
21
"""
22
23
import logging
24
import os
25
import re
26
27
import rich
28
29
from src.models.identified_criteria import IdentifiedTrial, RawTrialData
30
from src.models.logical_criteria import LogicalTrial
31
from src.repositories.trial_repository import export_pydantic_to_json
32
from src.services.identifier import identify_criterions_from_rawTrial
33
from src.services.logical_structurizer import logically_structurize_trial
34
from src.utils.config import DEFAULT_OUTPUT_DIR
35
from src.utils.helpers import curl_with_status_check
36
37
# Configure logging
38
logger = logging.getLogger(__name__)
39
40
41
def remove_pesky_slash(text: str) -> str:
    """
    Strip every backslash character out of a piece of text.

    Args:
        text (str): The text to clean.

    Returns:
        str: A copy of ``text`` in which all backslashes have been dropped;
            every other character is left untouched.
    """
    backslash = "\\"
    return text.replace(backslash, "")
52
53
54
def get_extra_criteria(eligibility_module: dict) -> list[str]:
    """
    Turn the structured fields of an eligibility module into text criteria.

    The fields read are ``healthyVolunteers``, ``sex``, ``minimumAge``,
    ``maximumAge`` and ``stdAges``. Each one that carries a restriction is
    rendered as one sentence, in that order.

    Args:
        eligibility_module (dict): The eligibility module dictionary from ClinicalTrials.gov.

    Returns:
        list[str]: A list of additional criteria extracted from the eligibility module.
            Empty when none of the fields is present or restrictive.
    """
    criteria: list[str] = []

    # Healthy volunteers. The flag is expected as a JSON boolean from the v2
    # API (verify against the API reference); the string forms "false"/"true"
    # are still accepted. Comparing only against the string "false" would
    # misreport a boolean False as "allowed".
    healthy_volunteers = eligibility_module.get("healthyVolunteers")
    if healthy_volunteers is not None:
        if isinstance(healthy_volunteers, str):
            accepts_healthy = healthy_volunteers.strip().lower() != "false"
        else:
            accepts_healthy = bool(healthy_volunteers)

        if accepts_healthy:
            criteria.append("Healthy volunteers allowed")
        else:
            criteria.append("No healthy volunteers allowed")

    # Sex: "ALL" places no restriction, so it produces no criterion.
    sex = eligibility_module.get("sex")
    if sex is not None and sex != "ALL":
        criteria.append(f"Must be {sex}")

    # Explicit age limits are passed through verbatim.
    minimum_age = eligibility_module.get("minimumAge")
    if minimum_age is not None:
        criteria.append(f"Must have minimum age of {minimum_age}")

    maximum_age = eligibility_module.get("maximumAge")
    if maximum_age is not None:
        criteria.append(f"Must have maximum age of {maximum_age}")

    # The standardized age groups are only a fallback: they are consulted
    # when neither explicit limit is provided.
    std_ages = eligibility_module.get("stdAges")
    if std_ages and not minimum_age and not maximum_age:
        convert_std_ages_to_numerical_ages(std_ages, criteria)

    return criteria
96
97
98
def convert_std_ages_to_numerical_ages(std_ages: list, criteria: list[str]) -> None:
    """
    Convert standardized age groups to numerical age limits and add them to criteria.

    The recognised groups are ``CHILD`` (0-17), ``ADULT`` (18-64) and
    ``OLDER_ADULT`` (65-100). The overall range is the lowest lower bound and
    the highest upper bound among the recognised groups. A bound is only
    reported when it is restrictive, i.e. the minimum is not 0 or the maximum
    is not 100. Unrecognised entries are ignored; when no entry is recognised,
    nothing is appended.

    Args:
        std_ages (list): List of standardized age groups from ClinicalTrials.gov.
        criteria (list[str]): List to append the converted age criteria to.
            It is modified in place.
    """
    age_groups = {"CHILD": (0, 17), "ADULT": (18, 64), "OLDER_ADULT": (65, 100)}

    # Keep only the groups we know how to translate.
    bounds = [age_groups[group] for group in (std_ages or ()) if group in age_groups]
    if not bounds:
        # Without a recognised group there is no range to report. Falling
        # through with sentinel values would emit nonsensical limits such as
        # "Must be 100 or older".
        return

    min_age = min(lower for lower, _ in bounds)
    max_age = max(upper for _, upper in bounds)

    # 0 and 100 are the outer edges of the mapping and mean "no limit".
    if min_age != 0:
        criteria.append(f"Must be {min_age} or older")
    if max_age != 100:
        criteria.append(f"Must be {max_age} or younger")
123
124
125
def _split_eligibility_sections(eligibility: str) -> tuple[str, str, str]:
    """
    Split eligibility text into inclusion, exclusion and miscellaneous parts.

    The text is cut at the first occurrence of the literal markers
    ``"Inclusion Criteria:"`` and ``"Exclusion Criteria:"``. Each section runs
    from the end of its marker to the start of the next marker, or to the end
    of the text. The two markers may appear in either order. Whatever precedes
    the first marker is the miscellaneous part; with no marker at all, the
    whole text is miscellaneous.

    Args:
        eligibility (str): The full eligibility criteria text.

    Returns:
        tuple[str, str, str]: ``(inclusion, exclusion, miscellaneous)``, each
            stripped of surrounding whitespace; absent sections are ``""``.
    """
    inclusion_marker = "Inclusion Criteria:"
    exclusion_marker = "Exclusion Criteria:"

    # Markers that are actually present, ordered by position in the text.
    found = sorted(
        (position, marker)
        for position, marker in (
            (eligibility.find(inclusion_marker), inclusion_marker),
            (eligibility.find(exclusion_marker), exclusion_marker),
        )
        if position != -1
    )

    sections = {inclusion_marker: "", exclusion_marker: ""}
    for index, (position, marker) in enumerate(found):
        # Ordering by position (rather than assuming inclusion comes first)
        # keeps both sections intact when exclusion precedes inclusion.
        is_last = index + 1 == len(found)
        end = len(eligibility) if is_last else found[index + 1][0]
        sections[marker] = eligibility[position + len(marker) : end].strip()

    first_marker_position = found[0][0] if found else len(eligibility)
    miscellaneous = eligibility[:first_marker_position].strip()

    return sections[inclusion_marker], sections[exclusion_marker], miscellaneous


def get_trial_data(nct_id: str) -> RawTrialData:
    """
    Retrieve trial data from ClinicalTrials.gov API and format it for processing.

    The official title and eligibility module are read from the response. The
    free-text eligibility criteria have backslashes removed and are split into
    inclusion, exclusion and miscellaneous sections. The criteria derived from
    the structured eligibility fields are appended to the inclusion section.

    Args:
        nct_id (str): The NCT ID of the clinical trial.

    Returns:
        RawTrialData: Structured raw trial data.

    Raises:
        ValueError: If the trial data cannot be retrieved or processed. Every
            exception raised while fetching or processing is re-raised as a
            ``ValueError`` chained to the original error.
    """
    logger.info("Fetching trial data for NCT ID: %s", nct_id)
    try:
        # Request data from ClinicalTrials.gov API
        url = f"https://clinicaltrials.gov/api/v2/studies/{nct_id}?fields=NCTId,OfficialTitle,EligibilityModule"
        data = curl_with_status_check(url)

        # Locate the study. A list-shaped response carries it as the first
        # entry of "studies"; a single-study response has "protocolSection"
        # at the top level. An empty or missing list falls through to the
        # second form instead of raising IndexError.
        studies = data.get("studies") or [{}]
        study = studies[0] or {}
        # NOTE(review): list entries are assumed to nest their modules under
        # "protocolSection", as the single-study response does - confirm
        # against the API reference. Entries without that key are used as-is.
        study = study.get("protocolSection", study)
        if not study:
            study = data.get("protocolSection", None)
            if not study:
                logger.error("No data found for NCT ID: %s", nct_id)
                logger.debug("Response data: %s", data)
                raise ValueError(f"No data found for NCT ID: {nct_id}")

        # Extract key fields from the study data
        official_title = study.get("identificationModule", {}).get("officialTitle", "")
        eligibility_module = study.get("eligibilityModule", {})
        eligibility = remove_pesky_slash(
            eligibility_module.get("eligibilityCriteria", "")
        )
        extra_criteria = "\n".join(get_extra_criteria(eligibility_module))

        # Split the eligibility text into sections
        inclusion_text, exclusion_text, miscellaneous_text = (
            _split_eligibility_sections(eligibility)
        )

        # Add extra criteria to inclusion criteria; strip() removes the
        # joining newline when either side is empty.
        inclusion_text = (inclusion_text + "\n" + extra_criteria).strip()

        # Create the raw trial data object
        raw_data = RawTrialData(
            nct_id=nct_id,
            official_title=official_title,
            inclusion_criteria=inclusion_text,
            exclusion_criteria=exclusion_text,
            miscellaneous_criteria=miscellaneous_text,
        )

        logger.info("Successfully retrieved trial data.")
        logger.debug("Fully raw input: %s", data)
        logger.debug("Trial data: %s", raw_data)
        return raw_data

    except Exception as e:
        # Single failure type for callers: every error, including the
        # "No data found" ValueError above, is wrapped and chained here.
        logger.error("Error fetching trial data: %s", e)
        raise ValueError(f"Error fetching trial data: {e}") from e
212
213
214
def process_trial(nct_id: str, folder: str = DEFAULT_OUTPUT_DIR) -> LogicalTrial:
    """
    Process a clinical trial through the complete identification and structuring pipeline.

    Steps, in order: fetch the raw trial data, identify criteria from it,
    export the identified trial to ``<folder>/identified/<nct_id>_identified.json``,
    structure the identified criteria logically, and export the result to
    ``<folder>/logical/<nct_id>_logical.json``. When DEBUG logging is enabled
    for this module's logger, both intermediate objects are also printed with
    ``rich``.

    Args:
        nct_id (str): The NCT ID of the clinical trial.
        folder (str, optional): The output directory for storing results. Defaults to DEFAULT_OUTPUT_DIR.

    Returns:
        LogicalTrial: The trial data with criteria identified and logically structured.

    Raises:
        ValueError: If the trial data cannot be fetched. Errors raised by the
            identification, structuring or export steps are not caught here
            and propagate unchanged.
    """
    logger.info("Starting processing for trial NCT ID: %s", nct_id)

    # isEnabledFor() honours the level inherited from parent loggers.
    # Reading logger.level directly sees NOTSET (0) on a logger whose level
    # was never set explicitly, which made the debug dumps print at every
    # configured level.
    debug_enabled = logger.isEnabledFor(logging.DEBUG)

    # Fetch raw trial data
    raw_data = get_trial_data(nct_id)
    if not raw_data:
        # Defensive guard in case the fetch returns an empty/falsy result.
        raise ValueError(f"Failed to fetch trial data for NCT ID: {nct_id}")

    # Identify atomic criteria from the raw trial data
    identified_trial: IdentifiedTrial = identify_criterions_from_rawTrial(raw_data)
    if debug_enabled:
        # Using rich.print for better readability in debug mode
        rich.print(identified_trial)

    # Save the identified trial data
    export_pydantic_to_json(
        identified_trial,
        f"{nct_id}_identified.json",
        os.path.join(folder, "identified"),
    )

    # Structure the identified criteria into logical relationships
    logical_trial = logically_structurize_trial(identified_trial)
    if debug_enabled:
        # Using rich.print for better readability in debug mode
        rich.print(logical_trial)

    # Save the logical trial data
    export_pydantic_to_json(
        logical_trial, f"{nct_id}_logical.json", os.path.join(folder, "logical")
    )

    logger.info("Trial processing complete for NCT ID: %s", nct_id)

    return logical_trial