Diff of /common/spark.py [000000] .. [bad60c]

import os
import pyspark
from pyspark.sql import SQLContext

class spark_init(object):
    def __init__(self, sparkConfig=None, name='ehr'):
        self._setup_spark(sparkConfig)
        self.sc, self.sqlContext = self._init_spark(name=name)

    def _setup_spark(self, sparkConfig):
        # Fall back to the default configuration when none is supplied.
        if sparkConfig is None:
            config = {'memory': '300g', 'executors': '4', 'exe_mem': '50G',
                      'result_size': '80g', 'temp': '/home/yikuan/tmp',
                      'offHeap': '16g'}
        else:
            config = sparkConfig

        # Path to the Python interpreter Spark should use; left blank here,
        # fill in for your environment.
        os.environ["PYSPARK_PYTHON"] = ""

        # Assemble the spark-submit arguments from the config dict.
        conf_flags = (' --conf spark.driver.maxResultSize={}'
                      ' --conf spark.memory.offHeap.size={}'
                      ' --conf spark.local.dir={}').format(
            config['result_size'], config['offHeap'], config['temp'])
        pyspark_submit_args = (' --driver-memory ' + config['memory'] +
                               ' --num-executors ' + config['executors'] +
                               ' --executor-memory ' + config['exe_mem'] +
                               conf_flags +
                               ' pyspark-shell')
        os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
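
    # With the default config, PYSPARK_SUBMIT_ARGS resolves to (illustration):
    #   --driver-memory 300g --num-executors 4 --executor-memory 50G
    #   --conf spark.driver.maxResultSize=80g --conf spark.memory.offHeap.size=16g
    #   --conf spark.local.dir=/home/yikuan/tmp pyspark-shell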

    def _init_spark(self, name='ehr'):
        sc = pyspark.SparkContext(appName=name)
        sqlContext = SQLContext(sc)
        # Read Parquet binary columns back as strings rather than byte arrays.
        sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
        return sc, sqlContext
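

# The class exposes the SparkContext and SQLContext as attributes. A minimal
# sketch (the reader calls below are illustrations, not part of this module):
#
#     spark = spark_init()                    # uses the default config above
#     rdd = spark.sc.textFile('...')          # raw RDD access
#     df = spark.sqlContext.read.json('...')  # any DataFrameReader method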


def read_txt(sc, sqlContext, path):
    """Read a tab-separated text file into a PySpark DataFrame.

    The first line is used as the header; any line equal to it is filtered
    out of the data.
    """
    lines = sc.textFile(path)
    head = lines.first()
    # Drop the header line(s), then split each record on tabs.
    content = lines.filter(lambda line: line != head).map(lambda k: k.split('\t'))
    df = sqlContext.createDataFrame(content, schema=head.split('\t'))
    return df


def read_parquet(sqlContext, path):
    """read from parquet to pyspark dataframe"""
    return sqlContext.read.parquet(path)

def read_csv(sqlContext, path):
    """read from csv to pyspark dataframe"""
    return sqlContext.read.csv(path, header=True)
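

# Minimal usage sketch, hedged: the path below is a hypothetical placeholder
# and the memory settings are scaled down from the defaults; set
# PYSPARK_PYTHON (left blank in _setup_spark) before running for real.
if __name__ == '__main__':
    spark = spark_init(sparkConfig={'memory': '8g', 'executors': '2',
                                    'exe_mem': '4g', 'result_size': '4g',
                                    'temp': '/tmp', 'offHeap': '2g'})
    ehr_df = read_csv(spark.sqlContext, '/path/to/records.csv')  # hypothetical path
    ehr_df.show(5)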