|
b/common/spark.py
|
import os

import pyspark
from pyspark.sql import SQLContext
|
|
class spark_init(object):
    """Thin wrapper that configures and starts a SparkContext plus SQLContext."""

    def __init__(self, sparkConfig=None, name='ehr'):
        self._setup_spark(sparkConfig)
        self.sc, self.sqlContext = self._init_spark(name=name)
|
|
    def _setup_spark(self, sparkConfig):
        """Build PYSPARK_SUBMIT_ARGS from the config before the context is created."""
        if sparkConfig is None:
            config = {'memory': '300g', 'executors': '4', 'exe_mem': '50G', 'result_size': '80g',
                      'temp': '/home/yikuan/tmp', 'offHeap': '16g'}
        else:
            config = sparkConfig

        os.environ["PYSPARK_PYTHON"] = ""  # path to the Python executable Spark should use (left unset here)
        # note: spark.memory.offHeap.size only takes effect if spark.memory.offHeap.enabled is also true
        pyspark_submit_args = ' --driver-memory ' + config['memory'] + \
                              ' --num-executors ' + config['executors'] + \
                              ' --executor-memory ' + config['exe_mem'] + \
                              ' --conf spark.driver.maxResultSize={} --conf spark.memory.offHeap.size={} --conf spark.local.dir={}' \
                              .format(config['result_size'], config['offHeap'], config['temp']) + \
                              ' pyspark-shell'

        os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
|
|
    def _init_spark(self, name='ehr'):
        """Create the SparkContext and SQLContext used by the reader helpers below."""
        sc = pyspark.SparkContext(appName=name)
        sqlContext = SQLContext(sc)
        # read Parquet binary columns back as strings instead of raw bytes
        sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
        return sc, sqlContext
|
|
def read_txt(sc, sqlContext, path):
    """Read a tab-separated text file with a header row into a PySpark DataFrame."""
    file = sc.textFile(path)
    head = file.first()
    # drop the header line, then split the remaining lines on tabs
    content = file.filter(lambda line: line != head).map(lambda k: k.split('\t'))
    df = sqlContext.createDataFrame(content, schema=head.split('\t'))
    return df
|
|
def read_parquet(sqlContext, path):
    """Read a Parquet file into a PySpark DataFrame."""
    return sqlContext.read.parquet(path)
|
|
def read_csv(sqlContext, path):
    """Read a CSV file with a header row into a PySpark DataFrame."""
    return sqlContext.read.csv(path, header=True)
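
A minimal usage sketch, not part of the file above: it assumes the module is importable as common.spark, and the config values and data paths shown are placeholders rather than anything taken from the original code.

from common.spark import spark_init, read_parquet, read_csv

# start Spark with a smaller footprint than the defaults hard-coded above
config = {'memory': '16g', 'executors': '2', 'exe_mem': '8G',
          'result_size': '8g', 'temp': '/tmp/spark', 'offHeap': '4g'}
spark = spark_init(sparkConfig=config, name='ehr')

# the reader helpers take the SQLContext created by spark_init
diagnoses = read_parquet(spark.sqlContext, '/data/ehr/diagnoses.parquet')
labs = read_csv(spark.sqlContext, '/data/ehr/labs.csv')
print(diagnoses.count(), labs.count())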