--- a +++ b/preprocess/behrtFormat.py @@ -0,0 +1,43 @@ +from common.spark import spark_init, read_parquet, read_txt +from CPRD.tabel import EHR +import pyspark.sql.functions as F +from pyspark.sql import Window + +spark = spark_init() + +config= { + 'diagnoses': '', # data path for diagnoses/medication + 'demographic': '', # data path for demographic information + 'output': '', # path to save formated file + 'col_name': '' # column name for ICD/Med code +} + +diagnoses = read_parquet(spark.sqlContext, config['diagnoses']).select(['patid','eventdate',config['col_name']]).na.drop().select(['patid','eventdate', config['col_name']]) +demographic = read_parquet(spark.sqlContext, config['demographic']) + +diagnoses = diagnoses.na.drop() +diagnoses = diagnoses.dropDuplicates() + +# demographic data +demographic = demographic.select(['patid', 'yob']) +diagnoses= diagnoses.join(demographic, diagnoses.patid == demographic.patid, 'inner').drop(demographic.patid) +diagnoses = EHR(diagnoses).cal_age('eventdate', 'yob', year=False).select(['patid', 'eventdate', 'age', config['col_name'], 'yob']) +diagnoses = diagnoses.dropDuplicates() + +# set age and code to string +diagnoses = EHR(diagnoses).set_col_to_str('age').set_col_to_str(config['col_name']) + +# group by date +diagnoses = diagnoses.groupby(['patid', 'eventdate']).agg(F.collect_list(config['col_name']).alias(config['col_name']), F.collect_list('age').alias('age'), F.first('yob').alias('yob')) +diagnoses = EHR(diagnoses).array_add_element(config['col_name'], 'SEP') + +# add extra age to fill the gap of sep +extract_age = F.udf(lambda x: x[0]) +diagnoses = diagnoses.withColumn('age_temp', extract_age('age')).withColumn('age', F.concat(F.col('age'),F.array(F.col('age_temp')))).drop('age_temp') + +w = Window.partitionBy('patid').orderBy('eventdate') +# sort and merge ccs and age +diagnoses = diagnoses.withColumn(config['col_name'], F.collect_list(config['col_name']).over(w)).withColumn('age', F.collect_list('age').over(w)).groupBy('patid').agg(F.max(config['col_name']).alias(config['col_name']), F.max('age').alias('age')) + +diagnoses = EHR(diagnoses).array_flatten(config['col_name']).array_flatten('age') +diagnoses.write.parquet(config['output']) \ No newline at end of file