|
a |
|
b/BioAid/complexity.py |
|
|
1 |
# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa |
|
|
2 |
# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com |
|
|
3 |
|
|
|
4 |
## Contents: |
|
|
5 |
## wordsInSequence |
|
|
6 |
## findComplexity |
|
|
7 |
## movingWindow |
|
|
8 |
## createLogFile |
|
|
9 |
## createSequenceList |
|
|
10 |
## createDataFrameFromLogFile |
|
|
11 |
|
|
|
12 |
import pandas as pd |
|
|
13 |
import ast |
|
|
14 |
from pandas import DataFrame |
|
|
15 |
from .base import unique |
|
|
16 |
|
|
|
17 |
def wordsInSequence(sequence: str, treeLevel: int) -> list: |
|
|
18 |
""" |
|
|
19 |
Extracts all possible words of a specified length from a genetic sequence. |
|
|
20 |
|
|
|
21 |
Args: |
|
|
22 |
sequence (str): The genetic sequence to extract words from. |
|
|
23 |
treeLevel (int): The length of the words to extract. |
|
|
24 |
|
|
|
25 |
Returns: |
|
|
26 |
list: A list of words, where each word is a string of length treeLevel. |
|
|
27 |
|
|
|
28 |
Examples: |
|
|
29 |
>>> wordsInSequence("ATCGATCG", 3) |
|
|
30 |
['ATC', 'TCG', 'CGA', 'GAT', 'ATC', 'TCG'] |
|
|
31 |
|
|
|
32 |
>>> wordsInSequence("ATCGATCG", 4) |
|
|
33 |
['ATCG', 'TCGA', 'CGAT', 'GATC', 'ATCG'] |
|
|
34 |
""" |
|
|
35 |
length = len(sequence) |
|
|
36 |
max_possible_words_in_sequence = length-(treeLevel-1) |
|
|
37 |
wordList = [] |
|
|
38 |
for i in range(max_possible_words_in_sequence): |
|
|
39 |
wordList.append(sequence[0+i:treeLevel+i]) |
|
|
40 |
return wordList |
|
|
41 |
|
|
|
42 |
def findComplexity(sequence, treeLevel, complexity_threshold): |
|
|
43 |
# takes a genetic sequence and calculates the linguistic complexity |
|
|
44 |
# for that sequence for word length treeLevel |
|
|
45 |
|
|
|
46 |
wordList = wordsInSequence(sequence,treeLevel) |
|
|
47 |
wordList = unique(wordList) |
|
|
48 |
|
|
|
49 |
if len(sequence) > 4**treeLevel: |
|
|
50 |
complexity = len(wordList)/(4**treeLevel) |
|
|
51 |
else: |
|
|
52 |
complexity = len(wordList)/(len(sequence)-(treeLevel-1)) |
|
|
53 |
|
|
|
54 |
if (complexity < complexity_threshold) & (complexity > 0.2): |
|
|
55 |
print("Complexity at tree level " + str(treeLevel) + " is " + str(complexity) + " for sequence: "+str(sequence)) |
|
|
56 |
|
|
|
57 |
return ([complexity, treeLevel, sequence]) |
|
|
58 |
|
|
|
59 |
def movingWindow(sequence: str, treeLevel: int = 8, chunkSize: int = 20, complexity_threshold: float = 0.2, showGraph: bool = False) -> list: |
|
|
60 |
""" |
|
|
61 |
Calculates the linguistic complexity scores for windows of a specified size and word lengths from 1 to a specified level. |
|
|
62 |
|
|
|
63 |
Args: |
|
|
64 |
sequence (str): The genetic sequence to calculate complexity scores for. |
|
|
65 |
treeLevel (int, optional): The maximum length of words to use in the complexity calculation. Defaults to 8. |
|
|
66 |
chunkSize (int, optional): The size of the windows to use in the complexity calculation. Defaults to 20. |
|
|
67 |
complexity_threshold (float, optional): The threshold below which a window is considered to have low complexity. Defaults to 0.2. |
|
|
68 |
showGraph (bool, optional): Whether to show a graph of the complexity scores. Defaults to False. |
|
|
69 |
|
|
|
70 |
Returns: |
|
|
71 |
list: A list containing the lowest complexity score window in the format [Boolean, complexity, treeLevel], where Boolean denotes if the complexity score is lower (True) than complexity_threshold. |
|
|
72 |
|
|
|
73 |
Raises: |
|
|
74 |
ValueError: If the sequence is too short to calculate complexity scores. |
|
|
75 |
|
|
|
76 |
Examples: |
|
|
77 |
>>> movingWindow("ATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCG", treeLevel=4, chunkSize=10, complexity_threshold=0.1, showGraph=True) |
|
|
78 |
[True, 0.0, 4] |
|
|
79 |
|
|
|
80 |
>>> movingWindow("ATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCG", treeLevel=10, chunkSize=5, complexity_threshold=0.5, showGraph=True) |
|
|
81 |
[False, 0.5, 10] |
|
|
82 |
""" |
|
|
83 |
|
|
|
84 |
complexityList=[] |
|
|
85 |
|
|
|
86 |
for i in range(1, treeLevel+1): |
|
|
87 |
for chunk in range(len(sequence)-(chunkSize-1)): |
|
|
88 |
|
|
|
89 |
chunkSeq = sequence[chunk:(chunk+chunkSize)] |
|
|
90 |
complexityList.append(findComplexity(chunkSeq, i, complexity_threshold)) |
|
|
91 |
|
|
|
92 |
try: |
|
|
93 |
lowest=min(complexityList, key=lambda x: x[0]) |
|
|
94 |
|
|
|
95 |
except ValueError: |
|
|
96 |
print(f"ERROR with sequence: {sequence}. It is probably too short len={len(sequence)}. Skipping it.") |
|
|
97 |
print("complexityList: ", complexityList) |
|
|
98 |
lowest = [0,0,0] |
|
|
99 |
|
|
|
100 |
#print(complexityList) |
|
|
101 |
|
|
|
102 |
lowLvl=lowest[1] |
|
|
103 |
|
|
|
104 |
if showGraph: |
|
|
105 |
|
|
|
106 |
df = DataFrame(complexityList,columns=['complexity','tree','sequence']) |
|
|
107 |
df = df[['complexity','tree']] |
|
|
108 |
ax = df[df.tree == lowLvl].filter(items=["complexity"]).plot(fontsize=15, grid=True, figsize=(20,8)) |
|
|
109 |
ax.axhline(complexity_threshold, color="red", linestyle="--") |
|
|
110 |
ax.axhline(1, color="green", linestyle="--") |
|
|
111 |
ax.set_ylim(ymin=0) |
|
|
112 |
ax.set_title(str(lowest[2])) |
|
|
113 |
|
|
|
114 |
if lowest[0] < complexity_threshold: |
|
|
115 |
return([True, lowest[0], lowest[1]]) |
|
|
116 |
#return([True, lowest]) |
|
|
117 |
else: |
|
|
118 |
return([False, lowest[0], lowest[1]]) |
|
|
119 |
#return([False, lowest]) |
|
|
120 |
|
|
|
121 |
def createLogFile(seqlist: list, filename: str, complexity_threshold: float = 0.2, chunkSize: int = 20) -> None: |
|
|
122 |
""" |
|
|
123 |
Creates a log file containing the lowest linguistic complexity score for each sequence in a list. |
|
|
124 |
|
|
|
125 |
Args: |
|
|
126 |
seqlist (list): A list of genetic sequences to calculate complexity scores for. |
|
|
127 |
filename (str): The name of the output file to write the log to. |
|
|
128 |
complexity_threshold (float, optional): The threshold below which a sequence is considered to have low complexity. Defaults to 0.2. |
|
|
129 |
chunkSize (int, optional): The size of the windows to use in the complexity calculation. Defaults to 20. |
|
|
130 |
|
|
|
131 |
Returns: |
|
|
132 |
None |
|
|
133 |
""" |
|
|
134 |
outputf = open(filename, "w") |
|
|
135 |
truemmbir = 0 |
|
|
136 |
for i in seqlist: |
|
|
137 |
seqScore = movingWindow(i, complexity_threshold = complexity_threshold, chunkSize = chunkSize) |
|
|
138 |
if seqScore[0] == False: |
|
|
139 |
truemmbir += 1 |
|
|
140 |
outputf.write(str(seqScore[1])+"\n") |
|
|
141 |
print("sequences above threshold:", truemmbir) |
|
|
142 |
|
|
|
143 |
outputf.close() |
|
|
144 |
|
|
|
145 |
def createSequenceList(filename: str) -> list: |
|
|
146 |
""" |
|
|
147 |
Extracts genetic sequences from a file and returns them as a list. |
|
|
148 |
|
|
|
149 |
Args: |
|
|
150 |
filename (str): The name of the file to extract sequences from. |
|
|
151 |
|
|
|
152 |
Returns: |
|
|
153 |
list: A list of genetic sequences, where each sequence is a string. |
|
|
154 |
""" |
|
|
155 |
seqfile = open(filename, "r") |
|
|
156 |
lines = seqfile.readlines() |
|
|
157 |
seqlist=[] |
|
|
158 |
for line in lines: |
|
|
159 |
if "bir:" in line: |
|
|
160 |
if len(line) > 100: |
|
|
161 |
seqlist.append(line.strip("\n").strip("bir:-")) |
|
|
162 |
seqlist = list(dict.fromkeys(seqlist)) |
|
|
163 |
print(len(seqlist)) |
|
|
164 |
|
|
|
165 |
longseq=0 |
|
|
166 |
for i in seqlist: |
|
|
167 |
if len(i) > 300: |
|
|
168 |
longseq+=1 |
|
|
169 |
print(longseq) |
|
|
170 |
|
|
|
171 |
return seqlist |
|
|
172 |
|
|
|
173 |
def createDataFrameFromLogFile(logfile_path: str) -> DataFrame: |
|
|
174 |
""" |
|
|
175 |
Creates a pandas DataFrame from a log file containing linguistic complexity scores. |
|
|
176 |
|
|
|
177 |
Args: |
|
|
178 |
logfile (str): The name of the log file to create the DataFrame from. |
|
|
179 |
|
|
|
180 |
Returns: |
|
|
181 |
DataFrame: A pandas DataFrame containing the complexity scores, where each row represents a sequence and contains the complexity score, the word length, and the sequence itself. |
|
|
182 |
""" |
|
|
183 |
logfile = open(logfile_path, "r") |
|
|
184 |
lines = logfile.readlines() |
|
|
185 |
loglist=[] |
|
|
186 |
for line in lines: |
|
|
187 |
if "[" in line: |
|
|
188 |
loglist.append(line.strip("\n")) |
|
|
189 |
|
|
|
190 |
for i in range(len(loglist)): |
|
|
191 |
loglist[i] = ast.literal_eval(loglist[i]) |
|
|
192 |
print("total sequences:", len(loglist)) |
|
|
193 |
|
|
|
194 |
df = DataFrame(loglist,columns=['complexity', 'level', 'sequence']) |
|
|
195 |
|
|
|
196 |
return df |