Diff of /edsnlp/data/pandas.py [000000] .. [cad161]

Switch to unified view

a b/edsnlp/data/pandas.py
1
from __future__ import annotations
2
3
import random
4
from typing import Any, Callable, Iterable, Optional, Union
5
6
import pandas as pd
7
from typing_extensions import Literal
8
9
from edsnlp import registry
10
from edsnlp.core.stream import Stream
11
from edsnlp.data.base import BaseWriter, MemoryBasedReader
12
from edsnlp.data.converters import get_dict2doc_converter, get_doc2dict_converter
13
from edsnlp.utils.collections import dl_to_ld, flatten, ld_to_dl
14
from edsnlp.utils.stream_sentinels import DatasetEndSentinel
15
from edsnlp.utils.typing import AsList
16
17
18
class PandasReader(MemoryBasedReader):
    """
    Reader that serves the content of an in-memory pandas DataFrame as a
    sequence of records, followed by a `DatasetEndSentinel` at the end of each
    pass over the data.

    Parameters
    ----------
    data: pd.DataFrame
        The DataFrame to read from.
    shuffle: Literal["dataset", False]
        If "dataset", the whole DataFrame is re-sampled (`frac=1.0`) at the
        start of every pass. If False, the original order is kept.
    seed: Optional[int]
        Seed of the reader's private random generator. When omitted, a random
        32-bit seed is drawn from the global `random` module.
    loop: bool
        Whether to start a new pass once the data has been exhausted.
    """

    DATA_FIELDS = ("data",)

    def __init__(
        self,
        data: pd.DataFrame,
        shuffle: Literal["dataset", False] = False,
        seed: Optional[int] = None,
        loop: bool = False,
    ):
        super().__init__()
        self.shuffle = shuffle
        if seed is None:
            # No seed given: draw one so that the reader still owns a
            # dedicated generator.
            seed = random.getrandbits(32)
        self.rng = random.Random(seed)
        # Kinds of sentinels this reader emits (see `read_records`);
        # presumably inspected by the stream machinery -- confirm in base class.
        self.emitted_sentinels = {"dataset"}
        self.loop = loop
        self.data = data
        assert isinstance(data, pd.DataFrame)

    def read_records(self) -> Iterable[Any]:
        """
        Yield the records of the DataFrame, then a `DatasetEndSentinel`.
        The whole sequence is repeated for as long as `self.loop` is truthy.
        """
        keep_going = True
        while keep_going:
            frame = self.data
            if self.shuffle == "dataset":
                # A fresh random state is drawn for every pass, so each epoch
                # gets its own permutation while staying reproducible.
                epoch_state = self.rng.getrandbits(32)
                frame = frame.sample(frac=1.0, random_state=epoch_state)
            # dict(frame) maps each column name to its Series; dl_to_ld
            # presumably turns this dict of columns into one dict per row.
            columns = dict(frame)
            yield from dl_to_ld(columns)
            yield DatasetEndSentinel()
            # `loop` is re-read after each pass, as in a do-while loop.
            keep_going = bool(self.loop)

    def __repr__(self):
        """Short description that does not print the DataFrame content."""
        cls_name = self.__class__.__name__
        # object.__repr__ avoids rendering the whole table.
        data_repr = object.__repr__(self.data)
        return (
            f"{cls_name}(data={data_repr}, "
            f"shuffle={self.shuffle}, "
            f"loop={self.loop})"
        )
53
54
55
@registry.readers.register("pandas")
def from_pandas(
    data,
    converter: Optional[AsList[Union[str, Callable]]] = None,
    shuffle: Literal["dataset", False] = False,
    seed: Optional[int] = None,
    loop: bool = False,
    **kwargs,
) -> Stream:
    """
    Read documents from a pandas DataFrame: `edsnlp.data.from_pandas` wraps a
    `PandasReader` in a [Stream][edsnlp.core.stream.Stream] and optionally maps
    converters over the rows. At the moment, only entities and attributes are
    loaded. Relations and events are not supported.

    Example
    -------
    ```{ .python .no-check }

    import edsnlp

    nlp = edsnlp.blank("eds")
    nlp.add_pipe(...)
    doc_iterator = edsnlp.data.from_pandas(df, nlp=nlp, converter="omop")
    annotated_docs = nlp.pipe(doc_iterator)
    ```

    !!! note "Generator vs list"

        `edsnlp.data.from_pandas` returns a
        [Stream][edsnlp.core.stream.Stream].
        To iterate over the documents multiple times efficiently or to access
        them by index, you must convert it to a list

        ```{ .python .no-check }
        docs = list(edsnlp.data.from_pandas(df, converter="omop"))
        ```

    Parameters
    ----------
    data: pd.DataFrame
        Pandas object
    converter: Optional[AsList[Union[str, Callable]]]
        Converters to use to convert the rows of the DataFrame (represented as
        dicts) to Doc objects. These are documented on the
        [Converters](/data/converters) page.
    shuffle: Literal["dataset", False]
        Whether to shuffle the data. If "dataset", the whole dataset will be
        shuffled before starting iterating on it (at the start of every epoch
        if looping).
    seed: Optional[int]
        The seed to use for shuffling.
    loop: bool
        Whether to loop over the data indefinitely.
    kwargs:
        Additional keyword arguments to pass to the converter. These are
        documented on the [Converters](/data/converters) page.

    Returns
    -------
    Stream
    """

    reader = PandasReader(data, shuffle=shuffle, seed=seed, loop=loop)
    stream = Stream(reader=reader)

    # A falsy `converter` (None or empty) means no conversion step at all.
    for conv in converter or ():
        # Each lookup also returns the keyword arguments to use; they replace
        # `kwargs` for the following converters.
        conv, kwargs = get_dict2doc_converter(conv, kwargs)
        stream = stream.map(conv, kwargs=kwargs)

    return stream
128
129
130
class PandasWriter(BaseWriter):
    """
    Writer that gathers the collected items into a single pandas DataFrame.

    Parameters
    ----------
    dtypes: Optional[dict]
        Optional mapping passed to `pd.DataFrame.astype` on the result.
    """

    def __init__(self, dtypes: Optional[dict] = None):
        self.dtypes = dtypes

    def consolidate(self, items):
        """
        Build a DataFrame from `items` and cast it with `self.dtypes` if set.
        """
        # flatten / ld_to_dl presumably turn nested batches of row dicts into
        # a dict of columns -- confirm in edsnlp.utils.collections.
        frame = pd.DataFrame(ld_to_dl(flatten(items)))
        if not self.dtypes:
            # No (or empty) dtype mapping: return the frame untouched.
            return frame
        return frame.astype(self.dtypes)
138
139
140
@registry.writers.register("pandas")
def to_pandas(
    data: Union[Any, Stream],
    execute: bool = True,
    converter: Optional[Union[str, Callable]] = None,
    dtypes: Optional[dict] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    `edsnlp.data.to_pandas` writes a list of documents as a pandas table.

    Example
    -------
    ```{ .python .no-check }

    import edsnlp

    nlp = edsnlp.blank("eds")
    nlp.add_pipe(...)

    doc = nlp("My document with entities")

    edsnlp.data.to_pandas([doc], converter="omop")
    ```

    Parameters
    ----------
    data: Union[Any, Stream],
        The data to write (either a list of documents or a Stream).
    execute: bool
        Whether to execute the writing operation immediately or to return a
        stream
    converter: Optional[Union[str, Callable]]
        Converter to use to convert the documents to dictionary objects before
        storing them in the dataframe. These are documented on the
        [Converters](/data/converters) page.
    dtypes: Optional[dict]
        Dictionary of column names to dtypes. This is passed to
        `pd.DataFrame.astype`.
    kwargs:
        Additional keyword arguments to pass to the converter. These are
        documented on the [Converters](/data/converters) page.

    Returns
    -------
    pd.DataFrame
        The result of `Stream.write` with a `PandasWriter`. As described for
        `execute`, a stream may be returned instead when `execute` is False.
    """
    stream = Stream.ensure_stream(data)

    if converter:
        # The lookup returns the converter to apply together with the keyword
        # arguments it should receive.
        to_dict, kwargs = get_doc2dict_converter(converter, kwargs)
        stream = stream.map(to_dict, kwargs=kwargs)

    writer = PandasWriter(dtypes)
    return stream.write(writer, execute=execute)