# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function, division, absolute_import
import logging
from collections import OrderedDict
from os.path import exists
from typing import List, Dict, Union
import dask.dataframe as dd
import numpy as np
import pandas as pd
from six import string_types
from six.moves import intern
COLUMN_NAMES = [
"seqname",
"source",
"feature",
"start",
"end",
"score",
"strand",
"frame",
"attribute",
]
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def read_gtf(filepath_or_buffer, blocksize=None, compression=None, expand_attribute_column=True,
infer_biotype_column=False, column_converters=None, usecols=None, features=None, chunksize=1024 * 1024):
"""Parse a GTF into a dictionary mapping column names to sequences of
values.
Args:
filepath_or_buffer (str or buffer object): Path to GTF file (may be gzip
compressed) or buffer object such as StringIO
blocksize (int or None): Block size (in bytes) used to partition the file into a dask DataFrame.
Default None, which builds a pandas.DataFrame instead.
compression (str or None): Compression type passed to dask.dataframe.read_table(). Default None.
expand_attribute_column (bool): Replace strings of semicolon-separated
key-value pairs in the 'attribute' column with one column per
distinct key, giving each row a value (or None for rows where the
key didn't occur).
infer_biotype_column (bool): Due to the annoying ambiguity of the second
GTF column across multiple Ensembl releases, figure out if an older
GTF's source column is actually the gene_biotype or
transcript_biotype.
column_converters (dict or None): Dictionary mapping column names to
conversion functions. Empty strings are replaced with None; all other
values are passed to the given conversion function.
usecols (list of str or None): Restrict which columns are loaded to the
given set. If None, then load all columns.
features (set of str or None): Drop rows whose feature is not in the
supplied set. If None, keep all rows.
chunksize (int): The chunksize for pandas.read_csv(), ignored if blocksize is not None.
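Example:
A minimal sketch ("annotation.gtf" is a placeholder path):

df = read_gtf("annotation.gtf", features={"gene", "transcript"},
usecols=["seqname", "start", "end", "gene_id", "gene_name"])
# With blocksize set, a dask DataFrame is returned instead of pandas:
ddf = read_gtf("annotation.gtf", blocksize=10000000)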
"""
if isinstance(filepath_or_buffer, string_types) and not exists(filepath_or_buffer):
raise ValueError("GTF file does not exist: %s" % filepath_or_buffer)
if expand_attribute_column:
result_df = parse_gtf_and_expand_attributes(filepath_or_buffer, blocksize=blocksize,
compression=compression,
chunksize=chunksize,
features=features,
restrict_attribute_columns=usecols)
else:
if blocksize:
result_df = parse_gtf_dask(filepath_or_buffer, blocksize=blocksize, features=features,
compression=compression)
else:
result_df = parse_gtf(filepath_or_buffer, features=features, compression=compression)
# Apply user-supplied converters, treating empty strings as missing values
for column_name, column_type in list((column_converters or {}).items()):
result_df[column_name] = [
column_type(string_value) if len(string_value) > 0 else None
for string_value
in result_df[column_name]
]
# Hackishly infer whether the values in the 'source' column of this GTF
# are actually representing a biotype by checking for the most common
# gene_biotype and transcript_biotype value 'protein_coding'
if infer_biotype_column:
unique_source_values = set(result_df["source"])
if "protein_coding" in unique_source_values:
column_names = set(result_df.columns)
# Disambiguate between the two biotypes by checking if
# gene_biotype is already present in another column. If it is,
# the 2nd column is the transcript_biotype (otherwise, it's the
# gene_biotype)
if "gene_biotype" not in column_names:
logging.info("Using column 'source' to replace missing 'gene_biotype'")
result_df["gene_biotype"] = result_df["source"]
if "transcript_biotype" not in column_names:
logging.info("Using column 'source' to replace missing 'transcript_biotype'")
result_df["transcript_biotype"] = result_df["source"]
if usecols is not None:
column_names = set(result_df.columns)
valid_columns = [c for c in usecols if c in column_names]
result_df = result_df[valid_columns]
return result_df
def parse_gtf(filepath_or_buffer, chunksize=1024 * 1024, features=None,
intern_columns=["seqname", "source", "strand", "frame"],
fix_quotes_columns=["attribute"], compression=None) -> pd.DataFrame:
"""
Borrowed code from https://github.com/openvax/gtfparse
Args:
filepath_or_buffer (str or buffer object): Path to GTF file (may be gzip
compressed) or buffer object such as StringIO
chunksize (int): Default 1048576.
features (set or None): Drop entries which aren't one of these features
intern_columns (list): These columns are short strings which should be
interned
fix_quotes_columns (list): Most commonly the 'attribute' column which
had broken quotes on some Ensembl release GTF files.
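Example:
A minimal sketch ("annotation.gtf" is a placeholder path):

df = parse_gtf("annotation.gtf", features={"exon"})
# The 'attribute' column is still a raw string here; use
# parse_gtf_and_expand_attributes() to expand it into columns.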
"""
if features is not None:
features = set(features)
def parse_frame(s):
if s == ".":
return 0
else:
return int(s)
# GTF columns:
# 1) seqname: str ("1", "X", "chrX", etc...)
# 2) source : str
# Different versions of GTF use the second column as one of:
# (a) gene biotype
# (b) transcript biotype
# (c) the annotation source
# See: https://www.biostars.org/p/120306/#120321
# 3) feature : str ("gene", "transcript", &c)
# 4) start : int
# 5) end : int
# 6) score : float or "."
# 7) strand : "+", "-", or "."
# 8) frame : 0, 1, 2 or "."
# 9) attribute : key-value pairs separated by semicolons
# (see more complete description in docstring at top of file)
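# For illustration, a single (tab-separated) GTF line looks roughly like:
#   1  havana  gene  11869  14409  .  +  .  gene_id "ENSG00000223972"; gene_name "DDX11L1";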
chunk_iterator = pd.read_csv(
filepath_or_buffer,
sep="\t",
comment="#",
names=COLUMN_NAMES,
skipinitialspace=True,
skip_blank_lines=True,
on_bad_lines='error',
chunksize=chunksize,
compression=compression,
engine="c",
dtype={
"start": np.int64,
"end": np.int64,
"score": np.float32,
"seqname": str,
},
na_values=".",
converters={"frame": parse_frame})
dataframes = []
try:
for df in chunk_iterator:
for intern_column in intern_columns:
df[intern_column] = [intern(str(s)) for s in df[intern_column]]
# compare feature strings after interning
if features is not None:
df = df[df["feature"].isin(features)]
for fix_quotes_column in fix_quotes_columns:
# Catch mistaken semicolons by replacing "xyz;" with "xyz"
# Required to do this since the Ensembl GTF for Ensembl
# release 78 has mistakes such as:
# gene_name = "PRAMEF6;" transcript_name = "PRAMEF6;-201"
df[fix_quotes_column] = [
s.replace(';\"', '\"').replace(";-", "-")
for s in df[fix_quotes_column]
]
dataframes.append(df)
except Exception as e:
raise e
df = pd.concat(dataframes)
return df
def parse_gtf_dask(filepath_or_buffer, blocksize=None, compression=None, features=None) -> dd.DataFrame:
"""
Args:
filepath_or_buffer (str or buffer object): Path to GTF file (may be gzip
compressed) or buffer object such as StringIO
blocksize (int or None): Block size (in bytes) used to partition the file into a dask DataFrame. Default None.
compression (str): Compression type to be passed into dask.dataframe.read_table(). Default None.
features (set or None): Drop entries which aren't one of these features
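Example:
A minimal sketch ("annotation.gtf" is a placeholder path):

ddf = parse_gtf_dask("annotation.gtf", blocksize=10000000, features={"gene"})
df = ddf.compute()  # materialize the lazy dask DataFrame into pandas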
"""
if features is not None:
features = set(features)
def parse_frame(s):
if s == ".":
return 0
else:
return int(s)
# The nine GTF columns are the same as described in parse_gtf() above.
# Uses Dask to read the file in parallel blocks.
dataframe = dd.read_table(
filepath_or_buffer,
sep="\t",
compression=compression,
# A boolean blocksize (e.g. True) falls back to reading the file as a single partition
blocksize=None if isinstance(blocksize, bool) else blocksize,
comment="#",
names=COLUMN_NAMES,
skipinitialspace=True,
skip_blank_lines=True,
on_bad_lines='error',
engine="c",
dtype={
"start": np.int64,
"end": np.int64,
"score": np.float32,
"seqname": str,
},
na_values=".",
converters={"frame": parse_frame})
# Apply the same feature filtering as the pandas code path in parse_gtf()
if features is not None:
dataframe = dataframe[dataframe["feature"].isin(features)]
return dataframe
def parse_gtf_and_expand_attributes(filepath_or_buffer, blocksize=None, compression=None, chunksize=1024 * 1024,
restrict_attribute_columns=None, features=None) -> Union[
dd.DataFrame, pd.DataFrame]:
"""Parse lines into column->values dictionary and then expand the
'attribute' column into multiple columns. This expansion happens by
replacing strings of semicolon-separated key-value pairs in the
'attribute' column with one column per distinct key, giving each row
a value (or None for rows where the key didn't occur).
Args:
filepath_or_buffer (str or buffer object): Path to GTF file (may be gzip
compressed) or buffer object such as StringIO
blocksize (int or None): Block size (in bytes) for the dask DataFrame. If None, use pandas with `chunksize`.
compression (str or None): Compression type passed to the underlying reader. Default None.
chunksize (int): Chunk size for pandas.read_csv(); ignored when blocksize is set.
restrict_attribute_columns (list/set of str or None): If given, only expand attribute keys in this set.
features (set or None): Ignore entries which don't correspond to one of the supplied features
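Example:
A minimal sketch ("annotation.gtf" is a placeholder path):

df = parse_gtf_and_expand_attributes("annotation.gtf",
restrict_attribute_columns=["gene_id", "gene_name"])
# 'gene_id' and 'gene_name' are now regular columns parsed from 'attribute'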
"""
if blocksize:
# Dask dataframe
def expand_attr_strings_to_dict(attribute_string: str, quote_char='\"', usecols=None,
column_interned_strings={}, value_interned_strings={}) \
-> Dict[str, str]:
"""The last column of GTF has a variable number of key value pairs of the
format: "key1 value1; key2 value2;" Parse these into a dictionary mapping
each key onto a list of values, where the value is None for any row where
the key was missing.
Args:
attribute_strings (list of str):
quote_char (str): Quote character to remove from values
nan_value (any): If an attribute is missing from a row, give it this
value.
usecols (list of str or None): If not None, then only expand columns
included in this set, otherwise use all columns.
column_interned_strings: Mutable default argument used for multiple
calls to the same data.
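Example (illustrative; the identifiers are made up):

expand_attr_strings_to_dict('gene_id "ENSG0001"; gene_name "TP53";')
# -> {'gene_id': 'ENSG0001', 'gene_name': 'TP53'}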
"""
output = {}
for kv in attribute_string.split(";"):
# We're slicing the first two elements out of split() because
# Ensembl release 79 added values like:
# transcript_support_level "1 (assigned to previous version 5)";
# ...which gets mangled by splitting on spaces.
parts = kv.strip().split(" ", 2)[:2]
if len(parts) != 2: continue
column_name, value = parts
try:
column_name = column_interned_strings[column_name]
except KeyError:
column_name = intern(str(column_name))
column_interned_strings[column_name] = column_name
if usecols is not None and column_name not in usecols: continue
value = value.replace(quote_char, "") if value.startswith(quote_char) else value
try:
value = value_interned_strings[value]
except KeyError:
value = intern(str(value))
value_interned_strings[value] = value
output[column_name] = value
return output
df = parse_gtf_dask(filepath_or_buffer, blocksize=blocksize, compression=compression, features=features)
attribute_values: dd.Series = df.pop("attribute")
# Sample 500 rows to determine the meta dataframe
attr_vals = attribute_values.head(500)
expected_df = attr_vals.map(expand_attr_strings_to_dict).apply(pd.Series)
attr_df = attribute_values.to_bag(index=False, format='tuple') \
.map(expand_attr_strings_to_dict) \
.to_dataframe(meta=expected_df)
attr_df.index = df.index
df = dd.concat([df, attr_df], axis=1)
else:
# Pandas
df = parse_gtf(filepath_or_buffer, chunksize=chunksize, features=features, compression=compression)
attribute_values = df.pop("attribute")
for column_name, values in expand_attribute_strings(attribute_values,
usecols=restrict_attribute_columns).items():
df[column_name] = values
return df
def expand_attribute_strings(attribute_strings: List[str], quote_char='\"', nan_value=None,
usecols=None) -> OrderedDict:
"""The last column of GTF has a variable number of key value pairs of the
format: "key1 value1; key2 value2;" Parse these into a dictionary mapping
each key onto a list of values, where the value is None for any row where
the key was missing.
Args:
attribute_strings (list of str):
quote_char (str): Quote character to remove from values
nan_value (any): If an attribute is missing from a row, give it this
value.
usecols (list of str or None): If not None, then only expand columns
included in this set, otherwise use all columns.
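Example (illustrative; the identifiers are made up):

expand_attribute_strings(['gene_id "A"; gene_name "X";', 'gene_id "B";'])
# -> OrderedDict([('gene_id', ['A', 'B']), ('gene_name', ['X', None])])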
"""
n = len(attribute_strings)
extra_columns = {}
column_order = []
#
# SOME NOTES ABOUT THE BIZARRE STRING INTERNING GOING ON BELOW
#
# While parsing millions of repeated strings (e.g. "gene_id" and "TP53"),
# we can save a lot of memory by making sure there's only one string
# object per unique string. The canonical way to do this is using
# the 'intern' function. One problem is that Py2 won't let you intern
# unicode objects, so to get around this we call intern(str(...)).
#
# It also turns out to be faster to check interned strings ourselves
# using a local dictionary, hence the two dictionaries below
# and pair of try/except blocks in the loop.
column_interned_strings = {}
value_interned_strings = {}
for (i, attribute_string) in enumerate(attribute_strings):
for kv in attribute_string.split(";"):
# We're slicing the first two elements out of split() because
# Ensembl release 79 added values like:
# transcript_support_level "1 (assigned to previous version 5)";
# ...which gets mangled by splitting on spaces.
parts = kv.strip().split(" ", 2)[:2]
if len(parts) != 2:
continue
column_name, value = parts
try:
column_name = column_interned_strings[column_name]
except KeyError:
column_name = intern(str(column_name))
column_interned_strings[column_name] = column_name
if usecols is not None and column_name not in usecols:
continue
try:
column = extra_columns[column_name]
except KeyError:
column = [nan_value] * n
extra_columns[column_name] = column
column_order.append(column_name)
value = value.replace(quote_char, "") if value.startswith(quote_char) else value
try:
value = value_interned_strings[value]
except KeyError:
value = intern(str(value))
value_interned_strings[value] = value
# if an attribute is used repeatedly then
# keep track of all its values in a list
old_value = column[i]
if old_value is nan_value:
column[i] = value
else:
column[i] = "%s,%s" % (old_value, value)
return OrderedDict(
(column_name, extra_columns[column_name])
for column_name in column_order)