--- a/openomics/io/read_gtf.py
+++ b/openomics/io/read_gtf.py
@@ -0,0 +1,446 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from __future__ import print_function, division, absolute_import
+
+import logging
+from collections import OrderedDict
+from os.path import exists
+from typing import List, Dict, Union
+
+import dask.dataframe as dd
+import numpy as np
+import pandas as pd
+from six import string_types
+from six.moves import intern
+
+COLUMN_NAMES = [
+    "seqname",
+    "source",
+    "feature",
+    "start",
+    "end",
+    "score",
+    "strand",
+    "frame",
+    "attribute",
+]
+
+
+logging.basicConfig(level=logging.INFO)
+
+logger = logging.getLogger(__name__)
+
+
+def read_gtf(filepath_or_buffer, blocksize=None, compression=None, expand_attribute_column=True,
+             infer_biotype_column=False, column_converters=None, usecols=None, features=None,
+             chunksize=1024 * 1024):
+    """Parse a GTF file into a pandas DataFrame (or a dask DataFrame when
+    `blocksize` is given), with one column per GTF field.
+
+    Args:
+        filepath_or_buffer (str or buffer object): Path to GTF file (may be gzip
+            compressed) or buffer object such as StringIO.
+        blocksize (int): Block size for the dask dataframe; if given, the
+            result is a dask DataFrame. Default None to use pandas.DataFrame
+            instead.
+        compression (str): Compression type to be passed into
+            dask.dataframe.read_table(). Default None.
+        expand_attribute_column (bool): Replace strings of semicolon-separated
+            key-value pairs in the 'attribute' column with one column per
+            distinct key, with a list of values for each row (using None for
+            rows where the key didn't occur).
+        infer_biotype_column (bool): Due to the ambiguity of the second GTF
+            column across Ensembl releases, figure out whether an older GTF's
+            source column is actually the gene_biotype or transcript_biotype.
+        column_converters (dict, optional): Dictionary mapping column names to
+            conversion functions. Replaces empty strings with None and
+            otherwise passes values to the given conversion function.
+        usecols (list of str or None): Restrict which columns are loaded to the
+            given set. If None, then load all columns.
+        features (set of str or None): Drop rows which aren't one of the
+            features in the supplied set.
+        chunksize (int): The chunksize for pandas.read_csv(); ignored if
+            blocksize is not None.
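+
+    Example:
+        A minimal sketch (the file path here is hypothetical)::
+
+            df = read_gtf("annotations.gtf", features={"gene"},
+                          usecols=["seqname", "start", "end", "gene_id"])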
+ """ + if isinstance(filepath_or_buffer, string_types) and not exists(filepath_or_buffer): + raise ValueError("GTF file does not exist: %s" % filepath_or_buffer) + + if expand_attribute_column: + result_df = parse_gtf_and_expand_attributes(filepath_or_buffer, blocksize=blocksize, + compression=compression, + chunksize=chunksize, + restrict_attribute_columns=usecols) + else: + if blocksize: + result_df = parse_gtf_dask(filepath_or_buffer, blocksize=blocksize, features=features, + compression=compression) + else: + result_df = parse_gtf(filepath_or_buffer, features=features, compression=compression) + + for column_name, column_type in list(column_converters.items()): + result_df[column_name] = [ + column_type(string_value) if len(string_value) > 0 else None + for string_value + in result_df[column_name] + ] + + # Hackishly infer whether the values in the 'source' column of this GTF + # are actually representing a biotype by checking for the most common + # gene_biotype and transcript_biotype value 'protein_coding' + if infer_biotype_column: + unique_source_values = set(result_df["source"]) + if "protein_coding" in unique_source_values: + column_names = set(result_df.columns) + # Disambiguate between the two biotypes by checking if + # gene_biotype is already present in another column. If it is, + # the 2nd column is the transcript_biotype (otherwise, it's the + # gene_biotype) + if "gene_biotype" not in column_names: + logging.info("Using column 'source' to replace missing 'gene_biotype'") + result_df["gene_biotype"] = result_df["source"] + if "transcript_biotype" not in column_names: + logging.info("Using column 'source' to replace missing 'transcript_biotype'") + result_df["transcript_biotype"] = result_df["source"] + + if usecols is not None: + column_names = set(result_df.columns) + valid_columns = [c for c in usecols if c in column_names] + result_df = result_df[valid_columns] + + return result_df + + +def parse_gtf(filepath_or_buffer, chunksize=1024 * 1024, features=None, + intern_columns=["seqname", "source", "strand", "frame"], + fix_quotes_columns=["attribute"], compression=None) -> pd.DataFrame: + """ + Borrowed code from https://github.com/openvax/gtfparse + + Args: + filepath_or_buffer (str or buffer object): + chunksize (int): Default 1048576. + features (set or None): Drop entries which aren't one of these features + intern_columns (list): These columns are short strings which should be + interned + fix_quotes_columns (list): Most commonly the 'attribute' column which + had broken quotes on some Ensembl release GTF files. + """ + if features is not None: + features = set(features) + + def parse_frame(s): + if s == ".": + return 0 + else: + return int(s) + + # GTF columns: + # 1) seqname: str ("1", "X", "chrX", etc...) + # 2) source : str + # Different versions of GTF use second column as of: + # (a) gene biotype + # (b) transcript biotype + # (c) the annotation source + # See: https://www.biostars.org/p/120306/#120321 + # 3) feature : str ("gene", "transcript", &c) + # 4) start : int + # 5) end : int + # 6) score : float or "." + # 7) strand : "+", "-", or "." + # 8) frame : 0, 1, 2 or "." 
+    # 9) attribute : key-value pairs separated by semicolons
+    #    (see more complete description in docstring at top of file)
+
+    chunk_iterator = pd.read_csv(
+        filepath_or_buffer,
+        sep="\t",
+        comment="#",
+        names=COLUMN_NAMES,
+        skipinitialspace=True,
+        skip_blank_lines=True,
+        on_bad_lines='error',
+        chunksize=chunksize,
+        compression=compression,
+        engine="c",
+        dtype={
+            "start": np.int64,
+            "end": np.int64,
+            "score": np.float32,
+            "seqname": str,
+        },
+        na_values=".",
+        converters={"frame": parse_frame})
+
+    dataframes = []
+    for df in chunk_iterator:
+        for intern_column in intern_columns:
+            df[intern_column] = [intern(str(s)) for s in df[intern_column]]
+
+        # compare feature strings after interning
+        if features is not None:
+            df = df[df["feature"].isin(features)]
+
+        for fix_quotes_column in fix_quotes_columns:
+            # Catch mistaken semicolons by replacing "xyz;" with "xyz".
+            # Required since the Ensembl GTF for release 78 has mistakes
+            # such as:
+            #   gene_name = "PRAMEF6;" transcript_name = "PRAMEF6;-201"
+            df[fix_quotes_column] = [
+                s.replace(';\"', '\"').replace(";-", "-")
+                for s in df[fix_quotes_column]
+            ]
+        dataframes.append(df)
+
+    df = pd.concat(dataframes)
+
+    return df
+
+
+def parse_gtf_dask(filepath_or_buffer, blocksize=None, compression=None, features=None) -> dd.DataFrame:
+    """
+    Args:
+        filepath_or_buffer (str or buffer object): Path to GTF file or buffer.
+        blocksize (int): Block size for the dask dataframe. Default None.
+        compression (str): Compression type to be passed into
+            dask.dataframe.read_table(). Default None.
+        features (set or None): Drop entries which aren't one of these features.
+    """
+    if features is not None:
+        features = set(features)
+
+    def parse_frame(s):
+        # The GTF frame field is 0, 1, 2, or "." (missing); treat "." as 0.
+        if s == ".":
+            return 0
+        else:
+            return int(s)
+
+    # GTF columns: see the column-by-column description in parse_gtf() above.
+
+    # Uses Dask
+    dataframe = dd.read_table(
+        filepath_or_buffer,
+        sep="\t",
+        compression=compression,
+        # blocksize=True falls back to dask's default partitioning
+        blocksize=None if isinstance(blocksize, bool) else blocksize,
+        comment="#",
+        names=COLUMN_NAMES,
+        skipinitialspace=True,
+        skip_blank_lines=True,
+        on_bad_lines='error',
+        engine="c",
+        dtype={
+            "start": np.int64,
+            "end": np.int64,
+            "score": np.float32,
+            "seqname": str,
+        },
+        na_values=".",
+        converters={"frame": parse_frame})
+
+    return dataframe
+
+
+def parse_gtf_and_expand_attributes(filepath_or_buffer, blocksize=None, compression=None,
+                                    chunksize=1024 * 1024, restrict_attribute_columns=None,
+                                    features=None) -> Union[dd.DataFrame, pd.DataFrame]:
+    """Parse the GTF into a DataFrame and then expand the 'attribute' column
+    into multiple columns. This expansion happens by replacing strings of
+    semicolon-separated key-value pairs in the 'attribute' column with one
+    column per distinct key, with a list of values for each row (using None
+    for rows where the key didn't occur).
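+
+    For example, an attribute string such as
+        gene_id "ENSG00000223972"; gene_name "DDX11L1";
+    expands into two columns, 'gene_id' and 'gene_name', holding the values
+    "ENSG00000223972" and "DDX11L1" for that row.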
+
+    Args:
+        filepath_or_buffer (str or buffer object): Path to GTF file or buffer.
+        blocksize (int): Block size for the dask dataframe; if set, a dask
+            DataFrame is returned instead of a pandas one.
+        compression (str): Compression type passed to the underlying reader.
+        chunksize (int): The chunksize for pandas.read_csv(); ignored if
+            blocksize is set.
+        restrict_attribute_columns (list/set of str or None): If given, then
+            only expand these attribute columns.
+        features (set or None): Ignore entries which don't correspond to one
+            of the supplied features.
+    """
+    if blocksize:
+        # Dask dataframe
+        def expand_attr_strings_to_dict(attribute_string: str, quote_char='\"', usecols=None,
+                                        column_interned_strings={}, value_interned_strings={}) \
+                -> Dict[str, str]:
+            """The last column of GTF has a variable number of key-value pairs
+            of the format: "key1 value1; key2 value2;". Parse these into a
+            dictionary mapping each key onto its value for this row.
+
+            Args:
+                attribute_string (str): The attribute column value for one row.
+                quote_char (str): Quote character to remove from values.
+                usecols (list of str or None): If not None, then only expand
+                    columns included in this set, otherwise use all columns.
+                column_interned_strings: Mutable default argument, reused
+                    across calls as a cache of interned column names.
+                value_interned_strings: Mutable default argument, reused
+                    across calls as a cache of interned values.
+            """
+            output = {}
+
+            for kv in attribute_string.split(";"):
+                # We're slicing the first two elements out of split() because
+                # Ensembl release 79 added values like:
+                #   transcript_support_level "1 (assigned to previous version 5)";
+                # ...which gets mangled by splitting on spaces.
+                parts = kv.strip().split(" ", 2)[:2]
+                if len(parts) != 2:
+                    continue
+
+                column_name, value = parts
+                try:
+                    column_name = column_interned_strings[column_name]
+                except KeyError:
+                    column_name = intern(str(column_name))
+                    column_interned_strings[column_name] = column_name
+
+                if usecols is not None and column_name not in usecols:
+                    continue
+
+                value = value.replace(quote_char, "") if value.startswith(quote_char) else value
+
+                try:
+                    value = value_interned_strings[value]
+                except KeyError:
+                    value = intern(str(value))
+                    value_interned_strings[value] = value
+                output[column_name] = value
+
+            return output
+
+        df = parse_gtf_dask(filepath_or_buffer, blocksize=blocksize, compression=compression,
+                            features=features)
+
+        attribute_values: dd.Series = df.pop("attribute")
+
+        # Sample 500 rows to determine the meta dataframe. Pass
+        # restrict_attribute_columns through so the dask path honors it the
+        # same way the pandas path does.
+        attr_vals = attribute_values.head(500)
+        expected_df = attr_vals.map(
+            lambda s: expand_attr_strings_to_dict(s, usecols=restrict_attribute_columns)
+        ).apply(pd.Series)
+
+        attr_df = attribute_values.to_bag(index=False, format='tuple') \
+            .map(expand_attr_strings_to_dict, usecols=restrict_attribute_columns) \
+            .to_dataframe(meta=expected_df)
+        attr_df.index = df.index
+        df = dd.concat([df, attr_df], axis=1)
+
+    else:
+        # Pandas
+        df = parse_gtf(filepath_or_buffer, chunksize=chunksize, features=features,
+                       compression=compression)
+
+        attribute_values = df.pop("attribute")
+
+        for column_name, values in expand_attribute_strings(attribute_values,
+                                                            usecols=restrict_attribute_columns).items():
+            df[column_name] = values
+
+    return df
+
+
+def expand_attribute_strings(attribute_strings: List[str], quote_char='\"', nan_value=None,
+                             usecols=None) -> OrderedDict:
+    """The last column of GTF has a variable number of key-value pairs of the
+    format: "key1 value1; key2 value2;". Parse these into a dictionary mapping
+    each key onto a list of values, where the value is None for any row where
+    the key was missing.
+
+    Args:
+        attribute_strings (list of str): The 'attribute' column values.
+        quote_char (str): Quote character to remove from values.
+        nan_value (any): If an attribute is missing from a row, give it this
+            value.
+        usecols (list of str or None): If not None, then only expand columns
+            included in this set, otherwise use all columns.
+    """
+    n = len(attribute_strings)
+
+    extra_columns = {}
+    column_order = []
+
+    #
+    # SOME NOTES ABOUT THE BIZARRE STRING INTERNING GOING ON BELOW
+    #
+    # While parsing millions of repeated strings (e.g. "gene_id" and "TP53"),
+    # we can save a lot of memory by making sure there's only one string
+    # object per unique string. The canonical way to do this is using
+    # the 'intern' function. One problem is that Py2 won't let you intern
+    # unicode objects, so to get around this we call intern(str(...)).
+    #
+    # It also turns out to be faster to check interned strings ourselves
+    # using a local dictionary, hence the two dictionaries below
+    # and the pair of try/except blocks in the loop.
+    column_interned_strings = {}
+    value_interned_strings = {}
+
+    for (i, attribute_string) in enumerate(attribute_strings):
+        for kv in attribute_string.split(";"):
+            # We're slicing the first two elements out of split() because
+            # Ensembl release 79 added values like:
+            #   transcript_support_level "1 (assigned to previous version 5)";
+            # ...which gets mangled by splitting on spaces.
+            parts = kv.strip().split(" ", 2)[:2]
+
+            if len(parts) != 2:
+                continue
+
+            column_name, value = parts
+
+            try:
+                column_name = column_interned_strings[column_name]
+            except KeyError:
+                column_name = intern(str(column_name))
+                column_interned_strings[column_name] = column_name
+
+            if usecols is not None and column_name not in usecols:
+                continue
+
+            try:
+                column = extra_columns[column_name]
+            except KeyError:
+                column = [nan_value] * n
+                extra_columns[column_name] = column
+                column_order.append(column_name)
+
+            value = value.replace(quote_char, "") if value.startswith(quote_char) else value
+
+            try:
+                value = value_interned_strings[value]
+            except KeyError:
+                value = intern(str(value))
+                value_interned_strings[value] = value
+
+            # If an attribute key occurs repeatedly in one row, keep all of
+            # its values as a comma-joined string.
+            old_value = column[i]
+            if old_value is nan_value:
+                column[i] = value
+            else:
+                column[i] = "%s,%s" % (old_value, value)
+
+    return OrderedDict(
+        (column_name, extra_columns[column_name])
+        for column_name in column_order)
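+
+
+# Example (a sketch): passing a blocksize makes read_gtf return a lazy dask
+# DataFrame instead of a pandas one. The file path below is hypothetical.
+#
+#     ddf = read_gtf("annotations.gtf", blocksize=25000000)
+#     genes = ddf[ddf["feature"] == "gene"].compute()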