Diff of /BioAid/base.py [000000] .. [deb8e5]

Switch to side-by-side view

--- a
+++ b/BioAid/base.py
@@ -0,0 +1,294 @@
+# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa 
+# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com
+
+## Contents:
+## extractSeqFromFastaToList
+## validateSquence
+## compl
+## rev_compl
+## unique
+## createChrList
+## dataFrameImport
+
+import logging
+
+def extractSeqFromFastaToList(fasta_file_path: str) -> list:
+    '''Extracts sequence information from a FASTA file and returns it as a nested list.
+
+    Args:
+        fasta_file_path (str): The path to the FASTA file.
+
+    Returns:
+        list: A nested list of the form [[title_0, sequence_0], [title_1, sequence_1], ...].
+            Each element of the list is a list containing the title and sequence of a sequence in the FASTA file.
+    '''
+    fasta_file = open(fasta_file_path, 'r')
+    contents = fasta_file.readlines()
+    fasta_file.close()
+
+    fasta_list=[]
+
+    for i in contents:
+        if '>' in i:
+            fasta_list.append([i.strip('>').strip(),''])
+        else:
+            fasta_list[-1][1] = fasta_list[-1][1]+i.strip()
+    logging.info(f"Extraction of sequence information from {fasta_file_path} finished.")
+    return fasta_list
+
+def validateSquence(sequence: str) -> bool:
+    '''Checks if a DNA sequence contains only valid nucleotides.
+
+    Args:
+        sequence (str): A DNA sequence to be validated.
+
+    Returns:
+        bool: True if the sequence contains only valid nucleotides, False otherwise.
+            If the sequence contains non-canonical nucleotides, a log message is generated.
+            If the sequence contains 'N's, a warning log message is generated.
+            The log messages are written to the default logger.
+    '''
+    bases = "ATGCatgcN"
+    for i in sequence:
+        if i not in bases:
+            logging.info(f"Sequence doesn't contain cannonical nucleotides: {i}")
+            return(False)
+        if i == "N":
+            logging.warning(f"Warning, sequence contains 'N's")
+    return(True)
+
+def compl(base: str) -> str:
+    """
+    Returns the complementary base for a given DNA base.
+
+    Args:
+        base (str): A single character representing a DNA base. Must be one of 'A', 'T', 'G', 'C', or '-'.
+
+    Returns:
+        str: The complementary base for the given DNA base. Returns '-' if the input is '-'.
+
+    Raises:
+        ValueError: If the input is not one of 'A', 'T', 'G', 'C', or '-'.
+    """
+    if base == "A":
+        return('T')
+    elif base == "T":
+        return('A')
+    elif base == "G":
+        return('C')
+    elif base == "C":
+        return('G')
+    elif base == "-":
+        return('-')
+    else:
+        logging.error(f"Invalid base: {base}")
+        raise ValueError(f"Invalid base: {base}")
+
+def rev_compl(seq: str) -> str:
+    """
+    Returns the reverse complement of a given DNA sequence.
+
+    Args:
+        seq (str): A string representing a DNA sequence. Must only contain characters 'A', 'T', 'G', 'C', or '-'.
+
+    Returns:
+        str: The reverse complement of the given DNA sequence.
+
+    Raises:
+        ValueError: If the input sequence contains characters other than 'A', 'T', 'G', 'C', or '-' (Through compl())
+    """
+    new_seq = ""
+    for base in seq:
+        new_base = compl(base)
+        new_seq = new_base + new_seq
+    return(new_seq)
+
+def unique(list1: list) -> list:
+    """
+    Returns a new list containing only the unique elements of the input list.
+
+    Args:
+        list1 (list): A list of elements.
+
+    Returns:
+        list: A new list containing only the unique elements of the input list.
+
+    Examples:
+        >>> unique([1, 2, 3, 2, 1])
+        [1, 2, 3]
+
+        >>> unique(['a', 'b', 'c', 'b', 'a'])
+        ['a', 'b', 'c']
+    """
+
+    list_set = set(list1) 
+    unique_list = (list(list_set))
+    return unique_list
+
+def createChrList(chrNum: int) -> list:
+    """
+    Creates a list of chromosome names.
+
+    Args:
+        chrNum (int): The number of chromosomes to include in the list.
+
+    Returns:
+        list: A list of chromosome names, where each name is a string of the form "chrX", where X is the chromosome number.
+
+    Examples:
+        >>> createChrList(3)
+        ['chr1', 'chr2', 'chr3']
+
+        >>> createChrList(5)
+        ['chr1', 'chr2', 'chr3', 'chr4', 'chr5']
+    """
+    chrList = []
+    for i in range(chrNum):
+        chrList.append(f"chr{i+1}")
+    return(chrList)
+
+def dataFrameImport(directory: str) -> tuple:
+    """
+    Imports all TSV files in a directory as Pandas dataframes.
+
+    Args:
+        directory (str): The path to the directory containing the TSV files.
+
+    Returns:
+        tuple: A tuple containing two elements:
+            - A list of Pandas dataframes, where each dataframe corresponds to a TSV file in the directory.
+            - A list of sample names, where each name is a string representing the name of the corresponding TSV file.
+    """
+    import os
+    import pandas as pd
+    frames_list_samples = []
+    sample_names = []
+
+    counter=1
+    for filename in os.listdir(directory):
+            if filename.endswith(".tsv"):
+                    sample_names.append(filename[:10].strip())
+                    path = os.path.join(directory, filename)
+                    globals()[f"sample{counter}"] = pd.read_csv(path, sep='\t')
+                    print(f'dataframe {sample_names[-1]} has {len(globals()[f"sample{counter}"])} total rows')
+                    frames_list_samples.append(globals()[f"sample{counter}"])
+                    counter+=1
+    logging.info(f"found {len(frames_list_samples)} samples in {directory}")
+    logging.info(len(sample_names), sample_names)
+
+    return(frames_list_samples, sample_names)
+
+def pullGenomicContext(list_of_positions: list, fasta_file_path: str, context_flank: int = 5) -> list:
+    """
+    Extracts genomic context of a specified length around each position in a list of positions.
+
+    Args:
+        list_of_positions (list): A list of positions, where each position is a list of two elements: the chromosome name and the position.
+        fasta_file_path (str): The path to the FASTA file containing the reference genome.
+        context_flank (int, optional): The length of the context to extract on either side of each position. Defaults to 5.
+
+    Returns:
+        list: A list of genomic contexts, where each context is a list of four elements:
+            - The sequence of bases to the left of the position.
+            - The query base at the position.
+            - The sequence of bases to the right of the position.
+            - The chromosome name.
+    """
+    fasta_list = extractSeqFromFastaToList(fasta_file_path)
+    context_list = []
+
+    for i in fasta_list:
+        logging.debug(f"extracting context from {i[0]}. It has {len(i[1])} bases.")
+        sequence = i[1]
+        for j in list_of_positions:
+
+            #check if it is the same chromosome
+            if j[0] != i[0]:
+                continue
+            #set j to be the position
+            j = j[1]
+            #find the position in the fasta list
+            #remember that the fasta sequences start with 1, not 0, so we need to subtract 1 from the position
+
+            try:
+                query_base = sequence[j-1]
+
+            except:
+                logging.warning(f"position {j} not found in {i[0]}. Skipping...")
+                continue
+
+            #extract the context
+            context_left = sequence[j-1-context_flank:j-1]
+            context_right = sequence[j:j+context_flank]
+
+            context_list.append([context_left, query_base, context_right, i[0]])
+
+    return(context_list)
+
+def drawGenomicContext(context_list: list, show: bool = False, **kwargs: dict) -> None:
+    '''
+    Draws a sequence logo of a list of genomic contexts.
+
+    Args:
+        context_list (list): A list of genomic contexts, where each context is a list of four elements:
+            - The sequence of bases to the left of the position.
+            - The query base at the position.
+            - The sequence of bases to the right of the position.
+            - The chromosome name.
+        show (bool, optional): Whether to show the plot. Defaults to False.
+        **kwargs (dict): Additional keyword arguments to pass to the function. Supported arguments include:
+            - save_path (str): The path to save the plot to.
+
+    Returns:
+        None
+
+    Raises:
+        ImportError: If the required packages matplotlib.pyplot and pandas are not installed.
+    '''
+    
+    import matplotlib.pyplot as plt
+    import pandas as pd
+
+    #create a dataframe from the context list
+    df = pd.DataFrame(context_list, columns=['context_left', '0', 'context_right', 'chromosome'])
+
+    #split the context into columns by base
+    split_left = df['context_left'].apply(lambda x: pd.Series(list(x)))
+    split_right = df['context_right'].apply(lambda x: pd.Series(list(x)))
+
+    #rename the columns such that they are numbered. e.g. -3,-2,-1, 0, 1, 2, 3
+    split_left.columns = [f"-{i}" for i in range(len(split_left.columns), 0, -1)]
+    split_right.columns = [f"{i+1}" for i in range(len(split_right.columns))]
+
+    #make a new dataframe with the context split into columns, query still in the middle
+    df_context = pd.concat([split_left, df['0'], split_right], axis=1)
+
+    df_context_freq = df_context.apply(pd.value_counts).fillna(0)              #get the frequencies of each base in each column
+    df_context_freq = df_context_freq.div(df_context_freq.sum(axis=0), axis=1) #normalize the frequencies
+    df_context_freq = df_context_freq.transpose()
+    df_context_freq = df_context_freq*100                                      #set the frequencies to %
+    
+    #set color scheme for the bases so that they are consistent
+    colors = {
+                'A':'tab:green', 
+                'T':'tab:red',
+                'G':'tab:orange',
+                'C':'tab:blue',
+                'N':'tab:gray',
+                }
+    
+    #plot the dataframe as a stacked barplot
+    column_count = len(df_context_freq.columns) #count the number of columns
+    df_context_freq.plot.bar(stacked=True, figsize=(column_count+3,4), color = list(colors.values()))
+    plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))
+    plt.ylabel('Frequency (%)')
+    plt.xlabel('Relative Position')
+
+    if 'save_path' in kwargs:
+        plt.savefig(kwargs['save_path'], bbox_inches='tight', dpi=300)
+        logging.info(f"saved plot to {kwargs['save_path']}")
+
+    if show == True:
+        plt.show()
+
+    plt.close()