|
a |
|
b/BioAid/base.py |
|
|
1 |
# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa |
|
|
2 |
# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com |
|
|
3 |
|
|
|
4 |
## Contents: |
|
|
5 |
## extractSeqFromFastaToList |
|
|
6 |
## validateSquence |
|
|
7 |
## compl |
|
|
8 |
## rev_compl |
|
|
9 |
## unique |
|
|
10 |
## createChrList |
|
|
11 |
## dataFrameImport |
|
|
12 |
|
|
|
13 |
import logging |
|
|
14 |
|
|
|
15 |
def extractSeqFromFastaToList(fasta_file_path: str) -> list: |
|
|
16 |
'''Extracts sequence information from a FASTA file and returns it as a nested list. |
|
|
17 |
|
|
|
18 |
Args: |
|
|
19 |
fasta_file_path (str): The path to the FASTA file. |
|
|
20 |
|
|
|
21 |
Returns: |
|
|
22 |
list: A nested list of the form [[title_0, sequence_0], [title_1, sequence_1], ...]. |
|
|
23 |
Each element of the list is a list containing the title and sequence of a sequence in the FASTA file. |
|
|
24 |
''' |
|
|
25 |
fasta_file = open(fasta_file_path, 'r') |
|
|
26 |
contents = fasta_file.readlines() |
|
|
27 |
fasta_file.close() |
|
|
28 |
|
|
|
29 |
fasta_list=[] |
|
|
30 |
|
|
|
31 |
for i in contents: |
|
|
32 |
if '>' in i: |
|
|
33 |
fasta_list.append([i.strip('>').strip(),'']) |
|
|
34 |
else: |
|
|
35 |
fasta_list[-1][1] = fasta_list[-1][1]+i.strip() |
|
|
36 |
logging.info(f"Extraction of sequence information from {fasta_file_path} finished.") |
|
|
37 |
return fasta_list |
|
|
38 |
|
|
|
39 |
def validateSquence(sequence: str) -> bool: |
|
|
40 |
'''Checks if a DNA sequence contains only valid nucleotides. |
|
|
41 |
|
|
|
42 |
Args: |
|
|
43 |
sequence (str): A DNA sequence to be validated. |
|
|
44 |
|
|
|
45 |
Returns: |
|
|
46 |
bool: True if the sequence contains only valid nucleotides, False otherwise. |
|
|
47 |
If the sequence contains non-canonical nucleotides, a log message is generated. |
|
|
48 |
If the sequence contains 'N's, a warning log message is generated. |
|
|
49 |
The log messages are written to the default logger. |
|
|
50 |
''' |
|
|
51 |
bases = "ATGCatgcN" |
|
|
52 |
for i in sequence: |
|
|
53 |
if i not in bases: |
|
|
54 |
logging.info(f"Sequence doesn't contain cannonical nucleotides: {i}") |
|
|
55 |
return(False) |
|
|
56 |
if i == "N": |
|
|
57 |
logging.warning(f"Warning, sequence contains 'N's") |
|
|
58 |
return(True) |
|
|
59 |
|
|
|
60 |
def compl(base: str) -> str: |
|
|
61 |
""" |
|
|
62 |
Returns the complementary base for a given DNA base. |
|
|
63 |
|
|
|
64 |
Args: |
|
|
65 |
base (str): A single character representing a DNA base. Must be one of 'A', 'T', 'G', 'C', or '-'. |
|
|
66 |
|
|
|
67 |
Returns: |
|
|
68 |
str: The complementary base for the given DNA base. Returns '-' if the input is '-'. |
|
|
69 |
|
|
|
70 |
Raises: |
|
|
71 |
ValueError: If the input is not one of 'A', 'T', 'G', 'C', or '-'. |
|
|
72 |
""" |
|
|
73 |
if base == "A": |
|
|
74 |
return('T') |
|
|
75 |
elif base == "T": |
|
|
76 |
return('A') |
|
|
77 |
elif base == "G": |
|
|
78 |
return('C') |
|
|
79 |
elif base == "C": |
|
|
80 |
return('G') |
|
|
81 |
elif base == "-": |
|
|
82 |
return('-') |
|
|
83 |
else: |
|
|
84 |
logging.error(f"Invalid base: {base}") |
|
|
85 |
raise ValueError(f"Invalid base: {base}") |
|
|
86 |
|
|
|
87 |
def rev_compl(seq: str) -> str: |
|
|
88 |
""" |
|
|
89 |
Returns the reverse complement of a given DNA sequence. |
|
|
90 |
|
|
|
91 |
Args: |
|
|
92 |
seq (str): A string representing a DNA sequence. Must only contain characters 'A', 'T', 'G', 'C', or '-'. |
|
|
93 |
|
|
|
94 |
Returns: |
|
|
95 |
str: The reverse complement of the given DNA sequence. |
|
|
96 |
|
|
|
97 |
Raises: |
|
|
98 |
ValueError: If the input sequence contains characters other than 'A', 'T', 'G', 'C', or '-' (Through compl()) |
|
|
99 |
""" |
|
|
100 |
new_seq = "" |
|
|
101 |
for base in seq: |
|
|
102 |
new_base = compl(base) |
|
|
103 |
new_seq = new_base + new_seq |
|
|
104 |
return(new_seq) |
|
|
105 |
|
|
|
106 |
def unique(list1: list) -> list: |
|
|
107 |
""" |
|
|
108 |
Returns a new list containing only the unique elements of the input list. |
|
|
109 |
|
|
|
110 |
Args: |
|
|
111 |
list1 (list): A list of elements. |
|
|
112 |
|
|
|
113 |
Returns: |
|
|
114 |
list: A new list containing only the unique elements of the input list. |
|
|
115 |
|
|
|
116 |
Examples: |
|
|
117 |
>>> unique([1, 2, 3, 2, 1]) |
|
|
118 |
[1, 2, 3] |
|
|
119 |
|
|
|
120 |
>>> unique(['a', 'b', 'c', 'b', 'a']) |
|
|
121 |
['a', 'b', 'c'] |
|
|
122 |
""" |
|
|
123 |
|
|
|
124 |
list_set = set(list1) |
|
|
125 |
unique_list = (list(list_set)) |
|
|
126 |
return unique_list |
|
|
127 |
|
|
|
128 |
def createChrList(chrNum: int) -> list: |
|
|
129 |
""" |
|
|
130 |
Creates a list of chromosome names. |
|
|
131 |
|
|
|
132 |
Args: |
|
|
133 |
chrNum (int): The number of chromosomes to include in the list. |
|
|
134 |
|
|
|
135 |
Returns: |
|
|
136 |
list: A list of chromosome names, where each name is a string of the form "chrX", where X is the chromosome number. |
|
|
137 |
|
|
|
138 |
Examples: |
|
|
139 |
>>> createChrList(3) |
|
|
140 |
['chr1', 'chr2', 'chr3'] |
|
|
141 |
|
|
|
142 |
>>> createChrList(5) |
|
|
143 |
['chr1', 'chr2', 'chr3', 'chr4', 'chr5'] |
|
|
144 |
""" |
|
|
145 |
chrList = [] |
|
|
146 |
for i in range(chrNum): |
|
|
147 |
chrList.append(f"chr{i+1}") |
|
|
148 |
return(chrList) |
|
|
149 |
|
|
|
150 |
def dataFrameImport(directory: str) -> tuple: |
|
|
151 |
""" |
|
|
152 |
Imports all TSV files in a directory as Pandas dataframes. |
|
|
153 |
|
|
|
154 |
Args: |
|
|
155 |
directory (str): The path to the directory containing the TSV files. |
|
|
156 |
|
|
|
157 |
Returns: |
|
|
158 |
tuple: A tuple containing two elements: |
|
|
159 |
- A list of Pandas dataframes, where each dataframe corresponds to a TSV file in the directory. |
|
|
160 |
- A list of sample names, where each name is a string representing the name of the corresponding TSV file. |
|
|
161 |
""" |
|
|
162 |
import os |
|
|
163 |
import pandas as pd |
|
|
164 |
frames_list_samples = [] |
|
|
165 |
sample_names = [] |
|
|
166 |
|
|
|
167 |
counter=1 |
|
|
168 |
for filename in os.listdir(directory): |
|
|
169 |
if filename.endswith(".tsv"): |
|
|
170 |
sample_names.append(filename[:10].strip()) |
|
|
171 |
path = os.path.join(directory, filename) |
|
|
172 |
globals()[f"sample{counter}"] = pd.read_csv(path, sep='\t') |
|
|
173 |
print(f'dataframe {sample_names[-1]} has {len(globals()[f"sample{counter}"])} total rows') |
|
|
174 |
frames_list_samples.append(globals()[f"sample{counter}"]) |
|
|
175 |
counter+=1 |
|
|
176 |
logging.info(f"found {len(frames_list_samples)} samples in {directory}") |
|
|
177 |
logging.info(len(sample_names), sample_names) |
|
|
178 |
|
|
|
179 |
return(frames_list_samples, sample_names) |
|
|
180 |
|
|
|
181 |
def pullGenomicContext(list_of_positions: list, fasta_file_path: str, context_flank: int = 5) -> list: |
|
|
182 |
""" |
|
|
183 |
Extracts genomic context of a specified length around each position in a list of positions. |
|
|
184 |
|
|
|
185 |
Args: |
|
|
186 |
list_of_positions (list): A list of positions, where each position is a list of two elements: the chromosome name and the position. |
|
|
187 |
fasta_file_path (str): The path to the FASTA file containing the reference genome. |
|
|
188 |
context_flank (int, optional): The length of the context to extract on either side of each position. Defaults to 5. |
|
|
189 |
|
|
|
190 |
Returns: |
|
|
191 |
list: A list of genomic contexts, where each context is a list of four elements: |
|
|
192 |
- The sequence of bases to the left of the position. |
|
|
193 |
- The query base at the position. |
|
|
194 |
- The sequence of bases to the right of the position. |
|
|
195 |
- The chromosome name. |
|
|
196 |
""" |
|
|
197 |
fasta_list = extractSeqFromFastaToList(fasta_file_path) |
|
|
198 |
context_list = [] |
|
|
199 |
|
|
|
200 |
for i in fasta_list: |
|
|
201 |
logging.debug(f"extracting context from {i[0]}. It has {len(i[1])} bases.") |
|
|
202 |
sequence = i[1] |
|
|
203 |
for j in list_of_positions: |
|
|
204 |
|
|
|
205 |
#check if it is the same chromosome |
|
|
206 |
if j[0] != i[0]: |
|
|
207 |
continue |
|
|
208 |
#set j to be the position |
|
|
209 |
j = j[1] |
|
|
210 |
#find the position in the fasta list |
|
|
211 |
#remember that the fasta sequences start with 1, not 0, so we need to subtract 1 from the position |
|
|
212 |
|
|
|
213 |
try: |
|
|
214 |
query_base = sequence[j-1] |
|
|
215 |
|
|
|
216 |
except: |
|
|
217 |
logging.warning(f"position {j} not found in {i[0]}. Skipping...") |
|
|
218 |
continue |
|
|
219 |
|
|
|
220 |
#extract the context |
|
|
221 |
context_left = sequence[j-1-context_flank:j-1] |
|
|
222 |
context_right = sequence[j:j+context_flank] |
|
|
223 |
|
|
|
224 |
context_list.append([context_left, query_base, context_right, i[0]]) |
|
|
225 |
|
|
|
226 |
return(context_list) |
|
|
227 |
|
|
|
228 |
def drawGenomicContext(context_list: list, show: bool = False, **kwargs: dict) -> None: |
|
|
229 |
''' |
|
|
230 |
Draws a sequence logo of a list of genomic contexts. |
|
|
231 |
|
|
|
232 |
Args: |
|
|
233 |
context_list (list): A list of genomic contexts, where each context is a list of four elements: |
|
|
234 |
- The sequence of bases to the left of the position. |
|
|
235 |
- The query base at the position. |
|
|
236 |
- The sequence of bases to the right of the position. |
|
|
237 |
- The chromosome name. |
|
|
238 |
show (bool, optional): Whether to show the plot. Defaults to False. |
|
|
239 |
**kwargs (dict): Additional keyword arguments to pass to the function. Supported arguments include: |
|
|
240 |
- save_path (str): The path to save the plot to. |
|
|
241 |
|
|
|
242 |
Returns: |
|
|
243 |
None |
|
|
244 |
|
|
|
245 |
Raises: |
|
|
246 |
ImportError: If the required packages matplotlib.pyplot and pandas are not installed. |
|
|
247 |
''' |
|
|
248 |
|
|
|
249 |
import matplotlib.pyplot as plt |
|
|
250 |
import pandas as pd |
|
|
251 |
|
|
|
252 |
#create a dataframe from the context list |
|
|
253 |
df = pd.DataFrame(context_list, columns=['context_left', '0', 'context_right', 'chromosome']) |
|
|
254 |
|
|
|
255 |
#split the context into columns by base |
|
|
256 |
split_left = df['context_left'].apply(lambda x: pd.Series(list(x))) |
|
|
257 |
split_right = df['context_right'].apply(lambda x: pd.Series(list(x))) |
|
|
258 |
|
|
|
259 |
#rename the columns such that they are numbered. e.g. -3,-2,-1, 0, 1, 2, 3 |
|
|
260 |
split_left.columns = [f"-{i}" for i in range(len(split_left.columns), 0, -1)] |
|
|
261 |
split_right.columns = [f"{i+1}" for i in range(len(split_right.columns))] |
|
|
262 |
|
|
|
263 |
#make a new dataframe with the context split into columns, query still in the middle |
|
|
264 |
df_context = pd.concat([split_left, df['0'], split_right], axis=1) |
|
|
265 |
|
|
|
266 |
df_context_freq = df_context.apply(pd.value_counts).fillna(0) #get the frequencies of each base in each column |
|
|
267 |
df_context_freq = df_context_freq.div(df_context_freq.sum(axis=0), axis=1) #normalize the frequencies |
|
|
268 |
df_context_freq = df_context_freq.transpose() |
|
|
269 |
df_context_freq = df_context_freq*100 #set the frequencies to % |
|
|
270 |
|
|
|
271 |
#set color scheme for the bases so that they are consistent |
|
|
272 |
colors = { |
|
|
273 |
'A':'tab:green', |
|
|
274 |
'T':'tab:red', |
|
|
275 |
'G':'tab:orange', |
|
|
276 |
'C':'tab:blue', |
|
|
277 |
'N':'tab:gray', |
|
|
278 |
} |
|
|
279 |
|
|
|
280 |
#plot the dataframe as a stacked barplot |
|
|
281 |
column_count = len(df_context_freq.columns) #count the number of columns |
|
|
282 |
df_context_freq.plot.bar(stacked=True, figsize=(column_count+3,4), color = list(colors.values())) |
|
|
283 |
plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) |
|
|
284 |
plt.ylabel('Frequency (%)') |
|
|
285 |
plt.xlabel('Relative Position') |
|
|
286 |
|
|
|
287 |
if 'save_path' in kwargs: |
|
|
288 |
plt.savefig(kwargs['save_path'], bbox_inches='tight', dpi=300) |
|
|
289 |
logging.info(f"saved plot to {kwargs['save_path']}") |
|
|
290 |
|
|
|
291 |
if show == True: |
|
|
292 |
plt.show() |
|
|
293 |
|
|
|
294 |
plt.close() |