In [20]:
# %% [markdown]
# # Genomic Data Processing Pipeline
# Comprehensive analysis of biospecimen data with proper handling of both numeric and categorical columns

# %%
# Imports (the Spark session itself is created after the configuration below)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnull, isnan, count, lit
from pyspark.sql.types import DoubleType, IntegerType, StringType
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Configuration: input manifest and output location for the PCA results
input_path = "Files/PDC_biospecimen_manifest_03272025_214257.csv"
output_path = "Files/processed-data/analysis_results.parquet"

# Initialize Spark: build the session, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("GenomicDataProcessing")
    .getOrCreate()
)

# %%
def load_and_analyze_data(spark, input_path):
    """Read the biospecimen CSV into a DataFrame and print a short overview.

    The file is read with a header row and with schema inference enabled.
    The inferred schema, the record count and the column count are printed.

    Args:
        spark: Session object whose ``read`` attribute provides the reader.
        input_path: Location of the CSV file to load.

    Returns:
        The loaded DataFrame.

    Raises:
        Exception: Any error raised while loading or inspecting the data is
            reported on stdout and then re-raised unchanged.
    """
    try:
        # Build the CSV reader one option at a time
        reader = spark.read.format("csv")
        reader = reader.option("header", "true")
        reader = reader.option("inferSchema", "true")
        frame = reader.load(input_path)

        print("Initial data schema:")
        frame.printSchema()

        # Basic size information
        record_total = frame.count()
        print(f"\nTotal records: {record_total}")
        column_total = len(frame.columns)
        print(f"Columns: {column_total}")
    except Exception as exc:
        print(f"Data loading error: {exc}")
        raise
    else:
        return frame

# %%
def analyze_numeric_data(df):
    """Report missing-value counts and summary statistics for numeric columns.

    Columns whose Spark type is ``DoubleType`` or ``IntegerType`` are treated
    as numeric. For each of them the number of NULL/NaN entries is printed,
    followed by ``describe()`` statistics for every column that has at least
    one non-missing value.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        The list of numeric column names that are not entirely missing, or
        ``None`` when there are no numeric columns, when all of them are
        entirely missing, or when the analysis fails (the error is printed,
        not raised).
    """
    try:
        # NOTE(review): other numeric Spark types (e.g. LongType, FloatType)
        # are not matched by this filter - confirm whether that is intended.
        numeric_cols = [f.name for f in df.schema.fields
                        if isinstance(f.dataType, (DoubleType, IntegerType))]

        if not numeric_cols:
            print("\nNo numeric columns found in dataset")
            return None

        print("\nNumeric columns analysis:")

        # Count NULL and NaN entries for every numeric column in one pass
        null_counts = df.select([
            count(when(isnan(c) | col(c).isNull(), c)).alias(c)
            for c in numeric_cols
        ]).collect()[0]

        print("\nNull values in numeric columns:")
        for col_name, null_count in zip(numeric_cols, null_counts):
            print(f"{col_name}: {null_count} nulls")

        # df.count() is a Spark action, so run it once rather than once per
        # column inside the comprehension.
        total_rows = df.count()

        # Keep the columns that have at least one usable value
        non_null_cols = [
            col_name
            for col_name, null_count in zip(numeric_cols, null_counts)
            if null_count < total_rows
        ]

        if non_null_cols:
            print("\nSummary statistics:")
            df.select(non_null_cols).describe().show()
            return non_null_cols

        print("\nAll numeric columns are completely null")
        return None

    except Exception as e:
        print(f"Numeric analysis failed: {str(e)}")
        return None

# %%
def analyze_categorical_data(df, top_n=5):
    """Print the most frequent values of every string-typed column.

    Args:
        df: Spark DataFrame to inspect.
        top_n: Number of most frequent values to display per column.

    Returns:
        The list of string-typed column names, or ``None`` when there are
        none or when the analysis fails (the error is printed, not raised).
    """
    try:
        categorical_cols = []
        for field in df.schema.fields:
            if isinstance(field.dataType, StringType):
                categorical_cols.append(field.name)

        if len(categorical_cols) == 0:
            print("\nNo categorical columns found")
            return None

        print("\nCategorical columns analysis:")

        for name in categorical_cols:
            print(f"\nColumn: {name}")
            # Frequency table for this column, most common values first
            frequencies = df.groupBy(name).count()
            ranked = frequencies.orderBy("count", ascending=False)
            ranked.show(top_n, truncate=False)

        return categorical_cols

    except Exception as exc:
        print(f"Categorical analysis failed: {exc}")
        return None

# %%
def perform_pca_if_possible(df, numeric_cols):
    """Run a two-component PCA on the given numeric columns and plot it.

    Missing entries (NULL or NaN) are replaced by the column mean computed
    over the non-missing values. The imputed columns are collected to pandas,
    reduced to two principal components with scikit-learn, shown as a scatter
    plot, and returned as a Spark DataFrame.

    Args:
        df: Spark DataFrame holding the data.
        numeric_cols: Names of the numeric columns to use; may be ``None``.

    Returns:
        A Spark DataFrame with columns ``PC1`` and ``PC2``, or ``None`` when
        fewer than two usable numeric columns are available or when PCA fails
        (the error is printed, not raised).
    """
    try:
        if not numeric_cols or len(numeric_cols) < 2:
            print("\nInsufficient numeric data for PCA")
            return None

        from pyspark.sql.functions import mean

        def _is_missing(name):
            # NaN must be handled together with NULL: a NaN makes Spark's
            # mean NaN and makes scikit-learn's PCA reject the input.
            return col(name).isNull() | isnan(col(name))

        # Compute every column mean in one aggregation, ignoring NULL/NaN
        mean_row = df.select([
            mean(when(~_is_missing(name), col(name))).alias(name)
            for name in numeric_cols
        ]).collect()[0]

        # A column with no valid value has no mean and cannot be imputed
        fill_values = {
            name: value
            for name, value in zip(numeric_cols, mean_row)
            if value is not None
        }
        usable_cols = [name for name in numeric_cols if name in fill_values]
        if len(usable_cols) < 2:
            print("\nInsufficient numeric data for PCA")
            return None

        # Prepare data - replace missing entries with the column mean
        imputed = df.select([
            when(_is_missing(name), fill_values[name])
            .otherwise(col(name))
            .alias(name)
            for name in usable_cols
        ])

        # Convert to pandas for PCA
        pdf = imputed.toPandas()

        # Perform PCA
        pca = PCA(n_components=2)
        X_pca = pca.fit_transform(pdf)

        # Create results DataFrame
        pca_results = pd.DataFrame(X_pca, columns=['PC1', 'PC2'])

        # Visualize
        plt.figure(figsize=(10, 6))
        plt.scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.5)
        plt.xlabel(f"PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)")
        plt.ylabel(f"PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)")
        plt.title("PCA of Numeric Features")
        plt.grid(True)
        plt.show()

        # Uses the module-level ``spark`` session to build the result
        return spark.createDataFrame(pca_results)

    except Exception as e:
        print(f"PCA failed: {str(e)}")
        return None

# %%
# Main execution pipeline: load the data, run the numeric and categorical
# analyses, attempt PCA, and write the PCA output to parquet when available.
try:
    print("Starting analysis pipeline...")

    # 1. Load data
    print("\n=== Loading Data ===")
    df = load_and_analyze_data(spark, input_path)

    # 2. Analyze numeric data
    print("\n=== Numeric Analysis ===")
    numeric_cols = analyze_numeric_data(df)

    # 3. Analyze categorical data
    print("\n=== Categorical Analysis ===")
    categorical_cols = analyze_categorical_data(df)

    # 4. Try PCA if we have numeric data
    print("\n=== Dimensionality Reduction ===")
    pca_results = perform_pca_if_possible(df, numeric_cols)

    # 5. Save results if we have them.
    # perform_pca_if_possible returns a DataFrame or None, so test for None
    # explicitly instead of relying on the truthiness of a DataFrame object.
    if pca_results is not None:
        print("\nSaving PCA results...")
        (pca_results.write
         .mode("overwrite")
         .format("parquet")
         .save(output_path))
        print(f"Results saved to {output_path}")
    else:
        print("\nNo PCA results to save")

    print("\nPipeline completed successfully!")

except Exception as e:
    # Top-level boundary: report the failure instead of propagating it
    print(f"\nPipeline failed: {str(e)}")

finally:
    # NOTE(review): in a shared notebook session, stopping Spark here may
    # leave later cells without a usable `spark` - confirm this is intended.
    spark.stop()
    print("Spark session closed")

StatementMeta(, 56701f99-4897-447d-b694-5a8c7be6acc3, 22, Finished, Available, Finished)

Starting analysis pipeline...

=== Loading Data ===
Initial data schema:
root
 |-- Aliquot ID: string (nullable = true)
 |-- Aliquot Submitter ID: string (nullable = true)
 |-- Sample ID: string (nullable = true)
 |-- Sample Submitter ID: string (nullable = true)
 |-- Case ID: string (nullable = true)
 |-- Case Submitter ID: string (nullable = true)
 |-- Project Name: string (nullable = true)
 |-- Sample Type: string (nullable = true)
 |-- Primary Site: string (nullable = true)
 |-- Disease Type: string (nullable = true)
 |-- Aliquot Is Ref: string (nullable = true)
 |-- Aliquot Status: string (nullable = true)
 |-- Aliquot Quantity: string (nullable = true)
 |-- Aliquot Volume: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Analyte Type: string (nullable = true)
 |-- Concentration: string (nullable = true)
 |-- Case Status: string (nullable = true)
 |-- Sample Status: string (nullable = true)
 |-- Sample Is Ref: string (nullable = true)
 |-- Biospecimen Anatomic S