--- /dev/null
+++ b/data/data_preparation.py
@@ -0,0 +1,237 @@
+import os
+import joblib
+from pathlib import Path
+import pandas as pd
+import numpy as np
+from typing import Tuple, List, Dict, Optional
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import LabelEncoder
+from src.preprocessing.preprocessing import create_ordered_medical_pipeline
+from src.features.tfidf_features import MedicalTextFeatureExtractor
+from src.features.word_embeddings import MedicalWordEmbeddings
+from src.features.entity_recognition import MedicalEntityRecognizer
+from src.features.text_statistics import TextStatisticsExtractor
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class DataPreparator:
+    """Prepare data for model development."""
+
+    def __init__(self,
+                 test_size: float = 0.15,
+                 val_size: float = 0.15,
+                 random_state: int = 42):
+        self.test_size = test_size
+        self.val_size = val_size
+        self.random_state = random_state
+        self.logger = get_logger(self.__class__.__name__)
+
+        # Initialize feature extractors
+        self.feature_extractors = {
+            'tfidf': MedicalTextFeatureExtractor(),
+            'embeddings': MedicalWordEmbeddings(model_type='fasttext'),
+            'entities': MedicalEntityRecognizer(),
+            'statistics': TextStatisticsExtractor()
+        }
+
+    def load_data(self, file_path: str) -> pd.DataFrame:
+        """Load and validate the dataset."""
+        try:
+            df = pd.read_csv(file_path)
+            self.logger.info(f"Loaded dataset with shape: {df.shape}")
+
+            # Validate required columns, reporting only the ones actually absent
+            required_columns = ['description', 'label']
+            missing_columns = [col for col in required_columns if col not in df.columns]
+            if missing_columns:
+                raise ValueError(f"Missing required columns: {missing_columns}")
+
+            return df
+        except Exception as e:
+            self.logger.error(f"Error loading data: {e}")
+            raise
+
+    def split_data(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
+        """Split data into train, validation, and test sets."""
+        # First split: separate the test set
+        train_val, test = train_test_split(
+            df,
+            test_size=self.test_size,
+            stratify=df['label'],
+            random_state=self.random_state
+        )
+
+        # Second split: separate the validation set. Rescale val_size so it
+        # stays a fraction of the full dataset rather than of train_val.
+        val_size_adjusted = self.val_size / (1 - self.test_size)
+        train, val = train_test_split(
+            train_val,
+            test_size=val_size_adjusted,
+            stratify=train_val['label'],
+            random_state=self.random_state
+        )
+
+        self.logger.info(f"Data split sizes - Train: {len(train)}, Val: {len(val)}, Test: {len(test)}")
+        return train, val, test
+
+    def extract_features(self, texts: List[str], feature_types: List[str]) -> np.ndarray:
+        """Extract and combine features."""
+        all_features = []
+
+        for feature_type in feature_types:
+            if feature_type not in self.feature_extractors:
+                raise ValueError(f"Unknown feature type: {feature_type}")
+
+            extractor = self.feature_extractors[feature_type]
+
+            if feature_type == 'tfidf':
+                # Check if the TF-IDF vectorizer is already fitted; if not, fit it.
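+                # (A fitted sklearn TfidfVectorizer exposes `vocabulary_`, so the
+                # hasattr probe below is a common fitted/unfitted check; this assumes
+                # extractor.vectorizer wraps an sklearn vectorizer. The explicit
+                # alternative would be sklearn.utils.validation.check_is_fitted.)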
+                if not hasattr(extractor.vectorizer, 'vocabulary_'):
+                    features, _ = extractor.fit_transform(texts)
+                else:
+                    features, _ = extractor.transform(texts)
+            elif feature_type == 'embeddings':
+                features = np.vstack([
+                    extractor.get_document_embedding(text, method='weighted')
+                    for text in texts
+                ])
+            elif feature_type == 'entities':
+                features = np.vstack([
+                    list(extractor.get_entity_features(text).values())
+                    for text in texts
+                ])
+            else:  # statistics
+                features = np.vstack([
+                    extractor.get_feature_vector(text)
+                    for text in texts
+                ])
+
+            all_features.append(features)
+
+        return np.hstack(all_features)
+
+    def _prepare_data_internal(self, file_path: str, feature_types: List[str]) -> Dict:
+        """Internal method to load, split, and prepare data."""
+        # Load data
+        df = self.load_data(file_path)
+
+        # Split data
+        train_df, val_df, test_df = self.split_data(df)
+
+        # Extract features for each split
+        train_features = self.extract_features(train_df['description'].tolist(), feature_types)
+        val_features = self.extract_features(val_df['description'].tolist(), feature_types)
+        test_features = self.extract_features(test_df['description'].tolist(), feature_types)
+
+        # Prepare labels: fit the encoder on train, then reuse it for val and test
+        label_encoder = LabelEncoder()
+        train_labels = label_encoder.fit_transform(train_df['label'])
+        val_labels = label_encoder.transform(val_df['label'])
+        test_labels = label_encoder.transform(test_df['label'])
+
+        prepared_data = {
+            'train': (train_features, train_labels),
+            'val': (val_features, val_labels),
+            'test': (test_features, test_labels),
+            'label_encoder': label_encoder,
+            'feature_types': feature_types
+        }
+
+        self.logger.info("Data preparation completed successfully")
+        return prepared_data
+
+    def save_prepared_data(self, prepared_data: Dict, output_dir: str):
+        """Save prepared data to files."""
+        # Create the output directory if it doesn't exist
+        Path(output_dir).mkdir(parents=True, exist_ok=True)
+
+        # Save features and labels for each split
+        for split_name in ['train', 'val', 'test']:
+            features, labels = prepared_data[split_name]
+
+            # Save features
+            np.save(
+                os.path.join(output_dir, f'{split_name}_features.npy'),
+                features
+            )
+
+            # Save labels
+            np.save(
+                os.path.join(output_dir, f'{split_name}_labels.npy'),
+                labels
+            )
+
+        # Save metadata
+        metadata = {
+            'label_encoder': prepared_data['label_encoder'],
+            'feature_types': prepared_data['feature_types']
+        }
+        joblib.dump(
+            metadata,
+            os.path.join(output_dir, 'metadata.joblib')
+        )
+
+        self.logger.info(f"Saved prepared data to {output_dir}")
+
+    def load_prepared_data(self, input_dir: str) -> Dict:
+        """Load prepared data from files."""
+        prepared_data = {}
+
+        # Load features and labels for each split
+        for split_name in ['train', 'val', 'test']:
+            features = np.load(
+                os.path.join(input_dir, f'{split_name}_features.npy')
+            )
+            labels = np.load(
+                os.path.join(input_dir, f'{split_name}_labels.npy')
+            )
+            prepared_data[split_name] = (features, labels)
+
+        # Load metadata
+        metadata = joblib.load(
+            os.path.join(input_dir, 'metadata.joblib')
+        )
+        prepared_data.update(metadata)
+
+        self.logger.info(f"Loaded prepared data from {input_dir}")
+        return prepared_data
+
+    def prepare_data(self,
+                     file_path: str,
+                     output_dir: str,
+                     feature_types: Optional[List[str]] = None) -> Dict:
+        """Main method to prepare data for modeling."""
+        # Avoid a mutable default argument; fall back to the standard feature set
+        if feature_types is None:
+            feature_types = ['tfidf', 'statistics']
+
+        # Load and prepare data internally
+        prepared_data = self._prepare_data_internal(file_path, feature_types)
+
+        # Save prepared data
+        self.save_prepared_data(prepared_data, output_dir)
+
+        return prepared_data
+
+
+if __name__ == "__main__":
+    # Data preparation
+    preparator = DataPreparator()
+
+    # Prepare and save data
+    prepared_data = preparator.prepare_data(
+        file_path='trials.csv',
+        output_dir='prepared_data',
+        feature_types=['tfidf', 'statistics']
+    )
+
+    # Load prepared data
+    loaded_data = preparator.load_prepared_data('prepared_data')
+
+    # Verify that the round trip through disk preserved every split exactly
+    for split_name in ['train', 'val', 'test']:
+        original_features, original_labels = prepared_data[split_name]
+        loaded_features, loaded_labels = loaded_data[split_name]
+
+        assert np.array_equal(original_features, loaded_features)
+        assert np.array_equal(original_labels, loaded_labels)
+
+        logger.info(f"{split_name.capitalize()} set loaded successfully:")
+        logger.info(f"Features shape: {loaded_features.shape}")
+        logger.info(f"Labels shape: {loaded_labels.shape}")
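For reviewers, a minimal sketch of how a downstream training script might consume the artifacts this change writes. It assumes the `prepared_data/` layout produced by `save_prepared_data` above; the `LogisticRegression` model is an illustrative placeholder, not part of this change.

import os
import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression

data_dir = 'prepared_data'  # directory written by DataPreparator.save_prepared_data

# Load the feature matrices and encoded labels saved as .npy files
X_train = np.load(os.path.join(data_dir, 'train_features.npy'))
y_train = np.load(os.path.join(data_dir, 'train_labels.npy'))
X_val = np.load(os.path.join(data_dir, 'val_features.npy'))
y_val = np.load(os.path.join(data_dir, 'val_labels.npy'))

# metadata.joblib holds the fitted LabelEncoder and the feature_types list
metadata = joblib.load(os.path.join(data_dir, 'metadata.joblib'))

# Placeholder model: any sklearn classifier works on the prepared arrays
clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
print(f"Validation accuracy: {clf.score(X_val, y_val):.3f}")

# Map encoded predictions back to the original string labels
print(metadata['label_encoder'].inverse_transform(clf.predict(X_val))[:5])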