# openomics/transforms/df.py
1
from collections.abc import Iterable
2
from typing import Union, Dict, List, Tuple, Optional, Set
3
4
import numpy as np
5
import pandas as pd
6
from dask import dataframe as dd
7
from logzero import logger
8
9
10
def has_iterables(series: Union[dd.Series, pd.Series], n: int = 10) -> bool:
    """
    Check whether any of the first `n` elements in `series` is a non-string
    Iterable, e.g. list, set, or np.ndarray.

    Args:
        series (dd.Series, dd.Index, pd.Series, or pd.Index): series to inspect.
            Any other type returns False.
        n (int): number of leading elements to sample for the test. Default 10.

    Returns:
        bool: True if any sampled element is an Iterable other than str.
    """
    if isinstance(series, (dd.Series, dd.Index)):
        try:
            # npartitions=-1 lets dask scan all partitions when the first
            # partition holds fewer than `n` rows.
            values = series.head(n=n, npartitions=-1)
        except Exception:
            # Some dask versions/objects don't accept `npartitions`; fall back
            # to the plain head(). (Was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            values = series.head(n=n)
    elif isinstance(series, pd.Series):
        values = series.head(n=n)
    elif isinstance(series, pd.Index):
        values = series[:n]
    else:
        return False

    # str is itself an Iterable, so exclude it explicitly.
    is_iterables = values.map(lambda x: not isinstance(x, str) and isinstance(x, Iterable)).any()
    return bool(is_iterables)
34
35
36
def match_iterable_keys(left: Union[dd.Series, pd.Series], right: Union[dd.Series, pd.Series]) \
    -> Tuple[dd.Series, dd.Series]:
    """
    Prepare two key series for joining when one or both sides may hold
    list-like values (e.g. lists of alias keys) instead of scalars.

    For each side that contains iterables, every list of candidate keys is
    collapsed to a single key that matches a value on the other side; sides
    that already hold scalars are returned unchanged.

    Args:
        left (dd.Series, dd.Index, pd.Series, or pd.Index): left join keys.
        right (dd.Series, dd.Index, pd.Series, or pd.Index): right join keys.

    Returns:
        Tuple[dd.Series, dd.Series]: the transformed (left_on, right_on) pair.
    """
    left_iterables = has_iterables(left)
    right_iterables = has_iterables(right)

    def _list_to_key(list_values: List[str], possibilities: Set[str]) -> Optional[str]:
        # Collapse a list of candidate keys to the first one present in
        # `possibilities`; None input stays None, a single-element list is
        # unwrapped unconditionally, and no match falls through to None.
        if list_values is None:
            return None
        elif len(list_values) == 1:
            return list_values[0]

        for key in list_values:
            if key in possibilities:
                return key

    def _list_to_list(list_values: List[str], multi_possibilities: List[Set[str]]) -> Optional[str]:
        # Match a list of candidate keys against a collection of key-sets:
        # returns the sorted, "|"-joined intersection with the first
        # overlapping set, or None if nothing overlaps.
        if list_values is None:
            return None
        elif len(list_values) == 1:
            return list_values[0]

        for possibilities in multi_possibilities:
            if isinstance(possibilities, set) and list_values is not None:
                match = possibilities.intersection(list_values)
                if len(match):
                    return "|".join(sorted(match))

    left_on, right_on = left, right

    if left_iterables and not right_iterables:
        # Only the left side holds lists: collapse each left list to the first
        # element found among the right-side scalar keys.
        possibilities = set(right.dropna()) if isinstance(right, (pd.Series, pd.Index)) else set(
            right.dropna().compute())

        left_on = left.map(lambda values: _list_to_key(values, possibilities))
    elif not left_iterables and right_iterables:
        # Mirror case: only the right side holds lists.
        possibilities = set(left.dropna()) if isinstance(left, (pd.Series, pd.Index)) else set(
            left.dropna().compute())

        right_on = right.map(lambda values: _list_to_key(values, possibilities))
    elif left_iterables and right_iterables:
        # Both sides hold lists: turn each side's lists into sets, then map
        # each list on one side to its joined intersection with the other.
        right_possibilities = right.map(lambda x: set(x) if isinstance(x, Iterable) and not isinstance(x, str) else x)
        if isinstance(right_possibilities, (dd.Series, dd.Index)):
            # Materialize dask results so _list_to_list can iterate them.
            right_possibilities = right_possibilities.compute()
        left_on = left.map(lambda values: _list_to_list(values, right_possibilities))

        left_possibilities = left.map(lambda x: set(x) if isinstance(x, Iterable) and not isinstance(x, str) else x)
        if isinstance(left_possibilities, (dd.Series, dd.Index)):
            left_possibilities = left_possibilities.compute()
        right_on = right.map(lambda values: _list_to_list(values, left_possibilities))

    return left_on, right_on
87
88
89
def drop_duplicate_columns(df: "Union[pd.DataFrame, dd.DataFrame]") -> "Union[pd.DataFrame, dd.DataFrame]":
    """
    Drop duplicated column names, keeping the first occurrence of each.

    Args:
        df (pd.DataFrame or dd.DataFrame): input dataframe.

    Returns:
        The dataframe with unique column names, in their original order.
        Returned unchanged (same object) when there are no duplicates.
    """
    if df.columns.duplicated().any():
        # np.unique gives the index of each name's first occurrence, but
        # ordered alphabetically by name; np.sort restores the original
        # column order instead of silently reordering the frame.
        _, i = np.unique(df.columns, return_index=True)
        df = df.iloc[:, np.sort(i)]

    return df
99
100
101
def filter_rows(df: pd.DataFrame, filters: Union[str, Dict[str, List]], uncased: bool = False) -> pd.DataFrame:
    """
    Subset rows of `df` by a pandas query expression or per-column match values.

    Args:
        df (pd.DataFrame): input dataframe (pandas, or dask-like with the same API).
        filters (str or dict):
            Either a pandas `query` expression, or a dict mapping a column name
            to the value(s) to keep: a list (exact membership), a str
            (substring/regex match via `Series.str.contains`), or any other
            scalar (equality). Unknown column names are warned about and skipped.
        uncased (bool): Default False.
            Whether to ignore case when matching list or str filter values.

    Returns:
        pd.DataFrame: the filtered dataframe.
    """
    if filters is None:
        return df

    # May be a dask Delayed rather than an int, hence the isinstance check below.
    num_samples = df.shape[0]

    if isinstance(filters, str):
        df = df.query(filters)

    elif isinstance(filters, dict):
        for col, values in filters.items():
            if col not in df.columns:
                # Was logger.warn(msg, col, ...) which mis-used %-style lazy
                # args and produced a broken message; warn() is also deprecated.
                logger.warning(f"Filter key `{col}` must be in one of {list(df.columns)}")
                continue

            if isinstance(values, list):
                if uncased:
                    # Compare both sides upper-cased for case-insensitive match.
                    df = df.loc[df[col].str.upper().isin({val.upper() for val in values})]
                else:
                    # Bug fix: values were previously upper-cased even in the
                    # case-sensitive branch, so exact matches never succeeded.
                    df = df.loc[df[col].isin(values)]

            elif isinstance(values, str):
                df = df.loc[df[col].str.contains(values, case=not uncased)]
            else:
                df = df.loc[df[col] == values]

    if isinstance(num_samples, int):
        logger.info(f'Removed {num_samples - df.shape[0]} rows from query: {filters}')

    return df