--- /dev/null
+++ b/common/spark.py
@@ -0,0 +1,59 @@
+import os
+
+import pyspark
+from pyspark.sql import SQLContext
+
+
+class spark_init(object):
+    """Initialise a SparkContext and SQLContext with sensible defaults."""
+
+    def __init__(self, sparkConfig=None, name='ehr'):
+        self._setup_spark(sparkConfig)
+        self.sc, self.sqlContext = self._init_spark(name=name)
+
+    def _setup_spark(self, sparkConfig):
+        if sparkConfig is None:
+            config = {'memory': '300g', 'executors': '4', 'exe_mem': '50g',
+                      'result_size': '80g', 'temp': '/home/yikuan/tmp',
+                      'offHeap': '16g'}
+        else:
+            config = sparkConfig
+
+        # Point PYSPARK_PYTHON at a specific interpreter if one is needed,
+        # e.g. os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
+        pyspark_submit_args = (
+            ' --driver-memory {memory}'
+            ' --num-executors {executors}'
+            ' --executor-memory {exe_mem}'
+            ' --conf spark.driver.maxResultSize={result_size}'
+            ' --conf spark.memory.offHeap.size={offHeap}'
+            ' --conf spark.local.dir={temp}'
+            ' pyspark-shell'.format(**config)
+        )
+        os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
+
+    def _init_spark(self, name='ehr'):
+        sc = pyspark.SparkContext(appName=name)
+        sqlContext = SQLContext(sc)
+        sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
+        return sc, sqlContext
+
+
+def read_txt(sc, sqlContext, path):
+    """Read a tab-separated text file into a PySpark DataFrame,
+    using the first line as the header."""
+    lines = sc.textFile(path)
+    head = lines.first()
+    content = lines.filter(lambda line: line != head).map(lambda k: k.split('\t'))
+    df = sqlContext.createDataFrame(content, schema=head.split('\t'))
+    return df
+
+
+def read_parquet(sqlContext, path):
+    """Read a parquet file into a PySpark DataFrame."""
+    return sqlContext.read.parquet(path)
+
+
+def read_csv(sqlContext, path):
+    """Read a CSV file (with header) into a PySpark DataFrame."""
+    return sqlContext.read.csv(path, header=True)
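
For reference, a minimal usage sketch of the new module, assuming it is importable as common.spark; the config values and the parquet path below are hypothetical. Note that spark_init must run before any SparkContext exists, since PYSPARK_SUBMIT_ARGS only takes effect when it is set before the JVM launches.

    from common.spark import spark_init, read_parquet

    # Smaller resource settings for a local test run (hypothetical values);
    # the dict must supply every key that _setup_spark formats into the args.
    config = {'memory': '32g', 'executors': '2', 'exe_mem': '8g',
              'result_size': '4g', 'temp': '/tmp', 'offHeap': '2g'}

    spark = spark_init(sparkConfig=config, name='ehr')
    df = read_parquet(spark.sqlContext, '/data/ehr.parquet')  # hypothetical path
    df.show(5)
    spark.sc.stop()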