[deb8e5]: / BioAid / MMBSearchTK / AnnoMMBS.py

Download this file

184 lines (158 with data), 7.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa
# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com
## Contents:
## verifyImperfectHomology
## add_gene
## add_exon
## createAnnotatedOutput
import pandas as pd
from pyensembl import EnsemblRelease
from pyensembl.exon import Exon
import logging
import regex as re
from ..base import createChrList
from ..base import rev_compl
from ..complexity import movingWindow
def verifyImperfectHomology(ref, query, min_homology=0.8):
mmbir = rev_compl(query)
mmbir_errors = round(len(mmbir)*(1-min_homology))
if mmbir_errors > 10:
mmbir_errors = 10
output_list = re.findall( '(' + mmbir + '){e<=' + str(mmbir_errors) + '}', ref)
print(f'Found {len(output_list)} possible templates for {query}: {output_list}')
if len(output_list) > 0:
return(True)
else:
return(False)
def add_gene(row):
data = EnsemblRelease(104)
#print("loaded genome! #addgene")
output = data.gene_names_at_locus(contig=row['chr'].strip("chr"), position=int(row["iBirStart"]))
print(output)
#output = str(output)
out_list = []
for gname in output:
if gname != "":
out_list.append(gname)
#print("appended_gene!")
return(out_list) #output
def add_exon(row):
data = EnsemblRelease(104)
#print("loaded genome! #addexon")
output = data.exons_at_locus(contig=row['chr'].strip("chr"), position=int(row["iBirStart"]))
#output = str(output)
out_list = []
for exon in output:
if isinstance(exon, Exon):
out_list.append(exon.exon_id)
return(out_list) #output
def createAnnotatedOutput(f_name, path, output_f_name):
import os
chrList=createChrList(25)
df = pd.DataFrame()
for chr in chrList:
try:
if os.stat(f"{path}{chr}/{f_name}").st_size == 0:
print(f"WARNING! File {path}{chr}/{f_name} is empty. Skipping...")
continue
with open(f"{path}{chr}/{f_name}") as f:
lines = f.readlines()
except:
print(f"WARNING! Couldn't read {path}{chr}/{f_name}. Make sure it's there.")
lines = None
continue
if (type(lines) == None):
print("WARNING! the type of lines is None. Skipping...")
else:
print(f"hello {chr}")
for i in range(len(lines)):
if "###" in lines[i]:
i-=1
if "Microhomology Insertion:" in lines[i+11]:
var0 = lines[i].strip()
var1 = lines[i+1].strip()
var2 = lines[i+2].strip("iBirStart:").strip()
var3 = lines[i+3].strip("Consensus/cluster number:").strip()
var4 = lines[i+4].strip("iDepth:").strip()
var5 = lines[i+5].strip("sBir:").strip()
var6 = lines[i+6].strip("sBirReversed:").strip()
var7 = lines[i+7].strip()
var8 = lines[i+8].strip("ref:").strip()
var9 = lines[i+9].strip("bir:").strip()
var10 = lines[i+10].strip("TEM:").strip()
var11 = lines[i+11].strip("Microhomology Insertion:").strip()
var12 = lines[i+12].strip("Microhomology template:").strip()
var13 = lines[i+13].strip("readStart: ").strip()
var14 = lines[i+14].strip("ref:").strip()
var15 = lines[i+15].strip("bir:").strip()
var16 = lines[i+16].strip()
var17 = lines[i+17].strip()
var18 = lines[i+18].strip()
var19 = lines[i+19].strip()
else:
var0 = lines[i].strip()
var1 = lines[i+1].strip()
var2 = lines[i+2].strip("iBirStart:").strip()
var3 = lines[i+3].strip("Consensus/cluster number:").strip()
var4 = lines[i+4].strip("iDepth:").strip()
var5 = lines[i+5].strip("sBir:").strip()
var6 = lines[i+6].strip("sBirReversed:").strip()
var7 = lines[i+7].strip()
var8 = lines[i+8].strip("ref:").strip()
var9 = lines[i+9].strip("bir:").strip()
var10 = lines[i+10].strip("TEM:").strip()
var11 = "Empty"
var12 = "Empty"
var13 = lines[i+11].strip("readStart: ").strip()
var14 = lines[i+12].strip("ref:").strip()
var15 = lines[i+13].strip("bir:").strip()
var16 = lines[i+14].strip()
var17 = lines[i+15].strip()
var18 = lines[i+16].strip()
event_dict = {
"chr":chr,
"iBirStart":var2,
"Consensus/cluster_number":var3,
"iDepth":var4,
"sBir":var5,
"sBirReversed":var6,
"ref":var8,
"bir":var9,
"TEM":var10,
"Microhomology_Insertion":var11,
"Microhomology_template":var12,
"readStart":var13,
"var16":var16,
"var17":var17,
"var18":var18
}
#make sure that the ref and bir are not empty, otherwise the movingWindow will fail
if (len(var8) == 0) or (len(var9) == 0):
print(f"ref or bir are empty for {path}{chr}/{f_name}")
print("Cheeck if they run correctly in the original output file...")
print("Skipping this event...")
#print the same messages to the error log
logging.basicConfig(filename='error.log', level=logging.DEBUG)
logging.debug(f"ref or bir are empty for {path}{chr}/{f_name}")
logging.debug("Cheeck if they run correctly in the original output file...")
logging.debug("Skipping this event...")
continue
event_df = pd.DataFrame.from_records([event_dict])
df = pd.concat([df, event_df], ignore_index=True)
print("Starting complexity")
try:
df[['ref_complexity_fail', 'ref_complexity_score', 'ref_complexity_tree']] = df.apply(lambda row: movingWindow(row.ref, complexity_threshold=0.2), axis=1, result_type ='expand')
except:
print(f"Error with ref_complexity for {path}{chr}/{f_name}")
try:
df[['bir_complexity_fail', 'bir_complexity_score', 'bir_complexity_tree']] = df.apply(lambda row: movingWindow(row.bir, complexity_threshold=0.2), axis=1, result_type ='expand')
except:
print(f"Error with bir_complexity for {path}{chr}/{f_name}")
print("Starting homology check")
df['homology_check_ref'] = df.apply(lambda row: verifyImperfectHomology(row.ref, row.sBir), axis=1)
df['homology_check_bir'] = df.apply(lambda row: verifyImperfectHomology(row.bir, row.sBir), axis=1)
print("Starting gene/exon annotation")
df["genes"] = df.apply(lambda row: add_gene(row), axis=1)
df["exones"] = df.apply(lambda row: add_exon(row), axis=1)
df.to_csv(output_f_name, sep="\t",index=False)
print(f"finished annotation... DF saved to {output_f_name}")