|
a |
|
b/src/services/trial_manager.py |
|
|
1 |
# services/trial_manager.py |
|
|
2 |
""" |
|
|
3 |
Clinical Trial Data Management Service |
|
|
4 |
|
|
|
5 |
This module is responsible for retrieving, processing, and managing clinical trial data. |
|
|
6 |
It handles fetching trial data from ClinicalTrials.gov, identifying and structuring criteria, |
|
|
7 |
and persisting the processed data. |
|
|
8 |
|
|
|
9 |
The trial management process follows these steps: |
|
|
10 |
1. Fetch raw trial data from ClinicalTrials.gov API |
|
|
11 |
2. Extract and normalize inclusion, exclusion, and miscellaneous criteria |
|
|
12 |
3. Process trial data through identification and logical structuring pipelines |
|
|
13 |
4. Store the processed data as JSON files |
|
|
14 |
|
|
|
15 |
Functions: |
|
|
16 |
get_extra_criteria: Extract additional criteria from the eligibility module. |
|
|
17 |
convert_std_ages_to_numerical_ages: Convert standardized age groups to numerical ranges. |
|
|
18 |
get_trial_data: Retrieve raw trial data from ClinicalTrials.gov API. |
|
|
19 |
remove_pesky_slash: Clean backslashes from text content. |
|
|
20 |
process_trial: Process a trial through the entire pipeline. |
|
|
21 |
""" |
|
|
22 |
|
|
|
23 |
import logging |
|
|
24 |
import os |
|
|
25 |
import re |
|
|
26 |
|
|
|
27 |
import rich |
|
|
28 |
|
|
|
29 |
from src.models.identified_criteria import IdentifiedTrial, RawTrialData |
|
|
30 |
from src.models.logical_criteria import LogicalTrial |
|
|
31 |
from src.repositories.trial_repository import export_pydantic_to_json |
|
|
32 |
from src.services.identifier import identify_criterions_from_rawTrial |
|
|
33 |
from src.services.logical_structurizer import logically_structurize_trial |
|
|
34 |
from src.utils.config import DEFAULT_OUTPUT_DIR |
|
|
35 |
from src.utils.helpers import curl_with_status_check |
|
|
36 |
|
|
|
37 |
# Configure logging |
|
|
38 |
logger = logging.getLogger(__name__) |
|
|
39 |
|
|
|
40 |
|
|
|
41 |
def remove_pesky_slash(text: str) -> str: |
|
|
42 |
""" |
|
|
43 |
Remove backslashes from text to normalize the content. |
|
|
44 |
|
|
|
45 |
Args: |
|
|
46 |
text (str): The text to clean. |
|
|
47 |
|
|
|
48 |
Returns: |
|
|
49 |
str: The cleaned text without backslashes. |
|
|
50 |
""" |
|
|
51 |
return re.sub(r"\\", "", text) |
|
|
52 |
|
|
|
53 |
|
|
|
54 |
def get_extra_criteria(eligibility_module: dict) -> list[str]: |
|
|
55 |
""" |
|
|
56 |
Process the eligibility module to extract additional criteria. |
|
|
57 |
|
|
|
58 |
Args: |
|
|
59 |
eligibility_module (dict): The eligibility module dictionary from ClinicalTrials.gov. |
|
|
60 |
|
|
|
61 |
Returns: |
|
|
62 |
list[str]: A list of additional criteria extracted from the eligibility module. |
|
|
63 |
""" |
|
|
64 |
criteria: list[str] = [] |
|
|
65 |
|
|
|
66 |
# Extract key eligibility fields |
|
|
67 |
healthy_volunteers = eligibility_module.get("healthyVolunteers") |
|
|
68 |
sex = eligibility_module.get("sex") |
|
|
69 |
minimum_age = eligibility_module.get("minimumAge") |
|
|
70 |
maximum_age = eligibility_module.get("maximumAge") |
|
|
71 |
std_ages = eligibility_module.get("stdAges") |
|
|
72 |
|
|
|
73 |
# Process healthy volunteers information |
|
|
74 |
if healthy_volunteers is not None: |
|
|
75 |
if healthy_volunteers == "false": |
|
|
76 |
criteria.append("No healthy volunteers allowed") |
|
|
77 |
else: |
|
|
78 |
criteria.append("Healthy volunteers allowed") |
|
|
79 |
|
|
|
80 |
# Process sex requirements |
|
|
81 |
if sex is not None and sex != "ALL": |
|
|
82 |
criteria.append(f"Must be {sex}") |
|
|
83 |
|
|
|
84 |
# Process age requirements |
|
|
85 |
if minimum_age is not None: |
|
|
86 |
criteria.append(f"Must have minimum age of {minimum_age}") |
|
|
87 |
|
|
|
88 |
if maximum_age is not None: |
|
|
89 |
criteria.append(f"Must have maximum age of {maximum_age}") |
|
|
90 |
|
|
|
91 |
# Handle standardized age groups if specific age limits aren't provided |
|
|
92 |
if std_ages and not minimum_age and not maximum_age: |
|
|
93 |
convert_std_ages_to_numerical_ages(std_ages, criteria) |
|
|
94 |
|
|
|
95 |
return criteria |
|
|
96 |
|
|
|
97 |
|
|
|
98 |
def convert_std_ages_to_numerical_ages(std_ages: list, criteria: list[str]) -> None: |
|
|
99 |
""" |
|
|
100 |
Convert standardized age groups to numerical age ranges and add to criteria. |
|
|
101 |
|
|
|
102 |
Args: |
|
|
103 |
std_ages (list): List of standardized age groups from ClinicalTrials.gov. |
|
|
104 |
criteria (list[str]): List to append the converted age criteria to. |
|
|
105 |
""" |
|
|
106 |
min_age = 100 |
|
|
107 |
max_age = 0 |
|
|
108 |
|
|
|
109 |
# Define age group mappings |
|
|
110 |
age_groups = {"CHILD": (0, 17), "ADULT": (18, 64), "OLDER_ADULT": (65, 100)} |
|
|
111 |
|
|
|
112 |
# Determine the minimum and maximum ages based on std_ages |
|
|
113 |
for age_group in std_ages: |
|
|
114 |
if age_group in age_groups: |
|
|
115 |
min_age = min(min_age, age_groups[age_group][0]) |
|
|
116 |
max_age = max(max_age, age_groups[age_group][1]) |
|
|
117 |
|
|
|
118 |
# Add age criteria if they are restrictive |
|
|
119 |
if min_age != 0: |
|
|
120 |
criteria.append(f"Must be {min_age} or older") |
|
|
121 |
if max_age != 100: |
|
|
122 |
criteria.append(f"Must be {max_age} or younger") |
|
|
123 |
|
|
|
124 |
|
|
|
125 |
def get_trial_data(nct_id: str) -> RawTrialData: |
|
|
126 |
""" |
|
|
127 |
Retrieve trial data from ClinicalTrials.gov API and format it for processing. |
|
|
128 |
|
|
|
129 |
Args: |
|
|
130 |
nct_id (str): The NCT ID of the clinical trial. |
|
|
131 |
|
|
|
132 |
Returns: |
|
|
133 |
RawTrialData: Structured raw trial data. |
|
|
134 |
|
|
|
135 |
Raises: |
|
|
136 |
ValueError: If the trial data cannot be retrieved or processed. |
|
|
137 |
""" |
|
|
138 |
logger.info("Fetching trial data for NCT ID: %s", nct_id) |
|
|
139 |
try: |
|
|
140 |
# Request data from ClinicalTrials.gov API |
|
|
141 |
url = f"https://clinicaltrials.gov/api/v2/studies/{nct_id}?fields=NCTId,OfficialTitle,EligibilityModule" |
|
|
142 |
data = curl_with_status_check(url) |
|
|
143 |
|
|
|
144 |
# Extract study data from the response |
|
|
145 |
study = data.get("studies", [{}])[0] |
|
|
146 |
if not study: |
|
|
147 |
study = data.get("protocolSection", None) |
|
|
148 |
if not study: |
|
|
149 |
logger.error("No data found for NCT ID: %s", nct_id) |
|
|
150 |
logger.debug("Response data: %s", data) |
|
|
151 |
raise ValueError(f"No data found for NCT ID: {nct_id}") |
|
|
152 |
|
|
|
153 |
# Extract key fields from the study data |
|
|
154 |
official_title = study.get("identificationModule", {}).get("officialTitle", "") |
|
|
155 |
eligibility_module = study.get("eligibilityModule", {}) |
|
|
156 |
eligibility = remove_pesky_slash( |
|
|
157 |
eligibility_module.get("eligibilityCriteria", "") |
|
|
158 |
) |
|
|
159 |
extra_criteria = "\n".join(get_extra_criteria(eligibility_module)) |
|
|
160 |
|
|
|
161 |
# Split the eligibility text into sections |
|
|
162 |
inclusion_pos = eligibility.find("Inclusion Criteria:") |
|
|
163 |
exclusion_pos = eligibility.find("Exclusion Criteria:") |
|
|
164 |
|
|
|
165 |
# Process the eligibility text based on the presence of section markers |
|
|
166 |
if inclusion_pos != -1 and exclusion_pos != -1: |
|
|
167 |
inclusion_text = eligibility[ |
|
|
168 |
inclusion_pos + len("Inclusion Criteria:") : exclusion_pos |
|
|
169 |
].strip() |
|
|
170 |
exclusion_text = eligibility[ |
|
|
171 |
exclusion_pos + len("Exclusion Criteria:") : |
|
|
172 |
].strip() |
|
|
173 |
miscellaneous_text = eligibility[:inclusion_pos].strip() |
|
|
174 |
|
|
|
175 |
elif inclusion_pos != -1: |
|
|
176 |
inclusion_text = eligibility[ |
|
|
177 |
inclusion_pos + len("Inclusion Criteria:") : |
|
|
178 |
].strip() |
|
|
179 |
exclusion_text = "" |
|
|
180 |
miscellaneous_text = eligibility[:inclusion_pos].strip() |
|
|
181 |
elif exclusion_pos != -1: |
|
|
182 |
inclusion_text = "" |
|
|
183 |
exclusion_text = eligibility[ |
|
|
184 |
exclusion_pos + len("Exclusion Criteria:") : |
|
|
185 |
].strip() |
|
|
186 |
miscellaneous_text = eligibility[:exclusion_pos].strip() |
|
|
187 |
else: |
|
|
188 |
inclusion_text = "" |
|
|
189 |
exclusion_text = "" |
|
|
190 |
miscellaneous_text = eligibility.strip() |
|
|
191 |
|
|
|
192 |
# Add extra criteria to inclusion criteria |
|
|
193 |
inclusion_text = (inclusion_text + "\n" + extra_criteria).strip() |
|
|
194 |
|
|
|
195 |
# Create the raw trial data object |
|
|
196 |
raw_data = RawTrialData( |
|
|
197 |
nct_id=nct_id, |
|
|
198 |
official_title=official_title, |
|
|
199 |
inclusion_criteria=inclusion_text, |
|
|
200 |
exclusion_criteria=exclusion_text, |
|
|
201 |
miscellaneous_criteria=miscellaneous_text, |
|
|
202 |
) |
|
|
203 |
|
|
|
204 |
logger.info("Successfully retrieved trial data.") |
|
|
205 |
logger.debug("Fully raw input: %s", data) |
|
|
206 |
logger.debug("Trial data: %s", raw_data) |
|
|
207 |
return raw_data |
|
|
208 |
|
|
|
209 |
except Exception as e: |
|
|
210 |
logger.error("Error fetching trial data: %s", e) |
|
|
211 |
raise ValueError(f"Error fetching trial data: {e}") from e |
|
|
212 |
|
|
|
213 |
|
|
|
214 |
def process_trial(nct_id: str, folder: str = DEFAULT_OUTPUT_DIR) -> LogicalTrial: |
|
|
215 |
""" |
|
|
216 |
Process a clinical trial through the complete identification and structuring pipeline. |
|
|
217 |
|
|
|
218 |
Args: |
|
|
219 |
nct_id (str): The NCT ID of the clinical trial. |
|
|
220 |
folder (str, optional): The output directory for storing results. Defaults to DEFAULT_OUTPUT_DIR. |
|
|
221 |
|
|
|
222 |
Returns: |
|
|
223 |
LogicalTrial: The trial data with criteria identified and logically structured. |
|
|
224 |
|
|
|
225 |
Raises: |
|
|
226 |
ValueError: If trial data processing fails at any stage. |
|
|
227 |
""" |
|
|
228 |
logger.info("Starting processing for trial NCT ID: %s", nct_id) |
|
|
229 |
|
|
|
230 |
# Fetch raw trial data |
|
|
231 |
raw_data = get_trial_data(nct_id) |
|
|
232 |
if not raw_data: |
|
|
233 |
raise ValueError(f"Failed to fetch trial data for NCT ID: {nct_id}") |
|
|
234 |
|
|
|
235 |
# Identify atomic criteria from the raw trial data |
|
|
236 |
identified_trial: IdentifiedTrial = identify_criterions_from_rawTrial(raw_data) |
|
|
237 |
if logger.level <= logging.DEBUG: |
|
|
238 |
rich.print( |
|
|
239 |
identified_trial |
|
|
240 |
) # Using rich.print for better readability in debug mode |
|
|
241 |
|
|
|
242 |
# Save the identified trial data |
|
|
243 |
export_pydantic_to_json( |
|
|
244 |
identified_trial, |
|
|
245 |
f"{nct_id}_identified.json", |
|
|
246 |
os.path.join(folder, "identified"), |
|
|
247 |
) |
|
|
248 |
|
|
|
249 |
# Structure the identified criteria into logical relationships |
|
|
250 |
logical_trial = logically_structurize_trial(identified_trial) |
|
|
251 |
if logger.level <= logging.DEBUG: |
|
|
252 |
rich.print( |
|
|
253 |
logical_trial |
|
|
254 |
) # Using rich.print for better readability in debug mode |
|
|
255 |
|
|
|
256 |
# Save the logical trial data |
|
|
257 |
export_pydantic_to_json( |
|
|
258 |
logical_trial, f"{nct_id}_logical.json", os.path.join(folder, "logical") |
|
|
259 |
) |
|
|
260 |
|
|
|
261 |
logger.info("Trial processing complete for NCT ID: %s", nct_id) |
|
|
262 |
|
|
|
263 |
return logical_trial |