# tests/data/test_parquet.py
from itertools import islice
from pathlib import Path

import pyarrow.dataset
import pyarrow.fs
import pytest
from confit.utils.random import set_seed
from typing_extensions import Literal

import edsnlp
from edsnlp.data.converters import get_dict2doc_converter, get_doc2dict_converter
from edsnlp.utils.collections import dl_to_ld
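
# These tests round-trip documents between parquet files and spaCy Doc objects
# using the "omop" and "ents" converters, and exercise shuffling, work-unit
# selection and fragment-sized writes. `dl_to_ld` turns pyarrow's
# column-oriented `Table.to_pydict()` output into one dict per row
# (dict-of-lists to list-of-dicts).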
def assert_doc_read(doc):
    """Check the attributes and spans of the sample document read from parquet."""
    assert doc._.note_id == "subfolder/doc-1"
    assert doc._.context_var == "test"

    attrs = ("etat", "assertion")
    spans_and_attributes = {
        "__ents__": sorted(
            [
                (e.start, e.end, e.text, tuple(getattr(e._, key) for key in attrs))
                for e in doc.ents
            ]
        ),
        **{
            name: sorted(
                [
                    (e.start, e.end, e.text, tuple(getattr(e._, key) for key in attrs))
                    for e in doc.spans[name]
                ]
            )
            for name in doc.spans
        },
    }

    assert spans_and_attributes == {
        "__ents__": [
            (6, 7, "douleurs", (None, None)),
            (7, 11, "dans le bras droit", (None, None)),
            (17, 21, "problème \nde locomotion", (None, "absent")),
            (25, 26, "AVC", ("passé", "non-associé")),
            (35, 36, "rhume", ("présent", "hypothétique")),
            (45, 46, "rhume", ("présent", "hypothétique")),
            (51, 52, "Douleurs", (None, None)),
            (52, 56, "dans le bras droit", (None, None)),
            (68, 69, "anomalie", (None, "absent")),
        ],
        "anatomie": [
            (9, 11, "bras droit", (None, None)),
            (54, 56, "bras droit", (None, None)),
        ],
        "localisation": [
            (7, 11, "dans le bras droit", (None, None)),
            (52, 56, "dans le bras droit", (None, None)),
        ],
        "pathologie": [
            (17, 21, "problème \nde locomotion", (None, "absent")),
            (25, 26, "AVC", ("passé", "non-associé")),
            (35, 36, "rhume", ("présent", "hypothétique")),
            (45, 46, "rhume", ("présent", "hypothétique")),
        ],
        "sosy": [
            (6, 7, "douleurs", (None, None)),
            (51, 52, "Douleurs", (None, None)),
            (68, 69, "anomalie", (None, "absent")),
        ],
    }
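
# Expected OMOP export of the sample document. The first entity's "etat" is
# "test" because the tests below set `doc.ents[0]._.etat = "test"` before
# writing the document back to parquet.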
GOLD_OMOP = {
    "entities": [
        {
            "assertion": None,
            "end_char": 38,
            "etat": "test",
            "lexical_variant": "douleurs",
            "note_nlp_id": 0,
            "note_nlp_source_value": "sosy",
            "start_char": 30,
        },
        {
            "assertion": None,
            "end_char": 57,
            "etat": None,
            "lexical_variant": "dans le bras droit",
            "note_nlp_id": 1,
            "note_nlp_source_value": "localisation",
            "start_char": 39,
        },
        {
            "assertion": None,
            "end_char": 57,
            "etat": None,
            "lexical_variant": "bras droit",
            "note_nlp_id": 2,
            "note_nlp_source_value": "anatomie",
            "start_char": 47,
        },
        {
            "assertion": "absent",
            "end_char": 98,
            "etat": None,
            "lexical_variant": "problème \nde locomotion",
            "note_nlp_id": 3,
            "note_nlp_source_value": "pathologie",
            "start_char": 75,
        },
        {
            "assertion": "non-associé",
            "end_char": 117,
            "etat": "passé",
            "lexical_variant": "AVC",
            "note_nlp_id": 4,
            "note_nlp_source_value": "pathologie",
            "start_char": 114,
        },
        {
            "assertion": "hypothétique",
            "end_char": 164,
            "etat": "présent",
            "lexical_variant": "rhume",
            "note_nlp_id": 5,
            "note_nlp_source_value": "pathologie",
            "start_char": 159,
        },
        {
            "assertion": "hypothétique",
            "end_char": 296,
            "etat": "présent",
            "lexical_variant": "rhume",
            "note_nlp_id": 6,
            "note_nlp_source_value": "pathologie",
            "start_char": 291,
        },
        {
            "assertion": None,
            "end_char": 314,
            "etat": None,
            "lexical_variant": "Douleurs",
            "note_nlp_id": 7,
            "note_nlp_source_value": "sosy",
            "start_char": 306,
        },
        {
            "assertion": None,
            "end_char": 333,
            "etat": None,
            "lexical_variant": "dans le bras droit",
            "note_nlp_id": 8,
            "note_nlp_source_value": "localisation",
            "start_char": 315,
        },
        {
            "assertion": None,
            "end_char": 333,
            "etat": None,
            "lexical_variant": "bras droit",
            "note_nlp_id": 9,
            "note_nlp_source_value": "anatomie",
            "start_char": 323,
        },
        {
            "assertion": "absent",
            "end_char": 386,
            "etat": None,
            "lexical_variant": "anomalie",
            "note_nlp_id": 10,
            "note_nlp_source_value": "sosy",
            "start_char": 378,
        },
    ],
    "note_id": "subfolder/doc-1",
    "context_var": "test",
    "note_text": "Le patient est admis pour des douleurs dans le bras droit, mais "
    "n'a pas de problème \n"
    "de locomotion. \n"
    "Historique d'AVC dans la famille. pourrait être un cas de "
    "rhume.\n"
    "NBNbWbWbNbWbNBNbNbWbWbNBNbWbNbNbWbNBNbWbNbNBWbWbNbNbNBWbNbWbNbWBNbNbWbNbNBNbWb"
    "WbNbWBNbNbWbNBNbWbWbNb\n"
    "Pourrait être un cas de rhume.\n"
    "Motif :\n"
    "Douleurs dans le bras droit.\n"
    "ANTÉCÉDENTS\n"
    "Le patient est déjà venu\n"
    "Pas d'anomalie détectée.\n",
}

def assert_doc_write_omop(exported_obj):
    """Check that the exported OMOP dict matches the gold standard."""
    assert exported_obj == GOLD_OMOP


def assert_doc_write_ents(exported_objs):
    """Check an "ents" export by deriving the gold rows from GOLD_OMOP: the
    gold dict is converted back into a Doc, re-exported with the "ents"
    converter, and the result is compared with the objects under test."""
    in_converter, kwargs = get_dict2doc_converter(
        "omop",
        dict(
            span_attributes=["etat", "assertion"],
            doc_attributes=["context_var"],
        ),
    )
    doc = in_converter(GOLD_OMOP, **kwargs)
    out_converter, kwargs = get_doc2dict_converter(
        "ents",
        dict(
            span_attributes=["etat", "assertion"],
            doc_attributes=["context_var"],
            span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        ),
    )
    gold_ents = out_converter(doc, **kwargs)
    assert exported_objs == gold_ents
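
# `read_in_worker` / `write_in_worker` are assumed to move the parquet I/O into
# the worker processes rather than the main process (an inference from the flag
# names); the test only checks that the round-tripped file content is identical.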
def test_read_write_in_worker(blank_nlp, tmpdir):
    input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet"
    output_dir = Path(tmpdir)
    edsnlp.data.read_parquet(
        input_dir,
        converter="omop",
        span_attributes=["etat", "assertion"],
        doc_attributes=["context_var"],
        read_in_worker=True,
    ).write_parquet(
        output_dir / "docs.parquet",
        converter="omop",
        doc_attributes=["context_var"],
        span_attributes=["etat", "assertion"],
        span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        write_in_worker=True,
    )
    # fmt: off
    assert (
            list(dl_to_ld(pyarrow.dataset.dataset(output_dir / "docs.parquet").to_table().to_pydict()))  # noqa: E501
            == list(dl_to_ld(pyarrow.dataset.dataset(input_dir).to_table().to_pydict()))
    )
    # fmt: on
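
# The `run_in_test_dir` fixture is assumed to chdir into this test's directory,
# so that the relative "../resources/docs.parquet" path below resolves.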
def test_read_to_parquet(blank_nlp, tmpdir, run_in_test_dir):
    output_dir = Path(tmpdir)
    fs = pyarrow.fs.LocalFileSystem()
    doc = list(
        edsnlp.data.read_parquet(
            "../resources/docs.parquet",
            converter="omop",
            span_attributes=["etat", "assertion"],
            doc_attributes=["context_var"],
            filesystem=fs,
        )
    )[0]
    assert_doc_read(doc)
    doc.ents[0]._.etat = "test"

    edsnlp.data.write_parquet(
        [doc],
        output_dir,
        converter="omop",
        doc_attributes=["context_var"],
        span_attributes=["etat", "assertion"],
        span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
    )

    assert_doc_write_omop(
        next(dl_to_ld(pyarrow.dataset.dataset(output_dir).to_table().to_pydict()))
    )

    # Writing to a non-empty directory raises unless overwrite=True is passed
    with pytest.raises(FileExistsError):
        edsnlp.data.write_parquet(
            [doc],
            output_dir,
            converter="omop",
            doc_attributes=["context_var"],
            span_attributes=["etat", "assertion"],
            span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        )

    edsnlp.data.write_parquet(
        [doc],
        output_dir,
        converter="omop",
        doc_attributes=["context_var"],
        span_attributes=["etat", "assertion"],
        span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        overwrite=True,
    )
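
# Same flow as test_read_to_parquet, but the export uses the "ents" converter,
# which produces one record per selected span rather than one per document.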
def test_read_to_parquet_ents(blank_nlp, tmpdir):
    input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet"
    output_dir = Path(tmpdir)
    fs = pyarrow.fs.LocalFileSystem()
    doc = list(
        edsnlp.data.read_parquet(
            input_dir,
            converter="omop",
            span_attributes=["etat", "assertion"],
            doc_attributes=["context_var"],
            filesystem=fs,
        )
    )[0]
    assert_doc_read(doc)
    doc.ents[0]._.etat = "test"

    edsnlp.data.write_parquet(
        [doc],
        output_dir,
        converter="ents",
        doc_attributes=["context_var"],
        span_attributes=["etat", "assertion"],
        span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        num_rows_per_file=1024,  # deprecated, kept to test backward compatibility
    )

    assert_doc_write_ents(
        list(dl_to_ld(pyarrow.dataset.dataset(output_dir).to_table().to_pydict()))
    )

    with pytest.raises(FileExistsError):
        edsnlp.data.write_parquet(
            [doc],
            output_dir,
            converter="ents",
            doc_attributes=["context_var"],
            span_attributes=["etat", "assertion"],
            span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"],
        )
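
# With several workers and shuffle_reader=False, the relative order of items
# coming from different workers is not deterministic, so the exact sequence is
# only asserted in the remaining configurations.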
@pytest.mark.parametrize("num_cpu_workers", [0, 2])
@pytest.mark.parametrize("shuffle", ["dataset", "fragment"])
@pytest.mark.parametrize("shuffle_reader", [False, None])
def test_read_shuffle_loop(
    num_cpu_workers: int,
    shuffle: Literal["dataset", "fragment"],
    shuffle_reader: bool,
):
    input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet"
    notes = (
        edsnlp.data.read_parquet(input_dir, loop=True)
        .shuffle(batch_by=shuffle, seed=42, shuffle_reader=shuffle_reader)
        .map(lambda x: x["note_id"])
        .set_processing(num_cpu_workers=num_cpu_workers)
    )
    notes = list(islice(notes, 6))
    if not (num_cpu_workers > 1 and not shuffle_reader):
        assert notes == [
            "subfolder/doc-2",
            "subfolder/doc-1",
            "subfolder/doc-3",
            "subfolder/doc-3",
            "subfolder/doc-2",
            "subfolder/doc-1",
        ]
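
# A "record" work unit distributes individual rows across workers, whereas a
# "fragment" work unit is assumed to hand whole parquet fragments to a single
# worker; this is what the batch-content assertions below distinguish.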
@pytest.mark.parametrize("num_cpu_workers", [0, 2])
@pytest.mark.parametrize("work_unit", ["record", "fragment"])
@pytest.mark.parametrize("shuffle", [False, "dataset", "fragment"])
def test_read_work_unit(
    num_cpu_workers,
    work_unit: Literal["record", "fragment"],
    shuffle: Literal[False, "dataset", "fragment"],
):
    if shuffle == "dataset" and work_unit == "fragment":
        pytest.skip("Dataset-level shuffle is not supported with fragment work unit")
    input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet"
    set_seed(42)
    stream = edsnlp.data.read_parquet(
        input_dir, work_unit=work_unit, shuffle=shuffle
    ).set_processing(
        num_cpu_workers=num_cpu_workers,
    )
    stream = stream.map_batches(
        lambda b: "|".join(sorted([x["note_id"] for x in b])), batch_size=1000
    )
    if (work_unit == "fragment" and num_cpu_workers == 2) or num_cpu_workers == 0:
        assert list(stream) == ["subfolder/doc-1|subfolder/doc-2|subfolder/doc-3"]
    else:
        assert list(stream) == ["subfolder/doc-1|subfolder/doc-3", "subfolder/doc-2"]
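
# `batch_size="fragment"` presumably makes each written batch span one input
# fragment; doc-2 is duplicated four times so that batch and worker boundaries
# become visible in the output row order and fragment counts checked below.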
@pytest.mark.parametrize(
    "num_cpu_workers,write_in_worker",
    [
        (0, False),
        (2, True),
        (2, False),
    ],
)
def test_write_parquet_fragment(tmpdir, num_cpu_workers, write_in_worker):
    input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.parquet"
    output_dir = Path(tmpdir)
    notes = edsnlp.data.read_parquet(input_dir, converter="omop")
    notes = notes.map_batches(
        lambda b: [y for x in b for y in (x,) * (4 if "doc-2" in x._.note_id else 1)]
    )
    notes = notes.set_processing(
        num_cpu_workers=num_cpu_workers,
        deterministic=True,  # this is the default
    )
    notes.write_parquet(
        output_dir,
        batch_size="fragment",
        converter=lambda x: {"note_id": x._.note_id},
        write_in_worker=write_in_worker,
    )
    input_ds = pyarrow.dataset.dataset(input_dir)
    inputs = [o["note_id"] for o in dl_to_ld(input_ds.to_table().to_pydict())]
    assert len(list(input_ds.get_fragments())) == 1
    assert inputs == ["subfolder/doc-1", "subfolder/doc-2", "subfolder/doc-3"]
    out_ds = pyarrow.dataset.dataset(output_dir)
    outs = [o["note_id"] for o in dl_to_ld(out_ds.to_table().to_pydict())]
    if write_in_worker and num_cpu_workers == 2:
        # The result depends on the order in which the 2 workers produce
        # their batches, so both orderings are accepted
        assert len(list(out_ds.get_fragments())) == 2
        assert outs == [
            "subfolder/doc-1",
            "subfolder/doc-3",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
        ] or outs == [
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-1",
            "subfolder/doc-3",
        ]
    elif not write_in_worker and num_cpu_workers == 2:
        assert len(list(out_ds.get_fragments())) == 1
        assert outs == [
            "subfolder/doc-1",
            "subfolder/doc-2",
            "subfolder/doc-3",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
        ]
    else:  # simple case: single process, one output fragment
        assert len(list(out_ds.get_fragments())) == 1
        assert outs == [
            "subfolder/doc-1",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-2",
            "subfolder/doc-3",
        ]