--- /dev/null
+++ b/modas/regiongwas.py
@@ -0,0 +1,358 @@
+import pandas as pd
+import numpy as np
+import modas.multiprocess as mp
+import modas.gwas_cmd as gc
+from sklearn.decomposition import PCA
+from rpy2.robjects import pandas2ri
+from rpy2.rinterface_lib.embedded import RRuntimeError
+import rpy2.robjects as robjects
+from rpy2.robjects.packages import importr
+from rpy2.rinterface_lib.callbacks import logger as rpy2_logger
+import subprocess
+import logging
+import glob, os
+import shutil
+import re
+
+pandas2ri.activate()
+rpy2_logger.setLevel(logging.ERROR)
+rMVP = importr('rMVP')
+base = importr('base')
+bigmemory = importr('bigmemory')
+
+# Locate the packaged utils directory (bundled gemma/plink binaries) via `locate`.
+utils_path = subprocess.check_output('locate modas/utils', shell=True, text=True, encoding='utf-8')
+# utils_path = '/'.join(re.search(r'\n(.*site-packages.*)\n', utils_path).group(1).split('/')[:-1])
+utils_path = re.search(r'\n(.*site-packages.*)\n', utils_path).group(1)
+if not utils_path.endswith('utils'):
+    utils_path = '/'.join(utils_path.split('/')[:-1])
+
+
+def region_gwas_parallel(bed_dir, threads, geno, gwas_model):
+    region_gwas_args = list()
+    geno_prefix = geno.split('/')[-1]
+    if gwas_model == 'GLM' or gwas_model == 'MLM':
+        # Symlink the genome-wide genotype with a dummy phenotype column in the .fam.
+        fam = pd.read_csv(geno + '.fam', sep=r'\s+', header=None)
+        fam[5] = 1
+        fam.to_csv(geno_prefix + '.link.fam', sep='\t', na_rep='NA', header=False, index=False)
+        if os.path.exists(geno_prefix + '.link.bed'):
+            os.remove(geno_prefix + '.link.bed')
+        if os.path.exists(geno_prefix + '.link.bim'):
+            os.remove(geno_prefix + '.link.bim')
+        os.symlink(geno + '.bed', geno_prefix + '.link.bed')
+        os.symlink(geno + '.bim', geno_prefix + '.link.bim')
+        if gwas_model == 'MLM':
+            # Kinship matrix for the mixed model; gemma writes ./output/<prefix>.cXX.txt.
+            related_matrix_cmd = utils_path + '/gemma -bfile {0}.link -gk 1 -o {1}'.format(geno_prefix, geno_prefix)
+            s = mp.run(related_matrix_cmd)
+            if s != 0:
+                return None
+    # if gwas_model == 'MLM':
+    #     gemma_cmd = 'gemma.linux -bfile {0} -k ./output/{1}.cXX.txt -lmm -n 1 -o {2}'
+    # elif gwas_model == 'LM':
+    #     gemma_cmd = 'gemma.linux -bfile {0} -lm -o {1}'
+    for i in glob.glob(bed_dir + '/*.bed'):
+        i = i.replace('.bed', '').replace('m/z', 'm.z')
+        prefix = i.split('/')[-1]
+        if gwas_model == 'MLM':
+            region_gwas_args.append((gc.gemma_cmd('MLM', i, geno_prefix, 1, prefix + '_plink'),))
+            # region_gwas_args.append((gemma_cmd.format(i, geno_prefix, prefix + '_plink'),))
+        elif gwas_model == 'LM':
+            region_gwas_args.append((gc.gemma_cmd('LM', i, None, None, prefix + '_plink'),))
+            # region_gwas_args.append((gemma_cmd.format(i, prefix + '_plink'),))
+        else:
+            phe = pd.read_csv(i + '.fam', sep=r'\s+', header=None)
+            phe = phe.iloc[:, [0, -1]]
+            phe.columns = ['Taxa', prefix]
+            # region_gwas_args.append((phe, '../' + geno_prefix + '.link', '../' + i, 1))
+            region_gwas_args.append(('GLM', geno_prefix + '.link', i, phe, 1, './output'))
+    if gwas_model == 'LM' or gwas_model == 'MLM':
+        s = mp.parallel(mp.run, region_gwas_args, threads)
+    else:
+        if not os.path.exists('./output'):
+            os.mkdir('./output')
+        # os.chdir('./output')
+        # s = mp.parallel(glm_gwas, (region_gwas_args[0],), 1)
+        # s = mp.parallel(glm_gwas, region_gwas_args[1:], threads)
+        # os.chdir('../')
+        # Run the first job alone so rMVP converts the shared genotype files once,
+        # then fan out the remaining jobs in parallel.
+        s = mp.parallel(gc.rmvp, (region_gwas_args[0],), 1)
+        s = mp.parallel(gc.rmvp, region_gwas_args[1:], threads)
+    if gwas_model == 'GLM' or gwas_model == 'MLM':
+        os.remove(geno_prefix + '.link.bed')
+        os.remove(geno_prefix + '.link.bim')
+        os.remove(geno_prefix + '.link.fam')
+    return s
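+
+# Illustrative call (hypothetical paths): the per-region .bed files are assumed to
+# have been produced beforehand (see generate_qtl_batch below), and `geno` is the
+# genome-wide plink prefix.
+#
+#   s = region_gwas_parallel('./region_bed', 8, './genotype/chr_all', 'MLM')
+#   # s holds the per-job exit states from mp.parallel, or None if the gemma
+#   # kinship step failed.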
+
+
+def glm_gwas(omics_phe, pc_geno_prefix, geno_prefix, threads):
+    try:
+        base.sink('/dev/null')
+        if not os.path.exists(pc_geno_prefix + '.pc.desc'):
+            rMVP.MVP_Data(fileBed=pc_geno_prefix, fileKin=False, filePC=False, out=pc_geno_prefix,
+                          verbose=False)
+            rMVP.MVP_Data_PC(True, mvp_prefix=pc_geno_prefix, pcs_keep=3, verbose=False)
+        rMVP.MVP_Data(fileBed=geno_prefix, fileKin=False, filePC=False, out=geno_prefix, verbose=False)
+        geno = bigmemory.attach_big_matrix(geno_prefix + '.geno.desc')
+        map_file = pd.read_csv(geno_prefix + '.geno.map', sep='\t')
+        Covariates_PC = bigmemory.as_matrix(bigmemory.attach_big_matrix(pc_geno_prefix + '.pc.desc'))
+        # base.setwd('./output')
+        rMVP.MVP(phe=omics_phe, geno=geno, map=map_file, CV_GLM=Covariates_PC, priority="speed",
+                 ncpus=threads, maxLoop=10, threshold=0.05, method=['GLM'], file_output=True, verbose=False)
+        base.sink()
+        # gwas(omics_phe, pc_geno_prefix, geno_prefix, threads)
+    except (RRuntimeError, ValueError):
+        return 0
+    else:
+        return 1
+
+
+def generate_qtl_batch(omics_phe, phe_sig_qtl, geno_name, threads, bed_dir, rs_dir):
+    plink_extract = utils_path + '/plink --bfile {} --extract {} --make-bed --out {}'
+    bim = pd.read_csv(geno_name + '.bim', sep='\t', header=None)
+    qtl_batch = list()
+    rs = dict()
+    # Collect the SNP ids falling inside each phenotype's significant interval.
+    for index, row in phe_sig_qtl.iterrows():
+        rs.setdefault(row['phe_name'], []).extend(
+            bim.loc[(bim[0] == row['chr']) & (bim[3] >= row['start']) & (bim[3] <= row['end']), 1].values.tolist())
+    for phe_name in rs:
+        out_name = bed_dir.rstrip('/') + '/' + '_'.join(['tmp', phe_name])
+        rs_name = rs_dir.rstrip('/') + '/' + '_'.join(['tmp', phe_name, 'rs.txt'])
+        pd.Series(rs[phe_name]).to_frame().to_csv(rs_name, index=False, header=False)
+        qtl_batch.append((plink_extract.format(geno_name, rs_name, out_name),))
+    mp.parallel(mp.run, qtl_batch, threads)
+    # Write each phenotype's values into the sixth column of its region .fam file.
+    for fn in glob.glob(bed_dir.rstrip('/') + '/*fam'):
+        fam = pd.read_csv(fn, sep=' ', header=None)
+        phe_name = '_'.join(fn.split('/')[-1].split('_')[1:]).replace('m.z', 'm/z').replace('.fam', '')
+        fam.loc[:, 5] = omics_phe.loc[:, phe_name].reindex(fam.loc[:, 0]).values
+        fam.to_csv(fn, index=False, header=False, sep=' ', na_rep='NA')
+
+
+# def generate_clump_input(dir, num_threads):
+#     if os.path.exists('./clump_input'):
+#         shutil.rmtree('./clump_input')
+#     os.mkdir('./clump_input')
+#     cmd = '''awk '{if(NR==1)print "SNP\\tP"; else print $2"\\t"$11}' '''
+#     cmds = list()
+#     fns = list()
+#     for fn in glob.glob(dir.strip('/')+'/*_plink.assoc.txt'):
+#         filename = fn.split('/')[-1]
+#         cmds.append((cmd+'{0} > ./clump_input/{1}'.format(fn, filename.replace('_plink.assoc.txt', '.assoc')),))
+#         fns.append(filename)
+#     s = mp.parallel(mp.run, cmds, num_threads)
+#     if sum(s) != 0:
+#         print(','.join(list(np.array(fns)[s]))+' failed to generate clump input files.')
+#     return s
+def generate_clump_input(dir, gwas_model):
+    if os.path.exists('./clump_input'):
+        shutil.rmtree('./clump_input')
+    os.mkdir('./clump_input')
+    if gwas_model == 'LM' or gwas_model == 'MLM':
+        # gemma output: keep the SNP id and the Wald-test p value.
+        for fn in glob.glob(dir.rstrip('/') + '/*_plink.assoc.txt'):
+            filename = fn.split('/')[-1]
+            assoc = pd.read_csv(fn, sep='\t')
+            assoc = assoc[['rs', 'p_wald']]
+            assoc.columns = ['SNP', 'P']
+            assoc.to_csv('./clump_input/' + filename.replace('_plink.assoc.txt', '.assoc'), index=False, sep='\t')
+    else:
+        # rMVP output: the first column is the SNP id, the last the GLM p value.
+        for fn in glob.glob(dir.rstrip('/') + '/tmp_*GLM.csv'):
+            filename = fn.split('/')[-1]
+            assoc = pd.read_csv(fn)
+            assoc = assoc.iloc[:, [0, -1]]
+            assoc.columns = ['SNP', 'P']
+            assoc.to_csv('./clump_input/' + filename.replace('.GLM.csv', '.assoc'), index=False, sep='\t')
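+
+# The .assoc files written above are the --clump input for plink: a two-column,
+# tab-separated table. A minimal sketch of one file (SNP ids here are hypothetical
+# but follow the chr_pos convention the QTL code relies on):
+#
+#   SNP         P
+#   1_1203456   3.2e-09
+#   1_1207890   1.1e-06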
+
+
+def plink_clump(geno_path, p1, p2, num_threads):
+    if os.path.exists('./clump_result'):
+        shutil.rmtree('./clump_result')
+    os.mkdir('./clump_result')
+    cmd = utils_path + '/plink --bfile {0} --clump {1} --clump-p1 {2} --clump-p2 {3} --clump-kb {4} --clump-r2 0.2 --out {5}'
+    cmds = list()
+    ms = list()
+    for fn in glob.glob('./clump_input/*'):
+        phe_name = fn.split('/')[-1].replace('.assoc', '')
+        cmds.append((cmd.format(geno_path + '/' + phe_name, fn, p1, p2, 500, './clump_result/' + phe_name + '_500'),))
+        ms.append(phe_name)
+    s = mp.parallel(mp.run, cmds, num_threads)
+    if sum(s) != 0:
+        # Non-zero exit states mark failed jobs; select them with a boolean mask.
+        failed = np.array(ms)[np.array(s, dtype=bool)]
+        print(','.join(failed.tolist()) + ' failed to generate clumped files.')
+    return s
+
+
+# def merge_qtl(qtl):
+#     qtl = qtl.sort_values(by=['CHR','BP'])
+#     merged_qtl = list()
+#     for index,row in qtl.iterrows():
+#         if not merged_qtl:
+#             merged_qtl.append(row)
+#         else:
+#             if row['CHR'] != merged_qtl[-1]['CHR']:
+#                 merged_qtl.append(row)
+#             else:
+#                 if row['BP'] - merged_qtl[-1]['BP'] <= 1000000:
+#                     if row['P'] < merged_qtl[-1]['P']:
+#                         merged_qtl[-1]['P'] = row['P']
+#                         merged_qtl[-1]['BP'] = row['BP']
+#                         merged_qtl[-1]['SNP'] = row['SNP']
+#                     merged_qtl[-1]['SP2_num'] += row['SP2_num']
+#                     merged_qtl[-1]['SP2'] += ',' + row['SP2']
+#                 else:
+#                     merged_qtl.append(row)
+#     merged_qtl = pd.DataFrame(merged_qtl)
+#     return merged_qtl
+
+def merge_qtl_phe(qtl):
+    # Merge the clumped intervals of a single phenotype: intervals on the same
+    # chromosome that start within 3 Mb of the previous interval's end are
+    # collapsed, keeping the most significant peak SNP.
+    qtl = qtl.sort_values(by=['CHR', 'qtl_start'])
+    merged_phe_qtl = list()
+    for index, row in qtl.iterrows():
+        if not merged_phe_qtl:
+            merged_phe_qtl.append(row)
+        elif row['CHR'] != merged_phe_qtl[-1]['CHR']:
+            merged_phe_qtl.append(row)
+        elif row['qtl_start'] < merged_phe_qtl[-1]['qtl_end'] + 3000000:
+            if row['P'] < merged_phe_qtl[-1]['P']:
+                merged_phe_qtl[-1]['P'] = row['P']
+                merged_phe_qtl[-1]['SNP'] = row['SNP']
+            merged_phe_qtl[-1]['qtl_start'] = min(merged_phe_qtl[-1]['qtl_start'], row['qtl_start'])
+            merged_phe_qtl[-1]['qtl_end'] = max(merged_phe_qtl[-1]['qtl_end'], row['qtl_end'])
+            merged_phe_qtl[-1]['SP2_num'] += row['SP2_num']
+        else:
+            merged_phe_qtl.append(row)
+    merged_phe_qtl = pd.DataFrame(merged_phe_qtl)
+    return merged_phe_qtl
+
+
+def merge_qtl(qtl):
+    # Merge QTL intervals across phenotypes: intervals on the same chromosome that
+    # overlap by at least 10% of the incoming interval and whose peak SNPs lie
+    # within 2 Mb are collapsed, concatenating the phenotype names.
+    qtl = qtl.sort_values(by=['CHR', 'qtl_start'])
+    merged_qtl = pd.DataFrame()
+    for index, row in qtl.iterrows():
+        if merged_qtl.empty:
+            merged_qtl = pd.concat([merged_qtl, row.to_frame().T])
+        else:
+            qtl_length = row['qtl_end'] - row['qtl_start']
+            qtl_ratio = (merged_qtl['qtl_end'] - row['qtl_start']) / qtl_length
+            qtl_index = (merged_qtl['CHR'] == row['CHR']) & (qtl_ratio >= 0.1)
+            if qtl_index.sum() > 0:
+                # Distance between peak SNPs (ids follow the chr_pos convention).
+                peak_dis = (merged_qtl.loc[qtl_index, 'SNP'].apply(lambda x: int(x.split('_')[-1])) - int(row['SNP'].split('_')[-1])).abs()
+                if (peak_dis <= 2000000).sum() == 0:
+                    merged_qtl = pd.concat([merged_qtl, row.to_frame().T])
+                else:
+                    # peak_dis is already restricted to the rows selected by qtl_index.
+                    merged_qtl_index = peak_dis.idxmin()
+                    if merged_qtl.loc[merged_qtl_index, 'P'] > row['P']:
+                        merged_qtl.loc[merged_qtl_index, 'P'] = row['P']
+                        merged_qtl.loc[merged_qtl_index, 'SNP'] = row['SNP']
+                    merged_qtl.loc[merged_qtl_index, 'qtl_start'] = min(merged_qtl.loc[merged_qtl_index, 'qtl_start'], row['qtl_start'])
+                    merged_qtl.loc[merged_qtl_index, 'qtl_end'] = max(merged_qtl.loc[merged_qtl_index, 'qtl_end'], row['qtl_end'])
+                    merged_qtl.loc[merged_qtl_index, 'SP2_num'] += row['SP2_num']
+                    merged_qtl.loc[merged_qtl_index, 'phe_name'] = merged_qtl.loc[merged_qtl_index, 'phe_name'] + ',' + row['phe_name']
+            else:
+                merged_qtl = pd.concat([merged_qtl, row.to_frame().T])
+    return merged_qtl
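+
+# Worked sketch of merge_qtl on two hypothetical rows:
+#   CHR=1, qtl_start=1000000, qtl_end=2000000, SNP='1_1500000', phe_name='m1'
+#   CHR=1, qtl_start=1800000, qtl_end=2600000, SNP='1_1900000', phe_name='m2'
+# The intervals overlap by (2000000-1800000)/(2600000-1800000) = 25% >= 10% and the
+# peaks are 0.4 Mb apart (<= 2 Mb), so the rows collapse into one QTL spanning
+# 1000000-2600000 with phe_name 'm1,m2' and summed SP2_num.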
+
+
+def phe_cluster(phe, phe_labeled, n):
+    # Cluster correlated phenotypes and collapse each cluster to its first principal
+    # component; phenotypes with |r| < 0.6 against the cluster PC1 are split off and
+    # re-clustered until none remain.
+    pca = PCA(n_components=1)
+    phe_pc1 = pca.fit_transform(phe)
+    phe_corr = phe.corrwith(pd.Series(phe_pc1[:, 0], index=phe.index)).abs()
+    if (phe_corr >= 0.6).all():
+        phe_labeled.loc[phe_corr.index, 'label'] = n
+        return pd.DataFrame(phe_pc1, index=phe.index, columns=['cluster' + str(n) + '_PC1']), phe_labeled, n + 1
+    else:
+        phe_pc1 = pd.DataFrame()
+        while not phe.empty:
+            phe_corr = phe.corrwith(pd.Series(pca.fit_transform(phe)[:, 0], index=phe.index)).abs()
+            if (phe_corr < 0.6).sum() == 1:
+                if phe_corr.shape[0] == 2:
+                    # Two weakly correlated phenotypes left: keep both unclustered.
+                    phe_pc1 = pd.concat([phe_pc1, phe.loc[:, phe_corr.index]], axis=1)
+                else:
+                    phe_pc1 = pd.concat([phe_pc1, pd.DataFrame(pca.fit_transform(phe.loc[:, phe_corr >= 0.6]), index=phe.index, columns=['cluster' + str(n) + '_PC1'])], axis=1)
+                    phe_labeled.loc[phe.loc[:, phe_corr >= 0.6].columns, 'label'] = n
+                    n = n + 1
+                    phe_pc1 = pd.concat([phe_pc1, phe.loc[:, phe_corr < 0.6]], axis=1)
+                phe = pd.DataFrame()
+            else:
+                if (phe_corr >= 0.6).any():
+                    if (phe_corr >= 0.6).sum() == 1:
+                        phe_pc1 = pd.concat([phe_pc1, phe.loc[:, phe_corr >= 0.6]], axis=1)
+                    else:
+                        phe_pc1 = pd.concat([phe_pc1, pd.DataFrame(pca.fit_transform(phe.loc[:, phe_corr >= 0.6]), index=phe.index, columns=['cluster' + str(n) + '_PC1'])], axis=1)
+                        phe_labeled.loc[phe.loc[:, phe_corr >= 0.6].columns, 'label'] = n
+                        n = n + 1
+                    phe = phe.loc[:, phe_corr < 0.6]
+                else:
+                    # Nothing correlates with PC1: keep the remainder as-is and stop.
+                    phe_pc1 = pd.concat([phe_pc1, phe], axis=1)
+                    phe = pd.DataFrame()
+        # phe_corr = phe.corrwith(pd.Series(pca.fit_transform(phe)[:,0],index=phe.index)).abs()
+    return phe_pc1, phe_labeled, n
+
+
+def generate_qtl(clump_result_dir, p2):
+    qtl_res = list()
+    bad_qtl = list()
+    for fn in glob.glob(clump_result_dir.rstrip('/') + '/*clumped'):
+        phe_name = '_'.join(fn.split('/')[-1].split('_')[1:-1])
+        clump_result = pd.read_csv(fn, sep=r'\s+')
+        clump_result = clump_result.loc[clump_result.SP2 != 'NONE', :]
+        qtl = clump_result[['CHR', 'BP', 'SNP', 'P', 'SP2']].copy()
+        qtl.loc[:, 'SP2_num'] = qtl['SP2'].apply(lambda x: len(x.split(',')))
+        qtl.loc[:, 'log10P'] = -np.log10(qtl['P'])
+        if (qtl['SP2_num'] >= 10).sum() > 0:
+            # QTL boundaries come from the first and last member positions in SP2.
+            qtl['qtl_start'] = qtl['SP2'].apply(lambda x: int(re.findall(r'_(\d+)', x)[0]))
+            qtl['qtl_end'] = qtl['SP2'].apply(lambda x: int(re.findall(r'_(\d+)', x)[-1]))
+            qtl['phe_name'] = phe_name
+            qtl_filter = qtl.loc[qtl.SP2_num >= 5, :]
+            mer_qtl_filter = merge_qtl_phe(qtl_filter)
+            mer_qtl_filter.loc[:, 'qtl_length'] = mer_qtl_filter['qtl_end'] - mer_qtl_filter['qtl_start'] + 1
+            if mer_qtl_filter.shape[0] < 10:
+                qtl = qtl.loc[(qtl.SP2_num >= 5) & (qtl.log10P >= -np.log10(p2)), ['CHR', 'qtl_start', 'qtl_end', 'SNP', 'P', 'SP2_num', 'phe_name']]
+                mer_qtl = merge_qtl_phe(qtl)
+                mer_qtl.loc[:, 'qtl_length'] = mer_qtl['qtl_end'] - mer_qtl['qtl_start'] + 1
+                mer_qtl = mer_qtl.loc[:, ['CHR', 'qtl_start', 'qtl_end', 'SNP', 'P', 'SP2_num', 'qtl_length', 'phe_name']]
+                if mer_qtl.shape[0] < 4:
+                    qtl_res.append(mer_qtl)
+                else:
+                    bad_qtl.append(mer_qtl.loc[mer_qtl['SP2_num'] >= 10, :])
+            else:
+                bad_qtl.append(mer_qtl_filter.loc[mer_qtl_filter['SP2_num'] >= 10, ['CHR', 'qtl_start', 'qtl_end', 'SNP', 'P', 'SP2_num', 'qtl_length', 'phe_name']])
+    # Guard against phenotypes yielding no QTL at all before concatenating.
+    qtl_res = pd.concat(qtl_res) if qtl_res else pd.DataFrame()
+    if not qtl_res.empty:
+        qtl_res = qtl_res.loc[qtl_res['SP2_num'] >= 10, :]
+    if not bad_qtl:
+        bad_qtl = pd.DataFrame()
+    else:
+        bad_qtl = pd.concat(bad_qtl)
+    return qtl_res, bad_qtl
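+
+# The *.clumped files parsed above are plink --clump reports. The columns used are
+# CHR, BP, SNP, P and SP2 (the comma-separated clumped member SNPs, or 'NONE');
+# SP2_num, the member count, serves as the support filter for calling a QTL.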
+
+
+def phe_PCA(omics_phe, qtl):
+    omics_phe = omics_phe.fillna(omics_phe.mean())
+    qtl_uniq = pd.DataFrame()
+    # Keep one QTL per phenotype: the one with clearly more supporting SNPs,
+    # otherwise the most significant one.
+    for phe_name in qtl['phe_name'].unique():
+        qtl_sub = qtl.loc[qtl.phe_name == phe_name, :]
+        if qtl_sub.shape[0] == 1:
+            qtl_uniq = pd.concat([qtl_uniq, qtl_sub])
+        else:
+            qtl_sub = qtl_sub.sort_values(by=['SP2_num'], ascending=False)
+            if qtl_sub.iloc[0, :]['SP2_num'] / qtl_sub.iloc[1, :]['SP2_num'] > 2:
+                qtl_uniq = pd.concat([qtl_uniq, qtl_sub.iloc[0, :].to_frame().T])
+            else:
+                qtl_uniq = pd.concat([qtl_uniq, qtl_sub.loc[qtl_sub['P'].idxmin(), :].to_frame().T])
+    merged_qtl_uniq = merge_qtl(qtl_uniq)
+    merged_qtl_uniq.loc[:, 'phe_name_num'] = merged_qtl_uniq['phe_name'].apply(lambda x: len(x.split(',')))
+    merged_qtl_uniq = merged_qtl_uniq.sort_values(by='phe_name_num', ascending=False)
+    omics_sig_phe = omics_phe.loc[:, [p.replace('m.z', 'm/z') for p in qtl['phe_name'].unique()]]
+    omics_sig_phe_labeled = omics_sig_phe.T
+    omics_sig_phe_labeled.loc[:, 'label'] = 0
+    omics_phe_pc = pd.DataFrame()
+    n = 1
+    # Cluster the phenotypes that share a QTL and collapse each cluster to its PC1.
+    for phe_name in merged_qtl_uniq.loc[merged_qtl_uniq['phe_name_num'] >= 2, 'phe_name']:
+        omics_phe_sub = omics_sig_phe.loc[:, [p.replace('m.z', 'm/z') for p in phe_name.split(',')]]
+        omics_phe_sub_pc, omics_sig_phe_labeled, n = phe_cluster(omics_phe_sub, omics_sig_phe_labeled, n)
+        omics_phe_pc = pd.concat([omics_phe_pc, omics_phe_sub_pc], axis=1)
+    clustered_omics_phe = pd.merge(omics_phe_pc.loc[:, ~omics_phe_pc.columns.isin(omics_sig_phe.columns)],
+                                   omics_sig_phe_labeled.loc[omics_sig_phe_labeled.label == 0, :].drop('label', axis=1).T,
+                                   left_index=True, right_index=True)
+    return clustered_omics_phe, omics_sig_phe_labeled
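+
+# End-to-end sketch of the region-GWAS workflow in this module (hypothetical paths
+# and thresholds; each step consumes the previous step's outputs):
+#
+#   region_gwas_parallel('./region_bed', 8, './genotype/chr_all', 'MLM')
+#   generate_clump_input('./output', 'MLM')
+#   plink_clump('./region_bed', 1e-5, 1e-3, 8)
+#   qtl_res, bad_qtl = generate_qtl('./clump_result', 1e-3)
+#   clustered_phe, labeled_phe = phe_PCA(omics_phe, qtl_res)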