1 lines (1 with data), 11.8 kB
{"cells":[
{"cell_type":"markdown","source":[
"# Breast cancer patients - medallion pipeline\n",
"\n",
"Loads `Files/breast_cancer_patients.csv` into a Bronze Delta table, cleans and pseudonymizes it into Silver, then builds the Gold demographics and stage-analysis tables plus the `breast_cancer_analysis` view.\n",
"\n",
"Run the cells top to bottom: the first code cell holds the imports and the table-name constants used by every later cell."
],"metadata":{},"id":"b1f0c2d4-5a6e-4f70-8b91-2c3d4e5f6a7b"},
{"cell_type":"code","source":[
"from pyspark.sql import SparkSession\n",
"from pyspark.sql import functions as F  # namespaced: a wildcard import would shadow builtins such as min/max\n",
"\n",
"# Configuration: every path and table name used by the pipeline lives here\n",
"SOURCE_PATH = \"Files/breast_cancer_patients.csv\"\n",
"SOURCE_NAME = \"breast_cancer_patients.csv\"\n",
"BRONZE_TABLE = \"bronze_breast_cancer_patients\"\n",
"SILVER_TABLE = \"silver_breast_cancer_patients\"\n",
"GOLD_DEMOGRAPHICS_TABLE = \"gold_patient_demographics\"\n",
"GOLD_STAGE_TABLE = \"gold_stage_analysis\"\n",
"ANALYSIS_VIEW = \"breast_cancer_analysis\"\n",
"\n",
"# Initialize Spark (returns the already-running session when there is one)\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# The raw headers contain spaces and parentheses (e.g. \"Weight (kg)\"), so enable\n",
"# Delta column mapping by name before the first write.\n",
"spark.conf.set(\"spark.databricks.delta.properties.defaults.columnMapping.mode\", \"name\")\n",
"\n",
"# 1. Load raw data\n",
"bronze_raw_df = (\n",
"    spark.read.format(\"csv\")\n",
"    .option(\"header\", \"true\")\n",
"    .option(\"inferSchema\", \"true\")\n",
"    .load(SOURCE_PATH)\n",
")\n",
"\n",
"# 2. Add metadata columns\n",
"bronze_df = (\n",
"    bronze_raw_df\n",
"    .withColumn(\"ingestion_timestamp\", F.current_timestamp())\n",
"    .withColumn(\"data_source\", F.lit(SOURCE_NAME))\n",
"    .withColumn(\"record_id\", F.monotonically_increasing_id())\n",
")\n",
"\n",
"# 3. Save to Bronze delta table\n",
"(bronze_df.write.format(\"delta\")\n",
"    .mode(\"overwrite\")\n",
"    .option(\"overwriteSchema\", \"true\")\n",
"    .saveAsTable(BRONZE_TABLE))\n",
"\n",
"# Count what was persisted rather than re-reading the CSV through bronze_df\n",
"bronze_count = spark.table(BRONZE_TABLE).count()\n",
"print(f\"Bronze layer saved with {bronze_count} records\")"
],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":4,"statement_ids":[4],"state":"finished","livy_statement_state":"available","session_id":"c8ff5b30-65d9-4526-91ca-311dd09a3127","normalized_state":"finished","queued_time":"2025-04-03T11:35:55.9510397Z","session_start_time":null,"execution_start_time":"2025-04-03T11:35:55.9522027Z","execution_finish_time":"2025-04-03T11:36:16.3870521Z","parent_msg_id":"f3d38f56-edea-466c-9dcc-df0e5ecc3892"},"text/plain":"StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 4, Finished, Available, Finished)"},"metadata":{}},{"output_type":"stream","name":"stdout","text":["Bronze layer saved with 1 records\n"]}],"execution_count":2,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"77440ec9-e024-4f19-b274-cbb8cee50cee"},
{"cell_type":"code","source":[
"# 1. Read from Bronze\n",
"bronze_snapshot_df = spark.table(BRONZE_TABLE)\n",
"\n",
"# 2. Data Cleaning - first rename problematic columns\n",
"COLUMN_RENAMES = {\n",
"    \"Weight (kg)\": \"weight\",\n",
"    \"Cancer Stage\": \"cancer_stage\",\n",
"    \"Date Diagnosed\": \"diagnosis_date\",\n",
"    \"Name\": \"full_name\",\n",
"    \"Email\": \"email\",\n",
"    \"Age\": \"age\",\n",
"    # the Gold cell selects \"location\"; rename so it does not rely on case-insensitive resolution\n",
"    \"Location\": \"location\",\n",
"}\n",
"renamed_df = bronze_snapshot_df\n",
"for raw_name, clean_name in COLUMN_RENAMES.items():\n",
"    renamed_df = renamed_df.withColumnRenamed(raw_name, clean_name)\n",
"\n",
"# Bronze adds per-row metadata and record_id is unique for every row, so duplicates\n",
"# can only be detected on the remaining (business) columns.\n",
"METADATA_COLUMNS = {\"ingestion_timestamp\", \"data_source\", \"record_id\"}\n",
"business_columns = [c for c in renamed_df.columns if c not in METADATA_COLUMNS]\n",
"\n",
"EMAIL_PATTERN = r\"([^@\\s]+@[^@\\s]+\\.[^@\\s]+)\"\n",
"\n",
"# Stage validation: keep \"0\"-\"4\" as before, and additionally tolerate surrounding\n",
"# whitespace, an optional \"Stage\" prefix and Roman numerals I-IV.\n",
"# NOTE(review): the raw spelling of the stage column is not visible here - confirm against the CSV.\n",
"ROMAN_STAGES = {\"I\": \"1\", \"II\": \"2\", \"III\": \"3\", \"IV\": \"4\"}\n",
"stage_token = F.regexp_replace(F.upper(F.trim(F.col(\"cancer_stage\").cast(\"string\"))), r\"^STAGE\\s*\", \"\")\n",
"stage_clean = F.when(stage_token.rlike(\"^[0-4]$\"), stage_token)\n",
"for roman_numeral, stage_digit in ROMAN_STAGES.items():\n",
"    stage_clean = stage_clean.when(stage_token == roman_numeral, F.lit(stage_digit))\n",
"# no otherwise(): anything unrecognized becomes NULL\n",
"\n",
"# Now perform the transformations\n",
"silver_clean_df = (\n",
"    renamed_df\n",
"    .dropDuplicates(business_columns)\n",
"    .withColumn(\"age\", F.col(\"age\").cast(\"integer\"))\n",
"    .withColumn(\"weight\", F.col(\"weight\").cast(\"float\"))\n",
"    .withColumn(\"diagnosis_date\", F.to_date(F.col(\"diagnosis_date\"), \"yyyy-MM-dd\"))\n",
"    .withColumn(\"email\", F.regexp_extract(F.col(\"email\"), EMAIL_PATTERN, 0))\n",
"    # regexp_extract yields \"\" (not NULL) when nothing matches, so reject both\n",
"    .filter(F.col(\"email\").isNotNull() & (F.col(\"email\") != \"\"))\n",
"    .withColumn(\"cancer_stage\", stage_clean)\n",
")\n",
"\n",
"# 3. Pseudonymize PII\n",
"# concat() returns NULL when any input is NULL; coalescing the name guarantees every row a hash\n",
"# while leaving the hash of rows with a name unchanged.\n",
"# NOTE(review): an unsalted SHA-256 of name+email is open to dictionary attacks - consider a secret salt.\n",
"patient_key = F.concat(F.coalesce(F.col(\"full_name\"), F.lit(\"\")), F.col(\"email\"))\n",
"silver_df = (\n",
"    silver_clean_df\n",
"    .withColumn(\"patient_hash\", F.sha2(patient_key, 256))\n",
"    .drop(\"full_name\", \"email\")\n",
")\n",
"\n",
"# 4. Save to Silver\n",
"(silver_df.write.format(\"delta\")\n",
"    .mode(\"overwrite\")\n",
"    .option(\"overwriteSchema\", \"true\")\n",
"    .saveAsTable(SILVER_TABLE))\n",
"\n",
"# Report on the persisted table instead of recomputing the whole lineage\n",
"silver_saved_df = spark.table(SILVER_TABLE)\n",
"print(f\"Silver layer saved with {silver_saved_df.count()} records\")\n",
"display(silver_saved_df.limit(5))"
],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":6,"statement_ids":[6],"state":"finished","livy_statement_state":"available","session_id":"c8ff5b30-65d9-4526-91ca-311dd09a3127","normalized_state":"finished","queued_time":"2025-04-03T11:39:58.5977764Z","session_start_time":null,"execution_start_time":"2025-04-03T11:39:58.5989717Z","execution_finish_time":"2025-04-03T11:40:06.5473089Z","parent_msg_id":"fdd89bff-ee7e-41c7-9943-1d1cf3ba9a5f"},"text/plain":"StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 6, Finished, Available, Finished)"},"metadata":{}},{"output_type":"stream","name":"stdout","text":["Silver layer saved with 1 records\n"]},{"output_type":"display_data","data":{"application/vnd.synapse.widget-view+json":{"widget_id":"1f440458-639d-4f0b-9db1-eada5e252a21","widget_type":"Synapse.DataFrame"},"text/plain":"SynapseWidget(Synapse.DataFrame, 1f440458-639d-4f0b-9db1-eada5e252a21)"},"metadata":{}}],"execution_count":4,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"4caea0a5-dedd-4fb2-86c4-d511ab5f6b20"},
{"cell_type":"code","source":[
"# 1. Read from Silver\n",
"gold_df = spark.table(SILVER_TABLE)\n",
"\n",
"# 2. Create analytical tables\n",
"# Patient Demographics Table\n",
"demographics_df = gold_df.select(\n",
"    \"patient_hash\",\n",
"    \"age\",\n",
"    \"weight\",\n",
"    \"location\",\n",
"    \"diagnosis_date\",\n",
").distinct()\n",
"\n",
"# Cancer Stage Analysis Table\n",
"stage_analysis_df = (\n",
"    gold_df.groupBy(\"cancer_stage\", \"location\")\n",
"    .agg(\n",
"        F.count(\"*\").alias(\"patient_count\"),\n",
"        F.avg(\"age\").alias(\"avg_age\"),\n",
"        F.avg(\"weight\").alias(\"avg_weight\"),\n",
"        # F.min / F.max are the Spark aggregates, not the Python builtins\n",
"        F.min(\"diagnosis_date\").alias(\"earliest_diagnosis\"),\n",
"        F.max(\"diagnosis_date\").alias(\"latest_diagnosis\"),\n",
"    )\n",
"    .orderBy(\"cancer_stage\", \"location\")\n",
")\n",
"\n",
"# 3. Save Gold tables\n",
"(demographics_df.write.format(\"delta\")\n",
"    .mode(\"overwrite\")\n",
"    .option(\"overwriteSchema\", \"true\")\n",
"    .saveAsTable(GOLD_DEMOGRAPHICS_TABLE))\n",
"\n",
"(stage_analysis_df.write.format(\"delta\")\n",
"    .mode(\"overwrite\")\n",
"    .option(\"overwriteSchema\", \"true\")\n",
"    .saveAsTable(GOLD_STAGE_TABLE))\n",
"\n",
"# 4. Create analysis view (works in any Spark environment)\n",
"spark.sql(f\"\"\"\n",
"CREATE OR REPLACE VIEW {ANALYSIS_VIEW}\n",
"AS SELECT * FROM {GOLD_STAGE_TABLE}\n",
"\"\"\")\n",
"\n",
"print(\"Gold layer processing completed:\")\n",
"print(\"- Created gold_patient_demographics table\")\n",
"print(\"- Created gold_stage_analysis table\")\n",
"print(\"- Created breast_cancer_analysis view for queries\")"
],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":10,"statement_ids":[10],"state":"finished","livy_statement_state":"available","session_id":"c8ff5b30-65d9-4526-91ca-311dd09a3127","normalized_state":"finished","queued_time":"2025-04-03T11:58:01.1652199Z","session_start_time":null,"execution_start_time":"2025-04-03T11:58:01.1664035Z","execution_finish_time":"2025-04-03T11:58:12.9999162Z","parent_msg_id":"ba552e6b-3b1a-4813-a554-4448737a05d5"},"text/plain":"StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 10, Finished, Available, Finished)"},"metadata":{}},{"output_type":"stream","name":"stdout","text":["Gold layer processing completed:\n- Created gold_patient_demographics table\n- Created gold_stage_analysis table\n- Created breast_cancer_analysis view for queries\n"]}],"execution_count":8,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"47b7d0a3-3990-4f6c-8e97-6105b757a0c6"},
{"cell_type":"code","source":[
"display(spark.sql(f\"SELECT * FROM {ANALYSIS_VIEW}\"))"
],"outputs":[{"output_type":"display_data","data":{"application/vnd.livy.statement-meta+json":{"spark_pool":null,"statement_id":11,"statement_ids":[11],"state":"finished","livy_statement_state":"available","session_id":"c8ff5b30-65d9-4526-91ca-311dd09a3127","normalized_state":"finished","queued_time":"2025-04-03T11:58:35.2637446Z","session_start_time":null,"execution_start_time":"2025-04-03T11:58:35.2648634Z","execution_finish_time":"2025-04-03T11:58:38.658178Z","parent_msg_id":"278a719c-aac2-439e-8ed6-8e6655deaeaf"},"text/plain":"StatementMeta(, c8ff5b30-65d9-4526-91ca-311dd09a3127, 11, Finished, Available, Finished)"},"metadata":{}},{"output_type":"display_data","data":{"application/vnd.synapse.widget-view+json":{"widget_id":"eec4c7ef-0bbe-4ce0-a390-34b597c3dde5","widget_type":"Synapse.DataFrame"},"text/plain":"SynapseWidget(Synapse.DataFrame, eec4c7ef-0bbe-4ce0-a390-34b597c3dde5)"},"metadata":{}}],"execution_count":9,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"25a1731e-a96f-4e9e-a04c-7cffd6c16358"}
],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{
"1f440458-639d-4f0b-9db1-eada5e252a21":{"type":"Synapse.DataFrame","sync_state":{"table":{"rows":[{"0":"78","1":"67.0","2":"Nairobi","4":"2025-03-30","5":"2025-04-03 11:36:11.701288","6":"breast_cancer_patients.csv","7":"0","8":"7391d69610ee1b3530a0dcf95d07e2cf8f91eed8934cfcd6a4ca4ba5d56d20f0"}],"schema":[{"key":"0","name":"age","type":"int"},{"key":"1","name":"weight","type":"float"},{"key":"2","name":"Location","type":"string"},{"key":"3","name":"cancer_stage","type":"string"},{"key":"4","name":"diagnosis_date","type":"date"},{"key":"5","name":"ingestion_timestamp","type":"timestamp"},{"key":"6","name":"data_source","type":"string"},{"key":"7","name":"record_id","type":"bigint"},{"key":"8","name":"patient_hash","type":"string"}],"truncated":false},"isSummary":false,"language":"scala","wranglerEntryContext":{"dataframeType":"pyspark"}},"persist_state":{"view":{"type":"details","tableOptions":{},"chartOptions":{"chartType":"bar","categoryFieldKeys":["2"],"seriesFieldKeys":["0"],"aggregationType":"sum","isStacked":false,"binsNumber":10,"wordFrequency":"-1","evaluatesOverAllRecords":false},"viewOptionsGroup":[{"tabItems":[{"type":"table","name":"Table","key":"0","options":{}}]}]}}},
"eec4c7ef-0bbe-4ce0-a390-34b597c3dde5":{"type":"Synapse.DataFrame","sync_state":{"table":{"rows":[{"1":"Nairobi","2":"1","3":"78.0","4":"67.0","5":"2025-03-30","6":"2025-03-30"}],"schema":[{"key":"0","name":"cancer_stage","type":"string"},{"key":"1","name":"location","type":"string"},{"key":"2","name":"patient_count","type":"bigint"},{"key":"3","name":"avg_age","type":"double"},{"key":"4","name":"avg_weight","type":"double"},{"key":"5","name":"earliest_diagnosis","type":"date"},{"key":"6","name":"latest_diagnosis","type":"date"}],"truncated":false},"isSummary":false,"language":"scala","wranglerEntryContext":{"dataframeType":"pyspark"}},"persist_state":{"view":{"type":"details","tableOptions":{},"chartOptions":{"chartType":"bar","categoryFieldKeys":["0"],"seriesFieldKeys":["3"],"aggregationType":"sum","isStacked":false,"binsNumber":10,"wordFrequency":"-1","evaluatesOverAllRecords":false},"viewOptionsGroup":[{"tabItems":[{"type":"table","name":"Table","key":"0","options":{}}]}]}}}
}},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{"default_lakehouse":"10f68d8b-f1eb-41b0-b515-e0974aec5206","known_lakehouses":[{"id":"10f68d8b-f1eb-41b0-b515-e0974aec5206"}],"default_lakehouse_name":"Breast_Cancer_LakeHouse","default_lakehouse_workspace_id":"cde60769-1208-4712-9d88-602cb5dae476"}}},"nbformat":4,"nbformat_minor":5}