from collections.abc import Iterable
from typing import Union, Dict, List, Tuple, Optional, Set

import numpy as np
import pandas as pd
from dask import dataframe as dd
from logzero import logger


def has_iterables(series: Union[dd.Series, pd.Series], n: int = 10) -> bool:
    """
    Check whether any element in series is an Iterable, e.g. list, set, or np.ndarray.

    Args:
        series (): a pandas/dask Series or Index to inspect.
        n (int): number of elements to test

    Returns:
        bool: True if any of the first `n` elements is a non-string Iterable.
    """
    if isinstance(series, (dd.Series, dd.Index)):
        try:
            # npartitions=-1 scans all partitions to gather `n` rows; older dask
            # versions don't accept the keyword, hence the fallback below.
            values = series.head(n=n, npartitions=-1)
        except (TypeError, ValueError):
            values = series.head(n=n)
    elif isinstance(series, pd.Series):
        values = series.head(n=n)
    elif isinstance(series, pd.Index):
        values = series[:n]
    else:
        return False

    # str is technically Iterable but must not count as a collection of keys.
    is_iterables = values.map(lambda x: not isinstance(x, str) and isinstance(x, Iterable)).any()
    return is_iterables


def match_iterable_keys(left: Union[dd.Series, pd.Series], right: Union[dd.Series, pd.Series]) \
        -> Tuple[dd.Series, dd.Series]:
    """
    Align two key Series for joining when one or both contain lists of candidate keys.

    When a side holds list-like values, each list is collapsed to the single key
    that matches the other side (or a "|"-joined set of common keys when both
    sides are list-like). Sides holding scalar keys are returned unchanged.

    Args:
        left (): left join-key Series/Index (scalar keys or lists of keys).
        right (): right join-key Series/Index (scalar keys or lists of keys).

    Returns:
        Tuple[dd.Series, dd.Series]: (left_on, right_on) matchable key Series.
    """
    left_iterables = has_iterables(left)
    right_iterables = has_iterables(right)

    def _list_to_key(list_values: List[str], possibilities: Set[str]) -> Optional[str]:
        # Collapse a list of candidate keys to the first one found in `possibilities`.
        # Returns None when no candidate matches (row won't join).
        if list_values is None:
            return None
        elif len(list_values) == 1:
            return list_values[0]

        for key in list_values:
            if key in possibilities:
                return key

    def _list_to_list(list_values: List[str], multi_possibilities: Iterable[Set[str]]) -> Optional[str]:
        # Both sides are list-like: return the sorted, "|"-joined intersection with
        # the first possibility-set that overlaps, or None when nothing overlaps.
        if list_values is None:
            return None
        elif len(list_values) == 1:
            return list_values[0]

        for possibilities in multi_possibilities:
            if isinstance(possibilities, set) and list_values is not None:
                match = possibilities.intersection(list_values)
                if len(match):
                    return "|".join(sorted(match))

    left_on, right_on = left, right

    if left_iterables and not right_iterables:
        possibilities = set(right.dropna()) if isinstance(right, (pd.Series, pd.Index)) else set(
            right.dropna().compute())

        left_on = left.map(lambda values: _list_to_key(values, possibilities))
    elif not left_iterables and right_iterables:
        possibilities = set(left.dropna()) if isinstance(left, (pd.Series, pd.Index)) else set(
            left.dropna().compute())

        right_on = right.map(lambda values: _list_to_key(values, possibilities))
    elif left_iterables and right_iterables:
        right_possibilities = right.map(lambda x: set(x) if isinstance(x, Iterable) and not isinstance(x, str) else x)
        if isinstance(right_possibilities, (dd.Series, dd.Index)):
            right_possibilities = right_possibilities.compute()
        left_on = left.map(lambda values: _list_to_list(values, right_possibilities))

        left_possibilities = left.map(lambda x: set(x) if isinstance(x, Iterable) and not isinstance(x, str) else x)
        if isinstance(left_possibilities, (dd.Series, dd.Index)):
            left_possibilities = left_possibilities.compute()
        right_on = right.map(lambda values: _list_to_list(values, left_possibilities))

    return left_on, right_on


def drop_duplicate_columns(df: Union[pd.DataFrame, dd.DataFrame]) -> Union[pd.DataFrame, dd.DataFrame]:
    """
    Drop duplicated columns, keeping the first occurrence of each label.

    Args:
        df: a pandas or dask DataFrame, possibly with repeated column labels.

    Returns:
        The DataFrame with duplicate column labels removed, original column
        order preserved.
    """
    if df.columns.duplicated().any():
        _, i = np.unique(df.columns, return_index=True)
        # np.unique returns indices in sorted-label order; re-sort them so the
        # surviving columns keep their original positions.
        df = df.iloc[:, np.sort(i)]

    return df


def filter_rows(df: pd.DataFrame, filters: Union[str, Dict[str, List]], uncased: bool = False) -> pd.DataFrame:
    """
    Filter rows of `df` by a query expression or per-column value constraints.

    Args:
        df (pd.DataFrame):
        filters (str or dict):
            Either a pandas query expression or a dict of column names for keys and matching values.
        uncased (bool): Default False.
            Whether to ignore case when matching list/str values against string columns.

    Returns:
        pd.DataFrame: the filtered DataFrame.
    """
    num_samples = df.shape[0]
    if filters is None:
        return df

    elif isinstance(filters, str):
        df = df.query(filters)

    elif isinstance(filters, dict):
        for col, values in filters.items():
            if col not in df.columns:
                logger.warning(f"Filter key `{col}` must be one of {df.columns.tolist()}")
                continue

            if isinstance(values, list):
                if uncased:
                    # Case-insensitive membership: upper-case both sides.
                    upper_values = {val.upper() for val in values}
                    df = df.loc[df[col].str.upper().isin(upper_values)]
                else:
                    # Case-sensitive: compare against the values as given.
                    df = df.loc[df[col].isin(values)]

            elif isinstance(values, str):
                df = df.loc[df[col].str.contains(values, case=not uncased)]
            else:
                df = df.loc[df[col] == values]

    # dask's shape[0] is a delayed scalar, so only log the count for pandas.
    if isinstance(num_samples, int):
        logger.info(f'Removed {num_samples - df.shape[0]} rows from query: {filters}')

    return df