In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# --- Bronze layer: ingest the raw CSV into a Delta table ---------------------
# Statements are ordered so this cell runs on a fresh kernel (Restart & Run
# All): the session and the DataFrame are created before anything uses them,
# and the table is written exactly once.

# Initialize Spark (reuses the active session when one already exists)
spark = SparkSession.builder.getOrCreate()

# Default new Delta tables to name-based column mapping. This must be set
# before the write below. Presumably required because raw headers such as
# "Weight (kg)" contain characters Delta rejects otherwise -- TODO confirm.
spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode", "name")

# 1. Load raw data (header row supplies column names; types are inferred)
raw_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Files/breast_cancer_patients.csv")
)

# 2. Add metadata columns: load time, source file name and a per-row id
bronze_df = (
    raw_df
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("data_source", lit("breast_cancer_patients.csv"))
    .withColumn("record_id", monotonically_increasing_id())
)

# 3. Save to Bronze delta table (full overwrite on every run)
(bronze_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_breast_cancer_patients"))

# Count what was actually persisted instead of re-running the CSV lineage.
# NOTE(review): the last recorded run reported only 1 record -- verify that
# the CSV is parsed as expected (line endings / quoting / multiline fields).
bronze_record_count = spark.table("bronze_breast_cancer_patients").count()
print(f"Bronze layer saved with {bronze_record_count} records")

StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 4, Finished, Available, Finished)

Bronze layer saved with 1 records


In [4]:
# --- Silver layer: clean, validate and pseudonymize the Bronze data ----------
# All functions used here (col, when, sha2, coalesce, ...) come from the
# wildcard pyspark.sql.functions import in the first cell.

# 1. Read from Bronze
bronze_patients_df = spark.table("bronze_breast_cancer_patients")

# 2. Data Cleaning - first rename problematic columns
renamed_df = (
    bronze_patients_df
    .withColumnRenamed("Weight (kg)", "weight")
    .withColumnRenamed("Cancer Stage", "cancer_stage")
    .withColumnRenamed("Date Diagnosed", "diagnosis_date")
    .withColumnRenamed("Name", "full_name")
    .withColumnRenamed("Email", "email")
    .withColumnRenamed("Age", "age")
)

# Bronze adds a unique record_id to every row, so de-duplicating on *all*
# columns can never remove anything. Compare only the non-metadata columns.
METADATA_COLUMNS = {"ingestion_timestamp", "data_source", "record_id"}
business_columns = [c for c in renamed_df.columns if c not in METADATA_COLUMNS]
deduped_df = (
    renamed_df.dropDuplicates(business_columns)
    if business_columns
    else renamed_df.dropDuplicates()
)

# Now perform the transformations
silver_df = (
    deduped_df
    .withColumn("age", col("age").cast("integer"))
    .withColumn("weight", col("weight").cast("float"))
    .withColumn("diagnosis_date", to_date(col("diagnosis_date"), "yyyy-MM-dd"))
    .withColumn("email", regexp_extract(col("email"), r'([^@\s]+@[^@\s]+\.[^@\s]+)', 0))
    # regexp_extract yields "" (not NULL) when nothing matches, so an
    # isNotNull() check alone would let invalid e-mails through.
    .filter(col("email").isNotNull() & (col("email") != ""))
    # Keep only single-digit stages 0-4; anything else becomes NULL.
    .withColumn(
        "cancer_stage",
        when(col("cancer_stage").rlike("^[0-4]$"), col("cancer_stage"))
        .otherwise(lit(None)),
    )
)

# 3. Pseudonymize PII
# concat() returns NULL if any input is NULL; coalesce the name so a missing
# name does not produce a NULL patient_hash (e-mail is non-null after the
# filter above). Hashes for rows with a name are unchanged.
# NOTE(review): an unsalted SHA-256 of name+email can be reversed by
# dictionary attack -- consider adding a secret salt.
silver_df = (
    silver_df
    .withColumn(
        "patient_hash",
        sha2(concat(coalesce(col("full_name"), lit("")), col("email")), 256),
    )
    .drop("full_name", "email")
)

# 4. Save to Silver
(silver_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("silver_breast_cancer_patients"))

# Report on the persisted table: subset de-duplication keeps an arbitrary row
# per group, so recomputing silver_df could differ from what was written.
saved_silver_df = spark.table("silver_breast_cancer_patients")
print(f"Silver layer saved with {saved_silver_df.count()} records")
display(saved_silver_df.limit(5))

StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 6, Finished, Available, Finished)

Silver layer saved with 1 records


SynapseWidget(Synapse.DataFrame, 1f440458-639d-4f0b-9db1-eada5e252a21)

In [8]:
# --- Gold layer: analytical tables and a query view built from Silver --------

# 1. Load the Silver table
gold_df = spark.table("silver_breast_cancer_patients")

# 2. Build the analytical tables
# Patient demographics: distinct combinations of the selected columns
demographic_columns = ["patient_hash", "age", "weight", "location", "diagnosis_date"]
demographics_df = gold_df.select(*demographic_columns).distinct()

# Cancer stage analysis: one row per (cancer_stage, location) group.
# count/avg/min/max are the Spark column functions from the wildcard import
# in the first cell, not the Python builtins.
stage_grouping = ["cancer_stage", "location"]
stage_metrics = [
    count("*").alias("patient_count"),
    avg("age").alias("avg_age"),
    avg("weight").alias("avg_weight"),
    min("diagnosis_date").alias("earliest_diagnosis"),
    max("diagnosis_date").alias("latest_diagnosis"),
]
stage_analysis_df = (
    gold_df
    .groupBy(*stage_grouping)
    .agg(*stage_metrics)
    .orderBy(*stage_grouping)
)

# 3. Persist both Gold tables, replacing data and schema on every run
for gold_table_name, gold_table_df in (
    ("gold_patient_demographics", demographics_df),
    ("gold_stage_analysis", stage_analysis_df),
):
    (gold_table_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(gold_table_name))

# 4. Expose the stage analysis through a view for ad-hoc queries
spark.sql("""
CREATE OR REPLACE VIEW breast_cancer_analysis
AS SELECT * FROM gold_stage_analysis
""")

# Summary of what this cell produced
print(
    "Gold layer processing completed:",
    "- Created gold_patient_demographics table",
    "- Created gold_stage_analysis table",
    "- Created breast_cancer_analysis view for queries",
    sep="\n",
)

StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 10, Finished, Available, Finished)

Gold layer processing completed:
- Created gold_patient_demographics table
- Created gold_stage_analysis table
- Created breast_cancer_analysis view for queries


In [9]:
# Query the analysis view created in the Gold cell and render it
analysis_view_df = spark.sql("SELECT * FROM breast_cancer_analysis")
display(analysis_view_df)

StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, eec4c7ef-0bbe-4ce0-a390-34b597c3dde5)