--- a +++ b/BioAid/base.py @@ -0,0 +1,294 @@ +# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa +# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com + +## Contents: +## extractSeqFromFastaToList +## validateSquence +## compl +## rev_compl +## unique +## createChrList +## dataFrameImport + +import logging + +def extractSeqFromFastaToList(fasta_file_path: str) -> list: + '''Extracts sequence information from a FASTA file and returns it as a nested list. + + Args: + fasta_file_path (str): The path to the FASTA file. + + Returns: + list: A nested list of the form [[title_0, sequence_0], [title_1, sequence_1], ...]. + Each element of the list is a list containing the title and sequence of a sequence in the FASTA file. + ''' + fasta_file = open(fasta_file_path, 'r') + contents = fasta_file.readlines() + fasta_file.close() + + fasta_list=[] + + for i in contents: + if '>' in i: + fasta_list.append([i.strip('>').strip(),'']) + else: + fasta_list[-1][1] = fasta_list[-1][1]+i.strip() + logging.info(f"Extraction of sequence information from {fasta_file_path} finished.") + return fasta_list + +def validateSquence(sequence: str) -> bool: + '''Checks if a DNA sequence contains only valid nucleotides. + + Args: + sequence (str): A DNA sequence to be validated. + + Returns: + bool: True if the sequence contains only valid nucleotides, False otherwise. + If the sequence contains non-canonical nucleotides, a log message is generated. + If the sequence contains 'N's, a warning log message is generated. + The log messages are written to the default logger. + ''' + bases = "ATGCatgcN" + for i in sequence: + if i not in bases: + logging.info(f"Sequence doesn't contain cannonical nucleotides: {i}") + return(False) + if i == "N": + logging.warning(f"Warning, sequence contains 'N's") + return(True) + +def compl(base: str) -> str: + """ + Returns the complementary base for a given DNA base. + + Args: + base (str): A single character representing a DNA base. Must be one of 'A', 'T', 'G', 'C', or '-'. + + Returns: + str: The complementary base for the given DNA base. Returns '-' if the input is '-'. + + Raises: + ValueError: If the input is not one of 'A', 'T', 'G', 'C', or '-'. + """ + if base == "A": + return('T') + elif base == "T": + return('A') + elif base == "G": + return('C') + elif base == "C": + return('G') + elif base == "-": + return('-') + else: + logging.error(f"Invalid base: {base}") + raise ValueError(f"Invalid base: {base}") + +def rev_compl(seq: str) -> str: + """ + Returns the reverse complement of a given DNA sequence. + + Args: + seq (str): A string representing a DNA sequence. Must only contain characters 'A', 'T', 'G', 'C', or '-'. + + Returns: + str: The reverse complement of the given DNA sequence. + + Raises: + ValueError: If the input sequence contains characters other than 'A', 'T', 'G', 'C', or '-' (Through compl()) + """ + new_seq = "" + for base in seq: + new_base = compl(base) + new_seq = new_base + new_seq + return(new_seq) + +def unique(list1: list) -> list: + """ + Returns a new list containing only the unique elements of the input list. + + Args: + list1 (list): A list of elements. + + Returns: + list: A new list containing only the unique elements of the input list. + + Examples: + >>> unique([1, 2, 3, 2, 1]) + [1, 2, 3] + + >>> unique(['a', 'b', 'c', 'b', 'a']) + ['a', 'b', 'c'] + """ + + list_set = set(list1) + unique_list = (list(list_set)) + return unique_list + +def createChrList(chrNum: int) -> list: + """ + Creates a list of chromosome names. + + Args: + chrNum (int): The number of chromosomes to include in the list. + + Returns: + list: A list of chromosome names, where each name is a string of the form "chrX", where X is the chromosome number. + + Examples: + >>> createChrList(3) + ['chr1', 'chr2', 'chr3'] + + >>> createChrList(5) + ['chr1', 'chr2', 'chr3', 'chr4', 'chr5'] + """ + chrList = [] + for i in range(chrNum): + chrList.append(f"chr{i+1}") + return(chrList) + +def dataFrameImport(directory: str) -> tuple: + """ + Imports all TSV files in a directory as Pandas dataframes. + + Args: + directory (str): The path to the directory containing the TSV files. + + Returns: + tuple: A tuple containing two elements: + - A list of Pandas dataframes, where each dataframe corresponds to a TSV file in the directory. + - A list of sample names, where each name is a string representing the name of the corresponding TSV file. + """ + import os + import pandas as pd + frames_list_samples = [] + sample_names = [] + + counter=1 + for filename in os.listdir(directory): + if filename.endswith(".tsv"): + sample_names.append(filename[:10].strip()) + path = os.path.join(directory, filename) + globals()[f"sample{counter}"] = pd.read_csv(path, sep='\t') + print(f'dataframe {sample_names[-1]} has {len(globals()[f"sample{counter}"])} total rows') + frames_list_samples.append(globals()[f"sample{counter}"]) + counter+=1 + logging.info(f"found {len(frames_list_samples)} samples in {directory}") + logging.info(len(sample_names), sample_names) + + return(frames_list_samples, sample_names) + +def pullGenomicContext(list_of_positions: list, fasta_file_path: str, context_flank: int = 5) -> list: + """ + Extracts genomic context of a specified length around each position in a list of positions. + + Args: + list_of_positions (list): A list of positions, where each position is a list of two elements: the chromosome name and the position. + fasta_file_path (str): The path to the FASTA file containing the reference genome. + context_flank (int, optional): The length of the context to extract on either side of each position. Defaults to 5. + + Returns: + list: A list of genomic contexts, where each context is a list of four elements: + - The sequence of bases to the left of the position. + - The query base at the position. + - The sequence of bases to the right of the position. + - The chromosome name. + """ + fasta_list = extractSeqFromFastaToList(fasta_file_path) + context_list = [] + + for i in fasta_list: + logging.debug(f"extracting context from {i[0]}. It has {len(i[1])} bases.") + sequence = i[1] + for j in list_of_positions: + + #check if it is the same chromosome + if j[0] != i[0]: + continue + #set j to be the position + j = j[1] + #find the position in the fasta list + #remember that the fasta sequences start with 1, not 0, so we need to subtract 1 from the position + + try: + query_base = sequence[j-1] + + except: + logging.warning(f"position {j} not found in {i[0]}. Skipping...") + continue + + #extract the context + context_left = sequence[j-1-context_flank:j-1] + context_right = sequence[j:j+context_flank] + + context_list.append([context_left, query_base, context_right, i[0]]) + + return(context_list) + +def drawGenomicContext(context_list: list, show: bool = False, **kwargs: dict) -> None: + ''' + Draws a sequence logo of a list of genomic contexts. + + Args: + context_list (list): A list of genomic contexts, where each context is a list of four elements: + - The sequence of bases to the left of the position. + - The query base at the position. + - The sequence of bases to the right of the position. + - The chromosome name. + show (bool, optional): Whether to show the plot. Defaults to False. + **kwargs (dict): Additional keyword arguments to pass to the function. Supported arguments include: + - save_path (str): The path to save the plot to. + + Returns: + None + + Raises: + ImportError: If the required packages matplotlib.pyplot and pandas are not installed. + ''' + + import matplotlib.pyplot as plt + import pandas as pd + + #create a dataframe from the context list + df = pd.DataFrame(context_list, columns=['context_left', '0', 'context_right', 'chromosome']) + + #split the context into columns by base + split_left = df['context_left'].apply(lambda x: pd.Series(list(x))) + split_right = df['context_right'].apply(lambda x: pd.Series(list(x))) + + #rename the columns such that they are numbered. e.g. -3,-2,-1, 0, 1, 2, 3 + split_left.columns = [f"-{i}" for i in range(len(split_left.columns), 0, -1)] + split_right.columns = [f"{i+1}" for i in range(len(split_right.columns))] + + #make a new dataframe with the context split into columns, query still in the middle + df_context = pd.concat([split_left, df['0'], split_right], axis=1) + + df_context_freq = df_context.apply(pd.value_counts).fillna(0) #get the frequencies of each base in each column + df_context_freq = df_context_freq.div(df_context_freq.sum(axis=0), axis=1) #normalize the frequencies + df_context_freq = df_context_freq.transpose() + df_context_freq = df_context_freq*100 #set the frequencies to % + + #set color scheme for the bases so that they are consistent + colors = { + 'A':'tab:green', + 'T':'tab:red', + 'G':'tab:orange', + 'C':'tab:blue', + 'N':'tab:gray', + } + + #plot the dataframe as a stacked barplot + column_count = len(df_context_freq.columns) #count the number of columns + df_context_freq.plot.bar(stacked=True, figsize=(column_count+3,4), color = list(colors.values())) + plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) + plt.ylabel('Frequency (%)') + plt.xlabel('Relative Position') + + if 'save_path' in kwargs: + plt.savefig(kwargs['save_path'], bbox_inches='tight', dpi=300) + logging.info(f"saved plot to {kwargs['save_path']}") + + if show == True: + plt.show() + + plt.close()