--- a
+++ b/openomics/database/sequence.py
@@ -0,0 +1,1025 @@
+import os
+import re
+import traceback
+from abc import abstractmethod
+from collections import defaultdict, OrderedDict
+from typing import Union, List, Callable, Dict, Tuple, Optional, Iterable
+
+import numpy as np
+import pandas as pd
+import tqdm
+from Bio import SeqIO
+from Bio.SeqFeature import ExactPosition
+from dask import dataframe as dd
+from logzero import logger
+from pyfaidx import Fasta
+from six.moves import intern
+
+import openomics
+from openomics.io.read_gtf import read_gtf
+from .base import Database
+from ..io.files import select_files_with_ext
+from ..transforms.agg import get_agg_func
+from ..transforms.df import drop_duplicate_columns
+
+__all__ = ['GENCODE', 'UniProt', 'MirBase', 'RNAcentral']
+
+SEQUENCE_COL = 'sequence'
+
+
+class SequenceDatabase(Database):
+    """Provides a series of methods to extract sequence data from
+    SequenceDataset.
+    """
+
+    def __init__(self, **kwargs):
+        """
+        Args:
+            **kwargs:
+        """
+        super().__init__(**kwargs)
+        self.close()
+
+    @abstractmethod
+    def load_sequences(self, fasta_file: str, index=None, keys: Union[pd.Index, List[str]] = None, blocksize=None) \
+        -> pd.DataFrame:
+        """Returns a pandas DataFrame containing the fasta sequence entries.
+        With a column named 'sequence'.
+
+        Args:
+            index ():
+            fasta_file (str): path to the fasta file, usually as
+                self.file_resources[<file_name>]
+            keys (pd.Index): list of keys to
+            blocksize:
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def get_sequences(self, index: str, omic: str, agg: str, **kwargs) -> Union[pd.Series, Dict]:
+        """Returns a dictionary where keys are 'index' and values are
+        sequence(s).
+
+        Args:
+            index (str): {"gene_id", "gene_name", "transcript_id",
+                "transcript_name"}
+            omic (str): {"lncRNA", "microRNA", "messengerRNA"}
+            agg (str): {"all", "shortest", "longest"}
+            **kwargs: any additional argument to pass to
+                SequenceDataset.get_sequences()
+        """
+        raise NotImplementedError
+
+    @staticmethod
+    def aggregator_fn(agg: Union[str, Callable] = None) -> Callable:
+        """Returns a function used aggregate a list of sequences from a groupby
+        on a given key.
+
+        Args:
+            agg: One of ("all", "shortest", "longest", "first"), default "all". If "all",
+                then return a list of sequences.
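+
+        Example (illustrative):
+            >>> SequenceDatabase.aggregator_fn("longest")(["AA", "AAAA"])
+            'AAAA'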
+        """
+        if agg == "all":
+            agg_func = lambda x: list(x) if not isinstance(x, str) else x
+        elif agg == "shortest":
+            agg_func = lambda x: min(x, key=len) if isinstance(x, list) else x
+        elif agg == "longest":
+            agg_func = lambda x: max(x, key=len) if isinstance(x, list) else x
+        elif agg == 'first':
+            agg_func = lambda x: x[0] if isinstance(x, list) else x
+        elif callable(agg):
+            return agg
+        else:
+            raise ValueError(
+                "`agg` must be one of {'all', 'shortest', 'longest', 'first'} or a callable"
+            )
+        return agg_func
+
+
+class GENCODE(SequenceDatabase):
+    """Loads the GENCODE database from https://www.gencodegenes.org/ .
+
+    Default path: ftp://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_32/ .
+    Default file_resources: {
+        "basic.annotation.gtf": "gencode.v32.basic.annotation.gtf.gz",
+        "long_noncoding_RNAs.gtf": "gencode.v32.long_noncoding_RNAs.gtf.gz",
+        "lncRNA_transcripts.fa": "gencode.v32.lncRNA_transcripts.fa.gz",
+        "transcripts.fa": "gencode.v32.transcripts.fa.gz",
+    }
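+
+    Example (illustrative; get_sequences() requires uncompressed .fa files):
+        >>> gencode = GENCODE(remove_version_num=True)
+        >>> seqs = gencode.get_sequences(index="gene_id", omic=openomics.MessengerRNA.name(), agg="all")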
+    """
+    def __init__(
+        self,
+        path="ftp://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_32/",
+        file_resources=None,
+        col_rename=None,
+        blocksize=0,
+        remove_version_num=False,
+        **kwargs
+    ):
+        """
+        Args:
+            path:
+            file_resources:
+            col_rename:
+            blocksize:
+            remove_version_num (bool): Whether to drop the version number on the
+                ensembl ID.
+        """
+        if file_resources is None:
+            file_resources = {
+                "basic.annotation.gtf.gz": "gencode.v32.basic.annotation.gtf.gz",
+                "long_noncoding_RNAs.gtf.gz": "gencode.v32.long_noncoding_RNAs.gtf.gz",
+                "lncRNA_transcripts.fa.gz": "gencode.v32.lncRNA_transcripts.fa.gz",
+                "transcripts.fa.gz": "gencode.v32.transcripts.fa.gz",
+            }
+
+        self.remove_version_num = remove_version_num
+
+        super().__init__(path=path, file_resources=file_resources, col_rename=col_rename, blocksize=blocksize, **kwargs)
+
+    def load_dataframe(self, file_resources, blocksize=None):
+        """
+        Loads the GENCODE annotation file into a pandas or dask DataFrame.
+        Args:
+            file_resources:
+            blocksize:
+
+        Returns:
+
+        """
+        gtf_files = select_files_with_ext(file_resources, ".gtf")
+        if not gtf_files:
+            gtf_files = select_files_with_ext(file_resources, ".gtf.gz")
+
+        ddfs = []
+        for filename, filepath in gtf_files.items():
+            if blocksize and not isinstance(filepath, str): continue
+            ddf = read_gtf(filepath, blocksize=blocksize,
+                           compression="gzip" if filename.endswith(".gz") else None)
+
+            ddfs.append(ddf)
+
+        annotation_df = dd.concat(ddfs, interleave_partitions=True,
+                                  ignore_unknown_divisions=True) if blocksize else pd.concat(ddfs)
+
+        if self.remove_version_num:
+            annotation_df = annotation_df.assign(
+                gene_id=annotation_df["gene_id"].str.replace("[.]\d*", "", regex=True),
+                transcript_id=annotation_df["transcript_id"].str.replace("[.]\d*", "", regex=True))
+
+        return annotation_df
+
+    def load_sequences(self, fasta_file: str, index=None, keys: pd.Index = None, blocksize=None):
+        """
+        Args:
+            index ():
+            keys ():
+            fasta_file:
+            blocksize:
+        """
+        if hasattr(self, '_seq_df_dict') and fasta_file in self._seq_df_dict:
+            return self._seq_df_dict[fasta_file]
+
+        def get_transcript_id(x):
+            key = x.split('|')[0]  # transcript_id
+            if self.remove_version_num:
+                return re.sub(r"[.]\d*", "", key)
+            else:
+                return key
+
+        fa = Fasta(fasta_file, key_function=get_transcript_id, as_raw=True)
+
+        entries = []
+        for key, record in tqdm.tqdm(fa.items(), desc=str(fasta_file)):
+            if keys is not None and key not in keys: continue
+
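+            # GENCODE transcript fasta headers are '|'-delimited, e.g. (illustrative):
+            #   ENST00000456328.2|ENSG00000223972.5|OTTHUMG...|OTTHUMT...|DDX11L1-202|DDX11L1|1657|processed_transcript|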
+            attrs = record.long_name.split("|")
+            record_dict = {
+                "transcript_id": attrs[0],
+                "gene_id": attrs[1],
+                "gene_name": attrs[5],
+                "transcript_name": attrs[4],
+                "transcript_length": attrs[6],
+                "transcript_biotype": intern(attrs[7]),
+                SEQUENCE_COL: str(record),
+            }
+
+            entries.append(record_dict)
+
+        seq_df = pd.DataFrame(entries)
+        if blocksize:
+            seq_df = dd.from_pandas(seq_df, chunksize=blocksize)
+
+        if self.remove_version_num:
+            seq_df["gene_id"] = seq_df["gene_id"].str.replace("[.]\d*", "", regex=True)
+            seq_df["transcript_id"] = seq_df["transcript_id"].str.replace("[.]\d*", "", regex=True)
+
+        # Cache the seq_df only when it holds all entries (unfiltered by `keys`),
+        # since the cached DataFrame is reused by later calls regardless of `keys`
+        if not hasattr(self, '_seq_df_dict'):
+            self._seq_df_dict = {}
+        if keys is None:
+            self._seq_df_dict[fasta_file] = seq_df
+
+        return seq_df
+
+    def get_sequences(self, index: Union[str, Tuple[str]], omic: str, agg: str = 'all', biotypes: List[str] = None):
+        """
+        Args:
+            index (str):
+            omic (str):
+            agg (str):
+            biotypes (List[str]):
+        """
+        agg_func = self.aggregator_fn(agg)
+
+        # Parse lncRNA & mRNA fasta
+        if omic == openomics.MessengerRNA.name():
+            fasta_file = self.file_resources["transcripts.fa"]
+        elif omic == openomics.LncRNA.name():
+            fasta_file = self.file_resources["lncRNA_transcripts.fa"]
+        else:
+            raise Exception("omic argument must be one of {'MessengerRNA', 'LncRNA'}")
+
+        assert isinstance(fasta_file, str), \
+            f"Fasta file provided in `file_resources` must be an uncompressed .fa file. Given {fasta_file}."
+
+        seq_df = self.load_sequences(fasta_file)
+
+        if "gene" in index:
+            if biotypes:
+                seq_df = seq_df[seq_df["transcript_biotype"].isin(biotypes)]
+            else:
+                print("INFO: You can pass in a list of transcript biotypes to filter using the argument 'biotypes'.")
+
+            return seq_df.groupby(index)[SEQUENCE_COL].agg(agg_func)
+
+        else:
+            return seq_df.groupby(index)[SEQUENCE_COL].first()
+
+    def get_rename_dict(self, from_index="gene_id", to_index="gene_name"):
+        """
+        Args:
+            from_index:
+            to_index:
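+
+        Returns:
+            dict: mapping from `from_index` values to `to_index` values, e.g.
+                (illustrative) {"ENSG00000223972": "DDX11L1"}.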
+        """
+        ensembl_id_to_gene_name = pd.Series(
+            self.data[to_index].values, index=self.data[from_index]).to_dict()
+        return ensembl_id_to_gene_name
+
+
+class UniProt(SequenceDatabase):
+    COLUMNS_RENAME_DICT = {
+        # idmapping_selected.tab
+        "UniProtKB-AC": 'protein_id',
+        "UniProtKB-ID": 'protein_name',
+        "Ensembl": "gene_id",
+        "Ensembl_TRS": "transcript_id",
+        "Ensembl_PRO": "protein_embl_id",
+        "NCBI-taxon": "species_id",
+        "GeneID (EntrezGene)": "entrezgene_id",
+        "GO": "go_id",
+        # FASTA headers
+        "OS": 'species', "OX": 'species_id', 'GN': 'gene_name', 'PE': 'ProteinExistence', 'SV': "version",
+        # UniProt XML headers
+        "accession": "UniProtKB-AC", "name": "protein_name", "gene": "gene_name", "keyword": "keywords",
+        "geneLocation": "subcellular_location",
+
+    }
+
+    SPECIES_ID_NAME = {
+        '10090': 'MOUSE', '10116': 'RAT', '226900': 'BACCR', '243273': 'MYCGE', '284812': 'SCHPO', '287': 'PSEAI',
+        '3702': 'ARATH', '99287': 'SALTY', '44689': 'DICDI', '4577': 'MAIZE', '559292': 'YEAST', '6239': 'CAEEL',
+        '7227': 'DROME', '7955': 'DANRE', '83333': 'ECOLI', '9606': 'HUMAN', '9823': 'PIG', }
+
+    SPECIES_ID_TAXONOMIC = {
+        'HUMAN': 'human', 'MOUSE': 'rodents', 'RAT': 'rodents', 'BACCR': 'bacteria', 'MYCGE': 'bacteria',
+        'SCHPO': 'fungi', 'PSEAI': 'bacteria', 'ARATH': 'plants', 'SALTY': 'bacteria', 'DICDI': 'bacteria',
+        'MAIZE': 'plants', 'YEAST': 'fungi', 'CAEEL': 'invertebrates', 'DROME': 'invertebrates', 'DANRE': 'vertebrates',
+        'ECOLI': 'bacteria', 'PIG': 'mammals',
+    }
+
+    def __init__(self, path="https://ftp.uniprot.org/pub/databases/uniprot/current_release/",
+                 file_resources: Dict[str, str] = None,
+                 species_id: str = "9606", remove_version_num=True,
+                 index_col='UniProtKB-AC', keys=None,
+                 col_rename=COLUMNS_RENAME_DICT,
+                 **kwargs):
+        """
+        Loads the UniProt database from https://uniprot.org/ .
+
+        Default path: 'https://ftp.uniprot.org/pub/databases/uniprot/current_release/'
+        Default file_resources: {
+            file_resources['uniprot_sprot.xml.gz'] = "knowledgebase/complete/uniprot_sprot.xml.gz
+            file_resources['uniprot_trembl.xml.gz'] = "knowledgebase/complete/uniprot_trembl.xml.gz
+            file_resources["idmapping_selected.tab.gz"] = "knowledgebase/idmapping/idmapping_selected.tab.gz'
+            file_resources["proteomes.tsv"] = "https://rest.uniprot.org/proteomes/stream?compressed=true&
+                fields=upid%2Corganism%2Corganism_id&format=tsv&query=%28%2A%29%20AND%20%28proteome_type%3A1%29"
+            file_resources['speclist.txt'] = 'https://ftp.uniprot.org/pub/databases/uniprot/current_release/
+                knowledgebase/complete/docs/speclist'
+        }
+
+        Args:
+            path:
+            file_resources:
+            col_rename:
+            verbose:
+            blocksize:
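+
+        Example (illustrative; downloads large idmapping files on first use):
+            >>> uniprot = UniProt(species_id="9606")
+            >>> idmapping = uniprot.data  # idmapping joined with proteome and species info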
+        """
+        self.species_id = species_id
+        self.species = UniProt.SPECIES_ID_NAME[species_id] if species_id in UniProt.SPECIES_ID_NAME else None
+        self.taxonomic_id = UniProt.SPECIES_ID_TAXONOMIC[
+            self.species] if self.species in UniProt.SPECIES_ID_TAXONOMIC else None
+        self.remove_version_num = remove_version_num
+
+        if file_resources is None:
+            file_resources = {}
+
+            file_resources['uniprot_sprot.xml.gz'] = os.path.join(path, "knowledgebase/complete/uniprot_sprot.xml.gz")
+            file_resources['uniprot_trembl.xml.gz'] = os.path.join(path, "knowledgebase/complete/uniprot_trembl.xml.gz")
+            file_resources["idmapping_selected.tab.gz"] = os.path.join(
+                path, "knowledgebase/idmapping/idmapping_selected.tab.gz")
+
+            if self.species:
+                file_resources['uniprot_sprot.xml.gz'] = os.path.join(
+                    path, "knowledgebase/taxonomic_divisions/", f'uniprot_sprot_{self.taxonomic_id}.xml.gz')
+                file_resources['uniprot_trembl.xml.gz'] = os.path.join(
+                    path, "knowledgebase/taxonomic_divisions/", f'uniprot_trembl_{self.taxonomic_id}.xml.gz')
+                file_resources["idmapping_selected.tab.gz"] = os.path.join(
+                    path, "knowledgebase/idmapping/by_organism/",
+                    f'{self.species}_{self.species_id}_idmapping_selected.tab.gz')
+
+            file_resources["proteomes.tsv"] = \
+                "https://rest.uniprot.org/proteomes/stream?compressed=true&fields=upid%2Corganism%2Corganism_id&format=tsv&query=%28%2A%29%20AND%20%28proteome_type%3A1%29"
+
+        super().__init__(path=path, file_resources=file_resources, index_col=index_col, keys=keys,
+                         col_rename=col_rename,
+                         **kwargs)
+
+    def load_dataframe(self, file_resources, blocksize=None):
+        """
+        Args:
+            file_resources:
+            blocksize:
+        """
+        # Load idmapping_selected.tab
+        args = dict(
+            names=['UniProtKB-AC', 'UniProtKB-ID', 'GeneID (EntrezGene)', 'RefSeq', 'GI', 'PDB', 'GO', 'UniRef100',
+                   'UniRef90', 'UniRef50', 'UniParc', 'PIR', 'NCBI-taxon', 'MIM', 'UniGene', 'PubMed', 'EMBL',
+                   'EMBL-CDS', 'Ensembl', 'Ensembl_TRS', 'Ensembl_PRO', 'Additional PubMed'],
+            usecols=['UniProtKB-AC', 'UniProtKB-ID', 'GeneID (EntrezGene)', 'RefSeq', 'GI', 'PDB', 'GO',
+                     'NCBI-taxon', 'Ensembl', 'Ensembl_TRS', 'Ensembl_PRO'],
+            dtype='str')
+
+        if blocksize:
+            if "idmapping_selected.parquet" in file_resources and \
+                isinstance(file_resources["idmapping_selected.parquet"], str):
+                idmapping = dd.read_parquet(file_resources["idmapping_selected.parquet"])
+
+            elif "idmapping_selected.tab" in file_resources and \
+                isinstance(file_resources["idmapping_selected.tab"], str):
+                idmapping = dd.read_table(file_resources["idmapping_selected.tab"], blocksize=blocksize, **args)
+            else:
+                idmapping = dd.read_table(file_resources["idmapping_selected.tab.gz"], compression="gzip", **args, )
+
+            idmapping: dd.DataFrame
+        else:
+            if "idmapping_selected.parquet" in file_resources and \
+                isinstance(file_resources["idmapping_selected.parquet"], str):
+                idmapping = pd.read_parquet(file_resources["idmapping_selected.parquet"])
+            elif "idmapping_selected.tab" in file_resources:
+                idmapping = pd.read_table(file_resources["idmapping_selected.tab"], index_col=self.index_col, **args)
+            else:
+                idmapping = pd.read_table(file_resources["idmapping_selected.tab.gz"], compression="gzip",
+                                          index_col=self.index_col, **args)
+
+        # Filter UniProt accession keys
+        if self.keys is not None and idmapping.index.name == self.index_col:
+            idmapping = idmapping.loc[idmapping.index.isin(self.keys)]
+        elif self.keys is not None and idmapping.index.name != self.index_col:
+            idmapping = idmapping.loc[idmapping[self.index_col].isin(self.keys)]
+
+        if idmapping.index.name != self.index_col:
+            idmapping = idmapping.set_index(self.index_col)
+        if isinstance(idmapping, dd.DataFrame) and not idmapping.known_divisions:
+            idmapping.divisions = idmapping.compute_current_divisions()
+
+        # Transform '; '-delimited string columns into list/array columns
+        idmapping = idmapping.assign(**self.assign_transforms(idmapping))
+
+        # Join metadata from uniprot_sprot.parquet
+        if any(fn.startswith('uniprot') and fn.endswith('.parquet') for fn in file_resources):
+            uniprot_anns = self.load_uniprot_parquet(file_resources, blocksize=blocksize)
+            if uniprot_anns is not None:
+                uniprot_anns = uniprot_anns[uniprot_anns.columns.difference(idmapping.columns)]
+                uniprot_anns = drop_duplicate_columns(uniprot_anns)
+                assert idmapping.index.name == uniprot_anns.index.name, \
+                    f"{idmapping.index.name} != {uniprot_anns.index.name}"
+                idmapping = idmapping.join(uniprot_anns, on=idmapping.index.name, how='left')
+
+        # Load proteome.tsv
+        if "proteomes.tsv" in file_resources:
+            proteomes = pd.read_table(file_resources["proteomes.tsv"],
+                                      usecols=['Organism Id', 'Proteome Id'],
+                                      dtype={'Organism Id': 'str', 'Proteome Id': 'str'}) \
+                .rename(columns={'Organism Id': 'NCBI-taxon', 'Proteome Id': 'proteome_id'}) \
+                .dropna().set_index('NCBI-taxon')
+            idmapping = idmapping.join(proteomes, on='NCBI-taxon')
+
+        # Load species info from speclist.txt
+        if 'speclist.txt' in file_resources:
+            speclist = self.get_species_list(file_resources['speclist.txt'])
+            idmapping = idmapping.join(speclist, on='NCBI-taxon')
+
+        return idmapping
+
+    def assign_transforms(self, idmapping: pd.DataFrame) -> Dict[str, Union[dd.Series, pd.Series]]:
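+        """Build column transforms that split '; '-delimited strings into arrays.
+
+        For example (illustrative), a 'GO' value "GO:0005739; GO:0005829" becomes
+        np.array(["GO:0005739", "GO:0005829"]). Ensembl id columns are also stripped
+        of version suffixes when `self.remove_version_num` is set, and 'Ensembl_PRO'
+        ids gain a 'protein_external_id' variant prefixed with the NCBI taxon id to
+        match STRING PPI identifiers.
+        """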
+        # Convert string of list elements to a np.array
+        list2array = lambda x: np.array(x) if isinstance(x, Iterable) else x
+        assign_fn = {}
+        for col in {'PDB', 'GI', 'GO', 'RefSeq'}.intersection(idmapping.columns):
+            try:
+                # Split string to list
+                assign_fn[col] = idmapping[col].str.split("; ").map(list2array)
+            except Exception:
+                continue
+
+        for col in {'Ensembl', 'Ensembl_TRS', 'Ensembl_PRO'}.intersection(idmapping.columns):
+            # Remove the ".#" Ensembl version number suffix at the end
+            try:
+                if self.remove_version_num:
+                    series = idmapping[col].str.replace(r"[.]\d*", "", regex=True)
+                else:
+                    series = idmapping[col]
+
+                assign_fn[col] = series.str.split("; ").map(list2array)
+
+                if col == 'Ensembl_PRO':
+                    # Prepend species_id to ensembl protein ids to match with STRING PPI
+                    concat_fn = dd.concat if isinstance(idmapping, dd.DataFrame) else pd.concat
+                    concat = concat_fn([idmapping["NCBI-taxon"], assign_fn[col]], axis=1)
+
+                    assign_fn['protein_external_id'] = concat.apply(
+                        lambda row: np.char.add(row['NCBI-taxon'] + ".", row['Ensembl_PRO']) \
+                            if isinstance(row['Ensembl_PRO'], Iterable) else None,
+                        axis=1)
+            except Exception:
+                continue
+
+        return assign_fn
+
+    def load_uniprot_parquet(self, file_resources: Dict[str, str], blocksize=None) -> Union[dd.DataFrame, pd.DataFrame]:
+        dfs = []
+        for filename, file_path in file_resources.items():
+            if not ('uniprot' in filename and filename.endswith('.parquet')): continue
+            if blocksize:
+                df: dd.DataFrame = dd.read_parquet(file_path) \
+                    .rename(columns=UniProt.COLUMNS_RENAME_DICT)
+                if df.index.name in UniProt.COLUMNS_RENAME_DICT:
+                    df.index = df.index.rename(UniProt.COLUMNS_RENAME_DICT[df.index.name])
+
+                if self.keys is not None:
+                    if self.index_col in df.columns:
+                        df = df.loc[df[self.index_col].isin(self.keys)]
+                    elif df.index.name == self.index_col:
+                        df = df.loc[df.index.isin(self.keys)]
+
+                if df.index.size.compute() == 0: continue
+
+                if df.index.name != self.index_col:
+                    try:
+                        df = df.set_index(self.index_col, sorted=True)
+                    except Exception as e:
+                        logger.warning(f"set_index(sorted=True) failed for {file_path}: {e}")
+                        df = df.set_index(self.index_col, sorted=False)
+
+                if not df.known_divisions:
+                    df.divisions = df.compute_current_divisions()
+
+            else:
+                df = pd.read_parquet(file_path).rename(columns=UniProt.COLUMNS_RENAME_DICT).set_index(self.index_col)
+
+                if self.keys is not None:
+                    df_keys = df.index if df.index.name == self.index_col else df[self.index_col]
+                    df = df.loc[df_keys.isin(self.keys)]
+                if df.index.size == 0: continue
+
+            dfs.append(df)
+
+        if dfs:
+            dfs = dd.concat(dfs, interleave_partitions=True) if blocksize else pd.concat(dfs)
+
+            return dfs
+        else:
+            return None
+
+    def load_uniprot_xml(self, file_path: str, keys=None, blocksize=None) -> pd.DataFrame:
+        records = []
+        seqfeats = []
+        if isinstance(keys, str):
+            index = keys
+            keys_set = self.data.index if keys == self.data.index.name else set(self.data[keys])
+        elif isinstance(keys, (dd.Index, dd.Series)):
+            index = keys.name
+            keys_set = set(keys.compute())
+        else:
+            index = keys_set = None
+
+        for record in tqdm.tqdm(SeqIO.parse(file_path, "uniprot-xml"), desc=str(file_path)):
+            # Sequence features
+            annotations = defaultdict(None, record.annotations)
+            record_dict = {
+                'protein_id': record.id,
+                "protein_name": record.name,
+                'gene_name': annotations['gene_name_primary'],
+                'description': record.description,
+                'molecule_type': annotations['molecule_type'],
+                'created': annotations['created'],
+                'ec_id': annotations['type'],
+                'subcellular_location': annotations['comment_subcellularlocation_location'],
+                'taxonomy': annotations['taxonomy'],
+                'keywords': annotations['keywords'],
+                'sequence_mass': annotations['sequence_mass'],
+                SEQUENCE_COL: str(record.seq),
+            }
+            if index is not None:
+                if record_dict[index] not in keys_set: continue
+
+            records.append(record_dict)
+
+            # Sequence interval features
+            _parse_interval = lambda sf: pd.Interval(left=sf.location.start, right=sf.location.end, )
+            feature_type_intervals = defaultdict(lambda: [])
+            for sf in record.features:
+                if isinstance(sf.location.start, ExactPosition) and isinstance(sf.location.end, ExactPosition):
+                    feature_type_intervals[sf.type].append(_parse_interval(sf))
+
+            features_dict = {type: pd.IntervalIndex(intervals, name=type) \
+                             for type, intervals in feature_type_intervals.items()}
+            seqfeats.append({"protein_id": record.id, **features_dict})
+
+        records_df = pd.DataFrame(records)
+        if blocksize:
+            records_df = dd.from_pandas(records_df, chunksize=blocksize)
+        records_df = records_df.set_index(['protein_id'])
+
+        seqfeats_df = pd.DataFrame(seqfeats)
+        if blocksize:
+            seqfeats_df = dd.from_pandas(seqfeats_df, chunksize=blocksize)
+        seqfeats_df = seqfeats_df.set_index(['protein_id'])
+        seqfeats_df.columns = [f"seq/{col}" for col in seqfeats_df.columns]
+
+        # Join new metadata to self.data
+        if SEQUENCE_COL not in self.data.columns:
+            exclude_cols = records_df.columns.intersection(self.data.columns)
+            self.data = self.data.join(records_df.drop(columns=exclude_cols, errors="ignore"),
+                                       on='protein_id', how="left")
+        else:
+            self.data.update(records_df, overwrite=False)
+
+        # Add new seq features
+        if len(seqfeats_df.columns.difference(self.data.columns)):
+            self.data = self.data.join(seqfeats_df.drop(columns=seqfeats_df.columns.intersection(self.data.columns)),
+                                       on='protein_id', how="left")
+        # Fillna seq features
+        if len(seqfeats_df.columns.intersection(self.data.columns)):
+            self.data.update(seqfeats_df.filter(seqfeats_df.columns.intersection(self.data.columns), axis='columns'),
+                             overwrite=False)
+
+        return records_df
+
+    @classmethod
+    def get_species_list(cls, file_path):
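+        """Parse UniProt's speclist.txt (controlled vocabulary of species) into a
+        DataFrame indexed by NCBI taxon id, with official (scientific) name,
+        common name, and synonym columns.
+
+        Rows look like (illustrative): 'AADNV V 648330: N=Aedes albopictus densovirus'.
+        """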
+        speclist = pd.read_fwf(file_path,
+                               names=['species_code', 'Taxon', 'species_id', 'attr'],
+                               skipinitialspace=True, skiprows=59, skipfooter=4)
+        # pandas only supports single-character `comment` markers, so instead drop the
+        # '=====' section-separator rows and rows without a 'key=value' attr here
+        speclist = speclist.drop(index=speclist.index[~speclist['attr'].str.contains("=", na=False)
+                                                      | speclist['attr'].str.match("=+", na=False)])
+        speclist['species_id'] = speclist['species_id'].str.rstrip(":")
+        speclist = speclist.ffill()
+        speclist = speclist.groupby(speclist.columns[:3].tolist())['attr'] \
+            .apply('|'.join) \
+            .apply(lambda s: dict(map(str.strip, sub.split('=', 1)) for sub in s.split("|") if '=' in sub)) \
+            .apply(pd.Series)
+        speclist = speclist.rename(columns={'N': 'Official (scientific) name', 'C': 'Common name', 'S': 'Synonym'}) \
+            .reset_index() \
+            .set_index('species_id')
+        speclist['Taxon'] = speclist['Taxon'].replace(
+            {'A': 'archaea', 'B': 'bacteria', 'E': 'eukaryota', 'V': 'viruses', 'O': 'others'})
+        speclist.index.name = 'NCBI-taxon'
+        return speclist
+
+    def load_sequences(self, fasta_file: str, index=None, keys: Union[pd.Index, List[str]] = None, blocksize=None) \
+        -> OrderedDict:
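+        """Load a UniProt FASTA file into a dict-like mapping of id -> raw sequence.
+
+        UniProt FASTA headers follow 'db|accession|entry_name ...' (illustrative:
+        'sp|P12345|AATM_RABIT'), so `index='protein_id'` keys records by accession
+        and `index='protein_name'` keys them by entry name.
+        """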
+        def get_id(s: str):
+            if index == 'protein_id':
+                return s.split('|')[1]
+            elif index == 'protein_name':
+                return s.split('|')[2]
+            else:
+                return s.split('|')[1]
+
+        fa = Fasta(fasta_file, key_function=get_id, as_raw=True, )
+
+        return fa.records
+
+    def get_sequences(self, index: str, omic: str = None, agg: str = "first", **kwargs):
+        assert index, '`index` must be either "protein_id" or "protein_name"'
+
+        # Parse lncRNA & mRNA fasta
+        seq_df = self.load_sequences(self.file_resources["uniprot_sprot.fasta"], index=index, blocksize=self.blocksize)
+        if "uniprot_trembl.fasta" in self.file_resources:
+            trembl_seq_df = self.load_sequences(self.file_resources["uniprot_trembl.fasta"], index=index,
+                                                blocksize=self.blocksize)
+            seq_df.update(trembl_seq_df)
+
+        return seq_df
+
+
+class MirBase(SequenceDatabase):
+    """Loads the MirBase database from https://mirbase.org .
+
+    Default path: "ftp://mirbase.org/pub/mirbase/CURRENT/" .
+    Default file_resources: {
+        "aliases.txt": "aliases.txt.gz",
+        "mature.fa": "mature.fa.gz",
+        "hairpin.fa": "hairpin.fa.gz",
+        "rnacentral.mirbase.tsv": "ftp://ftp.ebi.ac.uk/pub/databases/RNAcentral/current_release/id_mapping/database_mappings/mirbase.tsv",
+    }
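+
+    Example (illustrative; assumes uncompressed mature.fa / hairpin.fa are provided):
+        >>> mirbase = MirBase(species_id="9606")
+        >>> seqs = mirbase.get_sequences(index="gene_name", agg="all")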
+    """
+
+    def __init__(
+        self,
+        path="http://mirbase.org/ftp/CURRENT/",
+        file_resources=None,
+        species_id: Optional[str] = '9606',
+        index_col: str = "mirbase_id",
+        col_rename=None,
+        **kwargs,
+    ):
+        """
+        Args:
+            path:
+            file_resources:
+            sequence (str):
+            species_id (str): Species code, e.g., 9606 for human
+            col_rename:
+            blocksize:
+        """
+        if file_resources is None:
+            file_resources = {}
+            file_resources["aliases.txt.gz"] = "aliases.txt.gz"
+            file_resources["mature.fa.gz"] = "mature.fa.gz"
+            file_resources["hairpin.fa.gz"] = "hairpin.fa.gz"
+
+        if 'rnacentral.mirbase.tsv' not in file_resources:
+            file_resources["rnacentral.mirbase.tsv"] = "ftp://ftp.ebi.ac.uk/pub/databases/RNAcentral/current_release/" \
+                                                       "id_mapping/database_mappings/mirbase.tsv"
+
+        self.species_id = species_id
+        super().__init__(path=path, file_resources=file_resources, index_col=index_col, col_rename=col_rename, **kwargs)
+
+    def load_dataframe(self, file_resources, blocksize=None):
+        """
+        Args:
+            file_resources: dict of file name and path
+            blocksize:
+        """
+        rnacentral_mirbase = pd.read_table(
+            file_resources["rnacentral.mirbase.tsv"], low_memory=True, header=None,
+            names=["RNAcentral id", "database", "mirbase_id", "species_id", "RNA type", "NA"],
+            usecols=["RNAcentral id", "database", "mirbase_id", "species_id", "RNA type"],
+            index_col="RNAcentral id",
+            dtype={'mirbase_id': 'str', "species_id": "category", 'database': 'category', 'RNA type': 'category'})
+
+        if isinstance(self.species_id, str):
+            rnacentral_mirbase = rnacentral_mirbase[rnacentral_mirbase["species_id"] == self.species_id]
+        elif isinstance(self.species_id, Iterable):
+            rnacentral_mirbase = rnacentral_mirbase[rnacentral_mirbase["species_id"].isin(set(self.species_id))]
+
+        mirbase_df = pd.read_table(file_resources["aliases.txt"], low_memory=True, header=None,
+                                   names=["mirbase_id", "mirbase_name"], index_col=self.index_col,
+                                   dtype='str', )
+        # `rnacentral_mirbase` is indexed by "RNAcentral id" with `mirbase_id` as a column,
+        # so re-index it by `mirbase_id` to join it onto `mirbase_df`'s index
+        mirbase_df = mirbase_df.join(rnacentral_mirbase.reset_index().set_index(self.index_col),
+                                     how="left", rsuffix='_rnacentral')
+
+        # Split the ';'-delimited miRNA names under each miRBase accession ID into lists
+        mirbase_df['mirbase_name'] = mirbase_df['mirbase_name'].str.rstrip(";").str.split(";")
+
+        seq_dfs = []
+        for filename in file_resources:
+            if filename.endswith('.fa') or filename.endswith('.fasta'):
+                assert isinstance(file_resources[filename], str), f"Must provide a path to an uncompressed .fa file. " \
+                                                                  f"Given {file_resources[filename]}"
+                df = self.load_sequences(file_resources[filename], index=self.index_col, keys=self.keys)
+                seq_dfs.append(df)
+
+        if len(seq_dfs):
+            seq_dfs = pd.concat(seq_dfs, axis=0)
+            mirbase_df = mirbase_df.join(seq_dfs, how='left', on=self.index_col)
+        else:
+            logger.info('Missing sequence data because no "hairpin.fa" or "mature.fa" file was given.')
+
+        return mirbase_df
+
+    def load_sequences(self, fasta_file, index=None, keys=None, blocksize=None):
+        """
+        Args:
+            fasta_file:
+            index ():
+            keys ():
+            blocksize:
+        """
+        if hasattr(self, '_seq_df_dict') and fasta_file in self._seq_df_dict:
+            return self._seq_df_dict[fasta_file]
+
+        fa = Fasta(fasta_file, read_long_names=True, as_raw=True)
+        mirna_types = {'stem-loop', 'stem', 'type', 'loop'}
+
+        entries = []
+        for key, record in tqdm.tqdm(fa.items(), desc=str(fasta_file)):
+            attrs: List[str] = record.long_name.split(" ")
+
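+            # miRBase hairpin entries end with a type token, e.g. (illustrative):
+            #   "hsa-let-7a-1 MI0000060 Homo sapiens let-7a-1 stem-loop"
+            # while mature.fa entries carry no trailing type token.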
+            if attrs[-1] in mirna_types:
+                if attrs[-2] in mirna_types:
+                    mirna_type = intern(' '.join(attrs[-2:]))
+                    gene_name_idx = -3
+                else:
+                    mirna_type = intern(attrs[-1])
+                    gene_name_idx = -2
+            else:
+                mirna_type = None
+                gene_name_idx = -1
+
+            record_dict = {
+                "gene_id": attrs[0],
+                "mirbase_id": attrs[1],
+                "species": intern(" ".join(attrs[2:gene_name_idx])),
+                "gene_name": attrs[gene_name_idx],
+                "mirna_type": mirna_type,
+                SEQUENCE_COL: str(record),
+            }
+            if keys is not None and index:
+                if record_dict[index] not in keys:
+                    continue
+
+            entries.append(record_dict)
+
+        df = pd.DataFrame(entries)
+        if index:
+            df = df.set_index(index)
+        # if blocksize:
+        #     df = dd.from_pandas(df, chunksize=blocksize)
+
+        if not hasattr(self, '_seq_df_dict'):
+            self._seq_df_dict = {}
+        self._seq_df_dict[fasta_file] = df
+
+        return df
+
+    def get_sequences(self,
+                      index="gene_name",
+                      omic=None,
+                      agg="all",
+                      **kwargs):
+        """
+        Args:
+            index:
+            omic:
+            agg:
+            **kwargs:
+        """
+        dfs = []
+        for filename in self.file_resources:
+            if filename.endswith('.fa') or filename.endswith('.fasta'):
+                seq_df = self.load_sequences(self.file_resources[filename])
+                dfs.append(seq_df)
+        seq_df = pd.concat(dfs, axis=0)
+
+        seq_df = seq_df.groupby(index)[SEQUENCE_COL].agg(self.aggregator_fn(agg))
+
+        return seq_df
+
+
+class RNAcentral(SequenceDatabase):
+    """
+    Loads the RNAcentral database from https://rnacentral.org/ and provides a series of methods to extract sequence data from it.
+
+    Default path: https://ftp.ebi.ac.uk/pub/databases/RNAcentral/current_release/ .
+    Default file_resources: {
+        "rnacentral_rfam_annotations.tsv": "go_annotations/rnacentral_rfam_annotations.tsv.gz",
+        "database_mappings/gencode.tsv": "id_mapping/database_mappings/gencode.tsv",
+        "gencode.fasta": "sequences/by-database/gencode.fasta",
+        ...
+    }
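+
+    Example (illustrative):
+        >>> rnac = RNAcentral(species_id="9606")
+        >>> df = rnac.data  # id mappings joined with GO/Rfam annotations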
+    """
+    COLUMNS_RENAME_DICT = {
+        'ensembl_gene_id': 'gene_id',
+        'external id': 'transcript_id',
+        'GO terms': 'go_id',
+        'gene symbol': 'gene_id',
+    }
+
+    def __init__(self, path="https://ftp.ebi.ac.uk/pub/databases/RNAcentral/current_release/", file_resources=None,
+                 col_rename=COLUMNS_RENAME_DICT, species_id: Union[List[str], str, None] = None,
+                 index_col="RNAcentral id", keys=None,
+                 remove_version_num=True, remove_species_suffix=True, **kwargs):
+        """
+        Provide
+
+        Args:
+            path ():
+            file_resources ():
+            col_rename ():
+            species_id ():
+            index_col ():
+            keys ():
+            remove_version_num ():
+            remove_species_suffix ():
+            **kwargs ():
+        """
+        self.species_id = species_id
+        self.remove_version_num = remove_version_num
+        self.remove_species_suffix = remove_species_suffix
+
+        if file_resources is None:
+            file_resources = {}
+            file_resources["rnacentral_rfam_annotations.tsv.gz"] = "go_annotations/rnacentral_rfam_annotations.tsv.gz"
+            file_resources["database_mappings/ensembl_gencode.tsv"] = "id_mapping/database_mappings/ensembl_gencode.tsv"
+            file_resources["database_mappings/mirbase.tsv"] = "id_mapping/database_mappings/mirbase.tsv"
+
+        super().__init__(path=path, file_resources=file_resources, col_rename=col_rename, index_col=index_col,
+                         keys=keys, **kwargs)
+
+    def load_dataframe(self, file_resources, blocksize=None):
+        """
+        Args:
+            file_resources:
+            blocksize:
+        """
+        transcripts_df = []
+        # Concatenate transcripts ids by combining `database_mappings/` files from multiple RNAcentral databases
+        for filename in (fname for fname in file_resources if "database_mappings" in fname):
+            args = dict(low_memory=True, header=None,
+                        names=["RNAcentral id", "database", "external id", "species_id", "RNA type", "gene symbol"],
+                        dtype={'gene symbol': 'str',
+                               'database': 'category', 'species_id': 'category', 'RNA type': 'category', })
+
+            if blocksize:
+                if filename.endswith('.tsv'):
+                    id_mapping: dd.DataFrame = dd.read_table(
+                        file_resources[filename], blocksize=None if isinstance(blocksize, bool) else blocksize, **args)
+                elif filename.endswith('.parquet'):
+                    # dd.read_parquet partitions by row group and takes no blocksize argument
+                    id_mapping: dd.DataFrame = dd.read_parquet(file_resources[filename])
+                else:
+                    id_mapping = None
+            else:
+                if filename.endswith('.tsv'):
+                    id_mapping = pd.read_table(file_resources[filename], **args)
+                elif filename.endswith('.parquet'):
+                    id_mapping = pd.read_parquet(file_resources[filename])
+                else:
+                    id_mapping = None
+
+            if id_mapping is None:
+                raise Exception("Must provide a file with 'database_mappings/(*).tsv' in file_resources")
+
+            # Filter by species, dropping non-matching rows
+            if isinstance(self.species_id, str):
+                id_mapping = id_mapping.loc[id_mapping["species_id"] == self.species_id]
+            elif isinstance(self.species_id, Iterable):
+                id_mapping = id_mapping.loc[id_mapping["species_id"].isin(set(self.species_id))]
+
+            # Filter by index
+            if self.keys is not None and id_mapping.index.name == self.index_col:
+                id_mapping = id_mapping.loc[id_mapping.index.isin(self.keys)]
+            elif self.keys is not None and id_mapping.index.name != self.index_col:
+                id_mapping = id_mapping.loc[id_mapping[self.index_col].isin(self.keys)]
+
+            # Append the "_<species_id>" suffix to RNAcentral ids to match the sequence file ids
+            if "RNAcentral id" in id_mapping.columns:
+                id_mapping["RNAcentral id"] = id_mapping["RNAcentral id"] + "_" + id_mapping["species_id"].astype(str)
+            elif "RNAcentral id" == id_mapping.index.name:
+                id_mapping.index = id_mapping.index + "_" + id_mapping["species_id"].astype(str)
+
+            # Add sequence column if a FASTA file provided for the database
+            fasta_filename = f"{filename.split('/')[-1].split('.')[0]}.fasta"
+            if fasta_filename in file_resources:
+                seq_df = self.load_sequences(file_resources[fasta_filename])
+                if id_mapping.index.name == "RNAcentral id":
+                    id_mapping = id_mapping.merge(seq_df, how='left', left_index=True, right_index=True)
+                else:
+                    id_mapping = id_mapping.merge(seq_df, how='left', left_on="RNAcentral id", right_index=True)
+            else:
+                logger.info(f"{fasta_filename} not provided for `{filename}`, so sequence data will be missing")
+
+            if self.remove_version_num and 'gene symbol' in id_mapping.columns:
+                id_mapping["gene symbol"] = id_mapping["gene symbol"].str.replace("[.].\d*", "", regex=True)
+            if self.remove_species_suffix:
+                id_mapping["RNAcentral id"] = id_mapping["RNAcentral id"].str.replace("_(\d*)", '', regex=True)
+
+            # Set index
+            set_index_kws = dict(sorted=True) if blocksize else {}
+            id_mapping = id_mapping.set_index(self.index_col, **set_index_kws)
+            if isinstance(id_mapping, dd.DataFrame) and not id_mapping.known_divisions:
+                id_mapping.divisions = id_mapping.compute_current_divisions()
+
+            transcripts_df.append(id_mapping)
+
+        # Concatenate multiple `database_mappings` files from different databases
+        if blocksize:
+            transcripts_df = dd.concat(transcripts_df, axis=0, interleave_partitions=True, join='outer')
+        else:
+            transcripts_df = pd.concat(transcripts_df, axis=0, join='outer')
+
+        # Join go_id and Rfams annotations to each "RNAcentral id" from 'rnacentral_rfam_annotations.tsv'
+        try:
+            transcripts_df = self.add_rfam_annotation(transcripts_df, file_resources, blocksize)
+        except Exception as e:
+            logger.warning(f"Failed to add Rfam annotations to transcripts_df:")
+            traceback.print_exc()
+
+        return transcripts_df
+
+    def add_rfam_annotation(self, transcripts_df: Union[pd.DataFrame, dd.DataFrame],
+                            file_resources, blocksize=None) -> Union[pd.DataFrame, dd.DataFrame]:
+        args = dict(low_memory=True, names=["RNAcentral id", "GO terms", "Rfams"])
+
+        if blocksize:
+            if 'rnacentral_rfam_annotations.tsv' in file_resources and isinstance(
+                file_resources['rnacentral_rfam_annotations.tsv'], str):
+                anns = dd.read_table(file_resources["rnacentral_rfam_annotations.tsv"], **args)
+            else:
+                anns = dd.read_table(file_resources["rnacentral_rfam_annotations.tsv.gz"], compression="gzip", **args)
+            anns = anns.set_index("RNAcentral id", sorted=True)
+
+            # Filter annotations by "RNAcentral id" in `transcripts_df`
+            anns = anns.loc[anns.index.isin(transcripts_df.index.compute())]
+
+            if not anns.known_divisions:
+                anns.divisions = anns.compute_current_divisions()
+
+            # Groupby on index
+            anns_groupby: dd.DataFrame = anns \
+                .groupby(by=lambda idx: idx) \
+                .agg({col: get_agg_func('unique', use_dask=True) for col in ["GO terms", 'Rfams']})
+
+        else:
+            anns = pd.read_table(file_resources["rnacentral_rfam_annotations.tsv"], index_col='RNAcentral id', **args)
+            idx = transcripts_df.index.compute() if isinstance(transcripts_df, dd.DataFrame) else transcripts_df.index
+            anns = anns.loc[anns.index.isin(set(idx))]
+            anns_groupby = anns.groupby("RNAcentral id").agg({col: 'unique' for col in ["GO terms", 'Rfams']})
+
+        transcripts_df = transcripts_df.merge(anns_groupby, how='left', left_index=True, right_index=True)
+        return transcripts_df
+
+    def load_sequences(self, fasta_file: str, index=None, keys=None, blocksize=None):
+        """
+        Args:
+            index ():
+            fasta_file:
+            keys ():
+            blocksize:
+        """
+        fa = Fasta(fasta_file, as_raw=True)
+
+        entries = []
+        for key, record in tqdm.tqdm(fa.items(), desc=str(fasta_file)):
+            rnacentral_id = re.sub(r"_(\d*)", '', key) if self.remove_species_suffix else key
+            if keys is not None and self.index_col == 'RNAcentral id' and rnacentral_id not in keys:
+                continue
+            desc = record.long_name.split(" ", maxsplit=1)[-1]
+
+            record_dict = {
+                'RNAcentral id': key,
+                'description': desc,
+                SEQUENCE_COL: str(record),
+            }
+
+            entries.append(record_dict)
+
+        df = pd.DataFrame(entries).set_index("RNAcentral id")
+
+        return df
+
+    def get_sequences(self,
+                      index="RNAcentral id",
+                      omic=None,
+                      agg="all",
+                      **kwargs):
+        """
+        Args:
+            index:
+            omic:
+            agg:
+            **kwargs:
+        """
+        dfs = []
+        for filename in self.file_resources:
+            if filename.endswith('.fa') or filename.endswith('.fasta'):
+                seq_df = self.load_sequences(self.file_resources[filename])
+                dfs.append(seq_df)
+        seq_df = pd.concat(dfs, axis=0)
+
+        seq_df = seq_df.groupby(index)[SEQUENCE_COL].agg(self.aggregator_fn(agg))
+
+        return seq_df